python基于ModBusTCP服务端的业务实现特定的client

2023-12-11 19:30

本文主要是介绍python基于ModBusTCP服务端的业务实现特定的client,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

python实现ModBusTCP协议的client是一件简单的事情,只要通过pymodbus、pyModbusTCP等模块都可以实现,本文采用pymodbus。但要基于ModBusTCP服务端的业务实现特定的client,那得看看服务端是否复杂。前面系列文章,我们学习了对服务端的简单交互,便是得力于服务端简单的业务流程,本文将实现有点复杂的业务流程。

一、业务描述

我们将使用python脚本来实现一个ModBusTCP client,他能够触发设备进行拍照,然后读取拍照情况。

1、ModBusTCP服务端交互流程

大概类似如下交互时序,其中PLC将用我们的脚本代替。

2、控制

根据上述业务,服务器需要有寄存器存储客户端写入的控制位,存储顺序如下。

3、状态

根据上述业务,服务器需要有寄存器存储状态位,让客户端来读取,存储顺序如下。

4、结果

根据上述业务,服务器需要有寄存器存储结果,让客户端来读取,存储顺序如下。

5、 地址空间

(1)控制

类型:HoldingRegisters、Coils

起始地址与寄存器数量:自定义

(2)状态

类型:HoldingRegisters、DiscreteInputs、InputRegisters

起始地址与寄存器数量:自定义

(3)结果

类型:HoldingRegisters、InputRegisters

起始地址与寄存器数量:自定义

二、程序整体设计

class myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1]  # 翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1:  # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":...else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":...elif addrtype=="DiscreteInputs":...else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0):if addrtype=="HoldingRegisters":...elif addrtype=="InputRegisters":...else:raise ValueError("plc_out_addrtype的值错误!")if __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():myclient = myModBusTCPclient(MDclient)myclient.ctrl(ByteSwap=1, binary="1100000000000000")time.sleep(1)myclient.status(ByteSwap=1)

1、程序结构

上述代码定义了一个名为 myModBusTCPclient 的类,用于与 Modbus TCP 服务器进行通信。下面是对程序结构的分析:

构造函数 __init__

接收一个参数 obj,表示 Modbus TCP 客户端对象。将这个对象存储在实例变量 self.obj 中。

custom_binary 方法:

将给定的整数 num 转换为指定长度 length 的二进制字符串。可选参数 ByteSwap 用于指定是否进行字节交换。如果 ByteSwap 为 1,则进行字节交换,否则不进行。返回构造好的二进制字符串。

custom_num 方法:

接收一个二进制字符串 binary,根据给定的长度 length 和是否进行字节交换 ByteSwap 将其转换为整数。返回转换得到的整数。

ctrl 方法:

根据给定的地址类型 addrtype(默认是 "HoldingRegisters")、是否进行字节交换 ByteSwap、二进制字符串 binary、Modbus 地址 address 和从站号 slave,向 Modbus 服务器写入数据。如果地址类型是 "HoldingRegisters",则使用 write_registers 方法写入寄存器。

status 方法:

根据给定的地址类型 addrtype、是否进行字节交换 ByteSwap、寄存器地址 reg_addr、寄存器数量 reg_nb 和从站号 slave,从 Modbus 服务器读取数据。如果地址类型是 "HoldingRegisters",则使用 read_holding_registers 方法读取寄存器。

plc_out 方法:

根据给定的地址类型 addrtype 和是否进行字节交换 ByteSwap,执行一些 Modbus 操作。具体操作需要根据地址类型的不同进行扩展。

if __name__ == "__main__": 部分:

在脚本独立运行时进行的操作。创建了一个 Modbus TCP 客户端对象 MDclient。通过 myModBusTCPclient 类创建了一个自定义的客户端对象 myclient。调用了 ctrl 方法,向 Modbus 服务器写入数据。等待了一秒钟。调用了 status 方法,从 Modbus 服务器读取数据。

2、不同地址空间的请求

在ModbusTCP中,对于不同的寄存器,请求方式是不一样的,因此需要根据服务端的设置相应更改。

为了满足业务,需要对原有的pymodbus进行封装改造。

为了满足大端序和小端序,需要加入字节交换的操作。

三、程序实现

