Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析

本文主要是介绍Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • LinkedBlockingQueue概述
    • 类图结构及重要字段
    • 构造器
    • 出队和入队操作
      • 入队enqueue
      • 出队dequeue
    • 阻塞式操作
      • E take() 阻塞式获取
      • void put(E e) 阻塞式插入
      • E poll(timeout, unit) 阻塞式超时获取
      • boolean offer(e, timeout, unit) 阻塞式超时插入
    • 其他常规操作
      • boolean offer(E e)
      • E poll()
      • E peek()
      • Boolean remove(Object o)
    • 总结
    • 参考阅读

系列传送门:

  • Java并发包源码学习系列:AbstractQueuedSynchronizer
  • Java并发包源码学习系列:CLH同步队列及同步资源获取与释放
  • Java并发包源码学习系列:AQS共享式与独占式获取与释放资源的区别
  • Java并发包源码学习系列:ReentrantLock可重入独占锁详解
  • Java并发包源码学习系列:ReentrantReadWriteLock读写锁解析
  • Java并发包源码学习系列:详解Condition条件队列、signal和await
  • Java并发包源码学习系列:挂起与唤醒线程LockSupport工具类
  • Java并发包源码学习系列:JDK1.8的ConcurrentHashMap源码解析
  • Java并发包源码学习系列:阻塞队列BlockingQueue及实现原理分析
  • Java并发包源码学习系列:阻塞队列实现之ArrayBlockingQueue源码解析

LinkedBlockingQueue概述

LinkedBlockingQueue是由单链表构成的界限可选的阻塞队列,如不指定边界,则为Integer.MAX_VALUE,因此如不指定边界,一般来说,插入的时候都会成功。

LinkedBlockingQueue支持FIFO先进先出的次序对元素进行排序。

类图结构及重要字段

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -6903933977591709194L;// 单链表节点static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}/** 容量,如果不指定就是Integer.MAX_VALUE */private final int capacity;/** 原子变量,记录元素个数 */private final AtomicInteger count = new AtomicInteger();/*** 哨兵头节点,head.next才是队列的第一个元素*/transient Node<E> head;/*** 指向最后一个元素*/private transient Node<E> last;/** 用来控制同时只有一个线程可以从队头获取元素 */private final ReentrantLock takeLock = new ReentrantLock();/** 条件队列,队列为空时,执行出队take操作的线程将会被置入该条件队列 */private final Condition notEmpty = takeLock.newCondition();/** 用来控制同时只有一个线程可以从队尾插入元素 */private final ReentrantLock putLock = new ReentrantLock();/** 条件队列,队列满时,执行入队操作put的线程将会被置入该条件队列 */private final Condition notFull = putLock.newCondition();
}
  • 单向链表实现,维护head和last两个Node节点,head是哨兵节点,head.next是第一个真正的元素,last指向队尾节点。
  • 队列中的元素通过AtomicInteger类型的原子变量count记录。
  • 维护两把锁:takeLock保证同时只有一个线程可以从对头获取元素,putLock保证只有一个线程可以在队尾插入元素。
  • 维护两个条件变量:notEmpty和notFull,维护条件队列,用以存放入队出队阻塞的线程。

如果希望获取一个元素,需要先获取takeLock锁,且notEmpty条件成立。

如果希望插入一个元素,需要先获取putLock锁,且notFull条件成立。

构造器

使用LinkedBlockingQueue的时候,可以指定容量,也可以使用默认的Integer.MAX_VALUE,几乎就是无界的了,当然,也可以传入集合对象,直接构造。

	// 如果不指定容量,默认容量为Integer.MAX_VALUE  (1 << 30) - 1public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}// 传入指定的容量public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;// 初始化last 和 head指针last = head = new Node<E>(null);}// 传入指定集合对象,容量视为Integer.MAX_VALUE,直接构造queuepublic LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);// 写线程获取putLockfinal ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibilitytry {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();}}

