ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp

2024-08-21 00:18

本文主要是介绍ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、 背景

对目前前后端分离的开发开发的大环境下, 前端使用 vue 进行项目的开发, 后端不在使用以前的 jsp 的开发方式进行开发, 因此造成了对于前端推送方案的选型问题, 在项目的开发过程针对于前后端的开发时间和效率等综合考虑进行了一个技术的选型,其中有过多种的方案的考虑。

二、 各种推送方案的比较

1. 各种推送方案的简单介绍

Ajax 轮询:

轮询:缺点,糟糕的用户体验;对服务器压力很大,并造成带宽的极大浪费。

DWR:

DWR 是一个用于改善 web 页面与 Java 类交互的远程服务器端 Ajax 开源框架,可以帮助开发人员开发包含 AJAX 技术的网站。它可以允许在浏览器里的代码使用运行在 WEB 服务器上的 JAVA 函数,就像它就在浏览器里一样。 将 java 代码转为 js 文件,引入 js 文件路径不能更改,原理也是轮询。

Activemq 结合 Ajax:

ActiveMQ 支持 Ajax,利用 ActiveMQ 的“发布/订阅”的特性,来创建高度实时的 web 应用, 需要结合 Amq.js+JQuery 进行处理发布和订阅消息。参考文章官方使用说明可以简单的了解如何使用。

Netty:

基于原生 NIO 实现的高并发框架,配合 websocket 实现消息推送, netty 会单独开一个 websocket 端口处理请求,并不会占用中间件的连接数,而且一个线程可以处理几万个链接。

Spring 对于 Websocket 的支持:

websocket 是 Html5 新增加特性之一,目的是浏览器与服务端建立全双工的通信方式,解决 http 请求-响应带来过多的资源消耗,同时对特殊场景应用提供了全新的实现方式, 比如聊天、股票交易、游戏等对对实时性要求较高的行业领域。

ActiveMQ 对于 Websocket 的支持:

可以通过 stomp 协议简单的处理发布订阅这样的消息模型并且建立在 Websocket 的基础上进行。 STOMP 即 Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连
接格式,允许 STOMP 客户端与任意 STOMP 消息代理(Broker)进行交互。参考文章STOMP Over WebSocket

2. 推送方案的选型

对于上述的各种推送的方案都曾经有过考虑和分析, 最终选取了最后一种方案”ActiveMQ 对于Websocket 的支持”.

DWR 和 Ajax

轮询采用同样的技术原理轮询,性能上首先淘汰掉了这两个方案

Activemq 结合 Ajax

在使用上来说依赖于 amq.js,这个 js 文件还依赖一些基于公共JavaScript 框架: jquery.js、 amq_jquery_adapter.js。因此,使用 amq.js 时,,必须先引入 jquery 库文件和适配器库文件 amq_jquery_adapter.js,其次这个方案的实现原理也是页
面轮询的方式, 就目前我们前端开发的特点上来说,使用的是 vue 不再提倡使用 JQuery 这样的类库,如果采用这样的方式非常的不利于前端的处理, 所以排除。

采用 Websocket

最后的三种方案其实都是采用 Websocket 的方式进行处理, Spring 对于 Websocket 的支持这种方案的使用可以参考Spring WebSocket 实现消息推送,这种方案目前在部门中没有看到过使用不成熟,需要自己去手动维护连接用户的信息并且针对不同的推送信息需要对于客户端进行区分到底是谁需要当前的这个推送的请求.Netty 的这个方案在之前有过使用, 代码量庞大并且当前的开发人员对于这个模块并不是很熟悉且与 spring 对于websocket 的支持有同样的毛病, 需要代码手动维护区分当前的推送需要针对哪些用户或者哪些页面的请求, 前端开发也是蛮复杂的,迁移成本巨大。 这里有一点需要说明,其实 spring也是支持 Stomp 协议Spring+STOMP 实现 WebSocket 广播订阅、权限认证、一对一通讯的,之前对于这方面不太了解,spring 使用本地维护 session 信息支持 Stomp 协议,同时也是支持与 ActiveMQ 集成,但是需要 ActiveMQ 支持 ws 协议;Spring Boot系列十七 Spring Boot 集成 websocket,使用RabbitMQ做为消息代理

spring websocket 配置类

