python 将mysql转为csv、json导入到Doris数据库

2024-08-21 19:20

本文主要是介绍python 将mysql转为csv、json导入到Doris数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

上一篇文章是将mysql导出成csv文件,适合csv不是很大的情况,以下对脚本进行了优化,采用分块读取csv,降低了内存的使用率,提高了传输速度。

from pydoris.doris_client import DorisClient
import requests
import pandas as pd
import numpy as npfe_host = ''
username = ''
passwd = ''
fe_http_port = ""
fe_query_port = ""doris_client = DorisClient(fe_host=fe_host,fe_query_port=fe_query_port,fe_http_port=fe_http_port,username=username,password=passwd,db='zst_cep_model')# 由于导入的csv文件过大,都是五六G以上,所以使用chunksize 分块获取数据进行操作。
for key, chunk in enumerate(pd.read_csv('xxxx.csv', chunksize=10000, dtype=str)):# 这一步也是很迷,需要读csv,然后转为字符串类型csv = chunk.to_csv(header=False, index=False, sep=',').encode('utf-8')# table_name 必须的是database.table的形式success = doris_client.write('xxx.xxx', csv)if success:print("数据写入成功!")else:print("数据写入失败。", key)breakdel csv

后面需要进行增量更新,使用csv太麻烦,想着使用json传输可能会好点,并直接从mysql中获取数据直接传输到doris,以下是脚本。

import pandas as pd
from pydoris.doris_client import *from tools import *# 配置
config = {// Mysql的配置'mysql_config': {'host': '','port': ,'user': '','password': '','database': ''},// Doris的配置'doris_db_config': {'fe_host': '','username': '','passwd': '','fe_http_port': "",'fe_query_port': "",'db': ''},# 要传输的mysql的目标表'mysql_table': '',# 要接收的Doris的目标表'doris_table': '',
}# MysqlDataBaseClass 是自己编写的Mysql工具类,返回的是Mysql连接对象
yp_apidb = MysqlDataBaseClass(host=config['mysql_config']['host'], port=config['mysql_config']['port'], user=config['mysql_config']['user'],password=config['mysql_config']['password'], database=config['mysql_config']['database'])doris_client = DorisClient(fe_host=config['doris_db_config']['fe_host'],fe_query_port=config['doris_db_config']['fe_query_port'],fe_http_port=config['doris_db_config']['fe_http_port'],username=config['doris_db_config']['username'],password=config['doris_db_config']['passwd'],db=config['doris_db_config']['db'])def get_data_from_mysql(page=1, end_page=None, total_row_num=None, page_size=10000, limit_date='2024-08-12'):result = {'total_page': 0, 'total': 0, 'now_page': page, 'data': [], 'code': False, 'msg': ''}if total_row_num is None:select_res = yp_apidb.ExecuteSQL_Select(sql=f'''SELECT count(1) as total_num FROM `{config["mysql_table"]}` where collect_batch_date >= '{limit_date}';''')totalRowsNum = int(select_res[0]['total_num'])else:totalRowsNum = total_row_numif (totalRowsNum % page_size) == 0:totalPages = int(totalRowsNum / page_size)else:totalPages = int((totalRowsNum / page_size) + 1)result['total_page'] = totalPagesresult['total'] = totalRowsNumif end_page and page > end_page:result['msg'] = '已经达到设置的最后一页'return resultif page > totalPages:result['msg'] = '已经是最后一页'return resultstart_num = int((page - 1) * page_size)limit = f'{start_num}, {page_size}'sql = f'''SELECT * FROM `{config["mysql_table"]}` where collect_batch_date >= '{limit_date}' limit {limit};'''data_list = yp_apidb.ExecuteSQL_Select(sql)result['data'] = data_listresult['code'] = Trueresult['msg'] = '获取成功'return resultdef insert_to_doris(data_list):if len(data_list) > 0:df = pd.DataFrame(data_list)json_data = df.to_json(orient='records')options = WriteOptions()options.set_json_format()options.set_option("strip_outer_array", "true")success = doris_client.write(f"{config['doris_table']}", json_data, options=options)if success:return Trueelse:print("数据写入失败。")return Falseif __name__ == '__main__':page = 1total_row_num = Nonelimit_date = '2024-08-01'# 循环获取下一页,从而达到自动翻页的功能while True:res = get_data_from_mysql(page=page, total_row_num=total_row_num, limit_date=limit_date)print(res['msg'], res['total_page'], res['total'], res['now_page'])total_row_num = res['total']if res['code']:data_list = res['data']flage = insert_to_doris(data_list)if flage is False:breakpage += 1else:print(res['msg'], page)break

以上脚本仅供学习参考,仅为实现临时功能而编写,还有优化的空间。

这篇关于python 将mysql转为csv、json导入到Doris数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1094054

相关文章

一文教你Python如何快速精准抓取网页数据

《一文教你Python如何快速精准抓取网页数据》这篇文章主要为大家详细介绍了如何利用Python实现快速精准抓取网页数据,文中的示例代码简洁易懂,具有一定的借鉴价值,有需要的小伙伴可以了解下... 目录1. 准备工作2. 基础爬虫实现3. 高级功能扩展3.1 抓取文章详情3.2 保存数据到文件4. 完整示例

MySQL 多表连接操作方法(INNER JOIN、LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN)

《MySQL多表连接操作方法(INNERJOIN、LEFTJOIN、RIGHTJOIN、FULLOUTERJOIN)》多表连接是一种将两个或多个表中的数据组合在一起的SQL操作,通过连接,... 目录一、 什么是多表连接?二、 mysql 支持的连接类型三、 多表连接的语法四、实战示例 数据准备五、连接的性

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

MySQL中的分组和多表连接详解

《MySQL中的分组和多表连接详解》:本文主要介绍MySQL中的分组和多表连接的相关操作,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录mysql中的分组和多表连接一、MySQL的分组(group javascriptby )二、多表连接(表连接会产生大量的数据垃圾)MySQL中的

基于Python打造一个智能单词管理神器

《基于Python打造一个智能单词管理神器》这篇文章主要为大家详细介绍了如何使用Python打造一个智能单词管理神器,从查询到导出的一站式解决,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 项目概述:为什么需要这个工具2. 环境搭建与快速入门2.1 环境要求2.2 首次运行配置3. 核心功能使用指

Python实现微信自动锁定工具

《Python实现微信自动锁定工具》在数字化办公时代,微信已成为职场沟通的重要工具,但临时离开时忘记锁屏可能导致敏感信息泄露,下面我们就来看看如何使用Python打造一个微信自动锁定工具吧... 目录引言:当微信隐私遇到自动化守护效果展示核心功能全景图技术亮点深度解析1. 无操作检测引擎2. 微信路径智能获

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

利用Python打造一个Excel记账模板

《利用Python打造一个Excel记账模板》这篇文章主要为大家详细介绍了如何使用Python打造一个超实用的Excel记账模板,可以帮助大家高效管理财务,迈向财富自由之路,感兴趣的小伙伴快跟随小编一... 目录设置预算百分比超支标红预警记账模板功能介绍基础记账预算管理可视化分析摸鱼时间理财法碎片时间利用财

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息