akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?

2024-04-09 04:32

本文主要是介绍akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在讨论lagom之前,先从遇到的需求开始介绍:现代企业的it系统变得越来越多元化、复杂化了。线上、线下各种系统必须用某种方式集成在一起。从各种it系统的基本共性分析:最明显的特征应该是后台数据库的角色了,起码,大家都需要使用数据。另外,每个系统都可能具备大量实时在线用户、海量数据特性,代表着对数据处理能力有极大的要求,预示系统只有通过分布式处理方式才能有效运行。

一个月前开始设计一个企业的it系统,在讨论数据中台时就遇到这样的需求。这个所谓的数据中台的主要作用是为整体系统提供一套统一的数据使用api,前后连接包括web,mobile,desktop的前端系统以及由多种传统及分布式数据库系统,形成一个统一的数据使用接口。实际上,数据库连接不只是简单的读写操作,还需要包括所有实时的数据处理:根据业务要求对数据进行相应的处理然后使用。那么这是一个怎样的系统呢?首先,它必须是分布式的:为了对付大量的前端用户同时调用同一个api,把这个api的功能同时分派到多个服务器上运行是个有效的解决方法。这是个akka-cluster-sharding模式。数据中台api是向所有内部系统以及一些特定的外部第三方系统开放的,用http标准协议支持各系统与数据后台的连接也是合理的。这个akka-http, akka-grpc可以胜任。然后各系统之间的集成可以通过一个流运算工具如kafka实现各聚合根之间的交互连接。

似乎所有需要的工具都齐备了,其中akka占了大部分功能。但有些问题是:基于akka技术栈来编程或多或少有些门槛要求。最起码需要一定程度的akka开发经验。更不用提组织一个开发团队了。如果市面上有个什么能提供相应能力的开发工具,可以轻松快速上手的,那么项目开发就可以立即启动了。

现在来谈谈lagom:lagom是一套scala栈的微服务软件开发工具。从官方文档介绍了解到lagom主要提供了一套服务接口定义及服务功能开发框架。值得一提的是服务功能可以是集群分片模式的。走了一遍lagom的启动示范代码,感觉这是一套集开发、测试、部署为一体的框架(framework)。在这个框架里按照规定开发几个简单的服务api非常顺利,很方便。这让我对使用lagom产生了兴趣,想继续调研一下利用lagoom来开发上面所提及数据中台的可行性。lagom服务接入部分是通过play实现的。play我不太熟悉,想深入了解一下用akka-http替代的可行性,不过看来不太容易。最让我感到失望的是lagom的服务分片(service-sharding)直接就是akka-cluster那一套:cluster、event-sourcing、CQRS什么的都需要自己从头到尾重新编写。用嵌入的kafka进行服务整合与单独用kafka也不会增加太多麻烦。倒是lagom提供的这个集开发、测试、部署为一体的框架在团队开发管理中应该能发挥良好的作用。

在我看来:服务接入方面由于涉及身份验证、使用权限、二进制文件类型数据交换等使用akka-http,akka-grpc会更有控制力。服务功能实现直接就用akka-cluster-sharding,把计算任务分布到各节点上,这个我们前面已经介绍过了。

所以,最后还是决定直接用akka-typed来实现这个数据中台。用了一个多月时间做研发,到现在看来效果不错,能够符合项目要求。下面是一些用akka-typed实现业务集成的过程介绍。首先,系统特点是功能分片:系统按业务条块分成多个片shardregion,每个片里的entity负责处理一项业务的多个功能。多个用户调用一项业务功能代表多个entity分布在不同的集群节点上并行运算。下面是一个业务群的代码示范:

