并发队列之DelayQueue

2023-12-21 11:08
文章标签 并发 队列 delayqueue

本文主要是介绍并发队列之DelayQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  已经说了四个并发队列了,DelayQueue这是最后一个,这是一个无界阻塞延迟队列,底层基于前面说过的PriorityBlockingQueue实现的 ,队列中每个元素都有过期时间,当从队列获取元素时,只有过期元素才会出队列,而队列头部的元素是过期最快的元素;

一.简单使用

  可以看到我们可以自己设置超时时间和优先级队列中的比较规则,这样我们在队列中取的时候,按照最快超时的先出队;

package com.example.demo.study;import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;import lombok.Data;public class Study0210 {@Datastatic class MyDelayed implements Delayed {private long delayTime;//该任务需要再队列中的延迟的时候private long expire;//这个时间表示当前时间和延迟时间相加,这里就叫做到期时间private String taskName;//任务的名称public MyDelayed(long delayTime, String taskName) {this.delayTime = delayTime;this.taskName = taskName;this.expire = System.currentTimeMillis()+delayTime;}//指定优先级队列里面的比较规则,就跟上篇博客中说的优先级队列中说的比较器一样
        @Overridepublic int compareTo(Delayed o) {return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));}//这个方法表示该任务在队列中还有多少剩余时间,也就是expire-当前时间
        @Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire-System.currentTimeMillis(), TimeUnit.MILLISECONDS);}}public static void main(String[] args) throws InterruptedException {//创建延迟队列DelayQueue<MyDelayed> queue = new DelayQueue<MyDelayed>();//创建任务丢到队列中Random random = new Random();for (int i = 1; i < 11; i++) {MyDelayed myDelayed = new MyDelayed(random.nextInt(500),"task"+i);queue.add(myDelayed);}//获取队列中的任务,这里只会跟超时时间最小的有关,和入队顺序无关MyDelayed myDelayed = queue.take();while(myDelayed!=null) {System.out.println(myDelayed.toString());myDelayed = queue.take();}}
}

二.基本组成 

//由此可是这个队列中存放的任务必须是Delayed类型的
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {//独占锁private final transient ReentrantLock lock = new ReentrantLock();//优先级队列private final PriorityQueue<E> q = new PriorityQueue<E>();//leader线程,实际上每次进行入队和出队操作的只能是leader线程,其余的都叫做fallower线程,这里会用到一个leader-follower模式private Thread leader = null;//条件变量private final Condition available = lock.newCondition();//省略很多代码
}

  具体的继承关系可以看看下面这个图,实际操作的都是内部的PriorityQueue;

三.offer方法

  上面代码中我们虽然说调用的是add方法,其实就是调用的是offer方法;

public boolean offer(E e) {final ReentrantLock lock = this.lock;//获取锁
    lock.lock();try {//往优先级队列中添加一个元素
        q.offer(e);//注意,peek方法只是获取优先级队列中第一个元素,并不会删除//如果优先级队列中取的元素就是和当前添加的元素一样,说明当前元素就是达到过期要求的,于是设置leader线程为null//然后通知条件队列中的线程优先级队列中已经有元素了,可以过来取了if (q.peek() == e) {leader = null;available.signal();}return true;} finally {//释放锁
        lock.unlock();}
}

四.take方法

  获取并移除队列中达到超时时间要求的元素,如果队列中没有元素,就把当前线程丢到条件队列中阻塞;

  从下面的代码逻辑中我们可以知道:线程分为两种,一种是leader线程,一种是follower线程,其中leader线程只会阻塞一定的时间,follower线程会在条件队列阻塞无限长的时间;当leader线程执行完take操作之后,就会重置leader线程为null,然后从条件队列中拿一个出来设置为leader线程

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//获取锁,可中断
    lock.lockInterruptibly();try {for (;;) {//这里先是尝试从优先级队列中获取一下节点,获取不到的话,说明当前优先级队列为空,就阻塞当前线程E first = q.peek();if (first == null)available.await();else {//如果优先级队列中有元素,那么肯定能走到这里来,然后取到该元素的超时时间,如果小于0,说明已经达到要求了,可以获取并删除队列中的元素long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return q.poll();first = null; // don't retain ref while waiting//如果leader队列不为空,说明有其他线程正在执行take,于是就把当前线程放到条件队列中if (leader != null)available.await();//到这里,说明优先级队列中没有元素到超时时间,而且此时没有其他线程调用take方法,于是就把leader线程设置为当前线程,//然后当前leader线程就会等待一定的时间,等优先级队列中最快超时的元素;//在等待的时候,leader线程会释放锁,这时其他线程B可以调用offer方法添加元素,线程C也可以调用take方法,然后线程C就会在//上面这里阻塞无限长的时间,直到被唤醒else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {//当前线程阻塞一定时间之后,不管成功了没有,都会把leader线程重置为null,然后重新循环if (leader == thisThread)leader = null;}}}}//这里的意思就是当前线程移除元素成功之后,唤醒条件队列中的线程去继续从队列中获取元素} finally {if (leader == null && q.peek() != null)available.signal();//释放锁
        lock.unlock();}
}

五.poll操作

  获取并移除队头过期元素,如果队列为空,或者对头元素没有过超时时间就返回null

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {//尝试获取队头元素,如果队头元素为空或者该延迟过期时间还没到,就返回nullE first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;else//否则就获取并移除队头元素return q.poll();} finally {lock.unlock();}
}

六.总结

  这个队列其实很容易,主要的是有一个延迟时间,我们从优先级队列中获取的根节点首先会判断有没有过超时时间,有的话就移除并返回就好了,没有的话,就看看还剩下多少时间才会超时(由于是优先级队列,所以根节点一般就是最快超时时间的,当然,也可以修改优先级队列的比较规则),于是当前线程就会等这个节点超时,此时leader等于当前线程,在等待的过程中,会释放锁,所以其他线程可以往队列中添加元素,也可以获取元素(但是由于此时leader!=null,这些线程会阻塞无限长时间直到被唤醒);

  在leader线程超时时间到了之后自动唤醒,再进行一次循环,就会获取并移除根节点了,最后再重置leader节点为null,顺便唤醒条件队列中的节点;

这篇关于并发队列之DelayQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/519747

相关文章

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

python多线程并发测试过程

《python多线程并发测试过程》:本文主要介绍python多线程并发测试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、并发与并行?二、同步与异步的概念?三、线程与进程的区别?需求1:多线程执行不同任务需求2:多线程执行相同任务总结一、并发与并行?1、

golang实现延迟队列(delay queue)的两种实现

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录1 延迟队列:邮件提醒、订单自动取消2 实现2.1 simplChina编程e简单版:go自带的time

Linux高并发场景下的网络参数调优实战指南

《Linux高并发场景下的网络参数调优实战指南》在高并发网络服务场景中,Linux内核的默认网络参数往往无法满足需求,导致性能瓶颈、连接超时甚至服务崩溃,本文基于真实案例分析,从参数解读、问题诊断到优... 目录一、问题背景:当并发连接遇上性能瓶颈1.1 案例环境1.2 初始参数分析二、深度诊断:连接状态与

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.