python基于ModBusTCP服务端的业务实现特定的client

2023-12-12 04:20

本文主要是介绍python基于ModBusTCP服务端的业务实现特定的client,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

python实现ModBusTCP协议的client是一件简单的事情,只要通过pymodbus、pyModbusTCP等模块都可以实现,本文采用pymodbus。但要基于ModBusTCP服务端的业务实现特定的client,那得看看服务端是否复杂。前面系列文章,我们学习了对服务端的简单交互,便是得力于服务端简单的业务流程,本文将实现有点复杂的业务流程。

一、业务描述

我们将使用python脚本来实现一个ModBusTCP client,他能够触发设备进行拍照,然后读取拍照情况。

1、ModBusTCP服务端交互流程

大概类似如下交互时序,其中PLC将用我们的脚本代替。

2、控制

根据上述业务,服务器需要有寄存器存储客户端写入的控制位,存储顺序如下。

3、状态

根据上述业务,服务器需要有寄存器存储状态位,让客户端来读取,存储顺序如下。

4、结果

根据上述业务,服务器需要有寄存器存储结果,让客户端来读取,存储顺序如下。

5、 地址空间

(1)控制

类型:HoldingRegisters、Coils

起始地址与寄存器数量:自定义

(2)状态

类型:HoldingRegisters、DiscreteInputs、InputRegisters

起始地址与寄存器数量:自定义

(3)结果

类型:HoldingRegisters、InputRegisters

起始地址与寄存器数量:自定义

二、程序整体设计

class myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1]  # 翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1:  # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":...else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":...elif addrtype=="DiscreteInputs":...else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0):if addrtype=="HoldingRegisters":...elif addrtype=="InputRegisters":...else:raise ValueError("plc_out_addrtype的值错误!")if __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():myclient = myModBusTCPclient(MDclient)myclient.ctrl(ByteSwap=1, binary="1100000000000000")time.sleep(1)myclient.status(ByteSwap=1)

1、程序结构

上述代码定义了一个名为 myModBusTCPclient 的类,用于与 Modbus TCP 服务器进行通信。下面是对程序结构的分析:

构造函数 __init__

接收一个参数 obj,表示 Modbus TCP 客户端对象。将这个对象存储在实例变量 self.obj 中。

custom_binary 方法:

将给定的整数 num 转换为指定长度 length 的二进制字符串。可选参数 ByteSwap 用于指定是否进行字节交换。如果 ByteSwap 为 1,则进行字节交换,否则不进行。返回构造好的二进制字符串。

custom_num 方法:

接收一个二进制字符串 binary,根据给定的长度 length 和是否进行字节交换 ByteSwap 将其转换为整数。返回转换得到的整数。

ctrl 方法:

根据给定的地址类型 addrtype(默认是 "HoldingRegisters")、是否进行字节交换 ByteSwap、二进制字符串 binary、Modbus 地址 address 和从站号 slave,向 Modbus 服务器写入数据。如果地址类型是 "HoldingRegisters",则使用 write_registers 方法写入寄存器。

status 方法:

根据给定的地址类型 addrtype、是否进行字节交换 ByteSwap、寄存器地址 reg_addr、寄存器数量 reg_nb 和从站号 slave,从 Modbus 服务器读取数据。如果地址类型是 "HoldingRegisters",则使用 read_holding_registers 方法读取寄存器。

plc_out 方法:

根据给定的地址类型 addrtype 和是否进行字节交换 ByteSwap,执行一些 Modbus 操作。具体操作需要根据地址类型的不同进行扩展。

if __name__ == "__main__": 部分:

在脚本独立运行时进行的操作。创建了一个 Modbus TCP 客户端对象 MDclient。通过 myModBusTCPclient 类创建了一个自定义的客户端对象 myclient。调用了 ctrl 方法,向 Modbus 服务器写入数据。等待了一秒钟。调用了 status 方法,从 Modbus 服务器读取数据。

2、不同地址空间的请求

在ModbusTCP中,对于不同的寄存器,请求方式是不一样的,因此需要根据服务端的设置相应更改。

为了满足业务,需要对原有的pymodbus进行封装改造。

为了满足大端序和小端序,需要加入字节交换的操作。

三、程序实现

