数据批量导入时,加入队列,分批处理,只是个笔记

2024-05-01 09:58

本文主要是介绍数据批量导入时,加入队列,分批处理,只是个笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1,用到技术点:队列,线程,单例模式,分批处理

2,添加笔记代码:

入口:

if(null != set && set.size() > 0){//异步,加入队列logger.info(String.format("加入队列,总共  %s 条数据", set.size()));TrackBusinessRunner trackBusinessRunner	= TrackBusinessRunner.getInstance();//获取单例TrackDataDTO trackDataDTO = new TrackDataDTO();//放入处理好的数据trackDataDTO.setParam(param);trackDataDTO.setSets(set);trackBusinessRunner.putQueueOnload(trackDataDTO);//放入队列if(trackBusinessRunner.getThreadTrackBusinessService()==null){trackBusinessRunner.setThreadTrackBusinessService(threadTrackBusinessService);}if(!trackBusinessRunner.isAlive()){trackBusinessRunner.start();//判断是否启动状态,如果不是就启动}}

单例模式和队列的核心类


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;import com.sf.iec.common.util.BatchHandlerInterface;
import com.sf.iec.common.util.BatchHandlerList;
import com.sf.iec.customerbusiness.inquiryorder.dto.TrackDataDTO;
import com.sf.iec.customerbusiness.inquiryorder.service.ThreadTrackBusinessService;public class TrackBusinessRunner extends Thread {private final static Logger LOGGER = Logger.getLogger(TrackBusinessRunner.class);private ThreadTrackBusinessService threadTrackBusinessService;public ThreadTrackBusinessService getThreadTrackBusinessService() {return threadTrackBusinessService;}private LinkedBlockingQueue<TrackDataDTO> blockingQueue = new LinkedBlockingQueue<TrackDataDTO>(300);//队列长度300,非常推荐该队列(put和take好好用)private volatile boolean running = true;//开启一个线程private TrackBusinessRunner(){}private static TrackBusinessRunner trackBusinessRunner;private static Object obj = new Object();//单例模式public static TrackBusinessRunner getInstance(){if(trackBusinessRunner==null){synchronized (obj) {if(trackBusinessRunner==null){trackBusinessRunner = new TrackBusinessRunner();}}}return trackBusinessRunner;}public void putQueueOnload(TrackDataDTO trackDataDTO){int i= 0;try {blockingQueue.put(trackDataDTO);//加入队列i = 0;} catch (InterruptedException e) {LOGGER.error("加入队列信息异常");e.printStackTrace();if(i < 2){putQueueOnload(trackDataDTO);i++;}}}@Overridepublic void run() {while (running) {try {TrackDataDTO trackDataDTO = blockingQueue.take();Set<Map<String,Object>> set = trackDataDTO.getSets();final Map<String, String> param = trackDataDTO.getParam();//处理 插入  	List<Map<String,Object>> lst = new ArrayList<Map<String,Object>>();CollectionUtils.addAll(lst, set.iterator());
//分批处理,每次取200条BatchHandlerList<Map<String, Object>> handler = new BatchHandlerList<Map<String,Object>>(200,lst) {@Overridepublic void handler(List<Map<String, Object>> subList) {// TODO Auto-generated method stub						threadTrackBusinessService.saveMainTainTrajectory(subList, param);//休眠	try {Thread.sleep(12000);} catch (InterruptedException e) {LOGGER.error("batch track handler thread interrupt excption",e);}//12秒}};handler.handlerList();} catch (Exception e) {LOGGER.error("获取队列信息异常",e);e.printStackTrace();}//取数据,没有的话会等待}}		public void setThreadTrackBusinessService(ThreadTrackBusinessService threadTrackBusinessService) {this.threadTrackBusinessService = threadTrackBusinessService;}}

分批接口

import java.util.List;public interface BatchHandlerInterface<T> {public void handler(List<T> subList);
}

分批处理工具类


import java.util.List;import org.apache.log4j.Logger;/*** @author  * @description 分批调用方法接口* */
public abstract class BatchHandlerList<T> implements BatchHandlerInterface<T> {private static final Logger LOGGER = Logger.getLogger(BatchHandlerList.class);//每次处理条数private Integer perNum;private List<T> aylist;public BatchHandlerList(Integer perNum, List<T> aylist) {super();this.perNum = perNum;this.aylist = aylist;}/*** 分批调用方法* */public void handlerList(){try{if(aylist!=null && aylist.size() > 0){int size = aylist.size();int startIndex = 0;int endIndex = 1;int num = 1;if (size > perNum) {num = size / perNum;}for (int i = 1; i <= num; i++) {endIndex = (i) * perNum > size ? size : (i) * perNum;List<T> subList = aylist.subList(startIndex, endIndex);startIndex = perNum * i;if (subList!=null && subList.size() > 0) {handler(subList);}if (num == i && perNum * num < size) {//最后一批处理subList = aylist.subList(perNum * num, size);if (subList.size() > 0) {handler(subList);}}}}}catch(Throwable e){LOGGER.error("batchHandlerList handler exception",e);//错误回调方法可以重写errorHandler();}}public void errorHandler(){};
}

 

这篇关于数据批量导入时,加入队列,分批处理,只是个笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/951385

相关文章

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

解决docker目录内存不足扩容处理方案

《解决docker目录内存不足扩容处理方案》文章介绍了Docker存储目录迁移方法:因系统盘空间不足,需将Docker数据迁移到更大磁盘(如/home/docker),通过修改daemon.json配... 目录1、查看服务器所有磁盘的使用情况2、查看docker镜像和容器存储目录的空间大小3、停止dock

5 种使用Python自动化处理PDF的实用方法介绍

《5种使用Python自动化处理PDF的实用方法介绍》自动化处理PDF文件已成为减少重复工作、提升工作效率的重要手段,本文将介绍五种实用方法,从内置工具到专业库,帮助你在Python中实现PDF任务... 目录使用内置库(os、subprocess)调用外部工具使用 PyPDF2 进行基本 PDF 操作使用

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

Ubuntu向多台主机批量传输文件的流程步骤

《Ubuntu向多台主机批量传输文件的流程步骤》:本文主要介绍在Ubuntu中批量传输文件到多台主机的方法,需确保主机互通、用户名密码统一及端口开放,通过安装sshpass工具,准备包含目标主机信... 目录Ubuntu 向多台主机批量传输文件1.安装 sshpass2.准备主机列表文件3.创建一个批处理脚

Python异常处理之避免try-except滥用的3个核心原则

《Python异常处理之避免try-except滥用的3个核心原则》在Python开发中,异常处理是保证程序健壮性的关键机制,本文结合真实案例与Python核心机制,提炼出避免异常滥用的三大原则,有需... 目录一、精准打击:只捕获可预见的异常类型1.1 通用异常捕获的陷阱1.2 精准捕获的实践方案1.3

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

C++中处理文本数据char与string的终极对比指南

《C++中处理文本数据char与string的终极对比指南》在C++编程中char和string是两种用于处理字符数据的类型,但它们在使用方式和功能上有显著的不同,:本文主要介绍C++中处理文本数... 目录1. 基本定义与本质2. 内存管理3. 操作与功能4. 性能特点5. 使用场景6. 相互转换核心区别

Python动态处理文件编码的完整指南

《Python动态处理文件编码的完整指南》在Python文件处理的高级应用中,我们经常会遇到需要动态处理文件编码的场景,本文将深入探讨Python中动态处理文件编码的技术,有需要的小伙伴可以了解下... 目录引言一、理解python的文件编码体系1.1 Python的IO层次结构1.2 编码问题的常见场景二