Synchronized之偏向锁


synchronized 是我们日常使用频率很高的关键字,它是控制线程安全的同步锁。虽然开发中一直使用,但对于它的实现原理一直理解不够透彻,在面试中也是经常被问,对于一些高级资深的岗位来说,不仅仅只是简单的问一下synchronized 的使用和大致原理了,而是更深入的问它的加锁解锁过程,实际上synchronized 默默的为我们做了很多事情。关于它的原理网上博文很多但大多比较浅显,有的来龙去脉说的不够清楚让人云里雾里,有的只停留在字节码的指令级别,对于更深层次的实现没有阐述。对于synchronized的加锁过程直接去看源码,如果没有C++基础和并发编程的思维想要看懂还是很吃力的,这里我要推荐两篇博客https://github.com/farmerjohngit/myblog/issues/12https://www.cnblogs.com/gmt-hao/p/14139341.html synchronized 的加锁过程很复杂涉及到几种锁状态的转换,部分代码细节的数据结构设计和计算(源码中有很多位运算)晦涩难懂。本文很多借鉴了这两篇文章,我在很多不解的地方加入了更详细的解释和理解,把整个逻辑理顺了一遍。解决了下面这几个问题,本文分析的JVM版本是JVM8,具体版本号以及代码可以在这里看到。

官方对synchronized做了优化,性能得到了很大的提升,已经和AQS差不多了。synchronized代码块是由monitorentermonitorexit两个字节码指令实现的。JVM 会对其进行解析,如果锁的是对象则就是对该对象加锁解锁,如果是类方法则是对 Class 对象加锁解锁,如果是实例方法则是对对应的对象加锁解锁。

关于HotSpot虚拟机中获取锁的入口,网上很多文章要么给出的方法入口为interpreterRuntime.cpp#monitorenter和bytecodeInterpreter.cpp#1816。因此要找锁的入口,肯定是要在源码中找到对monitorenter指令解析的地方。在HotSpot的中有两处地方对monitorenter指令进行解析:一个是在bytecodeInterpreter.cpp#1816 ,另一个是在templateTable_x86_64.cpp#3667。前者是JVM中的字节码解释器(bytecodeInterpreter),用C++实现了每条JVM指令(如monitorenter、invokevirtual等),其优点是实现相对简单且容易理解,缺点是执行慢。后者是模板解释器(templateInterpreter),其对每个指令都写了一段对应的汇编代码,启动时将每个指令与对应汇编代码入口绑定,可以说是效率做到了极致。在HotSpot中,只用到了模板解释器,字节码解释器根本就没用到,但其实bytecodeInterpreter的逻辑和templateInterpreter的逻辑是大同小异的,因为templateInterpreter中都是汇编代码,比较晦涩,所以看bytecodeInterpreter的实现会便于理解一点。而interpreterRuntime.cpp#monitorenter方法只是锁实现中的一部分,需要注意的是在jdk8u之前,bytecodeInterpreter并没有实现偏向锁的逻辑。

这里还需知道一个概念,即管程,英文名为Monitor。字面意思,是用来管理进程的。所谓的管程实际上是JVM定义的一种数据结构和控制进程的一些操作的集合。管程封装了同步操作,对进程隐蔽了同步细节,简化了同步功能的调用界面。用户编写并发程序如同编写顺序(串行)程序。

简介

在Java中sycnhronized的使用很简单,可以加在方法名上,这时锁住的是当前这个类的实例对象,另一种典型用法是直接通过代码块的方式锁住资源对象

synchronized(obj) {
...
}

synchronized(lockee)会被编译成:

monitorenter (lockee)
//...
monitorexit (lockee)

两条针对锁对象lockee的lock/unlock指令,这两条指令涉及到JVM同步机制中的相当多的细节概念:对象头MarkOop,轻量级锁,锁膨胀,重量级锁,对象监视器ObjectMonitor对象等等。BytecodeInterpreter对这两条指令的实现如下。synchronized的加锁和解锁逻辑都在这里

/* monitorenter and monitorexit for locking/unlocking an object */

 CASE(_monitorenter): {
     // ...
 }

 CASE(_monitorexit): {
     // ...
 }

偏向锁流程

