Java源码分析----Future

2024-08-30 09:58
文章标签 java 分析 源码 future

本文主要是介绍Java源码分析----Future,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一般使用多线程操作的时候会使用Thread+Runnable进行处理,但是这种方式中,Runnable是没有返回值的,假设我们需要获取Runnable的返回值,可能需要如下特殊处理,伪代码如下

String returnValue1 = "";
String returnValue2 = "";
CountDownLatch cdl = ....
new Thread(()->{// xxxx操作returnValue1 = "返回值";cdl.countDown
})new Thread(()->{// xxxx操作returnValue2 = "返回值";cdl.countDown
});
cdl wait// 程序阻塞在这print returnValue1
print returnValue2

当Runnable运行完并且赋值完毕则通知CDL,最后主线程在wait处等待两个线程执行完毕,然后获取Runnable的返回值。
这样的做法比较麻烦,而JDK提供了一个叫做Future的东西,他实现了上述的功能,且使用上更加的简便,看下例子

ExecutorService executorService = Executors.newFixedThreadPool(1);Future<String> future = executorService.submit(() -> {try {Thread.sleep(100000);} catch (InterruptedException e) {e.printStackTrace();}return "xxxx";});try {String value = future.get();System.out.println(value);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}

操作上更加的简便,调用Future的get方法的时候,就类似cdl wait+获取returnValue1

submit方法

那么下面看下其中是如何实现的,先看下submit方法,实现在java.util.concurrent.AbstractExecutorService中

    public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();// 将Callable转换成RunnableFuture,它继承了Runnable和FutureRunnableFuture<T> ftask = newTaskFor(task);// 线程池的execute方法,参数类型为Runnableexecute(ftask);return ftask;}

从submit方法可以看出,submit也是执行的execute方法,虽然参数不一样,但是其中Callable转换成Runnable,即RunnableFuture的实现,并将其返回,也就是上述例子中的Future。

这里和第一个例子做类比,这里返回的Future共包含几个功能,简化了使用

  • 赋值->returnValue1=“xxx”
  • 阻塞->cdl.wait
  • 取值

newTaskFor

