Python使用MQTT连接新版ONENet

2024-05-28 07:28

本文主要是介绍Python使用MQTT连接新版ONENet,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Python MQTT 连接新版ONENet

简介

前几个教程我们使用mqtt.fx连接了新版的ONENet, 只是跑通了MQTT协议,但是在实际操作下还需要实现具体环境、具体设备的MQTT连接,本章教程将以Python MQTT的方式连接 ONENet

参考文档:

paho-mqtt · PyPI

OneNET - 中国移动物联网开放平台 (10086.cn)

准备环境

pip安装 paho-mqtt

pip install paho-mqtt

获取ONENet 三元组

准备好Onenet的三元组
在这里插入图片描述

三元组分别为

DeviceName=“wenshidap” #设备ID

Productid = “kuerSLKlo8” #产品ID

accesskey=“QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk=”#秘钥

根据ONENet手册的文档说明,mqtt连接onenet需要进行鉴权,可以访问链接查看鉴权算法,但是我们可以根据鉴权算法说明生成鉴权的秘钥

使用Pyhon生成的鉴权秘钥的函数为

#认证token生成函数
def get_token(id,access_key):version = '2018-10-31'#   res = 'products/%s' % id  # 通过产品ID访问产品API# res = 'userid/%s' % id  # 通过产品ID访问产品APIres="products/"+ Productid + "/devices/" + DeviceName# 用户自定义token过期时间et = str(int(time.time()) + 36000000)# et = str(int(1722499200))# 签名方法,支持md5、sha1、sha256method = 'sha1'method1 = 'sha256'# 对access_key进行decodekey = base64.b64decode(access_key)# 计算signorg = et + '\n' + method+ '\n' + res + '\n' + versionsign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)sign = base64.b64encode(sign_b.digest()).decode()# value 部分进行url编码,method/res/version值较为简单无需编码sign = quote(sign, safe='')res = quote(res, safe='')# token参数拼接token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)return token

MQTT订阅和发布Topic说明

我们可以点击产品开发->设备开发->topic管理->数据流topic来查看当前设备可以订阅哪些topic,在这里我们是上传数据流,所以我们只关心发布的topic 和订阅上传成功和上传失败的topic

在这里插入图片描述

发布的topic :$sys/kuerSLKlo8/{device-name}/dp/post/json

只需要将我们的device-name换成当前的devicename即可 在本项目就是wenshidap

即:$sys/kuerSLKlo8/wenshidap/dp/post/json

订阅的topic:

上传成功:$sys/kuerSLKlo8/{device-name}/dp/post/json/accepted

上传失败:$sys/kuerSLKlo8/{device-name}/dp/post/json/rejected

当我们订阅上传成功topic时,数据流上传成功后就会返回msg的id 失败时 reject topic就会返回失败的原因

MQTT连接ONENet主程序

