EMQ百万级MQTT消息服务(小技巧)

2024-05-28 01:32

本文主要是介绍EMQ百万级MQTT消息服务(小技巧),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

				版权声明:本文为博主原创文章,未经博主允许不得转载。					https://blog.csdn.net/u011142688/article/details/79852325				</div><div id="content_views" class="markdown_views"><!-- flowchart 箭头图标 勿删 --><svg xmlns="http://www.w3.org/2000/svg" style="display: none;"><path stroke-linecap="round" d="M5,0 0,2.5 5,5z" id="raphael-marker-block" style="-webkit-tap-highlight-color: rgba(0, 0, 0, 0);"></path></svg><p><img src="http://i.imgur.com/vhdeDvX.png" alt="" title=""></p>

附上:

喵了个咪的博客:w-blog.cn
EMQ官方地址:http://emqtt.com/
EMQ中文文档:http://emqtt.com/docs/v2/guide.html

1.ACL鉴权规则化

在正常业务使用下对于客户端的行为可以使用ACL进行限制,比如A客户端只能订阅 /A/get 队列消息和向 /A/set 发布内容
但是在MYSQL里面处理这样的鉴权就需要写入两条记录,如果设备量有一百万数据库就要承担两百万条鉴权数据量会大大影响数据库的性能
那么有没有什么批量的方式来定义ACL鉴权呢?

在mysql-ACL鉴权的配置文件下关于如何使用鉴权的SQL是可以编辑的,也就意味着你可以通过SQL来实现批量ACL鉴权规则

> vim /usr/local/emqttd/etc/plugins/emq_auth_mysql.conf# 最下面有这样一条配置
auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'
  • 1
  • 2
  • 3
  • 4

笔者这里就实现每个设备默认可以订阅 /A/get 队列消息和向 /A/set 发布

笔者现在的规则是客户端只能向hello写消息其他操作一概不允许,我们先加两条记录

insert `mqtt_acl`(`allow`,`username`,`access`,`topic`) values(1,'$all',1,'/$user/get');
insert `mqtt_acl`(`allow`,`username`,`access`,`topic`) values(1,'$all',2,'/$user/set');
  • 1
  • 2

然后修改默认的ACL鉴权的SQL语句如下(这里使用的是username作为topic动态名称也可以使用其他字段):

select allow, ipaddr, username, clientid, access, REPLACE(topic,'$user','%u') from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'
  • 1

这样一来就算没有独立配置/A/set可以写入,作为用户是A的客户端也可以进行消息的写入了,并且也可以监听消息/A/get

2.共享订阅

关于队列常见的使用中也有这样的场景,一条消息希望被多个监听程序接收到,可能的场景如下:

  • 一个程序处理,一个程序记录日志分别处理
  • 批量推送
                            ---------|       | --Msg1,Msg2,Msg3--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->|  EMQ  | --Msg1,Msg2,Msg3--> Subscriber2|       | --Msg1,Msg2,Msg3--> Subscriber3---------
  • 1
  • 2
  • 3
  • 4
  • 5

多条消息希望被多个程序中的某个进行处理,场景如下:

  • 并发情况下耗时操作进行并行处理提高系统吞吐量
                            ---------|       | --Msg1--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->|  EMQ  | --Msg2--> Subscriber2|       | --Msg3--> Subscriber3---------
  • 1
  • 2
  • 3
  • 4
  • 5

在默认情况下有多个客户端监听一个事件时会受到同样的消息,但是怎么共享订阅呢?EMQ共享订阅支持两种使用方式:

  • [Math Processing Error]queue/如:share/group/topic

以上两种都可以实现共享订阅(笔者测试下来值通过了share来完成了订阅),订阅和监听

  • 多个服务端监听 $share/group/topic
  • 客户端向 topic 发送消息

3.Qos 0/1/2的区别实测

最多一次的传输
消息是基于TCP/IP网络传输的。没有回应,在协议中也没有定义重传的语义。消息可能到达服务器1次,也可能根本不会到达。

至少一次的传输
服务器接收到消息会被确认,通过传输一个PUBACK信息。如果有一个可以辨认的传输失败,无论是通讯连接还是发送设备,还是过了一段时间确认信息没有收到,发送方都会将消息头的DUP位置1,然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。
如果客户端没有接收到PUBACK信息(无论是应用定义的超时,还是检测到失败然后通讯session重启),客户端都会再次发送PUBLISH信息,并且将DUP位置1。
当它从客户端接收到重复的数据,服务器重新发送消息给订阅者,并且发送另一个PUBACK消息。

笔者做了一个实现消费端阻塞2秒消费一个内容,发布端1秒发布一个内容,等EMQ的最大拥塞使用完了之后消息在EMQ缓存的会后就会出现很多的重复消息

