一篇文章学会使用 CompletableFuture(JDK9)

2023-12-08 08:58

本文主要是介绍一篇文章学会使用 CompletableFuture(JDK9),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

CompletableFuture 是 Java8 新增的一个超大型工具类。

为什么说它大呢?

因为它实现了 Future 接口,而更重要的是,它也实现了 CompletionStage 接口。

CompletionStage 接口也是 Java8 中新增的,它拥有多达约 40 种方法!

是的,你没有看错,这看起来完全不符合设计中所谓的“单方法接口”原则,但是在这里,它就这么存在了。

这个接口拥有如此众多的方法,是为函数式编程中的流式调用准备的。

通过 Completionstage 接口,我们可以在个执行结果上进行多次流式调用,以此可以得到最终结果。

比如,你可以在一个 CompletionStage 接口上进行如下调用:

stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun (()-> System.out.println());

这一连串的调用就会依次执行。

1、完成了就通知我

CompletableFuture 和 Future 一样,可以作为函数调用的契约。

向 CompletableFuture 请求一个数据,如果数据还没有准备好,请求线程就会等待。

而让人惊喜的是,我们可以手动设置 CompletableFuture 的完成状态。

package com.shockang.study.java.concurrent.completable;import java.util.concurrent.CompletableFuture;/*** 脱离线程池的使用,仅作为一个契约** @author Shockang*/
public class CFutureMain1 {public static class AskThread implements Runnable {CompletableFuture<Integer> re = null;public AskThread(CompletableFuture<Integer> re) {this.re = re;}@Overridepublic void run() {int myRe = 0;try {myRe = re.get() * re.get();} catch (Exception e) {}System.out.println(myRe);}}public static void main(String[] args) throws InterruptedException {final CompletableFuture<Integer> future = new CompletableFuture<>();new Thread(new AskThread(future)).start();// 模拟长时间其他调用Thread.sleep(1000);// 告知完成结果future.complete(60);}
}
3600

2、异步执行任务

通过 CompletableFuture 提供的进一步封装,我们很容易实现 Future 模式那样的异步调用。

package com.shockang.study.java.concurrent.completable;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** 完成普通future的工作* <p>* 以下几个函数可以执行(创建)一个CompletableFuture任务* static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);* static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);* static CompletableFuture<Void> runAsync(Runnable runnable);* static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);** @author Shockang*/
public class CFutureMain2 {public static Integer calc(Integer para) {try {// 模拟一个长时间的执行Thread.sleep(1000);} catch (InterruptedException e) {}return para * para;}public static void main(String[] args) throws InterruptedException, ExecutionException {final CompletableFuture<Integer> future =CompletableFuture.supplyAsync(() -> calc(50));System.out.println(future.get());}
}
2500

在 CompletableFuture 中,类似的工厂方法如下所示。

    /*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the {@link ForkJoinPool#commonPool()} with* the value obtained by calling the given Supplier.** @param supplier a function returning the value to be used* to complete the returned CompletableFuture* @param <U> the function's return type* @return the new CompletableFuture*/public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the given executor with the value obtained* by calling the given Supplier.** @param supplier a function returning the value to be used* to complete the returned CompletableFuture* @param executor the executor to use for asynchronous execution* @param <U> the function's return type* @return the new CompletableFuture*/public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the {@link ForkJoinPool#commonPool()} after* it runs the given action.** @param runnable the action to run before completing the* returned CompletableFuture* @return the new CompletableFuture*/public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the given executor after it runs the given* action.** @param runnable the action to run before completing the* returned CompletableFuture* @param executor the executor to use for asynchronous execution* @return the new CompletableFuture*/public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}

其中 supplyAsync() 方法用于那些需要返回值的场景,比如计算某个数据等。

而 runAsync() 方法用于没有返回值的场景,比如,仅仅是简单地执行某一个异步动作。

在这两对方法中,都有一个方法可以接收一个 Executor 参数。

这就使我们可以让 Supplier< U > 或者 Runnable 在指定的线程池中工作。

如果不指定,则在默认的系统公共的 ForkJoinPool.common 线程池中执行。

注意:在 Java8 中,新増了 ForkJoinPool.commonPool() 方法。 它可以获得一个公共的 ForkJoin 线程池。 这个公共线程池中的所有线程都是 Daemon 线程。 这意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。

3、流式调用

在前文中我已经提到, CompletionStage 的 40 个接口是为函数式编程做准备的。

在这里,就让我们看一下,如何使用这些接口进行函数式的流式 API 调用。

package com.shockang.study.java.concurrent.completable;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** 完成普通future的工作* <p>* 以下几个函数可以执行(创建)一个CompletableFuture任务* thenApply 转换* thenAccept 最后处理** @author Shockang*/
public class CFutureMain3 {public static Integer calc(Integer para) {try {// 模拟一个长时间的执行Thread.sleep(1000);} catch (InterruptedException e) {}return para * para;}public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<Void> fu = CompletableFuture.supplyAsync(() -> calc(50)).thenApply((i) -> Integer.toString(i)).thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println);fu.get();}
}
"2500"

4、异常处理

如果 CompletableFuture 在执行过程中遇到异常,那么我们可以用函数式编程的风格来优雅地处理这些异常。

CompletableFuture 提供了一个异常处理方法 exceptionally() :

package com.shockang.study.java.concurrent.completable;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** 完成普通future的工作* <p>* exceptionally 异常处理 发生异常进行处理,如果没有异常,则它返回原有的结果** @author Shockang*/
public class CFutureMain4 {public static Integer calc(Integer para) {return para / 0;}public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<Void> fu = CompletableFuture.supplyAsync(() -> calc(50)).exceptionally(ex -> {System.out.println(ex.toString());return 0;}).thenApply((i) -> Integer.toString(i)).thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println);fu.get();}}
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
"0"

5、组合多个 CompletableFuture

CompletableFuture 还允许你将多个 CompletableFuture 进行组合。

thenCompose()

一种方法是使用 thenCompose() 方法,它的签名如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

一个 CompletableFuture 可以在执行完成后,将执行结果通过 Function 接口传递给下个 CompletionStage 实例进行处理。

Function 接口返回新的 CompletionStage 实例。

package com.shockang.study.java.concurrent.completable;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** 完成普通future的工作* <p>* thenCompose** @author Shockang*/
public class CFutureMain5 {public static Integer calc(Integer para) {return para / 2;}public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<Void> fu =CompletableFuture.supplyAsync(() -> calc(50)).thenCompose((i) -> CompletableFuture.supplyAsync(() -> calc(i))).thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println);fu.get();}}
"12"

