本文主要是介绍JAVA实现亿级千万级数据顺序导出的示例代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《JAVA实现亿级千万级数据顺序导出的示例代码》本文主要介绍了JAVA实现亿级千万级数据顺序导出的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面...
前提:主要考虑控制内存占用空间,避免出现同时导出,导致主程序OOM问题。
实现思路:
A.启用线程池,分页读取数据
B.使用 PriorityblockingQueue 队列存储查询出来的数据,方便写入线程去
优先级队列特性 PriorityBlockingQueue是一个优先级队列,这意味着它里面的元素是按照某种优先级顺序进行排列的。元素的优先级是通过元素自身的自然顺序(如果元素实现了Comparable接口)或者通过一个自定义的比较器(Comparator)来确定的。 当从队列中获取元素时,具有最高优先级的元素会被首先返回。例如,在一个存储任务的PriorityBlockingQueue中,紧急任务可以被定义为具有较高优先级,这样它们就能在普通任务之前被执行。 阻塞队列特性 作为一个阻塞队列,PriorityBlockingQueue提供了阻塞操作。当队列为空时,试图从队列中获取元素的线程会被阻塞,直到队列中有可用的元素。 同样,当队列已满(不过PriorityBlockingQueue在理论上是无界的,这个情况比较特殊,后面会详细说)时,试图向队列中添加元素的线程会被阻塞,直到队列中有足够的空间。这种阻塞特性使得它在多线程环境下能够有效地协调生产者 - 消费者模式
C.开启单独的一个线程,做读取写入,利用join()方法 等待所有写入结束,直接返回。
CompletableFuture.runAsync(() -> System.out.println("执行一个异步任务"));
D.利用 CountDownLatch 做读取数据任务阻塞。
E.字典转换是用的反射。
以下是代码实现
1.配置线程池
/** * 导出线程池配置 */ @Configuration @Slf4j public class ThreadPoolExportExecutorConfig { @Bean("ExportServiceExecutor") @Primary public Executor exportServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(12); //配置最大线程数 executor.setMaxPoolSize(20); //空闲时间 executor.setKeepAliveSeconds(60); //配置队列大小 executor.setQueueCapacity(100); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("ExportThread-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } }
2.转换工具类
public class PojoInfoUtil { /** * ListDTO转换 * * @param <E> entity类 * @param <D> DTO类 * @param listInfoE listInfoE<E>类对象 * @return List<D> 转换后List<D> */ public static <E, D> List<D> listInfoToDTO(List<E> listInfo, Class<D> dtoClass) { if (CollectionUtils.isEmpty(listInfo)) { return Lists.newArrayList(); } // 创建 ModelMapper 实例 ModelMapper modelMapper = new ModelMapper(); /python/ 设置匹配策略为基于字段名称 modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT); // 使用流和 ModelMapper 进行转换 List<D> list = listInfo.stream() .map(entity -> modelMapper.map(entity, dtoClass)) China编程 .collect(Collectors.toList()); // 释放旧数据 listInfo.clear(); return list; }
3.导出工具类
@Component public class MultiThreadExportUtil { @Resource(name = "ExportServiceExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired private DicAdapter dicAdapter; /** * * @param exportClazz 导出实体类 * @param dictFieldNames 字典转换字段 * @param fileName 导出文件名称 * @param SheetNamePre sheet页名前缀 * @param service 查询接口类 * @param queryWrapper 查询条件 */ public void exprtBythread(Class<?> exportClazz,List<String> dictFieldNames,String fileName,String SheetNamePre,IService service, LambdaQueryWrapper<?> queryWrapper){ HttpServletResponse response = HttpServletUtil.getResponse(); Map<String, String> threadExportLimit = dicAdapter.getBasicInfoDicMapInfo("threadExportLimit"); if(ObjectUtils.isEmpty(threadExportLimit) || !threadExportLimit.containsKey("limit") || !threadExportLimit.containsKey("maxLimit")){ throw new BizException("请提前配置导出数量限制"); } //每次请求限制条数 int limit = Integer.parseInt(threadExportLimit.get("limit")); //最大限制条数 int maxLimit = Integer.parseInt(threadExportLimit.get("maxLimit")); int count = service.count(queryWrapper); if(count > maxLimit) throw new BizException("导出条数超出最大条数" + maxLimit + "限制,请调整查询条件"); //设置响应头 response.setContentType("application/vnd.ms-excel"); response.setCharacterEncoding("utf-8"); try { String name = URLEncoder.encode(fileName, "UTF-8"); response.setHeader("Content-disposition", "attachment;filename=" + name + ".xlsx"); } catch (Exception e) { throw new BizException(e.getMessage()); } // 查询次数 int i = (count + limit - 1) / limit; AtomicReference<ExcelWriter> excelWriterRef = new AtomicReference<>(); ServletOutputStream outputStream = null; try { outputStream = response.getOutputStream(); excelWriterRef.set(EasyExcel.write(outputStream, exportClazz).build()); } catch (IOException e) { throw new RuntimeException(e); } if(i == 0){ WriteSheet writeSheet = EasyExcel.writerSheet(SheetNamePre).head(exportClazz) .registerWriteHandler(new LongestMatchColumnWidthStyleStrategy()).build(); excelWriterRef.get().write(null, writeSheet); excelWriterRef.get().finish(); returnhEppHNVRP; } //计数器 CountDownLatch countDownLatch = new CountDownLatch(i); // 使用 PriorityBlockingQueue 作为查询结果的缓冲区 BlockingQueue<ThreadQueryResult> resultQueue = new PriorityBlockingQueue<ThreadQueryResult>(); for(int j = 0; j < i; j++){ int pageNo = j + 1; String sheetName = SheetNamePre + pageNo; threadPoolTaskExecutor.execute(()->{ try { IPage<T> page = service.page(new Page<>(pageNo, limit), queryWrapper); List<T> records = page.getRecords(); //转换 List<?> exportList = PojoInfoUtil.listInfoToDTO(records, exportClazz); //处理字典数据 convertFieldwithDictionary(exportList,dictFieldNames); resultQueue.put(new ThreadQueryResult(pageNo, sheetName, exportList)); }catch (Exception e){ e.printStackTrace(); LogUtil.info("查询第" + pageNo + "页时,发生异常" + e.getMessage()); throw new BizException("查询第" + pageNo + "页时,发生异常" + e.getMessage()); } }); countDownLatch.countDown(); } // 启动一个写入线程 按照查询顺序读取 写入 CompletableFuture<Void> writeFuture = CompletableFuture.runAsync(() -> { for (int pageNo = 1; pageNo <= i; ) { boolean interrupted = false; try { while (true) { ThreadQueryResult result = resultQueue.take(); if (result.getPageNo() == pageNo) { WriteSheet writeSheet = EasyExcel.writerSheet(result.getSheetName()).head(exportClazz) .registerWriteHandler(new LongestMatchColumnWidthStyleStrategy()).build(); ExcelWriter excelWriter = excelWriterRef.get(); excelWriter.write(result.getRecords(), writeSheet); result.getRecords().clear(); //及时释放写入数据 pageNo++; break; } else { // 如果不是当前需要的 pageNo,则放回队列 resultQueue.put(result); } } } catch (InterruptedException e) { interrupted = true; LogUtil.error("写入线程被中断: " + e.getMessage()); } finally { if (interrupted) { Thread.currentThread().interrupt(); // 重新设置中断标志 } } } }, threadPoolTaskExecutor); // 等待所有任务完成 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } //等待写入线程完成 writeFuture.join(); // 关闭 ExcelWriter ExcelWriter excelWriter = excelWriterRef.get(); if (excelWriter != null) { excelWriter.finish(); } } /** * 对给定的对象列表中的指定字段进行字典转换。 * * @param records 要转换的对象列表 * @param fieldDictNames 字段名列表(带前缀) * @param <T> 对象类型 */ public <T> void convertFieldWithDictionary(List<T> records, List<String> fieldDictNames) { if (CollectionUtils.isEmpty(records) || CollectionUtils.isEmpty(fieldDictNames)) { return; } Map<String, Map<String, String>> dictMap = dicAdapter.handleStaticBasicInfoDicMap(); try { for (String fieldDictName : fieldDictNames) { //前缀区分各实体类字段 if(StringUtils.isBlank(fieldDictName) || !fieldDictName.contains("-")){ return; } String fieldName = fieldDictName.split("-")[1]; hEppHNVRP for (T record : records) { Field field = record.getClass().getDeclaredField(fieldName); field.setAccessible(true); Object fieldValue = field.get(record); if (fieldValue != null && dictMap.containsKey(fieldDictName)) { Map<String, String> dictValueMap = dictMap.get(fieldDictName); if (dictValueMap != null && dictValueMap.containsKey(fieldValue)) { String dictValue = dictValueMappython.get(fieldValue); if (dictValue != null) { field.set(record, dictValue); } } } } } } catch(Exception e){ LogUtil.error("导出转换字典失败,请联系管理员" + e); throw new BizException("导出转换字典失败,请联系管理员"+ e); } } }
到此这篇关于Java实现亿级千万级数据顺序导出的示例代码的文章就介绍到这了,更多相关JAVA 亿级千万级顺序导出内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于JAVA实现亿级千万级数据顺序导出的示例代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!