python 将mysql转为csv、json导入到Doris数据库

2024-08-21 19:20

本文主要是介绍python 将mysql转为csv、json导入到Doris数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

上一篇文章是将mysql导出成csv文件,适合csv不是很大的情况,以下对脚本进行了优化,采用分块读取csv,降低了内存的使用率,提高了传输速度。

from pydoris.doris_client import DorisClient
import requests
import pandas as pd
import numpy as npfe_host = ''
username = ''
passwd = ''
fe_http_port = ""
fe_query_port = ""doris_client = DorisClient(fe_host=fe_host,fe_query_port=fe_query_port,fe_http_port=fe_http_port,username=username,password=passwd,db='zst_cep_model')# 由于导入的csv文件过大,都是五六G以上,所以使用chunksize 分块获取数据进行操作。
for key, chunk in enumerate(pd.read_csv('xxxx.csv', chunksize=10000, dtype=str)):# 这一步也是很迷,需要读csv,然后转为字符串类型csv = chunk.to_csv(header=False, index=False, sep=',').encode('utf-8')# table_name 必须的是database.table的形式success = doris_client.write('xxx.xxx', csv)if success:print("数据写入成功!")else:print("数据写入失败。", key)breakdel csv

后面需要进行增量更新,使用csv太麻烦,想着使用json传输可能会好点,并直接从mysql中获取数据直接传输到doris,以下是脚本。

import pandas as pd
from pydoris.doris_client import *from tools import *# 配置
config = {// Mysql的配置'mysql_config': {'host': '','port': ,'user': '','password': '','database': ''},// Doris的配置'doris_db_config': {'fe_host': '','username': '','passwd': '','fe_http_port': "",'fe_query_port': "",'db': ''},# 要传输的mysql的目标表'mysql_table': '',# 要接收的Doris的目标表'doris_table': '',
}# MysqlDataBaseClass 是自己编写的Mysql工具类,返回的是Mysql连接对象
yp_apidb = MysqlDataBaseClass(host=config['mysql_config']['host'], port=config['mysql_config']['port'], user=config['mysql_config']['user'],password=config['mysql_config']['password'], database=config['mysql_config']['database'])doris_client = DorisClient(fe_host=config['doris_db_config']['fe_host'],fe_query_port=config['doris_db_config']['fe_query_port'],fe_http_port=config['doris_db_config']['fe_http_port'],username=config['doris_db_config']['username'],password=config['doris_db_config']['passwd'],db=config['doris_db_config']['db'])def get_data_from_mysql(page=1, end_page=None, total_row_num=None, page_size=10000, limit_date='2024-08-12'):result = {'total_page': 0, 'total': 0, 'now_page': page, 'data': [], 'code': False, 'msg': ''}if total_row_num is None:select_res = yp_apidb.ExecuteSQL_Select(sql=f'''SELECT count(1) as total_num FROM `{config["mysql_table"]}` where collect_batch_date >= '{limit_date}';''')totalRowsNum = int(select_res[0]['total_num'])else:totalRowsNum = total_row_numif (totalRowsNum % page_size) == 0:totalPages = int(totalRowsNum / page_size)else:totalPages = int((totalRowsNum / page_size) + 1)result['total_page'] = totalPagesresult['total'] = totalRowsNumif end_page and page > end_page:result['msg'] = '已经达到设置的最后一页'return resultif page > totalPages:result['msg'] = '已经是最后一页'return resultstart_num = int((page - 1) * page_size)limit = f'{start_num}, {page_size}'sql = f'''SELECT * FROM `{config["mysql_table"]}` where collect_batch_date >= '{limit_date}' limit {limit};'''data_list = yp_apidb.ExecuteSQL_Select(sql)result['data'] = data_listresult['code'] = Trueresult['msg'] = '获取成功'return resultdef insert_to_doris(data_list):if len(data_list) > 0:df = pd.DataFrame(data_list)json_data = df.to_json(orient='records')options = WriteOptions()options.set_json_format()options.set_option("strip_outer_array", "true")success = doris_client.write(f"{config['doris_table']}", json_data, options=options)if success:return Trueelse:print("数据写入失败。")return Falseif __name__ == '__main__':page = 1total_row_num = Nonelimit_date = '2024-08-01'# 循环获取下一页,从而达到自动翻页的功能while True:res = get_data_from_mysql(page=page, total_row_num=total_row_num, limit_date=limit_date)print(res['msg'], res['total_page'], res['total'], res['now_page'])total_row_num = res['total']if res['code']:data_list = res['data']flage = insert_to_doris(data_list)if flage is False:breakpage += 1else:print(res['msg'], page)break

以上脚本仅供学习参考,仅为实现临时功能而编写,还有优化的空间。

这篇关于python 将mysql转为csv、json导入到Doris数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1094054

相关文章

MySQL数据库双机热备的配置方法详解

《MySQL数据库双机热备的配置方法详解》在企业级应用中,数据库的高可用性和数据的安全性是至关重要的,MySQL作为最流行的开源关系型数据库管理系统之一,提供了多种方式来实现高可用性,其中双机热备(M... 目录1. 环境准备1.1 安装mysql1.2 配置MySQL1.2.1 主服务器配置1.2.2 从

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

一文详解Python如何开发游戏

《一文详解Python如何开发游戏》Python是一种非常流行的编程语言,也可以用来开发游戏模组,:本文主要介绍Python如何开发游戏的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、python简介二、Python 开发 2D 游戏的优劣势优势缺点三、Python 开发 3D

Python函数作用域与闭包举例深度解析

《Python函数作用域与闭包举例深度解析》Python函数的作用域规则和闭包是编程中的关键概念,它们决定了变量的访问和生命周期,:本文主要介绍Python函数作用域与闭包的相关资料,文中通过代码... 目录1. 基础作用域访问示例1:访问全局变量示例2:访问外层函数变量2. 闭包基础示例3:简单闭包示例4

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Python版本与package版本兼容性检查方法总结

《Python版本与package版本兼容性检查方法总结》:本文主要介绍Python版本与package版本兼容性检查方法的相关资料,文中提供四种检查方法,分别是pip查询、conda管理、PyP... 目录引言为什么会出现兼容性问题方法一:用 pip 官方命令查询可用版本方法二:conda 管理包环境方法

深入理解Mysql OnlineDDL的算法

《深入理解MysqlOnlineDDL的算法》本文主要介绍了讲解MysqlOnlineDDL的算法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小... 目录一、Online DDL 是什么?二、Online DDL 的三种主要算法2.1COPY(复制法)

基于Python开发Windows自动更新控制工具

《基于Python开发Windows自动更新控制工具》在当今数字化时代,操作系统更新已成为计算机维护的重要组成部分,本文介绍一款基于Python和PyQt5的Windows自动更新控制工具,有需要的可... 目录设计原理与技术实现系统架构概述数学建模工具界面完整代码实现技术深度分析多层级控制理论服务层控制注

mysql8.0.43使用InnoDB Cluster配置主从复制

《mysql8.0.43使用InnoDBCluster配置主从复制》本文主要介绍了mysql8.0.43使用InnoDBCluster配置主从复制,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录1、配置Hosts解析(所有服务器都要执行)2、安装mysql shell(所有服务器都要执行)3、