import json
import time
import socket
from pymodbus.client import ModbusTcpClientclass myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1]  # 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1:  # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def result_ByteSwap(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_strif ByteSwap == 1:return result_strelif ByteSwap == 0:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":values = []for i in binary:if i == "0":values.append(False)elif i == "1":values.append(True)print(values)# 线圈不存在字节交换# value = self.custom_num(binary[0:16], ByteSwap=0)# print("------------------------------values :", [i for i in bin(value)[2:]])# self.obj.write_coils(address=address, values=[i for i in bin(value)[2:]], slave=slave)self.obj.write_coils(address=address, values=values, slave=slave)else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":value = self.obj.read_input_registers(address=reg_addr, count=reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result InputRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="DiscreteInputs":# 离散寄存器不存在字节交换value = self.obj.read_discrete_inputs(address=reg_addr, count=reg_nb, slave=slave)print([value.bits[i] for i in range(reg_nb)])result = ""for i in [value.bits[i] for i in range(reg_nb)]:if i == False:result += "0"elif i == True:result += "1"# [value.bits[i] for i in range(reg_nb)]return [result[i:i+16] for i in range(0, len(result), 16)]else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result HoldingRegisters:", value_list)print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]elif addrtype=="InputRegisters":value = self.obj.read_input_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result InputRegisters:", value_list)print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]else:raise ValueError("plc_out_addrtype的值错误!")def plc_out_only_result(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):result_list = self.plc_out(addrtype=addrtype, ByteSwap=ByteSwap, reg_addr=reg_addr, reg_nb=reg_nb, slave=slave)[5:]result_list_have = [i for i in result_list if i != 0]result_binary = ""result = ""for num in result_list_have:# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = 16 - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_strresult_binary += result_strfor binary in [result_binary[i:i+8] for i in range(0, len(result_binary), 8)]:  # 以长度8进行分割if binary != "00000000":result += chr(int(binary, 2))return resultif __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():ByteSwap = 0ctrl_addrtype = "Coils"  # HoldingRegisters、Coilsstatus_addrtype = "HoldingRegisters"  # HoldingRegisters、DiscreteInputs、InputRegistersresult_addrtype = "InputRegisters"  # HoldingRegisters、InputRegistersmyclient = myModBusTCPclient(MDclient)# 初始化控制位状态ctrl = "0000000000000000"myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary="0000000000000000")# 使能TriggerEnable并常置为1TriggerEnable = "1"ctrl = TriggerEnable + ctrl[1:]while True:print("ctrl: ", ctrl)# 写入控制myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary=ctrl, address=0)ResultACK = "0"ctrl = ctrl[:3] + ResultACK + ctrl[4:]# 查询状态status = myclient.status(addrtype=status_addrtype, ByteSwap=ByteSwap, reg_addr=2, reg_nb=2)[0]TriggerReady = status[0]print("------TriggerReady", TriggerReady)TriggerACK = status[1]ResultOKorNG = status[11]# print("---------------------------------ResultOKorNG:", ResultOKorNG)if TriggerReady == "1":Trigger = "1"ctrl = ctrl[:1] + TriggerEnable + ctrl[2:]print("-----------------ctrl")if TriggerACK == "1":Trigger = "0"ctrl = ctrl[:1] + Trigger + ctrl[2:]if ResultOKorNG == "1":# print("---------------------------------------")ResultACK = "1"ctrl = ctrl[:3] + ResultACK + ctrl[4:]# myclient.plc_out(ByteSwap=1)result = myclient.plc_out_only_result(addrtype=result_addrtype, ByteSwap=ByteSwap)print(result)

这篇关于python基于ModBusTCP服务端的业务实现特定的client的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/483214

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

一文详解Python如何开发游戏

《一文详解Python如何开发游戏》Python是一种非常流行的编程语言,也可以用来开发游戏模组,:本文主要介绍Python如何开发游戏的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、python简介二、Python 开发 2D 游戏的优劣势优势缺点三、Python 开发 3D

Python函数作用域与闭包举例深度解析

《Python函数作用域与闭包举例深度解析》Python函数的作用域规则和闭包是编程中的关键概念,它们决定了变量的访问和生命周期,:本文主要介绍Python函数作用域与闭包的相关资料,文中通过代码... 目录1. 基础作用域访问示例1:访问全局变量示例2:访问外层函数变量2. 闭包基础示例3:简单闭包示例4

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Python版本与package版本兼容性检查方法总结

《Python版本与package版本兼容性检查方法总结》:本文主要介绍Python版本与package版本兼容性检查方法的相关资料,文中提供四种检查方法,分别是pip查询、conda管理、PyP... 目录引言为什么会出现兼容性问题方法一:用 pip 官方命令查询可用版本方法二:conda 管理包环境方法