示例:ActiveMQ+Windows服务创建消息转发器

2024-05-26 16:48

本文主要是介绍示例:ActiveMQ+Windows服务创建消息转发器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、目的:通过应用ActiveMQ实现,消息的转发

       应用场景:在阿里云部署ActiveMQ服务器,在银行柜台部署注册消息客户端,在后台发送消息,实现互联网消息的互通

 

二、安装ActiveMQ

 

1、下载地址:http://activemq.apache.org/download.html 

2、修改用户名和密码:用户名和密码在conf中的jetty-realm.properties中配置 默认admin@admin

3、启动服务../bin/activemq.bat

3、浏览器进入到127.0.0.1:8161 查看消息信息

 

三、发送和注册消息

1、发送端服务

 /// <summary>/// 发送消息服务类/// </summary>class ActiveMQServer{private IConnectionFactory factory;string _userName;string _passWord;bool _flag;public void Init(string user, string pw, string brokerUri = "tcp://192.168.1.11:61616"){_userName = user;_passWord = pw;try{//初始化工厂factory = new ConnectionFactory(brokerUri);ActiveMQDomain.Instance.LogInfo("初始化成功");_flag = true;}catch (Exception ex){ActiveMQDomain.Instance.LogInfo("初始化失败");ActiveMQDomain.Instance.LogError(ex);}}public void SendMessage(string ms){if (!_flag) return;//建立工厂连接using (IConnection connection = factory.CreateConnection()){//通过工厂连接创建Session会话using (ISession session = connection.CreateSession()){//通过会话创建生产者,方法里new出来MQ的QueueIMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"));//创建一个发送消息的对象ITextMessage message = prod.CreateTextMessage();//给这个消息对象赋实际的消息message.Text = ms; //设置消息对象的属性,是Queue的过滤条件也是P2P的唯一指定属性message.Properties.SetString("filter", "demo");prod.Send(message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);message.Text += "发送成功" + Environment.NewLine;}}}}

 

2、注册接收端服务

/// <summary>/// 注册消息客户端/// </summary>class ActiveMQClient{public event Action<IMessage> BeginMessage;public void Init(string brokerUri= "tcp://localhost:61616"){//创建连接工厂IConnectionFactory factory = new ConnectionFactory(brokerUri);//通过工厂构建连接IConnection connection = factory.CreateConnection();//这个是连接的客户端名称标识connection.ClientId = "firstQueueListener";//启动连接,监听的话要主动启动连接connection.Start();//通过连接创建一个会话ISession session = connection.CreateSession();//通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"), "filter='demo'");//注册监听事件consumer.Listener += l=>{ITextMessage msg = (ITextMessage)l;//异步调用下,否则无法回归主线程Task.Run(() => this.BeginMessage?.Invoke(msg));};//connection.Stop();//connection.Close();  }}

注册客户端的接口:(观察者模式注册消息)

  /// <summary> 注册消息对象 </summary>public interface IRegisterMessage{void Notice(string message);}

ActiveMQ应用服务(提供外部应用)

  /// <summary>/// ActiveMQ注册接收总服务/// </summary>public class ActiveMQService{#region - 客户端 -ActiveMQClient _activeMQClient = new ActiveMQClient();List<IRegisterMessage> _registerMessages = new List<IRegisterMessage>();/// <summary> 启动客户端 </summary>public void StartClient(){_activeMQClient.Init(ActiveMQDomain.Instance.GetBrokerUri());_activeMQClient.BeginMessage += l =>{foreach (var item in _registerMessages){ITextMessage textMessage = l as ITextMessage;item.Notice(textMessage.Text);}};}/// <summary> 注册订阅消息 </summary>public void Register<T>() where T : IRegisterMessage{T t = Activator.CreateInstance<T>();_registerMessages.Add(t);}#endregion#region - 服务端 -ActiveMQServer _activeMQServer = new ActiveMQServer();/// <summary> 启动服务端 </summary>public void StartServer(){string user = ActiveMQDomain.Instance.GetUser();string pw = ActiveMQDomain.Instance.GetUser();string url = ActiveMQDomain.Instance.GetBrokerUri();_activeMQServer.Init(user, pw, url);}/// <summary> 发送消息 </summary>public void SendMessage(string message){_activeMQServer.SendMessage(message);}#endregion}

四、示例

启动发送端服务并发送消息

 public partial class MainWindow : Window{public MainWindow(){InitializeComponent();ServerManager.Instance.StartServer();}private void Button_Click(object sender, RoutedEventArgs e){ServerManager.Instance.SendMessage(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " - " + this.txt_message.Text);}}

启动客户端并注册消息接收器

Windows服务宿主启动