下面开始偏向锁获取流程分析,代码在bytecodeInterpreter.cpp#1816。首先提一个问题,为什么需要偏向锁?JVM的开发者发现在很多情况下,在Java程序运行时,同步块中的代码都是不存在竞争的。这种情况下,用重量级锁是没必要的。因此JVM引入了偏向锁的概念。

CASE(_monitorenter): {
  // lockee 就是锁对象 栈帧当中生成一个lock record 记录
  oop lockee = STACK_OBJECT(-1);
  // 判null,如果锁对象为null,会抛出空指针异常
  CHECK_NULL(lockee);
  // find a free monitor or one already allocated for this object
  // if we find a matching object then we need a new monitor
  // since this is recursive enter
  // code 1:找到一个空闲的Lock Record
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  BasicObjectLock* entry = NULL;
  while (most_recent != limit ) {
    if (most_recent->obj() == NULL) entry = most_recent;
    else if (most_recent->obj() == lockee) break;
    most_recent++;
  }
  //entry不为null,代表还有空闲的Lock Record
  if (entry != NULL) {
    // code 2:将Lock Record的obj指针指向锁对象
    entry->set_obj(lockee);
    int success = false;
    uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
    // markoop即对象头的mark word
    markOop mark = lockee->mark();
    intptr_t hash = (intptr_t) markOopDesc::no_hash;
    // code 3:如果锁对象的mark word的状态是偏向模式(1 01)
    if (mark->has_bias_pattern()) {
      uintptr_t thread_ident;
      uintptr_t anticipated_bias_locking_value;
      thread_ident = (uintptr_t)istate->thread();
     // code 4:这里有几步操作,下文分析
     // anticipated_bias_locking_value用来判断对象锁是否偏向自己 (逻辑比较绕,这里是通过位运算计算)
      anticipated_bias_locking_value =
        // 将线程id与prototype_header(epoch、分代年龄、偏向模式、锁标志)部分做或运算
        (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) 
         // 与锁对象的markword异或,相等为0
         ^ (uintptr_t)mark) 
         // 将上面结果中的分代年龄忽略掉
         &~((uintptr_t) markOopDesc::age_mask_in_place);
     // 下面几个同级的if分支(code 5,code 6,code 7,code 8)都是在判断是否能用偏向锁,轻量级锁
     // 如果不能就会升级到重量级锁
     // code 5:判断是否偏向自己,为0代表偏向线程是自己
      if  (anticipated_bias_locking_value == 0) {
            // already biased towards this thread, nothing to do
            if (PrintBiasedLockingStatistics) {
              (* BiasedLocking::biased_lock_entry_count_addr())++;
            }
            success = true;
      }
      // code 6:判断是否可偏向,不可偏向则尝试撤销
      else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
            markOop header = lockee->klass()->prototype_header();
            if (hash != markOopDesc::no_hash) {
              header = header->copy_set_hash(hash);
            }
            // 利用CAS操作将mark word替换为class中的mark word,即撤销偏向

            // 原子操作 Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark)
            // 相当于   lockee->cas_set_mark(header, mark) == mark
            if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
              if (PrintBiasedLockingStatistics)
                (*BiasedLocking::revoked_lock_entry_count_addr())++;
            }
      }
      // code 7:如果epoch不等于锁对象中的epoch,则尝试重偏向
      else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
            // 基于lockee对象构造一个偏向当前线程的mark word
            markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
            if (hash != markOopDesc::no_hash) {
              new_header = new_header->copy_set_hash(hash);
            }
            // CAS替换对象头的mark word,偏向当前线程的

            // 原子操作 Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark)
            // 相当于   lockee->cas_set_mark(new_header, mark) == mark
            if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
              if (PrintBiasedLockingStatistics)
                (* BiasedLocking::rebiased_lock_entry_count_addr())++;
            }
            else {
              // 重偏向失败,代表存在多线程竞争,则调用monitorenter方法进行锁升级
              CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
            }
            success = true;
      }
      //上面情况都不满足,进入这个else说明当前要么偏向别的线程,要么是匿名偏向(即没有偏向任何线程)
      else {
            // code 8:下面构建一个匿名偏向的mark word,尝试用CAS指令替换掉锁对象的mark word
            markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
                              (uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
            if (hash != markOopDesc::no_hash) {
              header = header->copy_set_hash(hash);
            }
            // 这是将mark word偏向当前线程
            markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
            // debugging hint
            DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
            //这里会尝试将偏向当前线程的mark word替换到锁对象中,
            //若是匿名偏向则可以cas成功,若已经偏向其他线程,
            //或有可能刚好被其他线程先修改了,都说明有多个线程竞争,则会cas失败

            // 原子操作 Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header)
            // 相当于   lockee->cas_set_mark(new_header, header)
            if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {

               // 如果CAS修改成功
              if (PrintBiasedLockingStatistics)
                (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
            }
            else {
              // 如果修改失败说明存在多线程竞争,所以进入monitorenter方法, 锁升级
              CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
            }
            success = true;
      }
    }
    // success等于false说明偏向锁没有加成功(偏向线程不是当前线程或没有开启偏向模式等原因都会导致失败)
    // 如果偏向锁加失败,会进入轻量级锁。下面这里就是轻量级锁的加锁逻辑
    if (!success) {
          // 轻量级锁的逻辑 start
          //code 9: 构造一个无锁状态的 Mark Word,保存在Lock Record的对象头副本中
          markOop displaced = lockee->mark()->set_unlocked();
          entry->lock()->set_displaced_header(displaced);
          //如果指定了-XX:+UseHeavyMonitors,则call_vm=true,代表禁用偏向锁和轻量级锁
          bool call_vm = UseHeavyMonitors;
          // 利用CAS将对象头的mark word替换为指向Lock Record的指针
          // cas_set_mark((markOop)entry, displaced)
          if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
            // 如果CAS修改不成功,代表锁对象不是无锁状态,这时候判断下是不是锁重入
            // 是的话把Displaced Mark Word设置为null来表示重入
            // 置null的原因是因为要记录重入次数,但是mark word大小有限,
             // 所以每次重入都在栈帧中新增一个Displaced Mark Word为null的记录
            if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {                 
               //code 10: 如果是锁重入,则直接将Displaced Mark Word设置为null
              entry->lock()->set_displaced_header(NULL);
            } else {
             // 出现锁竞争,则锁升级
              CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
            }
          }
    }
    UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  } else {
    // lock record不够,重新执行
    istate->set_msg(more_monitors);
    UPDATE_PC_AND_RETURN(0); // Re-execute
  }
}

