Python 全栈系列255 UCS实践:按ID同步数据

2024-06-23 00:12

本文主要是介绍Python 全栈系列255 UCS实践:按ID同步数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

这是一个常见的使用场景,实测下来效果良好。

内容

1 实验场景

将库中所有的数据取出,送到队列

本质上,这是一种单向不返回的模式。除了在遍历全库有用,在进行回测时也是一样的,时间就是单向不返回的。

通过UCS,将任意离散的数据记录归并到了一个更大的单位下。按照brick、block、part、shard四个层级,使得数据的管理兼顾到人的记忆特性,以及程序批量处理的效率。一个brick通常代表一万条数据,之后以千不断进位。到part级别已经是十亿的容量了。

UCS将所有数据的编号分为三类:

  • 1 数值类。从0开始编号,每条记录递增,这个就是mysql的自增id。
  • 2 时间类。以小时为brick,天为block,月为part, 年为shard。
  • 3 字符类。所有为数值、非时间类的主键采用字符编号。一般采用md5码计算32位字符,然后根据 每8个字符之和对10取余。如果数据很大,也可以考虑对100,甚至1000取余。

UCS规范已经嵌在GFGoLite服务中,通过UCS对象进行快速实现。

以下是本次实验的文件

  • 1 首先要声明worker的缓存空间名称,一般只需一次,后续其他的worker也可以使用这个空间。
  • 2 worker并不是服务状态的,所以每次启动必须要载入元数据,在结束本次执行后,要保存元数据
  • 3 worker的功能是从clickhouse中取数,然后存到stream中
  • 4 QM方面,通过声明远端服务器的RedisAgent完成
  • 5 从GlobalBuffer中获取clickhouse的连接参数
  • 6 使用CHClient来进行实际的控制
  • 7 执行前,待执行的brick_list应该被更新后放在缓存内。
  • 8 执行时,worker先取出待执行的brick_list和已经执行的brick(last_brick)
  • 9 如果last_brick是空,说明这是初始状态,cur_brick为brick_list中的第一个
  • 10 其他情况,cur_brick始终可以取到下一个,直到结尾(此时cur_brick始终等于last_brick),worker会跳过执行
  • 11 在正常执行时,worker通过ucs知道当前数据主键的范围,所以可以根据这个条件取出对应brick的原始数据
  • 12 执行结束时,worker将cur_brick更新到last_brick中。

最终,没执行一次脚本,就会搬运一个brick到远端队列。

'''
UCS顺序Worker的概念Worker采用UCS的顺序编号:id编号、时间编号Worker依赖Buffer提供运行时参数:- 1 brick列表
- 2 上一次处理brick
'''# 1 创建变量空间(Once) worker.general (TroubleShooting ts_001)
# 2 读取需要处理的brick_list(Manually)from Basefuncs import * worker_buffer_space = 'sp_worker.general'
tier1 = 'xxx'
tier2 = 'ucs_brick_ordered.sniffer'
prefix = '.'.join([worker_buffer_space,tier1,tier2]) +'.'target_redis_agent_host = 'http://IP:24118/'
target_redis_connection_hash = None 
target_stream_name = 'xxxx'
target_stream_max_len = 10000000qm = QManager(redis_agent_host = target_redis_agent_host, redis_connection_hash = target_redis_connection_hash, q_max_len = target_stream_max_len)
# ==========================  Load 
gb = GlobalBuffer()
# manually + brick_list
# gb.setx(prefix +'brick_list',brick_list,persist=True)
brick_list=  gb.getx(prefix +'brick_list')
last_brick_handled = gb.getx(prefix +'last_brick_handled')  or ''
last_runtime = gb.getx(prefix +'last_runtime')# brick_list需要保证顺序
if last_brick_handled is None:current_brick =  brick_list[0]
else:if brick_list.index(last_brick_handled) ==  len(brick_list) -1:current_brick = last_brick_handledelse:current_brick = brick_list[brick_list.index(last_brick_handled) +1]print('current_brick', current_brick)if current_brick != last_brick_handled:
# 根据buffer知道要处理的数据ucs = UCS()current_brick_bounds = ucs.get_brick_bounds(current_brick)# ==========================  Processingclick_para = gb.getx('sp_global.buffer.local.container.clickhouse.my_database.para')chc = CHClient(**click_para)# 根据bounds获取数据query_sql = 'select a, b, c, d from xxx where id >= %s and id < %s' % (current_brick_bounds[0], current_brick_bounds[1] )brick_data = chc._exe_sql(query_sql)brick_data_df = pd.DataFrame(brick_data, columns = ['a','b','c','d'])brick_data_df.columns = ['id','task_for','before','after']brick_data_df['function_type'] = 'ucs_worker'brick_data_df['rec_id'] = brick_data_df['id']brick_data_listofdict = brick_data_df.to_dict(orient='records')# ==========================  Postcur_q_len = qm.stream_len(target_stream_name)cur_write_resp = qm.parrallel_write_msg(target_stream_name, brick_data_listofdict, time_out=180)# ==========================  Updateif cur_write_resp['status']:last_brick_handled = current_brickgb.setx(prefix +'last_brick_handled', last_brick_handled, persist =True)print('current batch ', len(brick_data_listofdict),' 、target stream len',qm.stream_len(target_stream_name))
else:last_brick_handled = current_bricklast_runtime = get_time_str1()
gb.setx(prefix +'last_runtime', last_runtime)

