python 将mysql转为csv、json导入到Doris数据库

2024-08-21 19:20

本文主要是介绍python 将mysql转为csv、json导入到Doris数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

上一篇文章是将mysql导出成csv文件,适合csv不是很大的情况,以下对脚本进行了优化,采用分块读取csv,降低了内存的使用率,提高了传输速度。

from pydoris.doris_client import DorisClient
import requests
import pandas as pd
import numpy as npfe_host = ''
username = ''
passwd = ''
fe_http_port = ""
fe_query_port = ""doris_client = DorisClient(fe_host=fe_host,fe_query_port=fe_query_port,fe_http_port=fe_http_port,username=username,password=passwd,db='zst_cep_model')# 由于导入的csv文件过大,都是五六G以上,所以使用chunksize 分块获取数据进行操作。
for key, chunk in enumerate(pd.read_csv('xxxx.csv', chunksize=10000, dtype=str)):# 这一步也是很迷,需要读csv,然后转为字符串类型csv = chunk.to_csv(header=False, index=False, sep=',').encode('utf-8')# table_name 必须的是database.table的形式success = doris_client.write('xxx.xxx', csv)if success:print("数据写入成功!")else:print("数据写入失败。", key)breakdel csv

后面需要进行增量更新,使用csv太麻烦,想着使用json传输可能会好点,并直接从mysql中获取数据直接传输到doris,以下是脚本。

import pandas as pd
from pydoris.doris_client import *from tools import *# 配置
config = {// Mysql的配置'mysql_config': {'host': '','port': ,'user': '','password': '','database': ''},// Doris的配置'doris_db_config': {'fe_host': '','username': '','passwd': '','fe_http_port': "",'fe_query_port': "",'db': ''},# 要传输的mysql的目标表'mysql_table': '',# 要接收的Doris的目标表'doris_table': '',
}# MysqlDataBaseClass 是自己编写的Mysql工具类,返回的是Mysql连接对象
yp_apidb = MysqlDataBaseClass(host=config['mysql_config']['host'], port=config['mysql_config']['port'], user=config['mysql_config']['user'],password=config['mysql_config']['password'], database=config['mysql_config']['database'])doris_client = DorisClient(fe_host=config['doris_db_config']['fe_host'],fe_query_port=config['doris_db_config']['fe_query_port'],fe_http_port=config['doris_db_config']['fe_http_port'],username=config['doris_db_config']['username'],password=config['doris_db_config']['passwd'],db=config['doris_db_config']['db'])def get_data_from_mysql(page=1, end_page=None, total_row_num=None, page_size=10000, limit_date='2024-08-12'):result = {'total_page': 0, 'total': 0, 'now_page': page, 'data': [], 'code': False, 'msg': ''}if total_row_num is None:select_res = yp_apidb.ExecuteSQL_Select(sql=f'''SELECT count(1) as total_num FROM `{config["mysql_table"]}` where collect_batch_date >= '{limit_date}';''')totalRowsNum = int(select_res[0]['total_num'])else:totalRowsNum = total_row_numif (totalRowsNum % page_size) == 0:totalPages = int(totalRowsNum / page_size)else:totalPages = int((totalRowsNum / page_size) + 1)result['total_page'] = totalPagesresult['total'] = totalRowsNumif end_page and page > end_page:result['msg'] = '已经达到设置的最后一页'return resultif page > totalPages:result['msg'] = '已经是最后一页'return resultstart_num = int((page - 1) * page_size)limit = f'{start_num}, {page_size}'sql = f'''SELECT * FROM `{config["mysql_table"]}` where collect_batch_date >= '{limit_date}' limit {limit};'''data_list = yp_apidb.ExecuteSQL_Select(sql)result['data'] = data_listresult['code'] = Trueresult['msg'] = '获取成功'return resultdef insert_to_doris(data_list):if len(data_list) > 0:df = pd.DataFrame(data_list)json_data = df.to_json(orient='records')options = WriteOptions()options.set_json_format()options.set_option("strip_outer_array", "true")success = doris_client.write(f"{config['doris_table']}", json_data, options=options)if success:return Trueelse:print("数据写入失败。")return Falseif __name__ == '__main__':page = 1total_row_num = Nonelimit_date = '2024-08-01'# 循环获取下一页,从而达到自动翻页的功能while True:res = get_data_from_mysql(page=page, total_row_num=total_row_num, limit_date=limit_date)print(res['msg'], res['total_page'], res['total'], res['now_page'])total_row_num = res['total']if res['code']:data_list = res['data']flage = insert_to_doris(data_list)if flage is False:breakpage += 1else:print(res['msg'], page)break