上面的代码我都加了注释,基本逻辑应该是可以看的懂得,主要解释一下几个难以理解的点

code 1处

这里的 BasicObjectLock 的定义在basicLock.hpp

class BasicObjectLock VALUE_OBJ_CLASS_SPEC {
  friend class VMStructs;
 private:
  BasicLock _lock;         // the lock, must be double word aligned
  oop       _obj;          // object holds the lock;

 public:
  // Manipulation
  oop      obj() const                                { return _obj;  }
  void set_obj(oop obj)                               { _obj = obj; }
  BasicLock* lock()                                   { return &_lock; }

  // Note: Use frame::interpreter_frame_monitor_size() for the size of BasicObjectLocks
  //       in interpreter activation frames since it includes machine-specific padding.
  static int size()                                   { return sizeof(BasicObjectLock)/wordSize; }

  // GC support
  void oops_do(OopClosure* f) { f->do_oop(&_obj); }

  static int obj_offset_in_bytes()                    { return offset_of(BasicObjectLock, _obj);  }
  static int lock_offset_in_bytes()                   { return offset_of(BasicObjectLock, _lock); }
};

里面分别包含了一个BasicLock和一个oop对象,BasicLock的定义也在当前文件中

class BasicLock VALUE_OBJ_CLASS_SPEC {
  friend class VMStructs;
 private:
  volatile markOop _displaced_header;
 public:
  markOop      displaced_header() const               { return _displaced_header; }
  void         set_displaced_header(markOop header)   { _displaced_header = header; }

  void print_on(outputStream* st) const;

  // move a basic lock (used during deoptimization
  void move_to(oop obj, BasicLock* dest);

  static int displaced_header_offset_in_bytes()       { return offset_of(BasicLock, _displaced_header); }
};

其实就是一个markOop对象头,也就是说BasicObjectLock其实就是锁对象本身和对象头的组合,也叫做lock record。再来看代码code 1处的第10行到第17行。轻量级锁在虚拟机内部,使用BasicObjectLock的对象实现,这个对象内部由一个BasicLock对象和一个持有该锁的Java对象指针组成。BasicObjectLock对象放置在Java栈的栈帧中。在BasicLock对象内部还维护着displaced_header字段,他用于锁对象头部的Mark Word的副本。了解完这些我们再看代码,其实就是从当前调用方法栈的most_recent(栈底)搜索到 limit(栈顶)遍历查找,直到找到一个空闲的或者之前就指向当前锁对象的lock record。(可以看到lock record中保存的对象头副本是用volatile修饰的,保证内存可见性)

