JUC第二十九讲:JUC工具类: Phaser详解

2023-10-15 08:52

本文主要是介绍JUC第二十九讲:JUC工具类: Phaser详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

JUC工具类:Phaser详解

本文是JUC第二十九讲,JUC工具类:Phaser详解。Phaser是JDK 7新增的一个同步辅助类,它可以实现 CyclicBarrier 和CountDownLatch 类似的功能,而且它支持 对任务的动态调整,并支持分层结构来达到更高的吞吐量

文章目录

  • JUC工具类:Phaser详解
    • 1、带着BAT大厂的面试问题去理解Phaser工具
    • 2、Phaser运行机制
    • 3、Phaser源码详解
      • 3.1、核心参数
      • 3.2、函数列表
      • 3.3、方法 - register()
      • 3.4、方法 - arrive()
      • 3.5、方法 - arriveAndAwaitAdvance()
      • 3.6、方法 - awaitAdvance(int phase)
    • 4、参考文章

1、带着BAT大厂的面试问题去理解Phaser工具

请带着这些问题继续后文,会很大程度上帮助你更好的理解Phaser工具。

  • Phaser主要用来解决什么问题?
  • Phaser与CyclicBarrier和CountDownLatch的区别是什么?
  • 如果用CountDownLatch来实现Phaser的功能应该怎么实现?
  • Phaser运行机制是什么样的?
  • 给一个Phaser使用的示例?

2、Phaser运行机制

img

  • Registration(注册)

跟其他barrier不同,在phaser上注册的parties会随着时间的变化而变化。任务可以随时注册 (使用方法register,bulkRegister注册,或者由构造器确定初始 parties),并且在任何抵达点可以随意地撤销注册(方法 arriveAndDeregister)。就像大多数基本的同步结构一样,注册和撤销只影响内部count;不会创建更深的内部记录,所以任务不能查询他们是否已经注册。(不过,可以通过继承来实现类似的记录)

  • Synchronization(同步机制)

和CyclicBarrier一样,Phaser也可以重复await。方法arriveAndAwaitAdvance的效果类似 CyclicBarrier.await。phaser的每一代都有一个相关的phase number,初始值为0,当所有注册的任务都到达phaser时 phase+1,到达最大值(Integer.MAX_VALUE)之后清零。使用phase number可以独立控制 到达phaser 和 等待其他线程 的动作,通过下面两种类型的方法:

  • Arrival(到达机制) arrive和arriveAndDeregister方法记录到达状态。这些方法不会阻塞,但是会返回一个相关的arrival phase number;也就是说,phase number用来确定到达状态。当所有任务都到达给定phase时,可以执行一个可选的函数,这个函数通过重写onAdvance方法实现,通常可以用来控制终止状态。重写此方法类似于为CyclicBarrier提供一个barrierAction,但比它更灵活。
  • Waiting(等待机制) awaitAdvance方法需要一个表示arrival phase number的参数,并且在phaser前进到与给定phase不同的phase时返回。和CyclicBarrier不同,即使等待线程已经被中断,awaitAdvance方法也会一直等待。中断状态和超时时间同样可用,但是当任务等待中断或超时后未改变phaser的状态时会遭遇异常。如果有必要,在方法forceTermination之后可以执行这些异常的相关的handler进行恢复操作,Phaser也可能被ForkJoinPool中的任务使用,这样在其他任务阻塞等待一个phase时可以保证足够的并行度来执行任务。
  • Termination(终止机制) :

可以用isTerminated方法检查phaser的终止状态。在终止时,所有同步方法立刻返回一个负值。在终止时尝试注册也没有效果。当调用onAdvance返回true时Termination被触发。当deregistration操作使已注册的parties变为0时,onAdvance的默认实现就会返回true。也可以重写onAdvance方法来定义终止动作。forceTermination方法也可以释放等待线程并且允许它们终止。

  • Tiering(分层结构) :

