【Java】guava(二) ListenableFuture 使用及原理

2024-05-24 21:08

本文主要是介绍【Java】guava(二) ListenableFuture 使用及原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用异步编程接口获取返回值的方式有两种:

1.同步方式,也就是调用方主动获取,但是这时可能还没有返回结果,可能需要轮询;

2.回调方式,调用者在提交任务时,注册一个回调函数,任务执行完以后,自动触发回调函数通知调用者;这种实现方式需要在执行框架里植入一个扩展点,用于触发回调。

Java原生api里的Future属于第一种,Java8提供的CompletableFuture属于第二种;在Java8出来之前,guava也提供了基于回调的编程接口,也就是本次要说的ListenableFuture(其实看guava代码,里面有大量这玩意儿,不搞懂不行。。。)。

先看下ListenableFuture接口定义:

public interface ListenableFuture<V> extends Future<V> {void addListener(Runnable listener, Executor executor);
}

可以看到,这个接口在Future接口的基础上增加了addListener方法,允许我们注册回调函数。当然,我们在编程时可能不会直接使用这个接口,因为这个接口只能传Runnable实例。Futures类提供了另一个方法:addCallback方法。下面我们看一个实例:

@Test
public void test55() throws InterruptedException {ListenableFuture<String> s = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)).submit(() -> {Thread.sleep(2000L);return "async result";});Futures.addCallback(s, new FutureCallback<String>() {@Overridepublic void onSuccess(@Nullable String result) {System.out.println("succeed, result: {}" + result);}@Overridepublic void onFailure(Throwable t) {System.out.println("failed, t: " + t);}}, Executors.newSingleThreadExecutor());Thread.sleep(100000);
}

首先看下addCallback方法干了啥?

public static <V> void addCallback(final ListenableFuture<V> future,final FutureCallback<? super V> callback,Executor executor) {
Preconditions.checkNotNull(callback);
future.addListener(new CallbackListener<V>(future, callback), executor);
}

这里调用了ListenableFuture接口的addListener方法,传入了一个CallbackListener实例。而这个实例由需要传入future和一个Callback实例,所以这个回调是可以拿到返回值的。本质上是guava帮我们基于Runnable封了一个回调接口。看下这个CallbackListener接口:

private static final class CallbackListener<V> implements Runnable {final Future<V> future;final FutureCallback<? super V> callback;CallbackListener(Future<V> future, FutureCallback<? super V> callback) {this.future = future;this.callback = callback;}@Overridepublic void run() {if (future instanceof InternalFutureFailureAccess) {Throwable failure =InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future);if (failure != null) {callback.onFailure(failure);return;}}final V value;try {value = getDone(future);} catch (ExecutionException e) {callback.onFailure(e.getCause());return;} catch (RuntimeException | Error e) {callback.onFailure(e);return;}callback.onSuccess(value);}
}

这个类内部有一个future和一个FutureCallback实例,其run方法就是回调时的逻辑,先调用getDone方法获取future的返回值。然后再将返回值调用FutureCallback实例的onSuccess方法执行注册的回调逻辑。当然,如果发生了异常,则会调用onFailure方法通知异常。

好的,至此我们已经了解了用户注册的回调函数是怎么执行的了,那么还有一个重要问题,这个回调是怎么触发的?

在开始的时候大致提了一下,回调的实现一般都是在执行框架层植入一个扩展点,触发回调逻辑,这里也不意外。我们从执行的执行框架入手,开始的时候我们调用MoreExecutors构造了一个线程池:

@GwtIncompatible // TODO
public static ListeningExecutorService listeningDecorator(ExecutorService delegate) {
return (delegate instanceof ListeningExecutorService)? (ListeningExecutorService) delegate: (delegate instanceof ScheduledExecutorService)? new ScheduledListeningDecorator((ScheduledExecutorService) delegate): new ListeningDecorator(delegate);
}

对于我们之前的例子,会返回一个ListeningDecorator类型的线程池,从方法命名也可以看出,这个本质上就是对于Java原生线程池的一个封装,用于返回ListenableFuture类型的Future:

  private static class ListeningDecorator extends AbstractListeningExecutorService {private final ExecutorService delegate;ListeningDecorator(ExecutorService delegate) {this.delegate = checkNotNull(delegate);}@Overridepublic final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {return delegate.awaitTermination(timeout, unit);}@Overridepublic final boolean isShutdown() {return delegate.isShutdown();}@Overridepublic final boolean isTerminated() {return delegate.isTerminated();}@Overridepublic final void shutdown() {delegate.shutdown();}@Overridepublic final List<Runnable> shutdownNow() {return delegate.shutdownNow();}@Overridepublic final void execute(Runnable command) {delegate.execute(command);}}
}

