深入理解 JUC:ReentrantReadWriteLock

上一篇我们分析了 ReentrantLock 锁的设计与实现,对于大部分并发场景来说已经能够满足同步加锁的需求,但是相对于本文将要介绍的 ReentrantReadWriteLock 锁而言,ReentrantLock 主要存在的不足在于不区分读操作和写操作。考虑读多写少的场景,如果将读操作和写操作一视同仁的对待,那么线程之间读操作是互斥的,同一时间只允许一个线程读取数据,势必会影响系统的吞吐性能,并且读操作并不会导致数据的不一致性,这种情况下应该允许多个线程对同一份数据执行并发读取。

ReentrantReadWriteLock 锁在设计上认识到了 ReentrantLock 锁在读多写少场景下的不足,于是分别设计了读锁和写锁,在满足同步加锁需求的基础上支持读锁能够同时被多个线程持有。ReentrantReadWriteLock 锁的特性包括:

  1. 读锁和写锁均是可重入的,并允许多个不同线程同时持有读锁。
  2. 对于不同线程而言,读锁与写锁之间是互斥的,写锁与写锁之间也是互斥的。
  3. 对于相同线程而言,在获取到写锁的前提下,仍然可以获取读锁,反之则不行。
  4. 支持公平锁机制,默认为非公平锁。

关于读写锁不同加锁顺序对应的表现概括如下表:

- 先获取 再获取 表现
相同线程 read lock read lock 重入
相同线程 write lock write lock 重入
相同线程 write lock read lock 重入
相同线程 read lock write lock 死锁
不同线程 read lock read lock 重入
不同线程 write lock write lock 互斥
不同线程 write lock read lock 互斥
不同线程 read lock write lock 互斥

思考 :对于同一个线程而言,为什么在获取到读锁的前提下再去获取写锁会出现死锁?

我们将这种在获取到读锁的情况下再去获取写锁的操作称为锁升级(不太严谨,需要与 synchronized 关键字中锁升级(膨胀)的过程相区分)。之所以 ReentrantReadWriteLock 不允许锁升级操作,首先我们需要再次明确一下 ReentrantReadWriteLock 的基本特性,即读锁能够被多个线程同时持有,但是写锁在同一时间只能被同一个线程持有,并且读锁和写锁对于不同线程而言是互斥的。设想当前有多个线程持有读锁,如果允许锁升级则这些线程在执行过程中都会尝试去获取写锁,其结果必然只能是其中一个线程获取写锁成功,又对于不同线程而言获取读写锁是互斥的,那么此时这些持有读锁的线程将如何处理自己手中持有的读锁对象呢?所以 ReentrantReadWriteLock 的解决方案是直接禁止锁升级行为。然而,作为 ReentrantReadWriteLock 增强版的 StampedLock 则支持由读锁到写锁的切换,关于 StampedLock 的实现原理我们以后再进行分析。

ReentrantReadWriteLock 示例

在开始分析 ReentrantReadWriteLock 的实现内幕之前,我们先以官方示例来回忆一下 ReentrantReadWriteLock 的基本使用,具体实现如下(做了一些小小的改动):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private Object data;
private volatile boolean cacheInvalid = true;

private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();

void processCachedData() {
readLock.lock();
if (cacheInvalid) {
readLock.unlock(); // 在获取写锁之前必须释放读锁,否则会造成死锁
writeLock.lock();
try {
// 再次校验状态,考虑其它线程可能已经初始化了缓存数据
if (cacheInvalid) {
// 初始化缓存数据
data = new Object();
cacheInvalid = false;
}
// 获取读锁,在持有写锁的情况下,同一个线程仍然可以获取读锁
readLock.lock();
} finally {
// 释放写锁
writeLock.unlock();
}
}

try {
// 使用缓存数据
this.use(data);
} finally {
// 释放读锁
readLock.unlock();
}
}

线程在读取缓存数据 data 之前需要获取读锁(ReadLock),如果判断缓存数据 data 未被初始化,则尝试获取写锁(WriteLock)以初始化缓存数据。因为读锁是共享的,所有上述示例允许多个线程并发读取 data 数据,但是一旦某个线程检测到 data 未被初始化,则尝试获取写锁并更新缓存数据,期间其它线程需要阻塞等待初始化过程的完成。

