死磕 java同步系列之ReentrantReadWriteLock源码解析

2024-03-14 13:18

本文主要是介绍死磕 java同步系列之ReentrantReadWriteLock源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题

(1)读写锁是什么?

(2)读写锁具有哪些特性?

(3)ReentrantReadWriteLock是怎么实现读写锁的?

(4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap?

简介

读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。

特性

读写锁具有以下特性:

是否互斥

可以看到,读写锁除了读读不互斥,读写、写读、写写都是互斥的。

那么,ReentrantReadWriteLock是怎么实现读写锁的呢?

类结构

在看源码之前,我们还是先来看一下ReentrantReadWriteLock这个类的主要结构。

ReentrantReadWriteLock

ReentrantReadWriteLock中的类分成三个部分:

(1)ReentrantReadWriteLock本身实现了ReadWriteLock接口,这个接口只提供了两个方法readLock()writeLock()

(2)同步器,包含一个继承了AQS的Sync内部类,以及其两个子类FairSync和NonfairSync;

(3)ReadLock和WriteLock两个内部类实现了Lock接口,它们具有锁的一些特性。

源码分析

主要属性

// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;

维护了读锁、写锁和同步器。

主要构造方法

// 默认构造方法
public ReentrantReadWriteLock() {this(false);
}
// 是否使用公平锁的构造方法
public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);
}

它提供了两个构造方法,默认构造方法使用的是非公平锁模式,在构造方法中初始化了读锁和写锁。

获取读锁和写锁的方法

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

属性中的读锁和写锁是私有属性,通过这两个方法暴露出去。

下面我们主要分析读锁和写锁的加锁、解锁方法,且都是基于非公平模式的。

ReadLock.lock()

// ReentrantReadWriteLock.ReadLock.lock()
public void lock() {sync.acquireShared(1);
}
// AbstractQueuedSynchronizer.acquireShared()
public final void acquireShared(int arg) {// 尝试获取共享锁(返回1表示成功,返回-1表示失败)if (tryAcquireShared(arg) < 0)// 失败了就可能要排队doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 状态变量的值// 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数int c = getState();// 互斥锁的次数// 如果其它线程获得了写锁,直接返回-1if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 读锁被获取的次数int r = sharedCount(c);// 下面说明此时还没有写锁,尝试去更新state的值获取读锁// 读者是否需要排队(是否是公平模式)if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 获取读锁成功if (r == 0) {// 如果之前还没有线程获取读锁// 记录第一个读者为当前线程firstReader = current;// 第一个读者重入的次数为1firstReaderHoldCount = 1;} else if (firstReader == current) {// 如果有线程获取了读锁且是当前线程是第一个读者// 则把其重入次数加1firstReaderHoldCount++;} else {// 如果有线程获取了读锁且当前线程不是第一个读者// 则从缓存中获取重入次数保存器HoldCounter rh = cachedHoldCounter;// 如果缓存不属性当前线程// 再从ThreadLocal中获取// readHolds本身是一个ThreadLocal,里面存储的是HoldCounterif (rh == null || rh.tid != getThreadId(current))// get()的时候会初始化rhcachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)// 如果rh的次数为0,把它放到ThreadLocal中去readHolds.set(rh);// 重入的次数加1(初始次数为0)rh.count++;}// 获取读锁成功,返回1return 1;}// 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败)return fullTryAcquireShared(current);
}
// AbstractQueuedSynchronizer.doAcquireShared()
private void doAcquireShared(int arg) {// 进入AQS的队列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {// 当前节点的前一个节点final Node p = node.predecessor();// 如果前一个节点是头节点(说明是第一个排队的节点)if (p == head) {// 再次尝试获取读锁int r = tryAcquireShared(arg);// 如果成功了if (r >= 0) {// 头节点后移并传播// 传播即唤醒后面连续的读节点setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}// 没获取到读锁,阻塞并等待被唤醒if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
// AbstractQueuedSynchronizer.setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {// h为旧的头节点Node h = head;// 设置当前节点为新头节点setHead(node);// 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为SIGNAL/PROPAGATE)if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 需要传播// 取下一个节点Node s = node.next;// 如果下一个节点为空,或者是需要获取读锁的节点if (s == null || s.isShared())// 唤醒下一个节点doReleaseShared();}
}
// AbstractQueuedSynchronizer.doReleaseShared()
// 这个方法只会唤醒一个节点
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果头节点状态为SIGNAL,说明要唤醒下一个节点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 唤醒下一个节点unparkSuccessor(h);}else if (ws == 0 &&// 把头节点的状态改为PROPAGATE成功才会跳到下面的if!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 如果唤醒后head没变,则跳出循环if (h == head)                   // loop if head changedbreak;}
}

