数字化转型--基于事件驱动的中台架构实践(一)

2024-08-22 00:18

本文主要是介绍数字化转型--基于事件驱动的中台架构实践(一),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.背景

2.什么是业务流程驱动引擎

3.核心概念

4.技术实现

5.思考及后续规划


引言:在方舟(业务能力研发协作平台)、水滴(业务能力扩展框架)、业务能力共享模型(业务侧可视化的自定义业务对象)等数字中台基础设施的帮助下,我们的技术体系从幼儿园升入了小学,而事件可视化配置平台和业务流程驱动引擎则让我们完成了小升初。

1.背景

目前我司的业务环境和数据环境基于政府的监管要求而日趋复杂,因业务合规和数据合规的要求导致我们对业务和数据的隔离格外慎重。在这样的政策限定条件下,如何在面对不确定性的复杂业务和环境下环境,如何以安全、高效的方式进行业务复杂度的讲解及业务能力沉淀和复用是我们当前面临的课题。为什么需要业务流程驱动引擎?从业务的流程和业务活动的组织层面来讲,企业的业务实质上是一系列的业务活动基于特定的业务场景或业务规则在时间维度的时序性组合表达。在传统的研发模式中,开发人员在应用内部基于上帝之手般的代码组织聚合业务代码,但是在跨应用的消息触达和通知的场景上帝之手也难免难堪。

例如在业务的抽象和设计中,对于订单创建这个场景,在业务系统中的关联业务需要对订单创建的事件进行感知。如用户购买了一张从杭州飞往背景的机票,在支付环节因网络或某种因素导致支付失败,客服系统需要感知这个订单创建的事件然后基于订单的状态进行相关的障碍进行预测和行程预测等用户体验服务的提升,产品运营人员需要及时感知该机票支付失败后某段时间内依旧没有尝试重新支付,我们需要热线进行跟进。 在这种流程式聚合的场景上帝之手也不好使了。

2.什么是业务流程驱动引擎

业务流程驱动引擎是我们基于业务中台的业务场景打造的无服务器事件总线服务,解决跨应用之间的路由事件,消息注册、订阅、管理等问题,简化接入配合配置,帮助研发人员轻松构建松耦合、分布式的事件驱动架构的中台化的基础设施组件。

3.核心概念

事件业务中数据变化的记录或动作。

生产应用产生数据变化并发送事件消息的应用。

订阅应用:监听或消费事件消息的应用。

业务线数据产生业务部门或业务实体如新零售事业群、客服体验事业群。

消息投递通道:数据产生消息发送的通道如RockMateq或kafk。

事件名称:数据产生数据变化所产生的业务活动如创建订单/修改收货地址等。

消息主题:消息队列 RocketMQ的Topic。

事件编码:消息队列 RocketMQ的Tag。

运行模式:消息投递后消息消费的模式有自主消费模式(订阅方自己监听并处理消息)和托管消费模式(业务流程驱动引擎统一监听并处理消息)。

触发类型:消息触发处理的通道。

系统服务:业务提供的消息触达后调用的处理程序或服务接口。

4.技术实现

为实现事件驱动架构,我们从消息通道统一、云服务配置简化、业务系统接入集成、业务配置可视、业务流程可编排五个方面入手,来建设基于事件消息驱动的业务流程驱动引擎引擎系统。

4.1 统一消息通道

目前的业务中使用了多种消息中间件,在中台化建设中我们统一了消息中间件的选型,统一使用阿里云的消息队列 RocketMQ进行消息生产和消费,通过提供统一的消息总线服务,消息的接入可以简化到只需要调用消息发送的接口即可;系统无需额外进行配置,无需关心业务环境的隔离,我们通过SpringBoot规范提供了我司内部的Starter集成。

案列一:使用消息总线发送事件消息

  1 引入 依赖包:

            <dependency><groupId>com.xxxx.event</groupId><artifactId>business-driven-engine-client</artifactId><version>${business-driven-engine-version}</version></dependency>