上述示例还暗含了一个 锁降级 的过程,我们将 在持有写锁的前提下获取到读锁,然后再释放写锁的过程称为锁降级(先释放写锁再获取读锁的过程并不能称为锁降级) 。那么锁降级操作的意义何在呢?简单而言,锁降级通过不同线程间获取读写锁的互斥性,实现当前线程在对数据进行修改之后能够读取到修改后的值,保证期间数据不会被其它线程所更改。

以上述示例为例,当线程完成对缓存数据 data 的更新之后,并没有立即释放写锁,而是先获取到了读锁后再释放写锁,这样就能够避免期间其它线程因为获取到写锁而更新缓存数据。下面对上述示例稍作修改以演示锁降级策略,调度程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();

private volatile boolean cacheInvalid = true;
private int value = 0;

@Test
public void testDowngrade() throws Exception {
CountDownLatch start = new CountDownLatch(1);
CountDownLatch stop = new CountDownLatch(2);
final Thread ta = new Thread(new DowngradeTask(start, stop, 1L), "A");
final Thread tb = new Thread(new DowngradeTask(start, stop, 2L), "B");
ta.start();
tb.start();
start.countDown();
stop.await();
}

private class DowngradeTask implements Runnable {

private final CountDownLatch start;
private final CountDownLatch stop;
private final long sleepSecs;

public DowngradeTask(CountDownLatch start, CountDownLatch stop, long sleepSecs) {
this.start = start;
this.stop = stop;
this.sleepSecs = sleepSecs;
}

@Override
public void run() {
try {
start.await();
TimeUnit.SECONDS.sleep(sleepSecs);
System.out.println("<thread-" + Thread.currentThread().getName() + "> start to run after " + sleepSecs + " seconds");
cacheInvalid = true;
// processCachedDataWithoutDowngrade(RandomUtils.nextInt(1, 100));
processCachedDataWithDownGrade(RandomUtils.nextInt(1, 100));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stop.countDown();
}
}

}

如果不使用锁降级策略,即先释放写锁再获取读锁,对应的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void processCachedDataWithoutDowngrade(int newValue) {
readLock.lock();
if (cacheInvalid) {
readLock.unlock();
writeLock.lock();
try {
if (cacheInvalid) {
value = newValue;
System.out.println("<thread-" + Thread.currentThread().getName() + "> update value as " + value);
cacheInvalid = true;
}
} finally {
writeLock.unlock();
}
}
try {
/*
* 模拟处理数据的过程,期间 cacheInvalid 可能会被其它线程修改以期望修改数据,
* 没有锁降级保证,其它线程期间可能获取到写锁并更改数据,导致当前线程之前看到的数据发生变化
*/
TimeUnit.SECONDS.sleep(5);
System.out.println("<thread-" + Thread.currentThread().getName() + "> read value is " + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.getReadHoldCount() > 0) {
readLock.unlock();
}
}
}

执行结果如下:

1
2
3
4
5
6
<thread-A> start to run after 1 seconds
<thread-A> update value as 11
<thread-B> start to run after 2 seconds
<thread-B> update value as 12
<thread-A> read value is 12
<thread-B> read value is 12

上述代码因为在处理缓存数据之前释放了写锁,所以期间可能会出现写锁被其它线程持有的情况,进而对数据进行修改,最终导致当前线程看到的数据状态发生了变化。

如果在释放写锁之前先持有写锁,即采用锁降级策略,则能够避免此类问题,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void processCachedDataWithDownGrade(int newValue) {
readLock.lock();
if (cacheInvalid) {
readLock.unlock();
writeLock.lock();
try {
if (cacheInvalid) {
value = newValue;
System.out.println("<thread-" + Thread.currentThread().getName() + "> update value as " + value);
cacheInvalid = false;
// 锁降级,在释放写锁之前先获取读锁
readLock.lock();
}
} finally {
writeLock.unlock();
}
}

try {
/*
* 模拟处理数据的过程,期间 cacheInvalid 可能会被其它线程修改以期望修改数据,
* 因为有锁降级保证,其它线程期间尝试获取写锁的请求会被阻塞,所以当前线程之前看到的数据不会变化
*/
TimeUnit.SECONDS.sleep(5);
System.out.println("<thread-" + Thread.currentThread().getName() + "> read value is " + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.getReadHoldCount() > 0) {
readLock.unlock();
}
}
}

