详读JUC系列之ReadWriteLock
虽然锁可以保证资源的线程安全,但是锁带来的性能损失也是不可以忽略的.尤其是在读多写少的场景下,如果你使用synchronized
或者ReentrantLock
会浪费非常大的性能.此时使用ReentrantReadWriteLock
会更合适
目录
概述
ReetrantLock
和ReetrantReadWriteLock
的区别在于,前者会试图把进入已保护的临界区的线程都阻塞,而后者会根据当前已进入保护临界区的线程以及想要进入保护区的线程的属性来判断是否允许进入。
换成普通话就是: 1. 如果资源正在被写线程占用,则阻塞所有其他线程.(读写互斥,写写互斥) 2. 如果资源正在被读线程占用,并且当前线程是读线程,则允许进入,否则阻塞。(读读兼容)
所以读写锁,能够在都多写少的场景下使cpu尽量的运作起来,从而提高吞吐量和性能。这里需要强调一下,很多情况下的业务都是不太可能发生数据竞争的,尤其是请求不多的话.这时候如果只是简单的使用独占锁(synchronized
,ReentrantLock
),会造成性能的浪费.所以在选择锁的时候应该考虑业务场景,而不是只要遇到线程不安全时候就加独占锁.有的场景甚至即便线程不安全也不需要加锁从而提高性能(极少,请具体场景请具体分析).
源码
一般用法
ReadWriteLock rwLock = new ReentrantReadWriteLock();
rwLock.readLock().lock();
rwLock.readLock().unlock();
从上面可以看出ReentrantReadWriteLock是实现了ReadWriteLock接口的.我们可以从其源码上验证:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable{
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
.....
}
其中readerLock,writerLock和sync都是内部类.接下来我们看每个内部类的源码实现.
Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/*
* reader和write的常量读取数,锁定状态在逻辑上分为两个无符号短整型:低16位表示独占锁(wirte)计数,
* 高16位表示共享锁(read)计数
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
记得在aqs博文中,说过juc很多同步器都是基于aqs实现的.在读写锁中也是.你可以看到Sync继承了aqs。在ReentrantLock中,有一个同步状态值表示锁被同一个线程重复获取了几次.在读写锁里,也需要用到此状态.但是比ReentrantLock
更复杂的是读写锁需要用一个值来表示reader和write两种状态.这里它就必须要将状态值按照”按位切割使用”,将状态切分为高16位和低16位。
[0][0][0][0][0][0][0][0][0][0][0][0][0][1][1]|[0][0][0][0][0][0][0][0][0][0][0][0][0][1][1]
如上所示,|前面是高16位,|后面是低16位.上面的值表示一个线程已经获取了写锁,并且重入了两次(2进制11是10进制的3),且获取了三次读锁。读写锁是如何确定各自的状态的呢?答案是靠位运算: 假设当前同步状态的值为S,则 - 写状态等于S&0x0000FFFF(将高16位全部抹去) - 读状态等于S>>>16(无符号补0右移16位) - 写状态加1,则等于S+1 - 读状态加1,则等于S+(1<<16)(1左移16位到高位) - 当S不为0时,若写状态为0,则读状态不为0。即写锁被获取 这些内容可以在源码中验证:
/** 共享(读)状态值 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 独占(写)状态值 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
另外还有几个比较重要的属性
//当前线程持有的读锁的重入次数.在构造函数和readObject中初始化。当计数降至0时要删除,避免内存泄漏
private transient ThreadLocalHoldCounter readHolds;
//最后一次获取 readLock 的 HoldCounter 的缓存
//有了 readHolds 为什么还需要 cachedHoldCounter呢? 在大并发的场景中, 这次进行release readLock的线程就是上次 acquire 的线程, 这样直接通过cachedHoldCounter来进行获取, 节省了通过readHolds的遍历的过程
private transient HoldCounter cachedHoldCounter
//第一个获得读锁定的线程
private transient Thread firstReader = null;
//是firstReader的保持计数
private transient int firstReaderHoldCount;
tryAcquire
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. 如果read count或者write count不为0,或当前线程不是已拥有锁的线程则失败
* 2. 如果计数器饱和则失败
* 3. 否则,如果此线程是可重入获取或队列策略允许,则该线程有资格获得锁。如果获取到锁,则需要更新状态和锁的拥有者。
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//如果c不为0,且写状态为0,则读状态肯定不为0
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);//这里不需要cas修改,因为独占锁是排他的,不会有其他的线程与当前线程竞争( current != getExclusiveOwnerThread()).
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
tryAcquire是排他模式的获取锁,所以只要是读写状态不为0(锁已经被获取过),且不是重入线程.则获取锁失败. readerShouldBlock和writerShouldBlock两个方法都是抽象方法,在子类中实现。
tryRelease
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//释放该锁的线程必须是锁的拥有线程
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)//如果独占状态为0,则表示已经没有线程获取锁了,需要将锁的拥有者改为null
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
write lock总结
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
rwLock.writeLock().lock();
从lock跳进源码可以看到它进入到了aqs中:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
writeLock加锁时首先调用tryAcquire,如果在tryAcquire获取成功则直接返回获取锁成功.否则进入acquireQueued方法进行排队,并自旋不断的调用tryAcquire,直到获取锁成功,或者线程被中断未知.
tryAcquireShared
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. 如果其他线程获得了写锁,失败
* 2. 否则,当前线程可以尝试获取锁.因此要询问readerShouldBlock()是否阻塞,如果没有,请尝试通过CAS状态授予并更新count.Note该步骤不检查可重入获取
* 3. 如果第2步失败,则表示条件不符合,可能是csa操作失败,也可能是状态值超过最大值失败.此时进入到fullTryAcquireShared
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) //独占状态不为0,且读锁的拥有者不是当前线程则退出。
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {/没有线程获取到读锁,直接设置初始值
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//第一个获取读锁的线程重入
firstReaderHoldCount++;
} else {
//尝试从缓存获取rh,如果数据不正确,则从readHolds.get()获取。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)//count为0需要设置rh,因为count为0时会有其他线程对ThreadLocal进行remove操作
readHolds.set(rh);
rh.count++;
}
return 1; //获取锁成功
}
return fullTryAcquireShared(current); //前面获取锁失败则调用该方法
}
该方法可以看出读锁在写锁没有被获取时,是可以多个线程同时获取读锁的.如果写锁被获取了,那么只有该锁的拥有者线程可以获取到读锁。
fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) { //独占状态不为0表示写锁被获,此时只能锁的拥有者线程才可以获取锁
if (getExclusiveOwnerThread() != current)
return -1; //结合aqs的acquireShared(int arg)方法看.如果返回-1则进入aqs的doAcquireShared 将当前线程加入到aqs的等待队列.
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//读状态为0或非0,且readerShouldBlock时
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {//如果是读锁的重入,则直接进行下半部分的cas操作
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0) //如果当前线程没有获取到读锁.则进入aqs的doAcquireShared 将当前线程加入到aqs的等待队列.
return -1;//只有当aqs中的队列存在等待读锁或者写锁的节点时才进入这行代码
}
}
//如果readerShouldBlock不要求阻塞,或readerShouldBlock要求阻塞但是当前线程不是第一次获取读锁.则进入下面的cas操作
if (sharedCount(c) == MAX_COUNT)//溢出
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {//获取读锁成功
if (sharedCount(c) == 0) {//第一个获取到读锁的线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//如果当前线程是第一个获取到读锁的线程则记录器+1
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1; //获取锁成功的标志
}
}
}
fullTryAcquireShared会进行自旋不断的尝试获取锁(cas修改aqs的state).如果获取读锁时,写锁已经被获取.且当前线程不是写锁的拥有者,则return -1通过doAcquireShared将当前线程加入aqs的等待队列.等待信号然后再次调用tryAcquire.直到获取读锁成功,或者线程被中断。
if (rh.count == 0) return -1。这段代码需要强调一下.首先如果当前线程是第一个获取读锁的线程(假设此时写锁状态为0),则在tryAcquireShared方法时即可获取到锁并返回.既然进入了fullTryAcquireShared方法则表示,读锁已经被获取,且这时候获取读锁产生了竞争. 如果readerShouldBlock要求阻塞reader。且当前线程是第一次尝试获取读锁.则return -1.然后将其加入等待队列。
tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {//如果是第一个获取到读锁的线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)//没有重入则记录清空
firstReader = null;
else
firstReaderHoldCount--;//重入计数减1
} else {
HoldCounter rh = cachedHoldCounter; //不是第一个获取读锁的线程需要更新HoldCounter
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 释放读锁对reader没有影响,但如果读和写锁现在都是空闲的,它可能允许继续等待writers。
return nextc == 0; //读状态为0的话表示读锁释放成功.
}
}
从该方法可以看到一个获取到读锁的线程,和不是第一个获取到读锁的线程的重入状态记录分别是用firstReaderHoldCount和HoldCounter记录的. 而且要释放锁,只有当读状态为0时(所有获取到读锁的状态都释放了读锁)才算释放锁成功。
读锁总结
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
rwLock.readLock().lock();
跳入源码可以看到
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
他会先尝试tryAcquireShared,如果获取到锁了直接返回获取锁成功,否则tryAcquireShared会进入fullTryAcquireShared方法进行自旋以及重入判断. 1. 如果获取读锁时,写锁已经被获取则此时只有写锁的拥有者线程才能获取读锁.因为写锁是排他的. 2. 如果当前线程不是写锁的拥有者,则加入aqs的等待队列。 3. 如果写锁没有被获取.且当前线程就是第一次读取的获取线程,则该线程重入直接进入cas 4. 如果写锁没有被获取.且reader需要被阻塞.且当前线程第一次尝试获取读锁.则加入等待队列 等待队列会排队,直到前面的线程都出队之后,当前线程会再次调用tryAcquireShared。这样就形成一个循环.直到获取到锁,或者线程被中断。
tryWriteLock
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread()) //写状态为0,且state不为0,则表示读锁已被获取。读锁被获取,或者写锁的拥有者不是当前线程则获取失败.
return false;
if (w == MAX_COUNT)//溢出
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))//cas失败则获取失败
return false;
setExclusiveOwnerThread(current);
return true;
}
tryReadLock
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)//写锁被获取时,只能是写锁的拥有者线程才可以获取读锁.
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)//溢出
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
如果写锁没被获取,且重入次数没有溢出.则自旋获取读锁直到成功.一旦写锁被获取了则返回获取失败
FairSync公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
hasQueuedPredecessors该方法在aqs中讲过,如果在队列中有前节点则返回true.就是不能插队.有人排在你前面的话你就老实的排队等吧.
NoFairSync非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
apparentlyFirstQueuedIsExclusive如果队列中第一个节点的线程是独占模式获取锁的.则返回true.即队头的人在等待获取写锁。时reader必须阻塞挂起。