Java中流式并行操作parallelStream的原理和使用方法

2025-11-11 22:50

本文主要是介绍Java中流式并行操作parallelStream的原理和使用方法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流...

Java中流式并行操作parallelStream

0. 问题的产生

某天上线后,发现线上存在一些报错,遂即自己尝试线上操作,但是发现功能正常。追踪相关的报错代码行如下:

PointMissionPO missionPO = missionBO.getDbData();android

该行报错为空指针异常,可以从代码中判断唯一能报出空指针异常的位置为missionBO为空,向上追踪该引用:

for (PointMissionBO missionBO : result) {
	// 循环体内容
}

为一个列表List的循环体。于是接着向前追踪引用,发现所有处理该列表引用的地方均使用了stream流式操作。经常使用函数式编程的同学都知道,这很少会出现null对象在列表中。我通体检查了一遍都未发现任何可能产生null对象的插入位置。很奇怪那么这个null对象是如何被加入到列表中的呢?

后来有个小伙伴经过AI上下文分析,给出了可能的位置:

missionByType.entrySet().parallelStream()
                .filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
                .map(entry -> processMissions(ctmId, hruId, entry.getValue()))
                .forEach(result::addAll);

仔细一看居然使用了parallelStream并行处理,那么什么是parallelStream呢?为什么它会出现错误呢?

1. 什么是parallelStream?

Java 8引入的Stream API提供了两种处理方式:

  • stream():串行处理,按顺序处理元素
  • parallelStream():并行处理,利用多核CPU将数据分割成多个部分并行处理

2. parallelStream的工作原理

parallelStream基于Fork/Join框架实现:

  1. 将数据源分割成多个子任务(fork)
  2. 在不同线程上并行处理这些子任务
  3. 合并结果(join)

3. parallelStream的正确与错误使用示例

错误使用示例(来自我们的测试代码):

List<String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
  android      .map(entry -&gpythont; processMissions(entry.getValue()))
        .forEach(result::addAll);  // 危险操作!

问题分析:

  • ArrayList不是线程安全的集合
  • 多个线程同时调用result::addAll会产生竞态条件
  • 可能导致数据丢失、重复、甚至程序崩溃

正确使用方式一:使用同步块

List&lLJvHNVmt;String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
        .map(entry -> processMissions(entry.getValue()))
        .China编程filter(processedMissions -> !processedMissions.isEmpty())
        .forEach(processedMissions -> {
            synchronized (result) {
                result.addAll(processedMissions);
            }
        });

正确使用方式二:使用收集器(推荐)

List<String> safeResult = missionByType.entrySet().parallelStream()
        .flatMap(entry -> processMissions(entry.getValue()).stream())
        .collect(Collectors.toList());

4. parallelStream在实际业务中的应用

查看我们项目中的实际应用案例:

// PointBusinessServiceImpl.java 中的实际使用
List<PointMissionBO> result = missionByType.entrySet().parallelStream()
        .filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
        .flatMap(entry -> processMissions(ctmId, hruId, entry.getValue()).stream())
        .collect(Collectors.toList());

这种方式的优点:

  • 使用flatMap展平数据结构
  • 使用collect收集结果,避免线程安全问题
  • 提高了任务处理效率

5. parallelStream适用场景与注意事项

适用场景:

  1. 数据量较大(通常万级以上)
  2. 计算密集型操作
  3. 无状态操作(函数式编程)
  4. 不依赖处理顺序的操作

不适用场景:

  1. 数据量小(并行开销可能超过收益)
  2. IO密集型操作
  3. 有状态共享操作
  4. 需要保证处理顺序的场景

注意事项:

  1. 线程安全:避免在并行流中使用非线程安全的对象
  2. 副作用:避免在流操作中修改外部状态
  3. 性能考量:并行不一定比串行快,需根据实际情况评估
  4. 资源竞争:注意共享资源的访问控制

6. 最佳实践建议

  1. 优先考虑collect:使用收集器而不是直接修改共享集合
  2. 避免副作用:确保流操作是无状态的纯函数
  3. 合理选择数据结构:使用适合并行处理的数据结构
  4. 测试性能:在实际环境中测试并行处理效果
  5. 监控资源使用:关注CPU和内存使用情况

以上就是关于parallelStream并行处理的AI输出内容。下面结合实际问题发生的场景进行推演和论述。

7. 其他思考

上面说了这么多,核心重点在于:parallelStream中使用线程不安全的对象操作时会出现异常。那么具体是什么样的异常呢?
其实我们仔细想想非内存安全List的内存管理,熟悉八股文的同学应该一下子就懂了,没错就是内存扩展机制。
当一个非内存安全的List在触发到内存扩展阈值的时候,就会触发一次内存扩展。具体原理就是开辟一个更长的(通常是2倍)列表,然后将原数据赋值到新列表中。这个过程中,如果出现了并行开辟的情况,那赋值的内容以及目标列表就会变得混乱,出现null对象也并非不可能。因此当尝试指定初始列表大小为256的情况下,第0章的错误就自然消失了。

那么也就出现一些关于Collection的使用建议:

  1. 初始化List时,最好能够有一个预估的列表大小并指定,其实所有Collection对象都可以这么做。
  2. 但凡出现非线程安全的Collection对象需要参与并行计算时,都需要注意它的数据正确性应当如何保证,如果无法自行控制,或者控制有难度的,可以考虑使用Concurrent包中的内容。

到此这篇关于Java中流式并行操作parallelStream的原理和使用方法的文章就介绍到这了,更多相关java并行parallelStream内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于Java中流式并行操作parallelStream的原理和使用方法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1156138

相关文章

MySQL数据库双机热备的配置方法详解

《MySQL数据库双机热备的配置方法详解》在企业级应用中,数据库的高可用性和数据的安全性是至关重要的,MySQL作为最流行的开源关系型数据库管理系统之一,提供了多种方式来实现高可用性,其中双机热备(M... 目录1. 环境准备1.1 安装mysql1.2 配置MySQL1.2.1 主服务器配置1.2.2 从

Linux join命令的使用及说明

《Linuxjoin命令的使用及说明》`join`命令用于在Linux中按字段将两个文件进行连接,类似于SQL的JOIN,它需要两个文件按用于匹配的字段排序,并且第一个文件的换行符必须是LF,`jo... 目录一. 基本语法二. 数据准备三. 指定文件的连接key四.-a输出指定文件的所有行五.-o指定输出

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

Linux jq命令的使用解读

《Linuxjq命令的使用解读》jq是一个强大的命令行工具,用于处理JSON数据,它可以用来查看、过滤、修改、格式化JSON数据,通过使用各种选项和过滤器,可以实现复杂的JSON处理任务... 目录一. 简介二. 选项2.1.2.2-c2.3-r2.4-R三. 字段提取3.1 普通字段3.2 数组字段四.

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置