执行结果如下:

1
2
3
4
5
6
<thread-A> start to run after 1 seconds
<thread-A> update value as 43
<thread-B> start to run after 2 seconds
<thread-A> read value is 43
<thread-B> update value as 48
<thread-B> read value is 48

因为对于不同线程来说,获取读锁和写锁是互斥的,所以锁降级能够保证在当前线程处理完缓存数据之前,其它线程尝试获取写锁的请求都会被阻塞,所以当前线程看到的数据状态不会被更改。

ReentrantReadWriteLock 实现内幕

下面来分析一下 ReentrantReadWriteLock 的设计与实现,ReentrantReadWriteLock 在设计上并没有直接实现 Lock 接口,而是实现了 ReadWriteLock 接口,该接口定义了返回读锁和写锁对象的方法,如下:

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

ReentrantReadWriteLock 在内部定义了 ReadLock 和 WriteLock 两个内部类,用于分别表示读锁和写锁,ReadLock 和 WriteLock 均实现自 Lock 接口。与 ReentrantLock 一样,ReentrantReadWriteLock 也支持公平锁(FairSync)与非公平锁(NonfairSync)机制,并且允许在构造 ReentrantReadWriteLock 对象时通过参数指定是否使用公平锁,默认使用非公平锁。

FairSync 和 NonfairSync 均派生自 Sync 抽象类,该抽象类继承自 AbstractQueuedSynchronizer,这一点设计上与 ReentrantLock 如出一辙。Sync 类的字段定义和部分基础方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
abstract static class Sync extends AbstractQueuedSynchronizer {

static final int SHARED_SHIFT = 16;
/** 共享锁(读锁)单位值 */
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/** 共享锁(读锁)最大线程数 */
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

/** 独占锁(写锁)掩码 */
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 读锁线程数目 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

/** 写锁重入次数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

/** 记录各个线程(除第 1 个)获取读锁的重入次数 */
private transient ThreadLocalHoldCounter readHolds;

/** 最近 1 次获取到读锁的线程计数器 */
private transient HoldCounter cachedHoldCounter;

/** 记录第 1 个获取到读锁的线程 */
private transient Thread firstReader = null;
/** 记录第 1 个获取到读锁的线程重入次数 */
private transient int firstReaderHoldCount;

// ... 省略方法定义

}

ReentrantReadWriteLock 复用 AQS 的 state 字段以记录线程的重入次数,区别于 ReentrantLock 锁,因为区分了读锁和写锁,ReentrantReadWriteLock 将 state 字段由中间分割成了两段,其中高位区间(16 位)用于记录持有读锁的线程数(重入则累加),低位区间(16 位)用于记录写锁的重入次数。

前面在分析 AQS 同步队列的 Node 内部类定义时,介绍了结点分为共享(SHARED)和独占(EXCLUSIVE)两种模式。对应到 ReentrantReadWriteLock,我们可以认为读锁是共享的,而写锁是独占的,同一时间读锁能够被多个线程持有,而写锁只能被一个线程持有。Sync 定义了 Sync#sharedCountSync#exclusiveCount 方法,分别用于获取读锁线程重入次数和写锁线程重入次数。

属性 Sync#readHolds 用于记录除第 1 个获取到读锁的线程外的剩余线程的重入次数,类型为 ThreadLocalHoldCounter。ThreadLocalHoldCounter 类继承自 ThreadLocal,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
@Override
public HoldCounter initialValue() {
return new HoldCounter();
}
}

static final class HoldCounter {
int count = 0; // 重入次数
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread()); // 线程 ID
}

ThreadLocalHoldCounter 覆盖实现了 ThreadLocal#initialValue 方法,当某个线程第一次访问 Sync#readHolds 属性时会为该线程创建一个 HoldCounter 对象(下文简称线程计数器),用于记录当前线程 ID 和重入次数。

属性 Sync#cachedHoldCounter 为 HoldCounter 类型,用于记录最近一次获取到读锁的线程计数器对象,其目的是为了提升 ReadLock 的性能,因为大多数时候调用 ReadLock#unlock 方法释放锁资源的线程就是刚刚(即最近一次)调用 ReadLock#lock 方法获取锁资源的线程,引入 Sync#cachedHoldCounter 字段能够快速获取该线程的计数器对象,避免从 Sync#readHolds 中查找。

