通用接口开放平台设计与实现——(17)消息服务端之消息发送器

本文主要是介绍通用接口开放平台设计与实现——(17)消息服务端之消息发送器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前面我们介绍了服务端,对接收到的消息,无论是请求消息还是响应消息,我们都是将消息主题传入工厂,构建对应的处理器进行处理。

同理,对于发送的消息,我们也可以采用相同的处理模式。

这里面有个问题需要思考,发送器与消息的关系是什么?

发送器相当于业务消息的封装,消息的参数是通过发送器属性设置,还是调用发送方法时作为方法参数处理?

实际上,这两种方式都能实现我们的目的,具体分析如下:

对于发送器对应的消息主题,我们希望具体的发送器直接内置固化,不能交由外部传入,否则会造成混乱,因此使用构造参数的方法来设置。

对于消息标识,有两种场景,一是消息新创建的时候,这时候消息标识直接使用请求消息构造方法中生成的标识即可,二是消息重发,这时候,我们需要读取原有的消息标识,即重发过程中保持消息不变,这样服务端才能根据消息标识去重。因此消息标识这个参数可以在调用方法时传入。

对于消息内容,同样有两种情况,一种是对应业务事件,包括标识、事件,另外一种,则对应部分技术框架部分的操作,如登录,需要将账号和密钥构建一个json对象传入。

综上考虑,我们设置两个属性,消息主题和消息内容,其中消息主题为私有属性,仅允许通过构造函数传入,消息内容则设置了一个保护方法,允许被子类调用。

处理框架由几个主要的部分组成:
1.消息主题:我们根据业务需求,将事件转换为消息主题,消息主题相当于是对不同类型的消息进行分类,是我们实现业务处理的基础支撑部分。
2.消息发送器工厂:根据消息主题来查找对应的发送器完整类名,然后通过反射技术来创建具体的消息发送器对象。
3.消息发送器:真正进行具体业务逻辑处理的地方,消息分为两类,消息发送器也有两类,一方面,可抽取公用部分形成父类,另一方面,可被具体的业务逻辑处理器继承。

下面来一一介绍。

消息主题

上面说了,消息主题是基础数据,我们需要在原有基础上,增加一个sender属性,存放消息发送器的完整类名

/*** 消息主题* @author wqliu* @date 2021-08-21**/
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName("ip_api_message_topic")
public class ApiMessageTopic extends BaseEntity {……/*** 处理器*/@TableField("handler")private String handler;/*** 发送器*/@TableField("sender")private String sender;/*** 响应主题编码*/@TableField("response_topic_code")private String responseTopicCode;……}

消息发送器工厂

使用了设计模式中的简单工厂模式,根据消息主题编码拿到发送器的完整路径,通过反射实现发送器的实例化。

/*** 消息发送器工厂* @author wqliu* @date 2022-01-29 16:30**/
public  class MessageSenderFactory {private MessageSenderFactory(){};public static MessageSender createSender(String topic){//使用反射技术获取类Class<MessageSender> messageSender=null;try {//根据消息主题获取对应的消息处理类名ApiMessageTopicService service = SpringUtil.getBean(ApiMessageTopicService.class);String senderName = service.getSenderByCode(topic);messageSender = (Class<MessageSender>) Class.forName(senderName);//返回消息发送器类的实例return messageSender.newInstance();}catch (CustomException e){throw new MessageException("S101",e.getMessage());}catch (Exception e){throw new MessageException("S102","消息发送器不存在");}}
}

消息发送器

消息发送有两个维度,按消息类型分为请求和响应,按发送场景分为新创建和重发,组合出4种情况,但按照我们对于消息传输可靠性的设计思路,由消息发送方对消息进行重发,因此实际并不需要对响应消息重发。
1.新创建消息
请求消息:根据订阅列表查询要发送的目的地。
响应消息:将请求消息中的发送方作为目的地。

2.重发消息
只会对请求消息进行重发,并且基于消息日志构建,可以直接从记录中获取到要发送给谁。

因为消息分了两类,并且对于请求消息和响应消息的处理逻辑是不同的,相应的,消息发送器也有两个,分别是请求消息发送器RequestMessageSender和响应消息发送器ResponseMessageSender,并且可以提取这两个发送器的公用操作,形成一个抽象父类MessageSemder。

具体的业务消息发送器,则会继承RequestMessageSenderr或ResponseMessageSender,覆写其中的部分方法即可。

公用处理器

请求与响应发送器的父类,主要是公共部分复用,定义公共属性消息主题与消息内容。

/*** 消息发送器基类* @author  wqliu* @date  2021-10-5 14:02
*/
@Slf4j
public  class MessageSender {/*** 消息主题*/private String topic;/*** 消息内容*/private String content;public MessageSender(String topic){this.topic=topic;}/*** 获取消息主题*/public String getTopic() {return this.topic;}/*** 设置消息内容* @param content*/public void setContent(String content) {this.content = content;}/*** 获取消息内容*/public String getContent() {return this.content;}/*** 应用程序配置*/protected AppConfig appConfig= SpringUtil.getBean(AppConfig.class);/*** 消息日志服务*/protected ApiMessageLogService apiMessageLogService= SpringUtil.getBean(ApiMessageLogService.class);}

请求消息发送器

请求消息发送器比较复杂,会在以下两个场景下使用

