AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析

本文主要是介绍AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析

1、CountDownLatch 简介

CountDownLatch,是一个简单的同步器,它的含义是 允许一个或者多个线程等待其他线程的操作执行完毕后再执行后续的操作

CountDownLatch 的通常用法和 Thread.join() 有点类似,等待其他线程都完成后再执行主任务。

2、入门案例分析

案例1

  • 对于像我一样的学生来说,CountDownLatch 的实际开发应用很少,甚至有同学没有接触过它。但是在并发条件下,这个类的使用还是很常见的,所以先引入两个案例去了解下它的用途:
  • 借助 CountDownLatch ,控制主线程等待子线程完成再执行
/*** @author wcc* @date 2022/2/15 19:09*/
public class CountDownLatchTest01 {private static final int TASK_COUNT = 8;private static final int THREAD_CORE_SIZE = 10;public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(TASK_COUNT);Executor executor = Executors.newFixedThreadPool(10);for (int i = 0; i < TASK_COUNT; i++) {executor.execute(new WorkerRunnable(i, countDownLatch));}System.out.println("主线程正在等待所有子任务完成...");long mainWaitStartTimeMillis = System.currentTimeMillis();countDownLatch.await();long mainWaitEndTimeMillis = System.currentTimeMillis();System.out.println("主线程等待时长:"+ (mainWaitEndTimeMillis - mainWaitStartTimeMillis));}static class WorkerRunnable implements Runnable{private int taskId;private CountDownLatch latch;@Overridepublic void run() {doWorker();}public void doWorker(){System.out.println("任务ID:"+ taskId + ",任务正在进行中...");try {TimeUnit.MILLISECONDS.sleep(500);}catch (Exception e){e.printStackTrace();}finally {latch.countDown();}}public WorkerRunnable(int taskId, CountDownLatch latch) {this.taskId = taskId;this.latch = latch;}}
}

运行结果如下

H4fJ2T.png

案例2

  • 执行任务的线程,也可能是多对多的关系:本案例就来了解一下,借助 CountDownLatch,使得主线程控制子线程同时开启后,主线程再去阻塞等待子线程结束!