object Shards extends LogSupport {def apply(mgoHosts: List[String],trace: Boolean, keepAlive: FiniteDuration, pocurl: String)(implicit authBase: AuthBase): Behavior[Nothing] = {Behaviors.setup[Nothing] { ctx =>val sharding = ClusterSharding(ctx.system)log.stepOn = truelog.step(s"starting cluster-monitor ...")(MachineId("",""))ctx.spawn(MonitorActor(),"abs-cluster-monitor")log.step(s"initializing sharding for ${Authenticator.EntityKey} ...")(MachineId("",""))val authEntityType = Entity(Authenticator.EntityKey) { entityContext =>Authenticator(entityContext.shard,mgoHosts,trace,keepAlive)}.withStopMessage(Authenticator.StopAuthenticator)sharding.init(authEntityType)log.step(s"initializing sharding for ${CrmWorker.EntityKey} ...")(MachineId("",""))val crmEntityType = Entity(CrmWorker.EntityKey) { entityContext =>CrmWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(CrmWorker.StopWorker)sharding.init(crmEntityType)log.step(s"initializing sharding for ${GateKeeper.EntityKey} ...")(MachineId("",""))val gateEntityType = Entity(GateKeeper.EntityKey) { entityContext =>GateKeeper(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(GateKeeper.StopGateKeeper)sharding.init(gateEntityType)log.step(s"initializing sharding for ${PluWorker.EntityKey} ...")(MachineId("",""))val pluEntityType = Entity(PluWorker.EntityKey) { entityContext =>PluWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(PluWorker.StopWorker)sharding.init(pluEntityType)log.step(s"initializing sharding for ${PocWorker.EntityKey} ...")(MachineId("",""))val pocEntityType = Entity(PocWorker.EntityKey) { entityContext =>PocWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive,pocurl)}.withStopMessage(PocWorker.StopWorker)sharding.init(pocEntityType)Behaviors.empty}}
}

可以看到,不同类型的片以不同的EntityKey来代表。前端接入是基于akka-http的,如下:

object CrmRoute extends LogSupport {def route(entityRef: EntityRef[CrmWorker.Command])(implicit ec: ExecutionContext, jsStreaming: EntityStreamingSupport, timeout: Timeout): akka.http.scaladsl.server.Route = {concat(pathPrefix("ismember") {parameter(Symbol("faceid")) { fid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.IsMemberFace(fid, _)).map {case CrmWorker.ValidMember(memberId) => memberIdcase CrmWorker.InvalidMember(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}},pathPrefix("getmember") {parameter(Symbol("memberid")) { mid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.GetMemberInfo(mid, _)).map {case CrmWorker.MemberInfo(json) => HttpEntity(MediaTypes.`application/json`,json)case CrmWorker.InvalidMemberInfo(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}})}
}

各项业务功能调用通过entityRef.ask发送给了某个用户指定节点上的entity。akka的actor是线程的再细分,即一个actor可能与其它成千上万个actor共享一条线程。所以绝对不容许任何blocking。我是用下面示范的模式来实现non-blocking的:

