This article describes how to batch insert a large List into a database with Spring Boot by splitting the list into sub-lists and handling each sub-list on its own thread, combining a Spring thread pool with asynchronous methods to improve throughput.
Project scenario
A large List whose elements need to be batch inserted into the database.
Solution
Split the List into sub-lists, then insert the sub-lists into the database in batches using multiple threads.
1. Entity class
package com.test.entity;
import lombok.Data;
@Data
public class TestEntity {
private String id;
private String name;
}
2. Mapper
If the data volume is small, the foreach tag is sufficient. For large volumes, batch mode (ExecutorType.BATCH) is recommended. A minimal usage sketch of the foreach path follows the mapper interface below.
package com.test.mapper;

import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import com.test.entity.TestEntity;

public interface TestMapper {

    /**
     * 1. Single-row insert, intended to be used with ExecutorType.BATCH.
     *    Recommended when the data volume is large.
     */
    @Insert("insert into test(id, name)" +
            " values" +
            " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
    void testInsert(TestEntity testEntity);

    /**
     * 2. Multi-row insert with the foreach tag.
     *    Suitable when the data volume is small.
     *    Note: dynamic SQL inside an annotation must be wrapped in <script> tags.
     */
    @Insert("<script>" +
            " insert into test(id, name)" +
            " values" +
            " <foreach collection='list' item='item' index='index' separator=','>" +
            " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})" +
            " </foreach>" +
            "</script>")
    void testBatchInsert(@Param("list") List<TestEntity> list);
}
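When the data volume is small, the foreach-based testBatchInsert can simply be called from an ordinary transactional service, with no thread pool involved. The following is a minimal sketch of that simpler path; the SimpleInsertService class is an illustrative assumption and is not part of the original article.

package com.test.service;

import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

// Hypothetical service: one multi-row INSERT via the foreach mapper, for small lists
@Service
public class SimpleInsertService {

    @Autowired
    private TestMapper testMapper;

    @Transactional
    public void saveAll(List<TestEntity> entities) {
        // A single statement with a multi-row VALUES clause; fine for a few hundred rows
        testMapper.testBatchInsert(entities);
    }
}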
3. Register a thread pool bean in the Spring container
package com.test.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class ExecutorConfig {
/**
 * Custom thread pool for asynchronous tasks
 */
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//Core pool size
executor.setCorePoolSize(50);
//Maximum pool size
executor.setMaxPoolSize(500);
//Queue capacity
executor.setQueueCapacity(300);
//Name prefix for threads created by this pool
executor.setThreadNamePrefix("testExecutor-");
// Rejection policy: what to do with new tasks once the pool has reached max size and the queue is full
// CALLER_RUNS: the task is executed in the calling thread instead of a pool thread
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//On shutdown(), wait for submitted tasks to finish before closing the pool
executor.setWaitForTasksToCompleteOnShutdown(true);
//Maximum time to wait for tasks to finish on shutdown
executor.setAwaitTerminationSeconds(60);
return executor;
}
}
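The pool does not have to be used only through @Async. If a task needs to be submitted programmatically, the same bean can be injected and called directly; a minimal sketch follows, where DirectSubmitService is an illustrative name and not part of the original article.

package com.test.service;

import java.util.concurrent.Executor;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;

// Hypothetical helper: submits work straight to the custom pool instead of going through @Async
@Service
public class DirectSubmitService {

    @Resource(name = "asyncServiceExecutor")
    private Executor asyncServiceExecutor;

    public void submit(Runnable task) {
        // Runs on a testExecutor- thread, or on the caller's thread under CallerRunsPolicy
        asyncServiceExecutor.execute(task);
    }
}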
4. Create the asynchronous service class
package com.test.service;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

@Service
public class AsyncService {

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    @Async("asyncServiceExecutor")
    public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
        // Open a session in batch mode; since this runs on multiple threads, each thread gets its own session and transaction
        SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
        try {
            TestMapper mapper = session.getMapper(TestMapper.class);
            // Work done by the async thread
            for (int i = 0; i < logOutputResults.size(); i++) {
                System.out.println(Thread.currentThread().getName() + " thread: " + logOutputResults.get(i));
                TestEntity test = new TestEntity();
                //test.set()
                //.............
                // Queue the insert in the batch
                mapper.testInsert(test);
                // Flush every 1000 records to keep memory usage bounded
                if (i % 1000 == 0) {
                    session.flushStatements();
                }
            }
            // Flush whatever is still pending
            session.flushStatements();
            // openSession() does not auto-commit, so commit the batch explicitly
            session.commit();
        } finally {
            // Must run even if an exception is thrown, otherwise await() never returns
            countDownLatch.countDown();
            if (session != null) {
                session.close();
            }
        }
    }
}
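A common alternative to the CountDownLatch handshake is to let the @Async method return a CompletableFuture and have the caller join on all of them. A minimal sketch of that variant follows; AsyncFutureService is an illustrative name, and the batch-insert body is elided because it would be identical to executeAsync above.

package com.test.service;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

// Hypothetical variant of AsyncService that signals completion through a CompletableFuture
@Service
public class AsyncFutureService {

    @Async("asyncServiceExecutor")
    public CompletableFuture<Integer> executeAsync(List<String> subList) {
        // ... open a batch SqlSession and insert subList, exactly as in AsyncService ...
        return CompletableFuture.completedFuture(subList.size());
    }
}

The caller would then collect the returned futures and call CompletableFuture.allOf(...).join() instead of awaiting a CountDownLatch.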
5. Split the list and call the async service method
package com.test.service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
@Service
public class TestService {
@Resource
private AsyncService asyncService;
public int testMultiThread() {
List<String> logOutputResults = getTestData();
//Sub-lists after splitting by thread count
List<List<String>> lists = splitList(logOutputResults);
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
for (List<String> listSub:lists) {
asyncService.executeAsync(listSub, countDownLatch);
}
try {
countDownLatch.await(); // Block until every worker thread has finished,
// so the code below runs only after all sub-lists have been processed
} catch (Exception e) {
e.printStackTrace();
}
return logOutputResults.size();
}
public List<String> getTestData() {
List<String> logOutputResults = new ArrayList<String>();
for (int i = 0; i < 3000; i++) {
logOutputResults.add("test data " + i);
}
return logOutputResults;
}
public List<List<String>> splitList(List<String> logOutputResults) {
List<List<String>> results = new ArrayList<List<String>>();
/* Dynamic thread count */
// One thread per 500 records
int threadSize = 500;
// Total number of records
int dataSize = logOutputResults.size();
// Number of threads, derived from the data size
int threadNum = dataSize / threadSize + 1;
/* Fixed thread count
// Number of threads
int threadNum = 6;
// Total number of records
int dataSize = logOutputResults.size();
// Number of records handled by each thread
int threadSize = dataSize / (threadNum - 1);
*/
// Flag: true when the data size divides evenly, so the extra (empty) last chunk is skipped
boolean special = dataSize % threadSize == 0;
List<String> cutList = null;
// Assign each thread its slice of the data
for (int i = 0; i < threadNum; i++) {
if (i == threadNum - 1) {
if (special) {
break;
}
cutList = logOutputResults.subList(threadSize * i, dataSize);
} else {
cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
}
results.add(cutList);
}
return results;
}
}
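As a side note on splitList above: if Guava happens to be on the classpath (an assumption; the original article does not use it), the hand-written splitting can be replaced by Lists.partition, which returns consecutive sub-lists of the requested size, with a smaller final chunk when the total does not divide evenly.

import com.google.common.collect.Lists;

// Equivalent to splitList with threadSize = 500 (the sub-lists are views of the original list)
List<List<String>> lists = Lists.partition(logOutputResults, 500);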
6. Controller test
package com.test.controller;

import javax.annotation.Resource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.ApiOperation;
import com.test.service.TestService;

@RestController
public class TestController {
@Resource
private TestService testService;
@RequestMapping(value = "/log", method = RequestMethod.GET)
@ApiOperation(value = "test")
public String test() {
testService.testMultiThread();
return "success";
}
}
Summary
Note that because the sub-lists are inserted concurrently by multiple threads, the rows will not be inserted in their original order.
This concludes the article on splitting a List and batch inserting data with multiple threads in Spring Boot; hopefully it is useful to other developers.