本文主要是介绍Python实现MQTT通信的示例代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《Python实现MQTT通信的示例代码》本文主要介绍了Python实现MQTT通信的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一...
适用于物联网设备、传感器数据采集等场景
1. 安装paho-mqtt库
安装Python的MQTT客户端库 paho-mqtt
,支持MQTT v3.1/v3.1.1协议,兼容性强
pip install paho-mqtt==1.6.1 # 推荐使用1.6.1版本避免兼容性问题
2. 搭建MQTT代理服务器(Broker)
选择本地搭建或使用公共Broker:
本地搭建(以EMQX为例)
下载并解压EMQX开源版。Directory listing for EMQX: /v5.3.2/ | EMQ
配置EMQX,EMQX的配置文件位于etc\emqx.conf
,可以根据需要修改配置文件,例如更改端口号、添加插件等:18083 EMQX Dashboard 管理控制台端口
在命令行中进入安装目录的 bin
文件夹,执行以下命令启动服务
检查启动状态emqx_ctl status
访问 http://localhost:18083
进入管理界面(默认账号:admin/public
)
公共Broker(如HiveMQ)
使用免费公共服务器:
broker_address = "broker.hivemq.com" broker_port = 1883
3. Python实现基础通信功能
分为 发布者(Publisher) 和 订阅者(Subscriber) 两类客户端:
3.1 发布者代码示例publisher.py
# python 3.8 import random import time from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 broker = '127.0.0.1' port = 1883 topic = "python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 1000)}' username = 'emqx' password = '**********' def connect_mqtt(): def on_connect(client, userdata, flags, rc,properties): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # client = mqtt_client.Client(client_id) client = mqtt_client.Client(client_id=client_id, callback_api_vershttp://www.chinasem.cnion=mqtt_client.CallbackAPIVersion.VERSION2) # client.tls_set(ca_certs='./server-ca.crt') # client.username_pw_set(username, password) client.username_pw_set('admin', 'public') client.on_connect = on_connect client.connect(broker, port) return client def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {China编程msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_startphp() publish(client) if __name__ == '__main__': run()
3.2 订阅者代码示例subscriber.py
# python3.8 import random from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 broker = '127.0.0.1' port = 1883 topic = "python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 100)}' username = 'emqx' password = '**********' def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc,properties): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # client = mqtt_client.Client(client_id) client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) # client.tls_set(ca_certs='./server-ca.crt') # client.username_pw_set(username, password) client.username_pw_set('admin', 'public') client.on_connect = on_connect client.connect(broker, port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() if __name__ == '__main__': run()
3.3或者放同一个文件:
from paho.mqtt import client as mqtt_client
from threading import Thread
import time
broker = '127.0.0.1'
port = 1883
# broker = 'broker.emqx.io'
# port = 1883
su_topic = [("python_mqtt_server", 2), ("python_mqtt_command", 2)] # 订阅频道,数组格式为(topic,qos)
pu_topic = ["python_mqtt_server"] # 发布频道
client_id = 'websocket_publisher_001'
# 将连接与订阅绑在一起,防止复用client导致连接冲突
def connect_mqtt():
# For paho-mqtt 2.0.0, you need to add the properties parameter.
def on_connect(client, userdata, flags, rc, properties):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# For paho-mqtt 2.0.0, you need to set callback_api_version.
client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
client.username_pw_set('admin', 'public')
cliejsnt.on_connect = on_connect
client.connect(broker, port)
subscribe_many(client)
print("订阅完成")
return client
# 订阅多个频道
def subscribe_many(client):
# 根据client源码可知,订阅支持的数组格式是(topic,qos)
client.subscribe(su_topic)
# 收到信息的回调函数
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
# 这里可以编写一些收到信息后的处理,比如收到什么信息开启什么任务,任务完成后再调publish发布信息给终端等等........
# ----------处理例子------------------
r_msg = msg.payload.decode()
if int(r_msg) % 2 == 0:
print("收到偶数")
# -----------------------------------
# 发布信息函数
def publish(client):
for i in range(1, 11):
time.sleep(1)
msg = f"{i}"
result = client.publish(pu_topic[0], msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{pu_topic[0]}`")
else:
print(f"Failed to send message to topic {pu_topic[0]}")
if __name__ == '__main__':
client = connect_mqtt()
client.username_pw_set('admin','public')
client.on_message = on_message # 重写回调函数,可以在方法体里,根据需求改收到信息后干什么
# 多线程,这里启动一个发布任务的线程,模拟有一个发布消息的客户端
thread1 = Thread(target=publish, args=(client,))
# --*如要要实现流水线式的收发,则可以将线程内方法改为任务队列处理,用一个全局变量管理任务执行,执行后再publish
thread1.start()
client.loop_forever() # 这个要加,监听(on_message)与loop_forever配套
关键配置说明
订阅所有传感器子主题
client.subscribe("sensors/#", qos=1)
WebSocket协议支持
- 必须设置
transport="websockets"
,否则默认使用TCP连接。 - 端口需与Broker配置匹配(例如EMQX默认WebSocket端口为8083)。
QoS与消息保留
qos=1
:确保消息至少送达一次。retain=True
:Broker保留最后一条消息,新订阅者立即收到。
异常处理
4. 关键功能扩展
异步通信,使用多线程实现非阻塞操作:
from threading import Thread def start_async(client): Thread(target=client.loop_start).start()
服务质量(QoS)设置 支持至多一次(0)、至少一次(1)、恰好一次(2)三种级别:
client.publish(topic, payload, qos=1) # 设置QoS为1
5. 注意事项
- 连接安全性:若需SSL加密,可使用
client.tls_set()
方法。 - 主题设计:遵循分层结构(如
sensor/room1/temp
),支持通配符+
(单层)和#
(多层)。 - 异常处理:添加
on_disconnect
回调函数实现断线重连逻辑。
完整示例流程
- 启动EMQX Broker服务。
- 运行订阅者代码监听主题
home/sensor/temp
。 - 运行发布者代码发送温度数据。
- 在EMQX管理界面验证消息收发状态
到此这篇关于Python实现MQTT通信的示例代码的文章就介绍到这了,更多相关Python MQTT通信内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于Python实现MQTT通信的示例代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!