  def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {val (shopId,posId) = entityId.split(':').toList match {case sid::pid::Nil  => (sid,pid) }implicit val loc = Messages.MachineId(shopId,posId)log.stepOn = trace//    Behaviors.supervise(Behaviors.setup[Command] { ctx =>implicit val ec = ctx.executionContextctx.setReceiveTimeout(keepAlive, Idle)Behaviors.withTimers[Command] { timer =>Behaviors.receiveMessage[Command] {case IsMemberFace(fid, replyTo) =>log.step(s"CrmWorker: IsMemberFace($fid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(isMemberFace(fid)) {case Success(mid) => {if (mid._1.isEmpty) {replyTo ! InvalidMember(mid._2)Done(loc.shopid, loc.posid, s"IsMemberFace with Error ${mid._2}")} else {replyTo ! ValidMember(mid._1)Done(loc.shopid, loc.posid, s"IsMemberFace.")}}case Failure(err) =>log.error(s"CrmWorker: IsMemberFace Error: ${err.getMessage}")replyTo ! InvalidMember(err.getMessage)Done(loc.shopid, loc.posid, s"IsMemberFace with error: ${err.getMessage}")}Behaviors.samecase GetMemberInfo(mid, replyTo) =>log.step(s"CrmWorker: GetMemberInfo($mid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(getMemberInfo(mid)) {case Success(json) => {replyTo ! MemberInfo(json)Done(loc.shopid, loc.posid, s"GetMemberInfo with json ${json}")}case Failure(err) =>log.error(s"CrmWorker: GetMemberInfo Error: ${err.getMessage}")replyTo ! InvalidMemberInfo(err.getMessage)Done(loc.shopid, loc.posid, s"GetMemberInfo with error: ${err.getMessage}")}Behaviors.samecase Idle =>// after receive timeoutshard ! ClusterSharding.Passivate(ctx.self)Behaviors.samecase StopWorker =>Behaviors.stopped(() => log.step(s"CrmWorker: {$shopId,$posId} passivated to stop.")(MachineId(shopId, posId)))case Done(shopid, termid, work) =>if (maybeMgoClient.isDefined)maybeMgoClient.get.close()log.step(s"CrmWorker: {$shopid,$termid} finished $work")(MachineId(shopid,termid))Behaviors.samecase _ => Behaviors.same}.receiveSignal {case (_,PostStop) =>log.step(s"CrmWorker: {$shopId,$posId} stopped.")(MachineId(shopId, posId))Behaviors.same}}}//   ).onFailure(SupervisorStrategy.restart)}

主要是使用ctx.pipeToSelf(work)把一个Future转换成内部消息。这里的work的实现最终必须返回Future类型,如下面的示范:

object CrmServices extends JsonConverter with LogSupport {import MgoHelpers._def validMember(docs: Seq[Document], faceid: String): Future[(String,String)] = {val memberId: (String, String) = docs match {case Nil => ("", s"faceid[$faceid]不存在!")case docs =>val member = MemberInfo.fromDocument(docs.head)if (member.expireDt.compareTo(mgoDateTimeNow) < 0)("", s"会员:${member.memberId}-${member.memberName}会籍已过期!")else(member.memberId, "")}FastFuture.successful(memberId)}def isMemberFace(faceid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[(String,String)] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.FACEID,faceid)).toFuture()for {mi <- memberInfo(id,msg) <- validMember(mi,faceid)} yield (id,msg)}def getMemberInfo(memberid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[String] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.MEMBERID,memberid)).toFuture()for {docs <- memberInfojstr <- FastFuture.successful(if(docs.isEmpty) "" else toJson(MemberInfo.fromDocument(docs.head)))} yield jstr}}

另外,由于每个用户第一次调用一项业务功能时akka-cluster-shardregion都会自动在某个节点上构建一个新的entity,如果上万个用户使用过某个功能,那么就会有万个entity及其所占用的资源如mongodb客户端等停留在内存里。所以在完成一项功能运算后应关闭entity,释放占用的资源。这个是通过shard ! ClusterSharding.passivate(ctx.self)实现的。

这篇关于akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887177

相关文章

springboot项目中整合高德地图的实践

《springboot项目中整合高德地图的实践》:本文主要介绍springboot项目中整合高德地图的实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一:高德开放平台的使用二:创建数据库(我是用的是mysql)三:Springboot所需的依赖(根据你的需求再

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

Springboot整合Redis主从实践

《Springboot整合Redis主从实践》:本文主要介绍Springboot整合Redis主从的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言原配置现配置测试LettuceConnectionFactory.setShareNativeConnect

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

SpringBoot整合Sa-Token实现RBAC权限模型的过程解析

《SpringBoot整合Sa-Token实现RBAC权限模型的过程解析》:本文主要介绍SpringBoot整合Sa-Token实现RBAC权限模型的过程解析,本文给大家介绍的非常详细,对大家的学... 目录前言一、基础概念1.1 RBAC模型核心概念1.2 Sa-Token核心功能1.3 环境准备二、表结

MQTT SpringBoot整合实战教程

《MQTTSpringBoot整合实战教程》:本文主要介绍MQTTSpringBoot整合实战教程,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录MQTT-SpringBoot创建简单 SpringBoot 项目导入必须依赖增加MQTT相关配置编写

MybatisPlus3.3.1整合clickhouse的过程

《MybatisPlus3.3.1整合clickhouse的过程》:本文主要介绍MybatisPlus3.3.1整合clickhouse的过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定... 前言ClickHouse是俄罗斯Yandex发布的一款数据分析型数据库支持sql语法,详情可以访问官网,

Spring Boot 整合 Redis 实现数据缓存案例详解

《SpringBoot整合Redis实现数据缓存案例详解》Springboot缓存,默认使用的是ConcurrentMap的方式来实现的,然而我们在项目中并不会这么使用,本文介绍SpringB... 目录1.添加 Maven 依赖2.配置Redis属性3.创建 redisCacheManager4.使用Sp