/*** @author wcc* @date 2022/2/15 19:09*/
public class CountDownLatchTest02 {public static void main(String[] args) throws InterruptedException {CountDownLatch startSignal = new CountDownLatch(1);CountDownLatch doneSignal = new CountDownLatch(10);for (int i = 0; i < 10; i++) {new Thread(new Worker(startSignal, doneSignal, i)).start();}// 这里让主线程休眠 500 毫秒,确保所有子线程已经启动,并且阻塞在 startSignal 栅栏处TimeUnit.MILLISECONDS.sleep(500);// 因为 startSignal 栅栏值为1,所以主线程只需要调用一次// 那么所有调用 startSignal.await() 阻塞的子线程,就都可以同时通过栅栏了System.out.println("子任务栅栏已经开始");startSignal.countDown();System.out.println("等待子任务结束...");long startTime = System.currentTimeMillis();// 等待所有子任务结束doneSignal.await();long endTime = System.currentTimeMillis();System.out.println("所有子任务已经结束,耗时:" + (endTime - startTime));}static class Worker implements Runnable{private final CountDownLatch startSignal;private final CountDownLatch doneSignal;private int id;@Overridepublic void run() {try {// 为了让所有线程同时开启任务,我们让所有线程先阻塞在这里// 等大家都准备好了,再打开这个门槛startSignal.await();System.out.println("子任务-" + id + ",开启时间:" + System.currentTimeMillis());doWorker();}catch (Exception e){e.printStackTrace();}finally {doneSignal.countDown();}}public void doWorker() throws InterruptedException{TimeUnit.SECONDS.sleep(5);}public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, int id) {this.startSignal = startSignal;this.doneSignal = doneSignal;this.id = id;}}
}

执行结果

H4f2se.png

上面代码中 startSignal.await();就相当于一个栅栏,把所有子线程都抵挡在他们的 run方法,等待主线程执行 startSignal.countDown();即关闭栅栏之后,所有子线程同时继续执行他们自己的 run() 方法,如下图:

H4fbQS.png

案例3

/*** @author wcc* @date 2022/2/16 14:14*/
public class CountDownLatchTest03 {public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(2);Thread t1 = new Thread(() -> {try {Thread.sleep(5000);}catch (Exception e){}// 休息 5 秒钟后(模拟工作线程工作了 5 秒),调用 countDown()latch.countDown();}, "t1");Thread t2 = new Thread(() -> {try {Thread.sleep(10000);}catch (Exception e){}// 休息 10 秒钟后(模拟工作线程工作了 10 秒),调用 countDown()latch.countDown();}, "t2");t1.start();t2.start();Thread t3 = new Thread(() -> {try {// 阻塞,等待 state 减为 0latch.await();System.out.println("线程 t3 从 await 中返回了");}catch (Exception e){System.out.println("线程 t3  await 被中断");Thread.currentThread().interrupt();}}, "t3");Thread t4 = new Thread(() -> {try {// 阻塞,等待 state 减为 0latch.await();System.out.println("线程 t4 从 await 中返回了");}catch (Exception e){System.out.println("线程 t4  await 被中断");Thread.currentThread().interrupt();}}, "t4");t3.start();t4.start();}
}

执行结果如下

H4fXZj.png

3、源码分析

3.1、Sync 内部类

private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;//传入初始count次数Sync(int count) {// 调用 setState()方法设置AQS中的 state 的值setState(count);}//获取还剩下的count次数int getCount() {return getState();}//尝试获取共享锁protected int tryAcquireShared(int acquires) {// 注意,这里state等于0的时候返回的是1// state不等于0的时候返回的是-1,也就是说state不等于0的时候总是要排队return (getState() == 0) ? 1 : -1;}/*** 尝试释放共享锁* 更新 AQS.state 的值,每调用一次,state 值减1,当 state - 1正好为0的时候,返回true*/protected boolean tryReleaseShared(int releases) {// 自旋操作,确保AQS.state 的值更新成功for (;;) {// 获取当前 state 的值int c = getState();// 条件成立:说明前面已经有线程触发唤醒操作了(已经释放共享锁,无法再释放了),这里返回falseif (c == 0)return false;// 执行到这里,说明 state > 0// 如果 c 的值 > 0,则将c值-1int nextc = c-1;// 原子更新 state CAS成功:说明当前线程执行 tryReleaseShared方法 c-1之前,没有其他线程修改过state的值//原子更新state的值:if (compareAndSetState(c, nextc))// nextc == 0:true:说明当前调用 countDown() 方法的线程就是需要触发唤醒操作的线程,此时会返回true进行唤醒操作return nextc == 0;}
}

Sync 内部类重写了 tryReleaseShared(int releases)tryAcquireShared(int acquires) 方法,并把 count 存到 state 变量中去。这里要注意一下,上面两个方法的参数并没有被用到。

3.2、构造方法

// 构造方法需要传入一个 count,也就是初始次数
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}

3.3、await() 方法

await() 方法是等待其他线程完成的方法,它会先尝试获取一下共享锁,如果失败则进入 AQS 的阻塞队列中排队等待被唤醒。

根据上面的 Sync 的源码,我们知道,state 不等于 0 的时候 tryAcquireShared() 返回的是 -1,也就是说 count 未减到 0 的时候,所有调用 await() 方法的线程都要排队。

public void await() throws InterruptedException {// 调用 AQS 的acquireSharedInterruptibly() 方法sync.acquireSharedInterruptibly(1);
}

AQS 中的 acquireSharedInterruptibly 方法:

// 位于AQS 中:可以响应中断获取共享锁的方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 条件成立:说明当前调用 await 方法的线程已经是中断状态了,直接抛出异常if (Thread.interrupted())throw new InterruptedException();// 条件成立:说明当前 AQS 的 state 是大于0的,此时将线程入队,然后走唤醒操作// 条件不成立: AQS.state == 0,此时就不会阻塞线程了...// 对应业务层面执行任务的线程已经将latch打破了,然后其他在调用latch.await的线程就不会在这里阻塞了if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 将调用 latch.await() 方法的线程包装成 node 加入到AQS 的阻塞队列中final Node node = addWaiter(Node.SHARED);// false:表示当前线程没有被中断,未抛出中断异常,不需要进行响应中断出队的逻辑// true:表示当前线程被中断了,且抛出中断异常,需要进行取消指定node参与竞争的逻辑boolean failed = true;try {// 自旋操作for (;;) {// 获取当前节点的前驱节点final Node p = node.predecessor();// 条件成立:说明当前线程对应的节点为 head.next 节点if (p == head) {// head.next 节点就有权力获取共享锁了int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// shouldParkAfterFailedAcquire():会给当前线程找一个好爸爸,最终给爸爸节点设置状态为 -1(SIGNAL),最终这个方法返回 trueif (shouldParkAfterFailedAcquire(p, node) &&// parkAndCheckInterrupt():挂起当前线程,并返回当前线程的中断标记parkAndCheckInterrupt())throw new InterruptedException();}} finally {// 条件成立:说明当前线程发生了中断,需要进行取消指定当前node线程参与竞争if (failed)cancelAcquire(node);}}/*** AQS 的setHeadAndPropagate方法 设置当前节点为head节点,并且向后传播(依次唤醒)* @param node* @param propagate 1:代表当前共享锁的state==0,-1:代表当前共享锁状态 state != 0*/private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below// 设置当前节点为新的head节点,并设置thread、prev为nullsetHead(node);// 调用 setHeadAndPropagate的时候 propagate == 1 一定成立if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 获取当前节点的后继节点Node s = node.next;// 条件一:s== null 什么时候成立:当前 node 节点已经是 tail了,这时候条件一会成立,调用 doReleaseShared会处理这种情况// 条件二:前置条件:s != null 要求后继节点s的模式为共享模式 SHAREDif (s == null || s.isShared())// 基本上所有情况都会执行到 doReleaseShared 方法doReleaseShared();}}
}