Lock Record

关于Lock Record有必要再详细解释一下,它什么时候被创建?在哪里创建?

当字节码解释器执行monitorenter指令时,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝(官方称之为Displaced Mark Word),拷贝对象头中的Mark Word复制到锁记录(Lock Record)中。拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向自己(线程)的Lock Record的指针。如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,将Lock Record里的owner指针指向锁对象的Mark Word,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。失败则线程升级为重量级锁,释放时会检查Mark Word中的Lock Record指针是否指向自己(获得锁的线程Lock Record),使用原子的CAS将Displaced Mark Word替换回对象头,如果成功,则表示没有竞争发生,如果替换失败则升级为重量级锁。整个过程中,Lock Record是一个线程内独享的数据结构

如果不熟悉C++语法的人,可能看不懂在做什么,在C++中 -> 操作符左边是结构体指针(相当于Java中对象的引用)右边是这个结构体的函数,因此对于14行 if (most_recent->obj() == NULL) entry = most_recent可理解为 if (most_recent.obj() == NULL) entry = most_recent。所以整个循环就是在线程栈中找可用的 Lock Record

code 2处

获取到Lock Record后,首先要做的就是为其obj字段赋值。将Lock Record的obj指针指向锁对象

code 3处

判断锁对象的mark word是否是偏向模式,即低3位是否为101。

code 4处

里有几步位运算的操作anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ~((uintptr_t) markOopDesc::age_mask_in_place); 这个位运算可以分为3个部分。

  1. 第一部分:((uintptr_t)lockee->klass()->prototype_header() | thread_ident) 将当前线程id和类的prototype_header相或,这样得到的值为(当前线程id + prototype_header中的(epoch + 分代年龄 + 偏向锁标志 + 锁标志位)),注意prototype_header的分代年龄那4个字节为0
  2. 第二部分: ^ (uintptr_t)mark 将上面计算得到的结果与锁对象的markOop进行异或,相等的位全部被置为0,只剩下不相等的位。
  3. 第三部分 :& ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place为…0001111000,取反后,变成了…1110000111,除了分代年龄那4位,其他位全为1;将取反后的结果再与上面的结果相与,将上面异或得到的结果中分代年龄给忽略掉。

code 5处

anticipated_bias_locking_value==0代表偏向的线程是当前线程且mark word的epoch等于class的epoch,这种情况下什么都不用做。

code 6处

(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0代表class的prototype_header或对象的mark word中偏向模式是关闭的,又因为能走到这已经通过了mark->has_bias_pattern()判断,即对象的mark word中偏向模式是开启的,那也就是说class的prototype_header不是偏向模式。

code 7处

如果epoch已过期,则需要重偏向,利用CAS指令将锁对象的mark word替换为一个偏向当前线程且epoch为类的epoch的新的mark word

code 8处

CAS将偏向线程改为当前线程,如果当前是匿名偏向则能修改成功,否则进入锁升级的逻辑。

code 9处

这一步已经是轻量级锁的逻辑了。从上图的mark word的格式可以看到,轻量级锁中mark word存的是指向Lock Record的指针。这里构造一个无锁状态的mark word,然后存储到Lock RecordLock Record的格式可以看第一篇文章)。设置mark word是无锁状态的原因是:轻量级锁解锁时是将对象头的mark word设置为Lock Record中的Displaced Mark Word,所以创建时设置为无锁状态,解锁时直接用CAS替换就好了。

code 10处

如果是锁重入,则将Lock RecordDisplaced Mark Word设置为null,起到一个锁重入计数的作用。

以上是偏向锁加锁的流程(包括部分轻量级锁的加锁流程),如果当前锁已偏向其他线程||epoch值过期||偏向模式关闭||获取偏向锁的过程中存在并发冲突,都会进入到InterpreterRuntime::monitorenter方法, 在该方法中会对偏向锁撤销和升级。

偏向锁的撤销