使用示例代码:

@Reference(version = "1.0.0", timeout = 3000, check = false, interfaceClass = EventSourceService.class)private EventSourceService eventSourceService;/*** 同步发送事件消息* @param eventSourceMessage 事件消息* @return*/public  Result<Boolean>  publish(EventSourceMessage eventSourceMessage) {try {Result<Boolean> result = eventSourceService.publish(eventSourceMessage);logger.info("EventSourceService.publish### 同步发送信息到事件中心. eventSourceMessage:{}, result={}", eventSourceMessage.toString(), result);return result;} catch (Exception e) {logger.error("EventSourceService.publish### 同步消息发送异常, eventSourceMessage={} msg={}", eventSourceMessage.toString(), e.getMessage());return  Result.failure(ErrorCode.DATA_COLLECT_MESSAGE_SEND_EXCEPTION.getErrCode(), e.getMessage());}}/*** 异步发送事件消息* @param eventSourceMessage 事件消息* @return*/public  Result<Boolean>  asyncPublish(EventSourceMessage eventSourceMessage) {try {Result<Boolean> result = eventSourceService.asyncPublish(eventSourceMessage);logger.info("EventSourceService.asyncPublish### 异步发送信息到事件中心. eventSourceMessage:{}, result={}", eventSourceMessage.toString(), result);return result;} catch (Exception e) {logger.error("EventSourceService.asyncPublish### 异步消息发送异常, eventSourceMessage={} msg={}", eventSourceMessage.toString(), e.getMessage());return  Result.failure(ErrorCode.DATA_COLLECT_MESSAGE_SEND_EXCEPTION.getErrCode(), e.getMessage());}}

案列二:使用集成Starter发送事件消息

  1 引入依赖包:

            <dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 引用消息生产者

