C#实现数据采集系统-数据反写(1)MQTT订阅接收消息

2024-08-23 00:12

本文主要是介绍C#实现数据采集系统-数据反写(1)MQTT订阅接收消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息
  2. 反写内容写入通信类,添加到写入队列中 链接-消息内容处理和写入通信类队列
  3. 实现Modbustcp通信写入

具体实现

1.MQTT订阅,接收消息

Mqtt实现采集数据转发

Mqtt控制类增加订阅方法

  1. 增加一个通用的订阅方法,需要的参数是一个主题和一个委托,将主题跟对应的委托方法对应存储,然后再mqtt中订阅,收到对应的主题消息,然后执行对应的方法。
 public void SubscribeTopic(string topic, Action<string, string> topicAction){//订阅}

然后需要一个键值对用于存储这个关系

 private Dictionary<string, Action<string, string>> _topicActions;

订阅方法实现:订阅主题,添加到_topicActions,如果已经连接,则直接订阅,没有连接,则等待连上的时候自动订阅,增加锁来确保订阅成功

/// <summary>
/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
/// </summary>
/// <param name="topic"></param>
/// <param name="topicAction"></param>
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{lock (_topicActionsLock){if (!_topicActions.ContainsKey(topic)){_topicActions.Add(topic, topicAction);if (_mqttClient.IsConnected){_mqttClient.SubscribeAsync(topic);}}}}

在连接方法中,添加订阅

在这里插入图片描述

public void MqttConnect()
{while (!_mqttClient.IsConnected){try{Console.WriteLine($"正在连接……");_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();}catch (Exception ex){Task.Delay(1000).Wait();Console.WriteLine("连接mqtt服务器失败");}}lock (_topicActionsLock){foreach (var item in _topicActions){_mqttClient.SubscribeAsync(item.Key);}}}
  1. 添加接收消息事件
 //客户端接收消息事件_mqttClient.ApplicationMessageReceivedAsync +=MqttClient_ApplicationMessageReceivedAsync;/// <summary>/// 接收消息/// </summary>/// <param name="args"></param>/// <returns></returns>private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){try{Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");if (_topicActions.ContainsKey(args.ApplicationMessage.Topic)){_topicActions[args.ApplicationMessage.Topic].Invoke(args.ApplicationMessage.Topic,Encoding.UTF8.GetString(args.ApplicationMessage.Payload));}}catch (Exception ex){Console.WriteLine(ex.Message);}}

完整Mqtt代码

 public class MqttControllor{private MqttConfig _config;private string _clientId;MqttClientOptions _clientOptions;private IMqttClient _mqttClient;private readonly object _topicActionsLock = new object();private Dictionary<string, Action<string, string>> _topicActions;public MqttControllor(MqttConfig config, bool isAutoConnect = true){_topicActions = new Dictionary<string, Action<string, string>>();_config = config;_clientId = config.ClientId == "" ? Guid.NewGuid().ToString() : config.ClientId;MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder().WithTcpServer(_config.Ip, _config.Port).WithCredentials(_config.Username, _config.Password).WithClientId(_clientId);_clientOptions = optionsBuilder.Build();_mqttClient = new MqttFactory().CreateMqttClient();// 客户端连接关闭事件_mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;//客户端接收消息事件_mqttClient.ApplicationMessageReceivedAsync +=MqttClient_ApplicationMessageReceivedAsync;if (isAutoConnect){Task.Run(() =>{MqttConnect();});}}/// <summary>/// 接收消息/// </summary>/// <param name="args"></param>/// <returns></returns>private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){try{Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");if (_topicActions.ContainsKey(args.ApplicationMessage.Topic)){_topicActions[args.ApplicationMessage.Topic].Invoke(args.ApplicationMessage.Topic,Encoding.UTF8.GetString(args.ApplicationMessage.Payload));}}catch (Exception ex){Console.WriteLine(ex.Message);}}private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg){Console.WriteLine($"客户端已断开与服务端的连接……");//断开重连_mqttClient = new MqttFactory().CreateMqttClient();MqttConnect();return Task.CompletedTask;}public void MqttConnect(){while (!_mqttClient.IsConnected){try{Console.WriteLine($"正在连接……");_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();}catch (Exception ex){Task.Delay(1000).Wait();Console.WriteLine("连接mqtt服务器失败");}}Console.WriteLine($"客户端已连接到服务端……");//连接成功,订阅主题lock (_topicActionsLock){foreach (var item in _topicActions){_mqttClient.SubscribeAsync(item.Key);}}}/// <summary>/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅/// </summary>/// <param name="topic"></param>/// <param name="topicAction"></param>public void SubscribeTopic(string topic, Action<string, string> topicAction){lock (_topicActionsLock){if (!_topicActions.ContainsKey(topic)){_topicActions.Add(topic, topicAction);if (_mqttClient.IsConnected){_mqttClient.SubscribeAsync(topic);}}}}/// <summary>/// 推送消息/// </summary>/// <param name="topic">主题</param>/// <param name="data">消息内容</param>/// <param name="qsLevel"></param>/// <param name="retain"></param>public void Publish(string topic, string data, int qsLevel = 0, bool retain = false){qsLevel = Math.Clamp(qsLevel, 0, 2);if (!_mqttClient.IsConnected){throw new Exception("mqtt未连接");}var message = new MqttApplicationMessage{Topic = topic,PayloadSegment = Encoding.UTF8.GetBytes(data),QualityOfServiceLevel = (MqttQualityOfServiceLevel)qsLevel,Retain = retain // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。};_mqttClient.PublishAsync(message);}}

这篇关于C#实现数据采集系统-数据反写(1)MQTT订阅接收消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097795

相关文章

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

一文解析C#中的StringSplitOptions枚举

《一文解析C#中的StringSplitOptions枚举》StringSplitOptions是C#中的一个枚举类型,用于控制string.Split()方法分割字符串时的行为,核心作用是处理分割后... 目录C#的StringSplitOptions枚举1.StringSplitOptions枚举的常用

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S

JavaScript对象转数组的三种方法实现

《JavaScript对象转数组的三种方法实现》本文介绍了在JavaScript中将对象转换为数组的三种实用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友... 目录方法1:使用Object.keys()和Array.map()方法2:使用Object.entr

k8s中实现mysql主备过程详解

《k8s中实现mysql主备过程详解》文章讲解了在K8s中使用StatefulSet部署MySQL主备架构,包含NFS安装、storageClass配置、MySQL部署及同步检查步骤,确保主备数据一致... 目录一、k8s中实现mysql主备1.1 环境信息1.2 部署nfs-provisioner1.2.