本文主要是介绍Java中流式并行操作parallelStream的原理和使用方法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流...
Java中流式并行操作parallelStream
0. 问题的产生
某天上线后,发现线上存在一些报错,遂即自己尝试线上操作,但是发现功能正常。追踪相关的报错代码行如下:
PointMissionPO missionPO = missionBO.getDbData();android
该行报错为空指针异常,可以从代码中判断唯一能报出空指针异常的位置为missionBO为空,向上追踪该引用:
for (PointMissionBO missionBO : result) {
// 循环体内容
}
为一个列表List的循环体。于是接着向前追踪引用,发现所有处理该列表引用的地方均使用了stream流式操作。经常使用函数式编程的同学都知道,这很少会出现null对象在列表中。我通体检查了一遍都未发现任何可能产生null对象的插入位置。很奇怪那么这个null对象是如何被加入到列表中的呢?
后来有个小伙伴经过AI上下文分析,给出了可能的位置:
missionByType.entrySet().parallelStream()
.filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
.map(entry -> processMissions(ctmId, hruId, entry.getValue()))
.forEach(result::addAll);仔细一看居然使用了parallelStream并行处理,那么什么是parallelStream呢?为什么它会出现错误呢?
1. 什么是parallelStream?
Java 8引入的Stream API提供了两种处理方式:
- stream():串行处理,按顺序处理元素
- parallelStream():并行处理,利用多核CPU将数据分割成多个部分并行处理
2. parallelStream的工作原理
parallelStream基于Fork/Join框架实现:
- 将数据源分割成多个子任务(fork)
- 在不同线程上并行处理这些子任务
- 合并结果(join)
3. parallelStream的正确与错误使用示例
错误使用示例(来自我们的测试代码):
List<String> result = new ArrayList<>(); missionByType.entrySet().parallelStream() android .map(entry -&gpythont; processMissions(entry.getValue())) .forEach(result::addAll); // 危险操作!
问题分析:
- ArrayList不是线程安全的集合
- 多个线程同时调用result::addAll会产生竞态条件
- 可能导致数据丢失、重复、甚至程序崩溃
正确使用方式一:使用同步块
List&lLJvHNVmt;String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
.map(entry -> processMissions(entry.getValue()))
.China编程filter(processedMissions -> !processedMissions.isEmpty())
.forEach(processedMissions -> {
synchronized (result) {
result.addAll(processedMissions);
}
});
正确使用方式二:使用收集器(推荐)
List<String> safeResult = missionByType.entrySet().parallelStream()
.flatMap(entry -> processMissions(entry.getValue()).stream())
.collect(Collectors.toList());
4. parallelStream在实际业务中的应用
查看我们项目中的实际应用案例:
// PointBusinessServiceImpl.java 中的实际使用
List<PointMissionBO> result = missionByType.entrySet().parallelStream()
.filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
.flatMap(entry -> processMissions(ctmId, hruId, entry.getValue()).stream())
.collect(Collectors.toList());
这种方式的优点:
- 使用flatMap展平数据结构
- 使用collect收集结果,避免线程安全问题
- 提高了任务处理效率
5. parallelStream适用场景与注意事项
适用场景:
- 数据量较大(通常万级以上)
- 计算密集型操作
- 无状态操作(函数式编程)
- 不依赖处理顺序的操作
不适用场景:
- 数据量小(并行开销可能超过收益)
- IO密集型操作
- 有状态共享操作
- 需要保证处理顺序的场景
注意事项:
- 线程安全:避免在并行流中使用非线程安全的对象
- 副作用:避免在流操作中修改外部状态
- 性能考量:并行不一定比串行快,需根据实际情况评估
- 资源竞争:注意共享资源的访问控制
6. 最佳实践建议
- 优先考虑collect:使用收集器而不是直接修改共享集合
- 避免副作用:确保流操作是无状态的纯函数
- 合理选择数据结构:使用适合并行处理的数据结构
- 测试性能:在实际环境中测试并行处理效果
- 监控资源使用:关注CPU和内存使用情况
以上就是关于parallelStream并行处理的AI输出内容。下面结合实际问题发生的场景进行推演和论述。
7. 其他思考
上面说了这么多,核心重点在于:parallelStream中使用线程不安全的对象操作时会出现异常。那么具体是什么样的异常呢?
其实我们仔细想想非内存安全List的内存管理,熟悉八股文的同学应该一下子就懂了,没错就是内存扩展机制。
当一个非内存安全的List在触发到内存扩展阈值的时候,就会触发一次内存扩展。具体原理就是开辟一个更长的(通常是2倍)列表,然后将原数据赋值到新列表中。这个过程中,如果出现了并行开辟的情况,那赋值的内容以及目标列表就会变得混乱,出现null对象也并非不可能。因此当尝试指定初始列表大小为256的情况下,第0章的错误就自然消失了。
那么也就出现一些关于Collection的使用建议:
- 初始化List时,最好能够有一个预估的列表大小并指定,其实所有Collection对象都可以这么做。
- 但凡出现非线程安全的Collection对象需要参与并行计算时,都需要注意它的数据正确性应当如何保证,如果无法自行控制,或者控制有难度的,可以考虑使用Concurrent包中的内容。
到此这篇关于Java中流式并行操作parallelStream的原理和使用方法的文章就介绍到这了,更多相关java并行parallelStream内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于Java中流式并行操作parallelStream的原理和使用方法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!