Python 全栈系列255 UCS实践:按ID同步数据

2024-06-23 00:12

本文主要是介绍Python 全栈系列255 UCS实践:按ID同步数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

这是一个常见的使用场景,实测下来效果良好。

内容

1 实验场景

将库中所有的数据取出,送到队列

本质上,这是一种单向不返回的模式。除了在遍历全库有用,在进行回测时也是一样的,时间就是单向不返回的。

通过UCS,将任意离散的数据记录归并到了一个更大的单位下。按照brick、block、part、shard四个层级,使得数据的管理兼顾到人的记忆特性,以及程序批量处理的效率。一个brick通常代表一万条数据,之后以千不断进位。到part级别已经是十亿的容量了。

UCS将所有数据的编号分为三类:

  • 1 数值类。从0开始编号,每条记录递增,这个就是mysql的自增id。
  • 2 时间类。以小时为brick,天为block,月为part, 年为shard。
  • 3 字符类。所有为数值、非时间类的主键采用字符编号。一般采用md5码计算32位字符,然后根据 每8个字符之和对10取余。如果数据很大,也可以考虑对100,甚至1000取余。

UCS规范已经嵌在GFGoLite服务中,通过UCS对象进行快速实现。

以下是本次实验的文件

  • 1 首先要声明worker的缓存空间名称,一般只需一次,后续其他的worker也可以使用这个空间。
  • 2 worker并不是服务状态的,所以每次启动必须要载入元数据,在结束本次执行后,要保存元数据
  • 3 worker的功能是从clickhouse中取数,然后存到stream中
  • 4 QM方面,通过声明远端服务器的RedisAgent完成
  • 5 从GlobalBuffer中获取clickhouse的连接参数
  • 6 使用CHClient来进行实际的控制
  • 7 执行前,待执行的brick_list应该被更新后放在缓存内。
  • 8 执行时,worker先取出待执行的brick_list和已经执行的brick(last_brick)
  • 9 如果last_brick是空,说明这是初始状态,cur_brick为brick_list中的第一个
  • 10 其他情况,cur_brick始终可以取到下一个,直到结尾(此时cur_brick始终等于last_brick),worker会跳过执行
  • 11 在正常执行时,worker通过ucs知道当前数据主键的范围,所以可以根据这个条件取出对应brick的原始数据
  • 12 执行结束时,worker将cur_brick更新到last_brick中。

最终,没执行一次脚本,就会搬运一个brick到远端队列。

'''
UCS顺序Worker的概念Worker采用UCS的顺序编号:id编号、时间编号Worker依赖Buffer提供运行时参数:- 1 brick列表
- 2 上一次处理brick
'''# 1 创建变量空间(Once) worker.general (TroubleShooting ts_001)
# 2 读取需要处理的brick_list(Manually)from Basefuncs import * worker_buffer_space = 'sp_worker.general'
tier1 = 'xxx'
tier2 = 'ucs_brick_ordered.sniffer'
prefix = '.'.join([worker_buffer_space,tier1,tier2]) +'.'target_redis_agent_host = 'http://IP:24118/'
target_redis_connection_hash = None 
target_stream_name = 'xxxx'
target_stream_max_len = 10000000qm = QManager(redis_agent_host = target_redis_agent_host, redis_connection_hash = target_redis_connection_hash, q_max_len = target_stream_max_len)
# ==========================  Load 
gb = GlobalBuffer()
# manually + brick_list
# gb.setx(prefix +'brick_list',brick_list,persist=True)
brick_list=  gb.getx(prefix +'brick_list')
last_brick_handled = gb.getx(prefix +'last_brick_handled')  or ''
last_runtime = gb.getx(prefix +'last_runtime')# brick_list需要保证顺序
if last_brick_handled is None:current_brick =  brick_list[0]
else:if brick_list.index(last_brick_handled) ==  len(brick_list) -1:current_brick = last_brick_handledelse:current_brick = brick_list[brick_list.index(last_brick_handled) +1]print('current_brick', current_brick)if current_brick != last_brick_handled:
# 根据buffer知道要处理的数据ucs = UCS()current_brick_bounds = ucs.get_brick_bounds(current_brick)# ==========================  Processingclick_para = gb.getx('sp_global.buffer.local.container.clickhouse.my_database.para')chc = CHClient(**click_para)# 根据bounds获取数据query_sql = 'select a, b, c, d from xxx where id >= %s and id < %s' % (current_brick_bounds[0], current_brick_bounds[1] )brick_data = chc._exe_sql(query_sql)brick_data_df = pd.DataFrame(brick_data, columns = ['a','b','c','d'])brick_data_df.columns = ['id','task_for','before','after']brick_data_df['function_type'] = 'ucs_worker'brick_data_df['rec_id'] = brick_data_df['id']brick_data_listofdict = brick_data_df.to_dict(orient='records')# ==========================  Postcur_q_len = qm.stream_len(target_stream_name)cur_write_resp = qm.parrallel_write_msg(target_stream_name, brick_data_listofdict, time_out=180)# ==========================  Updateif cur_write_resp['status']:last_brick_handled = current_brickgb.setx(prefix +'last_brick_handled', last_brick_handled, persist =True)print('current batch ', len(brick_data_listofdict),' 、target stream len',qm.stream_len(target_stream_name))
else:last_brick_handled = current_bricklast_runtime = get_time_str1()
gb.setx(prefix +'last_runtime', last_runtime)

