死磕 java同步系列之ReentrantReadWriteLock源码解析

2024-03-14 13:18

本文主要是介绍死磕 java同步系列之ReentrantReadWriteLock源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题

(1)读写锁是什么?

(2)读写锁具有哪些特性?

(3)ReentrantReadWriteLock是怎么实现读写锁的?

(4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap?

简介

读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。

特性

读写锁具有以下特性:

是否互斥

可以看到,读写锁除了读读不互斥,读写、写读、写写都是互斥的。

那么,ReentrantReadWriteLock是怎么实现读写锁的呢?

类结构

在看源码之前,我们还是先来看一下ReentrantReadWriteLock这个类的主要结构。

ReentrantReadWriteLock

ReentrantReadWriteLock中的类分成三个部分:

(1)ReentrantReadWriteLock本身实现了ReadWriteLock接口,这个接口只提供了两个方法readLock()writeLock()

(2)同步器,包含一个继承了AQS的Sync内部类,以及其两个子类FairSync和NonfairSync;

(3)ReadLock和WriteLock两个内部类实现了Lock接口,它们具有锁的一些特性。

源码分析

主要属性

// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;

维护了读锁、写锁和同步器。

主要构造方法

// 默认构造方法
public ReentrantReadWriteLock() {this(false);
}
// 是否使用公平锁的构造方法
public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);
}

它提供了两个构造方法,默认构造方法使用的是非公平锁模式,在构造方法中初始化了读锁和写锁。

获取读锁和写锁的方法

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

属性中的读锁和写锁是私有属性,通过这两个方法暴露出去。

下面我们主要分析读锁和写锁的加锁、解锁方法,且都是基于非公平模式的。

ReadLock.lock()

// ReentrantReadWriteLock.ReadLock.lock()
public void lock() {sync.acquireShared(1);
}
// AbstractQueuedSynchronizer.acquireShared()
public final void acquireShared(int arg) {// 尝试获取共享锁(返回1表示成功,返回-1表示失败)if (tryAcquireShared(arg) < 0)// 失败了就可能要排队doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 状态变量的值// 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数int c = getState();// 互斥锁的次数// 如果其它线程获得了写锁,直接返回-1if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 读锁被获取的次数int r = sharedCount(c);// 下面说明此时还没有写锁,尝试去更新state的值获取读锁// 读者是否需要排队(是否是公平模式)if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 获取读锁成功if (r == 0) {// 如果之前还没有线程获取读锁// 记录第一个读者为当前线程firstReader = current;// 第一个读者重入的次数为1firstReaderHoldCount = 1;} else if (firstReader == current) {// 如果有线程获取了读锁且是当前线程是第一个读者// 则把其重入次数加1firstReaderHoldCount++;} else {// 如果有线程获取了读锁且当前线程不是第一个读者// 则从缓存中获取重入次数保存器HoldCounter rh = cachedHoldCounter;// 如果缓存不属性当前线程// 再从ThreadLocal中获取// readHolds本身是一个ThreadLocal,里面存储的是HoldCounterif (rh == null || rh.tid != getThreadId(current))// get()的时候会初始化rhcachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)// 如果rh的次数为0,把它放到ThreadLocal中去readHolds.set(rh);// 重入的次数加1(初始次数为0)rh.count++;}// 获取读锁成功,返回1return 1;}// 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败)return fullTryAcquireShared(current);
}
// AbstractQueuedSynchronizer.doAcquireShared()
private void doAcquireShared(int arg) {// 进入AQS的队列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {// 当前节点的前一个节点final Node p = node.predecessor();// 如果前一个节点是头节点(说明是第一个排队的节点)if (p == head) {// 再次尝试获取读锁int r = tryAcquireShared(arg);// 如果成功了if (r >= 0) {// 头节点后移并传播// 传播即唤醒后面连续的读节点setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}// 没获取到读锁,阻塞并等待被唤醒if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
// AbstractQueuedSynchronizer.setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {// h为旧的头节点Node h = head;// 设置当前节点为新头节点setHead(node);// 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为SIGNAL/PROPAGATE)if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 需要传播// 取下一个节点Node s = node.next;// 如果下一个节点为空,或者是需要获取读锁的节点if (s == null || s.isShared())// 唤醒下一个节点doReleaseShared();}
}
// AbstractQueuedSynchronizer.doReleaseShared()
// 这个方法只会唤醒一个节点
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果头节点状态为SIGNAL,说明要唤醒下一个节点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 唤醒下一个节点unparkSuccessor(h);}else if (ws == 0 &&// 把头节点的状态改为PROPAGATE成功才会跳到下面的if!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 如果唤醒后head没变,则跳出循环if (h == head)                   // loop if head changedbreak;}
}

