详读JUC系列(1)之AQS
详读J.U.C系列(1)之AQS
目录
今天起,不间断的读JUC.以及DOCKER和k8s。
jdk 1.8
AQS(AbstractQueuedSynchronizer)
整个J.U.C的核心框架就在AQS。我们常用的ConcurrentHashMap等实现都是基于这个类来实现的。那么我们看懂了这个类就看懂了一半J.U.C类库。
看本文之前,请先了解jvm内存模型,以及原子性,可见性,cas等知识(有时间的话,我会补上这一部分),内容有点多,来个葛优躺慢慢看.这篇文章会介绍AQS的所有方法.但是由于AQS是需要子类才能提供可使用的同步器.所以这些方法都是零散的,它们之间的关联性会比较低.但是没有关系,只希望大家对各个方法有印象.在接下来的文章中,将会使用这些方法实现各种功能。
读源码
我们先来看下这个类的注解,了解设计的初衷,大意如下:
提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和同步器(信号量,事件等).AQS是大多数同步器的基础.它依赖于单个原子int值表示状态.继承它的子类必须定义更改此状态的受保护的方法,并且要根据获取和释放动作定义状态的语义.该类的方法负责排队和阻塞,子类负责维护状态字段,但是子类仅保证使用getState,setState,compareAndSetState操作同步int值.应将子类定义为非公共内部帮助程序类,用于实现其封闭类的同步属性。类AbstractQueuedSynchronizer未实现任何同步接口。相反,它定义了acquireInterruptibly等方法,可以通过具体的锁和相关的同步器来适当调用它们来实现它们的公共方法
此类默认支持独占和共享模式.在独占模式下,其他线程尝试获取锁将不会成功.共享模式下,其他线程尝试获取是可能成功的.如果一个线程获取了锁,其他线程尝试获取的话还要判断他是否可以获取.在不同模式下的线程共享FIFO队列.通常,实现子类仅支持这些模式中的一种,但两者都可以在ReadWriteLock中发挥作用。仅支持独占模式或仅支持共享模式的子类无需定义支持未使用模式的方法。
此类定义了一个嵌套的ConditionObject类.子类可以通过isHeldExclusively获取是否执行独占模式,使用getState获取当前线程的状态对象值,调用的方法release完全释放此对象,并且acquire,给定此保存的状态值,最终将此对象恢复到其先前获取的状态。否则,AbstractQueuedSynchronizer方法会创建这样的条件,因此如果无法满足此约束,请不要使用它。 ConditionObject的行为取决于其同步器实现的语义。
此类提供内部队列的检查,检测和监视方法,以及condition对象的类似方法。这些可以根据需要使用AbstractQueuedSynchronizer导出到类中,用于它们的同步机制.
此类的序列化仅存储原子整数(状态属性),因此反序列化对象具有空线程队列。需要可序列化的子类需要定义readObject方法,该方法在反序列化时将其恢复为已知的初始状态. …..后面的省略
AbstractOwnableSynchronizer
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
从代码可以看出AQS是一个抽象的(abstract),可序列化的类.并且继承自AbstractOwnableSynchronizer.我们再看下AbstractOwnableSynchronizer。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
/**
* 独占模式下锁的当前拥有者
*/
private transient Thread exclusiveOwnerThread;
/**
* 设置当前拥有独占访问权限的线程。 {@code null}参数表示没有线程拥有访问权限。 此方法不会强制执行任何同步或{@code volatile}字段访问。
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* Returns the thread last set by {@code setExclusiveOwnerThread},
* or {@code null} if never set. This method does not otherwise
* impose any synchronization or {@code volatile} field accesses.
* @return the owner thread
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
可见,此类可以设置当前锁有的持有者线程.
Node
前面注释中有讲到AQS用到了FIFO先进先出队列.为了实现这个队列,AQS定义了一个静态内部类Node.
static final class Node {
/** 表示节点在共享模式(允许多个线程持有一把锁)中等待 */
static final Node SHARED = new Node();
/** 表示节点在独占模式(只能一个线程持有锁)中等待 */
static final Node EXCLUSIVE = null;
/** 表示线程已取消 */
static final int CANCELLED = 1;
/** 表示下一个节点的线程需要唤醒 */
static final int SIGNAL = -1;
/** 表示线程正在等待某个条件 */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should unconditionally propagate
* 大致意思是说如果waitStatus为-3的话,那么下一次获取锁的机会可以向后继节点传播
*/
static final int PROPAGATE = -3;
/**
* Status字段仅接受如下状态值:
* SIGNAL: 节点的后继者是阻塞的,所以当前节点被释放或者取消时必须unpark其后继者.
* 为了避免竞争,acquire方法必须首先指示它们需要的信号,
* 然后重试原子获取,然后在失败时阻塞
* CANCELLED: 该节点由于超时或终端,它将被CANCELLED.节点将永远不会离开该状态.
* 而且状态为CANCELLED的节点的线程,永远不会再次阻塞。
* CONDITION: 此节点当前处于CONDITION队列中。在传输之前,它不会用作同步队列节点,此时状态
* 将设置为0.此处使用此值与字段的其他用法无关,但可简化机制。)
* PROPAGATE: releaseShared应该传播到其他节点。在doReleaseShared中设置
* (仅限头节点)以确保继续传播,即使其他操作已经介入。
* 0: 以上都不是
*
* 值以数字方式排列以简化使用。非负值意味着节点不需要发信号。因此,大多数代码不需要检查特定值,仅用于符号。
*
* 对于正常的同步节点,该字段初始化为0,对于条件节点,该字段初始化为CONDITION。
* 它使用CAS(或可能的情况下,无条件的易失性写入)进行修改
*/
volatile int waitStatus;
/**
* 当前节点的前节点,用于检查waitStatus.在入队时设置,出队时取消.header节点永远不会成为cancelled状态.
* 因为一个节点仅在获取锁成功时成为header节点.cancelled节点永远不能acquire成功.并且线程仅能取消自身节点。
*/
volatile Node prev;
/**
* 当前节点的下一个节点,在当前节点释放时取消park的下一个节点.enq操作时不分配next字段,
* 直到该节点附加到队列中才分配.所以如果看到null值的next并不表示该节点为尾节
* 点.cancelled的节点的next指向本身而不是null
*/
volatile Node next;
/**
* 当前节点的线程.使用后取消
*/
volatile Thread thread;
/**
* 连接到等待condition的下一个节点
*/
Node nextWaiter;
/**
* 如果返回true则表示共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回上一个节点,如果为null则抛出NullPointerException
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // 用于建立初始head节点或SHARED标记
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS使用Node类来实现阻塞同步队列.结合AQS中的几个其他属性构成如下结构: 其中head,state,tail定义如下:
/**
* 等待队列的head节点.仅通过setHead方法修改.如果存在head。则保证state不为cancelled
*/
private transient volatile Node head;
/**
* 等待队列的队尾节点.延迟初始化,仅在enq方法中添加新节点
*/
private transient volatile Node tail;
/**
* 同步状态
*/
private volatile int state;
setHead 出对操作
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
仅从上面的代码看,是很难理解AQS是怎么出对的.因为这段代码只是表达了将新节点设置为head,并将thread和prev设置为null而已。以前的head节点(即要被移除队列的节点)的操作在这里并没有表达出来.这里大家只需要记住一点.如果要做出队操作的话基本都会调用该方法.我们再之后的代码中会看到具体是怎么出队的
enq 入队操作
private Node enq(final Node node) {
for (;;) {//不断重试,直到成功加入队尾
Node t = tail;
if (t == null) { // 初始化队列,设置一个空节点为head节点,并将tail也指向该节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//正常流程,将节点加到队尾.
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private static final long tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
unsafe.objectFieldOffset这个方法可以获取属性相对于对象在内存中的偏移量.而compareAndSetTail方法会比较aqs的tailOffset内存偏移处属性tail是否是expect,如果是则将tail属性更新为update.注意这里只是将node设置到tail属性,并不是入队成功.入队成功是在t.next=node这一步完成的。之前的尾节点的next指向当前节点node.才形成链表,入队成功。
addWaiter(Node mode) 添加waiter节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速enq操作,如果失败的话进入完整的enq操作.
Node pred = tail;
if (pred != null) {//
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
根据传入的mode(Node.EXCLUSIVE,Node.SHARED)参数,为当前线程创建节点并将其加入队列. 可以看到该方法首先会尝试一次cas操作.失败之后再进入enq方法进行自旋。因为大多数情况下多线程之间是不会发送竞争的.所以大多数情况下会在第一次cas操作就将节点入队成功了.从而省去了自旋对cpu的消耗.
unparkSuccessor(Node node) 唤醒后继节点
private void unparkSuccessor(Node node) {
/*
* 如果为负数(即需要等待信号或者条件).则尝试清除预期信号
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 取消挂起线程,通常是在下一个节点。但是如果下一个节点是CANCELLED状态或者为null.则需要从队尾向前搜索不是CANCELLED状态的节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//从队尾向前遍历前缀节点直到找到非CANCELLED的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒非cancelled节点的线程.
if (s != null)
LockSupport.unpark(s.thread);
}
LockSupport这个类改天专门聊
唤醒操作会遍历出后继节点中没有被取消的线程.
共享模式下的释放操作
//共享模式的释放操作.并确保信号向后继节点传播
private void doReleaseShared() {
for (;;) {
Node h = head;
//队列不为空且有后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果head节点的状态为SIGNAL,则将其状态改为0,并唤醒它的后继节点
if (ws == Node.SIGNAL) {//SIGNAL状态表示该节点的后继节点需要唤醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//状态修改失败的话重试
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果head节点状态为0,则将其改为PROPAGATE。失败重试
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
//简单来说就是出队,并且唤醒后续节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
cancelAcquire(Node node)
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 跳过cancelled状态的前继节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,并将尾节点替换为当前节点的前继节点成功的话
// 将前继节点的next置为null。如下图一
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//如下图2
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果当前节点是head(即当前节点是队列中第二个节点).并且前继节点的线程不为空.则它的
//唤醒后继节点(为什么只有这种情况下才唤醒?估计大家也会有疑问..请带着疑问继续往下看)
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
图1:
黄色部分表示代码所做处理
图2:
黄色部分表示代码所做处理
取消尝试获取锁操作,会进行如下操作:
将该节点的线程释放(node.thread=null).
将该节点的状态设为取消
如果该节点为队尾节点.则从队列中移除.否则
找到当前结点的继任结点,前趋的next指针指向继任结点
当前结点的next指针指向自己
大家可能注意到了,它之处理了next指针,并没有处理prev.因为它已经将节点的状态设置为取消状态(cancelled)。所以其他操作在操作队列时即便可以通过tail的prev遍历到它,也会因为是cancelled状态而被忽略。这么做是为什么?我还没想明白.为了效率么?
shouldParkAfterFailedAcquire(Node pred, Node node)尝试获取失败之后是否需要挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 如果该节点已经设置状态为SIGNAL,则表示它可以安全挂起.
*/
return true;
if (ws > 0) {如果当前节点已取消则需要遍历找到一个没有取消的节点
/*
* 如果前继节点已经取消,则跳过
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 如果是0或者PROPAGATE或者CONDITION状态.则表示需要等待一个信号但是不挂起.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
只有当前节点为SIGNAL状态时才能挂起线程.因为SIGNAL状态的后继节点会被唤起。避免当前线程挂起之后没有人去竞争锁。
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
挂起当前线程之后(LockSupport.park(this)会阻塞当前线程,直到被唤醒),唤醒之后顺便检查一下该线程是否被中断。
以上的方法就是共享模式下,常用的方法了.但是他们之间怎么组合使用呢? 接下来我们继续看一个AQS的方法
acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果当前节点为第二个节点.并且获取锁成功
//则将当前节点设为head.并将之前的head节点出队并将其next指针指向null.
//这里回看一下之前的setHead.然后再看如下图1.就可以明白是怎么出对的了
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
图1:
我们先不用关心arg参数是什么.因为在之后的子类中会讲到.这里需要强调一下,tryAcquire(arg)是个抽象方法.在具体的子类中实现.它表示尝试获取锁是否成功。
从(p == head && tryAcquire(arg))可以看出如果节点不为第二个节点.则不会去尝试获取锁.由此可见队头才能尝试获取锁(符合FIFO)。
该方法会不断重试,直到获取锁成功(前面的Node都释放锁了,等到它拿锁才能竞争到)。或者当前线程被中断。如果它在尝试获取时失败了的话会被阻塞(挂起).直到有人唤醒,然后再重试(这里并不是自旋.只是重试)。
如果该线程被中断了就取消获取。
之后的很多方法大体一致,只有细微差别.大家可以自己看。区别比较大的我才会继续写
doAcquireNanos(int arg, long nanosTimeout)
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
跟acquireQueued大体一致.唯一不同的是它的挂起操作有最大等待时间nanosTimeout.超过这个时间之后线程会自动唤醒.这里的for循环才是自旋.
aqs中阻塞模式获取锁的方法到此基本上重要的都读完了.下篇我们继续看共享模式下是怎么获取的。以及一些其他重要的方法其什么作用