大型文件数据读取并持久化到数据库

2024-03-06 19:36

本文主要是介绍大型文件数据读取并持久化到数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

产品经理今天给了一个上亿数据的文本文件给我,让我把导入到mysql数据库。
文本的内容很简单,只有一个字段,但有1亿行。
在这里插入图片描述
我拿到文件后最开始直接用navicat工具直接导入,但发现效率极慢,跑了一分多钟,才导进去10W+数据进去,算下来要跑完至少需要20多个小时,时间不允许。
看来只能自己写代码来提升效率了。
常规的做法肯定是把文件内容按行读取出来,然后每N条拆分一批,再插入到数据库中。但这个文件太大,一次性全部读取到内存中,对机器有点压力。所以只能按批来读取,一边读一边写,已经持久化的数据就及时释放掉,避免一直占用内存。哎!LinkedBlockingQueue 就很适合干这个事。

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.LineIter;
import cn.hutool.core.io.FileUtil;
import com.yc.kfpt.oversea.dao.entity.SourceCode;
import com.yc.kfpt.oversea.dao.repository.SourceCodeRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.io.BufferedReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @author 敖癸* @formatter:on* @since 2024/3/6*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ImportDataService {private final SourceCodeRepository sourceCodeRepository;private final Executor asyncExecutor;@Asyncpublic void importData() {LinkedBlockingQueue<String> codeQueue = new LinkedBlockingQueue<>(500000);// 监听器线程queueListener(codeQueue).start();readFile("D:\\91-240305-j000.txt", codeQueue);}/*** 创建队列监听器** @param codeQueue* @return java.lang.Thread* @author 敖癸* @since 2024/3/6 - 16:41*/private Thread queueListener(LinkedBlockingQueue<String> codeQueue) {return new Thread(() -> {long index = 0;List<String> codes = new ArrayList<>();while (true) {try {String code = codeQueue.poll(5, TimeUnit.SECONDS);// 如果超过5秒从队列中还没获取到数据,就认为已经没有数据了if (code == null) {if (CollUtil.isNotEmpty(codes)) {log.info("入库处理: {}", index);List<SourceCode> entities = convertToEntity(codes);asyncExecutor.execute(() -> sourceCodeRepository.saveBatch("GENERAL", entities));codes.clear();}break;}index++;codes.add(code);// 5000一个批次if (codes.size() == 5000) {log.info("入库处理: {}", index);List<SourceCode> entities = convertToEntity(codes);// 持久化操作扔到线程池中异步去执行,可以多开点线程数量。asyncExecutor.execute(() -> sourceCodeRepository.saveBatch("GENERAL", entities));codes.clear();}} catch (InterruptedException e) {throw new RuntimeException(e);}}});}/*** 文件读取** @param codeQueue* @author 敖癸* @since 2024/3/6 - 16:41*/private static void readFile(String filePath, LinkedBlockingQueue<String> codeQueue) {BufferedReader reader = FileUtil.getReader(filePath, Charset.defaultCharset());int readCount = 0;  // 读取行数计数try (LineIter lineIter = new LineIter(reader)) {while (lineIter.hasNext()) {readCount++;// 如果codeQueue中的元素个数已达上限,这里会阻塞codeQueue.put(lineIter.next());if (readCount % 50000 == 0) {log.info("已读取{}行", readCount);}}} catch (Exception e) {log.error("文件读取异常", e);}log.info("读取完成,供{}行", readCount);}/*** 将行数据转换成数据库对象** @param codes* @return java.util.List<com.yc.kfpt.oversea.dao.entity.SourceCode>* @author 敖癸* @since 2024/3/6 - 16:43*/private static List<SourceCode> convertToEntity(List<String> codes) {return codes.stream().map(SourceCode::new).collect(Collectors.toList());}
}

实测,1亿数据量,大概花了20分钟导入完成。
这里需要注意的知识点:
LinkedBlockingQueue 的 put 方法,如果队列已满,会阻塞等待,直到队列中腾出空位。
LinkedBlockingQueue 的 poll 方法,可以设置超时时间,在等待超时后如果在队列中还是没有拿到数据,就返回null。
注意 take, add, offer, remove,poll,put的使用区别
注意 take, add, offer, remove,poll,put的使用区别

关于 LinkedBlockingQueue 的详解,可以参考一下这位博主的文章
深入理解Java系列 | LinkedBlockingQueue用法详解

这篇关于大型文件数据读取并持久化到数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/781094

相关文章

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Oracle数据库定时备份脚本方式(Linux)

《Oracle数据库定时备份脚本方式(Linux)》文章介绍Oracle数据库自动备份方案,包含主机备份传输与备机解压导入流程,强调需提前全量删除原库数据避免报错,并需配置无密传输、定时任务及验证脚本... 目录说明主机脚本备机上自动导库脚本整个自动备份oracle数据库的过程(建议全程用root用户)总结

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

Python使用openpyxl读取Excel的操作详解

《Python使用openpyxl读取Excel的操作详解》本文介绍了使用Python的openpyxl库进行Excel文件的创建、读写、数据操作、工作簿与工作表管理,包括创建工作簿、加载工作簿、操作... 目录1 概述1.1 图示1.2 安装第三方库2 工作簿 workbook2.1 创建:Workboo

虚拟机Centos7安装MySQL数据库实践

《虚拟机Centos7安装MySQL数据库实践》用户分享在虚拟机安装MySQL的全过程及常见问题解决方案,包括处理GPG密钥、修改密码策略、配置远程访问权限及防火墙设置,最终通过关闭防火墙和停止Net... 目录安装mysql数据库下载wget命令下载MySQL安装包安装MySQL安装MySQL服务安装完成