import json
import time
import socket
from pymodbus.client import ModbusTcpClientclass myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1]  # 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1:  # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def result_ByteSwap(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_strif ByteSwap == 1:return result_strelif ByteSwap == 0:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":values = []for i in binary:if i == "0":values.append(False)elif i == "1":values.append(True)print(values)# 线圈不存在字节交换# value = self.custom_num(binary[0:16], ByteSwap=0)# print("------------------------------values :", [i for i in bin(value)[2:]])# self.obj.write_coils(address=address, values=[i for i in bin(value)[2:]], slave=slave)self.obj.write_coils(address=address, values=values, slave=slave)else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":value = self.obj.read_input_registers(address=reg_addr, count=reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result InputRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="DiscreteInputs":# 离散寄存器不存在字节交换value = self.obj.read_discrete_inputs(address=reg_addr, count=reg_nb, slave=slave)print([value.bits[i] for i in range(reg_nb)])result = ""for i in [value.bits[i] for i in range(reg_nb)]:if i == False:result += "0"elif i == True:result += "1"# [value.bits[i] for i in range(reg_nb)]return [result[i:i+16] for i in range(0, len(result), 16)]else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result HoldingRegisters:", value_list)print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]elif addrtype=="InputRegisters":value = self.obj.read_input_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result InputRegisters:", value_list)print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]else:raise ValueError("plc_out_addrtype的值错误!")def plc_out_only_result(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):result_list = self.plc_out(addrtype=addrtype, ByteSwap=ByteSwap, reg_addr=reg_addr, reg_nb=reg_nb, slave=slave)[5:]result_list_have = [i for i in result_list if i != 0]result_binary = ""result = ""for num in result_list_have:# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = 16 - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_strresult_binary += result_strfor binary in [result_binary[i:i+8] for i in range(0, len(result_binary), 8)]:  # 以长度8进行分割if binary != "00000000":result += chr(int(binary, 2))return resultif __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():ByteSwap = 0ctrl_addrtype = "Coils"  # HoldingRegisters、Coilsstatus_addrtype = "HoldingRegisters"  # HoldingRegisters、DiscreteInputs、InputRegistersresult_addrtype = "InputRegisters"  # HoldingRegisters、InputRegistersmyclient = myModBusTCPclient(MDclient)# 初始化控制位状态ctrl = "0000000000000000"myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary="0000000000000000")# 使能TriggerEnable并常置为1TriggerEnable = "1"ctrl = TriggerEnable + ctrl[1:]while True:print("ctrl: ", ctrl)# 写入控制myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary=ctrl, address=0)ResultACK = "0"ctrl = ctrl[:3] + ResultACK + ctrl[4:]# 查询状态status = myclient.status(addrtype=status_addrtype, ByteSwap=ByteSwap, reg_addr=2, reg_nb=2)[0]TriggerReady = status[0]print("------TriggerReady", TriggerReady)TriggerACK = status[1]ResultOKorNG = status[11]# print("---------------------------------ResultOKorNG:", ResultOKorNG)if TriggerReady == "1":Trigger = "1"ctrl = ctrl[:1] + TriggerEnable + ctrl[2:]print("-----------------ctrl")if TriggerACK == "1":Trigger = "0"ctrl = ctrl[:1] + Trigger + ctrl[2:]if ResultOKorNG == "1":# print("---------------------------------------")ResultACK = "1"ctrl = ctrl[:3] + ResultACK + ctrl[4:]# myclient.plc_out(ByteSwap=1)result = myclient.plc_out_only_result(addrtype=result_addrtype, ByteSwap=ByteSwap)print(result)

这篇关于python基于ModBusTCP服务端的业务实现特定的client的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/481854

相关文章

Python中模块graphviz使用入门

《Python中模块graphviz使用入门》graphviz是一个用于创建和操作图形的Python库,本文主要介绍了Python中模块graphviz使用入门,具有一定的参考价值,感兴趣的可以了解一... 目录1.安装2. 基本用法2.1 输出图像格式2.2 图像style设置2.3 属性2.4 子图和聚

Python使用Matplotlib绘制3D曲面图详解

《Python使用Matplotlib绘制3D曲面图详解》:本文主要介绍Python使用Matplotlib绘制3D曲面图,在Python中,使用Matplotlib库绘制3D曲面图可以通过mpl... 目录准备工作绘制简单的 3D 曲面图绘制 3D 曲面图添加线框和透明度控制图形视角Matplotlib

一文教你Python如何快速精准抓取网页数据

《一文教你Python如何快速精准抓取网页数据》这篇文章主要为大家详细介绍了如何利用Python实现快速精准抓取网页数据,文中的示例代码简洁易懂,具有一定的借鉴价值,有需要的小伙伴可以了解下... 目录1. 准备工作2. 基础爬虫实现3. 高级功能扩展3.1 抓取文章详情3.2 保存数据到文件4. 完整示例

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

基于Python打造一个智能单词管理神器

《基于Python打造一个智能单词管理神器》这篇文章主要为大家详细介绍了如何使用Python打造一个智能单词管理神器,从查询到导出的一站式解决,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 项目概述:为什么需要这个工具2. 环境搭建与快速入门2.1 环境要求2.2 首次运行配置3. 核心功能使用指

Python实现微信自动锁定工具

《Python实现微信自动锁定工具》在数字化办公时代,微信已成为职场沟通的重要工具,但临时离开时忘记锁屏可能导致敏感信息泄露,下面我们就来看看如何使用Python打造一个微信自动锁定工具吧... 目录引言:当微信隐私遇到自动化守护效果展示核心功能全景图技术亮点深度解析1. 无操作检测引擎2. 微信路径智能获

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

利用Python打造一个Excel记账模板

《利用Python打造一个Excel记账模板》这篇文章主要为大家详细介绍了如何使用Python打造一个超实用的Excel记账模板,可以帮助大家高效管理财务,迈向财富自由之路,感兴趣的小伙伴快跟随小编一... 目录设置预算百分比超支标红预警记账模板功能介绍基础记账预算管理可视化分析摸鱼时间理财法碎片时间利用财

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