ActiveMQ集群下的消息回流功能

2024-04-23 07:38

本文主要是介绍ActiveMQ集群下的消息回流功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

"丢失"的消息

  如果有broker1和broker2通过networkConnector连接,有一个consumer1连接到broker1,一个consumer2连接到broker2,程序往broker1上面发送30条消息,这时consumer2连接到broker2消费消息,当consumer2消费了15条消息时,broker2挂掉了。 但是还剩下15条消息在broker2上面,这些消息就好像消息了,除非broker2重启了,然后有消费者连接到broker2上来消费消息,遇到这样的情况该怎么办呢?

  从5.6版本起,在destinationPolicy上新增的选择replayWhenNoConsumers,这个选项使得broker2上有需要转发的消息但是没有消费者时,把消息回流到它原来的broker1上,同时需要把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发,activemq.xml配置如下:

多个broker配置

<policyEntry queue=">" enableAudit="false">
   <networkBridgeFilterFactory>
     <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
   </networkBridgeFilterFactory>
</policyEntry>
配置 ActiveMQ的静态网络链接
必须写上duplex="true"
<networkConnectors><networkConnector name="local network" duplex="true" uri="static://(tcp://ip:prot,tcp://ip:port)"/>
</networkConnectors>

配置允许双向连接

先运行发送消息程序,然后运行61716端口的程序接收消息,再接收了5条后将程序断掉,再去运行61616端口的程序,也是可以接收消息的,这样就实现了消息的回流。

消息发送程序:

复制代码
 1 import javax.jms.Connection;2 import javax.jms.ConnectionFactory;3 import javax.jms.Destination;4 import javax.jms.MessageProducer;5 import javax.jms.Session;6 import javax.jms.TextMessage;7 8 import org.apache.activemq.ActiveMQConnectionFactory;9 
10 public class JmsSend {
11     public static void main(String[] args) throws Exception {
12         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
13         Connection connection = connectionFactory.createConnection();
14         connection.start();
15         
16         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
17         Destination destination = session.createQueue("my-queue");
18         
19         MessageProducer producer = session.createProducer(destination);
20         for(int i = 0;i < 10;i++){
21             TextMessage message = session.createTextMessage("message,1212 --->" + i);
22             Thread.sleep(1000);
23             //通过生产者发出消息
24             producer.send(message);
25         }
26         session.commit();
27         session.close();
28         connection.close();
29     }
30 }
复制代码
复制代码

 61616端口接收消息程序:

复制代码
 1 import javax.jms.Connection;2 import javax.jms.ConnectionFactory;3 import javax.jms.Destination;4 import javax.jms.MessageConsumer;5 import javax.jms.Session;6 import javax.jms.TextMessage;7 8 import org.apache.activemq.ActiveMQConnectionFactory;9 
10 public class JmsReceiver {
11     public static void main(String[] args) throws Exception {
12         ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
13         Connection connection =  cf.createConnection();
14         connection.start();
15         
16         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
17         Destination destination = session.createQueue("my-queue");
18         MessageConsumer consumer = session.createConsumer(destination);
19         int i = 0;
20         while(i < 10){
21             Thread.sleep(3000);
22             i++;
23             TextMessage message = (TextMessage)consumer.receive();
24             session.commit();
25             System.out.println("111接收到的消息是:"+message.getText());
26         }
27         session.close();
28         connection.close();
29     }
30 }
复制代码

 61716端口接收消息程序:

复制代码
 1 import javax.jms.Connection;2 import javax.jms.ConnectionFactory;3 import javax.jms.Destination;4 import javax.jms.MessageConsumer;5 import javax.jms.Session;6 import javax.jms.TextMessage;7 8 import org.apache.activemq.ActiveMQConnectionFactory;9 
10 public class JmsReceiver2 {
11     public static void main(String[] args) throws Exception {
12         ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.81:61716");
13         Connection connection =  cf.createConnection();
14         connection.start();
15         
16         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
17         Destination destination = session.createQueue("my-queue");
18         MessageConsumer consumer = session.createConsumer(destination);
19         int i = 0;
20         while(i < 10){
21             Thread.sleep(3000);
22             i++;
23             TextMessage message = (TextMessage)consumer.receive();
24             session.commit();
25             System.out.println("222接收到的消息是:"+message.getText());
26         }
27         session.close();
28         connection.close();
29     }
30 }