看完【死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁】的分析再看这章的内容应该会比较简单,中间一样的方法我们这里直接跳过了。

我们来看看大致的逻辑:

(1)先尝试获取读锁;

(2)如果成功了直接结束;

(3)如果失败了,进入doAcquireShared()方法;

(4)doAcquireShared()方法中首先会生成一个新节点并进入AQS队列中;

(5)如果头节点正好是当前节点的上一个节点,再次尝试获取锁;

(6)如果成功了,则设置头节点为新节点,并传播;

(7)传播即唤醒下一个读节点(如果下一个节点是读节点的话);

(8)如果头节点不是当前节点的上一个节点或者(5)失败,则阻塞当前线程等待被唤醒;

(9)唤醒之后继续走(5)的逻辑;

在整个逻辑中是在哪里连续唤醒读节点的呢?

答案是在doAcquireShared()方法中,在这里一个节点A获取了读锁后,会唤醒下一个读节点B,这时候B也会获取读锁,然后B继续唤醒C,依次往复,也就是说这里的节点是一个唤醒一个这样的形式,而不是一个节点获取了读锁后一次性唤醒后面所有的读节点。

ReentrantReadWriteLock1

ReadLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
public void unlock() {sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {// 如果尝试释放成功了,就唤醒下一个节点if (tryReleaseShared(arg)) {// 这个方法实际是唤醒下一个节点doReleaseShared();return true;}return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// 如果第一个读者(读线程)是当前线程// 就把它重入的次数减1// 如果减到0了就把第一个读者置为空if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {// 如果第一个读者不是当前线程// 一样地,把它重入的次数减1HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {// 共享锁获取的次数减1// 如果减为0了说明完全释放了,才返回trueint c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared
// 行为跟方法名有点不符,实际是唤醒下一个节点
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果头节点状态为SIGNAL,说明要唤醒下一个节点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 唤醒下一个节点unparkSuccessor(h);}else if (ws == 0 &&// 把头节点的状态改为PROPAGATE成功才会跳到下面的if!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 如果唤醒后head没变,则跳出循环if (h == head)                   // loop if head changedbreak;}
}

解锁的大致流程如下:

(1)将当前线程重入的次数减1;

(2)将共享锁总共被获取的次数减1;

(3)如果共享锁获取的次数减为0了,说明共享锁完全释放了,那就唤醒下一个节点;

如下图,ABC三个节点各获取了一次共享锁,三者释放的顺序分别为ACB,那么最后B释放共享锁的时候tryReleaseShared()才会返回true,进而才会唤醒下一个节点D。

ReentrantReadWriteLock2

WriteLock.lock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock()
public void lock() {sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {// 先尝试获取锁// 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一模一样了if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire()
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();// 状态变量state的值int c = getState();// 互斥锁被获取的次数int w = exclusiveCount(c);if (c != 0) {// 如果c!=0且w==0,说明共享锁被获取的次数不为0// 这句话整个的意思就是// 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁)// 那么就返回false,获取写锁失败if (w == 0 || current != getExclusiveOwnerThread())return false;// 溢出检测if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可setState(c + acquires);// 获取写锁成功return true;}// 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false)// 如果失败了,说明获取写锁失败,返回false// 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回trueif (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;
}
// 获取写锁失败了后面的逻辑跟ReentrantLock是一致的,进入队列排队,这里就不列源码了

写锁获取的过程大致如下:

(1)尝试获取锁;

(2)如果有读者占有着读锁,尝试获取写锁失败;

(3)如果有其它线程占有着写锁,尝试获取写锁失败;

(4)如果是当前线程占有着写锁,尝试获取写锁成功,state值加1;

(5)如果没有线程占有着锁(state==0),当前线程尝试更新state的值,成功了表示尝试获取锁成功,否则失败;

(6)尝试获取锁失败以后,进入队列排队,等待被唤醒;

(7)后续逻辑跟ReentrantLock是一致;

WriteLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock()
public void unlock() {sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {// 如果尝试释放锁成功(完全释放锁)// 就尝试唤醒下一个节点if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()
protected final boolean tryRelease(int releases) {// 如果写锁不是当前线程占有着,抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();// 状态变量的值减1int nextc = getState() - releases;// 是否完全释放锁boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);// 设置状态变量的值setState(nextc);// 如果完全释放了写锁,返回truereturn free;
}

写锁释放的过程大致为:

(1)先尝试释放锁,即状态变量state的值减1;

(2)如果减为0了,说明完全释放了锁;

(3)完全释放了锁才唤醒下一个等待的节点;

总结

(1)ReentrantReadWriteLock采用读写锁的思想,能提高并发的吞吐量;

(2)读锁使用的是共享锁,多个读锁可以一起获取锁,互相不会影响,即读读不互斥;

(3)读写、写读和写写是会互斥的,前者占有着锁,后者需要进入AQS队列中排队;

(4)多个连续的读线程是一个接着一个被唤醒的,而不是一次性唤醒所有读线程;

(5)只有多个读锁都完全释放了才会唤醒下一个写线程;

(6)只有写锁完全释放了才会唤醒下一个等待者,这个等待者有可能是读线程,也可能是写线程;

彩蛋

(1)如果同一个线程先获取读锁,再获取写锁会怎样?

ReentrantReadWriteLock3

分析上图中的代码,在tryAcquire()方法中,如果读锁被获取的次数不为0(c != 0 && w == 0),返回false,返回之后外层方法会让当前线程阻塞。

可以通过下面的方法验证:

readLock.lock();
writeLock.lock();
writeLock.unlock();
readLock.unlock();

运行程序后会发现代码停止在writeLock.lock();,当然,你也可以打个断点跟踪进去看看。

(2)如果同一个线程先获取写锁,再获取读锁会怎样?

ReentrantReadWriteLock4

分析上面的代码,在tryAcquireShared()方法中,第一个红框处并不会返回,因为不满足getExclusiveOwnerThread() != current;第二个红框处如果原子更新成功就说明获取了读锁,然后就会执行第三个红框处的代码把其重入次数更改为1。

可以通过下面的方法验证:

writeLock.lock();
readLock.lock();
readLock.unlock();
writeLock.unlock();

你可以打个断点跟踪一下看看。

(3)死锁了么?

通过上面的两个例子,我们可以感受到同一个线程先读后写和先写后读是完全不一样的,为什么不一样呢?

先读后写,一个线程占有读锁后,其它线程还是可以占有读锁的,这时候如果在其它线程占有读锁之前让自己占有了写锁,其它线程又不能占有读锁了,这段程序会非常难实现,逻辑也很奇怪,所以,设计成只要一个线程占有了读锁,其它线程包括它自己都不能再获取写锁。

先写后读,一个线程占有写锁后,其它线程是不能占有任何锁的,这时候,即使自己占有一个读锁,对程序的逻辑也不会有任何影响,所以,一个线程占有写锁后是可以再占有读锁的,只是这个时候其它线程依然无法获取读锁。

如果你仔细思考上面的逻辑,你会发现一个线程先占有读锁后占有写锁,会有一个很大的问题——锁无法被释放也无法被获取了。这个线程先占有了读锁,然后自己再占有写锁的时候会阻塞,然后它就自己把自己搞死了,进而把其它线程也搞死了,它无法释放锁,其它线程也无法获得锁了。

这是死锁吗?似乎不是,死锁的定义是线程A占有着线程B需要的资源,线程B占有着线程A需要的资源,两个线程相互等待对方释放资源,经典的死锁例子如下:

Object a = new Object();
Object b = new Object();new Thread(()->{synchronized (a) {LockSupport.parkNanos(1000000);synchronized (b) {}}
}).start();new Thread(()->{synchronized (b) {synchronized (a) {}}
}).start();

简单的死锁用jstack是可以看到的:

"Thread-1":at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$1(ReentrantReadWriteLockTest.java:40)- waiting to lock <0x000000076baa9068> (a java.lang.Object)- locked <0x000000076baa9078> (a java.lang.Object)at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$2/1831932724.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)
"Thread-0":at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$0(ReentrantReadWriteLockTest.java:32)- waiting to lock <0x000000076baa9078> (a java.lang.Object)- locked <0x000000076baa9068> (a java.lang.Object)at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$1/1096979270.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)Found 1 deadlock.

(4)如何使用ReentrantReadWriteLock实现一个高效安全的TreeMap?

class SafeTreeMap {private final Map<String, Object> m = new TreeMap<String, Object>();private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private final Lock readLock = lock.readLock();private final Lock writeLock = lock.writeLock();public Object get(String key) {readLock.lock();try {return m.get(key);} finally {readLock.unlock();}}public Object put(String key, Object value) {writeLock.lock();try {return m.put(key, value);} finally {writeLock.unlock();}}
}

推荐阅读

  1. 死磕 java同步系列之ReentrantLock VS synchronized

  2. 死磕 java同步系列之ReentrantLock源码解析(二)——条件锁

  3. 死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

  4. 死磕 java同步系列之AQS起篇

  5. 死磕 java同步系列之自己动手写一个锁Lock

  6. 死磕 java魔法类之Unsafe解析

  7. 死磕 java同步系列之JMM(Java Memory Model)

  8. 死磕 java同步系列之volatile解析

  9. 死磕 java同步系列之synchronized解析

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

qrcode

这篇关于死磕 java同步系列之ReentrantReadWriteLock源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/808537

相关文章

R语言中的正则表达式深度解析

《R语言中的正则表达式深度解析》正则表达式即使用一个字符串来描述、匹配一系列某个语法规则的字符串,通过特定的字母、数字及特殊符号的灵活组合即可完成对任意字符串的匹配,:本文主要介绍R语言中正则表达... 目录前言一、正则表达式的基本概念二、正则表达式的特殊符号三、R语言中正则表达式的应用实例实例一:查找匹配

springboot控制bean的创建顺序

《springboot控制bean的创建顺序》本文主要介绍了spring-boot控制bean的创建顺序,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录1、order注解(不一定有效)2、dependsOn注解(有效)3、提前将bean注册为Bea

Java中的ConcurrentBitSet使用小结

《Java中的ConcurrentBitSet使用小结》本文主要介绍了Java中的ConcurrentBitSet使用小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、核心澄清:Java标准库无内置ConcurrentBitSet二、推荐方案:Eclipse

java中的Supplier接口解析

《java中的Supplier接口解析》Java8引入的Supplier接口是一个无参数函数式接口,通过get()方法延迟计算结果,它适用于按需生成场景,下面就来介绍一下如何使用,感兴趣的可以了解一下... 目录1. 接口定义与核心方法2. 典型使用场景场景1:延迟初始化(Lazy Initializati

Java中ScopeValue的使用小结

《Java中ScopeValue的使用小结》Java21引入的ScopedValue是一种作用域内共享不可变数据的预览API,本文就来详细介绍一下Java中ScopeValue的使用小结,感兴趣的可以... 目录一、Java ScopedValue(作用域值)详解1. 定义与背景2. 核心特性3. 使用方法

spring中Interceptor的使用小结

《spring中Interceptor的使用小结》SpringInterceptor是SpringMVC提供的一种机制,用于在请求处理的不同阶段插入自定义逻辑,通过实现HandlerIntercept... 目录一、Interceptor 的核心概念二、Interceptor 的创建与配置三、拦截器的执行顺

Java中Map的五种遍历方式实现与对比

《Java中Map的五种遍历方式实现与对比》其实Map遍历藏着多种玩法,有的优雅简洁,有的性能拉满,今天咱们盘一盘这些进阶偏基础的遍历方式,告别重复又臃肿的代码,感兴趣的小伙伴可以了解下... 目录一、先搞懂:Map遍历的核心目标二、几种遍历方式的对比1. 传统EntrySet遍历(最通用)2. Lambd

Spring Boot 中 RestTemplate 的核心用法指南

《SpringBoot中RestTemplate的核心用法指南》本文详细介绍了RestTemplate的使用,包括基础用法、进阶配置技巧、实战案例以及最佳实践建议,通过一个腾讯地图路线规划的案... 目录一、环境准备二、基础用法全解析1. GET 请求的三种姿势2. POST 请求深度实践三、进阶配置技巧1

springboot+redis实现订单过期(超时取消)功能的方法详解

《springboot+redis实现订单过期(超时取消)功能的方法详解》在SpringBoot中使用Redis实现订单过期(超时取消)功能,有多种成熟方案,本文为大家整理了几个详细方法,文中的示例代... 目录一、Redis键过期回调方案(推荐)1. 配置Redis监听器2. 监听键过期事件3. Redi

Spring Boot 处理带文件表单的方式汇总

《SpringBoot处理带文件表单的方式汇总》本文详细介绍了六种处理文件上传的方式,包括@RequestParam、@RequestPart、@ModelAttribute、@ModelAttr... 目录方式 1:@RequestParam接收文件后端代码前端代码特点方式 2:@RequestPart接