出队和入队操作

队列的操作最核心的部分莫过于入队和出队了,后面分析的方法基本上都基于这两个工具方法。

LinkedBlockingQueue的出队和入队相对ArrayBlockingQueue来说就简单很多啦:

入队enqueue

    private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;}
  1. 将node连接到last的后面。
  2. 更新last指针的位置,指向node。

出队dequeue

    private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first; // head向后移一位E x = first.item;first.item = null;return x;}

队列中的元素实际上是从head.first开始的,那么移除队头,其实就是将head指向head.next即可。

阻塞式操作

E take() 阻塞式获取

take操作将会获取当前队列头部元素并移除,如果队列为空则阻塞当前线程直到队列不为空,退出阻塞时返回获取的元素。

如果线程在阻塞时被其他线程设置了中断标志,则抛出InterruptedException异常并返回。

    public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;// 首先要获取takeLockfinal ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 如果队列为空, notEmpty不满足,就等着while (count.get() == 0) {notEmpty.await();}// 出队x = dequeue();// c先赋值为count的值, count 减 1c = count.getAndDecrement();// 这次出队后至少还有一个元素,唤醒notEmpty中的读线程if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// c == capacity 表示在该元素出队之前,队列是满的if (c == capacity)// 因为在这之前队列是满的,可能会有写线程在等着,这里做个唤醒signalNotFull();return x;}// 用于唤醒写线程private void signalNotFull() {final ReentrantLock putLock = this.putLock;// 获取putLockputLock.lock();try {notFull.signal();} finally {putLock.unlock();}}

void put(E e) 阻塞式插入

put操作将向队尾插入元素,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列不满。

如果线程在阻塞时被其他线程设置了中断标志,则抛出InterruptedException异常并返回。

    public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// 所有的插入操作 都约定 本地变量c 作为是否失败的标识int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 插入操作获取 putLockputLock.lockInterruptibly();try {// 队列满,这时notFull条件不满足,awaitwhile (count.get() == capacity) {notFull.await();}enqueue(node);// c先返回count的值 , 原子变量 + 1 ,c = count.getAndIncrement();// 至少还有一个空位可以插入,notFull条件是满足的,唤醒它if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// c == 0 表示在该元素入队之前,队列是空的if (c == 0)// 因为在这之前队列是空的,可能会有读线程在等着,这里做个唤醒signalNotEmpty();}// 用于唤醒读线程private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;// 获取takeLocktakeLock.lock();try {// 唤醒notEmpty.signal();} finally {takeLock.unlock();}}

E poll(timeout, unit) 阻塞式超时获取

在take阻塞式获取方法的基础上额外增加超时功能,传入一个timeout,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回null。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 这里就是超时机制的逻辑所在while (count.get() == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

boolean offer(e, timeout, unit) 阻塞式超时插入

在put阻塞式插入方法的基础上额外增加超时功能,传入一个timeout,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回null。

    public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}

其他常规操作

boolean offer(E e)

offer(E e)是非阻塞的方法,向队尾插入一个元素,如果队列未满,则插入成功并返回true;如果队列已满则丢弃当前元素,并返回false。

    public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;// 此时队列已满,直接返回falseif (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);// 插入操作 获取putLockfinal ReentrantLock putLock = this.putLock;putLock.lock();try {// 加锁后再校验一次if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0; // 只要不是-1,就代表成功~}

E poll()

