小议CompletableFuture 链式执行和响应式编程

2024-01-12 07:12

本文主要是介绍小议CompletableFuture 链式执行和响应式编程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

相关文章:

  • 用最简单的方式理解同步和异步、阻塞与非阻塞
  • CompletableFuture 实战

背景

昨晚和一个朋友讨论了他在开发过程中遇到的一个场景设计问题。这个场景可以简化为:服务接收到一个需要处理的任务请求,然后立即返回。这个任务需要经过四个处理器的处理,每个处理器的处理都依赖于前一个处理器的处理结果。

这个场景与我最近处理 GitLab WebHook 的工作流程非常相似。例如,我从 GitLab 接收到一个事件后,需要对数据进行清洗、解析、调用 AI 大模型和发布评论等操作,每个操作都依赖于前一个操作的结果。

对于这种场景的设计,我目前采用的是基于链式设计的方法。我定义了一个抽象的处理器类和链式工厂:

/*** @author dongguabai* @date 2024-01-09 13:52*/
public abstract class MergeRequestHandler {private MergeRequestHandler next;public MergeRequestHandler(MergeRequestHandler next) {this.next = next;}public void handle(Task task) {if (run(task) && next != null) {next.handle(task);}}public abstract boolean run(Task task);
}
/*** @author dongguabai* @date 2024-01-09 15:53*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MergeRequestHandlerFactory {public static MergeRequestHandler createHandlerChain() {return new ActionHandler(new MergeRequestRetrievalHandler(new CommentHandler(new FinalHandler())));}
}

在链式工厂里面指定了执行顺序。

当然,也可以参考 ZooKeeper 的实现:org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors

@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader().toBeApplied);commitProcessor = new CommitProcessor(toBeAppliedProcessor,Long.toString(getServerId()), false);commitProcessor.start();ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,commitProcessor);proposalProcessor.initialize();firstProcessor = new PrepRequestProcessor(this, proposalProcessor);((PrepRequestProcessor)firstProcessor).start();}

然后,朋友提出要将其改为异步处理。实际上,异步处理也是可以很简单实现的,只需在执行 HandlerChain 时将其放入线程池中即可。但在这里,我完全可以使用 CompletableFuture 来实现,这也是我当前代码改造的主要方向。

关于CompletableFuture的实现,我设计了两套方案。现在来看一下最终实现的方案。

抽象 Processor

@Log4j
abstract class Processor {public CompletableFuture<Task> process(Task task, Executor executor) {return CompletableFuture.supplyAsync(() -> doProcess(task), executor);}protected abstract Task doProcess(Task task);
}

实现类:

@Log4j
class Processor1 extends Processor {@Overrideprotected Task doProcess(Task task) {// 你的处理逻辑log.info("Processor1 is processing");ProcessorChain.sleep(1);log.info("Processor1 end");if (task.isCancelled()) {throw new CancellationException("Task was cancelled");}return task;}
}
@Log4j
class Processor2 extends Processor {@Overrideprotected Task doProcess(Task task) {// 你的处理逻辑log.info("Processor2 is processing");ProcessorChain.sleep(2);log.info("Processor2 end");if (task.isCancelled()) {throw new CancellationException("Task was cancelled");}return task;}
}

核心的处理链:

@Log4j
class ProcessorChain {private final List<Processor> processors;public ProcessorChain(List<Processor> processors) {this.processors = processors;}public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result ->log.info("处理完成,结果是:" + result));}public static void sleep(Integer i){try {TimeUnit.SECONDS.sleep(i);} catch (InterruptedException e) {e.printStackTrace();}}
}

测试代码:

    public static void main(String[] args) {Executor executor = Executors.newCachedThreadPool();ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));Task task = new Task("1");CompletableFuture<Void> exceptionally = processorChain.process(task, executor).exceptionally(ex -> {System.out.println("处理过程中出现错误:" + ex.getMessage());return null;});log.info("处理完成");}

输出:

22:38:35.276 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:38:35.276 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:38:36.285 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:38:36.285 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 is processing
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 end
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 is processing
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 end
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 is processing
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 end
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@6dfb9646

效果挺符合诉求的,但这里也有几个疑问。

ProcessorChain#process中的执行顺序问题

先看 ProcessorChain#proccess

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));}

朋友原话是:“他期望的执行顺序是 process1process2process3。但由于 thenComposeAsync是异步执行的,那么在循环到 process2 时,它会异步执行。此时,循环可能已经到了 process3,并开始执行 thenComposeAsync。这样,process3process2可能会并行执行,这跟期望结果不一致”。

其实这里是不会的,thenComposeAsync 可以翻译为 “然后异步组合”。

这里的 “组合” 指的是将当前的 CompletableFuture 与另一个 CompletableFuture 进行组合。“异步” 指的是这里的 Function 操作会在一个单独的线程中执行,不会阻塞当前的线程。

我这里的 Processor#process 函数返回的是 CompletableFuture,所以这里 thenComposeAsync 的流程是:等待当前的 CompletableFuture 完成后,异步执行 FunctionFunction 会返回一个新的 CompletableFuture,然后 thenComposeAsync 会返回这个 CompletableFuture

这里要注意 thenApplyAsyncthenComposeAsync 不要混淆了。

ProcessorChain#process中的阻塞与非阻塞、同步和异步问题

上文已经说明了 ProcessorChain#process 中的执行顺序问题。接下来看一下这段代码中的阻塞与非阻塞、同步和异步问题。

为了查看方便,我这里再贴一下测试代码和 ProcessorChain#proccess 的实现。

测试代码:

    public static void main(String[] args) {Executor executor = Executors.newSingleThreadExecutor();ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));Task task = new Task("1");CompletableFuture<Void> c = processorChain.process(task, executor).exceptionally(ex -> {System.out.println("处理过程中出现错误:" + ex.getMessage());return null;});log.info("处理完成");}

ProcessorChain#proccess

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));}

执行流程如下:

  1. main 线程开始执行,并调用 ProcessorChainprocess 方法。
  2. ProcessorChainprocess 方法中,main 线程创建了一个已经完成的 CompletableFuture,并开始遍历 Processor 对象。
  3. main 线程调用 thenComposeAsync 方法,此时 main 线程将 Processor1process 方法提交给 Executor,并立即返回一个新的 CompletableFuturemain 线程不会等待 Processor1process 方法完成,因此 main 线程是非阻塞的。
  4. Executor 的一个线程(称之为 Thread_1)开始执行 Processor1process 方法。由于 thenComposeAsync 的链式调用,Processor2process 方法会等待 Processor1process 方法完成后才开始执行,以此类推。
  5. main 线程继续遍历 Processor 对象,并对每个 Processor 重复步骤 3 和 4。每次调用 thenComposeAsync 方法时,main 线程都会立即返回一个新的 CompletableFuture,并将 Processorprocess 方法提交给 Executor。这意味着 main 线程是非阻塞的,而 Executor 的线程会按照 Processor 的顺序执行 process 方法。
  6. 当所有的 Processorprocess 方法都完成后,其中一个 Executor 的线程会执行 thenAccept 方法,处理最后一个 Processorprocess 方法返回的结果。

其实这里的 CompletableFuture<Task> future = CompletableFuture.completedFuture(task); 只是作为一个“CompletableFuture 的启动器”(这么一说是不是就更好理解了)。

响应式编程

其实上面的内容让我想起了响应式编程,引入维基百科的一个说明:

例如,在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化 。

其实这里的 CompletableFuture 实际上是响应式编程的一个简单例子,因为它表示一个异步计算的结果,我们可以在它上面“提前”注册回调函数(我觉得这个“提前”是精髓),这些回调函数会在 CompletableFutureFunction 完成时被调用。而我这里就是提前准备好了一个结果,即 CompletableFuture,然后不断的循环在其上面增加回调。

可以增加日志看一下:

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {log.info(future);future = future.thenComposeAsync(t -> processor.process(t, executor), executor);log.info(future);// future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result ->log.info("处理完成,结果是:" + result));}

再次执行测试代码:

22:56:42.512 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@3fa77460[Completed normally]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5fdef03a[Not completed]
22:56:42.614 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:56:42.615 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 is processing
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 end
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 is processing
22:56:48.630 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 end
22:56:48.631 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 is processing
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 end
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@58d4f75a

重新梳理执行流程:

  1. 最开始的启动器是 3fa77460,这是一个已经完成的 CompletableFuture 对象,用于启动异步操作链。

  2. 第一次循环:3fa77460 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 e2d56bf。此时,Processor1process 方法正在异步执行。

  3. 第二次循环:e2d56bf 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 244038d0。此时,Processor2process 方法正在等待 Processor1process 方法完成。

  4. 第三次循环:244038d0 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 5680a178。此时,Processor3process 方法正在等待 Processor2process 方法完成。

  5. 第四次循环:5680a178 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 5fdef03a。此时,Processor4process 方法正在等待 Processor3process 方法完成。

  6. 所有循环结束后,返回 5fdef03a。当所有的 Processorprocess 方法都完成后,5fdef03a 会执行 thenAccept 方法,处理最后一个 Processorprocess 方法返回的结果。

这个过程中,每个 CompletableFuture 对象都会等待前一个 CompletableFuture 对象完成,然后开始执行自己的异步操作。这就形成了一个 CompletableFuture 链,每个 CompletableFuture 对象都会按照 Processor 的顺序执行 process 方法。

CompletableFuture链

当调用thenComposeAsync方法时,会创建一个新的CompletableFuture对象:

    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);}
    private <V> CompletableFuture<V> uniComposeStage(Executor e, Function<? super T, ? extends CompletionStage<V>> f) {if (f == null) throw new NullPointerException();Object r; Throwable x;if (e == null && (r = result) != null) {// try to return function result directlyif (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {return new CompletableFuture<V>(encodeThrowable(x, r));}r = null;}try {@SuppressWarnings("unchecked") T t = (T) r;CompletableFuture<V> g = f.apply(t).toCompletableFuture();Object s = g.result;if (s != null)return new CompletableFuture<V>(encodeRelay(s));CompletableFuture<V> d = new CompletableFuture<V>();UniRelay<V> copy = new UniRelay<V>(d, g);g.push(copy);copy.tryFire(SYNC);return d;} catch (Throwable ex) {return new CompletableFuture<V>(encodeThrowable(ex));}}CompletableFuture<V> d = new CompletableFuture<V>();//构建 UniComposeUniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);push(c);c.tryFire(SYNC);return d;}

构造的 UniCompose对象持有前一个CompletableFuture对象的引用,也就是 UniComposesrc 字段。所以即使循环中出现 CompletableFuture 被覆盖,也不用担心被 GC 的问题。

总结

本文以一个实际的场景设计问题为出发点,详细探讨了 CompletableFuture 的链式执行机制,对执行流程和线程切换进行了深入的分析,并对响应式编程的概念和应用进行了简单的讨论。

References

  • https://zh.wikipedia.org/zh-cn/%E5%93%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B

欢迎关注公众号:
在这里插入图片描述

这篇关于小议CompletableFuture 链式执行和响应式编程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/597157

相关文章

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

MySQL中SQL的执行顺序详解

《MySQL中SQL的执行顺序详解》:本文主要介绍MySQL中SQL的执行顺序,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql中SQL的执行顺序SQL执行顺序MySQL的执行顺序SELECT语句定义SELECT语句执行顺序总结MySQL中SQL的执行顺序

Python 异步编程 asyncio简介及基本用法

《Python异步编程asyncio简介及基本用法》asyncio是Python的一个库,用于编写并发代码,使用协程、任务和Futures来处理I/O密集型和高延迟操作,本文给大家介绍Python... 目录1、asyncio是什么IO密集型任务特征2、怎么用1、基本用法2、关键字 async1、async

SQLyog中DELIMITER执行存储过程时出现前置缩进问题的解决方法

《SQLyog中DELIMITER执行存储过程时出现前置缩进问题的解决方法》在SQLyog中执行存储过程时出现的前置缩进问题,实际上反映了SQLyog对SQL语句解析的一个特殊行为,本文给大家介绍了详... 目录问题根源正确写法示例永久解决方案为什么命令行不受影响?最佳实践建议问题根源SQLyog的语句分

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

html5的响应式布局的方法示例详解

《html5的响应式布局的方法示例详解》:本文主要介绍了HTML5中使用媒体查询和Flexbox进行响应式布局的方法,简要介绍了CSSGrid布局的基础知识和如何实现自动换行的网格布局,详细内容请阅读本文,希望能对你有所帮助... 一 使用媒体查询响应式布局        使用的参数@media这是常用的

shell编程之函数与数组的使用详解

《shell编程之函数与数组的使用详解》:本文主要介绍shell编程之函数与数组的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录shell函数函数的用法俩个数求和系统资源监控并报警函数函数变量的作用范围函数的参数递归函数shell数组获取数组的长度读取某下的

springboot filter实现请求响应全链路拦截

《springbootfilter实现请求响应全链路拦截》这篇文章主要为大家详细介绍了SpringBoot如何结合Filter同时拦截请求和响应,从而实现​​日志采集自动化,感兴趣的小伙伴可以跟随小... 目录一、为什么你需要这个过滤器?​​​二、核心实现:一个Filter搞定双向数据流​​​​三、完整代码

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.