属性 Sync#firstReaderSync#firstReaderHoldCount 用于记录首次获取到读锁的线程对象和该线程重入读锁的次数。之所以要设计这样两个字段,同样是为了性能的考量,考虑只有一个线程加锁的场景,此时引入这两个字段就能避免为该线程创建一个 HoldCounter 计数器对象,同时也就避免了从 Sync#readHolds 中查找

读锁:ReadLock

本小节来分析一下 ReadLock 的实现机制。ReadLock 实现自 Lock 接口,针对接口方法的实现均委托给 Sync 对象执行。下面首先来看一下 ReadLock#tryLock() 方法,之前的文章中我们已经介绍过 Lock 接口的 Lock#tryLock() 方法用于尝试获取锁资源,不管获取成功与否该方法都会立即返回,如果获取成功则返回 true,否则返回 false。ReadLock 在实现该方法时直接调用了 Sync 的 Sync#tryReadLock 方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// ReadLock#tryLock()
public boolean tryLock() {
return sync.tryReadLock();
}

// Sync#tryReadLock
final boolean tryReadLock() {
// 获取当前线程对象
Thread current = Thread.currentThread();
for (; ; ) {
// 获取 state 状态值
int c = this.getState();
// 如果写锁被其它线程持有,则立即返回 false
if (exclusiveCount(c) != 0 && this.getExclusiveOwnerThread() != current) {
return false;
}
// 获取读锁重入次数
int r = sharedCount(c);
if (r == MAX_COUNT) {
// 读锁重入次数已达上限
throw new Error("Maximum lock count exceeded");
}
// 基于 CAS 操作更新 state 状态值
if (this.compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁被首次被获取
if (r == 0) {
firstReader = current; // 记录第 1 个获取到读锁的线程
firstReaderHoldCount = 1; // 记录第 1 个获取到读锁的线程重入次数
}
// 首个获取该读锁的线程是当前线程,更新线程的重入次数
else if (firstReader == current) {
firstReaderHoldCount++;
}
// 当前线程不是首个获取该读锁的线程,更新计数器
else {
// 最近 1 次获取到读锁的线程计数器
HoldCounter rh = cachedHoldCounter;
// 当前记录的最近 1 次获取到读锁的线程不是当前线程,则更新 cachedHoldCounter 计数器
if (rh == null || rh.tid != getThreadId(current)) {
cachedHoldCounter = rh = readHolds.get();
}
// 当前线程是最近 1 次获取到读锁的线程
else if (rh.count == 0) {
readHolds.set(rh);
}
// 线程重入次数加 1
rh.count++;
}
return true;
}
}
}

尝试获取读锁资源的操作在拿到当前线程对象之后会基于 CAS 尝试修改 state 的值,具体执行过程可以概括为:

  1. 获取 ReentrantReadWriteLock 的 state 状态值;
  2. 如果当前 ReentrantReadWriteLock 的写锁已被其它线程持有,则返回 false,因为不同线程之间读锁与写锁之间是互斥的;
  3. 否则,获取当前 ReentrantReadWriteLock 读锁的线程重入次数,校验是否达到上限值(即 65535),如果是则抛出异常;
  4. 否则,基于 Unsafe 尝试更新 state 状态值,即将读锁重入次数加 1;
  5. 如果更新成功,则修改当前线程对应的计数器,失败则退回到步骤 1 继续重试。

除了上面介绍的 ReadLock#tryLock() 方法,ReadLock 针对其它获取读锁资源的方法均委托给 AQS 执行,包括:

  • ReadLock#lock:直接调用 AbstractQueuedSynchronizer#acquireShared 方法,请求资源数为 1。
  • ReadLock#lockInterruptibly:直接调用 AbstractQueuedSynchronizer#acquireSharedInterruptibly 方法,请求资源数为 1。
  • ReadLock#tryLock(long, TimeUnit):直接调用 AbstractQueuedSynchronizer#tryAcquireSharedNanos 方法,请求资源数为 1。

AQS 针对上述方法的实现,我们在前面的文章中已经专门介绍过,这些方法均调用了模板方法 AbstractQueuedSynchronizer#tryAcquireShared 尝试获取读锁资源,Sync 类针对该模板方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for lock wrt state, so ask if it should block because of queue policy.
* If not, try to grant by CASing state and updating count.
* Note that step does not check for reentrant acquires, which is postponed to full version
* to avoid having to check hold count in the more typical non-reentrant case.
* 3. If step 2 fails either because thread apparently not eligible or CAS fails or count saturated,
* chain to version with full retry loop.
*/

// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取 state 状态值
int c = this.getState();
// 如果写锁被其它线程持有,则立即返回 -1
if (exclusiveCount(c) != 0 && this.getExclusiveOwnerThread() != current) {
return -1;
}
// 获取持有读锁的线程数
int r = sharedCount(c);
if (!this.readerShouldBlock() // 对于公平锁,保证公平性,对于非公平锁,判断写锁是否被占用
&& r < MAX_COUNT // 重入次数未达上限
&& this.compareAndSetState(c, c + SHARED_UNIT)) { // 修改 state 状态值成功
// 读锁被首次被获取
if (r == 0) {
firstReader = current; // 记录第 1 个获取到读锁的线程
firstReaderHoldCount = 1; // 记录第 1 个获取到读锁的线程重入次数
}
// 首个获取该读锁的线程是当前线程,更新线程的重入次数
else if (firstReader == current) {
firstReaderHoldCount++;
}
// 当前线程不是首个获取该读锁的线程,更新计数器
else {
// 最近 1 次获取到读锁的线程计数器
HoldCounter rh = cachedHoldCounter;
// 当前记录的最近 1 次获取到读锁的线程不是当前线程,则更新 cachedHoldCounter 计数器
if (rh == null || rh.tid != getThreadId(current)) {
cachedHoldCounter = rh = readHolds.get();
} else if (rh.count == 0) {
readHolds.set(rh);
}
// 线程重入次数加 1
rh.count++;
}
return 1;
}

/*
* 执行到这里,需要满足以下条件之一:
* 1. 对于公平锁而言,前面有等待的线程,为了保证公平性,需要让前面的线程优先获取锁;
* 2. 对于非公平锁而言,写锁已经被持有;
* 3. 读锁重入次数达到上限
* 4. 更新 state 字段失败,说明期间有其它线程获取到锁对象(读锁、写锁)
*/

// 继续尝试获取读锁,更多考虑一些重入的场景
return this.fullTryAcquireShared(current);
}

上述方法尝试获取读锁资源的执行流程可以概括为:

  1. 获取当前线程对象和 state 状态值;
  2. 如果写锁已经被其它线程持有,则获取读锁失败,返回 -1;
  3. 如果写锁未被持有(非公平锁),或者当前没有排队等待获取锁的线程(公平锁),且读锁重入次数未达到上限,则尝试更新 state 字段,即重入次数加 1;
  4. 如果更新成功,则更新线程计数器;
  5. 如果更新不成功,或者 3 中的条件不满足,则继续执行 Sync#fullTryAcquireShared 方法尝试获取读锁。

方法 Sync#fullTryAcquireShared 的实现逻辑与上述方法大同小易,区别在于引入了重试机制,主要用来处理 CAS 操作失败和线程重入的场景。之所以这样设计,主要还是考虑到性能上的需求,对于非重入的场景来说避免了查询线程计数器的开销。方法 Sync#fullTryAcquireShared 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (; ; ) {
// 获取 state 状态值
int c = this.getState();
// 当前写锁已被持有
if (exclusiveCount(c) != 0) {
// 如果持有写锁的线程不是当前线程,则尝试加读锁失败
if (this.getExclusiveOwnerThread() != current) {
return -1;
}
// else we hold the exclusive lock; blocking here would cause deadlock.
}
// 对于公平锁而言,前面存在等待的线程
else if (this.readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 更新计数器,如果当前线程未持有读锁,则移除计数器
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0) {
readHolds.remove();
}
}
}
if (rh.count == 0) {
return -1;
}
}
}

/*
* 执行到这一步说明:
* 1. 写锁未被持有,或者持有写锁的线程是当前线程
* 2. 前面没有排队等待的线程,或者当前线程已经持有了读锁
*/

// 读锁重入次数达到上限
if (sharedCount(c) == MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
// 修改读锁重入次数(加 1)
if (this.compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null) {
rh = cachedHoldCounter;
}
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
} else if (rh.count == 0) {
readHolds.set(rh);
}
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

