Java AQS 同步功能详解

1 AQS 简介

AQS(即 AbstractQueuedSynchronizer,抽象队列同步器)是 JUC 中实现并发同步功能的基石,它抽象了同步器的状态管理线程的排队和唤醒、同步器的共享/独占模式等关键部分。同时,AQS 还提供了一套通用的 API,使得开发者能够更方便地使用同步器。

AbstractQueuedSynchronizer 类的声明如下:

1
2
3
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable
  • AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizerAbstractOwnableSynchronizer 抽象类提供了创建可能需要所有权概念的锁和相关同步器的基础。虽然它的功能很简单,但它是一个重要的基类,为 AbstractQueuedSynchronizer 和其他相关同步器提供了必要的支持。

2 state 变量

在 AQS 中,维护了一个 volatile int state 变量来表示共享资源。通过调用 getStatesetStatecompareAndSetState 方法,可以方便地查看或修改共享资源的值:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
private volatile int state;

// get 方法
protected final int getState() {
    return state;
}
// set 方法
protected final void setState(int newState) {
    state = newState;
}
// 基于 CAS 的 set 方法
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 作为一个抽象同步类,并没有限定 state 的使用规则,具体如何利用它由实现类自行决定。例如:

  • ReentrantLockstate 设定成一种可重入锁,当 state == 0 时表示资源已被释放,否则表示重入次数。
  • ReentrantReadWriteLockstate 的高 16 位用作读锁重入次数,低 16 位用作写锁重入次数。
  • Semaphorestate 用于记录当前可用信号数量。
  • CountDownLatchstate 用于记录计数器的当前值。

3 同步队列

同步队列是整个 AQS 的核心,它是一个双向队列,通过 headtail 两个指针分别指向队列的头和尾,而且队列头节点是一个空节点,不与任何线程关联:

同步队列

初始状态下,headtail 指针均指向 null,当往队列中添加阻塞线程时,会先创建一个空节点,并让 headtail 都指向这个空节点,然后再在空节点后插入被阻塞线程的节点。所以,当 head == tail 的时候,就说明队列为空了。

4 Node 内部类

Node 内部类提供了以下几个字段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的
static final Node SHARED = new Node();
// 标记线程是获取独占资源时被挂起后放入 AQS 队列的
static final Node EXCLUSIVE = null;

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

//表示节点的状态,不同的值代表不同的含义
volatile int waitStatus;

// 记录前驱节点
volatile Node prev;
// 记录后继节点
volatile Node next;

// 存放进入 AQS 队列里面的线程
volatile Thread thread;

// 表示条件队列中下一个节点
Node nextWaiter;

这里我们重点关注表示节点状态的 volatile int waitStatus,它共有 5 种取值:

  • 0:新节点入队时的默认状态,或者处理过程中的中间状态;

  • CANCELLED (1):表示当前节点已取消调度。

    节点一旦进入该状态,将不再发生状态变化,通常发生在节点超时或被中断时;

  • SIGNAL (-1):表示当前节点的后继节点正在等待唤醒。

    当后继节点入队时,会将前驱节点的状态更新为 SIGNAL,以通知后继节点当前节点已经释放资源。

  • CONDITION (-2):表示当前节点由于不满足某个条件而被阻塞,常用于实现 Condition 接口中的同步队列;

  • PROPAGATE (-3):用于共享同步模式,表示释放共享资源时,需要唤醒的节点不仅包括当前节点的后继节点,还可能包括其他节点。(本质是为了解决 JDK1.6 中共享模式的 BUG 而引入的一种新状态)

waitStatus 的负值表示节点处于有效等待状态,正值表示节点已取消。

5 AQS 的模板方法

前文提到,AQS 作为一个抽象同步类,并没有限定 state 的使用规则,但这只是其抽象设计的一部分。实际上,AQS 在抽象独占共享两种同步模式时,通过模板方法限定了资源获取 (acquire)释放 (release) 的过程。这样一来子类只需要实现 tryAcquire(int arg)tryRelease(int arg) 等对 state 的具体操作即可实现各种同步策略。

AQS 提供了 acquire()release()acquireShared()releaseShared() 四个基础模板方法。前两个用于独占模式,后两个用于共享模式。下面我们将详细分析这些方法的实现原理。

6 独占模式 (Exclusive)

6.1 获取资源 acquire()

acquire() 方法是独占模式下线程获取共享资源的顶层入口。如果线程获取到资源,则直接返回。否则,线程将被添加到同步队列中,一直等待直到获取到资源为止。需要注意的是,在整个获取资源的过程中,线程将不会触发上层应用的中断处理逻辑。

以下是 acquire() 方法的实现:

