更新交互-队列监听-数据并发-程序执行

2024-03-19 18:36

本文主要是介绍更新交互-队列监听-数据并发-程序执行,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

需求(每次数据库算法配置数据更改程序重新执行)

1 . 因为是数据库交互,所以采用队列形式来生产消费

 2 . 每次接口数据发生变化执行入队操作

3 . 使用进程池进行算法配置数据并行操作

4 . 算法程序进行消费执行 (代码案例仅供参考)


需求(每次数据库算法配置数据更改程序重新执行)

1 . 因为是数据库交互,所以采用队列形式来生产消费

# redis队列
class TestQueue:def __init__(self):self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)self.key = "queue"# 入队def push(self, item,item1):# 在尾部加入数据self.r.rpush(self.key, item,item1)return '队列数据添加成功'# 出队def pop(self):# 阻塞地从头部移除数据result = self.r.blpop(self.key, timeout=0)if result:return result[1]  # 返回数据else:return None  # 如果超时返回 None# 返回队列数据列表def get_all(self):return self.r.lrange(self.key, 0, -1)

 2 . 每次接口数据发生变化执行入队操作

# redis队列 当算法配置修改完成时,数据写入队列,redis监听到数据重新执行
queue_redis = TestQueue()
queue_redis.push('1','1')

3 . 使用进程池进行算法配置数据并行操作

# 创建进程池
pool = multiprocessing.Pool()data_results = get_data()  # 每次 pop 的时候更新数据
print(data_results)# 使用进程池并行执行 process_data 函数,每个进程处理更新后的数据
pool.map(process_data, data_results)# 关闭进程池
pool.close()# 等待所有进程结束
pool.join()

4 . 算法程序进行消费执行 (代码案例仅供参考)根据消费场景加锁进行限制

