python基于ModBusTCP服务端的业务实现特定的client

2023-12-12 04:20

本文主要是介绍python基于ModBusTCP服务端的业务实现特定的client,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

python实现ModBusTCP协议的client是一件简单的事情,只要通过pymodbus、pyModbusTCP等模块都可以实现,本文采用pymodbus。但要基于ModBusTCP服务端的业务实现特定的client,那得看看服务端是否复杂。前面系列文章,我们学习了对服务端的简单交互,便是得力于服务端简单的业务流程,本文将实现有点复杂的业务流程。

一、业务描述

我们将使用python脚本来实现一个ModBusTCP client,他能够触发设备进行拍照,然后读取拍照情况。

1、ModBusTCP服务端交互流程

大概类似如下交互时序,其中PLC将用我们的脚本代替。

2、控制

根据上述业务,服务器需要有寄存器存储客户端写入的控制位,存储顺序如下。

3、状态

根据上述业务,服务器需要有寄存器存储状态位,让客户端来读取,存储顺序如下。

4、结果

根据上述业务,服务器需要有寄存器存储结果,让客户端来读取,存储顺序如下。

5、 地址空间

(1)控制

类型:HoldingRegisters、Coils

起始地址与寄存器数量:自定义

(2)状态

类型:HoldingRegisters、DiscreteInputs、InputRegisters

起始地址与寄存器数量:自定义

(3)结果

类型:HoldingRegisters、InputRegisters

起始地址与寄存器数量:自定义

二、程序整体设计

class myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1]  # 翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1:  # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":...else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":...elif addrtype=="DiscreteInputs":...else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0):if addrtype=="HoldingRegisters":...elif addrtype=="InputRegisters":...else:raise ValueError("plc_out_addrtype的值错误!")if __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():myclient = myModBusTCPclient(MDclient)myclient.ctrl(ByteSwap=1, binary="1100000000000000")time.sleep(1)myclient.status(ByteSwap=1)

1、程序结构

上述代码定义了一个名为 myModBusTCPclient 的类,用于与 Modbus TCP 服务器进行通信。下面是对程序结构的分析:

构造函数 __init__

接收一个参数 obj,表示 Modbus TCP 客户端对象。将这个对象存储在实例变量 self.obj 中。

custom_binary 方法:

将给定的整数 num 转换为指定长度 length 的二进制字符串。可选参数 ByteSwap 用于指定是否进行字节交换。如果 ByteSwap 为 1,则进行字节交换,否则不进行。返回构造好的二进制字符串。

custom_num 方法:

接收一个二进制字符串 binary,根据给定的长度 length 和是否进行字节交换 ByteSwap 将其转换为整数。返回转换得到的整数。

ctrl 方法:

根据给定的地址类型 addrtype(默认是 "HoldingRegisters")、是否进行字节交换 ByteSwap、二进制字符串 binary、Modbus 地址 address 和从站号 slave,向 Modbus 服务器写入数据。如果地址类型是 "HoldingRegisters",则使用 write_registers 方法写入寄存器。

status 方法:

根据给定的地址类型 addrtype、是否进行字节交换 ByteSwap、寄存器地址 reg_addr、寄存器数量 reg_nb 和从站号 slave,从 Modbus 服务器读取数据。如果地址类型是 "HoldingRegisters",则使用 read_holding_registers 方法读取寄存器。

plc_out 方法:

根据给定的地址类型 addrtype 和是否进行字节交换 ByteSwap,执行一些 Modbus 操作。具体操作需要根据地址类型的不同进行扩展。

if __name__ == "__main__": 部分:

在脚本独立运行时进行的操作。创建了一个 Modbus TCP 客户端对象 MDclient。通过 myModBusTCPclient 类创建了一个自定义的客户端对象 myclient。调用了 ctrl 方法,向 Modbus 服务器写入数据。等待了一秒钟。调用了 status 方法,从 Modbus 服务器读取数据。

2、不同地址空间的请求

