从Akka 内存可见性的实现看JVM volatile机制

Posted by CodingCat on January 18, 2015

引发写这篇博客的原因是前几天和人讨论给spark打的一个patch,说到,为什么akka Actor的内部状态不需要显示声明为volatile也能做到多线程间的内存可见。在stackoverflow上找到了一个答案,但是很显然回答者Roland (Akka的Leader) 回答的过于简洁了,没有把背后的volatile语义原理讲出来,所以这篇博客的就是要讲出这个所以然来。

背景知识

“Akka是一个开发库和运行环境,可以用于构建高并发、分布式、可容错、事件驱动的基于JVM的应用。使构建高并发的分布式应用更加容易。”

Akka其设计原则遵循了Actor模型。所谓的Actor模型是一种系统设计抽象方法,这种方法将系统中的实体部分 (系统中的任何一个组件都可以当成是一个实体)变成无共享状态的一堆actor。这些实体可以是纯粹的并发的计算工具(这点就类似传统意义上的并发计算工具 无状态线程),也可以是维护着某些状态可以对应到物理世界中物体的系统组件 (例如一个actor可以用来代表访问系统的一个用户)。(推荐阅读: http://www.infoq.com/cn/news/2014/11/intro-actor-model(中文), http://rerun.me/2014/09/11/introducing-actors-akka-notes-part-1/ (English))

Actor模型的关键部分就是,actor之间的状态是无共享的。他们之间只有一条唯一的路径可以起到协同工作的目的 - 消息传递。这种无共享设计原则的好处是什么呢? 显而易见,他解决了大部分由锁的使用带来的问题。不恰当的锁使用往往被认为是并发程序中最大的问题,无共享状态自然不需要锁 (当然,如果单个actor中引入了多线程/并发,你还是要锁的)。

Akka是当前最流行的actor模型的实现之一,为了理解这篇文章的内容我们需要首先了解下Akka中对actor的调度方法:

为了在Akka框架下使用actor,我们首先必须创建一个ActorSystem, 这个ActorSystem可以简单理解为为在其下运行的Actor提供了运行时环境 (e.g. 调度,监管等等等等, 由于本文不是Akka的科普文,就不展开讲了)。每一个ActorSystem中有一个组件叫做Dispatcher, 这个Dispatcher就是触发每一个actor去执行的一个引擎。在默认的Dispatcher配置中,Akka将一组actor分配给一个线程池,实际是一个fork-and-join的线程池,每一个akka actor对于当前mailbox的处理都是一个fork-and-join的task, dispatcher将这个task放到某一个线程的task queue里,每个Actor在执行一段时间后(其实是一定量的消息)线程会转向下一个Actor (执行时间在Akka中是由throughput选项配置的, 见http://doc.akka.io/docs/akka/2.3.8/scala/dispatchers.html) 。

Akka 内存可见性的实现

背景知识介绍完了,那么这里存在着一个并发场景下的问题,执行Actor任务的线程来自于一个线程池,因此我们并不能保证同一个Actor的操作每次都是被同一个线程所执行,如果这个Actor有一些内部状态需要维护,我们是否需要将Actor的内部状态全部声明为volatile, 以保证程序的正确性?

根据Akka的官方文档 ( http://doc.akka.io/docs/akka/snapshot/general/jmm.html ) , “fields in your actor need not be volatile or equivalent.” 答案是No! 简而言之,Akka框架为我们保证了一个actor在处理之前的消息时所引发的内部状态的读取与更新,一定先行发生于(Happen-before)这个actor在处理之后message所引发的内部状态的读取与更新。这个结论与actor的操作是否是被不同线程所执行是无关的。

所谓先行发生原则,简单说就是如果A 先行发生于B,那么A对A与B共享的环境所带来的一切影响,在B发生时一定是对B可见的。回到Akka的场景下:

在时刻M1,Actor A1被线程T1所执行处理消息m0,内部状态S发生改变,假设S=1; 在时刻M2, Actor A1被线程T2所执行去处理消息m1, 此刻Actor A1必然能观察到S==1,而不会出现S的值在内存中不可见的问题。

这篇文章要讨论的第一个问题就是,Akka是如何做到以上这点的。在Stackoverflow上这个讨论给我提供了研究这个问题的第一步: http://stackoverflow.com/questions/15849366/how-does-akka-implement-the-jmm-like-happens-before-relationship

回答问题的人是Roland Kuhn,Akka项目的Leader,没人比他的回答更可信了 :-) 。好,Roland说,要解答这个问题要分成两个层面去理解,1) 消息的发送方发出一条消息,接收方如果被不同的线程所调用,那么如何保证两者之间是同步的,这个很简单了,Roland说的很清楚,通过volatile写或对同一把锁的操作来实现。2) 第二个层面就涉及到今天要讨论的内容了,上文提到的先行发生原则是如何保证的。