# -*- coding: gbk -*-
import cv2
import numpy as np
import os
import random
import pymysql
import copy
import ctypes
import time
import paramiko
import redisfrom ctypes import *
from dbutils.pooled_db import PooledDB# 数据库连接池
pool = PooledDB(pymysql, maxconnections=10, host='192.168.14.93', user='root', password='abc123',database='seal_system',charset='utf8')# redis队列
class TestQueue:def __init__(self):self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)self.key = "queue"# 入队def push(self, item,item1):# 在尾部加入数据self.r.rpush(self.key, item,item1)return '队列数据添加成功'# 出队def pop(self):# 阻塞地从头部移除数据result = self.r.blpop(self.key, timeout=0)if result:return result[1]  # 返回数据else:return None  # 如果超时返回 None# 返回队列数据列表def get_all(self):return self.r.lrange(self.key, 0, -1)# 服务器k类
class SSH_Func(object):def __init__(self):# 寒武纪盒子self.Folder_SSH = '/var/car_image/model_algorithm_package/'  # 华为云上传文件默认地址self.ssh_host = '114.116.48.77'self.ssh_user = 'root'self.ssh_password = 'hmwp1!YZHHab'self.ssh = paramiko.SSHClient()# ssh连接盒子服务器def connect_ssh(self):# SSH连接信息# 标识符connected = False# 最大连接次数attempts = 0while connected == False and attempts < 180:try:self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())self.ssh.connect(self.ssh_host, username=self.ssh_user, password=self.ssh_password)print("SSH连接成功")connected = Trueexcept Exception as e:print("SSH连接失败:", e)print("正在进行连接重试...")attempts += 1print(attempts, '次数')time.sleep(1)if connected == False:print('连接超过最大次数限制,无法连接到SSH服务器')return Nonereturn self.ssh# 华为云文件夹创建相关逻辑def Add_Folder(self):ssh = self.connect_ssh()if ssh is None:returnsftp = self.ssh.open_sftp()# 判断文件夹是否存在,如果不存在进行创建remote_folder_1st_level = os.path.dirname(self.Folder_SSH)try:sftp.mkdir(remote_folder_1st_level)print('成功创建算法模型目录:', remote_folder_1st_level)except Exception as e:print('算法模型目录已经存在,无法创建目录:', remote_folder_1st_level, e)sftp.close()ssh.close()# 上传文件方法def upload_send_file(self, img0, Conf_id, ii):ssh = self.connect_ssh()sftp = ssh.open_sftp()try:output_dir = self.Folder_SSH + 'output/{}/'.format(Conf_id)# 新增两个目录main_output_dir = self.Folder_SSH + 'output/'# 如果云服务不存在这个路径。那么进行创建try:sftp.chdir(main_output_dir)except IOError:sftp.mkdir(main_output_dir)try:sftp.chdir(output_dir)except IOError:sftp.mkdir(output_dir)finally:sftp.chdir(output_dir)# 路径赋值filename = '{:04d}.jpg'.format(ii)# 将图像数据直接写入到远程服务器with sftp.file(filename, 'wb') as f:# 使用OpenCV将图像数据写入文件对象_, img_data = cv2.imencode('.jpg', img0)f.write(img_data.tobytes())return output_dir + filenameexcept Exception as e:print('文件上传失败:', e)finally:sftp.close()ssh.close()# 获取视频为录像机子集的返回格式  参数为  父级id  子集通道code
def get_children_rtsp(id, code):# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()try:sql = "SELECT equipment_uname, equipment_password, equipment_ip FROM t_equipment WHERE id = %s"cursor.execute(sql, (id,))parent_data = cursor.fetchone()if parent_data:user = parent_data[0]password = parent_data[1]ip = parent_data[2]result = 'rtsp://{}:{}@{}:554/Streaming/Unicast/Channels/{}'.format(user, password, ip, code)else:result = Nonefinally:conn.close()return result# 获取所需数据
def get_data():# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()sql = ("SELECT c.id AS Mine_id, c.mine_name, d.id AS Equipment_id, ""d.equipment_name, d.equipment_type, d.equipment_ip, d.equipment_uname, d.equipment_password, d.code, d.parent_id, ""a.id AS Conf_id, a.Algorithm_library_id, a.conf_area, e.id AS test_type_id, e.test_type_ename, ""b.algorithm_status, b.algorithm_path, b.algorithm_name ""FROM `t_algorithm_config` AS a ""JOIN `t_algorithm_library` AS b ON a.Algorithm_library_id = b.id ""JOIN `t_mine` AS c ON a.Mine_id = c.id ""JOIN `t_equipment` AS d ON a.Equipment_id = d.id ""JOIN `t_algorithm_test_type` AS e ON a.Algorithm_test_type_id = e.id ""WHERE b.algorithm_status = 1;")cursor.execute(sql)# 获取查询结果集的列名columns = [i[0] for i in cursor.description]results = cursor.fetchall()# 构建字典列表result_dict_list = [dict(zip(columns, i)) for i in results]# 在列表推导式中使用了浅拷贝 copy.deepcopy() 来复制每个字典,更新 'rtsp_url' 字段。# 这样做可以保留原始数据的所有字段,同时只对 'rtsp_url' 进行修改。result_dict_list = [{**copy.deepcopy(i),'conf_area': list(np.ravel(eval(i['conf_area']))),'rtsp_url': 'rtsp://{}:{}@{}'.format(i['equipment_uname'], i['equipment_password'], i['equipment_ip'])}if i['equipment_type'] == '摄像头'else {**copy.deepcopy(i),'rtsp_url': get_children_rtsp(i['parent_id'], i['code']),'conf_area': list(np.ravel(eval(i['conf_area']))),}for i in result_dict_list]result_dict_list = [{**copy.deepcopy(i),'max_x': max(d['x'] for d in i['conf_area']),'max_y': max(d['y'] for d in i['conf_area']),'min_x': min(d['x'] for d in i['conf_area']),'min_y': min(d['y'] for d in i['conf_area'])} for i in result_dict_list]return result_dict_listnames = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', \'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', \'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', \'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', \'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', \'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', \'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', \'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'dining table', \'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', \'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier','toothbrush']def clip_coords(boxes, shape):# Clip bounding xyxy bounding boxes to image shape (height, width)boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1])  # x1, x2boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0])  # y1, y2def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):# Rescale coords (xyxy) from img1_shape to img0_shapeif ratio_pad is None:  # calculate from img0_shapegain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain  = old / newpad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2  # wh paddingelse:gain = ratio_pad[0][0]pad = ratio_pad[1]coords[:, [0, 2]] -= pad[0]  # x paddingcoords[:, [1, 3]] -= pad[1]  # y paddingcoords[:, :4] /= gainclip_coords(coords, img0_shape)return coordsdef letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):# Resize and pad image while meeting stride-multiple constraintsshape = im.shape[:2]  # current shape [height, width]if isinstance(new_shape, int):new_shape = (new_shape, new_shape)# Scale ratio (new / old)r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])if not scaleup:  # only scale down, do not scale up (for better val mAP)r = min(r, 1.0)# Compute paddingratio = r, r  # width, height ratiosnew_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh paddingif auto:  # minimum rectangledw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh paddingelif scaleFill:  # stretchdw, dh = 0.0, 0.0new_unpad = (new_shape[1], new_shape[0])ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratiosdw /= 2  # divide padding into 2 sidesdh /= 2if shape[::-1] != new_unpad:  # resizeim = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))left, right = int(round(dw - 0.1)), int(round(dw + 0.1))im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add borderreturn im, ratio, (dw, dh)def plot_one_box(x, img, color=None, label=None, line_thickness=None):# Plots one bounding box on image imgtl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thicknesscolor = color or [random.randint(0, 255) for _ in range(3)]c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)if label:tf = max(tl - 1, 1)  # font thicknesst_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA)  # filledcv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)def get_boxes(prediction, batch_size=1, img_size=608):"""Returns detections with shape:(x1, y1, x2, y2, object_conf, class_score, class_pred)"""reshape_value = np.reshape(prediction, (-1, 1))num_boxes_final = reshape_value[0]print('num_boxes_final: ', num_boxes_final)all_list = [[] for _ in range(batch_size)]for i in range(int(num_boxes_final)):batch_idx = int(reshape_value[64 + i * 7 + 0])if batch_idx >= 0 and batch_idx < batch_size:bl = reshape_value[64 + i * 7 + 3]br = reshape_value[64 + i * 7 + 4]bt = reshape_value[64 + i * 7 + 5]bb = reshape_value[64 + i * 7 + 6]if bt - bl > 0 and bb - br > 0:all_list[batch_idx].append(bl)all_list[batch_idx].append(br)all_list[batch_idx].append(bt)all_list[batch_idx].append(bb)all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])# all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])all_list[batch_idx].append(reshape_value[64 + i * 7 + 1])output = [np.array(all_list[i]).reshape(-1, 6) for i in range(batch_size)]return outputdef detect(img0, ip, model, ii, img_size, stride, api_lib, handle, save_img, colors, max_x, max_y, min_x, min_y,Conf_id, test_type_ename):print('ip:%s, ii:%s, model:%s' % (ip, ii, model))img = letterbox(img0, new_shape=img_size, stride=stride, auto=False)[0]image = cv2.cvtColor(img, cv2.COLOR_BGR2RGBA)image = image[np.newaxis, :].astype(np.uint8)data = np.asarray(image, dtype=np.uint8)input = data.ctypes.data_as(ctypes.c_void_p)# 3.inferenceoutput = api_lib.cnpyInference(handle, input)pred0 = output.contents[0:7232]pred0 = np.array(pred0).reshape(1, 7232)pred = get_boxes(pred0)print(pred)for i, det in enumerate(pred):if det is not None and len(det):det[:, :4] = scale_coords(img.shape[:2], det[:, :4], img0.shape).round()for *xyxy, conf, cls in det:cls = int(cls)  # 标签索引,对应 name_list = ['opencar_model', 'opencar_num', 'flatcar_model', 'flatcar_num', 'container_model', 'container_num']xmin = int(xyxy[0])  # 横坐标最小值ymin = int(xyxy[1])  # 纵坐标最小值xmax = int(xyxy[2])  # 横坐标最大值ymax = int(xyxy[3])  # 纵坐标最小值if min_x < xmin < max_x and min_y < ymin < max_y and min_x < xmax < max_x and min_y < ymax < max_y:# if xmax < max_x and xmin > min_x  and ymax < max_y and ymin > min_y:# if names[int(cls)] == 'person':if save_img:label = '%s %.2f' % (names[int(cls)], conf)plot_one_box(xyxy, img0, label=label, color=colors[int(cls)], line_thickness=2)print(names[int(cls)], '检测类型')if test_type_ename == names[int(cls)]:# 上传服务器类ssh = SSH_Func()ssh.Add_Folder()ssh.upload_send_file(img0, Conf_id, ii)# dirs = './output/%s/' % (Conf_id)## if not os.path.exists(dirs):  # 如果不存在路径,则创建这个路径#     os.makedirs(dirs)## # 路径赋值# dir_path = dirs + '/%04d.jpg' % ii## cv2.imwrite(dir_path, img0)# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()# 构建 SQL 语句和要插入的值sql = "INSERT INTO `t_algorithm_result` (`Algorithm_config_id`, `res_type`, `res_image`) VALUES (%s, %s, %s)"values = (Conf_id, 1, 'http://114.116.48.77:9001' + ssh.upload_send_file(img0, Conf_id, ii))# 执行 SQL 语句并提交事务cursor.execute(sql, values)conn.commit()else:print('检测类型不符,数据不予添加')print('---------------------------------------------------------------------------------------------------')returnimport multiprocessing
import timedef process_data(data):# 配置算法数据# 是否保存图片  yolov5save_img = True# 框与标注颜色colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]# 1.initll = ctypes.cdll.LoadLibraryapi_lib = ll("./lib/libcnrtapi.so")# 2.data preparationstride = 32img_size = 640offlinemodel_yolov5s = data['algorithm_path'].encode()handle_yolov5s = api_lib.cnpyInit(offlinemodel_yolov5s)api_lib.cnpyInference.restype = POINTER(POINTER(c_float))cap = cv2.VideoCapture(data['rtsp_url'])ii = 0# 创建队列对象queue = TestQueue()if cap.isOpened():while True:ret, frame = cap.read()if ii % 30 == 0 and queue.get_all() == []:  # 抽帧if ret:ip = 34detect(frame, ip, 'model_yolov5s', ii, img_size, stride, api_lib, handle_yolov5s, save_img,colors, data['max_x'], data['max_y'], data['min_x'], data['min_y'], data['Conf_id'],data['test_type_ename'])else:cap = cv2.VideoCapture(data['rtsp_url'])if ii % 30 == 0 and queue.get_all() != []:queue.pop()breakii += 1def main():while True:# 创建进程池pool = multiprocessing.Pool()data_results = get_data()  # 每次 pop 的时候更新数据print(data_results)# 使用进程池并行执行 process_data 函数,每个进程处理更新后的数据pool.map(process_data, data_results)# 关闭进程池pool.close()# 等待所有进程结束pool.join()if __name__ == "__main__":main()

这篇关于更新交互-队列监听-数据并发-程序执行的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/826913

相关文章

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

C#代码实现解析WTGPS和BD数据

《C#代码实现解析WTGPS和BD数据》在现代的导航与定位应用中,准确解析GPS和北斗(BD)等卫星定位数据至关重要,本文将使用C#语言实现解析WTGPS和BD数据,需要的可以了解下... 目录一、代码结构概览1. 核心解析方法2. 位置信息解析3. 经纬度转换方法4. 日期和时间戳解析5. 辅助方法二、L

使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)

《使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)》字体设计和矢量图形处理是编程中一个有趣且实用的领域,通过Python的matplotlib库,我们可以轻松将字体轮廓... 目录背景知识字体轮廓的表示实现步骤1. 安装依赖库2. 准备数据3. 解析路径指令4. 绘制图形关键

解决mysql插入数据锁等待超时报错:Lock wait timeout exceeded;try restarting transaction

《解决mysql插入数据锁等待超时报错:Lockwaittimeoutexceeded;tryrestartingtransaction》:本文主要介绍解决mysql插入数据锁等待超时报... 目录报错信息解决办法1、数据库中执行如下sql2、再到 INNODB_TRX 事务表中查看总结报错信息Lock

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A