源码分析-PriorityBlockingQueue

2024-06-10 18:18

本文主要是介绍源码分析-PriorityBlockingQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

PriorityBLockingQueue-文档部分

doc文档

PriorityBlockingQueue是无界的阻塞队列。当然如果资源耗尽的看情况下也是会出现添加失败的情况。PriorityBlockingQueue提供的迭代器并不保证按照某一顺序顺序迭代所有元素(和有限队列一致,可以见另一篇博客源码分析-PriorityQueue)(它会先按数组迭代,然后再迭代漏掉的元素)。当两个元素相等的时候并不保证陷入先出顺序。所以如果需要保证先入先出顺序必须要保证

源码实现文档

这里使用了一个给予数组的堆实现,所有公共方法都知识用一个锁实现,但是在调整大小的阶段使用了一个自旋锁,以避免并发申请调整大小。这样可以避免等待的消费者的推迟,以及随之而来的元素重建问题。由于需要在申请新内存的过程中退出锁,所以不可以将任务简单的委托给一个PriorityQueue来完成。为了保证兼容性,在序列话的过程中使用了锁。为了增加保证兼容性。在序列化的过程中需要付出双倍的代价(先转换成一个PriorityQueue然后在序列化)

概述

大部分的内容都和之前的PriorityQueue类似。与PriorityQueue的区别主要体现在两点上,一个是在出队和入队的过程中增加了BlockingQueue的操作。另外一个在resize 的过程中使用了自旋锁。

    private static final int DEFAULT_INITIAL_CAPACITY = 11;private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;private transient Object[] queue;private transient int size;private transient Comparator<? super E> comparator;private final ReentrantLock lock;private final Condition notEmpty;private transient volatile int allocationSpinLock;private PriorityQueue q;

域的内容比较清楚,只做几点说明

首先一个MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;为什么要限制位Integer.MAX_VALUE-8,为什么要限制为-8。网上有人解释说这是因为有一部分虚拟机的实现是需要使用8位来储存size。在很多地方都出现了,这里做一下解释。
lock锁及其绑定的Condition是用来做阻塞操作的。因为这里是一个无界的队列所以这里只有一个状态变量。用于阻塞take。
而allocationSpinkLock是一个volatile的变量,用这个结合cas来进行自旋。

主要方法

构造器

构造器有很多,这里只看一个使用collection进行构造的构造器

    public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true;  // true if must screen for nullsif (c instanceof SortedSet<?>) {SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;}else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify();}

这里使用两个boolean值来判断是否满足条件。
heapify表示是否需要进行构造堆。而screen表示是否需要排除null。
然后对这依照这两个变量分情况讨论。如果是sortSet类型,说明有序,则提取其comparator。并使heapify置位(后续不需要构造堆,因为递增序列是堆的一种特殊情况),如果是Priority自身拷贝当然两者都置为false表示都不需要进行。
然后使用Collection的toArray方法转换成Object[]并进行类型检查。并对各项参数进行复制。
heapify方法和之前PriorityQueue的是一样的,对数组的前半部分元素进行下滤操作。

阻塞方法的实现

非阻塞的put方法

BlockingQueue方法的实现都大同小异这里看一下put和take的实现:

  public void put(E e) {offer(e); // never need to block}

首先由于这是无界队列,所以不需要阻塞,只是讲任务委托给了offer来实现。

    public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}

因为这里没有阻塞,所以这里的锁完全是来进行同步的,不需要条件变量。
加锁后进行大小检查,如果需要增长则调用tryGrow。
否则则将新进入的元素放入队尾,然后进行上移操作。并唤醒notEmpty条件变量。

膨胀及自旋锁

        private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}

首先退出锁。
首先这里申请了一个新的引用newArray。然后使用CAS自旋操作使得volatile的allocationSplinLock变成1。
对于自旋成功的线程,会计算出一个新的堆的尺寸,然后建立一个新的数组,并且在最后使得allocationSplinLock变回0.
对于自旋失败的数组,会让步,这里很好理解,因为下一步就是阻塞,所以即使这里争取到了线程也没有太大的意义。因为其newArray是空,而后退出tryGrow,但是该线程的增长并没有成功所以在offer的循环检查中(n = size) >= (cap = (array = queue).length)这里会失败所以需要重新进入tryGrow。这样做没有意义。
而yield会使得成功创建新数组的线程更容易获得锁,然后会使得queue指向新建的数组,这样对于自旋失败的线程通常来说会在自旋成功的线程之后获得锁,这样在offer中的自旋检测中成功通过,这样就避免了一些重复的操作。以提供更高的效率。
这里还需要说明的是yield,并不是100%会让出线程的,这里的yield只是让这个概率更大一些。

阻塞的take方法

    public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}

take方法则是常规的阻塞操作会重复检查notEmpty变量。
deque是典型的出队操作,类似PriorityQueue。

这篇关于源码分析-PriorityBlockingQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1048904

相关文章

基于Go语言实现Base62编码的三种方式以及对比分析

《基于Go语言实现Base62编码的三种方式以及对比分析》Base62编码是一种在字符编码中使用62个字符的编码方式,在计算机科学中,,Go语言是一种静态类型、编译型语言,它由Google开发并开源,... 目录一、标准库现状与解决方案1. 标准库对比表2. 解决方案完整实现代码(含边界处理)二、关键实现细

PostgreSQL 序列(Sequence) 与 Oracle 序列对比差异分析

《PostgreSQL序列(Sequence)与Oracle序列对比差异分析》PostgreSQL和Oracle都提供了序列(Sequence)功能,但在实现细节和使用方式上存在一些重要差异,... 目录PostgreSQL 序列(Sequence) 与 oracle 序列对比一 基本语法对比1.1 创建序

Android实现一键录屏功能(附源码)

《Android实现一键录屏功能(附源码)》在Android5.0及以上版本,系统提供了MediaProjectionAPI,允许应用在用户授权下录制屏幕内容并输出到视频文件,所以本文将基于此实现一个... 目录一、项目介绍二、相关技术与原理三、系统权限与用户授权四、项目架构与流程五、环境配置与依赖六、完整

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

慢sql提前分析预警和动态sql替换-Mybatis-SQL

《慢sql提前分析预警和动态sql替换-Mybatis-SQL》为防止慢SQL问题而开发的MyBatis组件,该组件能够在开发、测试阶段自动分析SQL语句,并在出现慢SQL问题时通过Ducc配置实现动... 目录背景解决思路开源方案调研设计方案详细设计使用方法1、引入依赖jar包2、配置组件XML3、核心配

Java NoClassDefFoundError运行时错误分析解决

《JavaNoClassDefFoundError运行时错误分析解决》在Java开发中,NoClassDefFoundError是一种常见的运行时错误,它通常表明Java虚拟机在尝试加载一个类时未能... 目录前言一、问题分析二、报错原因三、解决思路检查类路径配置检查依赖库检查类文件调试类加载器问题四、常见

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

Java程序进程起来了但是不打印日志的原因分析

《Java程序进程起来了但是不打印日志的原因分析》:本文主要介绍Java程序进程起来了但是不打印日志的原因分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java程序进程起来了但是不打印日志的原因1、日志配置问题2、日志文件权限问题3、日志文件路径问题4、程序

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字