Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

2025-03-27 02:50

本文主要是介绍Python实现将MySQL中所有表的数据都导出为CSV文件并压缩,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到...

pythonmysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个目录下,然后解压缩这个目录中的所有zip文件到第三个目录下。不使用Pandas库,需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。

脚本已在考虑大数据量、异常处理和性能优化的基础上进行了全面设计,能够处理大多数常见场景。根据具体需求可进一步调整批量大小(BATch_size)和线程数(max_workers)以获得最佳性能。

import os
import csv
import zipfile
import logging
import mysql.connector
from datetime import datetime
import time
import concurrent.futures
import glob

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data_export.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000):
    """导出单个表的数据到CSV文件,分批处理"""
    conn = None
    cursor = None
    total_rows = 0
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()

        # 获取数据并写入CSV
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            
            # 执行查询并获取列名
            cursor.execute(f"SELECT * FROM `{table_name}`")
            columns = [col[0] for col in cursor.description]
            writer.writerow(columns)
            
            # 分批获取数据
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break
                writer.writerows(rows)
                total_rows += len(rows)
                logger.debug(f"{table_name} 已导出 {total_rows} 行")

        logger.info(f"{table_name} CSV导出完成,总行数:{total_rows}")
        return total_rows

    except Exception as e:
        logger.error(f"导出表 {table_name} 失败: {str(e)}", exc_info=True)
        raise
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()

def compress_to_zip(source_path, zip_path):
    """压缩文件为ZIP格式"""
    try:
     China编程   with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(source_path, arcname=os.path.basename(source_path))
        logger.info(f"成功压缩 {source_path} 到 {zip_path}")
    except Exception as e:
        logger.error(f"压缩 {source_path} 失败: {str(e)}", exc_info=True)
        raise

def process_table(table_name, db_config, csv_dir, zip_dir):
    """处理单个表的导出和压缩"""
    start_time = time.time()
    logger.info(f"开始处理表: {table_name}")
    status = "成功"
    rows_exported = 0

    try:
        # 定义文件路径
        csv_filename = f"{table_name}.csv"
        zip_filename = f"{table_name}.zip"
        csv_path = os.path.join(csv_dir, csv_filename)
        zip_path = os.path.join(zip_dir, zip_filename)

        # 导出CSV
        rows_exported = export_table_to_csv(table_name, csv_path, db_config)
        
        # 压缩文件
        compress_to_zip(csv_path, zip_path)

    except Exception as e:
        status = f"失败: {str(e)}"
        # 清理可能存在的中间文件
        for path in [csv_path, zip_path]:
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                    logger.warning(f"已清理文件: {path}")
                except Exception as clean_error:
                    logger.error(f"清理文件失败: {clean_error}")

    finally:
        duration = time.time() - start_time
        log_message = (
            f"表处理完成 - 表名: {table_name}, "
            f"状态: {status}, "
            f"导出行数: {rows_exported}, "
            f"耗时: {duration:.2f}秒"
        )
        logger.info(log_message)

def unzip_files(zip_dir, unzip_dir):
    """解压指定目录中的所有ZIP文件"""
    zip_files = glob.glob(os.path.join(zip_dir, '*.zip'))
    if not zip_files:
        logger.warning("未找到ZIP文件,跳过解压")
        return

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for zip_path in zip_files:
            futures.append(executor.submit(
                lambda: extract_zip(zip_path, unzip_dir)
            ))
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"解压过程中发生错误: {str(e)}")

def extract_zip(zip_path, unzip_dir):
    """解压单个ZIP文件"""
    try:
        start_time = time.time()
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(unzip_dir)
        duration = time.time() - start_time
        logger.info(f"解压完成: {zip_path} => {unzip_dir} (耗时: {duration:.2f}秒)")
    except Exception as e:
        logger.error(f"解压 {zip_path} 失败: {str(e)}", exc_info=True)
        raise

def main():
    # 配置参数
    db_config = {
        'host': 'localhost',
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database'
    }
    
    # 目录配置
    base_dir = os.path.dirname(os.path.abspath(__file__))
    csv_dir = os.path.join(base_dir, 'csv_exports')
    zip_dir = os.path.join(base_dir, 'zip_archives')
    unzip_dir = os.path.join(base_dir, 'unzipped_files')

    # 创建目录
    for dir_path in [csv_dir, zip_dir, unzip_dir]:
        os.makedirs(dir_path, exist_ok=True)
        logger.info(f"目录已准备: {dir_path}")

    # 获取所有表名
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursorjs()
        cursor.execute("SHOW TABLES")
        tables = [table[0] for table in cursor.fetchall()]
        logger.info(f"发现 {len(tables)} 个需要处理的表")
    except Exception as e:
        logger.error(f"获取数据库表失败: {str(e)}", exc_info=True)
        return
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()

    # 处理所有表(多线程导出和压缩)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for table in tables:
            futures.append(executor.submit(
                process_table,
                table,
                db_config,
               China编程 csv_dir,
                zip_dir
            ))

        # 处理任务结果
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"表处理异常: {str(e)}")

    # 解压所有ZIP文件(多线程解压)
    logger.info("开始解压所有ZIP文件")
    unzip_files(zip_dir, unzip_dir)
    logger.info("全部处理流程完成")

