AQS(二)

目录

AQS(二)

上篇读了AQS的一些基础方法.以及独占模式下获取锁的方式.本文我们会把AQS剩余部分看完.

private void doAcquireShared(int arg) {
	//添加一个队尾节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {//如果当前节点是第二节点则尝试获取
                int r = tryAcquireShared(arg);//大于等于0表示获取到锁
                if (r >= 0) {
                	//获取到锁的话讲当前节点设置为头结点,并向后r个节点传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared也是自旋,反复调用tryAcquireShared方法,直到成功或者失败。 共享模式跟上一篇的独占模式大体一致.只是获取锁的方式有点区别。

AQS有很多方法.有一部分方法是需要子类实现的.所以这里这部分方式不再写.还有一部分方法本身很简单,也不再写.接下来,我会找些比较重要的内容。

获取队列中的第一个线程(等待时间最久的那个)

 public final Thread getFirstQueuedThread() {
    // handle only fast path, else relay
    return (head == tail) ? null : fullGetFirstQueuedThread();
}

private Thread fullGetFirstQueuedThread() {
    Node h, s;
    Thread st;
    //通常情况下队列中的第一个线程是head.next(head节点的线程为Null.因为head的节点都是当前线程的节点.回顾setHead方法)
    if (((h = head) != null && (s = h.next) != null &&
         s.prev == head && (st = s.thread) != null) ||
        ((h = head) != null && (s = h.next) != null &&
         s.prev == head && (st = s.thread) != null))
        return st;

	//如果head.next中没有找到的话,就从队尾开始向头部遍历.找出最前面的线程
    Node t = tail;
    Thread firstThread = null;
    while (t != null && t != head) {
        Thread tt = t.thread;
        if (tt != null)
            firstThread = tt;
        t = t.prev;
    }
    return firstThread;
}

当前队列大小(估计值)

public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

可以看到这个方法并没有对队列加锁.所以如果并发的情况下,当前线程在遍历队列的时候,有其他线程会对队列进行操作.所以它返回的n是不一定等于队列的长度的。

ConditionObject

先看它的定义

public class ConditionObject implements Condition, java.io.Serializable {}

可以看到ConditionObject实现了Condition接口. Condition可以将Object的monitor(这个以后会说)的方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用。

添加一个条件节点(addConditionWaiter)

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果最后一个节点不是CONDITION状态,则将队列(condition队列)中已取消等待condition的节点退出队列(只要队尾节点是非Condition的就需要从头到尾都检查一遍)
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //添加一个新节点.如果队列为空则将其设置为头结点,否则加到队尾
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

unlinkCancelledWaiters

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)//如果第一个节点就是非CONDITION节点则,将first.next节点设置为头节点
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

比较容易看.就是从头到尾检查每个节点是不是CONDITION状态.如果不是就清除出队

发送信号

 public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    /*
     * 如果不能更改状态值,则表示该节点已经被取消了
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * 将节点加入队尾.如果已取消或者设置状态失败.则唤醒该线程重新同步.
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal方法会将在条件队列中等待最久的节点加入到同步队列队尾.

awaitUninterruptibly

 public final void awaitUninterruptibly() {
    Node node = addConditionWaiter(); //添加一个条件等待节点
    int savedState = fullyRelease(node);//保存锁状态
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        //如果不在同步队列中则挂起当前线程
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)//当前线程被其它线程中断了的话,中断当前线程
        selfInterrupt();
}

awaitUntil

public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
    long abstime = deadline.getTime(); //获取绝对时间
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {//如果不再队列就不断重试
        if (System.currentTimeMillis() > abstime) {//如果当前时间大于deadline则
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);//挂起当前线程,abstime是挂起多久时间。
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//如果取消等待contion了就将该节点加入同步队列队尾。并返回true
        enq(node);
        return true;
    }

    //如果不在同步队列的话就自旋,直到加入到同步队列
    while (!isOnSyncQueue(node))
        Thread.yield();//自旋的时候主动让出cpu
    return false;
}

指定等待到某个时间点就不再等待.

AQS的主要方法到此就结束了.鉴于很多同学反馈的基础不好,看不太懂.下周开始我们先聊聊并发的理论基础(jvm线程模型,可见性,原子性等)