【Python】FANUC机器人OPC UA通信并记录数据

2024-04-11 18:04

本文主要是介绍【Python】FANUC机器人OPC UA通信并记录数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
    • 总结

引言

 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

机器人仿真

 FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
在这里插入图片描述

环境准备

  • Python 3.5+
  • opcua库:用于实现OPC UA通信
  • logging库:用于记录日志

安装opcua库:

pip install opcua

代码实现

1. 导入库

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. 设置参数

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. 日志配置

def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)

4. OPC UA通信

  • 连接到服务器
def connect_to_server(url):client = Client(url)client.connect()return client
  • 获取根节点和对象节点
def get_root_node(client: Client):return client.get_root_node()
def get_objects_node(client: Client):return client.get_objects_node()
  • 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node: Node, path=""):variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variables

5. 备份旧CSV文件

def backup_csv_file(filename):if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})

6. 主函数

if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

记录数据预览:
在这里插入图片描述

总结

 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


完整代码:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua# OPC UA服务器的URL
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV文件名
CSV_FILENAME = 'fanuc_opcua_data.csv'
# 日志文件
FAUNC_LOG = 'fanuc.log'
# 文件夹 
LOG_DIR = 'log'
BACKUP_DIR = 'backup'def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):"""创建客户端实例并连接到服务端"""client = Client(url)client.connect()return clientdef get_root_node(client: Client):"""获取服务器命名空间中的根节点"""return client.get_root_node()def get_objects_node(client: Client):"""获取服务器的对象节点"""return client.get_objects_node()def get_variables(node: Node, path=""):"""遍历所有子节点并返回变量节点的路径和数值"""variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variablesdef backup_csv_file(filename):"""如果CSV文件已存在则备份"""if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

这篇关于【Python】FANUC机器人OPC UA通信并记录数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/894798

相关文章

Python+FFmpeg实现视频自动化处理的完整指南

《Python+FFmpeg实现视频自动化处理的完整指南》本文总结了一套在Python中使用subprocess.run调用FFmpeg进行视频自动化处理的解决方案,涵盖了跨平台硬件加速、中间素材处理... 目录一、 跨平台硬件加速:统一接口设计1. 核心映射逻辑2. python 实现代码二、 中间素材处

python中的flask_sqlalchemy的使用及示例详解

《python中的flask_sqlalchemy的使用及示例详解》文章主要介绍了在使用SQLAlchemy创建模型实例时,通过元类动态创建实例的方式,并说明了如何在实例化时执行__init__方法,... 目录@orm.reconstructorSQLAlchemy的回滚关联其他模型数据库基本操作将数据添

Python实现快速扫描目标主机的开放端口和服务

《Python实现快速扫描目标主机的开放端口和服务》这篇文章主要为大家详细介绍了如何使用Python编写一个功能强大的端口扫描器脚本,实现快速扫描目标主机的开放端口和服务,感兴趣的小伙伴可以了解下... 目录功能介绍场景应用1. 网络安全审计2. 系统管理维护3. 网络故障排查4. 合规性检查报错处理1.

MySQL快速复制一张表的四种核心方法(包括表结构和数据)

《MySQL快速复制一张表的四种核心方法(包括表结构和数据)》本文详细介绍了四种复制MySQL表(结构+数据)的方法,并对每种方法进行了对比分析,适用于不同场景和数据量的复制需求,特别是针对超大表(1... 目录一、mysql 复制表(结构+数据)的 4 种核心方法(面试结构化回答)方法 1:CREATE

Python轻松实现Word到Markdown的转换

《Python轻松实现Word到Markdown的转换》在文档管理、内容发布等场景中,将Word转换为Markdown格式是常见需求,本文将介绍如何使用FreeSpire.DocforPython实现... 目录一、工具简介二、核心转换实现1. 基础单文件转换2. 批量转换Word文件三、工具特性分析优点局

Python中4大日志记录库比较的终极PK

《Python中4大日志记录库比较的终极PK》日志记录框架是一种工具,可帮助您标准化应用程序中的日志记录过程,:本文主要介绍Python中4大日志记录库比较的相关资料,文中通过代码介绍的非常详细,... 目录一、logging库1、优点2、缺点二、LogAid库三、Loguru库四、Structlogphp

详解C++ 存储二进制数据容器的几种方法

《详解C++存储二进制数据容器的几种方法》本文主要介绍了详解C++存储二进制数据容器,包括std::vector、std::array、std::string、std::bitset和std::ve... 目录1.std::vector<uint8_t>(最常用)特点:适用场景:示例:2.std::arra

C++,C#,Rust,Go,Java,Python,JavaScript的性能对比全面讲解

《C++,C#,Rust,Go,Java,Python,JavaScript的性能对比全面讲解》:本文主要介绍C++,C#,Rust,Go,Java,Python,JavaScript性能对比全面... 目录编程语言性能对比、核心优势与最佳使用场景性能对比表格C++C#RustGoJavapythonjav

Python海象运算符:=的具体实现

《Python海象运算符:=的具体实现》海象运算符又称​​赋值表达式,Python3.8后可用,其核心设计是在表达式内部完成变量赋值并返回该值,从而简化代码逻辑,下面就来详细的介绍一下如何使用,感兴趣... 目录简介​​条件判断优化循环控制简化​推导式高效计算​正则匹配与数据提取​性能对比简介海象运算符

python项目环境切换的几种实现方式

《python项目环境切换的几种实现方式》本文主要介绍了python项目环境切换的几种实现方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 如何在不同python项目中,安装不同的依赖2. 如何切换到不同项目的工作空间3.创建项目