在ModbusTCP中,对于不同的寄存器,请求方式是不一样的,因此需要根据服务端的设置相应更改。

为了满足业务,需要对原有的pymodbus进行封装改造。

为了满足大端序和小端序,需要加入字节交换的操作。

三、程序实现

import json
import time
import socket
from pymodbus.client import ModbusTcpClientclass myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1]  # 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1:  # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def result_ByteSwap(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_strif ByteSwap == 1:return result_strelif ByteSwap == 0:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":values = []for i in binary:if i == "0":values.append(False)elif i == "1":values.append(True)print(values)# 线圈不存在字节交换# value = self.custom_num(binary[0:16], ByteSwap=0)# print("------------------------------values :", [i for i in bin(value)[2:]])# self.obj.write_coils(address=address, values=[i for i in bin(value)[2:]], slave=slave)self.obj.write_coils(address=address, values=values, slave=slave)else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":value = self.obj.read_input_registers(address=reg_addr, count=reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result InputRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="DiscreteInputs":# 离散寄存器不存在字节交换value = self.obj.read_discrete_inputs(address=reg_addr, count=reg_nb, slave=slave)print([value.bits[i] for i in range(reg_nb)])result = ""for i in [value.bits[i] for i in range(reg_nb)]:if i == False:result += "0"elif i == True:result += "1"# [value.bits[i] for i in range(reg_nb)]return [result[i:i+16] for i in range(0, len(result), 16)]else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result HoldingRegisters:", value_list)print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]elif addrtype=="InputRegisters":value = self.obj.read_input_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result InputRegisters:", value_list)print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]else:raise ValueError("plc_out_addrtype的值错误!")def plc_out_only_result(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):result_list = self.plc_out(addrtype=addrtype, ByteSwap=ByteSwap, reg_addr=reg_addr, reg_nb=reg_nb, slave=slave)[5:]result_list_have = [i for i in result_list if i != 0]result_binary = ""result = ""for num in result_list_have:# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = 16 - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_strresult_binary += result_strfor binary in [result_binary[i:i+8] for i in range(0, len(result_binary), 8)]:  # 以长度8进行分割if binary != "00000000":result += chr(int(binary, 2))return resultif __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():ByteSwap = 0ctrl_addrtype = "Coils"  # HoldingRegisters、Coilsstatus_addrtype = "HoldingRegisters"  # HoldingRegisters、DiscreteInputs、InputRegistersresult_addrtype = "InputRegisters"  # HoldingRegisters、InputRegistersmyclient = myModBusTCPclient(MDclient)# 初始化控制位状态ctrl = "0000000000000000"myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary="0000000000000000")# 使能TriggerEnable并常置为1TriggerEnable = "1"ctrl = TriggerEnable + ctrl[1:]while True:print("ctrl: ", ctrl)# 写入控制myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary=ctrl, address=0)ResultACK = "0"ctrl = ctrl[:3] + ResultACK + ctrl[4:]# 查询状态status = myclient.status(addrtype=status_addrtype, ByteSwap=ByteSwap, reg_addr=2, reg_nb=2)[0]TriggerReady = status[0]print("------TriggerReady", TriggerReady)TriggerACK = status[1]ResultOKorNG = status[11]# print("---------------------------------ResultOKorNG:", ResultOKorNG)if TriggerReady == "1":Trigger = "1"ctrl = ctrl[:1] + TriggerEnable + ctrl[2:]print("-----------------ctrl")if TriggerACK == "1":Trigger = "0"ctrl = ctrl[:1] + Trigger + ctrl[2:]if ResultOKorNG == "1":# print("---------------------------------------")ResultACK = "1"ctrl = ctrl[:3] + ResultACK + ctrl[4:]# myclient.plc_out(ByteSwap=1)result = myclient.plc_out_only_result(addrtype=result_addrtype, ByteSwap=ByteSwap)print(result)

这篇关于python基于ModBusTCP服务端的业务实现特定的client的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/483214

相关文章