@Autowired
private ProducerBean producerBean;

 ProducerBean已经在应用启动的时候构建完成。

 通过nacos配置中心对项目环境、日常环境、预发环境、生产环境进行了隔离,配置统一在配置中心配置,业务方无需关注技术细节。

    private static final Logger logger = LoggerFactory.getLogger(ProducerRepository.class);/*** Job处理线程池*/ExecutorService callbackExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1,Runtime.getRuntime().availableProcessors() + 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1000),new ThreadFactoryBuilder().setNameFormat("event-driven-engine-callback-executor-pool-%d").build());@Autowiredprivate ProducerBean producerBean;/*** 发布事件消息** @param eventSourceMessage*/public SendResult rocketPublish(EventSourceMessage eventSourceMessage, EventDO event) {//消息结构化MessageBody messageBody = new MessageBody(JSONObject.toJSONString(eventSourceMessage));Message message = new Message(event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), messageBody.getMessage().getBytes());return producerBean.send(message);}/*** 异步发送事件消息,只要不抛异常就是成功* @param eventSourceMessage* @param event*/public void asyncPublish(EventSourceMessage eventSourceMessage, EventDO event) {producerBean.setCallbackExecutor(callbackExecutor);//消息结构化MessageBody messageBody = new MessageBody(JSONObject.toJSONString(eventSourceMessage));Message message = new Message(event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), messageBody.getMessage().getBytes());producerBean.sendAsync(message, new SendCallback() {@Overridepublic void onSuccess(final SendResult sendResult) {if (StringUtils.isNotBlank(sendResult.getMessageId())) {logger.info("异步消息发送成功: topic={}, eventCode={}, bizKey={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey());} else {logger.info("异步消息发送失败: topic={}, eventCode={}, bizKey={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey());}}@Overridepublic void onException(final OnExceptionContext context) {logger.info("异步消息发送异常: topic={}, eventCode={}, bizKey={}, messageId={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), context.getMessageId());}});}

案列三:使用消息托管模式处理相关业务(业务驱动引擎统一消费消息并调用绑定的业务处理逻辑)

1 引入二方包

            <dependency><groupId>com.xxxx.event</groupId><artifactId>business-driven-engine-client</artifactId><version>${business-driven-engine-version}</version></dependency>

2 实现EventHandleService接口

基于便捷和统一的考量,我们统一提供泛化调用接口,使用Dubbo的泛化调用和版本隔离机制。具体实现如下:

                                     

托管消费会在消息触达后通过泛化调用工厂调用这个接口进行相关的业务处理。

案列四:使用自主消费模式处理相关业务(业务方监听消息并业务处理逻辑)

1 引入二方包

<dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 在yaml文件中配置订阅分组

spring:event:rocketmqConfig:groupId: GID_RMQ_TOPIC_EVENT_BUS

3 编写自主消费的订阅处理程序

@Slf4j
@Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")
@Component
public class BusinessDrivenEngineMessageHandler implements MessageHandler {@Overridepublic Action handle(String topic, String tag, Message message) {if(null == message){throw new EventBizException(EventEngineErrorCode.EVENT_MESSAGE_IS_EMPTY.getErrCode());}String appName =  DomainApplicationConstant.APPLICATION.toUpperCase();Subscribe subscribeAnn = this.getClass().getDeclaredAnnotation(Subscribe.class);String eventCode = subscribeAnn.tag();String subscribeCode = CommonConstant.SUBSCRIBE_PREFIX + eventCode + CommonConstant.SEPARATOR +  appName;log.info("EventDrivenEngineListener.handle#  messageId={}, topic={}, tag={}, key={}", message.getMsgID(), message.getTopic(), message.getTag(), message.getKey());Entry entry = null;try{// 定义资源。为了便于标识,资源名称定义为Group ID和Topic的组合。Group ID和Topic可以通过消息队列RocketMQ控制台获得。entry = SphU.entry("EventDrivenEngineListener.consume:" + topic);int reconsume = message.getReconsumeTimes();/** 消息已经重试了3次,如果不需要再次消费,则返回成功*/if (reconsume == CommonConstant.RETRIES_NUMBER) {log.info("EventTopicConsumeHandler.consume# 消息已经重试了{}次, 设置为消费成功", reconsume);return Action.CommitMessage;}return Action.CommitMessage;} catch(BlockException e){log.error("EventDrivenEngineListener.consume# 消息消费异常. messages={}, msg={}, exception={}", JSONObject.toJSONString(message), e.getMessage(), e);return Action.ReconsumeLater;} finally {if (entry != null) {entry.exit();}}}
}

4 注册订阅处理服务

    @Autowiredprivate BusinessDrivenEngineMessageHandler businessDrivenEngineMessageHandler;@Beanpublic EventBusRocketmqConsumeListener busRocketmqConsumeListener(){EventBusRocketmqConsumeListener eventBusRocketmqConsumeListener = new EventBusRocketmqConsumeListener();eventBusRocketmqConsumeListener.registerHandler(businessDrivenEngineMessageHandler);return eventBusRocketmqConsumeListener;}

备注:使用该注解的时候需要设置topic和tag(tag = "*"表示定约该消息下的所有Tag)

Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")

案列五:使用自主消费模式触发业务驱动引擎

              此方式适用于业务方自主监听消息,然后通过触发器触发在配置平台配置好的任务清单,详请请下下文。

1 引入二方包

<dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 在yaml文件中配置订阅分组

spring:event:rocketmqConfig:groupId: GID_RMQ_TOPIC_EVENT_BUS

3 编写自主消费的订阅处理程序

@Slf4j
@Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")
@Component
public class BusinessDrivenEngineMessageHandler implements MessageHandler {@Reference(version = "1.0.0", timeout = 3000, check = false, interfaceClass = EventSourceService.class)private EventSourceService eventSourceService;@Overridepublic Action handle(String topic, String tag, Message message) {if(null == message){throw new EventBizException(EventEngineErrorCode.EVENT_MESSAGE_IS_EMPTY.getErrCode());}log.info("EventDrivenEngineListener.handle#  messageId={}, topic={}, tag={}, key={}", message.getMsgID(), message.getTopic(), message.getTag(), message.getKey());try{Result<Boolean> result = eventSourceService.trigger(new EventMessageTriggerParam(message.getMsgID(), message.getTopic(), message.getTag(), message.getKey(), new String(message.getBody(), StandardCharsets.UTF_8)));return result.isSuccess() && result.getData() ? Action.CommitMessage : Action.ReconsumeLater;} catch(EventBizException e){log.error("EventDrivenEngineListener.consume# 消息消费异常. messages={}, msg={}, exception={}", JSONObject.toJSONString(message), e.getMessage(), e);return Action.ReconsumeLater;}}
}

4 注册订阅处理服务

    @Autowiredprivate BusinessDrivenEngineMessageHandler businessDrivenEngineMessageHandler;@Beanpublic EventBusRocketmqConsumeListener busRocketmqConsumeListener(){EventBusRocketmqConsumeListener eventBusRocketmqConsumeListener = new EventBusRocketmqConsumeListener();eventBusRocketmqConsumeListener.registerHandler(businessDrivenEngineMessageHandler);return eventBusRocketmqConsumeListener;}

4.2 阿里云服务配置简化

在业务调研中发现业务使用了多种消息通道,大量的Topic和队列不知道用在了什么业务场景、Topic是那也业务那个应用创建的、有哪些业务和应用订阅消费Topic、这些生产者和订阅者是为什么业务服务的,更有甚者在同一实例中跑着测试的和生产的Topic。这里的乱象我不过多叙述,基于这样的乱象,我们决定对事件的生产者和订阅者的关系进行维护,使其能够结构化的展现其组织关系,同时保证事件业务语义的场景化使其可以被复用。

4.2.1. 事件配置管理

为寻求事件治理的可视、可控、可管理,我们决定在研发协同层面做文章,具体实现方案如下:

  • 事件定义场景化:每个事件都有唯一的标识,每个事件都有业务化的语义描述;

  • 统一配置管理:提供统一的配合操作界面;

  • 集成消息通道:屏蔽底层的消息队列,对事件的消息投递业务可以选择多种消息投递通道;

  • 确定生产和消费的关系:通过结构化的方式展现事件原宿关系,如这个事件是那个业务线的哪个应用生产的、事件的业务场景是什么、使用什么消息通道、关联那个Topic以及这个事件被哪些应用订阅、订阅方订阅消息的用途、运行的模式等信息。

     

                                                         

 

4.2.2 订阅配置管理

为管理业务中的订阅者我们对订阅信息使用配置界面进行配置,具体方案如下:

  • 订阅关系层次化:每个订阅者都有唯一的标识及订阅分组;

  • 订阅数据同步:通过阿里云接口同步创建订阅者的分组信息;

  • 业务可配置化:订阅者消息消费、触达运行等都通过界面进行配置下发;

                                                                                   

                                                                                                                                     订阅配置界面

                                                                     

4.2.3. 订阅消费服务可配置可编排

为实现一个业务事件触发后,多个订阅者的消息触发任务的编排,我们对订阅者的消费处理逻辑做了抽象和封装,具体方案如下:

  • 任务可编排:事件订阅者可以同时挂载多个触发处理任务,任务之间的执行顺序可以通过拖拽实现调整;

  • 任务可复用:挂载的任务是独立的系统服务接口,是封装的细粒度的业务逻辑,可以复用;

  • 调用通道泛化:提供统一的接口,通过实现接口和版本进行泛化调用;                                                                     

                                                                 

5.思考及后续规划

业务驱动引擎一期上线后,彻底解决了依靠手工维护数据和配置的困境,使数据归属更加明确,使业务场景和业务语义更清晰,使业务关系更明确,使事件驱动和复用变得可能。但随着业务的发展,驱动任务的串行和并行的场景快速增加,我们又面临着新的课题。归纳起来有以下几点:

  • 如何保证并行网关和串行网关之间的输入和输出可以管道式的执行

  • 如何构建企业级的业务流程编排引擎实现地代码的业务开发?

这篇关于数字化转型--基于事件驱动的中台架构实践(一)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1094698

相关文章

Mybatis嵌套子查询动态SQL编写实践

《Mybatis嵌套子查询动态SQL编写实践》:本文主要介绍Mybatis嵌套子查询动态SQL编写方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、实体类1、主类2、子类二、Mapper三、XML四、详解总结前言MyBATis的xml文件编写动态SQL

apache的commons-pool2原理与使用实践记录

《apache的commons-pool2原理与使用实践记录》ApacheCommonsPool2是一个高效的对象池化框架,通过复用昂贵资源(如数据库连接、线程、网络连接)优化系统性能,这篇文章主... 目录一、核心原理与组件二、使用步骤详解(以数据库连接池为例)三、高级配置与优化四、典型应用场景五、注意事

python web 开发之Flask中间件与请求处理钩子的最佳实践

《pythonweb开发之Flask中间件与请求处理钩子的最佳实践》Flask作为轻量级Web框架,提供了灵活的请求处理机制,中间件和请求钩子允许开发者在请求处理的不同阶段插入自定义逻辑,实现诸如... 目录Flask中间件与请求处理钩子完全指南1. 引言2. 请求处理生命周期概述3. 请求钩子详解3.1

Jvm sandbox mock机制的实践过程

《Jvmsandboxmock机制的实践过程》:本文主要介绍Jvmsandboxmock机制的实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景二、定义一个损坏的钟1、 Springboot工程中创建一个Clock类2、 添加一个Controller

Mysql中的用户管理实践

《Mysql中的用户管理实践》:本文主要介绍Mysql中的用户管理实践,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录13. 用户管理13.1 用户 13.1.1 用户信息 13.1.2 创建用户 13.1.3 删除用户 13.1.4 修改用户

在Java中基于Geotools对PostGIS数据库的空间查询实践教程

《在Java中基于Geotools对PostGIS数据库的空间查询实践教程》本文将深入探讨这一实践,从连接配置到复杂空间查询操作,包括点查询、区域范围查询以及空间关系判断等,全方位展示如何在Java环... 目录前言一、相关技术背景介绍1、评价对象AOI2、数据处理流程二、对AOI空间范围查询实践1、空间查

qtcreater配置opencv遇到的坑及实践记录

《qtcreater配置opencv遇到的坑及实践记录》我配置opencv不管是按照网上的教程还是deepseek发现都有些问题,下面是我的配置方法以及实践成功的心得,感兴趣的朋友跟随小编一起看看吧... 目录电脑环境下载环境变量配置qmake加入外部库测试配置我配置opencv不管是按照网上的教程还是de

golang实现动态路由的项目实践

《golang实现动态路由的项目实践》本文主要介绍了golang实现动态路由项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习... 目录一、动态路由1.结构体(数据库的定义)2.预加载preload3.添加关联的方法一、动态路由1

Vue 2 项目中配置 Tailwind CSS 和 Font Awesome 的最佳实践举例

《Vue2项目中配置TailwindCSS和FontAwesome的最佳实践举例》:本文主要介绍Vue2项目中配置TailwindCSS和FontAwesome的最... 目录vue 2 项目中配置 Tailwind css 和 Font Awesome 的最佳实践一、Tailwind CSS 配置1. 安

MyBatis分页插件PageHelper深度解析与实践指南

《MyBatis分页插件PageHelper深度解析与实践指南》在数据库操作中,分页查询是最常见的需求之一,传统的分页方式通常有两种内存分页和SQL分页,MyBatis作为优秀的ORM框架,本身并未提... 目录1. 为什么需要分页插件?2. PageHelper简介3. PageHelper集成与配置3.