Phaser支持分层结构(树状构造)来减少竞争。注册了大量parties的Phaser可能会因为同步竞争消耗很高的成本, 因此可以设置一些子Phaser来共享一个通用的parent。这样的话即使每个操作消耗了更多的开销,但是会提高整体吞吐量。 在一个分层结构的phaser里,子节点phaser的注册和取消注册都通过父节点管理。子节点phaser通过构造或方法register、bulkRegister进行首次注册时,在其父节点上注册。子节点phaser通过调用arriveAndDeregister进行最后一次取消注册时,也在其父节点上取消注册。

  • Monitoring(状态监控) :

由于同步方法可能只被已注册的parties调用,所以phaser的当前状态也可能被任何调用者监控。在任何时候,可以通过getRegisteredParties获取parties数,其中getArrivedParties方法返回已经到达当前phase的parties数。当剩余的parties(通过方法getUnarrivedParties获取)到达时,phase进入下一代。这些方法返回的值可能只表示短暂的状态,所以一般来说在同步结构里并没有啥卵用。

3、Phaser源码详解

3.1、核心参数

private volatile long state;
/*** The parent of this phaser, or null if none*/
private final Phaser parent;
/*** The root of phaser tree. Equals this if not in a tree.*/
private final Phaser root;// 等待线程的栈顶元素,根据phase取模定义为一个奇数header和一个偶数header
private final AtomicReference<QNode> evenQ; // 偶数
private final AtomicReference<QNode> oddQ;  // 奇数

state状态说明:

Phaser使用一个long型state值来标识内部状态:

  • 低0-15位表示未到达parties数;
  • 中16-31位表示等待的parties数;
  • 中32-62位表示phase当前代;
  • 高63位表示当前phaser的终止状态。

注意:子Phaser的phase在没有被真正使用之前,允许滞后于它的root节点。这里在后面源码分析的 reconcileState 方法里会讲解。 Qnode 是 Phaser定义的内部等待队列,用于在阻塞时记录等待线程及相关信息。实现了ForkJoinPool的一个内部接口ManagedBlocker,上面已经说过,Phaser也可能被 ForkJoinPool 中的任务使用,这样在其他任务阻塞等待一个phase时可以保证足够的并行度来执行任务 (通过内部实现方法isReleasable和block)。

3.2、函数列表

//构造方法
public Phaser() {this(null, 0);
}
public Phaser(int parties) {this(null, parties);
}
public Phaser(Phaser parent) {this(parent, 0);
}
public Phaser(Phaser parent, int parties)
// 注册一个新的party
public int register()
// 批量注册
public int bulkRegister(int parties)
// 使当前线程到达 phaser,不等待其他任务到达。返回arrival phase number
public int arrive() 
//使当前线程到达phaser并撤销注册,返回arrival phase number
public int arriveAndDeregister()/** 使当前线程到达phaser并等待其他任务到达,等价于awaitAdvance(arrive())。* 如果需要等待中断或超时,可以使用awaitAdvance方法完成一个类似的构造。* 如果需要在到达后取消注册,可以使用awaitAdvance(arriveAndDeregister())。*/
public int arriveAndAwaitAdvance()
//等待给定phase数,返回下一个 arrival phase number
public int awaitAdvance(int phase)
//阻塞等待,直到phase前进到下一代,返回下一代的phase number
public int awaitAdvance(int phase) 
//响应中断版awaitAdvance
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)throws InterruptedException, TimeoutException
//使当前phaser进入终止状态,已注册的parties不受影响,如果是分层结构,则终止所有phaser
public void forceTermination()

3.3、方法 - register()

