源码分析-PriorityBlockingQueue

2024-06-10 18:18

本文主要是介绍源码分析-PriorityBlockingQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

PriorityBLockingQueue-文档部分

doc文档

PriorityBlockingQueue是无界的阻塞队列。当然如果资源耗尽的看情况下也是会出现添加失败的情况。PriorityBlockingQueue提供的迭代器并不保证按照某一顺序顺序迭代所有元素(和有限队列一致,可以见另一篇博客源码分析-PriorityQueue)(它会先按数组迭代,然后再迭代漏掉的元素)。当两个元素相等的时候并不保证陷入先出顺序。所以如果需要保证先入先出顺序必须要保证

源码实现文档

这里使用了一个给予数组的堆实现,所有公共方法都知识用一个锁实现,但是在调整大小的阶段使用了一个自旋锁,以避免并发申请调整大小。这样可以避免等待的消费者的推迟,以及随之而来的元素重建问题。由于需要在申请新内存的过程中退出锁,所以不可以将任务简单的委托给一个PriorityQueue来完成。为了保证兼容性,在序列话的过程中使用了锁。为了增加保证兼容性。在序列化的过程中需要付出双倍的代价(先转换成一个PriorityQueue然后在序列化)

概述

大部分的内容都和之前的PriorityQueue类似。与PriorityQueue的区别主要体现在两点上,一个是在出队和入队的过程中增加了BlockingQueue的操作。另外一个在resize 的过程中使用了自旋锁。

    private static final int DEFAULT_INITIAL_CAPACITY = 11;private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;private transient Object[] queue;private transient int size;private transient Comparator<? super E> comparator;private final ReentrantLock lock;private final Condition notEmpty;private transient volatile int allocationSpinLock;private PriorityQueue q;

域的内容比较清楚,只做几点说明

首先一个MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;为什么要限制位Integer.MAX_VALUE-8,为什么要限制为-8。网上有人解释说这是因为有一部分虚拟机的实现是需要使用8位来储存size。在很多地方都出现了,这里做一下解释。
lock锁及其绑定的Condition是用来做阻塞操作的。因为这里是一个无界的队列所以这里只有一个状态变量。用于阻塞take。
而allocationSpinkLock是一个volatile的变量,用这个结合cas来进行自旋。

主要方法

构造器

构造器有很多,这里只看一个使用collection进行构造的构造器

    public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true;  // true if must screen for nullsif (c instanceof SortedSet<?>) {SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;}else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify();}

这里使用两个boolean值来判断是否满足条件。
heapify表示是否需要进行构造堆。而screen表示是否需要排除null。
然后对这依照这两个变量分情况讨论。如果是sortSet类型,说明有序,则提取其comparator。并使heapify置位(后续不需要构造堆,因为递增序列是堆的一种特殊情况),如果是Priority自身拷贝当然两者都置为false表示都不需要进行。
然后使用Collection的toArray方法转换成Object[]并进行类型检查。并对各项参数进行复制。
heapify方法和之前PriorityQueue的是一样的,对数组的前半部分元素进行下滤操作。

阻塞方法的实现

非阻塞的put方法

BlockingQueue方法的实现都大同小异这里看一下put和take的实现:

  public void put(E e) {offer(e); // never need to block}

首先由于这是无界队列,所以不需要阻塞,只是讲任务委托给了offer来实现。

    public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}

因为这里没有阻塞,所以这里的锁完全是来进行同步的,不需要条件变量。
加锁后进行大小检查,如果需要增长则调用tryGrow。
否则则将新进入的元素放入队尾,然后进行上移操作。并唤醒notEmpty条件变量。

膨胀及自旋锁

        private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}

首先退出锁。
首先这里申请了一个新的引用newArray。然后使用CAS自旋操作使得volatile的allocationSplinLock变成1。
对于自旋成功的线程,会计算出一个新的堆的尺寸,然后建立一个新的数组,并且在最后使得allocationSplinLock变回0.
对于自旋失败的数组,会让步,这里很好理解,因为下一步就是阻塞,所以即使这里争取到了线程也没有太大的意义。因为其newArray是空,而后退出tryGrow,但是该线程的增长并没有成功所以在offer的循环检查中(n = size) >= (cap = (array = queue).length)这里会失败所以需要重新进入tryGrow。这样做没有意义。
而yield会使得成功创建新数组的线程更容易获得锁,然后会使得queue指向新建的数组,这样对于自旋失败的线程通常来说会在自旋成功的线程之后获得锁,这样在offer中的自旋检测中成功通过,这样就避免了一些重复的操作。以提供更高的效率。
这里还需要说明的是yield,并不是100%会让出线程的,这里的yield只是让这个概率更大一些。

阻塞的take方法

    public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}

take方法则是常规的阻塞操作会重复检查notEmpty变量。
deque是典型的出队操作,类似PriorityQueue。

这篇关于源码分析-PriorityBlockingQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1048904

相关文章

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,

Java Stream的distinct去重原理分析

《JavaStream的distinct去重原理分析》Javastream中的distinct方法用于去除流中的重复元素,它返回一个包含过滤后唯一元素的新流,该方法会根据元素的hashcode和eq... 目录一、distinct 的基础用法与核心特性二、distinct 的底层实现原理1. 顺序流中的去重

关于MyISAM和InnoDB对比分析

《关于MyISAM和InnoDB对比分析》:本文主要介绍关于MyISAM和InnoDB对比分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录开篇:从交通规则看存储引擎选择理解存储引擎的基本概念技术原理对比1. 事务支持:ACID的守护者2. 锁机制:并发控制的艺

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性