Akka-路由模式Group/Pool

2024-08-28 19:04
文章标签 模式 路由 group pool akka

本文主要是介绍Akka-路由模式Group/Pool,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

为了文章好理解,我们先统一一下概念,在Akka中,Actor是一个高度抽象的概念,Akka的路由器也是一个Actor,所以我们把路由器,叫做路由Actor,接收消息(消费消息)的Actor,在本文中我们叫做消息Actor

Akka中有两种路由模式,分别是Group模式和Pool模式,如果我要将消息X通过路由Actor发送到多台机器,那么:
Pool: 在多台机器上,每台机器上的消息Actor都是由路由Actor创建的,也就是说,处理消息的这些Actor,是路由Actor的儿子,每台机器上的Actor可以通过parent()方法来测试,注意,路由Actor消息Actor可不一定在同一台机器上
Group: 多台机器上,每台机器上的消息Actor,都是我们程序员自己写代码创建好的,是通过actorOf创建的,路由Actor将消息X通过路径发送给这些处理消息的Actor

创建Akka项目

也可以参考本系列文章的环境搭建或者关于Akka的路由策略,可以参考这里,如果你已经搭建了akka环境,那么可跳过本段落,直接浏览下文的示例1
步骤1: Maven依赖

<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.11</artifactId><version>2.4.20</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.11</artifactId><version>2.4.20</version></dependency>

步骤2: 创建一个Actor