//注册一个新的party
public int register() {return doRegister(1);
}
private int doRegister(int registrations) {// adjustment to statelong adjust = ((long)registrations << PARTIES_SHIFT) | registrations;final Phaser parent = this.parent;int phase;for (;;) {long s = (parent == null) ? state : reconcileState();int counts = (int)s;int parties = counts >>> PARTIES_SHIFT;//获取已注册parties数int unarrived = counts & UNARRIVED_MASK;//未到达数if (registrations > MAX_PARTIES - parties)throw new IllegalStateException(badRegister(s));phase = (int)(s >>> PHASE_SHIFT);//获取当前代if (phase < 0)break;if (counts != EMPTY) {                  // not 1st registrationif (parent == null || reconcileState() == s) {if (unarrived == 0)             // wait out advanceroot.internalAwaitAdvance(phase, null);//等待其他任务到达else if (UNSAFE.compareAndSwapLong(this, stateOffset,s, s + adjust))//更新注册的parties数break;}}else if (parent == null) {              // 1st root registrationlong next = ((long)phase << PHASE_SHIFT) | adjust;if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))//更新phasebreak;}else {//分层结构,子phaser首次注册用父节点管理synchronized (this) {               // 1st sub registrationif (state == s) {               // recheck under lockphase = parent.doRegister(1);//分层结构,使用父节点注册if (phase < 0)break;// finish registration whenever parent registration// succeeded, even when racing with termination,// since these are part of the same "transaction".//由于在同一个事务里,即使phaser已终止,也会完成注册while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,((long)phase << PHASE_SHIFT) | adjust)) {//更新phases = state;phase = (int)(root.state >>> PHASE_SHIFT);// assert (int)s == EMPTY;}break;}}}}return phase;
}

说明:register方法为phaser添加一个新的party,如果onAdvance正在运行,那么这个方法会等待它运行结束再返回结果。如果当前phaser有父节点,并且当前phaser上没有已注册的party,那么就会交给父节点注册。

register和bulkRegister都由doRegister实现,大概流程如下:

  • 如果当前操作不是首次注册,那么直接在当前phaser上更新注册parties数
  • 如果是首次注册,并且当前phaser没有父节点,说明是root节点注册,直接更新phase
  • 如果当前操作是首次注册,并且当前phaser由父节点,则注册操作交由父节点,并更新当前phaser的phase
  • 上面说过,子Phaser的phase在没有被真正使用之前,允许滞后于它的root节点。非首次注册时,如果Phaser有父节点,则调用reconcileState()方法解决root节点的phase延迟传递问题, 源码如下:
