本文主要是介绍小议CompletableFuture 链式执行和响应式编程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
相关文章:
- 用最简单的方式理解同步和异步、阻塞与非阻塞
- CompletableFuture 实战
背景
昨晚和一个朋友讨论了他在开发过程中遇到的一个场景设计问题。这个场景可以简化为:服务接收到一个需要处理的任务请求,然后立即返回。这个任务需要经过四个处理器的处理,每个处理器的处理都依赖于前一个处理器的处理结果。
这个场景与我最近处理 GitLab WebHook 的工作流程非常相似。例如,我从 GitLab 接收到一个事件后,需要对数据进行清洗、解析、调用 AI 大模型和发布评论等操作,每个操作都依赖于前一个操作的结果。
对于这种场景的设计,我目前采用的是基于链式设计的方法。我定义了一个抽象的处理器类和链式工厂:
/*** @author dongguabai* @date 2024-01-09 13:52*/
public abstract class MergeRequestHandler {private MergeRequestHandler next;public MergeRequestHandler(MergeRequestHandler next) {this.next = next;}public void handle(Task task) {if (run(task) && next != null) {next.handle(task);}}public abstract boolean run(Task task);
}
/*** @author dongguabai* @date 2024-01-09 15:53*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MergeRequestHandlerFactory {public static MergeRequestHandler createHandlerChain() {return new ActionHandler(new MergeRequestRetrievalHandler(new CommentHandler(new FinalHandler())));}
}
在链式工厂里面指定了执行顺序。
当然,也可以参考 ZooKeeper 的实现:org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors:
@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader().toBeApplied);commitProcessor = new CommitProcessor(toBeAppliedProcessor,Long.toString(getServerId()), false);commitProcessor.start();ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,commitProcessor);proposalProcessor.initialize();firstProcessor = new PrepRequestProcessor(this, proposalProcessor);((PrepRequestProcessor)firstProcessor).start();}
然后,朋友提出要将其改为异步处理。实际上,异步处理也是可以很简单实现的,只需在执行 HandlerChain 时将其放入线程池中即可。但在这里,我完全可以使用 CompletableFuture 来实现,这也是我当前代码改造的主要方向。
关于CompletableFuture的实现,我设计了两套方案。现在来看一下最终实现的方案。
抽象 Processor:
@Log4j
abstract class Processor {public CompletableFuture<Task> process(Task task, Executor executor) {return CompletableFuture.supplyAsync(() -> doProcess(task), executor);}protected abstract Task doProcess(Task task);
}
实现类:
@Log4j
class Processor1 extends Processor {@Overrideprotected Task doProcess(Task task) {// 你的处理逻辑log.info("Processor1 is processing");ProcessorChain.sleep(1);log.info("Processor1 end");if (task.isCancelled()) {throw new CancellationException("Task was cancelled");}return task;}
}
@Log4j
class Processor2 extends Processor {@Overrideprotected Task doProcess(Task task) {// 你的处理逻辑log.info("Processor2 is processing");ProcessorChain.sleep(2);log.info("Processor2 end");if (task.isCancelled()) {throw new CancellationException("Task was cancelled");}return task;}
}
核心的处理链:
@Log4j
class ProcessorChain {private final List<Processor> processors;public ProcessorChain(List<Processor> processors) {this.processors = processors;}public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result ->log.info("处理完成,结果是:" + result));}public static void sleep(Integer i){try {TimeUnit.SECONDS.sleep(i);} catch (InterruptedException e) {e.printStackTrace();}}
}
测试代码:
public static void main(String[] args) {Executor executor = Executors.newCachedThreadPool();ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));Task task = new Task("1");CompletableFuture<Void> exceptionally = processorChain.process(task, executor).exceptionally(ex -> {System.out.println("处理过程中出现错误:" + ex.getMessage());return null;});log.info("处理完成");}
输出:
22:38:35.276 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:38:35.276 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:38:36.285 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:38:36.285 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 is processing
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 end
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 is processing
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 end
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 is processing
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 end
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@6dfb9646
效果挺符合诉求的,但这里也有几个疑问。
ProcessorChain#process中的执行顺序问题
先看 ProcessorChain#proccess:
public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));}
朋友原话是:“他期望的执行顺序是 process1、process2、process3。但由于 thenComposeAsync是异步执行的,那么在循环到 process2 时,它会异步执行。此时,循环可能已经到了 process3,并开始执行 thenComposeAsync。这样,process3和process2可能会并行执行,这跟期望结果不一致”。
其实这里是不会的,thenComposeAsync 可以翻译为 “然后异步组合”。
这里的 “组合” 指的是将当前的 CompletableFuture 与另一个 CompletableFuture 进行组合。“异步” 指的是这里的 Function 操作会在一个单独的线程中执行,不会阻塞当前的线程。
我这里的 Processor#process 函数返回的是 CompletableFuture,所以这里 thenComposeAsync 的流程是:等待当前的 CompletableFuture 完成后,异步执行 Function, Function 会返回一个新的 CompletableFuture,然后 thenComposeAsync 会返回这个 CompletableFuture。
这里要注意 thenApplyAsync 和 thenComposeAsync 不要混淆了。
ProcessorChain#process中的阻塞与非阻塞、同步和异步问题
上文已经说明了 ProcessorChain#process 中的执行顺序问题。接下来看一下这段代码中的阻塞与非阻塞、同步和异步问题。
为了查看方便,我这里再贴一下测试代码和 ProcessorChain#proccess 的实现。
测试代码:
public static void main(String[] args) {Executor executor = Executors.newSingleThreadExecutor();ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));Task task = new Task("1");CompletableFuture<Void> c = processorChain.process(task, executor).exceptionally(ex -> {System.out.println("处理过程中出现错误:" + ex.getMessage());return null;});log.info("处理完成");}
ProcessorChain#proccess:
public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));}
执行流程如下:
main线程开始执行,并调用ProcessorChain的process方法。- 在
ProcessorChain的process方法中,main线程创建了一个已经完成的CompletableFuture,并开始遍历Processor对象。 main线程调用thenComposeAsync方法,此时main线程将Processor1的process方法提交给Executor,并立即返回一个新的CompletableFuture。main线程不会等待Processor1的process方法完成,因此main线程是非阻塞的。Executor的一个线程(称之为Thread_1)开始执行Processor1的process方法。由于thenComposeAsync的链式调用,Processor2的process方法会等待Processor1的process方法完成后才开始执行,以此类推。main线程继续遍历Processor对象,并对每个Processor重复步骤 3 和 4。每次调用thenComposeAsync方法时,main线程都会立即返回一个新的CompletableFuture,并将Processor的process方法提交给Executor。这意味着main线程是非阻塞的,而Executor的线程会按照Processor的顺序执行process方法。- 当所有的
Processor的process方法都完成后,其中一个Executor的线程会执行thenAccept方法,处理最后一个Processor的process方法返回的结果。
其实这里的 CompletableFuture<Task> future = CompletableFuture.completedFuture(task); 只是作为一个“CompletableFuture 的启动器”(这么一说是不是就更好理解了)。
响应式编程
其实上面的内容让我想起了响应式编程,引入维基百科的一个说明:
例如,在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化 。
其实这里的 CompletableFuture 实际上是响应式编程的一个简单例子,因为它表示一个异步计算的结果,我们可以在它上面“提前”注册回调函数(我觉得这个“提前”是精髓),这些回调函数会在 CompletableFuture 的 Function 完成时被调用。而我这里就是提前准备好了一个结果,即 CompletableFuture,然后不断的循环在其上面增加回调。
可以增加日志看一下:
public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {log.info(future);future = future.thenComposeAsync(t -> processor.process(t, executor), executor);log.info(future);// future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result ->log.info("处理完成,结果是:" + result));}
再次执行测试代码:
22:56:42.512 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@3fa77460[Completed normally]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5fdef03a[Not completed]
22:56:42.614 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:56:42.615 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 is processing
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 end
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 is processing
22:56:48.630 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 end
22:56:48.631 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 is processing
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 end
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@58d4f75a
重新梳理执行流程:
-
最开始的启动器是
3fa77460,这是一个已经完成的CompletableFuture对象,用于启动异步操作链。 -
第一次循环:
3fa77460执行thenComposeAsync方法,返回新的CompletableFuture对象e2d56bf。此时,Processor1的process方法正在异步执行。 -
第二次循环:
e2d56bf执行thenComposeAsync方法,返回新的CompletableFuture对象244038d0。此时,Processor2的process方法正在等待Processor1的process方法完成。 -
第三次循环:
244038d0执行thenComposeAsync方法,返回新的CompletableFuture对象5680a178。此时,Processor3的process方法正在等待Processor2的process方法完成。 -
第四次循环:
5680a178执行thenComposeAsync方法,返回新的CompletableFuture对象5fdef03a。此时,Processor4的process方法正在等待Processor3的process方法完成。 -
所有循环结束后,返回
5fdef03a。当所有的Processor的process方法都完成后,5fdef03a会执行thenAccept方法,处理最后一个Processor的process方法返回的结果。
这个过程中,每个 CompletableFuture 对象都会等待前一个 CompletableFuture 对象完成,然后开始执行自己的异步操作。这就形成了一个 CompletableFuture 链,每个 CompletableFuture 对象都会按照 Processor 的顺序执行 process 方法。
CompletableFuture链
当调用thenComposeAsync方法时,会创建一个新的CompletableFuture对象:
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);}
private <V> CompletableFuture<V> uniComposeStage(Executor e, Function<? super T, ? extends CompletionStage<V>> f) {if (f == null) throw new NullPointerException();Object r; Throwable x;if (e == null && (r = result) != null) {// try to return function result directlyif (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {return new CompletableFuture<V>(encodeThrowable(x, r));}r = null;}try {@SuppressWarnings("unchecked") T t = (T) r;CompletableFuture<V> g = f.apply(t).toCompletableFuture();Object s = g.result;if (s != null)return new CompletableFuture<V>(encodeRelay(s));CompletableFuture<V> d = new CompletableFuture<V>();UniRelay<V> copy = new UniRelay<V>(d, g);g.push(copy);copy.tryFire(SYNC);return d;} catch (Throwable ex) {return new CompletableFuture<V>(encodeThrowable(ex));}}CompletableFuture<V> d = new CompletableFuture<V>();//构建 UniComposeUniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);push(c);c.tryFire(SYNC);return d;}
构造的 UniCompose对象持有前一个CompletableFuture对象的引用,也就是 UniCompose 的 src 字段。所以即使循环中出现 CompletableFuture 被覆盖,也不用担心被 GC 的问题。
总结
本文以一个实际的场景设计问题为出发点,详细探讨了 CompletableFuture 的链式执行机制,对执行流程和线程切换进行了深入的分析,并对响应式编程的概念和应用进行了简单的讨论。
References
- https://zh.wikipedia.org/zh-cn/%E5%93%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B
欢迎关注公众号:

这篇关于小议CompletableFuture 链式执行和响应式编程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!