JAVA实现亿级千万级数据顺序导出的示例代码

2025-09-21 12:50

本文主要是介绍JAVA实现亿级千万级数据顺序导出的示例代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《JAVA实现亿级千万级数据顺序导出的示例代码》本文主要介绍了JAVA实现亿级千万级数据顺序导出的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面...

前提:主要考虑控制内存占用空间,避免出现同时导出,导致主程序OOM问题。

实现思路:

A.启用线程池,分页读取数据

B.使用 PriorityblockingQueue 队列存储查询出来的数据,方便写入线程去

优先级队列特性 PriorityBlockingQueue是一个优先级队列,这意味着它里面的元素是按照某种优先级顺序进行排列的。元素的优先级是通过元素自身的自然顺序(如果元素实现了Comparable接口)或者通过一个自定义的比较器(Comparator)来确定的。 当从队列中获取元素时,具有最高优先级的元素会被首先返回。例如,在一个存储任务的PriorityBlockingQueue中,紧急任务可以被定义为具有较高优先级,这样它们就能在普通任务之前被执行。 阻塞队列特性 作为一个阻塞队列,PriorityBlockingQueue提供了阻塞操作。当队列为空时,试图从队列中获取元素的线程会被阻塞,直到队列中有可用的元素。 同样,当队列已满(不过PriorityBlockingQueue在理论上是无界的,这个情况比较特殊,后面会详细说)时,试图向队列中添加元素的线程会被阻塞,直到队列中有足够的空间。这种阻塞特性使得它在多线程环境下能够有效地协调生产者 - 消费者模式

C.开启单独的一个线程,做读取写入,利用join()方法 等待所有写入结束,直接返回。

CompletableFuture.runAsync(() -> System.out.println("执行一个异步任务"));

D.利用 CountDownLatch 做读取数据任务阻塞。

E.字典转换是用的反射。

以下是代码实现

1.配置线程池

/**
 * 导出线程池配置
 */
@Configuration
@Slf4j
public class ThreadPoolExportExecutorConfig {
 