if __name__ == "__main__":
    main()

关键特性说明:

1.分批处理大数据:

  • 使用fetchmany(batch_size)分批获取数据(默认每批1000行)
  • 流式处理减少内存占用

2.多线程处理:

  • 使用ThreadPoolExecutor并行处理不同表的导出和压缩
  • 独立的数据库连接池(每个线程有自己的连接)
  • 并行解压处理

3.异常处理:

  • 全面的try-http://www.chinasem.cnexcept块覆盖所有关键操作
  • 自动清理失败时产生的中间文件
  • 详细的错误日志记录(包含堆栈跟踪)

4.日志记录:

  • 同时输出到文件和终端
  • 记录时间戳、操作类型、状态、耗时等关键信息
  • 包含每个表的处理结果统计

5.文件管理:

  • 自动创建所需目录
  • 使用ZIP_DEFLATED进行高效压缩
  • 安全的文件路径处理

6.性能优化:

  • 使用服务器端游标避免内存过载
  • 可配置的批量大小和线程数
  • 异步I/O操作

使用说明:

安装依赖:

pip install mysql-connector-python

修改配置:

更新db_config中的数据库连接信息

根据需要调整目录路径(csv_dir, zip_dir, unzip_dir)

运行脚本:

python script.py

查看日志:

实时终端输出

详细日志文件data_export.log

扩展建议:

通过命令行参数接受数据库配置和目录路径

添加邮件通知功能(处理完成或失败时通知)

实现断点续传功能

添加文件校验(MD5校验和)

支持配置文件(YAML/jsON格式)

添加进度条显示

到此这篇关于PnlHKgwAzbHython实现将MySQL中所有表的数据都导出为CSV文件并压缩的文章就介绍到这了,更多相关Python MySQL数据导出为CSV内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于Python实现将MySQL中所有表的数据都导出为CSV文件并压缩的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1153956

相关文章

Nexus安装和启动的实现教程

《Nexus安装和启动的实现教程》:本文主要介绍Nexus安装和启动的实现教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、Nexus下载二、Nexus安装和启动三、关闭Nexus总结一、Nexus下载官方下载链接:DownloadWindows系统根

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

MySQL 衍生表(Derived Tables)的使用

《MySQL衍生表(DerivedTables)的使用》本文主要介绍了MySQL衍生表(DerivedTables)的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学... 目录一、衍生表简介1.1 衍生表基本用法1.2 自定义列名1.3 衍生表的局限在SQL的查询语句select

MySQL 横向衍生表(Lateral Derived Tables)的实现

《MySQL横向衍生表(LateralDerivedTables)的实现》横向衍生表适用于在需要通过子查询获取中间结果集的场景,相对于普通衍生表,横向衍生表可以引用在其之前出现过的表名,本文就来... 目录一、横向衍生表用法示例1.1 用法示例1.2 使用建议前面我们介绍过mysql中的衍生表(From子句

Mybatis的分页实现方式

《Mybatis的分页实现方式》MyBatis的分页实现方式主要有以下几种,每种方式适用于不同的场景,且在性能、灵活性和代码侵入性上有所差异,对Mybatis的分页实现方式感兴趣的朋友一起看看吧... 目录​1. 原生 SQL 分页(物理分页)​​2. RowBounds 分页(逻辑分页)​​3. Page

六个案例搞懂mysql间隙锁

《六个案例搞懂mysql间隙锁》MySQL中的间隙是指索引中两个索引键之间的空间,间隙锁用于防止范围查询期间的幻读,本文主要介绍了六个案例搞懂mysql间隙锁,具有一定的参考价值,感兴趣的可以了解一下... 目录概念解释间隙锁详解间隙锁触发条件间隙锁加锁规则案例演示案例一:唯一索引等值锁定存在的数据案例二:

一文全面详解Python变量作用域

《一文全面详解Python变量作用域》变量作用域是Python中非常重要的概念,它决定了在哪里可以访问变量,下面我将用通俗易懂的方式,结合代码示例和图表,带你全面了解Python变量作用域,需要的朋友... 目录一、什么是变量作用域?二、python的四种作用域作用域查找顺序图示三、各作用域详解1. 局部作

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