Java AQS Condition 条件变量详解

1 条件变量使用示例

在开始介绍 AQS Condition 实现原理之前,我们先来了解下条件变量的使用方法。与 wait/notify 必须配合 synchronized 关键字使用一样,Condition 也必须与 ReentrantLock 配合使用:

1
2
3
4
// 创建一个互斥锁
Lock lock = new ReentrantLock(true);
// 从 lock 中获取一个条件变量
Condition cdt = lock.newCondition();

拿到条件变量 cdt 后,需要等待条件成立的线程便可通过 await() 调用阻塞等待:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 首先需要获取到锁
lock.lock();
try {
    if (The condition is not met) {
        cdt.await(); // 阻塞等待条件成立
    }
    // 条件成立,执行任务
    run();
} finally {
    lock.unlock();
}

其它线程使条件成立后,调用 cdt.signal() 便可唤醒上述等待的线程:

1
2
3
4
5
6
7
8
// 首先需要获取到锁
lock.lock();
try {
    // 运行致使条件成立的代码
    cdt.signal(); // 然后唤醒等待的线程
} finally {
    lock.unlock();
}

2 Condition 接口

Condition 接口提供了与 wait/notify 功能类似的一组同步方法:

1
2
3
4
5
6
7
8
9
public interface Condition {
    // 等待条件成立
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    ...
    // 唤醒等待中的线程
    void signal();
    void signalAll();
}

3 ConditionObject 内部类

AQS 提供了一个继承自 Condition 的内部类 ConditionObject,用于在同步器中进行线程等待和唤醒的对象。

ConditionObject 中维护了一个类似于 AQS 同步队列的条件队列,其节点类型仍为 Node。这些节点通过 Node nextWaiter 字段形成了一个单向链表,用于保存正在等待条件的线程。ConditionObject 还通过两个字段分别指向条件队列的头和尾节点:

1
2
3
4
5
public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    ...
}

这些线程在其他线程调用 signal()signalAll() 方法时被唤醒。

4 await() 实现原理

await() 方法是 ConditionObject 内部类中最核心的方法之一。该方法用于使当前线程等待在与该 ConditionObject 关联的条件上,并在条件满足时重新去获取共享资源。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void await() throws InterruptedException {
    // 如果线程已被中断,直接抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程封装为一个 Condition 节点,并加到条件队列末尾
    //  内部不需要 CAS,因为此时已经获取到了独占资源
    Node node = addConditionWaiter();
    // 阻塞前,先释放当前节点所持有的资源,否则会死锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果 node 在同步队列上,跳出循环
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 阻塞自己
        // 如果收到中断新号,则将节点添加到同步队列,跳出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 当线程被唤醒,并且被添加到同步队列后,交给 aqs 同步队列重新获取资源
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 获取到资源后,删除条件队列中废弃的节点
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    // 根据中断模式处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • await() 方法内部涉及到了中断模式 interruptMode,它由 ConditionObject 中的两个常量字段指定:

    1
    2
    3
    4
    
    /** 退出 await 状态时,只需补偿中断信号 */
    private static final int REINTERRUPT =  1;
    /** 推出 await 状态时,需要直接抛出中断异常 */
    private static final int THROW_IE    = -1;

如果线程在等待过程中线程被中断,则根据中断模式进行处理:如果是 THROW_IE 模式,会重新抛出中断异常;如果是 REINTERRUPT 模式,则仅会调用 selfInterrupt() 方法补偿中断标志位。

以下是本流程中所涉及到的方法的详细介绍。

4.1 addConditionWaiter()

addConditionWaiter() 方法用于创建一个新的 Condition 类型节点,并将其添加到条件队列中。如果条件队列为空,则初始化该队列。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果条件队列尾节点状态不是 CONDITION,执行一次清理操作
    //  清理完成后,更新 t 引用
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程封装为一个 CONDITION 节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果尾节点为空,则初始化队列,否则直接将 node 插到末尾
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

可以看到,该方法并没有使用 CAS 操作来插入新节点(nextWaiter 字段甚至没有用 volatile 关键字修饰),这是因为 Condition 只用于可重入写锁,同一时刻只有一个线程会持有锁,因此是线程安全的

4.2 fullyRelease()

AQS 中的 AbstractQueuedSynchronizer#fullyRelease 方法通过调用 AbstractQueuedSynchronizer#release 释放当前节点持有的所有资源,并唤醒后继节点。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取节点的同步资源
        int savedState = getState();
        // 释放资源,并唤醒后继节点
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 如果释放资源失败,废弃该节点
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

4.3 isOnSyncQueue

AQS 中的 AbstractQueuedSynchronizer#isOnSyncQueue() 方法用于确认节点是否在同步队列上。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
final boolean isOnSyncQueue(Node node) {
    // 如果状态为 Condition,或者前驱节点为 null(单向节点)
    //  说明是条件队列节点,不在同步队列上
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果 `next` 指向后继节点,说明是同步队列节点
    //  因为条件队列用的是 nextWaiter 字段
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 上述条件都不满足,从同步队列尾节点向前遍历
    return findNodeFromTail(node);
}

可见,该方法利用两种队列的特性,在性能上做了一定的优化,当优化手段都失效后,再执行以下方法兜底:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private boolean findNodeFromTail(Node node) {
        Node t = tail;
    for (;;) {
        // 同步队列上找到了当前节点,说明这个节点在同步队列上
        if (t == node)
            return true;
        // 找到头依旧没找到,说明不在同步队列上
        if (t == null)
            return false;
        t = t.prev;
    }
}

4.4 checkInterruptWhileWaiting()

checkInterruptWhileWaiting() 方法用于检查中断信号。如果发生了中断,则返回中断模式,否则返回 0。实现如下:

1
2
3
4
5
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

当线程在条件队列上等待时被中断,会调用 transferAfterCancelledWait 方法将节点添加到同步队列。该方法会通过 CAS 操作将节点的状态由 CONDITION 更新为中间态 0。

  • 如果 CAS 操作成功,则将该节点添加至同步队列,并返回 true。
  • 如果 CAS 操作失败,说明节点的状态已经被其他线程更改,那么方法会自旋,直到该节点被成功添加至同步队列,再返回 false。

transferAfterCancelledWait 源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
final boolean transferAfterCancelledWait(Node node) {
    // 更新状态为 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 将节点添加到同步队列
        enq(node);
        return true;
    }
    // 如果当前节点被插入尾节点过程中有其他线程调用该方法,则到这里自旋等待
    //  直到节点被成功添加,再返回 false
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

最终 checkInterruptWhileWaiting() 方法根据 transferAfterCancelledWait 的返回值,设置中断模式:

  • 如果返回 true,中断模式会被设置为 THROW_IE,这意味着该线程会抛出 InterruptedException
  • 否则,中断模式会被设置为 REINTERRUPT,这意味着该线程会被补偿中断信号。

4.5 unlinkCancelledWaiters()

unlinkCancelledWaiters() 方法负责将条件队列上的非 Condition 节点摘除。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            // 找到非 Condition 节点,将其短路
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null) // 找到了末尾,更新队列尾节点
                lastWaiter = trail;
        }
        else
            trail = t; //缓存上一个 Condition 节点
        t = next;
    }
}
  • 方法仅通过 volatile 语义保证一致性。因为方法内只涉及节点的短路,不涉及状态的变化,因此多个线程的重复操作不会影响一致性。