这里说的撤销是指在获取偏向锁的过程因为不满足条件导致要将锁对象改为非偏向锁状态;释放是指退出同步块时的过程,释放锁的逻辑会在下一小节阐述。请读者注意本文中撤销与释放的区别。如果获取偏向锁失败会进入到InterpreterRuntime::monitorenter方法

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
  if (PrintBiasedLockingStatistics) {
    Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
  }
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
         "must be NULL or an object");
#ifdef ASSERT
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END

UseBiasedLocking是在JVM启动的时候,是否启动偏向锁的标识

  1. 如果支持偏向锁,则执行 ObjectSynchronizer::fast_enter的逻辑
  2. 如果不支持偏向锁,则执行 ObjectSynchronizer::slow_enter逻辑,绕过偏向锁,进入轻量级锁

ObjectSynchronizer::fast_enter的实现在 synchronizer.cpp文件中,代码如下

void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 if (UseBiasedLocking) { //判断是否开启了偏向锁
    if (!SafepointSynchronize::is_at_safepoint()) { //如果不处于全局安全点
      //通过`revoke_and_rebias`这个函数尝试撤销或重偏向
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {//如果是撤销与重偏向直接返回
        return;
      }
    } else {//如果在安全点,撤销偏向锁
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }

 slow_enter (obj, lock, THREAD) ;
}

如果是正常的Java线程,会走上面的逻辑进入到BiasedLocking::revoke_and_rebias方法,如果是VM线程则会走到下面的BiasedLocking::revoke_at_safepoint。我们主要看BiasedLocking::revoke_and_rebias方法。这个方法的主要作用像它的方法名:撤销或者重偏向,第一个参数封装了锁对象和当前线程,第二个参数代表是否允许重偏向,这里是true。下面来看看revoke_and_rebias方法

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
  assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");

  markOop mark = obj->mark();
  if (mark->is_biased_anonymously() && !attempt_rebias) {
     //如果是匿名偏向且attempt_rebias==false会走到这里,如锁对象的hashcode方法被调用会出现这种情况,需要撤销偏向锁。
    markOop biased_value       = mark;
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
    if (res_mark == biased_value) {
      return BIAS_REVOKED;
    }
  } else if (mark->has_bias_pattern()) {
    // 锁对象开启了偏向模式会走到这里
    Klass* k = obj->klass();
    markOop prototype_header = k->prototype_header();
    //code 1: 如果对应class关闭了偏向模式
    if (!prototype_header->has_bias_pattern()) {
      markOop biased_value       = mark;
      markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
      assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
      return BIAS_REVOKED;
    //code2: 如果epoch过期
    } else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
      if (attempt_rebias) {
        assert(THREAD->is_Java_thread(), "");
        markOop biased_value       = mark;
        markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED_AND_REBIASED;
        }
      } else {
        markOop biased_value       = mark;
        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED;
        }
      }
    }
  }
  //code 3:批量重偏向与批量撤销的逻辑
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  if (heuristics == HR_NOT_BIASED) {
    return NOT_BIASED;
  } else if (heuristics == HR_SINGLE_REVOKE) {
    //code 4:撤销单个线程
    Klass *k = obj->klass();
    markOop prototype_header = k->prototype_header();
    if (mark->biased_locker() == THREAD &&
        prototype_header->bias_epoch() == mark->bias_epoch()) {
      // 走到这里说明需要撤销的是偏向当前线程的锁,当调用Object#hashcode方法时会走到这一步
      // 因为只要遍历当前线程的栈就好了,所以不需要等到safepoint再撤销。
      ResourceMark rm;
      if (TraceBiasedLocking) {
        tty->print_cr("Revoking bias by walking my own stack:");
      }
      BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
      ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
      assert(cond == BIAS_REVOKED, "why not?");
      return cond;
    } else {
      // 下面代码最终会在VM线程中的safepoint调用revoke_bias方法
      VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
      VMThread::execute(&revoke);
      return revoke.status_code();
    }
  }

  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");
   //code5:批量撤销、批量重偏向的逻辑
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}

会走到该方法的逻辑有很多,我们只分析最常见的情况:假设锁已经偏向线程A,这时B线程尝试获得锁。上面的code 1code 2B线程都不会走到,最终会走到code 4处,如果要撤销的锁偏向的是当前线程则直接调用revoke_bias撤销偏向锁,否则会将该操作push到VM Thread中等到safepoint的时候再执行。