/*** Spring websocket 接入使用** @author: wangji* @date: 2018/04/13 15:37*/
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfig extends AbstractWebSocketMessageBrokerConfigurer {@Resource(name = "taskExecutor")private ThreadPoolTaskExecutor threadPool;/*** 连接的端点,客户端建立连接时需要连接这里配置的端点*/@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/client").setAllowedOrigins("*").setHandshakeHandler(new DefaultHandshakeHandler()).addInterceptors(new HandshakeInterceptor() {/*** 添加权限校验是否登录* 不进行校验可以使用默认的 HttpSessionHandshakeInterceptor** @param serverHttpRequest* @param serverHttpResponse* @param webSocketHandler* @param map* @return* @throws Exception* @see org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor*/@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {boolean result = true;if (!StringUtils.isNotEmpty(LoginInfoUtil.getRequesetUserId())) {result = false;}return result;}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {}});}/*** 使用参考** @link {https://blog.csdn.net/elonpage/article/details/78446695?locationNum=5&fps=1}* @see org.springframework.messaging.simp.SimpMessageSendingOperations#convertAndSend(Object, Object) 发送消息(/topic/xxxx,"消息")*/@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.setApplicationDestinationPrefixes("/app");registry.setUserDestinationPrefix("/user");registry.enableSimpleBroker("/topic", "/queue");}/*** 输入通道参数设置设置多线程(接收消息通道)*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {//线程信息registration.taskExecutor(threadPool);}/*** 输出通道参数配置多线程(发送给给前端的推送通道)*/@Overridepublic void configureClientOutboundChannel(ChannelRegistration registration) {//线程信息registration.taskExecutor(threadPool);}}

发送工具类

*** stomp websocket协议手动发送通知到前端** @author: wangji* @date: 2018/04/16 10:00*/
@Component
public class StompWebsocketNotifier implements InitializingBean {private static final Logger log = LoggerFactory.getLogger(StompWebsocketNotifier.class);private static final String TOPIC_PREFIX = "/topic/";@Resourceprivate ApplicationContext applicationContext;private static SimpMessageSendingOperations simpMessageSendingOperations;@Overridepublic void afterPropertiesSet() throws Exception {simpMessageSendingOperations = applicationContext.getBean(SimpMessageSendingOperations.class);}/*** 推送消息通过 websocket到前端** @param topicName 队列的名称* @param message   发送消息的内容* @return*/public static boolean sendToWebSocketTopic(String topicName, String message) {boolean result = true;if (StringUtils.isNotEmpty(topicName) && StringUtils.isNotEmpty(message)) {try {simpMessageSendingOperations.convertAndSend(TOPIC_PREFIX + topicName, message);} catch (MessagingException e) {result = false;log.error(PrettyLogger.toMessage("push websocket error", "topicName", "message"), topicName, message, e);}}return result;}
}

配置线程池

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="shutdown"><!-- 核心线程数  --><property name="corePoolSize" value="10" /><!-- 最大线程数 --><property name="maxPoolSize" value="50" /><!-- 队列最大长度 >=mainExecutor.maxSize --><property name="queueCapacity" value="1000" /><!-- 线程池维护线程所允许的空闲时间 --><property name="keepAliveSeconds" value="300" /><!-- 线程池对拒绝任务(无线程可用)的处理策略 --><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/></property><property name="threadGroup" value="taskExecutor-Pool"/><property name="threadNamePrefix" value="taskExecutor-Pool"/><property name="waitForTasksToCompleteOnShutdown" value="true"/></bean>

三、 ActiveMQ与WebSocket的结合的使用

ActiveMQ结合Websocket的使用主要集中在前端,后端由于ActiveMQ历史沉淀了许多优秀的ActiveMQ使用的封装的类库,直接调用非常的方便。针对不同的topic队列可以根据前端页面的需求进行相应的订,如下是一个简单的使用Demo参考STOMP Over WebSocket

1. 简单使用

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title><script src="stomp.min.js"></script><script>var url = "ws://10.33.30.255:61614";var client = Stomp.client(url);// client will send heartbeats every 20000msclient.heartbeat.outgoing = 20000;client.heartbeat.incoming = 0;client.connect("", "", function (frame) {// upon connection, subscribe to the destinationvar sub = client.subscribe("/topic/testTopic", function (message) {console.log("receive: " + message.body);});});</script>
</head>
<body></body>
</html>

2. Vue组件封装

根据vue的特点,对于推送请求框架进行了简单的封装,各个页面可以简单的使用组件,需要下面的两个封装的文件
Message-push.vue

<template><!--@input: URI port  destination 详情见下方props--><!--@output:message-get 从服务器端得到的消息--><span></span>
</template>
<script>import Stomp from 'stompjs';export default {name: 'messagePush',props: {URI: {// 连接着服务端的WebSocket的代理的地址  类似于服务器的ip地址type: String,default: '10.11.165.16'},port: {type: String,default: '61614'},URL: String, // 带协议的连接地址,传入这个属性会覆盖上面的两个属性destination: {// 订阅消息的目的地,也就是消息的来源地址 类似于api中的地址;可以同时接收多个消息type: [String, Array],default: ''}},data () {return {client: null};},beforeDestroy() {this.destroy();},mounted() {this.init();},methods: {init() {if (window.WebSocket) {/** * 步骤创建STOMP客户端------连接服务端* */// STOMP javascript 客户端会使用ws://的URL与STOMP 服务端进行交互。let url = `ws://${this.URI}:${this.port}`;if (this.URL !== '') url = this.URL;// 为了创建一个STOMP客户端js对象,你需要使用Stomp.client(url),而这个URL连接着服务端的WebSocket的代理:this.client = Stomp.client(url);this.client.heartbeat.incoming = 0;this.client.heartbeat.outgoing = 1000 * 60;this.client.debug = null;// Stomp 客户端建立后,必须调用它的connect()方法去连接this.client.connect('', '', () => {// 为了在浏览器中接收消息,STOMP客户端必须先订阅一个目的地destination。// body 是字符串,请使用JSON.parse()去转换JSON对象let desList = [];if (typeof this.destination === 'string') {desList.push(this.destination);} else {desList = this.destination;}// 遍历每一个地址,订阅消息for (let des of desList) {((destination) => {this.client.subscribe(destination, (message) => {let _data = null;if (message.body) {// 解析接受到的数据_data = JSON.parse(message.body);} else {// 接受到空消息_data = '';}this.$emit('message-get', _data, destination);});})(des);}});} else {this.$message({message: `你的浏览器版本太低了,不支持Websocket,会导致你接收不到实时数据!'您可以更换一个支持websockets浏览器,如:IE11 或者 chrome 或firefox!`,type: 'warning'});}},destroy() {// 断开连接时,调用disconnect方法this.client.disconnect();this.client = null;}}};
</script>
<style lang='less' scoped></style>

messager.vue

<template><message-push :destination="destination" v-if="start" :URL="url" @message-get="onMessageGet"></message-push>
</template><script>import messagePush from './message-push.vue';import commonService from 'service/common.service';import {ErrorHandleMixin} from 'utils/mixin';export default {props: {destination: {type: [String, Array],default: ''}},mixins: [ErrorHandleMixin],components: {messagePush},data() {return {start: false,url: ''};},methods: {getWsInfo() {commonService.getWsInfo().then(res => {this.url = res.mqAddr;if (this.url) this.start = true;}).catch(this._handleError);},onMessageGet(data, topic) {this.$emit('message-get', data, topic);}},mounted() {this.getWsInfo();this.$emit('init');}};
</script>

3. 组件使用

只需要传入需要订阅的队列的名称和处理的地址即可,如下

<messager destination="/topic/XXXTopic" @message-get="msgPushHandler"></messager>

四、 总结

对于不同的推送方案的了解,在不断的咨询相关的同时和查找资料的阅读相应的官方文档下进行了了解,感谢开源,让我们能够利用优秀的方案不用重复的造轮子。

这篇关于ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1091580

相关文章

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

Java对异常的认识与异常的处理小结

《Java对异常的认识与异常的处理小结》Java程序在运行时可能出现的错误或非正常情况称为异常,下面给大家介绍Java对异常的认识与异常的处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参... 目录一、认识异常与异常类型。二、异常的处理三、总结 一、认识异常与异常类型。(1)简单定义-什么是

SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志

《SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志》在SpringBoot项目中,使用logback-spring.xml配置屏蔽特定路径的日志有两种常用方式,文中的... 目录方案一:基础配置(直接关闭目标路径日志)方案二:结合 Spring Profile 按环境屏蔽关

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

SpringBoot排查和解决JSON解析错误(400 Bad Request)的方法

《SpringBoot排查和解决JSON解析错误(400BadRequest)的方法》在开发SpringBootRESTfulAPI时,客户端与服务端的数据交互通常使用JSON格式,然而,JSON... 目录问题背景1. 问题描述2. 错误分析解决方案1. 手动重新输入jsON2. 使用工具清理JSON3.

java中long的一些常见用法

《java中long的一些常见用法》在Java中,long是一种基本数据类型,用于表示长整型数值,接下来通过本文给大家介绍java中long的一些常见用法,感兴趣的朋友一起看看吧... 在Java中,long是一种基本数据类型,用于表示长整型数值。它的取值范围比int更大,从-922337203685477

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过