上述方法和前面分析的 AbstractQueuedSynchronizer#tryAcquireShared 方法均调用了 Sync#readerShouldBlock 方法用于判断当前获取读锁的线程是否应该阻塞。Sync 中仅仅声明了该方法,具体实现交由 FairSync 和 NonfairSync 子类实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// FairSync#readerShouldBlock
final boolean readerShouldBlock() {
return this.hasQueuedPredecessors();
}

// NonfairSync#readerShouldBlock
final boolean readerShouldBlock() {
return this.apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;
}

FairSync 在实现上通过调用 AbstractQueuedSynchronizer#hasQueuedPredecessors 方法判断同步队列中是否有排在前面等待获取锁的线程,如果有的话则让渡这些线程以保证锁的公平性。NonfairSync 在实现上则判断当前同步队列队头等待线程节点是否以 EXCLUSIVE 模式在等待,也就是在等待获取写锁,如果是则当前获取读锁的线程应该让渡获取写锁的线程。

ReadLock 释放读锁资源的方法 ReadLock#unlock 同样是直接委托给 AQS 处理,调用的是 AbstractQueuedSynchronizer#releaseShared 方法。AQS 通过调用模板方法 AbstractQueuedSynchronizer#tryReleaseShared 尝试释放共享资源,成功则返回 true。Sync 针对该模板方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 如果当前线程是首个获取读锁的线程,则更新对应的重入次数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) {
firstReader = null;
} else {
firstReaderHoldCount--;
}
}
// 否则,修改线程计数器
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
}
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0) {
throw this.unmatchedUnlockException();
}
}
--rh.count;
}

// 基于 CAS 修改 state 状态值
for (; ; ) {
int c = this.getState();
int nextc = c - SHARED_UNIT;
if (this.compareAndSetState(c, nextc)) {
/*
* Releasing the read lock has no effect on readers,
* but it may allow waiting writers to proceed if both read and write locks are now free.
*/
return nextc == 0; // 如果 nextc 为 0,则说明当前锁(读锁、写锁)未被任何线程持有
}
}
}

整体逻辑比较简单,如代码注释。

写锁:WriteLock

本小节一起来分析一下 WriteLock 的实现机制。WriteLock 同样实现自 Lock 接口,针对接口方法的实现均委托给 Sync 对象执行。下面首先来看一下 WriteLock#tryLock() 方法,WriteLock 在实现该方法时直接调用了 Sync#tryWriteLock 方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean tryWriteLock() {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取 state 状态值
int c = this.getState();
// 如果不为 0,则说明当前锁(读锁、写锁)已被线程持有
if (c != 0) {
// 获取写锁的重入次数
int w = exclusiveCount(c);
if (w == 0 // 写锁重入次数为 0,说明读锁已被持有,获取写锁失败
|| current != this.getExclusiveOwnerThread()) { // 写锁已经被其它线程持有,获取写锁失败
return false;
}
// 写锁重入次数达到上限
if (w == MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
}
// 更新写锁重入次数(加 1)
if (!this.compareAndSetState(c, c + 1)) {
return false;
}
// 记录当前持有写锁的线程对象
this.setExclusiveOwnerThread(current);
return true;
}

尝试获取写锁的具体执行过程可以概括为:

  1. 获取当前线程对象和 state 状态值;
  2. 如果 state 状态值不为 0,则说明写锁或读锁已被线程持有;
  3. 如果是读锁被持有,则尝试获取写锁失败;否则,如果持有写锁的线程不是当前线程,则尝试获取写锁失败;
  4. 判断写锁重入次数是否达到上限(即 65535),如果是则抛出异常;
  5. 尝试修改写锁重入次数(加 1),修改成功即加锁成功,如果失败则返回 false,对于成功获取到写锁的线程对象需要予以记录,并返回 true;

除了 WriteLock#tryLock() 方法,WriteLock 针对其它获取写锁资源的方法均委托给 AQS 执行,包括:

  • WriteLock#lock:直接调用 AbstractQueuedSynchronizer#acquire 方法,请求资源数为 1。
  • WriteLock#lockInterruptibly:直接调用 AbstractQueuedSynchronizer#acquireInterruptibly 方法,请求资源数为 1。
  • WriteLock#tryLock(long, TimeUnit):直接调用 AbstractQueuedSynchronizer#tryAcquireNanos 方法,请求资源数为 1。