关于VM Thread这里介绍下:在JVM中有个专门的VM Thread,该线程会源源不断的从VMOperationQueue中取出请求,比如GC请求。对于需要safepoint的操作(VM_Operationevaluate_at_safepoint返回true)必须要等到所有的Java线程进入到safepoint才开始执行。接下来我们着重分析下revoke_bias方法。第一个参数为锁对象,第2、3个参数为都为false

static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {
  markOop mark = obj->mark();
  // 如果没有开启偏向模式,则直接返回NOT_BIASED
  if (!mark->has_bias_pattern()) {
    ...
    return BiasedLocking::NOT_BIASED;
  }

  uint age = mark->age();
  // 构建两个mark word,一个是匿名偏向模式(101),一个是无锁模式(001)
  markOop   biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
  markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);

  ...

  JavaThread* biased_thread = mark->biased_locker();
  if (biased_thread == NULL) {
     // 匿名偏向。当调用锁对象的hashcode()方法可能会导致走到这个逻辑
     // 如果不允许重偏向,则将对象的mark word设置为无锁模式
    if (!allow_rebias) {
      obj->set_mark(unbiased_prototype);
    }
    ...
    return BiasedLocking::BIAS_REVOKED;
  }

  // code 1:判断偏向线程是否还存活
  bool thread_is_alive = false;
  // 如果当前线程就是偏向线程 
  if (requesting_thread == biased_thread) {
    thread_is_alive = true;
  } else {
     // 遍历当前jvm的所有线程,如果能找到,则说明偏向的线程还存活
    for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
      if (cur_thread == biased_thread) {
        thread_is_alive = true;
        break;
      }
    }
  }
  // 如果偏向的线程已经不存活了
  if (!thread_is_alive) {
    // 允许重偏向则将对象mark word设置为匿名偏向状态,否则设置为无锁状态
    if (allow_rebias) {
      obj->set_mark(biased_prototype);
    } else {
      obj->set_mark(unbiased_prototype);
    }
    ...
    return BiasedLocking::BIAS_REVOKED;
  }

  // 线程还存活则遍历线程栈中所有的Lock Record
  GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
  BasicLock* highest_lock = NULL;
  for (int i = 0; i < cached_monitor_info->length(); i++) {
    MonitorInfo* mon_info = cached_monitor_info->at(i);
    // 如果能找到对应的Lock Record说明偏向的线程还在执行同步代码块中的代码
    if (mon_info->owner() == obj) {
      ...
      // 需要升级为轻量级锁,直接修改偏向线程栈中的Lock Record。
      // 为了处理锁重入的case,在这里将Lock Record的Displaced Mark Word设置为null,
      // 第一个Lock Record会在下面的代码中再处理
      markOop mark = markOopDesc::encode((BasicLock*) NULL);
      highest_lock = mon_info->lock();
      highest_lock->set_displaced_header(mark);
    } else {
      ...
    }
  }
  if (highest_lock != NULL) {
    // 修改第一个Lock Record为无锁状态,然后将obj的mark word设置为指向该Lock Record的指针
    highest_lock->set_displaced_header(unbiased_prototype);
    obj->release_set_mark(markOopDesc::encode(highest_lock));
    ...
  } else {
    // 走到这里说明偏向线程已经不在同步块中了
    ...
    if (allow_rebias) {
       //设置为匿名偏向状态
      obj->set_mark(biased_prototype);
    } else {
      // 将mark word设置为无锁状态
      obj->set_mark(unbiased_prototype);
    }
  }

  return BiasedLocking::BIAS_REVOKED;
}