import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311
import struct
import json
import base64
import hmac
import time
from urllib.parse import quoteServerUrl = "mqtts.heclouds.com" #服务器url
ServerPort = 1883#服务器端口
DeviceName="wenshidap" #设备ID
Productid = "kuerSLKlo8" #产品ID
accesskey="QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk="# 发布的topic
Pub_topic1 = "$sys/"+Productid+"/"+ DeviceName+"/dp/post/json"#需要订阅的topic
#数据上传成功的消息
Sub_topic1 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/accepted"
#接收数据上传失败的消息
Sub_topic2 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/rejected"#测试用json数据格式
jsonstr = "{\"id\": 123,\"dp\": {\"ConEnv_Temp\": [{\"v\": 22.1}],\"ConEnv_Humi\": [{\"v\": 61.2}]}}"#认证token生成函数
def get_token(id,access_key):version = '2018-10-31'#   res = 'products/%s' % id  # 通过产品ID访问产品API# res = 'userid/%s' % id  # 通过产品ID访问产品APIres="products/"+ Productid + "/devices/" + DeviceName# 用户自定义token过期时间et = str(int(time.time()) + 36000000)# et = str(int(1722499200))# 签名方法,支持md5、sha1、sha256method = 'sha1'method1 = 'sha256'# 对access_key进行decodekey = base64.b64decode(access_key)# 计算signorg = et + '\n' + method+ '\n' + res + '\n' + versionsign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)sign = base64.b64encode(sign_b.digest()).decode()# value 部分进行url编码,method/res/version值较为简单无需编码sign = quote(sign, safe='')res = quote(res, safe='')# token参数拼接token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)return tokendef on_subscribe(client, userdata, mid, reason_code_list, properties):# Since we subscribed only for a single channel, reason_code_list contains# a single entryif reason_code_list[0].is_failure:print(f"Broker rejected you subscription: {reason_code_list[0]}")else:print(f"Broker granted the following QoS: {reason_code_list[0].value}")def on_unsubscribe(client, userdata, mid, reason_code_list, properties):# Be careful, the reason_code_list is only present in MQTTv5.# In MQTTv3 it will always be emptyif len(reason_code_list) == 0 or not reason_code_list[0].is_failure:print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")else:print(f"Broker replied with failure: {reason_code_list[0]}")client.disconnect()# 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
def on_connect(client, userdata, flags, reason_code, properties):if reason_code.is_failure:print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")else:# we should always subscribe from on_connect callback to be sure# our subscribed is persisted across reconnections.# client.subscribe("$SYS/#")print("连接结果:" + mqtt.connack_string(reason_code))#连接成功后就订阅topicclient.subscribe(Sub_topic1)client.subscribe(Sub_topic2)# 从服务器接收发布消息时的回调。
def on_message(client, userdata, message):print(str(message.payload,'utf-8'))#当消息已经被发送给中间人,on_publish()回调将会被触发
def on_publish(client, userdata, mid):print(str(mid))def main():passw=get_token(DeviceName,accesskey)print(passw)mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,DeviceName)mqttc.on_connect = on_connectmqttc.on_message = on_messagemqttc.on_subscribe = on_subscribemqttc.on_unsubscribe = on_unsubscribe# client = mqtt.Client(DeviceName,protocol=MQTTv311)#client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书mqttc.connect(ServerUrl, port=ServerPort, keepalive=120)mqttc.username_pw_set(Productid,passw)mqttc.loop_start()while(1):mqttc.publish(Pub_topic1,jsonstr,qos=0)print("okk")time.sleep(2)if __name__ == '__main__':main()

运行测试一下

在这里插入图片描述

可以看到我们订阅的topic 返回了我们消息的ID 123 说明我们的数据上传成功 ,平台上也可以看到我们的数据流

在这里插入图片描述

这篇关于Python使用MQTT连接新版ONENet的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1009911

相关文章

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python正则表达式匹配和替换的操作指南

《Python正则表达式匹配和替换的操作指南》正则表达式是处理文本的强大工具,Python通过re模块提供了完整的正则表达式功能,本文将通过代码示例详细介绍Python中的正则匹配和替换操作,需要的朋... 目录基础语法导入re模块基本元字符常用匹配方法1. re.match() - 从字符串开头匹配2.

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

通过Docker容器部署Python环境的全流程

《通过Docker容器部署Python环境的全流程》在现代化开发流程中,Docker因其轻量化、环境隔离和跨平台一致性的特性,已成为部署Python应用的标准工具,本文将详细演示如何通过Docker容... 目录引言一、docker与python的协同优势二、核心步骤详解三、进阶配置技巧四、生产环境最佳实践

Python一次性将指定版本所有包上传PyPI镜像解决方案

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随... 目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4

Spring Security简介、使用与最佳实践

《SpringSecurity简介、使用与最佳实践》SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,本文给大家介绍SpringSec... 目录一、如何理解 Spring Security?—— 核心思想二、如何在 Java 项目中使用?——

springboot中使用okhttp3的小结

《springboot中使用okhttp3的小结》OkHttp3是一个JavaHTTP客户端,可以处理各种请求类型,比如GET、POST、PUT等,并且支持高效的HTTP连接池、请求和响应缓存、以及异... 在 Spring Boot 项目中使用 OkHttp3 进行 HTTP 请求是一个高效且流行的方式。

java.sql.SQLTransientConnectionException连接超时异常原因及解决方案

《java.sql.SQLTransientConnectionException连接超时异常原因及解决方案》:本文主要介绍java.sql.SQLTransientConnectionExcep... 目录一、引言二、异常信息分析三、可能的原因3.1 连接池配置不合理3.2 数据库负载过高3.3 连接泄漏

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下