从队列头部获取并移除第一个元素,如果队列为空则返回null。

    public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {// 如果队列不为空,则出队, 并递减计数if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

E peek()

瞅一瞅队头的元素是啥,如果队列为空,则返回null。

    public E peek() {if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {// 实际上第一个元素是head.nextNode<E> first = head.next;if (first == null)return null;elsereturn first.item;} finally {takeLock.unlock();}}

Boolean remove(Object o)

移除队列中与元素o相等【指的是equals方法判定相同】的元素,移除成功返回true,如果队列为空或没有匹配元素,则返回false。

    public boolean remove(Object o) {if (o == null) return false;fullyLock();try {// trail 和 p 同时向后遍历, 如果p匹配了,就让trail.next = p.next代表移除pfor (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (o.equals(p.item)) {unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}}// trail为p的前驱, 希望移除p节点void unlink(Node<E> p, Node<E> trail) {// assert isFullyLocked();// p.next is not changed, to allow iterators that are// traversing p to maintain their weak-consistency guarantee.p.item = null;trail.next = p.next;// 移除p// 如果p已经是最后一个节点了,就更新一下lastif (last == p)last = trail;// 移除一个节点之后,队列从满到未满, 唤醒notFullif (count.getAndDecrement() == capacity)notFull.signal();}//----- 多个锁 获取和释放的顺序是 相反的// 同时上锁void fullyLock() {putLock.lock();takeLock.lock();}// 同时解锁void fullyUnlock() {takeLock.unlock();putLock.unlock();}

总结

  • LinkedBlockingQueue是由单链表构成的界限可选的阻塞队列,如不指定边界,则为Integer.MAX_VALUE,因此如不指定边界,一般来说,插入的时候都会成功。
  • 维护两把锁:takeLock保证同时只有一个线程可以从对头获取元素,putLock保证只有一个线程可以在队尾插入元素。
  • 维护两个条件变量:notEmpty和notFull,维护条件队列,用以存放入队出队阻塞的线程。

如果希望获取一个元素,需要先获取takeLock锁,且notEmpty条件成立。

如果希望插入一个元素,需要先获取putLock锁,且notFull条件成立。

参考阅读

  • 《Java并发编程之美》
  • 《Java并发编程的艺术》

这篇关于Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/582121

相关文章

SpringBoot整合liteflow的详细过程

《SpringBoot整合liteflow的详细过程》:本文主要介绍SpringBoot整合liteflow的详细过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋...  liteflow 是什么? 能做什么?总之一句话:能帮你规范写代码逻辑 ,编排并解耦业务逻辑,代码

JavaSE正则表达式用法总结大全

《JavaSE正则表达式用法总结大全》正则表达式就是由一些特定的字符组成,代表的是一个规则,:本文主要介绍JavaSE正则表达式用法的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录常用的正则表达式匹配符正则表China编程达式常用的类Pattern类Matcher类PatternSynta

Spring Security中用户名和密码的验证完整流程

《SpringSecurity中用户名和密码的验证完整流程》本文给大家介绍SpringSecurity中用户名和密码的验证完整流程,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定... 首先创建了一个UsernamePasswordAuthenticationTChina编程oken对象,这是S

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码

Java MQTT实战应用

《JavaMQTT实战应用》本文详解MQTT协议,涵盖其发布/订阅机制、低功耗高效特性、三种服务质量等级(QoS0/1/2),以及客户端、代理、主题的核心概念,最后提供Linux部署教程、Sprin... 目录一、MQTT协议二、MQTT优点三、三种服务质量等级四、客户端、代理、主题1. 客户端(Clien

Java中调用数据库存储过程的示例代码

《Java中调用数据库存储过程的示例代码》本文介绍Java通过JDBC调用数据库存储过程的方法,涵盖参数类型、执行步骤及数据库差异,需注意异常处理与资源管理,以优化性能并实现复杂业务逻辑,感兴趣的朋友... 目录一、存储过程概述二、Java调用存储过程的基本javascript步骤三、Java调用存储过程示

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

Spring 框架之Springfox使用详解

《Spring框架之Springfox使用详解》Springfox是Spring框架的API文档工具,集成Swagger规范,自动生成文档并支持多语言/版本,模块化设计便于扩展,但存在版本兼容性、性... 目录核心功能工作原理模块化设计使用示例注意事项优缺点优点缺点总结适用场景建议总结Springfox 是