C#实现数据采集系统-数据反写(1)MQTT订阅接收消息

2024-08-23 00:12

本文主要是介绍C#实现数据采集系统-数据反写(1)MQTT订阅接收消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息
  2. 反写内容写入通信类,添加到写入队列中 链接-消息内容处理和写入通信类队列
  3. 实现Modbustcp通信写入

具体实现

1.MQTT订阅,接收消息

Mqtt实现采集数据转发

Mqtt控制类增加订阅方法

  1. 增加一个通用的订阅方法,需要的参数是一个主题和一个委托,将主题跟对应的委托方法对应存储,然后再mqtt中订阅,收到对应的主题消息,然后执行对应的方法。
 public void SubscribeTopic(string topic, Action<string, string> topicAction){//订阅}

然后需要一个键值对用于存储这个关系

 private Dictionary<string, Action<string, string>> _topicActions;

订阅方法实现:订阅主题,添加到_topicActions,如果已经连接,则直接订阅,没有连接,则等待连上的时候自动订阅,增加锁来确保订阅成功

/// <summary>
/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
/// </summary>
/// <param name="topic"></param>
/// <param name="topicAction"></param>
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{lock (_topicActionsLock){if (!_topicActions.ContainsKey(topic)){_topicActions.Add(topic, topicAction);if (_mqttClient.IsConnected){_mqttClient.SubscribeAsync(topic);}}}}

在连接方法中,添加订阅

在这里插入图片描述

public void MqttConnect()
{while (!_mqttClient.IsConnected){try{Console.WriteLine($"正在连接……");_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();}catch (Exception ex){Task.Delay(1000).Wait();Console.WriteLine("连接mqtt服务器失败");}}lock (_topicActionsLock){foreach (var item in _topicActions){_mqttClient.SubscribeAsync(item.Key);}}}
  1. 添加接收消息事件
 //客户端接收消息事件_mqttClient.ApplicationMessageReceivedAsync +=MqttClient_ApplicationMessageReceivedAsync;/// <summary>/// 接收消息/// </summary>/// <param name="args"></param>/// <returns></returns>private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){try{Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");if (_topicActions.ContainsKey(args.ApplicationMessage.Topic)){_topicActions[args.ApplicationMessage.Topic].Invoke(args.ApplicationMessage.Topic,Encoding.UTF8.GetString(args.ApplicationMessage.Payload));}}catch (Exception ex){Console.WriteLine(ex.Message);}}

完整Mqtt代码

 public class MqttControllor{private MqttConfig _config;private string _clientId;MqttClientOptions _clientOptions;private IMqttClient _mqttClient;private readonly object _topicActionsLock = new object();private Dictionary<string, Action<string, string>> _topicActions;public MqttControllor(MqttConfig config, bool isAutoConnect = true){_topicActions = new Dictionary<string, Action<string, string>>();_config = config;_clientId = config.ClientId == "" ? Guid.NewGuid().ToString() : config.ClientId;MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder().WithTcpServer(_config.Ip, _config.Port).WithCredentials(_config.Username, _config.Password).WithClientId(_clientId);_clientOptions = optionsBuilder.Build();_mqttClient = new MqttFactory().CreateMqttClient();// 客户端连接关闭事件_mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;//客户端接收消息事件_mqttClient.ApplicationMessageReceivedAsync +=MqttClient_ApplicationMessageReceivedAsync;if (isAutoConnect){Task.Run(() =>{MqttConnect();});}}/// <summary>/// 接收消息/// </summary>/// <param name="args"></param>/// <returns></returns>private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){try{Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");if (_topicActions.ContainsKey(args.ApplicationMessage.Topic)){_topicActions[args.ApplicationMessage.Topic].Invoke(args.ApplicationMessage.Topic,Encoding.UTF8.GetString(args.ApplicationMessage.Payload));}}catch (Exception ex){Console.WriteLine(ex.Message);}}private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg){Console.WriteLine($"客户端已断开与服务端的连接……");//断开重连_mqttClient = new MqttFactory().CreateMqttClient();MqttConnect();return Task.CompletedTask;}public void MqttConnect(){while (!_mqttClient.IsConnected){try{Console.WriteLine($"正在连接……");_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();}catch (Exception ex){Task.Delay(1000).Wait();Console.WriteLine("连接mqtt服务器失败");}}Console.WriteLine($"客户端已连接到服务端……");//连接成功,订阅主题lock (_topicActionsLock){foreach (var item in _topicActions){_mqttClient.SubscribeAsync(item.Key);}}}/// <summary>/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅/// </summary>/// <param name="topic"></param>/// <param name="topicAction"></param>public void SubscribeTopic(string topic, Action<string, string> topicAction){lock (_topicActionsLock){if (!_topicActions.ContainsKey(topic)){_topicActions.Add(topic, topicAction);if (_mqttClient.IsConnected){_mqttClient.SubscribeAsync(topic);}}}}/// <summary>/// 推送消息/// </summary>/// <param name="topic">主题</param>/// <param name="data">消息内容</param>/// <param name="qsLevel"></param>/// <param name="retain"></param>public void Publish(string topic, string data, int qsLevel = 0, bool retain = false){qsLevel = Math.Clamp(qsLevel, 0, 2);if (!_mqttClient.IsConnected){throw new Exception("mqtt未连接");}var message = new MqttApplicationMessage{Topic = topic,PayloadSegment = Encoding.UTF8.GetBytes(data),QualityOfServiceLevel = (MqttQualityOfServiceLevel)qsLevel,Retain = retain // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。};_mqttClient.PublishAsync(message);}}

这篇关于C#实现数据采集系统-数据反写(1)MQTT订阅接收消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097795

相关文章

C#借助Spire.XLS for .NET实现在Excel中添加文档属性

《C#借助Spire.XLSfor.NET实现在Excel中添加文档属性》在日常的数据处理和项目管理中,Excel文档扮演着举足轻重的角色,本文将深入探讨如何在C#中借助强大的第三方库Spire.... 目录为什么需要程序化添加Excel文档属性使用Spire.XLS for .NET库实现文档属性管理Sp

Python+FFmpeg实现视频自动化处理的完整指南

《Python+FFmpeg实现视频自动化处理的完整指南》本文总结了一套在Python中使用subprocess.run调用FFmpeg进行视频自动化处理的解决方案,涵盖了跨平台硬件加速、中间素材处理... 目录一、 跨平台硬件加速:统一接口设计1. 核心映射逻辑2. python 实现代码二、 中间素材处

Java数组动态扩容的实现示例

《Java数组动态扩容的实现示例》本文主要介绍了Java数组动态扩容的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1 问题2 方法3 结语1 问题实现动态的给数组添加元素效果,实现对数组扩容,原始数组使用静态分配

Python实现快速扫描目标主机的开放端口和服务

《Python实现快速扫描目标主机的开放端口和服务》这篇文章主要为大家详细介绍了如何使用Python编写一个功能强大的端口扫描器脚本,实现快速扫描目标主机的开放端口和服务,感兴趣的小伙伴可以了解下... 目录功能介绍场景应用1. 网络安全审计2. 系统管理维护3. 网络故障排查4. 合规性检查报错处理1.

MySQL快速复制一张表的四种核心方法(包括表结构和数据)

《MySQL快速复制一张表的四种核心方法(包括表结构和数据)》本文详细介绍了四种复制MySQL表(结构+数据)的方法,并对每种方法进行了对比分析,适用于不同场景和数据量的复制需求,特别是针对超大表(1... 目录一、mysql 复制表(结构+数据)的 4 种核心方法(面试结构化回答)方法 1:CREATE

Python轻松实现Word到Markdown的转换

《Python轻松实现Word到Markdown的转换》在文档管理、内容发布等场景中,将Word转换为Markdown格式是常见需求,本文将介绍如何使用FreeSpire.DocforPython实现... 目录一、工具简介二、核心转换实现1. 基础单文件转换2. 批量转换Word文件三、工具特性分析优点局

Springboot3统一返回类设计全过程(从问题到实现)

《Springboot3统一返回类设计全过程(从问题到实现)》文章介绍了如何在SpringBoot3中设计一个统一返回类,以实现前后端接口返回格式的一致性,该类包含状态码、描述信息、业务数据和时间戳,... 目录Spring Boot 3 统一返回类设计:从问题到实现一、核心需求:统一返回类要解决什么问题?

详解C++ 存储二进制数据容器的几种方法

《详解C++存储二进制数据容器的几种方法》本文主要介绍了详解C++存储二进制数据容器,包括std::vector、std::array、std::string、std::bitset和std::ve... 目录1.std::vector<uint8_t>(最常用)特点:适用场景:示例:2.std::arra

Java使用Spire.Doc for Java实现Word自动化插入图片

《Java使用Spire.DocforJava实现Word自动化插入图片》在日常工作中,Word文档是不可或缺的工具,而图片作为信息传达的重要载体,其在文档中的插入与布局显得尤为关键,下面我们就来... 目录1. Spire.Doc for Java库介绍与安装2. 使用特定的环绕方式插入图片3. 在指定位

Java使用Spire.Barcode for Java实现条形码生成与识别

《Java使用Spire.BarcodeforJava实现条形码生成与识别》在现代商业和技术领域,条形码无处不在,本教程将引导您深入了解如何在您的Java项目中利用Spire.Barcodefor... 目录1. Spire.Barcode for Java 简介与环境配置2. 使用 Spire.Barco