Akka-路由策略

2024-08-28 17:44
文章标签 路由 策略 akka

本文主要是介绍Akka-路由策略,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文将演示如下示例:
用IDEA创建3个akka项目(创建3个服务),分别叫做service1,service2,service3,然后使用service1,通过不同的路由策略,给2和3发送消息,代码演示之前,先介绍一下Akka的路由策略,它们分别是:

  1. 轮询(Round Robin)
    • service2和service3轮流接到service1的消息
    • 保证消息被均匀分配到各个Actor,适用于需要均匀负载的场景。
    • 配置文件中的内容为round-robin-group或者round-robin-pool先不用管这俩有什么区别
  2. 随机(Random)
    • service2和service3不一定谁收到service1的消息
    • 适用于当Actor处理时间差异不大,且需要一定随机性的场景。
    • 配置文件中的内容为random-group或者random-pool
  3. 空闲(Smallest Mailbox)
    • 选择当前拥有最少消息的Actor实例发送消息。
    • 旨在减少等待时间,将消息发送到最有可能立即处理它的Actor。
    • 配置文件中的内容为smallest-mailbox-pool,不支持group
  4. 广播(Broadcast)
    • service2和service3同时都会收到service1的消息
    • 适用于需要将消息同时分发给多个Actor进行处理的情况。
    • 配置文件中的内容为broadcast-group或者broadcast-pool
  5. 分散聚集(Scatter/Gather First Completed)
    • 将消息分散给各个处理者,只使用第一个回复消息,并忽略其他回复。
    • 适用于需要从多个处理者中选择最快响应的场景。
    • 配置文件中的内容为scatter-gather-group或者scatter-gather-pool
  6. 一致性哈希(Consistent Hashing)
    • 根据消息内容的哈希值来确定消息应该发送到哪个Actor实例。
    • 适用于需要将特定数据发送到特定Actor,以实现分布式缓存或数据分区的情况。
    • 配置文件中的内容为consistent-hashing-group或者consistent-hashing-pool
  7. 尾切(Tail Chopping)
    • 类似于分散聚集,但不是一次性向所有Actor发送消息,而是每向一个Actor发送消息后等待一小段时间。
    • 这可以减少网络负载,同时仍能保证较快的响应时间。
    • 配置文件中的内容为tail-chopping-group或者tail-chopping-pool
  8. 平衡池(Balancing Pool)
    • 多个Actor共享同一个邮箱,一旦有空闲就处理邮箱中的任务。
    • 这种策略可以确保所有Actor都处于繁忙状态,适用于本地集群且Actor任务处理时间相近的场景。
    • 配置文件中的内容为balancing-pool,不支持group

步骤1: 创建3个Maven项目,这3个项目的maven的pom文件都相同,也可以参考这里搭建服务,依赖如下:

<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.11</artifactId><version>2.4.20</version>
</dependency>
<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.11</artifactId><version>2.4.20</version>
</dependency>

步骤2: 创建Actor,Akka中的Actor就是一个继承UntypedActor的普通Java类,将这个类复制3份,注意包名一定要完全相同,然后service1,service2,service3这三个项目每个都存放一份这个类,注意本例的包名,保证三个项目包路径都一样

package akka.demo.actor;// 这个Actor01类,必须在三个项目中都有,且全限定名(包.类名称)完全相同import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;
import com.typesafe.config.ConfigFactory;public class MyActor extends UntypedActor {@Overridepublic void onReceive(Object o) throws Throwable {System.out.println("消息来自:" + getSender().path() + "消息内容:" + o);}
}

步骤3: 增加配置文件,在这3个项目中的resources文件夹下,都创建一个叫做application.conf的配置文件,3个服务的配置文件分别如下
service1的application.conf文件如下

akka{actor{provider="akka.remote.RemoteActorRefProvider"deployment{# 表示有一个叫abc的Actor,这个Actor其实是个路由器,因为在Akka的概念中,路由器也是Actor# 每次使用abc这个路由器发送消息的时候,这个路由器会将消息发给service2的actor02,或者发送给service3的actor03# 具体发给2还是3,要看我们使用哪种路由策略(也就是本文主要内容)/abc{router = round-robin-group # 注意这里,这里就是路由策略routees.paths = ["akka.tcp://sys@127.0.0.1:2552/user/actor02", # 这个就是service2的IP端口号,下面有service2的配置"akka.tcp://sys@127.0.0.1:2553/user/actor03"  # 这个就是service3的IP端口号,下面有service3的配置]}}}# 表示我自己的IP端口remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2551}}
}

service2的application.conf文件如下

akka{actor{provider="akka.remote.RemoteActorRefProvider"}remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2552}}
}

service3的application.conf文件如下,除了端口号,其他和service2是一样的

akka{actor{provider="akka.remote.RemoteActorRefProvider"}remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2553}}
}

步骤4: 创建Main方法,用来启动服务,分别在这三个项目中的任意位置,增加如下main方法:
service1的main方法

public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// 读取service1的application.conf文件,根据这个文件里面的abc的信息,创建一个路由器类型的Actor引用ActorRef routeActorRef = system.actorOf(FromConfig.getInstance().props(), "abc");// 通过该引用,给service2和service3发消息,具体2和3谁能接到消息,取决于service1配置文件中的router属性for (int i = 0; i < 20; i++) {Thread.sleep(3000);routeActorRef.tell("内容" + i, ActorRef.noSender());}
}