看完【死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁】的分析再看这章的内容应该会比较简单,中间一样的方法我们这里直接跳过了。

我们来看看大致的逻辑:

(1)先尝试获取读锁;

(2)如果成功了直接结束;

(3)如果失败了,进入doAcquireShared()方法;

(4)doAcquireShared()方法中首先会生成一个新节点并进入AQS队列中;

(5)如果头节点正好是当前节点的上一个节点,再次尝试获取锁;

(6)如果成功了,则设置头节点为新节点,并传播;

(7)传播即唤醒下一个读节点(如果下一个节点是读节点的话);

(8)如果头节点不是当前节点的上一个节点或者(5)失败,则阻塞当前线程等待被唤醒;

(9)唤醒之后继续走(5)的逻辑;

在整个逻辑中是在哪里连续唤醒读节点的呢?

答案是在doAcquireShared()方法中,在这里一个节点A获取了读锁后,会唤醒下一个读节点B,这时候B也会获取读锁,然后B继续唤醒C,依次往复,也就是说这里的节点是一个唤醒一个这样的形式,而不是一个节点获取了读锁后一次性唤醒后面所有的读节点。

ReentrantReadWriteLock1

ReadLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
public void unlock() {sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {// 如果尝试释放成功了,就唤醒下一个节点if (tryReleaseShared(arg)) {// 这个方法实际是唤醒下一个节点doReleaseShared();return true;}return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// 如果第一个读者(读线程)是当前线程// 就把它重入的次数减1// 如果减到0了就把第一个读者置为空if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {// 如果第一个读者不是当前线程// 一样地,把它重入的次数减1HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {// 共享锁获取的次数减1// 如果减为0了说明完全释放了,才返回trueint c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared
// 行为跟方法名有点不符,实际是唤醒下一个节点
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果头节点状态为SIGNAL,说明要唤醒下一个节点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 唤醒下一个节点unparkSuccessor(h);}else if (ws == 0 &&// 把头节点的状态改为PROPAGATE成功才会跳到下面的if!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 如果唤醒后head没变,则跳出循环if (h == head)                   // loop if head changedbreak;}
}

解锁的大致流程如下:

(1)将当前线程重入的次数减1;

(2)将共享锁总共被获取的次数减1;

(3)如果共享锁获取的次数减为0了,说明共享锁完全释放了,那就唤醒下一个节点;

如下图,ABC三个节点各获取了一次共享锁,三者释放的顺序分别为ACB,那么最后B释放共享锁的时候tryReleaseShared()才会返回true,进而才会唤醒下一个节点D。

ReentrantReadWriteLock2

WriteLock.lock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock()
public void lock() {sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {// 先尝试获取锁// 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一模一样了if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire()
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();// 状态变量state的值int c = getState();// 互斥锁被获取的次数int w = exclusiveCount(c);if (c != 0) {// 如果c!=0且w==0,说明共享锁被获取的次数不为0// 这句话整个的意思就是// 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁)// 那么就返回false,获取写锁失败if (w == 0 || current != getExclusiveOwnerThread())return false;// 溢出检测if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可setState(c + acquires);// 获取写锁成功return true;}// 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false)// 如果失败了,说明获取写锁失败,返回false// 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回trueif (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;
}
// 获取写锁失败了后面的逻辑跟ReentrantLock是一致的,进入队列排队,这里就不列源码了

写锁获取的过程大致如下:

(1)尝试获取锁;

(2)如果有读者占有着读锁,尝试获取写锁失败;

(3)如果有其它线程占有着写锁,尝试获取写锁失败;

(4)如果是当前线程占有着写锁,尝试获取写锁成功,state值加1;

(5)如果没有线程占有着锁(state==0),当前线程尝试更新state的值,成功了表示尝试获取锁成功,否则失败;

(6)尝试获取锁失败以后,进入队列排队,等待被唤醒;

(7)后续逻辑跟ReentrantLock是一致;

WriteLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock()
public void unlock() {sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {// 如果尝试释放锁成功(完全释放锁)// 就尝试唤醒下一个节点if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()
protected final boolean tryRelease(int releases) {// 如果写锁不是当前线程占有着,抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();// 状态变量的值减1int nextc = getState() - releases;// 是否完全释放锁boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);// 设置状态变量的值setState(nextc);// 如果完全释放了写锁,返回truereturn free;
}

写锁释放的过程大致为:

(1)先尝试释放锁,即状态变量state的值减1;

(2)如果减为0了,说明完全释放了锁;

(3)完全释放了锁才唤醒下一个等待的节点;

总结

(1)ReentrantReadWriteLock采用读写锁的思想,能提高并发的吞吐量;

(2)读锁使用的是共享锁,多个读锁可以一起获取锁,互相不会影响,即读读不互斥;

(3)读写、写读和写写是会互斥的,前者占有着锁,后者需要进入AQS队列中排队;

(4)多个连续的读线程是一个接着一个被唤醒的,而不是一次性唤醒所有读线程;

(5)只有多个读锁都完全释放了才会唤醒下一个写线程;

(6)只有写锁完全释放了才会唤醒下一个等待者,这个等待者有可能是读线程,也可能是写线程;

彩蛋

(1)如果同一个线程先获取读锁,再获取写锁会怎样?

ReentrantReadWriteLock3

分析上图中的代码,在tryAcquire()方法中,如果读锁被获取的次数不为0(c != 0 && w == 0),返回false,返回之后外层方法会让当前线程阻塞。

可以通过下面的方法验证:

readLock.lock();
writeLock.lock();
writeLock.unlock();
readLock.unlock();

运行程序后会发现代码停止在writeLock.lock();,当然,你也可以打个断点跟踪进去看看。

(2)如果同一个线程先获取写锁,再获取读锁会怎样?

ReentrantReadWriteLock4

分析上面的代码,在tryAcquireShared()方法中,第一个红框处并不会返回,因为不满足getExclusiveOwnerThread() != current;第二个红框处如果原子更新成功就说明获取了读锁,然后就会执行第三个红框处的代码把其重入次数更改为1。

可以通过下面的方法验证:

writeLock.lock();
readLock.lock();
readLock.unlock();
writeLock.unlock();

你可以打个断点跟踪一下看看。

(3)死锁了么?

通过上面的两个例子,我们可以感受到同一个线程先读后写和先写后读是完全不一样的,为什么不一样呢?

先读后写,一个线程占有读锁后,其它线程还是可以占有读锁的,这时候如果在其它线程占有读锁之前让自己占有了写锁,其它线程又不能占有读锁了,这段程序会非常难实现,逻辑也很奇怪,所以,设计成只要一个线程占有了读锁,其它线程包括它自己都不能再获取写锁。

先写后读,一个线程占有写锁后,其它线程是不能占有任何锁的,这时候,即使自己占有一个读锁,对程序的逻辑也不会有任何影响,所以,一个线程占有写锁后是可以再占有读锁的,只是这个时候其它线程依然无法获取读锁。

如果你仔细思考上面的逻辑,你会发现一个线程先占有读锁后占有写锁,会有一个很大的问题——锁无法被释放也无法被获取了。这个线程先占有了读锁,然后自己再占有写锁的时候会阻塞,然后它就自己把自己搞死了,进而把其它线程也搞死了,它无法释放锁,其它线程也无法获得锁了。

这是死锁吗?似乎不是,死锁的定义是线程A占有着线程B需要的资源,线程B占有着线程A需要的资源,两个线程相互等待对方释放资源,经典的死锁例子如下:

Object a = new Object();
Object b = new Object();new Thread(()->{synchronized (a) {LockSupport.parkNanos(1000000);synchronized (b) {}}
}).start();new Thread(()->{synchronized (b) {synchronized (a) {}}
}).start();

简单的死锁用jstack是可以看到的:

"Thread-1":at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$1(ReentrantReadWriteLockTest.java:40)- waiting to lock <0x000000076baa9068> (a java.lang.Object)- locked <0x000000076baa9078> (a java.lang.Object)at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$2/1831932724.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)
"Thread-0":at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$0(ReentrantReadWriteLockTest.java:32)- waiting to lock <0x000000076baa9078> (a java.lang.Object)- locked <0x000000076baa9068> (a java.lang.Object)at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$1/1096979270.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)Found 1 deadlock.

(4)如何使用ReentrantReadWriteLock实现一个高效安全的TreeMap?

class SafeTreeMap {private final Map<String, Object> m = new TreeMap<String, Object>();private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private final Lock readLock = lock.readLock();private final Lock writeLock = lock.writeLock();public Object get(String key) {readLock.lock();try {return m.get(key);} finally {readLock.unlock();}}public Object put(String key, Object value) {writeLock.lock();try {return m.put(key, value);} finally {writeLock.unlock();}}
}

推荐阅读

  1. 死磕 java同步系列之ReentrantLock VS synchronized

  2. 死磕 java同步系列之ReentrantLock源码解析(二)——条件锁

  3. 死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

  4. 死磕 java同步系列之AQS起篇

  5. 死磕 java同步系列之自己动手写一个锁Lock

  6. 死磕 java魔法类之Unsafe解析

  7. 死磕 java同步系列之JMM(Java Memory Model)

  8. 死磕 java同步系列之volatile解析

  9. 死磕 java同步系列之synchronized解析

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

qrcode

这篇关于死磕 java同步系列之ReentrantReadWriteLock源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/808537

相关文章

javax.net.ssl.SSLHandshakeException:异常原因及解决方案

《javax.net.ssl.SSLHandshakeException:异常原因及解决方案》javax.net.ssl.SSLHandshakeException是一个SSL握手异常,通常在建立SS... 目录报错原因在程序中绕过服务器的安全验证注意点最后多说一句报错原因一般出现这种问题是因为目标服务器

CSS place-items: center解析与用法详解

《CSSplace-items:center解析与用法详解》place-items:center;是一个强大的CSS简写属性,用于同时控制网格(Grid)和弹性盒(Flexbox)... place-items: center; 是一个强大的 css 简写属性,用于同时控制 网格(Grid) 和 弹性盒(F

Java实现删除文件中的指定内容

《Java实现删除文件中的指定内容》在日常开发中,经常需要对文本文件进行批量处理,其中,删除文件中指定内容是最常见的需求之一,下面我们就来看看如何使用java实现删除文件中的指定内容吧... 目录1. 项目背景详细介绍2. 项目需求详细介绍2.1 功能需求2.2 非功能需求3. 相关技术详细介绍3.1 Ja

springboot项目中整合高德地图的实践

《springboot项目中整合高德地图的实践》:本文主要介绍springboot项目中整合高德地图的实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一:高德开放平台的使用二:创建数据库(我是用的是mysql)三:Springboot所需的依赖(根据你的需求再

spring中的ImportSelector接口示例详解

《spring中的ImportSelector接口示例详解》Spring的ImportSelector接口用于动态选择配置类,实现条件化和模块化配置,关键方法selectImports根据注解信息返回... 目录一、核心作用二、关键方法三、扩展功能四、使用示例五、工作原理六、应用场景七、自定义实现Impor

SpringBoot3应用中集成和使用Spring Retry的实践记录

《SpringBoot3应用中集成和使用SpringRetry的实践记录》SpringRetry为SpringBoot3提供重试机制,支持注解和编程式两种方式,可配置重试策略与监听器,适用于临时性故... 目录1. 简介2. 环境准备3. 使用方式3.1 注解方式 基础使用自定义重试策略失败恢复机制注意事项

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

python常见环境管理工具超全解析

《python常见环境管理工具超全解析》在Python开发中,管理多个项目及其依赖项通常是一个挑战,下面:本文主要介绍python常见环境管理工具的相关资料,文中通过代码介绍的非常详细,需要的朋友... 目录1. conda2. pip3. uvuv 工具自动创建和管理环境的特点4. setup.py5.

全面解析HTML5中Checkbox标签

《全面解析HTML5中Checkbox标签》Checkbox是HTML5中非常重要的表单元素之一,通过合理使用其属性和样式自定义方法,可以为用户提供丰富多样的交互体验,这篇文章给大家介绍HTML5中C... 在html5中,Checkbox(复选框)是一种常用的表单元素,允许用户在一组选项中选择多个项目。本