Spring Boot 实现 IP 限流的原理、实践与利弊解析

《SpringBoot实现IP限流的原理、实践与利弊解析》在SpringBoot中实现IP限流是一种简单而有效的方式来保障系统的稳定性和可用性,本文给大家介绍SpringBoot实现IP限... 目录一、引言二、IP 限流原理2.1 令牌桶算法2.2 漏桶算法三、使用场景3.1 防止恶意攻击3.2 控制资源

基于Python开发Windows屏幕控制工具

《基于Python开发Windows屏幕控制工具》在数字化办公时代,屏幕管理已成为提升工作效率和保护眼睛健康的重要环节,本文将分享一个基于Python和PySide6开发的Windows屏幕控制工具,... 目录概述功能亮点界面展示实现步骤详解1. 环境准备2. 亮度控制模块3. 息屏功能实现4. 息屏时间

Python如何去除图片干扰代码示例

《Python如何去除图片干扰代码示例》图片降噪是一个广泛应用于图像处理的技术,可以提高图像质量和相关应用的效果,:本文主要介绍Python如何去除图片干扰的相关资料,文中通过代码介绍的非常详细,... 目录一、噪声去除1. 高斯噪声(像素值正态分布扰动)2. 椒盐噪声(随机黑白像素点)3. 复杂噪声(如伪

springboot下载接口限速功能实现

《springboot下载接口限速功能实现》通过Redis统计并发数动态调整每个用户带宽,核心逻辑为每秒读取并发送限定数据量,防止单用户占用过多资源,确保整体下载均衡且高效,本文给大家介绍spring... 目录 一、整体目标 二、涉及的主要类/方法✅ 三、核心流程图解(简化) 四、关键代码详解1️⃣ 设置

Python中图片与PDF识别文本(OCR)的全面指南

《Python中图片与PDF识别文本(OCR)的全面指南》在数据爆炸时代,80%的企业数据以非结构化形式存在,其中PDF和图像是最主要的载体,本文将深入探索Python中OCR技术如何将这些数字纸张转... 目录一、OCR技术核心原理二、python图像识别四大工具库1. Pytesseract - 经典O

基于Linux的ffmpeg python的关键帧抽取

《基于Linux的ffmpegpython的关键帧抽取》本文主要介绍了基于Linux的ffmpegpython的关键帧抽取,实现以按帧或时间间隔抽取关键帧,文中通过示例代码介绍的非常详细,对大家的学... 目录1.FFmpeg的环境配置1) 创建一个虚拟环境envjavascript2) ffmpeg-py

Nginx 配置跨域的实现及常见问题解决

《Nginx配置跨域的实现及常见问题解决》本文主要介绍了Nginx配置跨域的实现及常见问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来... 目录1. 跨域1.1 同源策略1.2 跨域资源共享(CORS)2. Nginx 配置跨域的场景2.1

python使用库爬取m3u8文件的示例

《python使用库爬取m3u8文件的示例》本文主要介绍了python使用库爬取m3u8文件的示例,可以使用requests、m3u8、ffmpeg等库,实现获取、解析、下载视频片段并合并等步骤,具有... 目录一、准备工作二、获取m3u8文件内容三、解析m3u8文件四、下载视频片段五、合并视频片段六、错误

Python中提取文件名扩展名的多种方法实现

《Python中提取文件名扩展名的多种方法实现》在Python编程中,经常会遇到需要从文件名中提取扩展名的场景,Python提供了多种方法来实现这一功能,不同方法适用于不同的场景和需求,包括os.pa... 目录技术背景实现步骤方法一:使用os.path.splitext方法二:使用pathlib模块方法三

Python打印对象所有属性和值的方法小结

《Python打印对象所有属性和值的方法小结》在Python开发过程中,调试代码时经常需要查看对象的当前状态,也就是对象的所有属性和对应的值,然而,Python并没有像PHP的print_r那样直接提... 目录python中打印对象所有属性和值的方法实现步骤1. 使用vars()和pprint()2. 使