ISO15765-2 CAN报文解析 python

2023-10-15 10:30
文章标签 python 解析 报文 iso15765

本文主要是介绍ISO15765-2 CAN报文解析 python,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

网络传输层帧结构:
N_PCI
个人微信: wzdhh991
邮箱: wzd991@126.com
个人比较懒,CSDN更新缓慢,有相关问题可发邮箱或个人微信交流!
源码:

# -*- coding:utf-8 -*-
import queue
import sys
import os
import binascii
import struct
import re
import json
import traceback
import threading
from tool_base import *
from zlgcan import *
from PyQt6.QtCore import *
from queue import *
import time
import logging
uds_tp_wdbg = logging.getLogger('UdsTp')
# mode
(UDS_TP_MODE_TX_AND_RX, UDS_TP_MODE_RX) = range(2)
# rx msg
(MSG_ID_RX_PHY, MSG_ID_TX_PHY,MSG_ID_TX_APP, MSG_ID_RX_UDS_APP, MSG_ID_TX_END, MSG_ID_READ_INFO,MSG_ID_WRITE_ST) = range(7)
# data
(BAND_RATE_125K, BAND_RATE_250K, BAND_RATE_500K, BAND_RATE_1M) = (125, 250, 500, 1000)
(RX_MARK_PHY, RX_MARK_UDS_APP) = (1, 2)
MAX_RCV_NUM = 1
(NPCItype_SINGLE_FRAME, NPCItype_FIRST_FRAME, NPCItype_CONTINUE_FRAME, NPCItype_CTRL_FRAME) = range(0, 4)
(ST_IDLE, TX_ST_RXING) = range(0, 2)
(MSG_ID_SEND_UDS, MSG_ID_SEND_PHY) = range(2)
def get_uds_tp_msg_id_st(id):name = ('RX PHY', 'TX PHY', 'TX APP', 'RX APP')if id >= len(name):return 'unknon'else:return name[id]def get_n_pci(head):out_msg = {}try:pci_type = head[0] >> 4out_msg['N_PCItype'] = pci_typeif pci_type == NPCItype_SINGLE_FRAME:  # singleframe  sfout_msg['SF_DL'] = head[0] & 0xfif out_msg['SF_DL'] > 8 or out_msg['SF_DL'] == 0:out_msg['SF_DL'] = 7out_msg['data'] = binascii.hexlify(bytearray(head[1:out_msg['SF_DL'] + 1]))elif pci_type == NPCItype_FIRST_FRAME:  # first frame ff'''0-6 无效 7:扩展地址或者可变地址8-fff数据长度'''out_msg['FF_DL'] = ((head[0] & 0xf) << 8) | head[1]out_msg['data'] = binascii.hexlify(bytearray(head[2:]))passelif pci_type == NPCItype_CONTINUE_FRAME:  # consecutive frame cfout_msg['SN'] = head[0] & 0xfout_msg['data'] = binascii.hexlify(bytearray(head[1:]))passelif pci_type == NPCItype_CTRL_FRAME:  # flow controlout_msg['data'] = ''out_msg['FS'] = head[0] & 0x0fif out_msg['FS'] == 0:out_msg['FS_name'] = 'continuetosend(CTS)'  # continuetosendelif out_msg['FS'] == 1:out_msg['FS_name'] = 'wait (WT)'elif out_msg['FS'] == 2:out_msg['FS_name'] = 'overflow (OVFLW)'else:out_msg['FS_name'] = 'unknow'out_msg['BS'] = head[1]out_msg['STmin'] = head[2]if 127 >= out_msg['STmin'] >= 0:out_msg['STmin_name'] = 'separation time:%dms' % out_msg['STmin']elif 0xf1 <= out_msg['STmin'] <= 0xf9:out_msg['STmin_name'] = 'separation time:%dus' % out_msg['STmin']else:out_msg['STmin_name'] = 'unknow'except Exception as e:printf_except_info(e)return out_msgclass uds_tp(object):rx_signal_hd = [None, None]ch_run_mark = [0, 0]dev_info = {}can_hd = Nonedev_handle = Nonecan_handle = [None, None]read_thread = [None,None]run_mode = [UDS_TP_MODE_RX, UDS_TP_MODE_RX]msg_queue = (Queue(),Queue())run_st = [ST_IDLE, ST_IDLE]wait_sn = [1,1]decode_all_len = [0, 0]decode_msg = [bytearray(), bytearray()]can_id_tx = [0x700, 0x700]can_id_rx = [0x730, 0x730]"""docstring for uds_tp"""def __init__(self, dev_id):super(uds_tp, self).__init__()uds_tp_wdbg.info('device id:%d', dev_id)with open("./dev_info.json", "r") as fd:self.dev_info = json.load(fd)["USBCAN-II"]self.can_hd = ZCAN()self.dev_handle = self.can_hd.OpenDevice(self.dev_info["dev_type"], dev_id, 0)"""请求发送数据"""def set_tx_rx_id(self,cid:int, tx:int, rx:int):uds_tp_wdbg.info('tx:0x%x rx:0x%x'%(tx,rx))self.can_id_tx[cid] = txself.can_id_rx[cid] = rxdef send_phy_data(self, cid, can_id, data):self.msg_queue[cid].put((MSG_ID_SEND_PHY, can_id , data))#私用使用def _send_data_to_can(self, cid, can_id, data):#todo.try:msg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_TX_PHYmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = 8msg['data'] = dataself.rx_signal_hd[cid].emit(msg)msg_temp = ZCAN_Transmit_Data()msg_temp.transmit_type = 0msg_temp.frame.rtr = 0if can_id > 0x7fff:msg_temp.frame.eff = 1else:msg_temp.frame.eff = 0msg_temp.frame.can_dlc = 8msg_temp.frame.brs = 0msg_temp.frame.len = 8msg_temp.frame.can_id = can_idfor j in range(0, 8):msg_temp.frame.data[j] = int(data[j])uds_tp_wdbg.info( 'tx canid:0x%x data:%s'%(can_id, binascii.hexlify(data)))ret = self.can_hd.Transmit(self.can_handle[cid], msg_temp, 1)if ret == 1:return 0except Exception as e:printf_except_info(e)def send_uds_frame(self, cid, can_id, data):msg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_TX_APPmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = len(data)msg['data'] = dataself.rx_signal_hd[cid].emit(msg)if len(data) == 0:returnif self.run_mode[cid] == UDS_TP_MODE_TX_AND_RX:self.msg_queue[cid].put((MSG_ID_SEND_UDS, can_id, data))def start(self, mode=UDS_TP_MODE_TX_AND_RX,cid=0, band_rate='500K',can_id=(),rx_signal=None, rx_mark=0, dev_id=0):if self.ch_run_mark[cid] == 0:try:self.rx_signal_hd[cid] = rx_signalchn_cfg = ZCAN_CHANNEL_INIT_CONFIG()chn_cfg.can_type = ZCAN_TYPE_CANchn_cfg.config.can.mode = 0chn_cfg.config.can.timing0 = self.dev_info["chn_info"]["baudrate"][band_rate]["timing0"]chn_cfg.config.can.timing1 = self.dev_info["chn_info"]["baudrate"][band_rate]["timing1"]chn_cfg.config.can.acc_code = 0chn_cfg.config.can.acc_mask = 0xFFFFFFFFself.run_mode[cid] = modeif self.dev_handle == INVALID_DEVICE_HANDLE:return "OpenDevice!"self.can_handle[cid] = self.can_hd.InitCAN(self.dev_handle, cid, chn_cfg)if self.can_handle[cid] == INVALID_CHANNEL_HANDLE:return "初始化通道失败!"ret = self.can_hd.StartCAN(self.can_handle[cid])if ret != ZCAN_STATUS_OK:return "打开通道失败!"self.ch_run_mark[cid] = 1self.read_thread[cid] = threading.Thread(None, target=self.handle_press, args=(cid, self.rx_signal_hd[cid]))self.read_thread[cid].start()except Exception as e:printf_except_info(e)return "ok"else:return "err""""请求停止"""def stop(self, cid):if self.ch_run_mark[cid] == 1:self.ch_run_mark[cid] = 0self.read_thread[cid].join(0.1)self.can_hd.ResetCAN(self.can_handle[cid])self.can_hd.CloseDevice(self.can_handle[cid])passdef set_mode(self,cid, mode):self.run_mode[cid] = modedef get_mode(self,cid):return self.run_mode[cid]def _transmit_layer_decode(self, cid, can_id, data):if self.run_mode[cid] == UDS_TP_MODE_TX_AND_RX:if (can_id != self.can_id_rx[cid]) and (can_id != 0x7df):return Noneif self.run_mode[cid] == UDS_TP_MODE_RX:idmap = (self.can_id_rx[cid], 0x7df, self.can_id_tx[cid])if can_id not in idmap:  # 物理寻址符合uds规则return Nonehead = get_n_pci(data)if head['N_PCItype'] == NPCItype_SINGLE_FRAME:#单帧if head['SF_DL'] > 0:if head['SF_DL'] > 7:head['SF_DL'] = 7msg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_RX_UDS_APPmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = head['SF_DL']msg['data'] = data[1:head['SF_DL'] + 1]self.rx_signal_hd[cid].emit(msg)#elif head['N_PCItype'] == NPCItype_FIRST_FRAME:#多帧,第一帧out = bytearray(8)out[0] = 0x30out[2] = 1if self.run_mode[cid] == UDS_TP_MODE_TX_AND_RX:self._send_data_to_can(cid, self.can_id_tx[cid], out)self.wait_sn[cid] = 1self.decode_all_len[cid] = head['FF_DL']uds_tp_wdbg.info('can id:0x%x all size:%d', can_id, self.decode_all_len[cid])self.decode_msg[cid] = bytearray(0)if self.decode_all_len[cid] > 6:self.decode_msg[cid].extend(data[2:])self.decode_all_len[cid] -= 6# self.run_st[cid] = TX_ST_RXINGelse:msg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_RX_UDS_APPmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = self.decode_all_len[cid]msg['data'] = data[2:2 + self.decode_all_len[cid]]self.rx_signal_hd[cid].emit(msg)self.decode_all_len[cid] = 0elif head['N_PCItype'] == NPCItype_CONTINUE_FRAME:#多帧连续帧uds_tp_wdbg.info('can id:0x%x all size:%d sn:%d', can_id, self.decode_all_len[cid], self.wait_sn[cid])if (head['SN'] == self.wait_sn[cid]):self.wait_sn[cid] += 1self.wait_sn[cid] %= 16if self.decode_all_len[cid] >= 7:self.decode_all_len[cid] -= 7self.decode_msg[cid].extend(data[1:])else:self.decode_msg[cid].extend(data[1:self.decode_all_len[cid] + 1])self.decode_all_len[cid] = 0if self.decode_all_len[cid] <= 0:self.run_st[cid] = ST_IDLEmsg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_RX_UDS_APPmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = len(self.decode_msg[cid])msg['data'] = self.decode_msg[cid]self.rx_signal_hd[cid].emit(msg)else:passelif head['N_PCItype'] == NPCItype_CTRL_FRAME:#流控帧passreturn headdef get_rec_data(self, cid):can_num = self.can_hd.GetReceiveNum(self.can_handle[cid], ZCAN_TYPE_CAN)if can_num == 0:return Noneread_cnt = MAX_RCV_NUM if can_num >= MAX_RCV_NUM else can_numcan_msgs, act_num = self.can_hd.Receive(self.can_handle[cid], read_cnt, MAX_RCV_NUM)if act_num == 0:return Nonemsg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_RX_PHYmsg['canid'] = can_msgs[0].frame.can_id & 0xfffffffmsg['time'] = time.time()msg['len'] = can_msgs[0].frame.can_dlcmsg['data'] = bytearray(can_msgs[0].frame.data)msg['pci'] =  self._transmit_layer_decode(cid, msg['canid'], msg['data'])self.rx_signal_hd[cid].emit(msg)# can_id_map = [0x700,0x730,0x313, 0x701, 0x781]if (msg['canid'] == self.can_id_rx[cid]) or (self.can_id_tx[cid] == msg['canid']):uds_tp_wdbg.info('rx canid:0x%x data:%s'%(msg['canid'], binascii.hexlify(msg['data'])))return msgdef _wait_flow_ctrl(self, cid):now = time.time()while True:if time.time() - now > 0.5:return Nonemsg = self.get_rec_data(cid)if msg == None:continueif 'pci' not in msg.keys():continueelse:if  msg['pci'] is not None:if  'N_PCItype' in msg['pci'].keys():if msg['pci']['N_PCItype'] == NPCItype_CTRL_FRAME:return msg['pci']def handle_press(self, cid, signal_hd):while True:if self.ch_run_mark[cid] == 0:returnrec_msg = self.get_rec_data(cid)if self.msg_queue[cid].qsize() > 0 and (self.run_st[cid] == ST_IDLE):msg = self.msg_queue[cid].get()if msg[0] == MSG_ID_SEND_UDS:can_id = msg[1]send_temp_data = msg[2]if len(send_temp_data) <= 7:tempx = bytearray(1)tempx[0] = len(send_temp_data)tempx.extend(send_temp_data)if len(send_temp_data) < 8:temp_size = 8 - len(tempx)for i in range(0, temp_size):tempx.append(0xaa)self._send_data_to_can(cid, can_id, tempx)else:all_size = len(send_temp_data)i = 6sn = 1tempx = bytearray(0)tempx.append(0x10 | (all_size >> 8))tempx.append(all_size & 0xff)tempx.extend(send_temp_data[0:6])self._send_data_to_can(cid, can_id, tempx)pci = self._wait_flow_ctrl(cid)if pci is None:#发送失败continuewhile (i < all_size):if all_size - i > 7:tempx = bytearray(0)tempx.append(0x20 + sn)tempx.extend(send_temp_data[i: i + 7])self._send_data_to_can(cid, can_id, tempx)i += 7else:tempx = bytearray(0)tempx.append(0x20 + sn)tempx.extend(send_temp_data[i:])if len(tempx) < 8:temp_size = 8 - len(tempx)for i in range(0, temp_size):tempx.append(0xaa)self._send_data_to_can(cid, can_id, tempx)i = all_sizesn += 1sn %= 16elif msg[0] == MSG_ID_SEND_PHY:self._send_data_to_can(cid, msg[1], msg[2])

