Java 源码 - DelayQueue 源码解析

2024-04-30 14:28
文章标签 java 源码 解析 delayqueue

本文主要是介绍Java 源码 - DelayQueue 源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

        • 1. 整体设计
          • 1.1 类注释
          • 1.2、类图
          • 1.3 延迟队列的属性
          • 1.4 DelayQueue 的主要方法
            • 1.4.1 offer 添加元素
            • 1.4.2 take 取出元素
            • 1.4.3 poll 取出元素

1. 整体设计

DelayQueue 延迟队列底层使用的是锁的能力,比如说要在当前时间往后延迟 5 秒执行,那么当前线程就会沉睡 5 秒,等 5 秒后线程被唤醒时,如果能获取到资源的话,线程即可立马执行。原理上似乎很简单,但内部实现却很复杂,有很多难点,比如当运行资源不够,多个线程同时被唤醒时,如何排队等待?比如说在何时阻塞?何时开始执行等等?接下来我们从源码角度来看下是如何实现的。

1.1 类注释

类注释上比较简单,只说了三个概念:

  1. 无界延时队列中元素将在过期时被执行,越靠近队头,越早过期;
  2. 未过期的元素不能够被 take;
  3. 不允许空元素。
1.2、类图

DelayQueue 的类图,关键是 DelayQueue 类上是有泛型的,如下:
image.png

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {

从泛型中可以看出,DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法,如下:

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

也就是说 DelayQueue 队列中的元素必须是实现 Delayed 接口和 Comparable 接口的,并覆写了 getDelay 方法和 compareTo 的方法才行,不然在编译时,编译器就会提醒我们元素必须强制实现 Delayed 接口。
compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。

1.3 延迟队列的属性

DelayQueue 中的重要属性如下所示。

// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();

DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。什么是领导者-追随者模式.

1.4 DelayQueue 的主要方法
1.4.1 offer 添加元素
public boolean offer(E e) {// 获取全局独占锁final ReentrantLock lock = this.lock;lock.lock();try {// 向优先队列中插入元素q.offer(e);// 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程 if (q.peek() == e) {leader = null;available.signal();}return true;} finally {// 释放全局独占锁lock.unlock();}}
leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。

DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。

1.4.2 take 取出元素

take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。
一开始看到全局独占锁,理所当然详情属于队列消费模式。 无法理解 “领导者-追随者模式”。take方法实现了一个“领导者-追随者模式”的线程处理方式,只有leader线程会等待指定时间后获得锁,其他线程都会进入无限期等待。 如果多个线程调用take() 方法, 当available.awaitNanos(delay);的时候, 其它线程可以抢锁进入。 下面有测试例子。源码中:java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject.await()和await(long time, TimeUnit unit); 方法 Node node = addConditionWaiter(); long savedState = fullyRelease(node); 队列状态释放
take方法主要实现逻辑为(for循环体):
1. 获取头节点对象,如果为空,线程释放锁,并进入无限期等待。等待调用offer方法,放入对象后,通过signal()方法唤醒。【看offer方法的源码】
2. 如果头节点对象不为空,获取该对象的延迟时间,如果小于0,直接从队列中取出并移除该对象,返回。
3. 如果头节点对象延迟时间大于0,判断是否“leader线程”是否已经存在,如果存在说明当前线程为“追随者线程”,进入无限期等待(等待leader线程take方法完成后,唤醒)。
4. 如果“leader线程”不存在,把当前线程设置为“leader线程”,释放锁并等待头节点对象的延迟时间后,重新获得锁,下次循环获取头节点对象返回。
5. finally代码块,每次leader线程执行完成take方法后,需要唤醒其他线程获得锁成为新的leader线程。

public E take() throws InterruptedException {// 获取全局独占锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 获取队头元素,peek 方法不会删除元素E first = q.peek();if (first == null)// 若队头为空,则阻塞当前线程available.await();else {// 否则获取队头元素的超时时间long delay = first.getDelay(NANOSECONDS);// 已超时,直接出队if (delay <= 0)return q.poll();// 释放 first 的引用,避免内存泄漏first = null; // don't retain ref while waiting// leader != null 表明有其他线程在操作,阻塞当前线程if (leader != null)available.await();else {// leader 指向当前线程Thread thisThread = Thread.currentThread();leader = thisThread;try {// 超时阻塞available.awaitNanos(delay);} finally {// 释放 leaderif (leader == thisThread)leader = null;}}}}} finally {// leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列if (leader == null && q.peek() != null)available.signal();// 释放全局独占锁    lock.unlock();}
}

Condition.await() 和Condition.await(100, TimeUnit.SECONDS); 方法进入等待时候,其它线程可以抢抢到锁

package com.lvyuanj.test.timer;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j
public class TestConditionAwait {private static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {final Condition condition = lock.newCondition();final Thread thread1 = new Thread(new Runnable() {@Overridepublic void run() {lock.lock();try {Thread.currentThread().setName("ConditionAwait");log.error(Thread.currentThread().getName() + " beforeAwaitTime:" + System.currentTimeMillis());condition.await(100, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error(Thread.currentThread().getName() + " finishAwaitTime:" + System.currentTimeMillis());} finally {lock.unlock();log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());}}});Thread thread2 = new Thread(new Runnable() {@Overridepublic void run() {Thread.currentThread().setName("ConditionSignal");try {lock.lock();log.error(Thread.currentThread().getName() + " getLockTime:" + System.currentTimeMillis());//thread1.interrupt();long currentTime = System.currentTimeMillis();while (System.currentTimeMillis() - currentTime < 8000) {}condition.signal();log.error(Thread.currentThread().getName() + " signalTime:" + System.currentTimeMillis());} catch (Exception e) {} finally {lock.unlock();log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());}}});thread1.start();Thread.sleep(50);thread2.start();}
}
1.4.3 poll 取出元素

取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}
}

这篇关于Java 源码 - DelayQueue 源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949109

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

一文解析C#中的StringSplitOptions枚举

《一文解析C#中的StringSplitOptions枚举》StringSplitOptions是C#中的一个枚举类型,用于控制string.Split()方法分割字符串时的行为,核心作用是处理分割后... 目录C#的StringSplitOptions枚举1.StringSplitOptions枚举的常用

Python函数作用域与闭包举例深度解析

《Python函数作用域与闭包举例深度解析》Python函数的作用域规则和闭包是编程中的关键概念,它们决定了变量的访问和生命周期,:本文主要介绍Python函数作用域与闭包的相关资料,文中通过代码... 目录1. 基础作用域访问示例1:访问全局变量示例2:访问外层函数变量2. 闭包基础示例3:简单闭包示例4