newTaskFor方法如下:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public FutureTask(Runnable runnable, V result) {// runnable封装成callablethis.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable}

返回一个FutureTask,即RunnableFuture的实现类,也是一个Runnable的子类,当调用execute方法的时候,就会执行FutureTask的run方法,这里先不看run方法实现,先看下FutureTask的内部属性

     /*** NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/// 当前Future的状态值,加了volatile修改,则代表一个线程改变后,马上对另外线程可见private volatile int state;private static final int NEW          = 0;// 初始状态private static final int COMPLETING   = 1;// 执行完毕,还未进行赋值private static final int NORMAL       = 2;// 最终完成状态private static final int EXCEPTIONAL  = 3;// 出现异常private static final int CANCELLED    = 4;// 取消状态private static final int INTERRUPTING = 5;// 正被中断private static final int INTERRUPTED  = 6;// 被中断private Callable<V> callable;private Object outcome; // 返回值,不一定是Callable的返回值,出现异常的时候放的是异常对象private volatile Thread runner;// 执行run方法的线程private volatile WaitNode waiters;//阻塞等待的节点

run方法

这时候再看下run方法

    public void run() {// 状态不为初始化值,证明已经执行过,直接返回// 状态为NEW,则将runner设置为当前线程,如果失败,证明别的线程已经在操作,直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;// 状态为初始化状态才执行,因为有可能被中断或者调用cancel取消if (c != null && state == NEW) {V result;boolean ran;try {// 执行用户定义的call方法,并取的返回值result = c.call();// 执行成功ran = true;} catch (Throwable ex) {// 有异常出现,返回值置空,设置成执行失败result = null;ran = false;// 设置状态,和设置返回值为异常对象setException(ex);}if (ran)// 执行成功,设置返回值set(result);}} finally {// runner在状态值改变之后才能设置为空,否则可能出现多个线程执行run的情况runner = null;// 中断处理int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

setException和set方法

其中主要看setException和set方法

    protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}

调用了set或者setException方法,状态会变成COMPLETING,执行完成后,会将状态设置为NORMAL或者EXCEPTIONAL,而成功时outcome是正常返回值,失败则是Throwable对象。

finishCompletion方法

最后,调用finishCompletion将阻塞的线程唤醒,遍历waiters,调用unpark唤醒线程,处理和AQS有相似之处

 private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;}

get方法

get方法有两种形式,一种是程序会一直等待结果返回,而另外一种是有等待时间的,当时间到了之后,还未返回,则会抛出异常

    public V get() throws InterruptedException, ExecutionException {int s = state;// 还未完成,则进入awaitDone,阻塞线程if (s <= COMPLETING)s = awaitDone(false, 0L);// 完成之后调用方法获取返回值return report(s);}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;// 还未完成,则进入awaitDone,阻塞线程// 时间到了之后会返回状态值s,如果此时还未完成,那么抛出异常if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}

可以看到两种方法实现是类似的,但是有时间参数的方法,在指定时间内无法获得返回值的话是会抛出异常的,这点在使用的时候需要注意
awaitDone方法会在状态为未完成状态下阻塞线程,当完成或者指定时间到达的时候返回当前状态,此时有两种情况

  • 如果没有时间参数:返回完成状态(>COMPLETING的)、
  • 如果有时间参数:返回当时的状态,有可能是NEW,COMPLETING或者其他

接下来看下awaitDone方法实现

    private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 是否有指定时间,如果没有则为0// 如果有,则deadLine为当前时间+超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;// 表示当前线程节点boolean queued = false;// 是否在排队for (;;) {if (Thread.interrupted()) {// 当前线程被中断removeWaiter(q);// 从链表中移除当前线程节点throw new InterruptedException();}int s = state;if (s > COMPLETING) {// 正常完成状态,直接返回if (q != null)q.thread = null;return s;}// 还在执行当中,但是从上可以知道,这种状态很快变化为最终状态// 所以这里使用yield而不是阻塞可能就是这个原因吧else if (s == COMPLETING) Thread.yield();else if (q == null)// 状态为NEW且为第一次循环,则代表还未结束处理,则先构造一个线程节点q = new WaitNode();else if (!queued)// 在上一个分支判断后,进入下一次循环,如果还是NEW的状态// 则将线程节点挂在链表头部queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {// 如果有时间参数,则阻塞特定时间nanos = deadline - System.nanoTime();if (nanos <= 0L) {// 减少得到deadLine已经到了,则将该节点移除,并返回当前状态removeWaiter(q);return state;}// 阻塞特定时间LockSupport.parkNanos(this, nanos);}else// 没有时间参数的话则一直阻塞知道整个任务完成LockSupport.park(this);}}

这里逻辑不太难懂,总结一下分支:

  • 线程如果被中断,则从连接移除当前线程并返回异常
  • 如果为正常完成状态,则直接返回状态值
  • 如果为COMPLETING状态则让出CPU,重新进入循环重新判断
  • 如果为NEW状态且第一次循环则创建线程节点
  • 如果为NEW状态且非第一次循环,则将线程节点挂在链表头部
  • 如果为NEW状态且已经挂在了链表头部但是有时间参数则阻塞特定时间
  • 如果为NEW状态且已经挂在了链表头部但是没有时间参数则一直阻塞直到任务完成

从上可以知道,一个线程调用get方法后,最多会执行4次循环:

  1. 创建线程节点
  2. 节点加入链表
  3. 阻塞
  4. 任务完成唤醒后再进行一次判断并返回

可以看到这里的循环次数还是比较多的,相当于先来几次自旋再阻塞。

cancel方法

Future还提供了取消任务的入口,即cancel方法,内部将对应的线程进行中断,使正在执行的线程退出

    public boolean cancel(boolean mayInterruptIfRunning) {// 状态为NEW // 如果mayInterruptIfRunning为true,状态设置成INTERRUPTING状态// 否则设置成CANCELLED// 如果状态不为NEW或者说状态设置失败了,则返回falseif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // 如果设置为true,那么取出当前正在执行的线程,并将其中断// 最后设置中断完成状态if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 唤醒阻塞线程finishCompletion();}return true;}
  • mayInterruptIfRunning为true:状态设置为INTERRUPTING,设置运行run方法的线程的中断标志
  • mayInterruptIfRunning为false:状态设置为CANCELLED

综合run方法的逻辑,cancel方法不能取消非NEW状态的任务,且只是设置了一个中断位的标志,如果run方法已经执行到判断状态位后的代码准备运行或者已经运行了,那么cancel还是无法终止任务的执行

题外话

Dubbo实现了自己的Future,整体的交互过程其实是类似的,但是逻辑会比JDK自带的Future会简单一点,因为其中没有多个线程对Future进行get的操作,所以从get的性能上讲,Dubbo的会快一点

这篇关于Java源码分析----Future的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120501

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Java中的.close()举例详解

《Java中的.close()举例详解》.close()方法只适用于通过window.open()打开的弹出窗口,对于浏览器的主窗口,如果没有得到用户允许是不能关闭的,:本文主要介绍Java中的.... 目录当你遇到以下三种情况时,一定要记得使用 .close():用法作用举例如何判断代码中的 input