图解分析

H45EP1.png

3.4、countDown() 方法

countDown() 方法,会释放共享锁,也就是 count 的次数会减1.

根据上面 Sync 的源码,我们知道,tryReleaseShared() 每次会把 count 的次数减1,当其减为0的时候返回 true,这时候才唤醒等待的线程。

注意,doReleaseShared() 是唤醒等待的线程,这个方法我们在前面的章节中分析过了。

public void countDown() {sync.releaseShared(1);}// 释放共享锁的方法public final boolean releaseShared(int arg) {// 条件成立:说明当前调用 latch.countDown() 方法的线程,正好是 state - 1 == 0的线程,需要触发唤醒 await状态的线程if (tryReleaseShared(arg)) {// 调用countDown()方法的线程只有一个线程会进入到这个 if 块里面,调用 doReleaseShared(),唤醒阻塞状态线程的逻辑doReleaseShared();return true;}return false;}/*** 都有哪几种路径会调用到 doReleaseShared方法呢?* 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 唤醒当前阻塞队列内head.next对应的线程* 2.被唤醒的线程 -> doAcquireSharedInterruptibly() -> setHeadAndPropagate() -> doReleaseShared()*/// AQS 的 doReleaseShared() 方法private void doReleaseShared() {for (;;) {// 获取当前 AQS 内的头结点Node h = head;// 条件一:h != null 成立:说明阻塞队列不为空// 不成立:h == null,什么时候会是这样呢?// latch 创建出来后,没有任何线程调用过 await()方法之前,有线程调用 latch.countDown()操作,且触发了唤醒阻塞节点的逻辑// 条件二:h != tail,当前在阻塞队列内除了head节点以外还有其他节点// h == tail -> head 和 tail 指向的是同一个node对象 什么时候会有这种情况呢?// 1.正常唤醒情况,依次获取到共享锁,当前线程执行到这里的时候(线程就是tail节点)// 2.第一个调用 await() 方法的线程与调用countDown且触发唤醒阻塞节点的线程出现并发了//  因为await()线程是第一个调用 latch.await() 的线程,此时队列内什么也没有,它需要补充创建一个head节点,然后再次自旋入队//  在await()线程入队完成之前,假设当前队列内只有刚刚补充创建的空元素head//  同一时期,外部有一个调用 countDown() 的线程,它将state的值从1修改为0了,这个线程需要做唤醒阻塞队列内元素的逻辑// 注意:调用 await() 方法的线程,因为完全入队完成之后再次回到上层方法doAcquireSharedInterruptibly中,会进入到自旋中...// 自旋中会获取当前元素的前驱,判断自己是head.next,所有接下来改线程又会将自己设置为head,然后并没有把当前线程中断if (h != null && h != tail) {// 执行到这个if里面,说明当前head一定有后继节点// 获取头结点head的等待状态int ws = h.waitStatus;// 如果当前head节点状态为 signal 说明当前后继节点并没有被唤醒过呢if (ws == Node.SIGNAL) {// 唤醒后继节点前,将当前head节点的状态改为0// 这里,为什么使用CAS 操作呢?// 这里,是因为当前节点唤醒后继节点的时候,后继节点更新了自己为head节点,导致当前节点无法退出自旋,然后会再次参与到唤醒其后继节点的后继节点的逻辑中// 所以,此时是有并发的,要使用CAS逻辑if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases//唤醒后继节点unparkSuccessor(h);}//执行到这里,说明当前头结点的等待状态不是SIGNALelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 条件成立:// 1.说明刚刚唤醒的后继节点还没有执行到setHeadAndPropagate方法里面的,设置当前唤醒节点为head的逻辑// 这个时候,当前线程直接跳出去就结束了...// 此时,用不用担心唤醒逻辑在这里断掉呢?// 不需要担心,因为被唤醒的线程早晚会执行到 doReleaseShared方法中// 2.head == null//  latch 创建出来后,没有任何线程调用过 await()方法之前,有线程调用 latch.countDown()操作,且触发了唤醒阻塞节点的逻辑// 3.head == tail 第一个调用 await() 方法的线程与调用countDown且触发唤醒阻塞节点的线程出现并发了 head 和 tail 指向的是同一个对象// 条件不成立:// 被唤醒的节点非常积极,然后直接在上层方法doAcquireSharedInterruptibly中被被唤醒,直接将自己设置为了新的head节点// 此时唤醒它的节点(前驱节点)执行 h == head 导致条件不成立// 此时 head 节点的前驱不会跳出 doReleaseShared,会继续参与唤醒新head节点的后继节点逻辑中去if (h == head)                   // loop if head changedbreak;}}/*** 尝试释放共享锁* 更新 AQS.state 的值,每调用一次,state 值减1,当 state - 1正好为0的时候,返回true*/protected boolean tryReleaseShared(int releases) {// 自旋操作,确保AQS.state 的值更新成功for (;;) {// 获取当前 state 的值int c = getState();// 条件成立:说明前面已经有线程触发唤醒操作了(已经释放共享锁,无法再释放了),这里返回falseif (c == 0)return false;// 执行到这里,说明 state > 0// 如果 c 的值 > 0,则将c值-1int nextc = c-1;// 原子更新 state CAS成功:说明当前线程执行 tryReleaseShared方法 c-1之前,没有其他线程修改过state的值//原子更新state的值:if (compareAndSetState(c, nextc))// nextc == 0:true:说明当前调用 countDown() 方法的线程就是需要触发唤醒操作的线程,此时会返回true进行唤醒操作return nextc == 0;}}