service2的main方法

public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// 创建一个名字叫做actor02的Actor,actorOf方法返回了引用,但是我们除了打印不做别的事情// 这行代码表示在service2的服务中,创建完毕了一个叫做actor02的Actor,这样service1远程调用// 的时候,能调用到这个actor02ActorRef actorReference = system.actorOf(Props.create(MyActor.class), "actor02");System.out.println(actorReference.path()); // 这行打印的内容是与service1的配置文件中的routees.paths属性有关系的System.out.println("sys系统创建完毕");
}

service3的main方法

public static void main(String[] args) throws Throwable {// 没什么好说的,和service2基本一样,就是创建的Actor的名字叫做actor03ActorSystem system = ActorSystem.create("sys");ActorRef actorReference = system.actorOf(Props.create(MyActor.class), "actor03");System.out.println(actorReference.path()); // 这行打印的内容是与service1的配置文件中的routees.paths属性有关系的System.out.println("sys系统创建完毕");
}

步骤5: 启动服务,注意,我们先运行2和3的main方法,然后再运行1的main方法
由于在service1的配置文件中,router = round-robin-group,这表示我们使用的路由策略是轮询,所以运行service1之后,在2和3的控制台会轮流打印出0,1,2,3,4,5,6…

本文示例到此结束了,下面说点其他的
1.配置文件中我们可以见到xxx-group和xxx-pool,其中group和pool表示Akka中路由的两种模式
2.文中仅仅演示了多个机器调用的情况,还不属于完整的集群配置,这个后续文章中再补充
3.文中使用的配置文件叫做application.conf,这是akka默认的配置名,假如我的配置文件叫xzy.conf怎么办,这就需要我们指定配置文件名称,代码是ActorSystem system = ActorSystem.create("sys",, ConfigFactory.load("xzy.conf"));
4.配置文件中有一个叫做router,还有一个叫routees,其中router表示路由,routees表示路由的目标,从示例中你应该是可以看出来的

下一篇文章:Akka的路由模式Group/Pool

这篇关于Akka-路由策略的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115447

相关文章

前端缓存策略的自解方案全解析

《前端缓存策略的自解方案全解析》缓存从来都是前端的一个痛点,很多前端搞不清楚缓存到底是何物,:本文主要介绍前端缓存的自解方案,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、为什么“清缓存”成了技术圈的梗二、先给缓存“把个脉”:浏览器到底缓存了谁?三、设计思路:把“发版”做成“自愈”四、代码

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S

Vue实现路由守卫的示例代码

《Vue实现路由守卫的示例代码》Vue路由守卫是控制页面导航的钩子函数,主要用于鉴权、数据预加载等场景,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、概念二、类型三、实战一、概念路由守卫(Navigation Guards)本质上就是 在路

MySQL设置密码复杂度策略的完整步骤(附代码示例)

《MySQL设置密码复杂度策略的完整步骤(附代码示例)》MySQL密码策略还可能包括密码复杂度的检查,如是否要求密码包含大写字母、小写字母、数字和特殊字符等,:本文主要介绍MySQL设置密码复杂度... 目录前言1. 使用 validate_password 插件1.1 启用 validate_passwo

Go语言使用Gin处理路由参数和查询参数

《Go语言使用Gin处理路由参数和查询参数》在WebAPI开发中,处理路由参数(PathParameter)和查询参数(QueryParameter)是非常常见的需求,下面我们就来看看Go语言... 目录一、路由参数 vs 查询参数二、Gin 获取路由参数和查询参数三、示例代码四、运行与测试1. 测试编程路

Django中的函数视图和类视图以及路由的定义方式

《Django中的函数视图和类视图以及路由的定义方式》Django视图分函数视图和类视图,前者用函数处理请求,后者继承View类定义方法,路由使用path()、re_path()或url(),通过in... 目录函数视图类视图路由总路由函数视图的路由类视图定义路由总结Django允许接收的请求方法http

Python实现网格交易策略的过程

《Python实现网格交易策略的过程》本文讲解Python网格交易策略,利用ccxt获取加密货币数据及backtrader回测,通过设定网格节点,低买高卖获利,适合震荡行情,下面跟我一起看看我们的第一... 网格交易是一种经典的量化交易策略,其核心思想是在价格上下预设多个“网格”,当价格触发特定网格时执行买

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

Redis过期删除机制与内存淘汰策略的解析指南

《Redis过期删除机制与内存淘汰策略的解析指南》在使用Redis构建缓存系统时,很多开发者只设置了EXPIRE但却忽略了背后Redis的过期删除机制与内存淘汰策略,下面小编就来和大家详细介绍一下... 目录1、简述2、Redis http://www.chinasem.cn的过期删除策略(Key Expir

利用Python实现时间序列动量策略

《利用Python实现时间序列动量策略》时间序列动量策略作为量化交易领域中最为持久且被深入研究的策略类型之一,其核心理念相对简明:对于显示上升趋势的资产建立多头头寸,对于呈现下降趋势的资产建立空头头寸... 目录引言传统策略面临的风险管理挑战波动率调整机制:实现风险标准化策略实施的技术细节波动率调整的战略价