只有一次的传输
在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输,当重复的消息不被允许的情况下使用。这样增加了网络流量,但是它通常是可以接受的,因为消息内容很重要。
QoS level 2在消息头有Message ID。

4.密码加盐

在用户验证中可以使用plain | md5 | sha | sha256 | bcrypt等hash方式(默认使用的sha256),但是出于安全性考虑EMQ也支持对密码加盐,可以解开注释使用一下加盐方式中的一种

vim /usr/local/emqttd/etc/plugins/emq_auth_mysql.conf## sha256 with salt prefix
## auth.mysql.password_hash = salt,sha256## bcrypt with salt only prefix
## auth.mysql.password_hash = salt,bcrypt## sha256 with salt suffix
## auth.mysql.password_hash = sha256,salt## pbkdf2 with macfun iterations dklen
## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
## auth.mysql.password_hash = pbkdf2,sha256,1000,20
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

对应存储的密码就要进行加盐处理了

5.EMQ离线消息

  • 保留消息
    MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。
    例如mosquitto命令行发布一条保留消息到主题’a/b/c’:
    mosquitto_pub -r -q 1 -t a/b/c -m ‘hello’
    之后连接上来的MQTT客户端订阅主题’a/b/c’时候,仍可收到该消息:
    $ mosquitto_sub -t a/b/c -q 1
    hello
    保留消息(Retained Message)有两种清除方式:
    客户端向有保留消息的主题发布一个空消息:
    mosquitto_pub -r -q 1 -t a/b/c -m ”
    消息服务器设置保留消息的超期时间。

  • cleanSession 清理回话
    MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。
    ‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
    ‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。

3 总结

在EMQ和MQTT使用过程中还有很多的细节需要注意,关注细节才能走的更远

注:笔者能力有限有说的不对的地方希望大家能够指出,也希望多多交流!

这篇关于EMQ百万级MQTT消息服务(小技巧)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1009167

相关文章

sysmain服务可以禁用吗? 电脑sysmain服务关闭后的影响与操作指南

《sysmain服务可以禁用吗?电脑sysmain服务关闭后的影响与操作指南》在Windows系统中,SysMain服务(原名Superfetch)作为一个旨在提升系统性能的关键组件,一直备受用户关... 在使用 Windows 系统时,有时候真有点像在「开盲盒」。全新安装系统后的「默认设置」,往往并不尽编

Python 基于http.server模块实现简单http服务的代码举例

《Python基于http.server模块实现简单http服务的代码举例》Pythonhttp.server模块通过继承BaseHTTPRequestHandler处理HTTP请求,使用Threa... 目录测试环境代码实现相关介绍模块简介类及相关函数简介参考链接测试环境win11专业版python

Java实现复杂查询优化的7个技巧小结

《Java实现复杂查询优化的7个技巧小结》在Java项目中,复杂查询是开发者面临的“硬骨头”,本文将通过7个实战技巧,结合代码示例和性能对比,手把手教你如何让复杂查询变得优雅,大家可以根据需求进行选择... 目录一、复杂查询的痛点:为何你的代码“又臭又长”1.1冗余变量与中间状态1.2重复查询与性能陷阱1.

Python内存优化的实战技巧分享

《Python内存优化的实战技巧分享》Python作为一门解释型语言,虽然在开发效率上有着显著优势,但在执行效率方面往往被诟病,然而,通过合理的内存优化策略,我们可以让Python程序的运行速度提升3... 目录前言python内存管理机制引用计数机制垃圾回收机制内存泄漏的常见原因1. 循环引用2. 全局变

Nginx中配置使用非默认80端口进行服务的完整指南

《Nginx中配置使用非默认80端口进行服务的完整指南》在实际生产环境中,我们经常需要将Nginx配置在其他端口上运行,本文将详细介绍如何在Nginx中配置使用非默认端口进行服务,希望对大家有所帮助... 目录一、为什么需要使用非默认端口二、配置Nginx使用非默认端口的基本方法2.1 修改listen指令

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十

Python进阶之列表推导式的10个核心技巧

《Python进阶之列表推导式的10个核心技巧》在Python编程中,列表推导式(ListComprehension)是提升代码效率的瑞士军刀,本文将通过真实场景案例,揭示列表推导式的进阶用法,希望对... 目录一、基础语法重构:理解推导式的底层逻辑二、嵌套循环:破解多维数据处理难题三、条件表达式:实现分支

Python中的filter() 函数的工作原理及应用技巧

《Python中的filter()函数的工作原理及应用技巧》Python的filter()函数用于筛选序列元素,返回迭代器,适合函数式编程,相比列表推导式,内存更优,尤其适用于大数据集,结合lamb... 目录前言一、基本概念基本语法二、使用方式1. 使用 lambda 函数2. 使用普通函数3. 使用 N

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息