以上脚本仅供学习参考,仅为实现临时功能而编写,还有优化的空间。

这篇关于python 将mysql转为csv、json导入到Doris数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1094054

相关文章

MySQL MCP 服务器安装配置最佳实践

《MySQLMCP服务器安装配置最佳实践》本文介绍MySQLMCP服务器的安装配置方法,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下... 目录mysql MCP 服务器安装配置指南简介功能特点安装方法数据库配置使用MCP Inspector进行调试开发指

mysql中insert into的基本用法和一些示例

《mysql中insertinto的基本用法和一些示例》INSERTINTO用于向MySQL表插入新行,支持单行/多行及部分列插入,下面给大家介绍mysql中insertinto的基本用法和一些示例... 目录基本语法插入单行数据插入多行数据插入部分列的数据插入默认值注意事项在mysql中,INSERT I

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解

一文深入详解Python的secrets模块

《一文深入详解Python的secrets模块》在构建涉及用户身份认证、权限管理、加密通信等系统时,开发者最不能忽视的一个问题就是“安全性”,Python在3.6版本中引入了专门面向安全用途的secr... 目录引言一、背景与动机:为什么需要 secrets 模块?二、secrets 模块的核心功能1. 基

一文详解MySQL如何设置自动备份任务

《一文详解MySQL如何设置自动备份任务》设置自动备份任务可以确保你的数据库定期备份,防止数据丢失,下面我们就来详细介绍一下如何使用Bash脚本和Cron任务在Linux系统上设置MySQL数据库的自... 目录1. 编写备份脚本1.1 创建并编辑备份脚本1.2 给予脚本执行权限2. 设置 Cron 任务2

python常见环境管理工具超全解析

《python常见环境管理工具超全解析》在Python开发中,管理多个项目及其依赖项通常是一个挑战,下面:本文主要介绍python常见环境管理工具的相关资料,文中通过代码介绍的非常详细,需要的朋友... 目录1. conda2. pip3. uvuv 工具自动创建和管理环境的特点4. setup.py5.

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

Python常用命令提示符使用方法详解

《Python常用命令提示符使用方法详解》在学习python的过程中,我们需要用到命令提示符(CMD)进行环境的配置,:本文主要介绍Python常用命令提示符使用方法的相关资料,文中通过代码介绍的... 目录一、python环境基础命令【Windows】1、检查Python是否安装2、 查看Python的安

SQL Server数据库死锁处理超详细攻略

《SQLServer数据库死锁处理超详细攻略》SQLServer作为主流数据库管理系统,在高并发场景下可能面临死锁问题,影响系统性能和稳定性,这篇文章主要给大家介绍了关于SQLServer数据库死... 目录一、引言二、查询 Sqlserver 中造成死锁的 SPID三、用内置函数查询执行信息1. sp_w

Python UV安装、升级、卸载详细步骤记录

《PythonUV安装、升级、卸载详细步骤记录》:本文主要介绍PythonUV安装、升级、卸载的详细步骤,uv是Astral推出的下一代Python包与项目管理器,主打单一可执行文件、极致性能... 目录安装检查升级设置自动补全卸载UV 命令总结 官方文档详见:https://docs.astral.sh/