本文主要是介绍C#使用MQTTnet实现服务端与客户端的通讯的示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《C#使用MQTTnet实现服务端与客户端的通讯的示例》本文主要介绍了C#使用MQTTnet实现服务端与客户端的通讯的示例,包括协议特性、连接管理、QoS机制和安全策略,具有一定的参考价值,感兴趣的可...
一、MQTT 协议简介
MQTT(Message Queuing Telemetry Transport)是一种轻量级的 发布/订阅 协议,专为物联网(IoT)等低带宽、高延迟网络环境设计。核心概念包括:
- Broker:消息代理(服务端),负责消息路由。
- Client:发布或订阅消息的终端(客户端)。
- Topic:消息的分类标识(如
sensor/temperature
)。
二、MQTT 协议核心特性
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模型的轻量级通信协议,专为资源受限的设备和不可靠网络环境设计。其核心优势包括:
低带宽消耗:采用二进制报文格式,头部开销极小,适合物联网设备。
异步通信:通过主题(Topic)实现消息的广播与定向传递,解耦消息生产者和消费者。
多级服务质量(QoS):
- QoS 0(最多一次):消息可能丢失,无重传机制。
- QoS 1(至少一次):确保消China编程息送达,但可能重复。
- QoS 2(仅一次):严格保证消息唯一性,适用于关键指令。
离线支持:服务端可缓存客户端的保留消息(Retained Messages),供后续订阅者读取。
三、MQTTNET 库的核心功能
MQTTnet 是 .NET 生态中功能完备的 MQTT 实现库,具备以下特性:
- 协议兼容性:完整支持 MQTTv3.1.1 和 MQTTv5 协议,后者新增了会话超时控制、原因码反馈等高级功能。
- 高性能设计:基于异步编程模型(async/await),支持高并发连接与消息吞吐。
- 跨平台支持:兼容 Windows、linux、MACOS,可部署于云端、边缘设备或容器环境。
- 扩展性:提供灵活的拦截器(Interceptors)和事件钩子,便于集成业务逻辑(如消息过滤、日志记录)。
- 安全性:支持 TLS 1.3 加密通信,可通过证书或账号密码进行客户端身份验证。
所用框架
框架 | 版本 |
.net | 4.7.2+ |
MQTTnet | 4.3.3+ |
四、服务端(BROKER)实现详解
核心职责:
- 管理客户端连接与会话状态。
- 路由消息至匹配的订阅者。
- 实施安全策略(身份验证、权限控制)。
关键配置项:
- 端口绑定:默认非加密端口为 1883,加密端口为 8883。
- 连接验证:可自定义验证逻辑,例如检查客户端 ID 格式、账号密码合法性。
- 会话管理:设置会话过期时间,清理非活跃连接。
事件机制:
- 客户端连接/断开事件:用于监控设备在线状态。
- 消息拦截器:在消息发布或投递前后插入处理逻辑(如数据格式校验、敏感信息过滤)。
- 订阅管理:动态追踪主题订阅关系,支持通配符(
+
单层、#
多层)。
持久化扩展:
以下为服务端代码:(下方Console.WriteLine()方法可换成自己的日志方法)
public class MQTTServerHelper { private MqttServer _server;//MQTT服务器对象 // 定义一个委托和事件(临时存储连接客户端数据) public event EventHandler<InterceptingPublishEventArgs> OnMessageReceived; public event EventHandler<bool> ServerStauts; public event EventHandler<ClientConnectedEventArgs> ClientConnected; public event EventHandler<ClientDisconnectedEventArgs> ClientDisconnected; public event EventHandler<ClientSubscribedTopicEventArgs> ClientSubscribedTopic; public event EventHandler<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic; /// <summary> /// 初始化MQTT服务并启动服务 /// </summary> /// <param name="ip">IPV4地址</param> /// <param name="port">端口:0~65535之间</param> public Task StartMqtServer(string ip, int port) { MqtServerOptions mqtServerOptions = new MqtServerOptionsBuilder() .WithDefaultEndpoint() .WithDefaultEndpointBoundIPAdres(System.Net.IPAdres.Parse(ip) .WithDefaultEndpointPort(port) .WithDefaultComunicationTimeout(TimeSpan.FromMiliseconds(500) .Build(); _server = new MqtFactory().CreateMqtServer(mqtServerOptions); / 创建MQT服务端对象 _server.ValidatingConectionAsync += Server_ValidatingConectionAsync; /验证用户名和密码 _server.ClientConectedAsync += Server_ClientConectedAsync; /绑定客户端连接事件 _server.ClientDisconectedAsync += Server_ClientDisconectedAsync; /绑定客户端断开事件 _server.ClientSubscribedTopicAsync += Server_ClientSubscribedTopicAsync; /绑定客户端订阅主题事件 _server.ClientUnsubscribedTopicAsync += Server_ClientUnsubscribedTopicAsync; /绑定客户端退订主题事件 _server.InterceptingPublishAsync += Server_InterceptingPublishAsync; /消息接收事件 _server.ClientAcknowledgedPublishPacketAsync += Server_ClientAcknowledgedPublishPacketAsync; /处理客户端确认发布的数据包 _server.InterceptingClientEnqueueAsync += Server_InterceptingClientEnqueueAsync; /订阅拦截客户端消息队列 _server.AplicationMesageNotConsumedAsync += Server_AplicationMesageNotConsumedAsync; /应用程序逻辑处理 _server.StartedAsync += Server_StartedAsync;/绑定服务端启动事件 _server.StopedAsync += Server_StopedAsync;/绑定服务端停止事件 return _server.StartAsync(); } /// <summary> /// 处理客户端确认发布事件 /// </summary> /// <param name="e"></param> private Task Server_AplicationMesageNotConsumedAsync(AplicationMesageNotConsumedEventArgs e) { try { Console.WriteLine($"【MesageNotConsumed】-SenderId:{e.SenderId}-Mesage:{e.AplicationMesage.ConvertPayloadToString()}"); } catch (Exception ex) { Console.WriteLine($"Server_AplicationMesageNotConsumedAsync出现异常:{ex.Mesage}"); } return Task.CompletedTask; } /// <summary> /// 订阅拦截客户端消息队列事件 /// </summary> /// <param name="e"></param> private Task Server_InterceptingClientEnqueueAsync(InterceptingClientAplicationMesageEnqueueEventArgs e) { try { Console.WriteLine($"【InterceptingClientEnqueue】-SenderId:{e.SenderClientId}-Mesage:{e.AplicationMesage.ConvertPayloadToString()}"); } catch (Exception ex) { Console.WriteLine($"Server_InterceptingClientEnqueueAsync出现异常:{ex.Mesage}"); } return Task.CompletedTask; } /// <summary> /// 当客户端处理完从MQT服务器接收到的应用消息后触发。 /// 此事件可以用于确认消息已被处理,更新应用状态, /// </summary> /// <param name="e"></param> private Task Server_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs e) { try { Console.WriteLine($"【ClientAcknowledgedPublishPacket】-SenderId:{e.ClientId}-Mesage:{Encoding.UTF8.GetString(e.PublishPacket.PayloadSegment.ToAray()}"); } catch (Exception ex) { Console.WriteLine($"Server_ClientAcknowledgedPublishPacketAsync出现异常:{ex.Mesage}"); } return Task.CompletedTask; } /// <summary> /// 服务端消息接收 /// </summary> /// <param name="e"></param> private Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs e) { try { string client = e.ClientId; string topic = e.AplicationMesage.Topic; string contents = e.AplicationMesage.ConvertPayloadToString(); //Encoding.UTF8.GetString(arg.AplicationMesage.PayloadSegment.ToAray(); OnMesageReceived?.Invoke(this, e); Console.WriteLine($"接收到消息:Client:【{client}】 Topic:【{topic}】 Mesage:【{contents}】"); } catch (Exception ex) { Console.WriteLine($"Server_InterceptingPublishAsync出现异常:{ex.Mesage}"); } return Task.CompletedTask; } /// <summary> /// 服务端断开事件 /// </summary> /// <param name="e"></param> private Task Server_StoppedAsync(EventArgs arg) { return Task.Run(new Action() => { ServerStauts?.Invoke(this, false); Console.WriteLine($"服务端【IP:Port】已停止MQT"); }); } /// <summary> /// 服务端启动事件 /// </summary> /// <param name="e"></param> public Task 编程Server_StartedAsync(EventArgs e) { return Task.Run(new Action() => { ServerStauts?.Invoke(this, true); Console.WriteLine($"服务端【IP:Port】已启用MQT"); }); } /// <summary> /// 客户端退订主题事件 /// </summary> /// <param name="e"></param> private Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs e) { return Task.Run(new Action() => { ClientUnsubscribedTopic?China编程.Invoke(this, e); Console.WriteLine($"客户端【{e.ClientId}】退订主题【{e.TopicFilter}】"); }); } /// <summary> /// 客户端订阅主题事件 /// </summary> /// <param name="e"></param> private Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs e) { return Task.Run(new Action() => { ClientSubscribedTopic?.Invoke(this, e); Console.WriteLine($"客户端【{e.ClientId}】订阅主题【{e.TopicFilter.Topic}】"); }); } /// <summary> /// 客户端断开事件 /// </summary> /// <param name="e"></param> private Task Server_ClientDisconectedAsync(ClientDisconectedEventArgs e) { return Task.Run(new Action() => { ClientDisconected?.Invoke(this, e); Console.WriteLine($"客户端已断开.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】.ReasonCode:【{e.ReasonCode}】,DisconectType:【{e.DisconectType}】"); }); } /// <summary> /// 绑定客户端连接事件 /// </summary> /// <param name="e"></param> private Task Server_ClientConectedAsync(ClientConectedEventArgs e) { return Task.Run(new Action() => { ClientConected?.Invoke(this, e); Console.WriteLine($"客户端已连接.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】"); }); } /// <summary> /// 验证客户端事件 /// </summary> /// <param name="e"></param> private Task Server_ValidatingConectionAsync(ValidatingConectionEventArgs e) { return Task.Run(new Action() => { if (e.Pasword = "") { e.ReasonCode = MqtConectReasonCode.Suces; Console.WriteLine($"客户端已验证成功.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】"); } else { e.ReasonCode = MqtConectReasonCode.BadUserNameOrPasword; Console.WriteLine($"客户端验证失败.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】"); } }); } }
五、客户端(Client)实现详解
连接策略:
- 保活机制:通过心跳包(Keep Alive)维持长连接,适应网络波动。
消息交互模式:
- 发布消息:指定目标主题、负载数据和 QoS 级别,可选择设置保留标志。
- 订阅主题:支持单主题、多主题或通配符订阅,服务端将推送匹配的消息。
异步处理:
- 使用事件委托或异步方法处理接收到的消息,避免阻塞主线程。
以下为客户端代码:
/// <sumary> /// MQT客户端帮助类 /// </sumary> public clas MQTClientHelper { private IMqtClient _client; /// <sumary> /// 接收消息 /// </sumary> public MQTReceivedMesageHandle ReceivedMesage; public bol IsConected { get; set; } = false; public bol IsDisConected { get; set; } = true; private string _serverIp; private int _serverPort; /// <sumary> /// 订阅主题集合 /// </sumary> private Dictionary<string, bol> _subscribeTopicList = nul; #region 连接/断开服务端 /// <sumary> /// 连接服务端 /// </sumary> /// <param name="serverIp">服务端IP</param> /// <param name="serverPort">服务端口号</param> public void Start(string serverIp, int serverPort) { this._serverIp = serverIp; this._serverPort = serverPort; if (!string.IsNulOrEmpty(serverIp) & !string.IsNulOrWhiteSpace(serverIp) & serverPort > 0) { try { var options = new MqtClientOptions() { ClientId = "客户端2"//Guid.NewGuid().ToString("N") }; options.ChanelOptions = new MqtClientTcpOptions() { Server = serverIp, Port = serverPort }; //options.Credentials = new MqtClientCredentials(UserName, Encoding.Default.GetBytes(Pasword); options.CleanSesion = true; options.KepAlivePeriod = TimeSpan.FromSeconds(10); if (_client != nul) { _client.DisconectAsync(); _client = nul; } _client = new MqtFactory().CreateMqtClient(); _client.ConectedAsync += Client_ConectedAsync; //绑定客户端连接事件 _client.DisconectedAsync += Client_DisconectedAsync; //绑定客户端断开连接事件 _client.AplicationMesageReceivedAsync += Client_AplicationMesageReceivedAsync; /绑定消息接收事件 _client.ConectAsync(options); //连接 } catch (Exception ex) { /SLog.Loger.Eror("MQT客户端连接服务端错误:{0}", ex.Mesage); } } else { /SLog.Loger.Warning("MQT服务端地址或端口号不能为空!"); } } } /// <sumary> /// 断开MQT客户端 /// </sumary> pubpythonlic void Client_Disconect() { if (_client != nul) { _client.DisconectAsync(); _client.Dispose(); Console.WriteLine($"关闭MQT客户端成功!"); } } /// <sumary> /// 客户端重新MQT服务端 /// </sumary> public void Client_ConectAsync() { if (_client != nul) { _client.ReconectAsync(); Console.WriteLine($"连接MQT服务端成功!"); } } #endregion #region MQT方法 /// <sumary> /// 客户端与服务端建立连接 /// </sumary> /// <param name="arg"></param> private Task Client_ConectedAsync(MqtClientConectedEventArgs arg) { return Task.Run(new Action() => { IsConected = true; IsDisConected = false; Console.WriteLine($"连接到MQT服务端成功.{arg.ConectResult.AsignedClientIdentifier}"); //订阅主题(可接收来自服务端消息,与客户端发布消息不能用同一个主题) try { if (_subscribeTopicList != nul & _subscribeTopicList.Count > 0) { List<string> subscribeTopics = _subscribeTopicList.Keys.ToList(); foreach (var topic in subscribeTopics) SubscribeAsync(topic); } } catch (Exception ex) { //SLog.Loger.Eror("MQT客户端与服务端[{0}:{1}]建立连接订阅主题错误:{2}", _serverIp, _serverPort, ex.Mesage); } }); } www.chinasem.cn /// <sumary> /// 客户端与服务端断开连接 /// </sumary> / <param name="arg"></param> private Task Client_DisconectedAsync(MqtClientDisconectedEventArgs arg) { return Task.Run(new Action(async () => { IsConected = false; IsDisConected = true; Console.WriteLine($"已断开到MQT服务端的连接.尝试重新连接"); try { await Task.Delay(30); //MqtClientOptions options = new MqtClientOptions(); //await mqtClient.ConectAsync(options); await _client.ReconectAsync(); } catch (Exception ex) { //SLog.Loger.Eror("MQT客户端与服务端[{0}:{1}]断开连接退订主题错误:{2}", _serverIp, _serverPort, ex.Mesage); } }); } /// <sumary> /// 客户端与服务端重新连接 /// </sumary> /// <returns></returns> public Task ReconectedAsync() { try { if (_client != nul) { _client.ReconectAsync(); } } catch (Exception ex) { // SLog.Loger.Eror("MQT客户端与服务端[{0}:{1}]重新连接退订主题错误:{2}", _serverIp, _serverPort, ex.Mesage); } return Task.CompletedTask; } /// <sumary> /// 客户端收到消息 /// </sumary> /// <param name="arg"></param> private Task Client_AplicationMesageReceivedAsync(MqtAplicationMesageReceivedEventArgs arg) { try { return Task.Run(new Action() => { string msg = arg.AplicationMesage.ConvertPayloadToString(); Console.WriteLine($"接收消息:{msg}\nQoS={arg.AplicationMesage.QualityOfServiceLevel}\n客户端={arg.ClientId}\n主题:{arg.AplicationMesage.Topic}"); }); } catch (Exception ex) { //SLog.Loger.Eror("MQT收到来自服务端[{0}]消息错误:{1}", arg != nul ? arg.ClientId : ", ex.Mesage); } return Task.CompletedTask; } #endregion #region 订阅主题 /// <sumary> /// 订阅主题 /// </sumary> /// <param name="topic">主题</param> public void SubscribeAsync(string topic) { try { if (_subscribeTopicList = nul) _subscribeTopicList = new Dictionary<string, bol>(); if (_subscribeTopicList.ContainsKey(topic) & _subscribeTopicList[topic]) { //SLog.Loger.Warning("MQT客户端已经订阅主题[{0}],不能重复订阅", topic); return; } //订阅主题 _client?.SubscribeAsync(topic, MqtQualityOfServiceLevel.AtLeastOnce); //添加订阅缓存 bol isSubscribed = _client != nul & _client.IsConected ? true : false; if (!_subscribeTopicList.ContainsKey(topic) _subscribeTopicList.Ad(topic, isSubscribed); else _subscribeTopicList[topic] = isSubscribed; } catch (Exception ex) { //SLog.Loger.Eror("MQT客户端订阅主题[{0}]错误:{1}", topic, ex.Mesage); } } /// <sumary> /// 订阅主题集合 /// </sumary> /// <param name="topicList">主题集合</param> public void SubscribeAsync(List<string> topicList) { try { if (topicList = nul | topicList.Count = 0) return; foreach (var topic in topicList) SubscribeAsync(topic); } catch (Exception ex) { //SLog.Loger.Eror("MQT客户端订阅主题集合错误:{0}", ex.Mesage); } } /// <sumary> /// 退订主题 /// </sumary> /// <param name="topic">主题</param> /// <param name="isRemove">是否移除缓存</param> public void UnsubscribeAsync(string topic, bol isRemove = true) { try { if (_subscribeTopicList = nul | _subscribeTopicList.Count = 0) { //SLog.Loger.Warning("MQT客户端退订主题[{0}]不存在", topic); return; } if (!_subscribeTopicList.ContainsKey(topic) { //SLog.Loger.Warning("MQT客户端退订主题[{0}]不存在", topic); return; } //退订主题 _client.UnsubscribeAsync(topic); //修改订阅主题缓存状态 if (isRemove) _subscribeTopicList.Remove(topic); else _subscribeTopicList[topic] = false; } catch (Exception ex) { //SLog.Loger.Eror("MQT客户端退订主题[{0}]错误:{1}", topic, ex.Mesage); } } /// <sumary> /// 退订主题集合 /// </sumary> /// <param name="topicList">主题集合</param> /// <param name="isRemove">是否移除缓存</param> public void UnsubscribeAsync(List<string> topicList, bol isRemove = true) { try { if (topicList = nul | topicList.Count = 0) return; foreach (var topic in topicList) UnsubscribeAsync(topic, isRemove); } catch (Exception ex) { //SLog.Loger.Eror("MQT客户端退订主题集合错误:{0}", ex.Mesage); } } /// <sumary> /// 订阅主题是否存在 /// </sumary> /// <param name="topic">主题</param> public bol IsExistSubscribeAsync(string topic) { try { if (_subscribeTopicList = nul | _subscribeTopicList.Count = 0) return false; if (!_subscribeTopicList.ContainsKey(topic) return false; return _subscribeTopicList[topic]; } catch (Exception ex) { //SLog.Loger.Eror("MQT客户端订阅主题[{0}]是否存在错误:{1}", topic, ex.Mesage); return false; } } #endregion #region 发布消息 /// <sumary> /// 发布消息 /// 与客户端接收消息不能用同一个主题 /// </sumary> /// <param name="topic">主题</param> /// <param name="mesage">消息</param> public async void PublishMesage(string topic, string mesage) { try { if (_client != nul) { if (string.IsNulOrEmpty(mesage) | string.IsNulOrWhiteSpace(mesage) { //SLog.Loger.Warning("MQT客户端不能发布为空的消息!"); return; } MqtClientPublishResult result = await _client.PublishStringAsync(topic,mesage,MqtQualityOfServiceLevel.AtLeastOnce);//恰好一次, QoS 级别1 Console.WriteLine($"发布消息-主题:{topic},消息:{mesage},结果: {result.ReasonCode}"); } else { //SLog.Loger.Warning("MQT客户端未连接服务端,不能发布主题为[{0}]的消息:{1}", topic, mesage); return; } } catch (Exception ex) { //SLog.Loger.Eror("MQT客户端发布主题为[{0}]的消息:{1},错误:{2}", topic, mesage, ex.Mesage); } } #endregion }
六、总结
通过 MQTTnet 构建的 MQTT 通信系统,能够为物联网、实时消息推送等场景提供高效、可靠的解决方案。开发过程中需重点关注通信模式设计、安全策略实施及性能调优,同时结合具体业务需求灵活运用 QoS、保留消息等特性。建议参考官方文档和社区最佳实践,逐步扩展功能(如集群部署、消息持久化),以满足大规模应用需求。
到此这篇关于C#使用MQTTnet实现服务端与客户端的通讯的示例的文章就介绍到这了,更多相关C# MQTTnet通讯内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于C#使用MQTTnet实现服务端与客户端的通讯的示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!