Java Synchronized 关键字的底层实现原理详解

很多老开发者偏好使用 ReentrantLock 实现并发控制,这是因为早期的 synchronized 只能通过操作系统的 MutexLock 实现锁操作,开销较大性能也差。然而,Java 官方更愿意让用户在一般情况下放心地使用 synchronized。因此,从 JDK 1.6 开始,他们对 synchronized 进行了优化,现在的 synchronized 性能也越来越高。

JDK 主要进行了以下几项优化:

1 锁粗化

互斥临界区的范围应尽量小,以缩短串行操作的执行时间,使等待锁的其他线程能够尽早获得锁。然而,加锁操作本身也会消耗资源,如果存在一连串对同一个对象的连续加锁、释放锁操作,可能会导致不必要的性能损耗。

锁粗化就是将用户的多个连续加锁和解锁操作合并成一个更大范围的锁,从而避免频繁的加、解锁操作。这样可以有效减少锁竞争,提高性能。示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
for (int i = 0; i < 1000; i++) {
    synchronized(this) {
        j++;
    }
}
/* 锁粗化后: |
             v
*/
synchronized(this) {
    for (int i = 0; i < 1000; i++) {
            j++;
    }
}

2 锁消除

JVM 在即时编译 (JIT) 时,可以扫描运行上下文,进行逃逸分析。如果发现被锁定的资源不可能发生资源竞争,那么就会移除这个锁,从而避免无谓的时间片消耗,这就是锁消除技术。

例如以下示例,方法中锁定的对象是在方法内创建的,因此无论如何都不会被其他线程访问到(被锁定对象没有发生逃逸),因此会被优化掉:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void func() {
    int j = 0;
    synchronized(new Object()) {
        for (int i = 0; i < 1000; i++) {
            j++;
        }
    }
}
/* 锁消除后: |
             v
*/
void func() {
    int j = 0;
    for (int i = 0; i < 1000; i++) {
        j++;
    }
}

3 锁升级 *

从 JDK 1.6 开始,为了降低获取和释放锁所带来的开销,引入了偏向锁和轻量级锁。随着资源竞争的加剧,synchronized 会不断升级锁的级别,具体升级顺序为:「无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁」且锁只会升级不会降级

Java 中每个对象都有一个对象头,它的具体内容请参考 JVM 架构专栏,这里不再赘述。Java 对象头中的 Mark Word 记录的锁标志位会随着锁的状态变化而发生改变。

3.1 偏向锁

在没有多线程竞争的情况下,轻量级锁的多次 CAS 操作便失去了意义,因此 synchronized 引入了偏向锁。该锁只在第一次获取时执行一次 CAS 操作,后续在不发生竞争的情况下几乎没有多余的开销。

偏向锁默认开启,可以通过设置 JVM 参数 -XX:-UseBiasedLocking=false 来关闭偏向锁,关闭之后程序会默认进入轻量级锁状态。

接下来,我们将通过分析 OpenJDK 1.8 中的 Hotspot 虚拟机源码来探讨偏向锁的实现原理。在 JVM 运行时,与 synchronized 锁操作相关的函数为模板解释器中的 TemplateTable::monitorenterTemplateTable::monitorexit。虽然模板解释器在运行效率上表现出色,但它内部使用了大量的内联汇编,并不容易理解。因此,这里我们选择分析实现原理类似的字节码解释器 BytecodeInterpreterBytecodeInterpreter 负责解析各种字节码,其中与加锁相关的代码为 _monitorenter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
CASE(_monitorenter): {
    // 获取当前锁对象
    oop lockee = STACK_OBJECT(-1);
    ...

    BasicObjectLock* limit = istate->monitor_base();
    BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
    BasicObjectLock* entry = NULL;
    // 从栈基向上寻找空闲的监视器,如果发现了当前锁对象,跳出循环
    while (most_recent != limit ) {
        if (most_recent->obj() == NULL) entry = most_recent;
        else if (most_recent->obj() == lockee) break;
        most_recent++;
    }
    if (entry != NULL) { // 找到了空闲的监视器
        // 将监视器的 obj 指向当前锁对象
        entry->set_obj(lockee);
        int success = false;
        uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
        // 获取对象头中的 Mark Word
        markOop mark = lockee->mark();
        intptr_t hash = (intptr_t) markOopDesc::no_hash;
        // 如果为偏向模式
        if (mark->has_bias_pattern()) {
            uintptr_t thread_ident;
            uintptr_t anticipated_bias_locking_value;
            // 计算偏向锁状态
            thread_ident = (uintptr_t)istate->thread();
            anticipated_bias_locking_value =...;
            
            if  (anticipated_bias_locking_value == 0) {
                // 如果偏向锁偏向的是当前线程,重入次数++
                ...
                success = true;
            }
            else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
                // 如果未偏向当前线程,且偏向功能被禁止,则通过 CAS 尝试撤销偏向锁
                ...
            }
            else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
                // 如果未偏向当前线程,且 epoch 过期了,则利用 CAS 重新偏向
                ...
                success = true;
            }
            else {
                // 如果线程处于匿名偏向状态,则利用 CAS 偏向当前线程
                markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
                    (uintptr_t)markOopDesc::age_mask_in_place |
                    epoch_mask_in_place));
                if (hash != markOopDesc::no_hash) {
                    header = header->copy_set_hash(hash);
                }
                markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
                // 只有匿名偏向状态,才能偏向成功
                if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
                    // CAS 成功
                    ...
                } else {
                    // 偏向失败或非匿名,说明存在竞争,交给 monitorenter 异步处理
                    CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                }
                success = true;
            }
        }
        
        // fasle,则升级为轻量级锁
        if (!success) {
            // 为监视器 entry 构造一个无锁状态的 Displaced Mark Word
            markOop displaced = lockee->mark()->set_unlocked();
            entry->lock()->set_displaced_header(displaced);
            bool call_vm = UseHeavyMonitors;
            if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
                // CAS 失败,说明对象不是无锁状态,紧接着判断是否是重入
                if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                    // 如果是重入,则直接将 Displaced Mark Word 指空
                    entry->lock()->set_displaced_header(NULL);
                } else {
                    // 否则交给 monitorenter 异步处理
                    CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                }
            }
        }
        // 执行临界区代码
        UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
    } else {
        // entry 为 null,则重新执行一次
        istate->set_msg(more_monitors);
        UPDATE_PC_AND_RETURN(0); // Re-execute
    }
}