flask_celery

后来我用了python的标准logging包 + RotateLog的方式记录,不过以下脚本仍然有用。

执行脚本

对于非标准的程序执行,通过脚本方式放在本地的home文件夹下,由celery调度。
注意,被celery执行的脚本,里面最好都写上绝对路径,因为在使用celery worker执行时,当前路径会默认为服务的启动路径 /opt/flask_celery。
例如LOG_FILE,只写tem.log,那么就会在flask_celery下发生修改。
始终注意的是,由flask celery执行的应该是简单的流转任务,而不是复杂的计算任务。如果有,就应该放在某个容器里执行。
再考虑到执行环境,flask celery是在base环境启动的,对应的包应该都能用。如果要执行特别的任务,就要在脚本里指定环境的切换。

vim /home/test_exe.sh

#!/bin/bash
# 日志文件路径
LOG_FILE="/home/tem.log"# 获取当前时间并追加到日志文件
echo "$(date '+%Y-%m-%d %H:%M:%S') - 脚本执行" >> $LOG_FILE# 检查日志文件中的记录条数
LINE_COUNT=$(wc -l < "$LOG_FILE")# 如果记录条数超过10000条,则截断日志文件以保留最新的100条记录
if [ "$LINE_COUNT" -gt 10000 ]; then# 计算需要保留的行数LINES_TO_KEEP=100# 截断日志文件tail -n $LINES_TO_KEEP $LOG_FILE > temp.log && mv temp.log $LOG_FILE
fi

然后将脚本改为可执行
chmod +x /home/test_exe.sh
执行测试


import requests as req param_dict = {'script_path': '/home/test_exe.sh'}resp = req.post('http://127.0.0.1:24104/exe_sh/',json = param_dict )In [5]: !cat tem.log
2024-06-17 14:55:54 - 脚本执行
2024-06-17 14:59:14 - 脚本执行
2024-06-17 15:21:13 - 脚本执行

这篇关于Python 全栈系列255 UCS实践:按ID同步数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085804

相关文章

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python正则表达式匹配和替换的操作指南

《Python正则表达式匹配和替换的操作指南》正则表达式是处理文本的强大工具,Python通过re模块提供了完整的正则表达式功能,本文将通过代码示例详细介绍Python中的正则匹配和替换操作,需要的朋... 目录基础语法导入re模块基本元字符常用匹配方法1. re.match() - 从字符串开头匹配2.

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

通过Docker容器部署Python环境的全流程

《通过Docker容器部署Python环境的全流程》在现代化开发流程中,Docker因其轻量化、环境隔离和跨平台一致性的特性,已成为部署Python应用的标准工具,本文将详细演示如何通过Docker容... 目录引言一、docker与python的协同优势二、核心步骤详解三、进阶配置技巧四、生产环境最佳实践

Python一次性将指定版本所有包上传PyPI镜像解决方案

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随... 目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4

Spring Security简介、使用与最佳实践

《SpringSecurity简介、使用与最佳实践》SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,本文给大家介绍SpringSec... 目录一、如何理解 Spring Security?—— 核心思想二、如何在 Java 项目中使用?——

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函