1
2
3
4
5
6
@ReservedStackAccess
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

该方法的执行流程如下:

  1. 首先调用 tryAcquire() 钩子函数获取资源,具体交由子类实现;
  2. 如果获取资源失败,则调用 addWaiter() 将该线程加入同步队列尾部,并标记为独占模式;
  3. 然后线程进入 acquireQueued() 方法挂起,直到成功获取资源后才会返回。
  4. acquireQueued() 方法会根据阻塞过程中是否发生过中断来返回结果。如果线程被中断过,则该方法会返回 true,然后调用 selfInterrupt() 方法来补偿中断信号。

以下是本流程中所涉及到的方法的详细介绍。

6.1.1 tryAcquire()

tryAcquire 是 AQS 模板方法中的一个钩子函数,具体需要子类去重写,直接调用会抛出异常,实现如下:

1
2
3
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
为什么 AQS 不将 tryAcquire() 设计成抽象方法
抽象方法会强制要求子类去实现。AQS 抽象了两种同步类型:独占模式和共享模式,以资源获取操作 acquire 为例,顶层方法分别为 tryAcquiretryAcquireShared。如果 AQS 将这两个方法定义为抽象方法,那么面向独占模式的实现类就必须实现用不到的 tryAcquireShared 方法,面向共享模式的实现类也必须实现 tryAcquire 方法。即使实现类可以像 AQS 一样在不需要的方法中抛出异常来满足要求,但这既麻烦,又不优雅。

6.1.2 addWaiter()

addWaiter() 方法用于在同步队列的队尾添加一个新的节点,实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private Node addWaiter(Node mode) {
    // 把当前线程封装为 node,并指定资源访问方式
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 如果 tail 不为空,直接把新 node 插到末尾
    if (pred != null) {
        node.prev = pred;
        // 通过 CAS 将 tail 指向新 node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果 tail 是空,说明同步队列未初始化,执行初始化
    // 如果 compareAndSetTail 操作失败,也进入该方法,自旋重试
    enq(node);
    return node;
}

addWaiter 方法内部首先会创建一个新节点,该节点的线程为当前线程,模式为传入的 mode,可以是独占模式 (Node.EXCLUSIVE) 或共享模式 (Node.SHARED)。然后获取同步队列尾节点的 pred。如果 pred 不为空,则将新节点插入队列的尾部;否则,调用 enq 方法对队列进行初始化,并将新节点插入到队尾。最后返回新创建的节点。其中,enq 的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // CAS:如果头为空,就用虚拟 node 节点初始化头
            if (compareAndSetHead(new Node()))
                tail = head;// 然后将 tail 也指向头
        } else {
            // 已经初始化好了,则 CAS + 自旋,将新节点设为 tail
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq 用于初始化同步队列,并通过自旋将 node 节点插入队列末尾。该方法首先获取同步队列的尾节点 tail

  • 如果 tail 为空,表示同步队列为空,需要进行初始化。在初始化过程中,使用 compareAndSetHead 方法将一个空的 Node 设置为 head 节点,head 节点本身不保存任何线程信息。
    • 如果成功设置头节点,则将 tail 指向该头节点。
    • 否则,说明其他线程已完成同步队列的初始化,因此直接开始下一轮循环。
  • 如果 tail 不为空,则将当前节点插入到队尾。具体做法是将当前节点的 prev 指向 tail,然后调用 compareAndSetTail 方法将同步队列的尾节点设置为当前节点。
    • 如果设置成功,则将原 tail 节点的 next 指向当前节点,然后返回原 tail 节点。
    • 否则,自旋重试该过程。

6.1.3 acquireQueued()

线程节点成功插入同步队列后,便会进入 acquireQueued() 中阻塞等待。源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;// 标记是否成功拿到资源
    try {
        boolean interrupted = false;// 是否发生过中断
        for (;;) {
            // 获取当前线程节点的前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点已经是队列头了,说明当前节点是队列中第一个有效节点,所以尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 获取到了资源,则将该线程设置为新队头
                setHead(node);
                // 摘下原队头
                p.next = null; // help GC
                failed = false;
                // 返回等待过程中的中断标志
                return interrupted;
            }
            // 如果前驱节点不是 head,或者抢占资源失败了,则检查是否可以安全挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //如果可以安全挂起,则执行挂起操作,否则继续循环 
                // 如果等待过程中被中断过,哪怕只有那么一次,也会将 interrupted 标记为 true
                interrupted = true;
        }
    } finally {
        // 如果等待过程中没有成功获取资源(如 timeout,或者可中断的情况下被中断了),
        // 那么取消节点在队列中的等待
        if (failed)
            cancelAcquire(node);
    }
}