  1. 服务端收到生产者的业务消息时,需要找到订阅该消息主题的所有消费者,推送消息。
  2. 通过消息日志实现的消息重发功能。

这两个场景的逻辑,特别是输入数据是不同的,场景1只需要消息发送内容就行了;场景2则是已经明确知道要发给谁,消息内容是什么,并且必须保证原消息标识不变。

场景1使用void sendMessage(String content)方法来实现,内部需要查找订阅列表及实现数据权限过滤,并将处理拿到的数据(要发给谁,发什么内容),调用场景2的方法去执行发送工作。
场景2使用void sendMessage(String appCode,String content, String id)方法来实现,实现真正的消息发送,内部实现了消息日志的处理

/*** 消息发送器基类* @author  wqliu* @date  2021-10-5 14:02
*/
@Slf4j
public  class RequestMessageSender extends MessageSender {protected ApiMessageSubscriptionService apiMessageSubscriptionService= SpringUtil.getBean(ApiMessageSubscriptionService.class);protected  ApiDataPermissionService apiDataPermissionService=SpringUtil.getBean(ApiDataPermissionService.class);/*** 数据权限通配符*/public static final String DATA_PERMISSION_ALL = "*";public RequestMessageSender(String topic){super(topic);}/*** 发送消息* @param topic 消息主题* @param content 消息内容* @param id 消息标识*/public  void sendMessage(String appCode,String content, String id){// 生成请求消息RequestMessage message = new RequestMessage();// 使用已有ID重置默认生成的ID,用于关联消息if(StringUtils.isNotBlank(id)){message.setId(id);}//设置相关属性message.setTopic(super.getTopic());message.setContent(content);message.setPublishAppCode(appConfig.getApiPlatformMessage().getMessageServerAppCode());//获取发送通道Channel channel= MessageServerHolder.getChannel(appCode);if(channel!=null && channel.isActive()){ChannelFuture channelFuture = channel.writeAndFlush(message);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(StringUtils.isBlank(id)){//设置状态message.setStatus(MessageStatusEnum.REQUESTED.name());message.setSendCount(message.getSendCount() + 1);//创建日志apiMessageLogService.createRequestPart(message);}else {// 更新发送次数apiMessageLogService.incrementSendCount(id);}}});}else{//创建日志apiMessageLogService.createRequestPart(message);}}/*** 发送请求消息* @param topic 消息主题* @param content 消息内容* @param id 消息标识*/public  void sendMessage(String content){//查找是否有消息订阅者,无则直接终止后续处理List<String> subscriberList = apiMessageSubscriptionService.getSubscriberList(super.getTopic());if (CollectionUtils.isEmpty(subscriberList)) {return;}//遍历订阅者,发送消息subscriberList.stream().forEach(appCode -> {//数据权限过滤boolean hasDataPermission = dataPermissionFilter(content, appCode);if (hasDataPermission) {//发送消息sendMessage(appCode,content,null);}});}/*** 数据权限过滤* @param content 消息内容,通常是业务实体标识* @param appCode 应用编码* @return true,有权限,false 无权限*/protected boolean dataPermissionFilter(String content,String appCode){//默认返回true,不进行数据权限控制,可被需要进行数据权限控制的子类覆写return true;}}

预留了数据权限过滤方法dataPermissionFilter,默认不进行数据权限控制,可被需要进行数据权限控制的子类覆写。

响应消息发送器

响应消息发送器不需要转发消息,相对简单一些,预留了一个设置消息响应内容setResponseContent用于子类覆写,例如登录场景下,需要根据登录请求消息携带的账号密钥进行身份验证,然后根据验证构建不同的结果。

/*** @author wqliu* @date 2022-1-31 8:14**/
@Data
public class ResponseMessageSender extends MessageSender{private String result;private String errorCode;private String errorMessage;public ResponseMessageSender(String topic) {super(topic);//默认设置响应结果为成功this.result= MessageResponseResultEnum.SUCCESS.name();}/*** 发送响应** @param channel        通道* @param requestMessage 请求消息*/public void sendMessage(Channel channel, RequestMessage requestMessage) {// 组织响应消息ResponseMessage responseMessage = new ResponseMessage();responseMessage.setPublishAppCode(appConfig.getApiPlatformMessage().getMessageServerAppCode());responseMessage.setRequestMessageId(requestMessage.getId());//设置主题responseMessage.setTopic(this.getTopic());//设置默认值responseMessage.setResult(this.result);responseMessage.setErrorCode(this.errorCode);responseMessage.setErrorMessage(this.errorMessage);//设置响应setResponseContent(requestMessage,responseMessage,channel);// 发送响应给请求方channel.writeAndFlush(responseMessage);// 更新消息日志apiMessageLogService.updateResponsePart(responseMessage);}/*** 设置响应消息内容** @param requestMessage  请求消息* @param responseMessage 响应消息* @param channel         通道*/protected void setResponseContent(RequestMessage requestMessage,ResponseMessage responseMessage,Channel channel) {}}

这篇关于通用接口开放平台设计与实现——(17)消息服务端之消息发送器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/391096

相关文章

Nginx 配置跨域的实现及常见问题解决

《Nginx配置跨域的实现及常见问题解决》本文主要介绍了Nginx配置跨域的实现及常见问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来... 目录1. 跨域1.1 同源策略1.2 跨域资源共享(CORS)2. Nginx 配置跨域的场景2.1

Python中提取文件名扩展名的多种方法实现

《Python中提取文件名扩展名的多种方法实现》在Python编程中,经常会遇到需要从文件名中提取扩展名的场景,Python提供了多种方法来实现这一功能,不同方法适用于不同的场景和需求,包括os.pa... 目录技术背景实现步骤方法一:使用os.path.splitext方法二:使用pathlib模块方法三

CSS实现元素撑满剩余空间的五种方法

《CSS实现元素撑满剩余空间的五种方法》在日常开发中,我们经常需要让某个元素占据容器的剩余空间,本文将介绍5种不同的方法来实现这个需求,并分析各种方法的优缺点,感兴趣的朋友一起看看吧... css实现元素撑满剩余空间的5种方法 在日常开发中,我们经常需要让某个元素占据容器的剩余空间。这是一个常见的布局需求

HTML5 getUserMedia API网页录音实现指南示例小结

《HTML5getUserMediaAPI网页录音实现指南示例小结》本教程将指导你如何利用这一API,结合WebAudioAPI,实现网页录音功能,从获取音频流到处理和保存录音,整个过程将逐步... 目录1. html5 getUserMedia API简介1.1 API概念与历史1.2 功能与优势1.3

Java实现删除文件中的指定内容

《Java实现删除文件中的指定内容》在日常开发中,经常需要对文本文件进行批量处理,其中,删除文件中指定内容是最常见的需求之一,下面我们就来看看如何使用java实现删除文件中的指定内容吧... 目录1. 项目背景详细介绍2. 项目需求详细介绍2.1 功能需求2.2 非功能需求3. 相关技术详细介绍3.1 Ja

spring中的ImportSelector接口示例详解

《spring中的ImportSelector接口示例详解》Spring的ImportSelector接口用于动态选择配置类,实现条件化和模块化配置,关键方法selectImports根据注解信息返回... 目录一、核心作用二、关键方法三、扩展功能四、使用示例五、工作原理六、应用场景七、自定义实现Impor

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解

PostgreSQL中MVCC 机制的实现

《PostgreSQL中MVCC机制的实现》本文主要介绍了PostgreSQL中MVCC机制的实现,通过多版本数据存储、快照隔离和事务ID管理实现高并发读写,具有一定的参考价值,感兴趣的可以了解一下... 目录一 MVCC 基本原理python1.1 MVCC 核心概念1.2 与传统锁机制对比二 Postg

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

C++中零拷贝的多种实现方式

《C++中零拷贝的多种实现方式》本文主要介绍了C++中零拷贝的实现示例,旨在在减少数据在内存中的不必要复制,从而提高程序性能、降低内存使用并减少CPU消耗,零拷贝技术通过多种方式实现,下面就来了解一下... 目录一、C++中零拷贝技术的核心概念二、std::string_view 简介三、std::stri