This article describes a way to handle a large List in Spring Boot by splitting it into chunks and inserting the data into the database with multiple threads: the List is partitioned, each chunk is handed to its own thread, and a Spring thread pool combined with @Async methods is used to speed up the batch insert.
Project scenario
A large List collection whose elements need to be batch-inserted into the database.
Solution
Split the List into smaller chunks, then insert each chunk into the database on its own thread.
1. Entity class
package com.test.entity;

import lombok.Data;

@Data
public class TestEntity {

    private String id;

    private String name;
}
2. Mapper
If the data volume is small, the foreach tag is sufficient. For large data volumes, the BATCH executor mode is recommended.
package com.test.mapper;

import java.util.List;

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;

import com.test.entity.TestEntity;

public interface TestMapper {

    /**
     * 1. Single-row insert, meant to be used with ExecutorType.BATCH
     *    (batch executor mode). Recommended for very large data volumes.
     */
    @Insert("insert into test(id, name)"
            + " values"
            + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
    void testInsert(TestEntity testEntity);

    /**
     * 2. Batch insert using the foreach tag. Suitable for small data volumes.
     *    Note: dynamic SQL inside annotations must be wrapped in <script> tags.
     */
    @Insert("<script>"
            + " insert into test(id, name)"
            + " values"
            + " <foreach collection='list' item='item' index='index' separator=','>"
            + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
            + " </foreach>"
            + "</script>")
    void testBatchInsert(@Param("list") List<TestEntity> list);
}
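For a small data set, the foreach-based method can simply be called on a Spring-injected mapper, without any batch session handling. The sketch below is only an illustration and is not part of the original article: the service class name is made up, the id values are derived from the names purely for demonstration, and it assumes the mapper is registered via @Mapper or @MapperScan.

package com.test.service;

import java.util.List;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

// Hypothetical helper service: inserts a small list in one multi-row statement
// using the foreach-based testBatchInsert method.
@Service
public class SmallBatchInsertService {

    @Autowired
    private TestMapper testMapper;

    public void insertSmallList(List<String> names) {
        List<TestEntity> entities = names.stream().map(name -> {
            TestEntity entity = new TestEntity();
            // Assumption for illustration only: id reuses the name value.
            entity.setId(name);
            entity.setName(name);
            return entity;
        }).collect(Collectors.toList());
        // One INSERT statement with multiple value rows, generated by the foreach tag.
        testMapper.testBatchInsert(entities);
    }
}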
3. Register a thread pool bean in the Spring container
package com.test.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig {

    /**
     * Custom thread pool for async tasks
     */
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // core pool size
        executor.setCorePoolSize(50);
        // maximum pool size
        executor.setMaxPoolSize(500);
        // queue capacity
        executor.setQueueCapacity(300);
        // name prefix for threads created by this pool
        executor.setThreadNamePrefix("testExecutor-");
        // Rejection policy: what to do with new tasks once the pool has reached max size.
        // CALLER_RUNS: the task is not run on a pool thread but on the caller's own thread.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // when shutdown() is called, wait for queued tasks to finish before closing
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // maximum time to wait for tasks to finish on shutdown
        executor.setAwaitTerminationSeconds(60);
        return executor;
    }
}
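Besides driving @Async methods (as shown in the next step), the same bean can also be injected and used directly when a task does not fit the annotation model. This is only an illustrative sketch under that assumption; the component and method names are made up for this example.

package com.test.config;

import java.util.concurrent.Executor;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

// Hypothetical component showing direct use of the "asyncServiceExecutor" bean.
@Component
public class DirectExecutorUser {

    private final Executor asyncServiceExecutor;

    public DirectExecutorUser(@Qualifier("asyncServiceExecutor") Executor asyncServiceExecutor) {
        this.asyncServiceExecutor = asyncServiceExecutor;
    }

    public void runInPool(Runnable task) {
        // Submits the task to the pool configured above; once the pool and queue are
        // saturated, CallerRunsPolicy makes the calling thread execute it instead.
        asyncServiceExecutor.execute(task);
    }
}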
4. Create the async service class
package com.test.service;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

@Service
public class AsyncService {

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    @Async("asyncServiceExecutor")
    public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
        // Open a session in batch executor mode. Because this runs on multiple threads,
        // each thread opens its own session and transaction.
        SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
        try {
            TestMapper mapper = session.getMapper(TestMapper.class);
            // work done by the async thread
            for (int i = 0; i < logOutputResults.size(); i++) {
                System.out.println(Thread.currentThread().getName() + " thread: " + logOutputResults.get(i));
                TestEntity test = new TestEntity();
                //test.set()
                //.............
                // queue this insert in the current batch
                mapper.testInsert(test);
                // flush the batch every 1000 records to keep memory usage in check
                if (i % 1000 == 0) {
                    session.flushStatements();
                }
            }
            // flush whatever is left in the batch
            session.flushStatements();
            // the session is opened with autoCommit=false, so commit explicitly
            session.commit();
        } finally {
            // Critical: countDown() must run even if an exception was thrown,
            // otherwise await() in the caller never returns.
            countDownLatch.countDown();
            if (session != null) {
                session.close();
            }
        }
    }
}
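As an aside, a variation on this pattern (not part of the original article) has the @Async method return a CompletableFuture instead of relying on a CountDownLatch, so the caller can both wait for all chunks and see per-chunk failures. The sketch below is only illustrative; the class and method names are assumptions, and the actual insert logic is elided.

package com.test.service;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

// Hypothetical variant: Spring completes the returned future when the method finishes.
@Service
public class AsyncResultService {

    @Async("asyncServiceExecutor")
    public CompletableFuture<Integer> executeAsyncWithResult(List<String> chunk) {
        // ... perform the batch insert for this chunk, as in AsyncService ...
        // Returning a completed future lets the caller join() on all chunks and
        // have exceptions surface as CompletionExceptions.
        return CompletableFuture.completedFuture(chunk.size());
    }
}

With this variant the caller collects the futures and calls CompletableFuture.allOf(...).join() instead of awaiting a latch.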
5. Split the list and call the async method
package com.test.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;

@Service
public class TestService {

    @Resource
    private AsyncService asyncService;

    public int testMultiThread() {
        List<String> logOutputResults = getTestData();
        // list split into one sub-list per thread
        List<List<String>> lists = splitList(logOutputResults);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        for (List<String> listSub : lists) {
            asyncService.executeAsync(listSub, countDownLatch);
        }
        try {
            // Block until every thread has finished; only then does the code below run,
            // so the combined result of all threads is available after this point.
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return logOutputResults.size();
    }

    public List<String> getTestData() {
        List<String> logOutputResults = new ArrayList<String>();
        for (int i = 0; i < 3000; i++) {
            logOutputResults.add("test data " + i);
        }
        return logOutputResults;
    }

    public List<List<String>> splitList(List<String> logOutputResults) {
        List<List<String>> results = new ArrayList<List<String>>();

        /* dynamic thread count */
        // one thread per 500 records
        int threadSize = 500;
        // total number of records
        int dataSize = logOutputResults.size();
        // number of threads, derived from the data size
        int threadNum = dataSize / threadSize + 1;

        /* fixed thread count
        // number of threads
        int threadNum = 6;
        // total number of records
        int dataSize = logOutputResults.size();
        // records handled per thread
        int threadSize = dataSize / (threadNum - 1);
        */

        // flag for the case where dataSize divides evenly by threadSize
        boolean special = dataSize % threadSize == 0;
        List<String> cutList = null;
        // build the sub-list for each thread
        for (int i = 0; i < threadNum; i++) {
            if (i == threadNum - 1) {
                if (special) {
                    break;
                }
                cutList = logOutputResults.subList(threadSize * i, dataSize);
            } else {
                cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
            }
            results.add(cutList);
        }
        return results;
    }
}
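The splitList logic above can also be written as a generic chunking helper, which makes the boundary handling easier to follow. This is an alternative sketch rather than the original article's code; the utility class name is made up.

package com.test.util;

import java.util.ArrayList;
import java.util.List;

// Hypothetical utility: splits any list into chunks of at most chunkSize elements.
public final class ListSplitUtil {

    private ListSplitUtil() {
    }

    public static <T> List<List<T>> partition(List<T> source, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < source.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, source.size());
            // subList returns a view; copy it if the chunks outlive the source list
            chunks.add(new ArrayList<>(source.subList(from, to)));
        }
        return chunks;
    }
}

With this helper, splitList(logOutputResults) reduces to ListSplitUtil.partition(logOutputResults, 500); for the 3000 test records that yields 6 chunks of 500.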
6. Controller test
package com.test.controller;

import javax.annotation.Resource;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.test.service.TestService;

import io.swagger.annotations.ApiOperation;

@RestController
public class TestController {

    @Resource
    private TestService testService;

    @RequestMapping(value = "/log", method = RequestMethod.GET)
    @ApiOperation(value = "test")
    public String test() {
        testService.testMultiThread();
        return "success";
    }
}
Summary
Note that the rows are inserted in no particular order, because the chunks are processed by different threads concurrently.
That concludes this walkthrough of splitting a List and batch-inserting it with multiple threads in Spring Boot; hopefully it serves as a useful reference.