这篇关于ActiveMQ集群下的消息回流功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/928178

相关文章

使用EasyPoi快速导出Word文档功能的实现步骤

《使用EasyPoi快速导出Word文档功能的实现步骤》EasyPoi是一个基于ApachePOI的开源Java工具库,旨在简化Excel和Word文档的操作,本文将详细介绍如何使用EasyPoi快速... 目录一、准备工作1、引入依赖二、准备好一个word模版文件三、编写导出方法的工具类四、在Export

JS纯前端实现浏览器语音播报、朗读功能的完整代码

《JS纯前端实现浏览器语音播报、朗读功能的完整代码》在现代互联网的发展中,语音技术正逐渐成为改变用户体验的重要一环,下面:本文主要介绍JS纯前端实现浏览器语音播报、朗读功能的相关资料,文中通过代码... 目录一、朗读单条文本:① 语音自选参数,按钮控制语音:② 效果图:二、朗读多条文本:① 语音有默认值:②

C#实现高性能拍照与水印添加功能完整方案

《C#实现高性能拍照与水印添加功能完整方案》在工业检测、质量追溯等应用场景中,经常需要对产品进行拍照并添加相关信息水印,本文将详细介绍如何使用C#实现一个高性能的拍照和水印添加功能,包含完整的代码实现... 目录1. 概述2. 功能架构设计3. 核心代码实现python3.1 主拍照方法3.2 安全HBIT

录音功能在哪里? 电脑手机等设备打开录音功能的技巧

《录音功能在哪里?电脑手机等设备打开录音功能的技巧》很多时候我们需要使用录音功能,电脑和手机这些常用设备怎么使用录音功能呢?下面我们就来看看详细的教程... 我们在会议讨论、采访记录、课堂学习、灵感创作、法律取证、重要对话时,都可能有录音需求,便于留存关键信息。下面分享一下如何在电脑端和手机端上找到录音功能

Android实现图片浏览功能的示例详解(附带源码)

《Android实现图片浏览功能的示例详解(附带源码)》在许多应用中,都需要展示图片并支持用户进行浏览,本文主要为大家介绍了如何通过Android实现图片浏览功能,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

Debian 13升级后网络转发等功能异常怎么办? 并非错误而是管理机制变更

《Debian13升级后网络转发等功能异常怎么办?并非错误而是管理机制变更》很多朋友反馈,更新到Debian13后网络转发等功能异常,这并非BUG而是Debian13Trixie调整... 日前 Debian 13 Trixie 发布后已经有众多网友升级到新版本,只不过升级后发现某些功能存在异常,例如网络转

Redis中哨兵机制和集群的区别及说明

《Redis中哨兵机制和集群的区别及说明》Redis哨兵通过主从复制实现高可用,适用于中小规模数据;集群采用分布式分片,支持动态扩展,适合大规模数据,哨兵管理简单但扩展性弱,集群性能更强但架构复杂,根... 目录一、架构设计与节点角色1. 哨兵机制(Sentinel)2. 集群(Cluster)二、数据分片

基于Java和FFmpeg实现视频压缩和剪辑功能

《基于Java和FFmpeg实现视频压缩和剪辑功能》在视频处理开发中,压缩和剪辑是常见的需求,本文将介绍如何使用Java结合FFmpeg实现视频压缩和剪辑功能,同时去除数据库操作,仅专注于视频处理,需... 目录引言1. 环境准备1.1 项目依赖1.2 安装 FFmpeg2. 视频压缩功能实现2.1 主要功

使用Python实现无损放大图片功能

《使用Python实现无损放大图片功能》本文介绍了如何使用Python的Pillow库进行无损图片放大,区分了JPEG和PNG格式在放大过程中的特点,并给出了示例代码,JPEG格式可能受压缩影响,需先... 目录一、什么是无损放大?二、实现方法步骤1:读取图片步骤2:无损放大图片步骤3:保存图片三、示php