thenCombine()

另外一种组合多个 CompletableFuture 的方法是 thenCombine() 方法,它的签名如下:

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)

方法 thenCombine() 首先完成当前 CompletableFuture 和 other 的执行。
接着,将这两者的执行结果传递给 BiFunction (该接口接收两个参数,并有一个返回值),并返回代表 BiFunction 实例的 CompletableFuture 对象。

package com.shockang.study.java.concurrent.completable;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** 完成普通future的工作* <p>* thenCombine 合并结果** @author Shockang*/
public class CFutureMain6 {public static Integer calc(Integer para) {return para / 2;}public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> calc(50));CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> calc(25));CompletableFuture<Void> fu = intFuture.thenCombine(intFuture2, (i, j) -> (i + j)).thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println);fu.get();}}
"37"

6、支持 timeout(JDK9)

在 JDK9 以后 CompletableFuture 增加了 timeout功能。

如果一个任务在给定时间内没有完成,则直接抛出异常。

package com.shockang.study.java.concurrent.completable;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CFutureMain7 {public static Integer calc(Integer para) {return para / 2;}public static void main(String[] args) {CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {}return calc(50);}).orTimeout(1, TimeUnit.SECONDS).exceptionally(e -> {System.err.println(e);return 0;}).thenAccept(System.out::println);try {Thread.sleep(2000);} catch (InterruptedException e) {}}
}
java.util.concurrent.TimeoutException
0

这篇关于一篇文章学会使用 CompletableFuture(JDK9)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/469271

相关文章

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

使用Python和Pyecharts创建交互式地图

《使用Python和Pyecharts创建交互式地图》在数据可视化领域,创建交互式地图是一种强大的方式,可以使受众能够以引人入胜且信息丰富的方式探索地理数据,下面我们看看如何使用Python和Pyec... 目录简介Pyecharts 简介创建上海地图代码说明运行结果总结简介在数据可视化领域,创建交互式地

Java Stream流使用案例深入详解

《JavaStream流使用案例深入详解》:本文主要介绍JavaStream流使用案例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录前言1. Lambda1.1 语法1.2 没参数只有一条语句或者多条语句1.3 一个参数只有一条语句或者多

Java Spring 中 @PostConstruct 注解使用原理及常见场景

《JavaSpring中@PostConstruct注解使用原理及常见场景》在JavaSpring中,@PostConstruct注解是一个非常实用的功能,它允许开发者在Spring容器完全初... 目录一、@PostConstruct 注解概述二、@PostConstruct 注解的基本使用2.1 基本代

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

springboot使用Scheduling实现动态增删启停定时任务教程

《springboot使用Scheduling实现动态增删启停定时任务教程》:本文主要介绍springboot使用Scheduling实现动态增删启停定时任务教程,具有很好的参考价值,希望对大家有... 目录1、配置定时任务需要的线程池2、创建ScheduledFuture的包装类3、注册定时任务,增加、删

使用Python实现矢量路径的压缩、解压与可视化

《使用Python实现矢量路径的压缩、解压与可视化》在图形设计和Web开发中,矢量路径数据的高效存储与传输至关重要,本文将通过一个Python示例,展示如何将复杂的矢量路径命令序列压缩为JSON格式,... 目录引言核心功能概述1. 路径命令解析2. 路径数据压缩3. 路径数据解压4. 可视化代码实现详解1