上述代码流程总结如下:

  1. 首先,从当前线程的栈中寻找一个空闲的监视器,将其 obj 指向当前锁对象;
  2. 获取对象头部的 Mark Word,判断是否为偏向模式(最低三位是否为 101)。如果不是,进入步骤 3;否则,计算 anticipated_bias_locking_value,判断偏向锁状态:
    • 如果 anticipated_bias_locking_value = 0,说明偏向的线程是当前线程,且 Mark Word 的 epoch 等于 Class 对象的 epoch,这种情况下直接运行临界区中的代码;
    • 如果 anticipated_bias_locking_value 不为 0,且偏向被锁定,则通过 CAS 尝试将对象恢复为无所状态,无论是否 CAS 成功,都会在后续执行锁升级操作;
    • 如果 anticipated_bias_locking_value 不为 0,且 epoch 过期了,则利用 CAS 重新偏向为当前线程;
    • 上述条件都不满足,说明线程处于初始的匿名偏向状态,则通过 CAS 将锁偏向当前线程,如果成功则顺利执行临界区中的代码;失败则进行锁升级。
  3. 到这里会进行轻量级锁操作:首先,构造一个无锁状态的 Mark Word,然后利用 CAS 存储到监视器对象中:
    • 如果 CAS 失败,且是锁重入,则将 Displaced Mark Word 直接指空;否则交给 monitorenter 函数异步进行偏向锁的撤销以及锁升级操作;
    • 如果 CAS 成功,则直接进入临界区执行代码。在后续线程获取锁的过程中,只要发现 Displaced Mark Word 为空,即无锁状态,则直接获取轻量级锁
注意

当线程发现锁对象处于无锁状态时,它会直接尝试获取轻量级锁。这意味着只有处于匿名偏向状态的锁对象才能进入偏向锁模式。

在 JVM 启动时,偏向锁会延迟初始化(通常为 4000 毫秒),在初始化期间,它会将锁对象的 Mark Word 修改为匿名偏向状态。因此,在 JVM 启动后到偏向锁初始化完成之前的这段时间内,锁对象都处于无锁状态,此时如果发生竞争,线程将直接尝试获取轻量级锁。

3.2 轻量级锁

轻量级锁适用于锁争用不是太激烈,而且锁的持有时间很短的情况。因为重量级锁在锁定、解锁时会挂起和唤醒线程,这个过程需要消耗较多的时间和资源。如果只是短暂地需要锁,那么使用重量级锁会显得有点浪费。因此,轻量级锁采用了一种更灵活的策略,即允许线程在一段时间内进行自旋等待,而不是立刻阻塞线程。

上文讲过,升级轻量级锁的过程中,如果 CAS 失败会异步执行 InterpreterRuntime::monitorenter 函数,源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
    ...
    // 封装 Handle
    Handle h_obj(thread, elem->obj());
    if (UseBiasedLocking) {
        // Retry fast entry if bias is revoked to avoid unnecessary inflation
        ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
    } else {
        ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
    }
    ...
IRT_END