Reading the Fucking Source Code

我们首先从代码层面上解释Roland所说的Akka是怎么做的:

在Akka中,发往每一个actor的消息,都会被存储在属于这个actor的mailbox中,Akka的Mailbox (akka/dispatch/Mailbox.scala) 实际实现了Runnable接口,而所谓actor的执行就是通过线程池调用Mailbox的run 方法

而至于run方法的实现, 在Mailbox.scala大约 219行

<code>
final def run = {
  try {
    if (!isClosed) { //Volatile read, needed here
      processAllSystemMessages() //First, deal with any system messages
      processMailbox() //Then deal with messages
    }
  } finally {
    setAsIdle() //Volatile write, needed here
    dispatcher.registerForExecution(this, false, false)
  }
}

</code>

我们大体可以猜到,用户定义的消息,都是在processMailbox()中实现的

这个函数的定义大约在234行

processMailbox
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue()
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs)
}
}

这个shouldProcessMessage决定了当前的actor是否是可调度的状态,shouldProcessMessage则很简单:

shouldProcessMessage
final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0

虽然简单,但却带来了至关重要的一个方法, status

akka/dispatch/Mailbox.scala 大约在109行

status
@inline
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)

每次调用这个方法时实际是读取了Unsafe.getIntVolatile方法,就是一个volatile读,而回到run方法的实现上我们看到每一个mailbox调度结束后都会执行setAsIdle()

setAsIdle()
/**
* Reset Scheduled status, keeping primary status as is.
*/
@tailrec
final def setAsIdle(): Boolean = {
val s = status
updateStatus(s, s & ~Scheduled) || setAsIdle()
}
@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)
本质上这是对status的一次volatile写操作。从stackoverflow上的讨论可以看出, 正是每次处理消息之前的volatile读和每次throughput达到时的volatile写保证了我们在上文提到的那个先行发生原则: 一个actor在处理之前的消息时所引发的内部状态的读取与更新,一定先行发生于(Happen-before)这个actor在处理之后message所引发的内部状态的读取与更新。 ### volatile机制的实现 下一个问题是,为什么volatile有这个特性并且volatile的特性是如何和akka的这个特性联系上的。 先解答第一个问题,volatile除了我们所熟知的能够保证变量在多线程场景下的内存可见性之外,还有一个特性,就是他可以起到一个内存屏障的作用。这一点我推荐去看《深入理解Java虚拟机》第二版370页的例子。 为了使得程序能更高效的执行,解释器所产生的指令与程序代码中的逻辑顺序并非完全一直,虽然他能够保证指令乱序执行的结果和顺序执行是一样的。关于乱序执行可以直接去看计算机体系结构书了。 《深入理解Java虚拟机》370页的例子说明了,当volatile变量被赋值后,JIT的汇编代码会在赋值指令后加上一个lock前缀的空操作,例如
lock addl $0x0, (%esp)"
这样的一个操作中的lock前缀使得当前CPU的cache被写入内存,同时也无效化了其他CPU的cache,从而保证了写在这个指令后面的指令不能够被乱序执行到这条指令之前,这条指令之前的指令也不可能被乱序到他之后去。 这又是为什么呢?这和我们说过的先行发生原则有关系,Java Language Specification, 2013年2月SE7的规范上说了,"If x and y are actions of the same thread and x comes before y in program order, then hb(x, y).”, 意思是,对于单线程的程序,两条操作谁先谁后是和程序代码位置一致的。我们同样知道,对于status的volatile write操作是整个执行过程的最后一步,那么由此可知: **在这个volatile write的时候,所有的状态更新都是可见的。** 再看JLS说的另一条规则,"A write to a volatile field happens-before every subsequent read of that field.” 只要是对同一个volatile的读写操作,写操作一定是对后续读操作可见的,这和线程无关。那么由上文可知,每次actor处理消息之前都必须volatile 读一次status,也就是说在这个读的时刻,上一次volatile写操作一定是对这个读操作可见的。 JSL又说了,happen-before实际是个偏序关系,偏序是可传递的,那么在上一个消息处理中发生的写操作(状态变更), 也一定是可见的 (happen-before)关系。我们再进一步,在当前消息处理中,因为是单线程的关系,当前的volatile 读操作对于后续的状态变更是Happen-before关系,所以最终我们依然通过偏序可传递性得出,上次消息处理发生的状态变更对本次状态变更是happen-before关系,即可见的。 所以,我们知道了,Akka就是这样利用了Java Memory Model volatile机制的实现去达到了actor内部状态的可见性。