数据批量导入时,加入队列,分批处理,只是个笔记

2024-05-01 09:58

本文主要是介绍数据批量导入时,加入队列,分批处理,只是个笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1,用到技术点:队列,线程,单例模式,分批处理

2,添加笔记代码:

入口:

if(null != set && set.size() > 0){//异步,加入队列logger.info(String.format("加入队列,总共  %s 条数据", set.size()));TrackBusinessRunner trackBusinessRunner	= TrackBusinessRunner.getInstance();//获取单例TrackDataDTO trackDataDTO = new TrackDataDTO();//放入处理好的数据trackDataDTO.setParam(param);trackDataDTO.setSets(set);trackBusinessRunner.putQueueOnload(trackDataDTO);//放入队列if(trackBusinessRunner.getThreadTrackBusinessService()==null){trackBusinessRunner.setThreadTrackBusinessService(threadTrackBusinessService);}if(!trackBusinessRunner.isAlive()){trackBusinessRunner.start();//判断是否启动状态,如果不是就启动}}

单例模式和队列的核心类


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;import com.sf.iec.common.util.BatchHandlerInterface;
import com.sf.iec.common.util.BatchHandlerList;
import com.sf.iec.customerbusiness.inquiryorder.dto.TrackDataDTO;
import com.sf.iec.customerbusiness.inquiryorder.service.ThreadTrackBusinessService;public class TrackBusinessRunner extends Thread {private final static Logger LOGGER = Logger.getLogger(TrackBusinessRunner.class);private ThreadTrackBusinessService threadTrackBusinessService;public ThreadTrackBusinessService getThreadTrackBusinessService() {return threadTrackBusinessService;}private LinkedBlockingQueue<TrackDataDTO> blockingQueue = new LinkedBlockingQueue<TrackDataDTO>(300);//队列长度300,非常推荐该队列(put和take好好用)private volatile boolean running = true;//开启一个线程private TrackBusinessRunner(){}private static TrackBusinessRunner trackBusinessRunner;private static Object obj = new Object();//单例模式public static TrackBusinessRunner getInstance(){if(trackBusinessRunner==null){synchronized (obj) {if(trackBusinessRunner==null){trackBusinessRunner = new TrackBusinessRunner();}}}return trackBusinessRunner;}public void putQueueOnload(TrackDataDTO trackDataDTO){int i= 0;try {blockingQueue.put(trackDataDTO);//加入队列i = 0;} catch (InterruptedException e) {LOGGER.error("加入队列信息异常");e.printStackTrace();if(i < 2){putQueueOnload(trackDataDTO);i++;}}}@Overridepublic void run() {while (running) {try {TrackDataDTO trackDataDTO = blockingQueue.take();Set<Map<String,Object>> set = trackDataDTO.getSets();final Map<String, String> param = trackDataDTO.getParam();//处理 插入  	List<Map<String,Object>> lst = new ArrayList<Map<String,Object>>();CollectionUtils.addAll(lst, set.iterator());
//分批处理,每次取200条BatchHandlerList<Map<String, Object>> handler = new BatchHandlerList<Map<String,Object>>(200,lst) {@Overridepublic void handler(List<Map<String, Object>> subList) {// TODO Auto-generated method stub						threadTrackBusinessService.saveMainTainTrajectory(subList, param);//休眠	try {Thread.sleep(12000);} catch (InterruptedException e) {LOGGER.error("batch track handler thread interrupt excption",e);}//12秒}};handler.handlerList();} catch (Exception e) {LOGGER.error("获取队列信息异常",e);e.printStackTrace();}//取数据,没有的话会等待}}		public void setThreadTrackBusinessService(ThreadTrackBusinessService threadTrackBusinessService) {this.threadTrackBusinessService = threadTrackBusinessService;}}

分批接口

import java.util.List;public interface BatchHandlerInterface<T> {public void handler(List<T> subList);
}

分批处理工具类


import java.util.List;import org.apache.log4j.Logger;/*** @author  * @description 分批调用方法接口* */
public abstract class BatchHandlerList<T> implements BatchHandlerInterface<T> {private static final Logger LOGGER = Logger.getLogger(BatchHandlerList.class);//每次处理条数private Integer perNum;private List<T> aylist;public BatchHandlerList(Integer perNum, List<T> aylist) {super();this.perNum = perNum;this.aylist = aylist;}/*** 分批调用方法* */public void handlerList(){try{if(aylist!=null && aylist.size() > 0){int size = aylist.size();int startIndex = 0;int endIndex = 1;int num = 1;if (size > perNum) {num = size / perNum;}for (int i = 1; i <= num; i++) {endIndex = (i) * perNum > size ? size : (i) * perNum;List<T> subList = aylist.subList(startIndex, endIndex);startIndex = perNum * i;if (subList!=null && subList.size() > 0) {handler(subList);}if (num == i && perNum * num < size) {//最后一批处理subList = aylist.subList(perNum * num, size);if (subList.size() > 0) {handler(subList);}}}}}catch(Throwable e){LOGGER.error("batchHandlerList handler exception",e);//错误回调方法可以重写errorHandler();}}public void errorHandler(){};
}

 

这篇关于数据批量导入时,加入队列,分批处理,只是个笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/951385

相关文章

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

oracle 11g导入\导出(expdp impdp)之导入过程

《oracle11g导入导出(expdpimpdp)之导入过程》导出需使用SEC.DMP格式,无分号;建立expdir目录(E:/exp)并确保存在;导入在cmd下执行,需sys用户权限;若需修... 目录准备文件导入(impdp)1、建立directory2、导入语句 3、更改密码总结上一个环节,我们讲了

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法