AQS 针对上述方法的实现,我们同样在前面的文章中已经专门分析过,这些方法均调用了模板方法 AbstractQueuedSynchronizer#tryAcquire 尝试获取资源,Sync 类针对该模板方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if it is either a reentrant acquire or
* queue policy allows it. If so, update state and set owner.
*/

// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取 state 状态值
int c = this.getState();
// 获取写锁的重入次数
int w = exclusiveCount(c);
// 如果不为 0,则说明当前锁(读锁、写锁)已被线程持有
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 // 写锁重入次数为 0,说明读锁已被持有,获取写锁失败
|| current != this.getExclusiveOwnerThread()) { // 写锁已经被其它线程持有,获取写锁失败
return false;
}
// 写锁重入次数达到上限
if (w + exclusiveCount(acquires) > MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
// 更新 state 状态值
this.setState(c + acquires);
return true;
}

// 当前锁对象未被线程持有,对于非公平锁则抢占式获取锁对象,对于公平锁则等待前面的线程优先获取锁
if (this.writerShouldBlock() || !this.compareAndSetState(c, c + acquires)) {
return false;
}
// 记录当前持有写锁的线程对象
this.setExclusiveOwnerThread(current);
return true;
}

上述方法尝试获取写锁资源的执行流程可以概括为:

  1. 获取当前线程对象、state 状态值,以及写锁的重入次数;
  2. 如果 state 状态值不为 0,但写锁重入次数为 0,说明当前读锁已被持有,获取写锁失败;
  3. 如果 state 状态值不为 0,且写锁重入次数也不为 0,说明当前写锁已被持有,如果持有该写锁的线程不是当前线程,则获取写锁失败;
  4. 否则,说明当前线程已经持有该写锁资源,本次加锁相当于重入,此时需要校验写锁重入次数是否已达上限,是则抛出异常,否则说明获取写锁成功,更新 state 状态值;
  5. 如果 2 中检测到 state 状态值为 0,说明当前锁对象未被任何线程持有,对于公平锁而言需要考虑公平性,优先让排在前面的线程先获取锁;
  6. 尝试更新 state 状态值,如果成功则说明加锁成功,需要记录当前持有写锁的线程对象并返回 true,否则返回 false。

上述方法通过调用 Sync#writerShouldBlock 方法来判断当前获取写锁资源的线程是否应该被阻塞。Sync 中同样仅仅声明了该方法,具体实现交由 FairSync 和 NonfairSync 子类实现,如下:

1
2
3
4
5
6
7
8
9
// FairSync#writerShouldBlock
final boolean writerShouldBlock() {
return this.hasQueuedPredecessors();
}

// NonfairSync#writerShouldBlock
final boolean writerShouldBlock() {
return false; // writers can always barge
}

由实现可以看出在获取写锁时,对于公平锁需要让渡排在同步队列前面的等待线程,而非公平锁则允许当前线程抢占获取写锁资源。

WriteLock 释放写锁资源的方法 WriteLock#unlock 同样是直接委托给 AQS 处理,调用的是 AbstractQueuedSynchronizer#release 方法。AQS 通过调用模板方法 AbstractQueuedSynchronizer#tryRelease 尝试释放独占资源,成功则返回 true。Sync 针对该模板方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryRelease(int releases) {
// 如果写锁未被当前线程持有,则抛出异常
if (!this.isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 计算释放之后,剩余的重入次数
int nextc = this.getState() - releases;
// 如果写锁重入次数为 0,则说明锁被释放
boolean free = exclusiveCount(nextc) == 0;
if (free) {
// 写锁已经被释放,清空持有写锁的线程对象
this.setExclusiveOwnerThread(null);
}
// 更新 state 状态值
this.setState(nextc);
return free;
}

整体逻辑比较简单,如代码注释。

总结

本文我们通过一个官方示例演示了 ReentrantReadWriteLock 的基本使用,并分析了其内在实现机制。ReentrantReadWriteLock 相对于前面介绍的 ReentrantLock 更加适合读多写少的业务场景,通过将读锁与写锁相分离的设计以提升读操作的吞吐性能。然而,灵活的背后也暗藏着更加容易出错的风险,一些不规范的使用很容易造成死锁,比如在已经持有读锁的基础上仍然尝试获取写锁,使用时需要留心。

参考

  1. JDK 1.8 源码
  2. The java.util.concurrent Synchronizer Framework