这篇关于ISO15765-2 CAN报文解析 python的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/217074

相关文章

Python中Json和其他类型相互转换的实现示例

《Python中Json和其他类型相互转换的实现示例》本文介绍了在Python中使用json模块实现json数据与dict、object之间的高效转换,包括loads(),load(),dumps()... 项目中经常会用到json格式转为object对象、dict字典格式等。在此做个记录,方便后续用到该方

Java MCP 的鉴权深度解析

《JavaMCP的鉴权深度解析》文章介绍JavaMCP鉴权的实现方式,指出客户端可通过queryString、header或env传递鉴权信息,服务器端支持工具单独鉴权、过滤器集中鉴权及启动时鉴权... 目录一、MCP Client 侧(负责传递,比较简单)(1)常见的 mcpServers json 配置

从基础到高级详解Python数值格式化输出的完全指南

《从基础到高级详解Python数值格式化输出的完全指南》在数据分析、金融计算和科学报告领域,数值格式化是提升可读性和专业性的关键技术,本文将深入解析Python中数值格式化输出的相关方法,感兴趣的小伙... 目录引言:数值格式化的核心价值一、基础格式化方法1.1 三种核心格式化方式对比1.2 基础格式化示例

Python与MySQL实现数据库实时同步的详细步骤

《Python与MySQL实现数据库实时同步的详细步骤》在日常开发中,数据同步是一项常见的需求,本篇文章将使用Python和MySQL来实现数据库实时同步,我们将围绕数据变更捕获、数据处理和数据写入这... 目录前言摘要概述:数据同步方案1. 基本思路2. mysql Binlog 简介实现步骤与代码示例1

