Java StampedLock 实现原理

java.util.concurrent.locks.StampedLock

读写锁 一文中我们了解到,ReentrantReadWriteLock 是一种悲观读锁,读写之间仍然是互斥的,实际使用时性能并不高,而且容易导致写锁饥饿。因此,JDK 8 引入了一款支持 “乐观读” 策略的新式读写锁 ——StampedLock

StampedLock 读取的时候可以不对资源加锁,只有读取后发现数据被修改了,才升级成“悲观读”,一定程度上降低了读锁的地位,让资源更向写锁一边倾斜,降低写锁饥饿的概率。

1 使用示例

在分析源码前,我们先来看看源码中给出的官方使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    // 多个线程调用该方法,修改 x,y
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();// 写锁
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    // 多个线程调用该方法,求距离
    double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead(); // 使用“乐观读”
        double currentX = x, currentY = y; //将数据拷贝到线程本地栈
        if (!sl.validate(stamp)) { //读取后,判断数据是否被修改过
            stamp = sl.readLock(); //如果被修改过,升级为悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
    // 多个线程调用该方法,移动坐标
    void moveIfAtOrigin(double newX, double newY) {
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = sl.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

示例的 Point 类中有三个支持多线程调用的方法:

  • move() 方法:该方法用于移动坐标,支持多线程调用,由于是纯粹的“写操作”,所以方法直接使用了互斥的写锁 writeLock & unlockWrite
  • distanceFromOrigin() 方法:该方法用于计算点到原点的距离,支持多线程调用,由于是纯粹的“读操作”,因此方法一开始选择了乐观读锁 tryOptimisticRead(),如果读取后又有其他线程修改了数据导致版本号不匹配,就重新申请悲观读锁 readLock()
  • moveIfAtOrigin() 方法:该方法用于将原点的坐标移到目标位置,支持多线程调用,该方法选择先通过悲观读锁确认当前点位于坐标轴原点,当确认后再通过 tryConvertToWriteLock() 方法直接将悲观读锁升级为写锁。

2 state 变量

作为一个读写锁,StampedLock 也提供了一个分成两份的 state 变量,只不过 StampedLock 重新实现了一版 CLH 队列,并没有像 ReentrantReadWriteLock 一样基于 AQS 实现。因此,它将 state 直接设为了自己的私有变量。声明如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public class StampedLock implements java.io.Serializable {
    // state 字段
    private transient volatile long state;
    // state 的初始值 1 0000 0000,即 256
    private static final long ORIGIN = WBIT << 1;

    private static final int LG_READERS = 7;
    private static final long RUNIT = 1L;
    // 写锁掩码 1000 0000 == 128,第八位表示写锁
    private static final long WBIT  = 1L << LG_READERS;
    // 读锁掩码 0111 1111 == 127,低七位表示读锁
    private static final long RBITS = WBIT - 1L;
    // 读锁的总数量:0111 1110 == 126
    private static final long RFULL = RBITS - 1L;
    // 综合掩码:1111 1111 == 255
    private static final long ABITS = RBITS | WBIT;
    // 1...1 1000 0000 = -128
    private static final long SBITS = ~RBITS;
}

如上所示,StampedLockstate 变量的低 8 位表示读锁和写锁的状态,其中:

  • 第 8 位表示写锁的状态(只拥有一个比特位,所以写锁不支持重入);
  • 低 7 位表示读锁的状态。

3 乐观读实现原理

state 为初始状态 ORIGIN 时,其二进制值为 WBIT 左移一位,即 1 0000 0000。为什么 state 变量的默认值不是 0 呢?这就涉及到了乐观锁的实现原理,源码如下:

1
2
3
4
5
6
public long tryOptimisticRead() {
    long s;
    // 第八位为 1,表示写锁被占用,此时直接返回 0
    // 否则返回 s & SBITS
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

当写锁被占用时,((s = state) & WBIT) != 0L,因此方法直接返回 0;当悲观读锁被占用时,state 的第 9 位以上肯定存在不为 0 的位(初始化与写锁原理使然),因此 s & SBITS 始终大于 0;当既没有读锁占用也没有写锁占用,也就是初始状态时,state == ORIGIN,即第 9 位为 1 其他位都为 0,因此 s & SBITS 等于 256 也大于 0。所以 tryOptimisticRead() 在存在写锁占用时返回 0,否则该方法均返回正值。

获取到乐观锁后,需要接着调用 validate() 判断锁是否有效:

1
2
3
4
5
public boolean validate(long stamp) {
    U.loadFence(); // sun.misc.Unsafe
    // 当 stamp 等于 0 时,该方法永远返回 false
    return (stamp & SBITS) == (state & SBITS);
}

该方法会判断 tryOptimisticRead() 的返回值与当前 state 变量的第 9 位以上部分是否相等,如果 tryOptimisticRead() 返回 0,则 validate() 必然返回 false,表明存在写锁占用,乐观锁失效;如果 tryOptimisticRead() 返回值大于 0,但等式不成立,那么说明期间存在其他线程完成过写锁的加锁与释放,乐观锁也会失效;其他情况下乐观锁有效,返回 true。

我们注意到,在取得 stamp 并比较之前,还插入了一个内存屏障 U.loadFence(),该屏障可以禁止其两边代码之间的重排序,因此上文代码示例中才“敢”在调用 tryOptimisticRead() 后,先执行变量复制操作 double currentX = x, currentY = y,而不是紧接着判断乐观读是否有效。

4 WNode 节点类

StampedLock 内部重新实现的 CLH 队列的节点 WNode 源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/** 队列节点 */
static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    volatile WNode cowait;    // list of linked readers
    volatile Thread thread;   // non-null while possibly parked
    volatile int status;      // 0, WAITING(-1), or CANCELLED(1)
    final int mode;           // RMODE(0) or WMODE(1)
    /* 节点初始化的时候,会设定当前节点的类型,以及其前驱节点 */
    WNode(int m, WNode p) { mode = m; prev = p; }
}

/** CLH 队列头节点 */
private transient volatile WNode whead;
/** CLH 队列尾节点 */
private transient volatile WNode wtail;

WNode 组成的 CLH 队列与 AQS 中的队列有相似之处,默认情况下,头尾节点都指向 null。在初始化后,会将一个空的 WNode 设置为整个队列的头节点,并将与线程绑定的节点插入到头节点之后。因此,整个队列中的头节点作为空的虚节点存在。

特殊的是,WNode 中有一个 cowait 指针,会串联所有等待读锁资源的节点,方便批量唤醒线程。

5 写锁实现原理

5.1 写锁加锁操作

写锁的加锁操作包含 writeLock()tryWriteLock() 等多个方法,这里我们只分析其中的典型方法 writeLock()

1
2
3
4
5
6
public long writeLock() {
    long s, next;
    return ((((s = state) & ABITS) == 0L &&
                U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}

写锁不支持重入且与读锁互斥,所以只有当 state 的低八位都为 0 时,才允许抢锁。当抢锁失败后,再将后续的重试过程移交给 acquireWrite() 方法。acquireWrite() 的实现如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;
    /* 第一个大循环,包含入队前的 CAS 抢锁操作、队列初始化以及尝试入队操作 */
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // 如果 state 低 8 位为 0 了,尝试 CAS 加锁,否则继续自旋
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        else if (spins < 0) // 更新 spins
            // 当前锁被其他线程写锁占用,且队列为空,则 spins = 0,退出自旋
            // 否则初始化 spins
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            // SPINS 为进入队列前的最大重试次数,
            //  值为 (NCPU > 1) ? 1 << 6 : 0
        else if (spins > 0) { // 根据随机函数决定是否减少 spins
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        else if ((p = wtail) == null) {
            // 如果队列为空,则初始化队列
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null) // 初始化节点
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            p.next = node;
            break; // 最终将节点插入队列,跳出循环
        }
    }

    /* 第二个大循环,包含节点阻塞前的自旋、阻塞与唤醒操作 */
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        if ((h = whead) == p) { // 如果队列此前没有其他节点在等待
            if (spins < 0) // 则先初始化 spins
                // HEAD_SPINS 为阻塞线程前的最大自旋次数,
                //  值为 (NCPU > 1) ? 1 << 10 : 0
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS) {
                // MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0
                //  第二次进入循环时,说明是 re-locking 操作,
                //   则扩大自旋次数上限
                spins <<= 1;
            }
            // 阻塞前自旋 spins 次,尝试 CAS 加锁
            for (int k = spins;;) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {
                    if (U.compareAndSwapLong(this, STATE, s,
                                              ns = s + WBIT)) {
                        whead = node;
                        node.prev = null;
                        // 成功则返回 384(ORIGIN + WBIT)
                        return ns;
                    }
                }
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                          --k <= 0)
                    break; // 自旋次数超了,则跳出循环
            }
        }
        // 如果队列中存在其他等待读锁的节点
        else if (h != null) {
            // 则沿着 cowait 全部唤醒
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        // 如果本次自旋执行到这里,头节点仍然没变化,则阻塞
        if (whead == h) {
            if ((np = node.prev) != p) {
                // 上次自旋如果 p 是 CANCELLED,会被短路,
                //  所以这里更新 p 引用,使它继续指向当前的前驱节点
                if (np != null)
                    (p = np).next = node;
            }
            else if ((ps = p.status) == 0) // 更新前驱节点状态
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                // 如果 p 被取消了,则短路 p
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else { // 阻塞线程
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    U.park(false, time);  // 阻塞,等待被唤醒
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

整个 acquireWrite 方法是两个大的 for 循环,内部实现了非常复杂的自旋策略:

  • 第一个大循环的目的是创建当前线程的 WNode ,并将其插入 CLH 队列的尾部。自旋过程中如果发现锁未被任何线程占用时,还会通过 CAS 尝试加锁。
  • 第二个大循环的目的是阻塞当前线程:
    • 当程序进入第二个大循环,说明当前 WNode 已经位于 CLH 队列的尾部,可以阻塞了。
      • 不过在阻塞前,如果当前 CLH 队列中没有其他等待的节点,还会继续自旋加锁直到达到 MAX_HEAD_SPINS 指定的次数。
      • 如果 CLH 队列中存在其他阻塞的节点,因由于锁之间不互斥,所以这里会沿着 cowait 链表唤醒全部读锁节点,对应上述代码中高亮的 68 ~ 76 行。
    • 当其他线程在解锁操作中调用 release() 方法时,便会唤醒等待队列中阻塞等待的第一个节点,被唤醒的节点再接着执行第二个大循环中剩下的代码。
  • 在循环的过程中,如果 CAS 成功,则返回 state 的值为 384,否则一直循环下去直到被中断信号取消。

5.2 写锁解锁操作

写锁的解锁操作为 unlockWrite(long stamp),实现如下:

1
2
3
4
5
6
7
8
public void unlockWrite(long stamp) {
    WNode h;
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    if ((h = whead) != null && h.status != 0)
        release(h);
}

当释放写锁时,首先通过 state 判断写锁是否被占用,如果没有被占用则抛出异常;否则更新 state 的值。这里我们重点关注 state 值的更新表达式:(stamp += WBIT) == 0L ? ORIGIN : stamp,这里并没有直接将 state 值的第 8 位设为 0,而是通过进位操作state 的低八位为 0。不断加读锁,则不断进位,直到溢出后再将 state 初始化为 ORIGIN

为什么写锁解锁时要采用进位方式处理?
之所以这么做,是因为乐观锁需要知晓处理期间是否有其他线程申请并释放过写锁,因为只要其他线程申请过写锁,数据就有失效的可能。这种进位的方式可以在 state 第 9 位以上的部分保留写锁的操作痕迹,相当的精妙!

最后当 CLH 头部的虚节点不为空,且状态正常时,调用 release() 唤醒等待队列中第一个等待的线程。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0); // 更新虚节点状态为 0
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 如果等待队列中的第一个实节点为空,或状态被取消,
            //  则从后往前寻找第一个正常的节点,进行唤醒
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            U.unpark(w); // 唤醒目标节点
    }
}

6 读锁实现原理

6.1 读锁加锁操作

StampedLock 的读锁加锁方法包括 readLock()tryReadLock()readLockInterruptibly() 等,本文只介绍其中的典型 readLock() 方法,实现如下:

1
2
3
4
5
6
public long readLock() {
    long s = state, next;
    return ((whead == wtail && (s & ABITS) < RFULL &&
                U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

由于读锁与写锁互斥,且 StampedLock 的设计目标是降低写锁饥饿的概率,所以只有当 CLH 队列为空,且读锁还有剩余资源时,才会通过 CAS 抢锁,如果抢锁成功则返回 CAS 后的 statenext。以上情况都不满足则调用 acquireRead() 托管后续的重试过程。读锁的 acquireRead() 方法与写锁的 acquireWrite() 整体流程类似,只不过当读锁剩余资源数量不足时,会考虑对读锁容量进行扩容:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
if ((m = (s = state) & ABITS) < RFULL ? // 判断当前 state 是否小于 RFULL (126)
    U.compareAndSwapLong(this, STATE, s,  ns = s + RUNIT) : // CAS 抢锁
    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) // 否则扩容
{
    WNode c; Thread w;
    whead = node;
    node.prev = null;
    while ((c = node.cowait) != null) {
        if (U.compareAndSwapObject(node, WCOWAIT,
                                    c, c.cowait) &&
            (w = c.thread) != null)
            U.unpark(w); // 唤醒其他等待中的读锁节点
    }
    return ns; //加锁成功,返回
}

tryIncReaderOverflow() 实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private long tryIncReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    if ((s & ABITS) == RFULL) { // 如果 state 恰好等于 RFULL (126)
        // 尝试通过 CAS 将 state 由 126 设置为 127
        if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
            // 设置成功的线程将 readerOverflow+1,再将 state 降回 126
            // 由于只有一个线程会 CAS 成功,所以这里是线程安全的
            ++readerOverflow;
            state = s;
            return s;
        }
    }
    // state 升到 127 后,其他线程会进入这个分支,让出 CPU
    else if ((LockSupport.nextSecondarySeed() &
            OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    // 除了上述 CAS 成功的线程,其他线程都返回 0
    return 0L;
}

因此,扩容过程中 CAS 成功的线程会加锁成功并返回,其他线程都会继续下一轮自旋。

为什么 RFULL 要等于 126,且扩容时要引入一个新变量 readerOverflow ?

其实问题这么问,答案便呼之欲出了。

首先回答第一个问题:我们知道 StampedLockstate 变量的低 7 位用作读锁资源,那么理论上读锁资源总数应该有 127 个才对,为什么要设置为 126 呢?这就与上方介绍的 tryIncReaderOverflow() 方法实现有关:tryIncReaderOverflow() 方法的扩容是通过 CAS state 变量实现的,判定成功的条件就是线程是否将 state 由 126 设置为 127,因此这里预留了一个值用于 CAS。

解答了第一个问题,第二个问题就简单了:state 变量的 bit 位被卡死了 7 位,因此需要新的变量来存储扩容后的读锁加锁数量。

6.2 读锁解锁操作

读锁的解锁操作为 unlockRead(long stamp),实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        // 如果参数值不与当前 state 相等
        // 或 stamp 低八位为 0
        // 或当前 state 低八位为 0
        // 或写锁被占用
        // 则抛出异常
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        if (m < RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { // 释放读锁
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h); // 释放 head 的后继节点
                break;
            }
        }
        // 如果之前提高过读锁上限,这里降低上限
        else if (tryDecReaderOverflow(s) != 0L)
            break;
    }
}

当读锁的释放操作较写锁复杂一些:它首先会判断其是否符合释放条件,如果符合条件,再接着判断读锁是否被扩容过。如果没有扩容过,就直接 CAS state 变量释放资源,成功后唤醒队列中第一个等待的线程;如果读锁被扩容过,则调用 tryDecReaderOverflow() 完成解锁。tryDecReaderOverflow() 方法实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
 private long tryDecReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    if ((s & ABITS) == RFULL) {
        if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
            int r; long next;
            if ((r = readerOverflow) > 0) {
                readerOverflow = r - 1;
                next = s;
            }
            else
                next = s - RUNIT;
                state = next;
                return next;
        }
    }
    else if ((LockSupport.nextSecondarySeed() &
                OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    return 0L;
}

可见,减少容量操作与提高容量类似,都是通过对 state 变量进行 CAS 操作实现的,CAS 成功的线程继续操作 state 变量并返回其值,表示解锁成功;其他失败的线程均返回 0,表示解锁失败。


欢迎关注我的公众号,第一时间获取文章更新:

微信公众号

相关内容