flask_celery

后来我用了python的标准logging包 + RotateLog的方式记录,不过以下脚本仍然有用。

执行脚本

对于非标准的程序执行,通过脚本方式放在本地的home文件夹下,由celery调度。
注意,被celery执行的脚本,里面最好都写上绝对路径,因为在使用celery worker执行时,当前路径会默认为服务的启动路径 /opt/flask_celery。
例如LOG_FILE,只写tem.log,那么就会在flask_celery下发生修改。
始终注意的是,由flask celery执行的应该是简单的流转任务,而不是复杂的计算任务。如果有,就应该放在某个容器里执行。
再考虑到执行环境,flask celery是在base环境启动的,对应的包应该都能用。如果要执行特别的任务,就要在脚本里指定环境的切换。

vim /home/test_exe.sh

#!/bin/bash
# 日志文件路径
LOG_FILE="/home/tem.log"# 获取当前时间并追加到日志文件
echo "$(date '+%Y-%m-%d %H:%M:%S') - 脚本执行" >> $LOG_FILE# 检查日志文件中的记录条数
LINE_COUNT=$(wc -l < "$LOG_FILE")# 如果记录条数超过10000条,则截断日志文件以保留最新的100条记录
if [ "$LINE_COUNT" -gt 10000 ]; then# 计算需要保留的行数LINES_TO_KEEP=100# 截断日志文件tail -n $LINES_TO_KEEP $LOG_FILE > temp.log && mv temp.log $LOG_FILE
fi

然后将脚本改为可执行
chmod +x /home/test_exe.sh
执行测试


import requests as req param_dict = {'script_path': '/home/test_exe.sh'}resp = req.post('http://127.0.0.1:24104/exe_sh/',json = param_dict )In [5]: !cat tem.log
2024-06-17 14:55:54 - 脚本执行
2024-06-17 14:59:14 - 脚本执行
2024-06-17 15:21:13 - 脚本执行

这篇关于Python 全栈系列255 UCS实践:按ID同步数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085804

相关文章

Python并行处理实战之如何使用ProcessPoolExecutor加速计算

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu... 目录简介完整代码示例代码解释1. 导入必要的模块2. 定义处理函数3. 主函数4. 生成数字列表5.

Python中help()和dir()函数的使用

《Python中help()和dir()函数的使用》我们经常需要查看某个对象(如模块、类、函数等)的属性和方法,Python提供了两个内置函数help()和dir(),它们可以帮助我们快速了解代... 目录1. 引言2. help() 函数2.1 作用2.2 使用方法2.3 示例(1) 查看内置函数的帮助(

Python虚拟环境与Conda使用指南分享

《Python虚拟环境与Conda使用指南分享》:本文主要介绍Python虚拟环境与Conda使用指南,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、python 虚拟环境概述1.1 什么是虚拟环境1.2 为什么需要虚拟环境二、Python 内置的虚拟环境工具

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部

Python pip下载包及所有依赖到指定文件夹的步骤说明

《Pythonpip下载包及所有依赖到指定文件夹的步骤说明》为了方便开发和部署,我们常常需要将Python项目所依赖的第三方包导出到本地文件夹中,:本文主要介绍Pythonpip下载包及所有依... 目录步骤说明命令格式示例参数说明离线安装方法注意事项总结要使用pip下载包及其所有依赖到指定文件夹,请按照以

Python实现精准提取 PDF中的文本,表格与图片

《Python实现精准提取PDF中的文本,表格与图片》在实际的系统开发中,处理PDF文件不仅限于读取整页文本,还有提取文档中的表格数据,图片或特定区域的内容,下面我们来看看如何使用Python实... 目录安装 python 库提取 PDF 文本内容:获取整页文本与指定区域内容获取页面上的所有文本内容获取

基于Python实现一个Windows Tree命令工具

《基于Python实现一个WindowsTree命令工具》今天想要在Windows平台的CMD命令终端窗口中使用像Linux下的tree命令,打印一下目录结构层级树,然而还真有tree命令,但是发现... 目录引言实现代码使用说明可用选项示例用法功能特点添加到环境变量方法一:创建批处理文件并添加到PATH1

Python包管理工具核心指令uvx举例详细解析

《Python包管理工具核心指令uvx举例详细解析》:本文主要介绍Python包管理工具核心指令uvx的相关资料,uvx是uv工具链中用于临时运行Python命令行工具的高效执行器,依托Rust实... 目录一、uvx 的定位与核心功能二、uvx 的典型应用场景三、uvx 与传统工具对比四、uvx 的技术实

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

python判断文件是否存在常用的几种方式

《python判断文件是否存在常用的几种方式》在Python中我们在读写文件之前,首先要做的事情就是判断文件是否存在,否则很容易发生错误的情况,:本文主要介绍python判断文件是否存在常用的几种... 目录1. 使用 os.path.exists()2. 使用 os.path.isfile()3. 使用