4.6 reportInterruptAfterWait()

reportInterruptAfterWait 方法用于根据中断模式处理中断信号。实现如下:

1
2
3
4
5
6
7
private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

如果是 THROW_IE 模式,这里会重新抛出中断异常;如果是 REINTERRUPT 模式,则仅调用 selfInterrupt() 方法来补偿中断标志位。

5 awaitUninterruptibly() 实现原理

awaitUninterruptiblyawait 方法作用类似,只是不会响应中断信号:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //如果发生中断,只做个标记,继续循环
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

6 signal() 实现原理

signal() 方法用于唤醒条件队列中的第一个节点,并将其从条件队列中移除。实现如下:

1
2
3
4
5
6
7
8
9
public final void signal() {
    // 只有持有排他资源,才能执行该方法,否则抛异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 唤醒条件队列头节点
    if (first != null)
        doSignal(first);
}

其中,doSignal() 是发送信号的核心实现:

1
2
3
4
5
6
7
8
9
private void doSignal(Node first) {
    do {
        // 头指针后移
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;//摘下原头节点
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

该方法首先将条件队列的头节点指向第二个节点。如果第二个节点为 null,说明之前条件队列上只有一个节点,因此将尾节点 lastWaiter 也更新为 null。然后,它会调用 transferForSignal() 方法将被摘下的头节点添加到同步队列的末尾:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
final boolean transferForSignal(Node node) {
    // 通过 CAS 将当前 Condition 节点状态更新为 0
    // 相当于废弃条件队列上的该节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;// 更新失败,说明节点已被其他线程处理,返回 false

    // 如果更新成功,将当前节点添加到同步队列末尾
    //  并返回原同步队列尾节点,即当前 node 的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果同步队列原先的尾节点状态正常,
    //   则通过 CAS 将其设置状态设置为 SIGNAL,
    //   如果设置失败立刻唤醒当前节点
    // 如果同步队列原先的尾节点已被取消,也立刻唤醒当前节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

通常情况下,signal() 方法不会直接唤醒条件队列的头节点,而是将其委托给同步队列去唤醒;只有在添加到同步队列后,前驱节点被取消或者状态无法被设置为 SIGNAL 时,才会亲自唤醒头节点。

7 signalAll() 实现原理

signalAll() 方法用于唤醒等待在条件队列上的所有线程。实现如下:

1
2
3
4
5
6
7
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

其中,doSignalAll() 是发送信号的核心实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
private void doSignalAll(Node first) {
    // 头尾指针都指向空,相当于保存当前条件队列的镜像
    //  后续线程可以向新队列插入数据,互不影响
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        // 摘下快照队列头节点
        first.nextWaiter = null;
        // 唤醒摘下的头节点
        transferForSignal(first);
        // 快照队列头指针后移
        first = next;
    } while (first != null);// 处理完链上所有节点
}

该方法的核心实现仍然是 transferForSignal(),因此不再进行赘述。与 doSignal() 方法不同的是,该方法会首先保存当前条件队列的快照,然后从快照队列的头节点开始逐个唤醒,直到唤醒所有节点。


欢迎关注我的公众号,第一时间获取文章更新:

微信公众号

相关内容