这个家伙儿啥也没干,就是将执行逻辑委托给了delegate。当然,线程池执行不仅仅是这些方法,比如最开始的submit方法,其实是在其父类AbstractListeningExecutorService中的:

  @Overridepublic <T> ListenableFuture<T> submit(Callable<T> task) {return (ListenableFuture<T>) super.submit(task);}

然后又调用了AbstractListeningExecutorService的父类即Java中原生的AbstractExecutorService的submit方法,进入了原生Java的逻辑。之后会调用newTask创建任务:

public <T> Future<T> submit(Callable<T> task) {if (task == null) {throw new NullPointerException();} else {RunnableFuture<T> ftask = this.newTaskFor(task);this.execute(ftask);return ftask;}
}

guava的AbstractListeningExecutorService覆盖了newTaskFor方法,这样才能返回ListenableFuture呀:

  @Overrideprotected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return TrustedListenableFutureTask.create(callable);}

所以,guava里的ListenableFuture的一个实现类是这里的TrustedListenableFutureTask,这个我们不做深入,直接看其run方法吧,也是在父类里定义的,这个方法很长,截取一段关键逻辑:

try {if (run) {result = runInterruptibly();}
} catch (Throwable t) {error = t;
} finally {}if (run) {afterRanInterruptibly(result, error);}
}

先调用runInterruptibly方法执行任务内容,然后如果执行成功就调用afterxxx方法执行一个后置的逻辑,这个其实就是我们所说的“植入点”,主动调用回调的入口就是这个方法:

@Override
void afterRanInterruptibly(V result, Throwable error) {if (error == null) {TrustedListenableFutureTask.this.set(result);} else {setException(error);}
}

如果有异常,设置Exception,否则设置返回值。我们只看无异常的case:

@CanIgnoreReturnValue
protected boolean set(@Nullable V value) {
Object valueToSet = value == null ? NULL : value;
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {complete(this);return true;
}
return false;
}

这里在任务里设置完返回值后,就调用了complete方法,只截取关键逻辑:

/** Unblocks all threads and runs all listeners. */
private static void complete(AbstractFuture<?> future) {
Listener next = null;
outer:
while (true) {future.afterDone();next = future.clearListeners(next);future = null;while (next != null) {Listener curr = next;next = next.next;Runnable task = curr.task;if (task instanceof SetFuture) {} else {executeListener(task, curr.executor);}}break;
}

这里的Listener就是最开始添加到Future里的回调函数,是一个链表结构。这个方法会遍历回调链表,逐一调用executeListener方法触发回调逻辑。

至此ListenableFuture的回调逻辑基本清楚了。

 

小结:

1.优先使用Futures工具类添加回调;

2.回调的实现,在执行框架内植入触发逻辑;

这篇关于【Java】guava(二) ListenableFuture 使用及原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/999535

相关文章

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java进程异常故障定位及排查过程

《Java进程异常故障定位及排查过程》:本文主要介绍Java进程异常故障定位及排查过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、故障发现与初步判断1. 监控系统告警2. 日志初步分析二、核心排查工具与步骤1. 进程状态检查2. CPU 飙升问题3. 内存

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.

java中新生代和老生代的关系说明

《java中新生代和老生代的关系说明》:本文主要介绍java中新生代和老生代的关系说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、内存区域划分新生代老年代二、对象生命周期与晋升流程三、新生代与老年代的协作机制1. 跨代引用处理2. 动态年龄判定3. 空间分

Java设计模式---迭代器模式(Iterator)解读

《Java设计模式---迭代器模式(Iterator)解读》:本文主要介绍Java设计模式---迭代器模式(Iterator),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录1、迭代器(Iterator)1.1、结构1.2、常用方法1.3、本质1、解耦集合与遍历逻辑2、统一

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空

深度解析Java DTO(最新推荐)

《深度解析JavaDTO(最新推荐)》DTO(DataTransferObject)是一种用于在不同层(如Controller层、Service层)之间传输数据的对象设计模式,其核心目的是封装数据,... 目录一、什么是DTO?DTO的核心特点:二、为什么需要DTO?(对比Entity)三、实际应用场景解析

Java 线程安全与 volatile与单例模式问题及解决方案

《Java线程安全与volatile与单例模式问题及解决方案》文章主要讲解线程安全问题的五个成因(调度随机、变量修改、非原子操作、内存可见性、指令重排序)及解决方案,强调使用volatile关键字... 目录什么是线程安全线程安全问题的产生与解决方案线程的调度是随机的多个线程对同一个变量进行修改线程的修改操

从原理到实战深入理解Java 断言assert

《从原理到实战深入理解Java断言assert》本文深入解析Java断言机制,涵盖语法、工作原理、启用方式及与异常的区别,推荐用于开发阶段的条件检查与状态验证,并强调生产环境应使用参数验证工具类替代... 目录深入理解 Java 断言(assert):从原理到实战引言:为什么需要断言?一、断言基础1.1 语