Python ORM神器之SQLAlchemy基本使用完全指南

《PythonORM神器之SQLAlchemy基本使用完全指南》SQLAlchemy是Python主流ORM框架,通过对象化方式简化数据库操作,支持多数据库,提供引擎、会话、模型等核心组件,实现事务... 目录一、什么是SQLAlchemy?二、安装SQLAlchemy三、核心概念1. Engine(引擎)

从原理到实战解析Java Stream 的并行流性能优化

《从原理到实战解析JavaStream的并行流性能优化》本文给大家介绍JavaStream的并行流性能优化:从原理到实战的全攻略,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的... 目录一、并行流的核心原理与适用场景二、性能优化的核心策略1. 合理设置并行度:打破默认阈值2. 避免装箱

Ubuntu如何升级Python版本

《Ubuntu如何升级Python版本》Ubuntu22.04Docker中,安装Python3.11后,使用update-alternatives设置为默认版本,最后用python3-V验证... 目China编程录问题描述前提环境解决方法总结问题描述Ubuntu22.04系统自带python3.10,想升级

Python自动化处理PDF文档的操作完整指南

《Python自动化处理PDF文档的操作完整指南》在办公自动化中,PDF文档处理是一项常见需求,本文将介绍如何使用Python实现PDF文档的自动化处理,感兴趣的小伙伴可以跟随小编一起学习一下... 目录使用pymupdf读写PDF文件基本概念安装pymupdf提取文本内容提取图像添加水印使用pdfplum

Maven中生命周期深度解析与实战指南

《Maven中生命周期深度解析与实战指南》这篇文章主要为大家详细介绍了Maven生命周期实战指南,包含核心概念、阶段详解、SpringBoot特化场景及企业级实践建议,希望对大家有一定的帮助... 目录一、Maven 生命周期哲学二、default生命周期核心阶段详解(高频使用)三、clean生命周期核心阶

Python 基于http.server模块实现简单http服务的代码举例

《Python基于http.server模块实现简单http服务的代码举例》Pythonhttp.server模块通过继承BaseHTTPRequestHandler处理HTTP请求,使用Threa... 目录测试环境代码实现相关介绍模块简介类及相关函数简介参考链接测试环境win11专业版python