详读JUC系列(2)之AQS
AQS(二)
目录
AQS(二)
上篇读了AQS的一些基础方法.以及独占模式下获取锁的方式.本文我们会把AQS剩余部分看完.
private void doAcquireShared(int arg) {
//添加一个队尾节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {//如果当前节点是第二节点则尝试获取
int r = tryAcquireShared(arg);//大于等于0表示获取到锁
if (r >= 0) {
//获取到锁的话讲当前节点设置为头结点,并向后r个节点传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireShared也是自旋,反复调用tryAcquireShared方法,直到成功或者失败。 共享模式跟上一篇的独占模式大体一致.只是获取锁的方式有点区别。
AQS有很多方法.有一部分方法是需要子类实现的.所以这里这部分方式不再写.还有一部分方法本身很简单,也不再写.接下来,我会找些比较重要的内容。
获取队列中的第一个线程(等待时间最久的那个)
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
return (head == tail) ? null : fullGetFirstQueuedThread();
}
private Thread fullGetFirstQueuedThread() {
Node h, s;
Thread st;
//通常情况下队列中的第一个线程是head.next(head节点的线程为Null.因为head的节点都是当前线程的节点.回顾setHead方法)
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
//如果head.next中没有找到的话,就从队尾开始向头部遍历.找出最前面的线程
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}
当前队列大小(估计值)
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
可以看到这个方法并没有对队列加锁.所以如果并发的情况下,当前线程在遍历队列的时候,有其他线程会对队列进行操作.所以它返回的n是不一定等于队列的长度的。
ConditionObject
先看它的定义
public class ConditionObject implements Condition, java.io.Serializable {}
可以看到ConditionObject实现了Condition接口. Condition可以将Object的monitor(这个以后会说)的方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用。
添加一个条件节点(addConditionWaiter)
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后一个节点不是CONDITION状态,则将队列(condition队列)中已取消等待condition的节点退出队列(只要队尾节点是非Condition的就需要从头到尾都检查一遍)
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//添加一个新节点.如果队列为空则将其设置为头结点,否则加到队尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)//如果第一个节点就是非CONDITION节点则,将first.next节点设置为头节点
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
比较容易看.就是从头到尾检查每个节点是不是CONDITION状态.如果不是就清除出队
发送信号
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* 如果不能更改状态值,则表示该节点已经被取消了
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 将节点加入队尾.如果已取消或者设置状态失败.则唤醒该线程重新同步.
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal方法会将在条件队列中等待最久的节点加入到同步队列队尾.
awaitUninterruptibly
public final void awaitUninterruptibly() {
Node node = addConditionWaiter(); //添加一个条件等待节点
int savedState = fullyRelease(node);//保存锁状态
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
//如果不在同步队列中则挂起当前线程
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)//当前线程被其它线程中断了的话,中断当前线程
selfInterrupt();
}
awaitUntil
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime(); //获取绝对时间
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//如果不再队列就不断重试
if (System.currentTimeMillis() > abstime) {//如果当前时间大于deadline则
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);//挂起当前线程,abstime是挂起多久时间。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//如果取消等待contion了就将该节点加入同步队列队尾。并返回true
enq(node);
return true;
}
//如果不在同步队列的话就自旋,直到加入到同步队列
while (!isOnSyncQueue(node))
Thread.yield();//自旋的时候主动让出cpu
return false;
}
指定等待到某个时间点就不再等待.
AQS的主要方法到此就结束了.鉴于很多同学反馈的基础不好,看不太懂.下周开始我们先聊聊并发的理论基础(jvm线程模型,可见性,原子性等)