该方法首先判断当前线程节点的前驱节点是否是头节点:

  • 如果是则说明当前节点是同步队列中最靠前的等待节点,所以调用 tryAcquire 尝试获取资源;

  • 如果不是头节点,或者上一步获取资源失败,则调用 shouldParkAfterFailedAcquire() 判断能否被安全挂起,当满足挂起条件时调用 parkAndCheckInterrupt() 将线程挂起。

  • 如果成功获取到资源,则调用 setHead() 将当前节点设置为新的空队头(即移除该线程),实现如下:

    1
    2
    3
    4
    5
    6
    
    private void setHead(Node node) {
        head = node;
        // 队头都是空节点,不关联线程信息
        node.thread = null;
        node.prev = null;
    }

6.1.4 shouldParkAfterFailedAcquire()

shouldParkAfterFailedAcquire 方法用于确定当前线程是否可以安全地休眠,以及在某些情况下是否应该继续尝试获取同步资源。如果线程能够被安全挂起,该方法将会返回 true,否则返回 false。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱节点的状态已经被设置为 SIGNAL,则当前线程可以安全的休眠
        return true;
    if (ws > 0) {
        // 前驱节点状态为 CANCELLED,则向前遍历,直到遇到正常状态的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 将所有 CANCELLED 节点短路(异常节点将被 GC)
        pred.next = node;
    } else {
        // 如果前驱节点状态正常,但不为 SIGNAL,则通过 CAS 操作设置前驱节点状态,并返回 false
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

6.1.5 parkAndCheckInterrupt()

parkAndCheckInterrupt() 方法用于将当前线程挂起(休眠),直到有其他线程中断当前线程或者等待时间结束。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    // 返回中断状态,并清除中断标志
    return Thread.interrupted();
}

// 其中,LockSupport.park() 实现如下:
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

parkAndCheckInterrupt() 方法通过调用 Unsafe.park 方法让当前线程进入休眠状态,并在其他线程调用了 UNSAFE.unpark(Thread thread) 方法或 thread.interrupt() 方法时被唤醒,同时设置中断标志位。

需要注意的是,park() 方法在阻塞线程时会检查线程的中断状态,如果线程已经被中断,它会立即返回而不进入阻塞状态。

6.1.6 selfInterrupt

当线程被中断唤醒后,会调用 selfInterrupt() 方法补偿被清空的中断标志,供上层业务使用。该方法实现如下:

1
2
3
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
注意

调用 parkAndCheckInterrupt() 方法会使线程进入休眠状态,被中断唤醒后将清空中断标志并记录中断情况。若发生过中断,获取到资源时会再次补偿中断信号。

清空中断标志的原因:

  • 当线程在被唤醒时获取资源,可能会再次失败,需要再次进入休眠状态。如果未清空中断标志,会导致休眠失败并导致 CPU 占用率升高。

手动补偿中断信号的原因:

  • 上层业务可能需要针对中断信号做进一步处理,所以不能把信号吞掉。

6.2 释放资源 release()