package akka.demo.actor;import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;public class MyActor extends UntypedActor {@Overridepublic void onReceive(Object o) throws Throwable {// getSelf()方法表示了我是谁// getSender()方法表示了谁发的消息System.out.println(getSelf() + "收到了来自:" + getSender().path() + "消息内容:" + o);}
}

步骤3: 在Resource文件夹下创建一个叫做application.conf的文件,至于文件的内容,下面示例中再给出

示例1:Pool模式

下面的代码演示了名字为"abc"的路由Actor创建了3个子Actor,然后通过广播的策略方式,给3个子Actor发送消息,子Actor接收到消息之后,打印消息内容

步骤1: 首先在配置文件application.conf中写如下内容

akka{actor{provider="akka.remote.RemoteActorRefProvider"deployment{# 定义一个叫做abc的路由Actor,它包含一个池,池子里有3个子Actor(也就是上文中的消息Actor)/abc{# 这个路由Actor使用的路由策略是broadcast-pool广播router = broadcast-pool# abc中包含3个处理消息的Actornr-of-instances = 3}}}# 自己的IP端口remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2551}}
}

步骤2: 写个main方法

public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");ActorRef routeActorRef = system.actorOf(FromConfig.getInstance().props(Props.create(MyActor.class)), "abc");System.out.println(routeActorRef.path());for (int i = 0; i < 20; i++) {Thread.sleep(3000);routeActorRef.tell("内容" + i, ActorRef.noSender());}}

步骤3: 运行main方法,打印内容如下

// 循环,第1次打印3条数据,说明广播策略生效,3个子actor都接到了消息
Actor[akka://sys/user/abc/$c#-119781784]收到了来自:akka://sys/deadLetters消息内容:内容0
Actor[akka://sys/user/abc/$b#2011987574]收到了来自:akka://sys/deadLetters消息内容:内容0
Actor[akka://sys/user/abc/$a#-755258569]收到了来自:akka://sys/deadLetters消息内容:内容0
// 循环,第2次打印3条数据,说明广播策略生效,3个子actor再次都接到了消息
Actor[akka://sys/user/abc/$a#-755258569]收到了来自:akka://sys/deadLetters消息内容:内容1
Actor[akka://sys/user/abc/$b#2011987574]收到了来自:akka://sys/deadLetters消息内容:内容1
Actor[akka://sys/user/abc/$c#-119781784]收到了来自:akka://sys/deadLetters消息内容:内容1

到这里pool模式演示完毕,需要注意的时候,由于我是一台机器,而且只开了一个服务,所以这3个子Actor都是在同一台机器上的同一个JVM下,实际的集群情况,这3个子Actor会分配到不同的机器上,不过这是后话了,后续文章会有专门的集群配置

示例1:Group模式

group示例需要开3个服务,并且每个服务的conf文件内容都不一样,如果没理解什么意思,你可以参考关于Akka的路由策略这篇文章起3个服务

下面的示例通过服务1,通过广播的路由策略,给服务2和服务3发送消息
步骤1:
服务1的配置文件application.conf中写如下内容

akka{actor{provider="akka.remote.RemoteActorRefProvider"deployment{/abc{# 使用广播的路由策略,注意,这个地方是-group,上一个例子是-poolrouter = broadcast-group# 下面的代码我们要通过服务1的路由Actor(名字叫abc),给服务2的消息Actor(名字叫actor02)和# 服务3的消息Actor(名字叫actor3)发消息# 后文的main方法中会创建actor02和actor03,而上文的Pool示例中是Akka自动创建的消息Actor# 并且自动创建的消息Actor的名字是随机的,从上文示例中可以看到名字类似"$a#-755258569"# 一个手动创建,一个自动创建,这也是group和pool的明显区别之一routees.paths = ["akka.tcp://sys@127.0.0.1:2552/user/actor02","akka.tcp://sys@127.0.0.1:2553/user/actor03"]}}}# 自己的IP端口remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2551}}
}

服务2的配置文件application.conf中写如下内容

akka{actor{provider="akka.remote.RemoteActorRefProvider"}remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2552}}
}

服务3的配置文件application.conf中写如下内容

akka{actor{provider="akka.remote.RemoteActorRefProvider"}remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2553}}
}

步骤2: 创建main方法
服务1的main方法

public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// FromConfig.getInstance().props()// 通过配置文件,创建一个叫做abc的路由ActorActorRef routeActorRef = system.actorOf(FromConfig.getInstance().props(Props.create(MyActor.class)), "abc");System.out.println(routeActorRef.path());for (int i = 0; i < 20; i++) {Thread.sleep(3000);// 由于配置文件中定义了abc要给名字为actor02和actor03的消息Actor发消息,所以// 下面这行代码会根据IP端口号,然后给它俩发消息,前提是必须通过IP和端口号能找到// 这俩Actor才行routeActorRef.tell("内容" + i, ActorRef.noSender());}}

服务2的main方法

public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// 为了能让服务1(abc)找到actor02,所以我们创建一个叫做actor02的消息ActorActorRef actorReference = system.actorOf(Props.create(MyActor.class), "actor02");System.out.println(actorReference.path());System.out.println("sys系统创建完毕");
}

服务3的main方法

public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// 为了能让服务1(abc)找到actor03,所以我们创建一个叫做actor03的消息ActorActorRef actorReference = system.actorOf(Props.create(MyActor.class), "actor03");System.out.println(actorReference.path());System.out.println("sys系统创建完毕");
}

步骤3: 先启动服务2和服务3,最后启动服务1,会发现服务2和服务3控制台打印如下

消息来自:akka.tcp://sys@127.0.0.1:2551/deadLetters消息内容:内容0

本文演示完毕,已经从技术角度说明了group模式和pool模式的区别,主要就是Actor的生命周期归谁管的问题,是程序员硬编码控制,还是通过路由Actor自己控制,关于生命周期的话题,后续文章会有

有一说一,具体的使用场景我还没有想出来,因为根据现有的结果来看,似乎任何情况都可以使用group的方式,而不用pool的方式,因为我也是调研akka,所以没有真正意义上的实战,也是摸着石头过河,目前我能想到的应该是需要动态拓容的场景,或许能需要pool模式????

下一篇文章:Akka的集群搭建(还没开始写 TODO)

这篇关于Akka-路由模式Group/Pool的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1115625

相关文章

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

golang 对象池sync.Pool的实现

《golang对象池sync.Pool的实现》:本文主要介绍golang对象池sync.Pool的实现,用于缓存和复用临时对象,以减少内存分配和垃圾回收的压力,下面就来介绍一下,感兴趣的可以了解... 目录sync.Pool的用法原理sync.Pool 的使用示例sync.Pool 的使用场景注意sync.

mysql中的group by高级用法详解

《mysql中的groupby高级用法详解》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,本文给大家介绍mysql中的groupby... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

golang实现动态路由的项目实践

《golang实现动态路由的项目实践》本文主要介绍了golang实现动态路由项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习... 目录一、动态路由1.结构体(数据库的定义)2.预加载preload3.添加关联的方法一、动态路由1

SQL Server身份验证模式步骤和示例代码

《SQLServer身份验证模式步骤和示例代码》SQLServer是一个广泛使用的关系数据库管理系统,通常使用两种身份验证模式:Windows身份验证和SQLServer身份验证,本文将详细介绍身份... 目录身份验证方式的概念更改身份验证方式的步骤方法一:使用SQL Server Management S

Nginx路由匹配规则及优先级详解

《Nginx路由匹配规则及优先级详解》Nginx作为一个高性能的Web服务器和反向代理服务器,广泛用于负载均衡、请求转发等场景,在配置Nginx时,路由匹配规则是非常重要的概念,本文将详细介绍Ngin... 目录引言一、 Nginx的路由匹配规则概述二、 Nginx的路由匹配规则类型2.1 精确匹配(=)2

MySQL连接池(Pool)常用方法详解

《MySQL连接池(Pool)常用方法详解》本文详细介绍了MySQL连接池的常用方法,包括创建连接池、核心方法连接对象的方法、连接池管理方法以及事务处理,同时,还提供了最佳实践和性能提示,帮助开发者构... 目录mysql 连接池 (Pool) 常用方法详解1. 创建连接池2. 核心方法2.1 pool.q

Redis高可用-主从复制、哨兵模式与集群模式详解

《Redis高可用-主从复制、哨兵模式与集群模式详解》:本文主要介绍Redis高可用-主从复制、哨兵模式与集群模式的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝... 目录Redis高可用-主从复制、哨兵模式与集群模式概要一、主从复制(Master-Slave Repli

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

mysql中的group by高级用法

《mysql中的groupby高级用法》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,下面给大家介绍mysql中的groupby用法... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使