数据批量导入时,加入队列,分批处理,只是个笔记

2024-05-01 09:58

本文主要是介绍数据批量导入时,加入队列,分批处理,只是个笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1,用到技术点:队列,线程,单例模式,分批处理

2,添加笔记代码:

入口:

if(null != set && set.size() > 0){//异步,加入队列logger.info(String.format("加入队列,总共  %s 条数据", set.size()));TrackBusinessRunner trackBusinessRunner	= TrackBusinessRunner.getInstance();//获取单例TrackDataDTO trackDataDTO = new TrackDataDTO();//放入处理好的数据trackDataDTO.setParam(param);trackDataDTO.setSets(set);trackBusinessRunner.putQueueOnload(trackDataDTO);//放入队列if(trackBusinessRunner.getThreadTrackBusinessService()==null){trackBusinessRunner.setThreadTrackBusinessService(threadTrackBusinessService);}if(!trackBusinessRunner.isAlive()){trackBusinessRunner.start();//判断是否启动状态,如果不是就启动}}

单例模式和队列的核心类


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;import com.sf.iec.common.util.BatchHandlerInterface;
import com.sf.iec.common.util.BatchHandlerList;
import com.sf.iec.customerbusiness.inquiryorder.dto.TrackDataDTO;
import com.sf.iec.customerbusiness.inquiryorder.service.ThreadTrackBusinessService;public class TrackBusinessRunner extends Thread {private final static Logger LOGGER = Logger.getLogger(TrackBusinessRunner.class);private ThreadTrackBusinessService threadTrackBusinessService;public ThreadTrackBusinessService getThreadTrackBusinessService() {return threadTrackBusinessService;}private LinkedBlockingQueue<TrackDataDTO> blockingQueue = new LinkedBlockingQueue<TrackDataDTO>(300);//队列长度300,非常推荐该队列(put和take好好用)private volatile boolean running = true;//开启一个线程private TrackBusinessRunner(){}private static TrackBusinessRunner trackBusinessRunner;private static Object obj = new Object();//单例模式public static TrackBusinessRunner getInstance(){if(trackBusinessRunner==null){synchronized (obj) {if(trackBusinessRunner==null){trackBusinessRunner = new TrackBusinessRunner();}}}return trackBusinessRunner;}public void putQueueOnload(TrackDataDTO trackDataDTO){int i= 0;try {blockingQueue.put(trackDataDTO);//加入队列i = 0;} catch (InterruptedException e) {LOGGER.error("加入队列信息异常");e.printStackTrace();if(i < 2){putQueueOnload(trackDataDTO);i++;}}}@Overridepublic void run() {while (running) {try {TrackDataDTO trackDataDTO = blockingQueue.take();Set<Map<String,Object>> set = trackDataDTO.getSets();final Map<String, String> param = trackDataDTO.getParam();//处理 插入  	List<Map<String,Object>> lst = new ArrayList<Map<String,Object>>();CollectionUtils.addAll(lst, set.iterator());
//分批处理,每次取200条BatchHandlerList<Map<String, Object>> handler = new BatchHandlerList<Map<String,Object>>(200,lst) {@Overridepublic void handler(List<Map<String, Object>> subList) {// TODO Auto-generated method stub						threadTrackBusinessService.saveMainTainTrajectory(subList, param);//休眠	try {Thread.sleep(12000);} catch (InterruptedException e) {LOGGER.error("batch track handler thread interrupt excption",e);}//12秒}};handler.handlerList();} catch (Exception e) {LOGGER.error("获取队列信息异常",e);e.printStackTrace();}//取数据,没有的话会等待}}		public void setThreadTrackBusinessService(ThreadTrackBusinessService threadTrackBusinessService) {this.threadTrackBusinessService = threadTrackBusinessService;}}

分批接口

import java.util.List;public interface BatchHandlerInterface<T> {public void handler(List<T> subList);
}

分批处理工具类


import java.util.List;import org.apache.log4j.Logger;/*** @author  * @description 分批调用方法接口* */
public abstract class BatchHandlerList<T> implements BatchHandlerInterface<T> {private static final Logger LOGGER = Logger.getLogger(BatchHandlerList.class);//每次处理条数private Integer perNum;private List<T> aylist;public BatchHandlerList(Integer perNum, List<T> aylist) {super();this.perNum = perNum;this.aylist = aylist;}/*** 分批调用方法* */public void handlerList(){try{if(aylist!=null && aylist.size() > 0){int size = aylist.size();int startIndex = 0;int endIndex = 1;int num = 1;if (size > perNum) {num = size / perNum;}for (int i = 1; i <= num; i++) {endIndex = (i) * perNum > size ? size : (i) * perNum;List<T> subList = aylist.subList(startIndex, endIndex);startIndex = perNum * i;if (subList!=null && subList.size() > 0) {handler(subList);}if (num == i && perNum * num < size) {//最后一批处理subList = aylist.subList(perNum * num, size);if (subList.size() > 0) {handler(subList);}}}}}catch(Throwable e){LOGGER.error("batchHandlerList handler exception",e);//错误回调方法可以重写errorHandler();}}public void errorHandler(){};
}

 

这篇关于数据批量导入时,加入队列,分批处理,只是个笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/951385

相关文章

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

利用Python脚本实现批量将图片转换为WebP格式

《利用Python脚本实现批量将图片转换为WebP格式》Python语言的简洁语法和库支持使其成为图像处理的理想选择,本文将介绍如何利用Python实现批量将图片转换为WebP格式的脚本,WebP作为... 目录简介1. python在图像处理中的应用2. WebP格式的原理和优势2.1 WebP格式与传统

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模