需要注意下,当调用锁对象的Object#hashSystem.identityHashCode()方法会导致该对象的偏向锁或轻量级锁升级。这是因为在Java中一个对象的hashcode是在调用这两个方法时才生成的,如果是无锁状态则存放在mark word中,如果是重量级锁则存放在对应的monitor中,而偏向锁是没有地方能存放该信息的,所以必须升级。revoke_bias方法逻辑如下:

  1. 查看偏向的线程是否存活,如果已经不存活了,则直接撤销偏向锁。JVM维护了一个集合存放所有存活的线程,通过遍历该集合判断某个线程是否存活。
  2. 偏向的线程是否还在同步块中,如果不在了,则撤销偏向锁。我们回顾一下偏向锁的加锁流程:每次进入同步块(即执行monitorenter)的时候都会以从高往低的顺序在栈中找到第一个可用的Lock Record,将其obj字段指向锁对象。每次解锁(即执行monitorexit)的时候都会将最低的一个相关Lock Record移除掉。所以可以通过遍历线程栈中的Lock Record来判断线程是否还在同步块中。
  3. 将偏向线程所有相关Lock RecordDisplaced Mark Word设置为null,然后将最高位的Lock RecordDisplaced Mark Word 设置为无锁状态,最高位的Lock Record也就是第一次获得锁时的Lock Record(这里的第一次是指重入获取锁时的第一次),然后将对象头指向最高位的Lock Record,这里不需要用CAS指令,因为是在safepoint。 执行完后,就升级成了轻量级锁。原偏向线程的所有Lock Record都已经变成轻量级锁的状态。

偏向锁的释放

偏向锁的释放入口在bytecodeInterpreter.cpp#1923

CASE(_monitorexit): {
  oop lockee = STACK_OBJECT(-1);
  CHECK_NULL(lockee);
  // derefing's lockee ought to provoke implicit null check
  // find our monitor slot
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  // 从低往高遍历栈的Lock Record
  while (most_recent != limit ) {
    // 如果Lock Record关联的是该锁对象
    if ((most_recent)->obj() == lockee) {
      BasicLock* lock = most_recent->lock();
      markOop header = lock->displaced_header();
      // 释放Lock Record
      most_recent->set_obj(NULL);
      // 如果是偏向模式,仅仅释放Lock Record就好了。否则要走轻量级锁or重量级锁的释放流程
      if (!lockee->mark()->has_bias_pattern()) {
        bool call_vm = UseHeavyMonitors;
        // header!=NULL说明不是重入,则需要将Displaced Mark Word CAS到对象头的Mark Word
        if (header != NULL || call_vm) {
          if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
            // CAS失败或者是重量级锁则会走到这里,先将obj还原,然后调用monitorexit方法
            most_recent->set_obj(lockee);
            CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
          }
        }
      }
      //执行下一条命令
      UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
    }
    //处理下一条Lock Record
    most_recent++;
  }
  // Need to throw illegal monitor state exception
  CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
  ShouldNotReachHere();
}

批量重偏向与批量撤销

渊源:从偏向锁的加锁解锁过程中可看出,当只有一个线程反复进入同步块时,偏向锁带来的性能开销基本可以忽略,但是当有其他线程尝试获得锁时,就需要等到safe point时,再将偏向锁撤销为无锁状态或升级为轻量级,会消耗一定的性能,所以在多线程竞争频繁的情况下,偏向锁不仅不能提高性能,还会导致性能下降。
于是,就有了批量重偏向与批量撤销的机制。

解决场景:批量重偏向(bulk rebias)机制是为了解决:一个线程创建了大量对象并执行了初始的同步操作,后来另一个线程也来将这些对象作为锁对象进行操作,这样会导致大量的偏向锁撤销操作。批量撤销(bulk revoke)机制是为了解决:在明显多线程竞争剧烈的场景下使用偏向锁是不合适的。

实现原理:以class为单位,为每个class维护一个偏向锁撤销计数器,每一次该class的对象发生偏向撤销操作时,该计数器+1,当这个值达到重偏向阈值(默认20)时,JVM就认为该class的偏向锁有问题,因此会进行批量重偏向。每个class对象会有一个对应的epoch字段,每个处于偏向锁状态对象的Mark Word中也有该字段,其初始值为创建该对象时class中的epoch的值。每次发生批量重偏向时,就将该值+1,同时遍历JVM中所有线程的栈,找到该class所有正处于加锁状态的偏向锁,将其epoch字段改为新值。下次获得锁时,发现当前对象的epoch值和class的epoch不相等,那就算当前已经偏向了其他线程,也不会执行撤销操作,而是直接通过CAS操作将其Mark Word的Thread Id 改成当前线程Id。当达到重偏向阈值后,假设该class计数器继续增长,当其达到批量撤销的阈值后(默认40),JVM就认为该class的使用场景存在多线程竞争,会标记该class为不可偏向,之后,对于该class的锁,直接走轻量级锁的逻辑。