CountDowmnLatch.countDown() 执行流程图解:

H45oJ1.png
总结

  • CountDownLatch 表示允许一个或者多个线程等待其他线程的操作执行完成后再执行后续的操作
  • CountDownLatch 使用 AQS 的共享锁机制实现
  • CountDownLatch 初始化的时候需要传入次数 count(共享锁的锁的层数)
  • 每次调用 countDown() 方法的时候 count 的次数减1
  • 每次调用 await() 方法的时候都会尝试获取锁,这里的获取锁其实是检查 AQS 中的 state 值是否为0
  • 当 count 值(也就是 state 的值)减为0的时候会唤醒 AQS 中阻塞队列中的线程, 这些线程调用 await() 方法入队

这篇关于AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/423425

相关文章

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

Java中最全最基础的IO流概述和简介案例分析

《Java中最全最基础的IO流概述和简介案例分析》JavaIO流用于程序与外部设备的数据交互,分为字节流(InputStream/OutputStream)和字符流(Reader/Writer),处理... 目录IO流简介IO是什么应用场景IO流的分类流的超类类型字节文件流应用简介核心API文件输出流应用文

java 恺撒加密/解密实现原理(附带源码)

《java恺撒加密/解密实现原理(附带源码)》本文介绍Java实现恺撒加密与解密,通过固定位移量对字母进行循环替换,保留大小写及非字母字符,由于其实现简单、易于理解,恺撒加密常被用作学习加密算法的入... 目录Java 恺撒加密/解密实现1. 项目背景与介绍2. 相关知识2.1 恺撒加密算法原理2.2 Ja

Nginx屏蔽服务器名称与版本信息方式(源码级修改)

《Nginx屏蔽服务器名称与版本信息方式(源码级修改)》本文详解如何通过源码修改Nginx1.25.4,移除Server响应头中的服务类型和版本信息,以增强安全性,需重新配置、编译、安装,升级时需重复... 目录一、背景与目的二、适用版本三、操作步骤修改源码文件四、后续操作提示五、注意事项六、总结一、背景与

Android实现图片浏览功能的示例详解(附带源码)

《Android实现图片浏览功能的示例详解(附带源码)》在许多应用中,都需要展示图片并支持用户进行浏览,本文主要为大家介绍了如何通过Android实现图片浏览功能,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码