private long reconcileState() {final Phaser root = this.root;long s = state;if (root != this) {int phase, p;// CAS to root phase with current parties, tripping unarrivedwhile ((phase = (int)(root.state >>> PHASE_SHIFT)) !=(int)(s >>> PHASE_SHIFT) &&!UNSAFE.compareAndSwapLong(this, stateOffset, s,s = (((long)phase << PHASE_SHIFT) |((phase < 0) ? (s & COUNTS_MASK) :(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :((s & PARTIES_MASK) | p))))))s = state;}return s;
}

当root节点的phase已经advance到下一代,但是子节点phaser还没有,这种情况下它们必须通过更新未到达parties数 完成它们自己的advance操作(如果parties为0,重置为EMPTY状态)。

回到register方法的第一步,如果当前未到达数为0,说明上一代phase正在进行到达操作,此时调用internalAwaitAdvance()方法等待其他任务完成到达操作,源码如下:

//阻塞等待phase到下一代
private int internalAwaitAdvance(int phase, QNode node) {// assert root == this;releaseWaiters(phase-1);          // ensure old queue cleanboolean queued = false;           // true when node is enqueuedint lastUnarrived = 0;            // to increase spins upon changeint spins = SPINS_PER_ARRIVAL;long s;int p;while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {if (node == null) {           // spinning in noninterruptible modeint unarrived = (int)s & UNARRIVED_MASK;//未到达数if (unarrived != lastUnarrived &&(lastUnarrived = unarrived) < NCPU)spins += SPINS_PER_ARRIVAL;boolean interrupted = Thread.interrupted();if (interrupted || --spins < 0) { // need node to record intr//使用node记录中断状态node = new QNode(this, phase, false, false, 0L);node.wasInterrupted = interrupted;}}else if (node.isReleasable()) // done or abortedbreak;else if (!queued) {           // push onto queueAtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;QNode q = node.next = head.get();if ((q == null || q.phase == phase) &&(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enqqueued = head.compareAndSet(q, node);}else {try {ForkJoinPool.managedBlock(node);//阻塞给定node} catch (InterruptedException ie) {node.wasInterrupted = true;}}}if (node != null) {if (node.thread != null)node.thread = null;       // avoid need for unpark()if (node.wasInterrupted && !node.interruptible)Thread.currentThread().interrupt();if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)return abortWait(phase); // possibly clean up on abort}releaseWaiters(phase);return p;
}

简单介绍下第二个参数node,如果不为空,则说明等待线程需要追踪中断状态或超时状态。以doRegister中的调用为例,不考虑线程争用,internalAwaitAdvance大概流程如下:

  • 首先调用releaseWaiters唤醒上一代所有等待线程,确保旧队列中没有遗留的等待线程。
  • 循环SPINS_PER_ARRIVAL指定的次数或者当前线程被中断,创建node记录等待线程及相关信息。
  • 继续循环调用ForkJoinPool.managedBlock运行被阻塞的任务
  • 继续循环,阻塞任务运行成功被释放,跳出循环
  • 最后唤醒当前phase的线程

3.4、方法 - arrive()

//使当前线程到达phaser,不等待其他任务到达。返回arrival phase number
public int arrive() {return doArrive(ONE_ARRIVAL);
}private int doArrive(int adjust) {final Phaser root = this.root;for (;;) {long s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;int counts = (int)s;//获取未到达数int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);if (unarrived <= 0)throw new IllegalStateException(badArrive(s));if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {//更新stateif (unarrived == 1) {//当前为最后一个未到达的任务long n = s & PARTIES_MASK;  // base of next stateint nextUnarrived = (int)n >>> PARTIES_SHIFT;if (root == this) {if (onAdvance(phase, nextUnarrived))//检查是否需要终止phasern |= TERMINATION_BIT;else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;int nextPhase = (phase + 1) & MAX_PHASE;n |= (long)nextPhase << PHASE_SHIFT;UNSAFE.compareAndSwapLong(this, stateOffset, s, n);releaseWaiters(phase);//释放等待phase的线程}//分层结构,使用父节点管理arriveelse if (nextUnarrived == 0) { //propagate deregistrationphase = parent.doArrive(ONE_DEREGISTER);UNSAFE.compareAndSwapLong(this, stateOffset,s, s | EMPTY);}elsephase = parent.doArrive(ONE_ARRIVAL);}return phase;}}
}

说明: arrive方法手动调整到达数,使当前线程到达phaser。arrive和arriveAndDeregister都调用了doArrive实现,大概流程如下:

  • 首先更新state(state - adjust);
  • 如果当前不是最后一个未到达的任务,直接返回phase
  • 如果当前是最后一个未到达的任务:
    • 如果当前是root节点,判断是否需要终止phaser,CAS更新phase,最后释放等待的线程;
    • 如果是分层结构,并且已经没有下一代未到达的parties,则交由父节点处理doArrive逻辑,然后更新state为EMPTY。

3.5、方法 - arriveAndAwaitAdvance()

public int arriveAndAwaitAdvance() {// Specialization of doArrive+awaitAdvance eliminating some reads/pathsfinal Phaser root = this.root;for (;;) {long s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;int counts = (int)s;int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);//获取未到达数if (unarrived <= 0)throw new IllegalStateException(badArrive(s));if (UNSAFE.compareAndSwapLong(this, stateOffset, s,s -= ONE_ARRIVAL)) {//更新stateif (unarrived > 1)return root.internalAwaitAdvance(phase, null);//阻塞等待其他任务if (root != this)return parent.arriveAndAwaitAdvance();//子Phaser交给父节点处理long n = s & PARTIES_MASK;  // base of next stateint nextUnarrived = (int)n >>> PARTIES_SHIFT;if (onAdvance(phase, nextUnarrived))//全部到达,检查是否可销毁n |= TERMINATION_BIT;else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;int nextPhase = (phase + 1) & MAX_PHASE;//计算下一代phasen |= (long)nextPhase << PHASE_SHIFT;if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))//更新statereturn (int)(state >>> PHASE_SHIFT); // terminatedreleaseWaiters(phase);//释放等待phase的线程return nextPhase;}}
}

说明: 使当前线程到达phaser并等待其他任务到达,等价于awaitAdvance(arrive())。如果需要等待中断或超时,可以使用awaitAdvance方法完成一个类似的构造。如果需要在到达后取消注册,可以使用awaitAdvance(arriveAndDeregister())。效果类似于CyclicBarrier.await。大概流程如下:

  • 更新state(state - 1);
  • 如果未到达数大于1,调用internalAwaitAdvance阻塞等待其他任务到达,返回当前phase
  • 如果为分层结构,则交由父节点处理arriveAndAwaitAdvance逻辑
  • 如果未到达数<=1,判断phaser终止状态,CAS更新phase到下一代,最后释放等待当前phase的线程,并返回下一代phase。

3.6、方法 - awaitAdvance(int phase)

public int awaitAdvance(int phase) {final Phaser root = this.root;long s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;if (p == phase)return root.internalAwaitAdvance(phase, null);return p;
}
//响应中断版awaitAdvance
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {final Phaser root = this.root;long s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;if (p == phase) {QNode node = new QNode(this, phase, true, false, 0L);p = root.internalAwaitAdvance(phase, node);if (node.wasInterrupted)throw new InterruptedException();}return p;
}

说明: awaitAdvance用于阻塞等待线程到达,直到phase前进到下一代,返回下一代的phase number。方法很简单,不多赘述。awaitAdvanceInterruptibly方法是响应中断版的awaitAdvance,不同之处在于,调用阻塞时会记录线程的中断状态。

4、参考文章

  • JUC源码分析-JUC锁(五):Phaser

这篇关于JUC第二十九讲:JUC工具类: Phaser详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/216664

相关文章

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

MySQL数据库中ENUM的用法是什么详解

《MySQL数据库中ENUM的用法是什么详解》ENUM是一个字符串对象,用于指定一组预定义的值,并可在创建表时使用,下面:本文主要介绍MySQL数据库中ENUM的用法是什么的相关资料,文中通过代码... 目录mysql 中 ENUM 的用法一、ENUM 的定义与语法二、ENUM 的特点三、ENUM 的用法1

MySQL count()聚合函数详解

《MySQLcount()聚合函数详解》MySQL中的COUNT()函数,它是SQL中最常用的聚合函数之一,用于计算表中符合特定条件的行数,本文给大家介绍MySQLcount()聚合函数,感兴趣的朋... 目录核心功能语法形式重要特性与行为如何选择使用哪种形式?总结深入剖析一下 mysql 中的 COUNT

一文详解Git中分支本地和远程删除的方法

《一文详解Git中分支本地和远程删除的方法》在使用Git进行版本控制的过程中,我们会创建多个分支来进行不同功能的开发,这就容易涉及到如何正确地删除本地分支和远程分支,下面我们就来看看相关的实现方法吧... 目录技术背景实现步骤删除本地分支删除远程www.chinasem.cn分支同步删除信息到其他机器示例步骤

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

mysql中的服务器架构详解

《mysql中的服务器架构详解》:本文主要介绍mysql中的服务器架构,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、mysql服务器架构解释3、总结1、背景简单理解一下mysqphpl的服务器架构。2、mysjsql服务器架构解释mysql的架

ModelMapper基本使用和常见场景示例详解

《ModelMapper基本使用和常见场景示例详解》ModelMapper是Java对象映射库,支持自动映射、自定义规则、集合转换及高级配置(如匹配策略、转换器),可集成SpringBoot,减少样板... 目录1. 添加依赖2. 基本用法示例:简单对象映射3. 自定义映射规则4. 集合映射5. 高级配置匹