release() 方法是独占模式下线程释放共享资源的顶层入口。如果线程成功释放了资源,则唤醒后继节点并返回 true,否则直接返回 false。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@ReservedStackAccess
public final boolean release(int arg) {
    // 尝试释放资源
    if (tryRelease(arg)) {
        // 获取同步队列头节点
        Node h = head;
        // 如果节点状态不等于 0,则唤醒后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

6.2.1 tryRelease()

tryRelease() 也是模板方法 release() 中的钩子函数,这里不再赘述了。

1
2
3
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

6.2.2 unparkSuccessor()

unparkSuccessor() 用于唤醒 node 的后继节点。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果参数节点的状态为负(正常状态),则尝试将它的状态设为 0
    // 这样避免过多的线程进入这个方法
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 拿到参数节点的后继节点
    Node s = node.next;
    // 如果后继节点是空,或者状态为 CANCELLED
    if (s == null || s.waitStatus > 0) {
        s = null;
        //则从 tail 向前遍历,找到最靠前的正常状态的(且非参数节点本身)节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 后继节点不为空,或者从后往前找到了正常节点,将其内部线程唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

需要注意的是,在正常情况下,头节点的后继节点需要被唤醒。但是,如果发生意外情况,例如头节点的下一个节点此时恰好为 null,或者后继节点状态异常,就需要从 tail 开始从后往前查找最靠前的正常状态的节点(且非头节点本身),然后唤醒它。

7 共享模式 (Share)

7.1 获取资源:acquireShared()

acquireShared() 方法是共享模式下线程获取资源的顶层入口。在整个获取资源的过程中,线程将不会触发上层应用的中断处理逻辑。实现如下:

1
2
3
4
5
public final void acquireShared(int arg) {
    // 调用钩子函数获取资源
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg); //如果获取失败,加入同步队列重试
}

7.1.1 tryAcquireShared()

1
2
3
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

tryAcquireShared() 也是一个钩子函数,AQS 中未提供默认实现,但规定了该方法返回值的含义,需要我们重点关注:

  • < 0:表示当前线程未获取到资源,需要进一步等待。
  • == 0:表示当前线程已经获取到资源,但是剩余资源不足,后续线程无法继续获取。
  • > 0:表示当前线程已经获取到资源,并且后续线程也可以获取资源。

7.1.2 doAcquireShared()

doAcquireShared()acquireShared 中的核心方法,实现与 acquirequeued() 类似:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireShared(int arg) {
    // 将线程加入同步队列末尾,设置为共享模式
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 如果前驱节点是头节点,尝试获取一次共享资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {// 成功获取到资源
                    // 将当前节点设置为新的头节点,并根据条件决定是否唤醒后续节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted) // 如果被中断过,补偿中断
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 抢占资源失败,判断能否安全休眠,如果可以则休眠
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在共享模式下,同步队列的管理与独占模式相似。但需要在获取资源后,判断剩余资源是否足够满足后续线程的需求。这一关键步骤决定了后续线程是等待还是继续尝试获取资源。

7.1.3 setHeadAndPropagate()

setHeadAndPropagate() 方法的主要作用是设置队列头节点并决定是否唤醒后继节点。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
private void setHeadAndPropagate(Node node, int propagate) {
    // 缓存旧的头节点
    Node h = head;
    // 将当前节点设置为新的头节点,并将原头节点摘下 GC
    setHead(node);
    // 如果 propagate > 0,表示剩余资源足够
    // 如果原头节点为空,或状态为负
    // 如果当前节点为空,或状态为负
    //  上述条件满足一个,就唤醒后继共享模式节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

后继节点的唤醒由 doReleaseShared() 方法实现,我们将在 下文 对其进行介绍。这里我们重点关注唤醒后继节点的条件:#todo

7.2 释放资源:releaseShared()

releaseShared() 是共享模式下实现资源释放的顶层方法。源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@ReservedStackAccess
public final boolean releaseShared(int arg) {
    // 尝试释放资源
    if (tryReleaseShared(arg)) {
        // 释放成功,唤醒后继节点
        doReleaseShared();
        return true;
    }
    // 释放失败,返回 false
    return false;
}

7.2.1 doReleaseShared()

doReleaseShared() 方法是共享模式下资源释放的核心方法。源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doReleaseShared() {
    for (;;) {
        // 缓存当前头节点
        Node h = head;
        // 如果头节点不为空,且存在线程节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 如果头节点状态为 SIGNAL,则将其更新为 0,更新失败则自旋重试
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // CAS 成功的线程,唤醒后继节点
            }
            else if (ws == 0 && // 如果头节点状态已经为 0,说明前边有线程处理过,将其更新为 PROPAGATE
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果上边代码执行过程中有其他线程节点获取到了资源,就继续循环,否则退出方法
        if (h == head)
            break;
    }
}
  • 该方法只有在头节点的状态为 SIGNAL 时才会唤醒后继节点,并将头节点的状态设置为中间状态 0。
  • 如果在此时有其他线程执行了 releaseShared() 操作,也进入了该方法,由于头节点未发生变化,因此会获取到中间状态的 waitStatus == 0,于是将头节点状态设置为 PROPAGATE
  • 如果在 doReleaseShared() 方法执行期间,头节点未发生变更,则可以直接安全地返回。但是,如果在方法执行期间,有其他节点线程被唤醒,并成功获取到了资源,将自己设置成了新的头节点,那么就需要重新进行循环加速处理过程。

8 总结

通过前文对两种模式的介绍,我们可以发现它们的流程大体上相似。都是首先尝试获取资源,如果获取失败则将当前线程加入同步队列,并由队列管理后续的资源获取操作,直到获取到资源才返回。不同之处在于共享模式下,同步队列中的节点在获取到资源后会判断资源是否还有剩余,如果有则会立即唤醒自己的后继节点,以便多个线程能够并发地访问共享资源。而独占模式则不具备这一特性,同一时刻只有一个线程可以访问资源。

需要注意的是,本文只介绍了两种模式下的基础模板方法,这些方法在处理过程中会忽略中断信号。除此之外,AQS 还有一套对应的响应中断的模板方法:acquireInterruptiblyacquireSharedInterruptibly。这些方法的实现方式与前述方法类似,但是在收到中断信号时会抛出中断异常。


欢迎关注我的公众号,第一时间获取文章更新:

微信公众号

相关内容