    @Bean("ExportServiceExecutor")
    @Primary
    public Executor exportServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(12);
        //配置最大线程数
        executor.setMaxPoolSize(20);
        //空闲时间
        executor.setKeepAliveSeconds(60);
        //配置队列大小
        executor.setQueueCapacity(100);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("ExportThread-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

2.转换工具类

public class PojoInfoUtil {
 
    /**
     * ListDTO转换
     *
     * @param <E>       entity类
     * @param <D>       DTO类
     * @param listInfoE listInfoE<E>类对象
     * @return List<D>  转换后List<D>
     */
    public static <E, D> List<D> listInfoToDTO(List<E> listInfo, Class<D> dtoClass) {
 
        if (CollectionUtils.isEmpty(listInfo)) {
            return Lists.newArrayList();
        }
 
        // 创建 ModelMapper 实例
        ModelMapper modelMapper = new ModelMapper();
 
        /python/ 设置匹配策略为基于字段名称
        modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
 
        // 使用流和 ModelMapper 进行转换
        List<D> list = listInfo.stream()
                .map(entity -> modelMapper.map(entity, dtoClass))
   China编程             .collect(Collectors.toList());
 
        // 释放旧数据
        listInfo.clear();
 
        return list;
    }

3.导出工具类

@Component
public class MultiThreadExportUtil {
 
    @Resource(name = "ExportServiceExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
    @Autowired
    private DicAdapter dicAdapter;
 
    /**
     *
     * @param exportClazz 导出实体类
     * @param dictFieldNames 字典转换字段
     * @param fileName 导出文件名称
     * @param SheetNamePre sheet页名前缀
     * @param service 查询接口类
     * @param queryWrapper 查询条件
     */
    public void exprtBythread(Class<?> exportClazz,List<String> dictFieldNames,String fileName,String SheetNamePre,IService service, LambdaQueryWrapper<?> queryWrapper){
        HttpServletResponse response = HttpServletUtil.getResponse();
        Map<String, String> threadExportLimit = dicAdapter.getBasicInfoDicMapInfo("threadExportLimit");
 
        if(ObjectUtils.isEmpty(threadExportLimit) || !threadExportLimit.containsKey("limit") || !threadExportLimit.containsKey("maxLimit")){
            throw new BizException("请提前配置导出数量限制");
        }
 
        //每次请求限制条数
        int limit = Integer.parseInt(threadExportLimit.get("limit"));
        //最大限制条数
        int maxLimit = Integer.parseInt(threadExportLimit.get("maxLimit"));
 
        int count = service.count(queryWrapper);
 
        if(count > maxLimit) throw new BizException("导出条数超出最大条数" + maxLimit + "限制,请调整查询条件");
 
 
        //设置响应头
        response.setContentType("application/vnd.ms-excel");
        response.setCharacterEncoding("utf-8");
        try {
            String name = URLEncoder.encode(fileName, "UTF-8");
            response.setHeader("Content-disposition", "attachment;filename=" + name + ".xlsx");
        } catch (Exception e) {
            throw new BizException(e.getMessage());
        }
 
        // 查询次数
        int i = (count + limit - 1) / limit;
 
        AtomicReference<ExcelWriter> excelWriterRef = new AtomicReference<>();
        ServletOutputStream outputStream = null;
 
        try {
            outputStream = response.getOutputStream();
            excelWriterRef.set(EasyExcel.write(outputStream, exportClazz).build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
 
        if(i == 0){
            WriteSheet writeSheet = EasyExcel.writerSheet(SheetNamePre).head(exportClazz)
                    .registerWriteHandler(new LongestMatchColumnWidthStyleStrategy()).build();
            excelWriterRef.get().write(null, writeSheet);
            excelWriterRef.get().finish();
            returnhEppHNVRP;
        }
 
 
        //计数器
        CountDownLatch countDownLatch = new CountDownLatch(i);
 
 
        // 使用 PriorityBlockingQueue 作为查询结果的缓冲区
        BlockingQueue<ThreadQueryResult> resultQueue = new PriorityBlockingQueue<ThreadQueryResult>();
 
 
        for(int j = 0; j < i; j++){
 
            int pageNo =  j + 1;
            String sheetName = SheetNamePre + pageNo;
            threadPoolTaskExecutor.execute(()->{
                try {
 
                    IPage<T> page = service.page(new Page<>(pageNo, limit), queryWrapper);
 
                    List<T> records = page.getRecords();
 
                    //转换
                    List<?> exportList = PojoInfoUtil.listInfoToDTO(records, exportClazz);
                    //处理字典数据
                    convertFieldwithDictionary(exportList,dictFieldNames);
 
                    resultQueue.put(new ThreadQueryResult(pageNo, sheetName, exportList));
 
                }catch (Exception e){
                    e.printStackTrace();
                    LogUtil.info("查询第" + pageNo + "页时,发生异常" + e.getMessage());
                    throw new BizException("查询第" + pageNo + "页时,发生异常" + e.getMessage());
                }
            });
            countDownLatch.countDown();
        }
 
        // 启动一个写入线程  按照查询顺序读取 写入
        CompletableFuture<Void> writeFuture  = CompletableFuture.runAsync(() -> {
            for (int pageNo = 1; pageNo <= i; ) {
                boolean interrupted = false;
                try {
                    while (true) {
                        ThreadQueryResult result = resultQueue.take();
                        if (result.getPageNo() == pageNo) {
 
                            WriteSheet writeSheet = EasyExcel.writerSheet(result.getSheetName()).head(exportClazz)
                                    .registerWriteHandler(new LongestMatchColumnWidthStyleStrategy()).build();
 
                            ExcelWriter excelWriter = excelWriterRef.get();
                            excelWriter.write(result.getRecords(), writeSheet);
                            result.getRecords().clear(); //及时释放写入数据
 
                            pageNo++;
                            break;
                        } else {
                            // 如果不是当前需要的 pageNo,则放回队列
                            resultQueue.put(result);
                        }
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LogUtil.error("写入线程被中断: " + e.getMessage());
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt(); // 重新设置中断标志
                    }
                }
            }
        }, threadPoolTaskExecutor);
 
        // 等待所有任务完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //等待写入线程完成
        writeFuture.join();
 
        // 关闭 ExcelWriter
        ExcelWriter excelWriter = excelWriterRef.get();
        if (excelWriter != null) {
            excelWriter.finish();
        }
    }
 
 
 
 
    /**
     * 对给定的对象列表中的指定字段进行字典转换。
     *
     * @param records   要转换的对象列表
     * @param fieldDictNames 字段名列表(带前缀)
     * @param <T>       对象类型
     */
    public <T> void convertFieldWithDictionary(List<T> records, List<String> fieldDictNames) {
        if (CollectionUtils.isEmpty(records) || CollectionUtils.isEmpty(fieldDictNames)) {
            return;
        }
 
        Map<String, Map<String, String>> dictMap = dicAdapter.handleStaticBasicInfoDicMap();
        try {
            for (String fieldDictName : fieldDictNames) {
                //前缀区分各实体类字段
                if(StringUtils.isBlank(fieldDictName) || !fieldDictName.contains("-")){
                    return;
                }
                String fieldName = fieldDictName.split("-")[1];
           hEppHNVRP     for (T record : records) {
 
                    Field field = record.getClass().getDeclaredField(fieldName);
                    field.setAccessible(true);
 
                    Object fieldValue = field.get(record);
                    if (fieldValue != null && dictMap.containsKey(fieldDictName)) {
 
                        Map<String, String> dictValueMap = dictMap.get(fieldDictName);
 
                        if (dictValueMap != null && dictValueMap.containsKey(fieldValue)) {
                            String dictValue = dictValueMappython.get(fieldValue);
                            if (dictValue != null) {
                                field.set(record, dictValue);
                            }
                        }
                    }
                }
            }
        } catch(Exception e){
            LogUtil.error("导出转换字典失败,请联系管理员" + e);
            throw new BizException("导出转换字典失败,请联系管理员"+ e);
        }
    }
}

到此这篇关于Java实现亿级千万级数据顺序导出的示例代码的文章就介绍到这了,更多相关JAVA 亿级千万级顺序导出内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于JAVA实现亿级千万级数据顺序导出的示例代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155955

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三