public partial class Service1 : ServiceBase{public Service1(){InitializeComponent();}protected override void OnStart(string[] args){ServerManager.Instance.InitLogger();ServerManager.Instance.LogInfo("Window服务准备启动!");ServerManager.Instance.LogInfo("准备开始启动客户端");try{ServerManager.Instance.StartClient();ServerManager.Instance.LogInfo("启动客户端完成");ServerManager.Instance.LogInfo("Window服务启动完成!");}catch (Exception ex){ServerManager.Instance.LogInfo("启动客户端错误");ServerManager.Instance.LogError(ex);}}protected override void OnStop(){ServerManager.Instance.LogInfo("Window服务停止!");}}

注册多个消息接收器并启动客户端

    ActiveMQService _activeMQService = new ActiveMQService();public void StartClient(){//  Message:注册消息_activeMQService.Register<LedMessageNotice>();_activeMQService.Register<VoiceMessageNotice>();//  Message:启动客户端_activeMQService.StartClient();}

LedMessageNotice和VoiceMessageNotice分别实现IRegisterMessage接口

 public class LedMessageNotice : IRegisterMessage{public void Notice(string message){Console.WriteLine(message);ModuleManager.Instance.LogInfo(message);}}
  public class VoiceMessageNotice : IRegisterMessage{public void Notice(string message){Console.WriteLine(message);ModuleManager.Instance.LogInfo(message);SpeechService.Instance.Speek(message);}}

五、部署

1、将ActiveMQ部署在阿里云服务器上,并启动服务

2、将windows服务客户端部分注册在银行大厅局域网主机内,当接收到消息转发给各个消息输出设备

3、消息发送部分部署在可连接到阿里云的设备上进行发送消息

注意:

1)部署在阿里云上的地址同样用tcp:\\****:61616

2)设置多个客户端(如有多个客户端连接是需要设置不同的ID否则报错):

            //这个是连接的客户端名称标识
            connection.ClientId = "firstQueueListener1";

3)发送到不同的客户端可以通过过滤条件来设置

         客户端设置分组为“demo1”

             //通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置
            IMessageConsumer consumer = session.CreateConsumer(new                      Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"), "filter='demo1'");

         服务端发送到分组为“demo1”

                    //设置消息对象的属性,是Queue的过滤条件也是P2P的唯一指定属性
                    message.Properties.SetString("filter", "demo1");

        (此时发送的数据只发送给demo1的客户端)

 

 

示例代码:https://github.com/HeBianGu/MQ-ActiveMQMessageDispatcher.git

这篇关于示例:ActiveMQ+Windows服务创建消息转发器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1005029

相关文章

Nginx进行平滑升级的实战指南(不中断服务版本更新)

《Nginx进行平滑升级的实战指南(不中断服务版本更新)》Nginx的平滑升级(也称为热升级)是一种在不停止服务的情况下更新Nginx版本或添加模块的方法,这种升级方式确保了服务的高可用性,避免了因升... 目录一.下载并编译新版Nginx1.下载解压2.编译二.替换可执行文件,并平滑升级1.替换可执行文件

Go语言使用select监听多个channel的示例详解

《Go语言使用select监听多个channel的示例详解》本文将聚焦Go并发中的一个强力工具,select,这篇文章将通过实际案例学习如何优雅地监听多个Channel,实现多任务处理、超时控制和非阻... 目录一、前言:为什么要使用select二、实战目标三、案例代码:监听两个任务结果和超时四、运行示例五

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

MySQL CTE (Common Table Expressions)示例全解析

《MySQLCTE(CommonTableExpressions)示例全解析》MySQL8.0引入CTE,支持递归查询,可创建临时命名结果集,提升复杂查询的可读性与维护性,适用于层次结构数据处... 目录基本语法CTE 主要特点非递归 CTE简单 CTE 示例多 CTE 示例递归 CTE基本递归 CTE 结

Spring AI使用tool Calling和MCP的示例详解

《SpringAI使用toolCalling和MCP的示例详解》SpringAI1.0.0.M6引入ToolCalling与MCP协议,提升AI与工具交互的扩展性与标准化,支持信息检索、行动执行等... 目录深入探索 Spring AI聊天接口示例Function CallingMCPSTDIOSSE结束语

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到

PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例

《PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例》词嵌入解决NLP维度灾难,捕捉语义关系,PyTorch的nn.Embedding模块提供灵活实现,支持参数配置、预训练及变长... 目录一、词嵌入(Word Embedding)简介为什么需要词嵌入?二、PyTorch中的nn.Em

Python Web框架Flask、Streamlit、FastAPI示例详解

《PythonWeb框架Flask、Streamlit、FastAPI示例详解》本文对比分析了Flask、Streamlit和FastAPI三大PythonWeb框架:Flask轻量灵活适合传统应用... 目录概述Flask详解Flask简介安装和基础配置核心概念路由和视图模板系统数据库集成实际示例Stre

Spring Bean初始化及@PostConstruc执行顺序示例详解

《SpringBean初始化及@PostConstruc执行顺序示例详解》本文给大家介绍SpringBean初始化及@PostConstruc执行顺序,本文通过实例代码给大家介绍的非常详细,对大家的... 目录1. Bean初始化执行顺序2. 成员变量初始化顺序2.1 普通Java类(非Spring环境)(