Update Interaction - Queue Listening - Data Concurrency - Program Execution

2024-03-19 18:36

This article walks through an update-interaction / queue-listening / data-concurrency / program-execution workflow: re-running an algorithm pipeline whenever its configuration changes in the database. It is intended as a practical reference for developers facing a similar problem; follow along with the steps below.

Contents

Requirement (re-run the program every time the algorithm configuration data in the database changes)

1. Because the trigger is a database interaction, a queue is used to produce and consume change events

2. Perform an enqueue operation every time the interface data changes

3. Use a process pool to process the algorithm configuration data in parallel

4. The algorithm program consumes and executes (code sample for reference only)


Requirement (re-run the program every time the algorithm configuration data in the database changes)

1. Because the trigger is a database interaction, a queue is used to produce and consume change events

# Redis queue
import redis

class TestQueue:
    def __init__(self):
        self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)
        self.key = "queue"

    # Enqueue: append data at the tail of the list
    def push(self, item, item1):
        self.r.rpush(self.key, item, item1)
        return 'Queue data added successfully'

    # Dequeue: blocking pop from the head of the list
    def pop(self):
        result = self.r.blpop(self.key, timeout=0)
        if result:
            return result[1]  # return the popped value
        else:
            return None  # return None on timeout

    # Return all data currently in the queue
    def get_all(self):
        return self.r.lrange(self.key, 0, -1)
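As a quick sanity check, the queue class can be exercised as below. This is a minimal sketch: the pushed values are placeholders, and it assumes the Redis instance at 192.168.14.93 configured above is reachable.

# Minimal usage sketch for TestQueue (placeholder values; assumes the Redis server above is reachable)
q = TestQueue()
print(q.push('conf_changed', '1'))  # appends both values at the tail
print(q.get_all())                  # e.g. ['conf_changed', '1']
print(q.pop())                      # blocking pop from the head -> 'conf_changed'
print(q.pop())                      # -> '1'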

2. Perform an enqueue operation every time the interface data changes

# Redis queue: when an algorithm configuration change is saved, write to the queue; the listener sees the new data and re-executes
queue_redis = TestQueue()
queue_redis.push('1','1')
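In practice this push sits right after the code that persists the configuration change, so workers are only notified once the new configuration is visible in the database. A minimal sketch, assuming a hypothetical save_config_and_notify() helper and the PooledDB connection pool that appears in the full listing of step 4:

# Hypothetical hook: persist the configuration change first, then notify the queue
def save_config_and_notify(conf_id, conf_area):
    conn = pool.connection()  # PooledDB pool from the full listing below
    cursor = conn.cursor()
    try:
        cursor.execute(
            "UPDATE `t_algorithm_config` SET conf_area = %s WHERE id = %s",
            (conf_area, conf_id))
        conn.commit()
    finally:
        conn.close()
    # Signal the workers only after the commit succeeds
    TestQueue().push(str(conf_id), 'updated')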

3. Use a process pool to process the algorithm configuration data in parallel

import multiprocessing

# Create the process pool
pool = multiprocessing.Pool()
data_results = get_data()  # refresh the data each time the queue is popped
print(data_results)
# Run process_data in parallel; each process handles one entry of the updated data
pool.map(process_data, data_results)
# Close the pool
pool.close()
# Wait for all worker processes to finish
pool.join()
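The same pattern can also be written with the context-manager form of multiprocessing.Pool. A minimal equivalent sketch follows; note that the with block calls terminate() on exit, which is safe here because map() blocks until every worker result has been returned.

# Equivalent pool usage with a context manager
import multiprocessing

def run_once():
    data_results = get_data()              # refreshed configuration data
    with multiprocessing.Pool() as p:      # terminate() is called when the block exits
        p.map(process_data, data_results)  # blocks until every configuration has been processed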

4. The algorithm program consumes and executes (code sample for reference only). Depending on the consumption scenario, add a lock to limit concurrent consumers; a minimal lock sketch follows the full listing below.

# -*- coding: gbk -*-
import cv2
import numpy as np
import os
import random
import pymysql
import copy
import ctypes
import time
import paramiko
import redis
import multiprocessing

from ctypes import *
from dbutils.pooled_db import PooledDB

# Database connection pool
pool = PooledDB(pymysql, maxconnections=10, host='192.168.14.93', user='root', password='abc123',
                database='seal_system', charset='utf8')


# Redis queue
class TestQueue:
    def __init__(self):
        self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)
        self.key = "queue"

    # Enqueue: append data at the tail of the list
    def push(self, item, item1):
        self.r.rpush(self.key, item, item1)
        return 'Queue data added successfully'

    # Dequeue: blocking pop from the head of the list
    def pop(self):
        result = self.r.blpop(self.key, timeout=0)
        if result:
            return result[1]  # return the popped value
        else:
            return None  # return None on timeout

    # Return all data currently in the queue
    def get_all(self):
        return self.r.lrange(self.key, 0, -1)


# SSH helper for the remote server
class SSH_Func(object):
    def __init__(self):
        # Cambricon box / Huawei Cloud default upload directory
        self.Folder_SSH = '/var/car_image/model_algorithm_package/'
        self.ssh_host = '114.116.48.77'
        self.ssh_user = 'root'
        self.ssh_password = 'hmwp1!YZHHab'
        self.ssh = paramiko.SSHClient()

    # Connect to the box server over SSH, retrying up to 180 times
    def connect_ssh(self):
        connected = False  # connection flag
        attempts = 0       # retry counter
        while connected == False and attempts < 180:
            try:
                self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                self.ssh.connect(self.ssh_host, username=self.ssh_user, password=self.ssh_password)
                print("SSH connection succeeded")
                connected = True
            except Exception as e:
                print("SSH connection failed:", e)
                print("Retrying connection...")
                attempts += 1
                print(attempts, 'attempts')
                time.sleep(1)
        if connected == False:
            print('Exceeded the maximum number of retries, unable to connect to the SSH server')
            return None
        return self.ssh

    # Create the Huawei Cloud folder if it does not exist yet
    def Add_Folder(self):
        ssh = self.connect_ssh()
        if ssh is None:
            return
        sftp = self.ssh.open_sftp()
        remote_folder_1st_level = os.path.dirname(self.Folder_SSH)
        try:
            sftp.mkdir(remote_folder_1st_level)
            print('Created the algorithm model directory:', remote_folder_1st_level)
        except Exception as e:
            print('The algorithm model directory already exists, cannot create it:', remote_folder_1st_level, e)
        sftp.close()
        ssh.close()

    # Upload a frame to the remote server and return its remote path
    def upload_send_file(self, img0, Conf_id, ii):
        ssh = self.connect_ssh()
        sftp = ssh.open_sftp()
        try:
            output_dir = self.Folder_SSH + 'output/{}/'.format(Conf_id)
            main_output_dir = self.Folder_SSH + 'output/'
            # Create the two directory levels on the cloud server if they do not exist
            try:
                sftp.chdir(main_output_dir)
            except IOError:
                sftp.mkdir(main_output_dir)
            try:
                sftp.chdir(output_dir)
            except IOError:
                sftp.mkdir(output_dir)
            finally:
                sftp.chdir(output_dir)
            # Build the file name
            filename = '{:04d}.jpg'.format(ii)
            # Encode the image with OpenCV and write it directly to the remote server
            with sftp.file(filename, 'wb') as f:
                _, img_data = cv2.imencode('.jpg', img0)
                f.write(img_data.tobytes())
            return output_dir + filename
        except Exception as e:
            print('File upload failed:', e)
        finally:
            sftp.close()
            ssh.close()


# Build the RTSP URL for a recorder sub-channel; parameters: parent equipment id, channel code
def get_children_rtsp(id, code):
    # Get a connection from the pool
    conn = pool.connection()
    cursor = conn.cursor()
    try:
        sql = "SELECT equipment_uname, equipment_password, equipment_ip FROM t_equipment WHERE id = %s"
        cursor.execute(sql, (id,))
        parent_data = cursor.fetchone()
        if parent_data:
            user = parent_data[0]
            password = parent_data[1]
            ip = parent_data[2]
            result = 'rtsp://{}:{}@{}:554/Streaming/Unicast/Channels/{}'.format(user, password, ip, code)
        else:
            result = None
    finally:
        conn.close()
    return result


# Fetch the algorithm configuration data the workers need
def get_data():
    # Get a connection from the pool
    conn = pool.connection()
    cursor = conn.cursor()
    sql = ("SELECT c.id AS Mine_id, c.mine_name, d.id AS Equipment_id, "
           "d.equipment_name, d.equipment_type, d.equipment_ip, d.equipment_uname, d.equipment_password, d.code, d.parent_id, "
           "a.id AS Conf_id, a.Algorithm_library_id, a.conf_area, e.id AS test_type_id, e.test_type_ename, "
           "b.algorithm_status, b.algorithm_path, b.algorithm_name "
           "FROM `t_algorithm_config` AS a "
           "JOIN `t_algorithm_library` AS b ON a.Algorithm_library_id = b.id "
           "JOIN `t_mine` AS c ON a.Mine_id = c.id "
           "JOIN `t_equipment` AS d ON a.Equipment_id = d.id "
           "JOIN `t_algorithm_test_type` AS e ON a.Algorithm_test_type_id = e.id "
           "WHERE b.algorithm_status = 1;")
    cursor.execute(sql)
    # Column names of the result set
    columns = [i[0] for i in cursor.description]
    results = cursor.fetchall()
    # Build a list of dicts
    result_dict_list = [dict(zip(columns, i)) for i in results]
    # copy.deepcopy() copies each dict so all original fields are preserved,
    # while only 'rtsp_url' (and the flattened 'conf_area') are rewritten.
    result_dict_list = [
        {**copy.deepcopy(i),
         'conf_area': list(np.ravel(eval(i['conf_area']))),
         'rtsp_url': 'rtsp://{}:{}@{}'.format(i['equipment_uname'], i['equipment_password'], i['equipment_ip'])}
        if i['equipment_type'] == '摄像头'  # '摄像头' = camera device type stored in the database
        else {**copy.deepcopy(i),
              'rtsp_url': get_children_rtsp(i['parent_id'], i['code']),
              'conf_area': list(np.ravel(eval(i['conf_area']))), }
        for i in result_dict_list]
    result_dict_list = [
        {**copy.deepcopy(i),
         'max_x': max(d['x'] for d in i['conf_area']),
         'max_y': max(d['y'] for d in i['conf_area']),
         'min_x': min(d['x'] for d in i['conf_area']),
         'min_y': min(d['y'] for d in i['conf_area'])} for i in result_dict_list]
    return result_dict_list


names = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck',
         'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench',
         'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra',
         'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
         'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
         'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork',
         'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot',
         'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'dining table',
         'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven',
         'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
         'toothbrush']


def clip_coords(boxes, shape):
    # Clip xyxy bounding boxes to image shape (height, width)
    boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1])  # x1, x2
    boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0])  # y1, y2


def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):
    # Rescale coords (xyxy) from img1_shape to img0_shape
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
        pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2  # wh padding
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]
    coords[:, [0, 2]] -= pad[0]  # x padding
    coords[:, [1, 3]] -= pad[1]  # y padding
    coords[:, :4] /= gain
    clip_coords(coords, img0_shape)
    return coords


def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    # Resize and pad image while meeting stride-multiple constraints
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better val mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return im, ratio, (dw, dh)


def plot_one_box(x, img, color=None, label=None, line_thickness=None):
    # Plots one bounding box on image img
    tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thickness
    color = color or [random.randint(0, 255) for _ in range(3)]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
    if label:
        tf = max(tl - 1, 1)  # font thickness
        t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
        c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
        cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA)  # filled
        cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)


def get_boxes(prediction, batch_size=1, img_size=608):
    """Returns detections with shape:
    (x1, y1, x2, y2, object_conf, class_score, class_pred)
    """
    reshape_value = np.reshape(prediction, (-1, 1))
    num_boxes_final = reshape_value[0]
    print('num_boxes_final: ', num_boxes_final)
    all_list = [[] for _ in range(batch_size)]
    for i in range(int(num_boxes_final)):
        batch_idx = int(reshape_value[64 + i * 7 + 0])
        if batch_idx >= 0 and batch_idx < batch_size:
            bl = reshape_value[64 + i * 7 + 3]
            br = reshape_value[64 + i * 7 + 4]
            bt = reshape_value[64 + i * 7 + 5]
            bb = reshape_value[64 + i * 7 + 6]
            if bt - bl > 0 and bb - br > 0:
                all_list[batch_idx].append(bl)
                all_list[batch_idx].append(br)
                all_list[batch_idx].append(bt)
                all_list[batch_idx].append(bb)
                all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])
                # all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])
                all_list[batch_idx].append(reshape_value[64 + i * 7 + 1])
    output = [np.array(all_list[i]).reshape(-1, 6) for i in range(batch_size)]
    return output


def detect(img0, ip, model, ii, img_size, stride, api_lib, handle, save_img, colors, max_x, max_y, min_x, min_y,
           Conf_id, test_type_ename):
    print('ip:%s, ii:%s, model:%s' % (ip, ii, model))
    img = letterbox(img0, new_shape=img_size, stride=stride, auto=False)[0]
    image = cv2.cvtColor(img, cv2.COLOR_BGR2RGBA)
    image = image[np.newaxis, :].astype(np.uint8)
    data = np.asarray(image, dtype=np.uint8)
    input = data.ctypes.data_as(ctypes.c_void_p)
    # 3. inference
    output = api_lib.cnpyInference(handle, input)
    pred0 = output.contents[0:7232]
    pred0 = np.array(pred0).reshape(1, 7232)
    pred = get_boxes(pred0)
    print(pred)
    for i, det in enumerate(pred):
        if det is not None and len(det):
            det[:, :4] = scale_coords(img.shape[:2], det[:, :4], img0.shape).round()
            for *xyxy, conf, cls in det:
                cls = int(cls)  # label index, corresponding to name_list = ['opencar_model', 'opencar_num', 'flatcar_model', 'flatcar_num', 'container_model', 'container_num']
                xmin = int(xyxy[0])  # minimum x
                ymin = int(xyxy[1])  # minimum y
                xmax = int(xyxy[2])  # maximum x
                ymax = int(xyxy[3])  # maximum y
                # Keep only boxes that fall fully inside the configured detection area
                if min_x < xmin < max_x and min_y < ymin < max_y and min_x < xmax < max_x and min_y < ymax < max_y:
                    # if xmax < max_x and xmin > min_x and ymax < max_y and ymin > min_y:
                    # if names[int(cls)] == 'person':
                    if save_img:
                        label = '%s %.2f' % (names[int(cls)], conf)
                        plot_one_box(xyxy, img0, label=label, color=colors[int(cls)], line_thickness=2)
                    print(names[int(cls)], 'detected class')
                    if test_type_ename == names[int(cls)]:
                        # Upload the frame to the remote server (single upload; keep the returned remote path)
                        ssh = SSH_Func()
                        ssh.Add_Folder()
                        res_path = ssh.upload_send_file(img0, Conf_id, ii)
                        # Alternative: save the frame locally instead of uploading
                        # dirs = './output/%s/' % (Conf_id)
                        # if not os.path.exists(dirs):  # create the path if it does not exist
                        #     os.makedirs(dirs)
                        # dir_path = dirs + '/%04d.jpg' % ii
                        # cv2.imwrite(dir_path, img0)
                        # Get a connection from the pool
                        conn = pool.connection()
                        cursor = conn.cursor()
                        # Build the SQL statement and the values to insert
                        sql = "INSERT INTO `t_algorithm_result` (`Algorithm_config_id`, `res_type`, `res_image`) VALUES (%s, %s, %s)"
                        values = (Conf_id, 1, 'http://114.116.48.77:9001' + res_path)
                        # Execute the statement and commit the transaction
                        cursor.execute(sql, values)
                        conn.commit()
                    else:
                        print('Detected class does not match the configured test type; result not saved')
    print('---------------------------------------------------------------------------------------------------')
    return


def process_data(data):
    # Per-configuration worker: run the algorithm for one configuration entry
    # Whether to save annotated frames (yolov5)
    save_img = True
    # Colors for boxes and labels
    colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]
    # 1. init
    ll = ctypes.cdll.LoadLibrary
    api_lib = ll("./lib/libcnrtapi.so")
    # 2. data preparation
    stride = 32
    img_size = 640
    offlinemodel_yolov5s = data['algorithm_path'].encode()
    handle_yolov5s = api_lib.cnpyInit(offlinemodel_yolov5s)
    api_lib.cnpyInference.restype = POINTER(POINTER(c_float))
    cap = cv2.VideoCapture(data['rtsp_url'])
    ii = 0
    # Queue object used to notice configuration changes
    queue = TestQueue()
    if cap.isOpened():
        while True:
            ret, frame = cap.read()
            if ii % 30 == 0 and queue.get_all() == []:  # sample every 30th frame
                if ret:
                    ip = 34
                    detect(frame, ip, 'model_yolov5s', ii, img_size, stride, api_lib, handle_yolov5s, save_img,
                           colors, data['max_x'], data['max_y'], data['min_x'], data['min_y'], data['Conf_id'],
                           data['test_type_ename'])
                else:
                    cap = cv2.VideoCapture(data['rtsp_url'])
            if ii % 30 == 0 and queue.get_all() != []:
                # A new configuration was pushed: consume the signal and exit so main() restarts the workers
                queue.pop()
                break
            ii += 1


def main():
    while True:
        # Create the process pool
        pool = multiprocessing.Pool()
        data_results = get_data()  # refresh the data each time the queue is popped
        print(data_results)
        # Run process_data in parallel; each process handles one entry of the updated data
        pool.map(process_data, data_results)
        # Close the pool
        pool.close()
        # Wait for all worker processes to finish
        pool.join()


if __name__ == "__main__":
    main()
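The listing above does not show the lock mentioned in step 4. One common way to make sure only one consumer pops the queue at a time is a Redis lock based on SET with NX and an expiry. The sketch below is a minimal illustration under that assumption; the lock key name and TTL are placeholders, not part of the original code.

# Minimal Redis lock sketch for serializing queue consumption (key name and TTL are placeholders)
import redis

r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)

def consume_with_lock(queue, lock_key='queue:consumer_lock', ttl=30):
    # nx=True sets the key only if it does not exist; ex=ttl makes it expire automatically
    if not r.set(lock_key, 'locked', nx=True, ex=ttl):
        return None  # another consumer currently holds the lock
    try:
        if queue.get_all():  # avoid blocking forever while holding the lock
            return queue.pop()
        return None
    finally:
        r.delete(lock_key)

# usage: consume_with_lock(TestQueue())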

That is all for this article on update interaction, queue listening, data concurrency and program execution. We hope it is useful to fellow developers!