偏向锁逻辑总结

  1. 线程A第一次访问同步块时,先检测对象头Mark Word中的标志位是否为01,依此判断此时对象锁是否处于无所状态或者偏向锁状态(匿名偏向锁);

  2. 判断偏向锁标志位是否为1,如果不是,则进入轻量级锁逻辑(使用CAS竞争锁),如果是,则进入下一步流程;

  3. 判断是偏向锁时,检查对象头Mark Word中记录的Thread Id是否是当前线程ID,如果是,则表明当前线程已经获得对象锁,以后该线程进入同步块时,不需要CAS进行加锁,只会往当前线程的栈中添加一条Displaced Mark Word为空的Lock Record中,用来统计重入的次数(如图为当对象所处于偏向锁时,当前线程重入3次,线程栈帧中Lock Record记录)。退出同步块释放偏向锁时,则依次删除对应Lock Record,但是不会修改对象头中的Thread Id

    注:偏向锁撤销是指在获取偏向锁的过程中因不满足条件导致要将锁对象改为非偏向锁状态,而释放是指退出同步块时的过程。

  4. 如果对象头Mark Word中Thread Id不是当前线程ID,则进行CAS操作,企图将当前线程ID替换进Mark Word。如果当前对象锁状态处于匿名偏向锁状态(可偏向未锁定),则会替换成功(将Mark Word中的Thread id由匿名0改成当前线程ID,在当前线程栈中找到内存地址最高的可用Lock Record,将线程ID存入),获取到锁,执行同步代码块;

  5. 如果对象锁已经被其他线程占用,则会替换失败,开始进行偏向锁撤销,这也是偏向锁的特点,一旦出现线程竞争,就会撤销偏向锁

  6. 向锁的撤销需要等待全局安全点(safe point,代表了一个状态,在该状态下所有线程都是暂停的),暂停持有偏向锁的线程,检查持有偏向锁的线程状态(遍历当前JVM的所有线程,如果能找到,则说明偏向的线程还存活),如果线程还存活,则检查线程是否在执行同步代码块中的代码,如果是,则升级为轻量级锁,进行CAS竞争锁

  7. 注:每次进入同步块(即执行monitorenter)的时候都会以从高往低的顺序在栈中找到第一个可用的Lock Record,并设置偏向线程ID;每次解锁(即执行monitorexit)的时候都会从最低的一个Lock Record移除。所以如果能找到对应的Lock Record说明偏向的线程还在执行同步代码块中的代码。

  8. 如果持有偏向锁的线程未存活,或者持有偏向锁的线程未在执行同步代码块中的代码,则进行校验是否允许重偏向,如果不允许重偏向,则撤销偏向锁,将Mark Word设置为无锁状态(未锁定不可偏向状态),然后升级为轻量级锁,进行CAS竞争锁

  9. 如果允许重偏向,设置为匿名偏向锁状态,CAS将偏向锁重新指向线程A(在对象头和线程栈帧的锁记录中存储当前线程ID)

  10. 唤醒暂停的线程,从安全点继续执行代码

简单来说偏向锁适合线程竞争少的情况,偏向锁的加锁过程仅仅只在修改Mark Word中的偏向线程时使用了CAS操作,后续对于持有偏向锁的重入操作仅仅只是判断一下,而对于没有持有锁的线程来竞争时也只是判断一下,如果不是偏向自己则升级为轻量级锁。对于偏向锁的重入计数是通过线程栈中的Lock Record来实现的,每次重入(即碰到monitorenter指令)时创建一个Lock Record,每次释放(碰到monitorexit指令)时,移除一个Lock Record,因此在并发不高的情况下,偏向锁的性能很高


Author: 顺坚
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source 顺坚 !
评论
 Previous
Synchronized之轻量级锁 Synchronized之轻量级锁
在偏向锁出现竞争后,加锁失败的线程会把Mark Word中的锁状态改为轻量级锁,这样其他线程再来时就会走向轻量级锁的加锁流程。下面开始轻量级锁获取流程分析,代码在bytecodeInterpreter.cpp#1816。 CASE(_mon
2021-05-09
Next 
Java的对象头MarkWord Java的对象头MarkWord
在Java中任意对象都可以用作锁,因此必定要有一个映射关系,存储该对象以及其对应的锁信息(比如当前哪个线程持有锁,哪些线程在等待)。一种很直观的方法是,用一个全局map,来存储这个映射关系,但这样会有一些问题:需要对map做线程安全保障,不
2021-04-25
  TOC