C#实现数据采集系统-数据反写(1)MQTT订阅接收消息

2024-08-23 00:12

本文主要是介绍C#实现数据采集系统-数据反写(1)MQTT订阅接收消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息
  2. 反写内容写入通信类,添加到写入队列中 链接-消息内容处理和写入通信类队列
  3. 实现Modbustcp通信写入

具体实现

1.MQTT订阅,接收消息

Mqtt实现采集数据转发

Mqtt控制类增加订阅方法

  1. 增加一个通用的订阅方法,需要的参数是一个主题和一个委托,将主题跟对应的委托方法对应存储,然后再mqtt中订阅,收到对应的主题消息,然后执行对应的方法。
 public void SubscribeTopic(string topic, Action<string, string> topicAction){//订阅}

然后需要一个键值对用于存储这个关系

 private Dictionary<string, Action<string, string>> _topicActions;

订阅方法实现:订阅主题,添加到_topicActions,如果已经连接,则直接订阅,没有连接,则等待连上的时候自动订阅,增加锁来确保订阅成功

/// <summary>
/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
/// </summary>
/// <param name="topic"></param>
/// <param name="topicAction"></param>
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{lock (_topicActionsLock){if (!_topicActions.ContainsKey(topic)){_topicActions.Add(topic, topicAction);if (_mqttClient.IsConnected){_mqttClient.SubscribeAsync(topic);}}}}

在连接方法中,添加订阅

在这里插入图片描述

public void MqttConnect()
{while (!_mqttClient.IsConnected){try{Console.WriteLine($"正在连接……");_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();}catch (Exception ex){Task.Delay(1000).Wait();Console.WriteLine("连接mqtt服务器失败");}}lock (_topicActionsLock){foreach (var item in _topicActions){_mqttClient.SubscribeAsync(item.Key);}}}
  1. 添加接收消息事件
 //客户端接收消息事件_mqttClient.ApplicationMessageReceivedAsync +=MqttClient_ApplicationMessageReceivedAsync;/// <summary>/// 接收消息/// </summary>/// <param name="args"></param>/// <returns></returns>private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){try{Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");if (_topicActions.ContainsKey(args.ApplicationMessage.Topic)){_topicActions[args.ApplicationMessage.Topic].Invoke(args.ApplicationMessage.Topic,Encoding.UTF8.GetString(args.ApplicationMessage.Payload));}}catch (Exception ex){Console.WriteLine(ex.Message);}}

完整Mqtt代码

 public class MqttControllor{private MqttConfig _config;private string _clientId;MqttClientOptions _clientOptions;private IMqttClient _mqttClient;private readonly object _topicActionsLock = new object();private Dictionary<string, Action<string, string>> _topicActions;public MqttControllor(MqttConfig config, bool isAutoConnect = true){_topicActions = new Dictionary<string, Action<string, string>>();_config = config;_clientId = config.ClientId == "" ? Guid.NewGuid().ToString() : config.ClientId;MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder().WithTcpServer(_config.Ip, _config.Port).WithCredentials(_config.Username, _config.Password).WithClientId(_clientId);_clientOptions = optionsBuilder.Build();_mqttClient = new MqttFactory().CreateMqttClient();// 客户端连接关闭事件_mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;//客户端接收消息事件_mqttClient.ApplicationMessageReceivedAsync +=MqttClient_ApplicationMessageReceivedAsync;if (isAutoConnect){Task.Run(() =>{MqttConnect();});}}/// <summary>/// 接收消息/// </summary>/// <param name="args"></param>/// <returns></returns>private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){try{Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");if (_topicActions.ContainsKey(args.ApplicationMessage.Topic)){_topicActions[args.ApplicationMessage.Topic].Invoke(args.ApplicationMessage.Topic,Encoding.UTF8.GetString(args.ApplicationMessage.Payload));}}catch (Exception ex){Console.WriteLine(ex.Message);}}private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg){Console.WriteLine($"客户端已断开与服务端的连接……");//断开重连_mqttClient = new MqttFactory().CreateMqttClient();MqttConnect();return Task.CompletedTask;}public void MqttConnect(){while (!_mqttClient.IsConnected){try{Console.WriteLine($"正在连接……");_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();}catch (Exception ex){Task.Delay(1000).Wait();Console.WriteLine("连接mqtt服务器失败");}}Console.WriteLine($"客户端已连接到服务端……");//连接成功,订阅主题lock (_topicActionsLock){foreach (var item in _topicActions){_mqttClient.SubscribeAsync(item.Key);}}}/// <summary>/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅/// </summary>/// <param name="topic"></param>/// <param name="topicAction"></param>public void SubscribeTopic(string topic, Action<string, string> topicAction){lock (_topicActionsLock){if (!_topicActions.ContainsKey(topic)){_topicActions.Add(topic, topicAction);if (_mqttClient.IsConnected){_mqttClient.SubscribeAsync(topic);}}}}/// <summary>/// 推送消息/// </summary>/// <param name="topic">主题</param>/// <param name="data">消息内容</param>/// <param name="qsLevel"></param>/// <param name="retain"></param>public void Publish(string topic, string data, int qsLevel = 0, bool retain = false){qsLevel = Math.Clamp(qsLevel, 0, 2);if (!_mqttClient.IsConnected){throw new Exception("mqtt未连接");}var message = new MqttApplicationMessage{Topic = topic,PayloadSegment = Encoding.UTF8.GetBytes(data),QualityOfServiceLevel = (MqttQualityOfServiceLevel)qsLevel,Retain = retain // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。};_mqttClient.PublishAsync(message);}}

这篇关于C#实现数据采集系统-数据反写(1)MQTT订阅接收消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097795

相关文章

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

C++中零拷贝的多种实现方式

《C++中零拷贝的多种实现方式》本文主要介绍了C++中零拷贝的实现示例,旨在在减少数据在内存中的不必要复制,从而提高程序性能、降低内存使用并减少CPU消耗,零拷贝技术通过多种方式实现,下面就来了解一下... 目录一、C++中零拷贝技术的核心概念二、std::string_view 简介三、std::stri

C++高效内存池实现减少动态分配开销的解决方案

《C++高效内存池实现减少动态分配开销的解决方案》C++动态内存分配存在系统调用开销、碎片化和锁竞争等性能问题,内存池通过预分配、分块管理和缓存复用解决这些问题,下面就来了解一下... 目录一、C++内存分配的性能挑战二、内存池技术的核心原理三、主流内存池实现:TCMalloc与Jemalloc1. TCM

OpenCV实现实时颜色检测的示例

《OpenCV实现实时颜色检测的示例》本文主要介绍了OpenCV实现实时颜色检测的示例,通过HSV色彩空间转换和色调范围判断实现红黄绿蓝颜色检测,包含视频捕捉、区域标记、颜色分析等功能,具有一定的参考... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间

Python实现精准提取 PDF中的文本,表格与图片

《Python实现精准提取PDF中的文本,表格与图片》在实际的系统开发中,处理PDF文件不仅限于读取整页文本,还有提取文档中的表格数据,图片或特定区域的内容,下面我们来看看如何使用Python实... 目录安装 python 库提取 PDF 文本内容:获取整页文本与指定区域内容获取页面上的所有文本内容获取

基于Python实现一个Windows Tree命令工具

《基于Python实现一个WindowsTree命令工具》今天想要在Windows平台的CMD命令终端窗口中使用像Linux下的tree命令,打印一下目录结构层级树,然而还真有tree命令,但是发现... 目录引言实现代码使用说明可用选项示例用法功能特点添加到环境变量方法一:创建批处理文件并添加到PATH1

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

Nexus安装和启动的实现教程

《Nexus安装和启动的实现教程》:本文主要介绍Nexus安装和启动的实现教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、Nexus下载二、Nexus安装和启动三、关闭Nexus总结一、Nexus下载官方下载链接:DownloadWindows系统根

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte