Java Semaphore 源码解析

java.util.concurrent.Semaphore

Semaphore 是 JUC 中基于 AQS 共享模式实现的用于并发控制的信号量。

1 使用示例

下面是一个 Semaphore 的简单使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
private static int n = 0;

public static void main(String[] args) {
    // 初始化十个信号量
    Semaphore semaphore = new Semaphore(10);
    for (int i = 0; i < 15; i++) {
        new Thread(() -> {
            try {
                semaphore.acquire();// 获取信号量的许可,如果获取不到则阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(++n);
        }).start();
    }
}

在以上示例中,我们创建了一个包含 10 个资源的信号量,并使用 15 个线程对其进行申请。当线程成功获取信号量后,不释放信号量直接退出,并打印出相关结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
2
6
8
5
3
1
4
10
9
7

从结果可以看出,成功获取信号量的线程数最大为 10,这意味着只有 10 个线程成功获取了信号量,剩下的五个线程被阻塞了。下面我们详细介绍 Semaphore 的 API。

2 API 简介

2.1 申请信号量

方法说明
acquire()

获取信号量中的一个许可证,如果有可用许可证并立即返回,可用许可证的数量减一。

如果没有可用的许可,则当前线程休眠,直到发生以下两种情况之一:

  • 其他线程为此信号量调用 release 方法,并且当前线程接下来要分配一个许可;
  • 或一些其他线程 Threadinterrupt 当前线程。

      如果当前线程在进入此方法时设置了中断状态;或者在等待许可时被 Threadinterrupt,就会抛出 InterruptedException 并清除当前线程的中断状态。

acquire(int permits)获取信号量中指定数量的许可,其他同 acquire()
acquireUninterruptibly()acquireUninterruptibly(int permits)阻塞状态下无视其他线程的中断信号
tryAcquire()尝试从信号量中获取一个许可,如果无可用许可,直接返回 false,不会阻塞
tryAcquire(int permits)尝试从信号量中获取指定数量的许可,如果无可用许可,直接返回 false,不会阻塞
tryAcquire(int permits, long timeout, TimeUnit unit)在指定的时间内尝试从信号量中获取指定数量的许可证,如果在指定的时间内获取成功,返回 true,否则返回 false

2.2 释放信号量

方法说明
release()release(int permits)释放一个许可、释放指定数量的许可

3 Semaphore Sync 同步器

3.1 state 变量

SemaphoreSync 同步器使用 state 变量表示可用信号的数量:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
abstract static class Sync extends AbstractQueuedSynchronizer {
    // 构造方法中初始化信号量初始值
    Sync(int permits) {
        setState(permits);
    }
    // 获取剩余信号量
    final int getPermits() {
        return getState();
    }

    //...
}

3.2 Sync 方法

3.2.1 nonfairTryAcquireShared()

Sync 同步器提供了一个默认的非公平信号量获取方法 nonfairTryAcquireShared(int)。源码如下:

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

该方法首先获取当前信号量数量 state,然后用 state 减去请求的信号量数量 acquires,如果结果小于 0,则表示剩余信号量不足,直接返回这个差值;否则,使用 CAS 尝试获取信号量,如果尝试成功则返回剩余的信号量,如果失败则自旋重试。

3.2.2 tryReleaseShared()

tryReleaseShared(int)Sync 同步器中提供的信号量释放的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

该方法本质上也是通过调用 AQS 的 state 修改方法完成对信号量数量的更新。

3.2.3 reducePermits(int)、drainPermits()

reducePermits(int)drainPermits()Sync 同步器中提供的信号量删除方法,它们的实现本质上与信号量申请操作类似,只是代表的含义不同。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 删除指定数量的信号量
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}
// 删除全部信号量
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

3.3 实现类 NonfairSync

NonfairSyncSemaphore 中的非公平同步器(默认),它继承自 Sync。源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
static final class NonfairSync extends Sync {
    // 构造方法
    NonfairSync(int permits) {
        super(permits);
    }
    // 仅调用 Sync 提供的 nonfairTryAcquireShared 实现
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

NonfairSync 的实现非常简单,它重写了 AQS 共享模式下尝试获取资源的钩子函数 tryAcquireShared(int),其中调用了父类 Sync 提供的非公平实现的 nonfairTryAcquireShared 方法。

3.4 实现类 FairSync

FairSyncSemaphore 中的公平同步器,它也是 Sync 的子类。源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
static final class FairSync extends Sync {
    // 构造方法
    FairSync(int permits) {
        super(permits);
    }
    // 自己实现的公平锁
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果当前节点有前驱节点,直接返回 -1
            if (hasQueuedPredecessors())
                return -1;
            // 如果当前节点已经是头节点,或者等待队列空了,开始获取信号量
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

FairSync 也重写了 AQS 中的钩子函数 tryAcquireShared(int),并实现了自旋锁的公平性原则:在尝试获取信号量之前,该方法会先判断当前节点的前驱节点是否是头节点,或者等待队列是否为空。只有满足了这两个条件中的一个,该方法才会尝试获取信号量;否则直接返回 -1。

4 Semaphore 方法

4.1 构造方法

Semaphore 提供了两个构造方法:

1
2
3
4
5
6
7
8
// 创建一个非公平的信号量,参数为许可证数量
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// 通过 fair 参数控制是否公平
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
  • 第一个构造方法只接受一个 int 类型的参数 permits,它表示初始的信号量数量,如果 permits 的值为 0,则所有线程都无法获取信号量。
  • 第二个构造方法除了初始信号量数量外,还可以指定同步控制器的公平性。如果指定为 true,则使用公平同步控制器,否则使用非公平同步控制器。
注释
  • 如果 permits 参数传入 1,那么 Semaphore 将退化为排它锁;
  • 如果 permits 参数传入 0,那么 Semaphore 相当于一个同步阻塞器。

4.2 其他方法

Semaphore 中的其余方法基本都是通过调用 AQS 及实现类中的方法实现的。以下是其中一些方法的源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 申请 1 个信号量
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 申请 permits 个信号量
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

// 释放 1 个信号量
public void release() {
    sync.releaseShared(1);
}

// 释放 permits 个信号量
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

...


欢迎关注我的公众号,第一时间获取文章更新:

微信公众号

相关内容