本文主要是介绍Java Stream 并行流简介、使用与注意事项小结,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《JavaStream并行流简介、使用与注意事项小结》Java8并行流基于StreamAPI,利用多核CPU提升计算密集型任务效率,但需注意线程安全、顺序不确定及线程池管理,可通过自定义线程池与C...
1. 并行流简介
Java 8 引入了 Stream
API,提供了一种高效的数据处理方式。而 并行流(Parallel Stream) 则是 Stream
的并行版本,能够将流操作分配到多个线程中执行,充分利用多核 CPU 的性能。
特点:
- 默认使用
ForkJoinPool.commonPool()
执行任务。 - 适合处理 计算密集型 任务。
- 任务执行顺序不确定。
2. 并行流的简单使用
将普通流转换为并行流非常简单,只需调用 parallel()
方法即可。
示例:并行流的基本使用
import java.util.Arrays; import java.util.List; public class ParallelStreamExample { public static void main(String[] args) { List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); python // 将流转换为并行流 numbers.parallelStream() .forEach(num -> System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + num)); } }
输出示例:
线程: main, 处理: 6
线程: ForkJoinPool.commonPool-worker-1, 处理: 3
线程: ForkJoinPool.commonPojavascriptol-worker-2, 处理: 8
...
3. 配合自定义线程池
默认情况下,并行流使用 ForkJoinPool.commonPool()
执行任务。你可以通过自定义线程池来控制并行流的执行环境。
示例:自定义线程池
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamCustomPool {
public static void main(String[] args) {
// 创建自定义线程池
ForkJoinPool customPool = new ForkJoinPool(4);
// 在自定义线程池中执行并行流任务
customPool.submit(() -> {
List<Integer> result = IntStream.rangeClosed(1, 10)
.parallel()
.map(i -> {
System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i);
China编程 return i * 2;
})
.boxed()
.collect(Collectors.toList());
System.out.println("结果: " + result);
}).join(); // 等待任务完成
customPool.shutdown(); // 关闭线程池
}
}
示例:配合CompletableFuture实现异步
import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.IntStream; public class ParallelStreamWithCompletableFuture { public static void main(String[] args) { // 创建一个并行流 List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, 10) .parallel() .mapToObj(i -> CompletableFuture.supplyAsync(() -> { System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i); return i * 2; // 模拟计算任务 })) .collect(Collectors.toList()); // 等待所有任务完成并获取结果 List<Integer> res编程ults = futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println("结果: " + results); } }
好处:
- 并行流:适合处理数据流中的计算密集型任务,能够自动将任务分配到多个线程中执行。
- CompletableFuture:提供强大的异步编程能力,可以处理任务的依赖关系、异常处理、结果合并等。
结合两者的优势,可以实现:
- 异步并行处理:将并行流的任务异步化,进一步提升性能。
- 任务依赖管理:通过
CompletableFuture
管理任务之间的依赖关系。 - 结果合并:将多个任务的结果合并处理。
4. 控制有序性
并行流的任务执行顺序是不确定的。如果需要保持顺序,可以使用 forEachOrdered()
方法。
示例:保持顺序
import java.util.Arrays; import java.util.List; public class ParallelStreamOrder { public static void main(String[] args) { List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); numbers.parallelStream() .forEachOrdered(System.out::println); // 输出顺序与流中元素顺序一致 } }
5. 共享资源的安全性
并行流在多个线程中执行操作,如果操作共享可变状态,可能会导致线程安全问题。
示例:线程安全问题
import java.util.ArrayList; import java.util.List; public class ParallelStreamThreadSafety { public static void main(String[] args) { List<Integer> result = new ArrayList<>(); IntStream.rangeClosed(1, 1000) .parallel() .forEach(result::add); // 这里会出现线程安全问题 System.out.println("结果大小: " + result.size()); // 结果可能小于 1000 } }
解决方法:
- 使用线程安全的集合,如
Collections.synchronizedList()
。 - 使用
collect()
方法将结果收集到线程安全的容器中。
示例:线程安全的解决方案
import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; public class ParallelStreamThreadSafety { public static void main(String[] args) { List<Integer> result = IntStream.rangeClosed(1, 1000) .parallel() .boxed() .collect(Collectors.toList()); // 使用 collect() 方法 System.out.println("结果大小: " + result.size()); // 输出: 1000 } }
6. 注意事项
- 任务类型:
- 适合 计算密集型 任务,不适合 I/O 密集型 任务。
- 线程安全:
- 避免在并行流中操作共享可变状态。
- 任务顺序:
- 并行流的任务执行顺序不确定,使用
forEachOrdered()
保持顺序。
- 并行流的任务执行顺序不确定,使用
- 线程池管理:
- 使用自定义线程池时,记得关闭线程池,避免资源泄漏。
7. 总结
并行流是 Java 8 提供的一个强大工具,能够显著提升数据处理性能。但在使用时需要注意线程安全、任务顺序和线程池管理等问题。通过合理使用并行流,可以编写高效、灵活的代码。
附录:完整代码
import java.util.Arrays; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import java.util.stream.IntStream; public class ParallelStreamDemo { public static void main(String[] args) { // 基本使用 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); numbers.parallelStream() .forEach(num -> System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + num)); // 自定义线程池 ForkJoinPool customPool = new ForkJoinPool(4); customPool.submit(() -> { List<Integer> result = IntStream.rangeClosed(1, 10) .parallel() .map(i -> { System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i); return i * 2; }) .boxed() China编程 .collect(Collectors.toList()); System.out.println("结果: " + result); }).join(); customPool.shutdown(); // 保持顺序 numbers.parallelStream() .forEachOrdered(System.out::println); // 线程安全 List<Integer> safeResult = IntStream.rangeClosed(1, 1000) .parallel() .boxed() .collect(Collectors.toList()); System.out.println("结果大小: " + safeResult.size()); } }
希望这篇文章能帮助你更好地理解和使用 Java 的并行流!如果有任何问题,欢迎在评论区讨论!
到此这篇关于Java Stream 并行流简介、使用与注意事项小结的文章就介绍到这了,更多相关Java Stream 并行流内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于Java Stream 并行流简介、使用与注意事项小结的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!