该方法将 thread 和监视器对象中的 obj 封装到了 Handle 里,和这里我们关注 slow_enter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
    // 获取锁对象的 mark word
    markOop mark = obj->mark();
    ...

    if (mark->is_neutral()) {// 如果为无锁状态
        // 通过 CAS 尝试将 BasicLock 的指针指向 obj->mark()
        lock->set_displaced_header(mark);
        if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
            // 如果成功,则进入临界区
            TEVENT (slow_enter: release stacklock) ;
            return ;
        }
        // Fall through to inflate() ...
    } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
        // 如果是有所状态,并且是重入锁,将 displaced_header 指空,然后进入临界区
        assert(lock != mark->locker(), "must not re-lock the same lock");
        assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
        lock->set_displaced_header(NULL);
        return;
    }

    // 如果无所状态下 CAS 失败,或有锁状态下并非当前线程重入,则升级为重量级锁
    lock->set_displaced_header(markOopDesc::unused_mark());
    // 调用 monitor 的 enter 方法竞争重量级锁
    ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

上述代码流程总结如下:

  1. 首先获取锁对象的 Mark Word;
  2. 然后判断锁对象是否为无锁状态(最低位为 1):
    • 如果处于无锁状态,则把 Mark Word 保存到 BasicLock 对象的 displaced_header,并通过 CAS 更新 mark 指针。如果更新成功则执行同步代码,否则执行步骤 3;
    • 如果处于有锁状态,且是自己重入,则设置 displaced_header 为 null,再执行同步代码。
  3. 如果 CAS 更新失败,或者锁被其他线程占用,则直接膨胀为重量级锁,并加锁。

3.3 重量级锁

当轻量级锁发生竞争时,会调用 ObjectSynchronizer::inflate 函数将其膨胀为重量级锁,且膨胀过程中有以下几种状态:

  • Inflated - 重量级锁状态,直接返回
  • Stack-locked - 轻量级锁状态,膨胀
  • INFLATING - 膨胀中,等待完成
  • Neutral - 无锁状态,膨胀
  • BIASED - 非法状态,实际上不会处于这种状态

源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
    ...
    for (;;) {
        const markOop mark = object->mark() ;
        // CASE: 已经重量级了,直接返回
        if (mark->has_monitor()) {
            ObjectMonitor * inf = mark->monitor() ;
            return inf ;
        }

        // CASE: 膨胀中,说明另一个线程正在膨胀该锁,自选等待
        if (mark == markOopDesc::INFLATING()) {
            TEVENT (Inflate: spin while INFLATING) ;
            ReadStableMark(object) ;
            continue ;
        }

        // CASE: 轻量级锁,执行膨胀
        if (mark->has_locker()) {
            // 分配一个 ObjectMonitor 对象
            ObjectMonitor * m = omAlloc (Self) ;
            // 初始化 ObjectMonitor
            m->Recycle();
            m->_Responsible  = NULL ;
            m->OwnerIsThread = 0 ;
            m->_recursions   = 0 ;
            m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;
            // 通过 CAS 将锁对象 mark word 设为 INFLATING,表明正在膨胀
            markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
            if (cmp != mark) {
                omRelease (Self, m, true) ;
                continue ;       // 失败重试
            }

            markOop dmw = mark->displaced_mark_helper() ;
            assert (dmw->is_neutral(), "invariant") ;
            m->set_header(dmw) ;
            m->set_owner(mark->locker());
            m->set_object(object);

            ...
            // 将对象头设置为重量级状态
            object->release_set_mark(markOopDesc::encode(m));

            ...
            return m ;
        }

        // CASE: 无锁状态
        ...//也是通过 CAS 升级锁状态,省略

        return m ;
    }
}

inflate 会不断循环处理 mark 的状态,直到成功升级为重量级锁并返回锁对象,最终调用 enter 完成重量级锁的加锁操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void ATTR ObjectMonitor::enter(TRAPS) {
    Thread * const Self = THREAD ;
    void * cur ;
    // _owner 为 null,cas 成功则直接返回
    cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
    if (cur == NULL) {
        ...
        return ;
    }
    // 如果是自己重入,次数+1 并返回
    if (cur == Self) {
        _recursions ++ ;
        return ;
    }
    // 如果这个重量级锁,就是当前线程自己膨胀的
    if (Self->is_lock_owned ((address)cur)) {
        // 初始化重入次数,锁所属线程后返回
        _recursions = 1 ;
        _owner = Self ;
        OwnerIsThread = 1 ;
        return ;
    }
    // 最后再自旋几次
    Self->_Stalled = intptr_t(this) ;
    if (Knob_SpinEarly && TrySpin (Self) > 0) {
        ...
        Self->_Stalled = 0 ;
        return ;
    }
    ...
    // 上述优化手段都没生效,调用操作系统的 MutexLock
    {
        ...
        for (;;) {
            ...
            EnterI (THREAD) ;
            ...
        }
    }
    ...
}

可见,更新后的重量级锁也经过了一些优化:如果锁处于无锁状态或者已经被当前线程持有(重入锁),则可以直接返回,无需额外操作;如果当前线程就是将锁膨胀为重量级锁的线程,那么只需设置一些属性便可返回;最后,如果有必要,还会利用 CAS 操作进行自旋尝试获取锁。只有在多次自旋不成功的情况下才会调用操作系统的 API 来挂起线程。


欢迎关注我的公众号,第一时间获取文章更新:

微信公众号

相关内容