Akka中Actor消息通信的实现原理(源码解析)
2018-01-17 10:56
666 查看
Akka中通过下面的方法向actor发送消息
! tell 意味着 “fire-and-forget”,即异步的发送消息无需等待返回结果
? ask 异步发送消息并返回代表可能回复的Future。
消息在每个发件人的基础上是有序的。
其中MessageQueue(akka.dispatch.MessageQueue)是形成Akka邮箱的心组件之一。
发送给Actor的普通消息将被排入队列(并随后出队列)它至少需要支持N个生产者和1个消费者的线程安全。 它实现了入队列,出队列等方法
其中Envelope封装了message:Any和sender:ActorRef两个成员
SystemMessageQueue提供了systemEnqueue(入队列)和systemDrain(全部出队列)方法。MailBox继承自系统消息队列SystemMessageQueue和ForkJoinTask,实现了Runnable接口,同时包含ActorCell成员和MessageQueue成员
其中ForkJoinTask是用少数线程执行海量独立任务的极好架构(独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题)
MailBox代理了MessageQueue的所有方法。MessageQueue的具体类型,根据MailBoxType的不同而不同。
在使用actorRef ! Message发送消息时,调用了actorCell对应的sendMessage方法,其中调用了dispatcher.dispatch方法
可以在ActorRef中可以看到
在ActorCell.scala中
之后可以追踪到dungeon的Dispatch.scala文件
而代码里的dispatcher.dispatch可以在dispatch.Dispatcher中找到:
dispatch方法做了两件事情:
一是将消息放到actorCell的消息队列中(maiBox 是 ActorCell 的成员变量)
二是调用dispather底层的线程池executor execute mbox执行mbox.run()(mailBox继承了
Runnable 接口所以能放入ExecutorService 中执行),
执行mbox.run()中,先从SystemMessage链表中处理系统消息,
然后从MessageQueue成员中处理用户消息。
处理用户消息时,run 是一个递归函数,每次调用处理一个消息,
处理逻辑通过调用actorCell的invoke方法实现,根据dispatcher
的throughput决定处理多少条消息,
根据dispatcher的throughputDeadlineTime决定处理多长时间,
长度和时间在处理完一条消息后检查一次。
对 PoisonKill, Terminate 系统消息的处理在 autoReceiveMessage 中,
对普通消息的处理在 receiveMessage 中,
可以看到behaviorStack 是一个 List[Actor.Receive],
其中Receive (PartialFunction[Any, Unit])函数就是我们写的对 message 的处理逻辑。
因为 Actor 支持通过 become/unbecome 切换形态,
所以behaviorStack.head就是当前的Receive处理逻辑。
还有一点,消息队列都是放到actor对应的mailbox中(以Envelope的形式封装消息本身和sender),
而执行的task对象会放到Executor的每个线程对应的工作队列中,task和消息分别使用不同的队列。
参考
https://doc.akka.io/docs/akka/snapshot/mailboxes.html
https://doc.akka.io/docs/akka/snapshot/actors.html#send-messages
http://spartan1.iteye.com/blog/1641322
! tell 意味着 “fire-and-forget”,即异步的发送消息无需等待返回结果
? ask 异步发送消息并返回代表可能回复的Future。
消息在每个发件人的基础上是有序的。
MailBox
Akka邮箱包含发往Actor的消息。通常每个Actor都有自己的邮箱,但是也有例外,比如BalancingPool所有路由将共享一个邮箱实例。其中MessageQueue(akka.dispatch.MessageQueue)是形成Akka邮箱的心组件之一。
发送给Actor的普通消息将被排入队列(并随后出队列)它至少需要支持N个生产者和1个消费者的线程安全。 它实现了入队列,出队列等方法
def enqueue(receiver: ActorRef, handle: Envelope): Unit def dequeue(): Envelope def numberOfMessages: Int def hasMessages: Boolean def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
其中Envelope封装了message:Any和sender:ActorRef两个成员
final case class Envelope private (val message: Any, val sender: ActorRef)
SystemMessageQueue提供了systemEnqueue(入队列)和systemDrain(全部出队列)方法。MailBox继承自系统消息队列SystemMessageQueue和ForkJoinTask,实现了Runnable接口,同时包含ActorCell成员和MessageQueue成员
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { var actor: ActorCell = _ }
其中ForkJoinTask是用少数线程执行海量独立任务的极好架构(独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题)
MailBox代理了MessageQueue的所有方法。MessageQueue的具体类型,根据MailBoxType的不同而不同。
tell 操作
在创建ActorSystem时,初始化默认的dispatcher,默认ForkJoinPool(ExecutorService)在使用actorRef ! Message发送消息时,调用了actorCell对应的sendMessage方法,其中调用了dispatcher.dispatch方法
可以在ActorRef中可以看到
def ! (message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
在ActorCell.scala中
final def sendMessage(message: Any, sender: ActorRef): Unit = sendMessage(Envelope(message, sender, system))
之后可以追踪到dungeon的Dispatch.scala文件
def sendMessage(msg: Envelope): Unit = try { val msgToDispatch = if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg) else msg dispatcher.dispatch(this, msgToDispatch) } catch handleException
而代码里的dispatcher.dispatch可以在dispatch.Dispatcher中找到:
/** * INTERNAL API */ protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) } protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { try { executorService execute mbox true } catch { case e: RejectedExecutionException ⇒ try { executorService execute mbox true } catch { //Retry once case e: RejectedExecutionException ⇒ mbox.setAsIdle() eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!")) throw e } } } else false } else false }
dispatch方法做了两件事情:
一是将消息放到actorCell的消息队列中(maiBox 是 ActorCell 的成员变量)
二是调用dispather底层的线程池executor execute mbox执行mbox.run()(mailBox继承了
Runnable 接口所以能放入ExecutorService 中执行),
override final def run(): Unit = { try { if (!isClosed) { //Volatile read, needed here processAllSystemMessages() //First, deal with any system messages processMailbox() //Then deal with messages } } finally { setAsIdle() //Volatile write, needed here dispatcher.registerForExecution(this, false, false) } } /** * Process the messages in the mailbox */ @tailrec private final def processMailbox( left: Int = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit = if (shouldProcessMessage) { val next = dequeue() if (next ne null) { if (Mailbox.debug) println(actor.self + " processing message " + next) actor invoke next if (Thread.interrupted()) throw new InterruptedException("Interrupted while processing actor messages") processAllSystemMessages() if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) processMailbox(left - 1, deadlineNs) } }
执行mbox.run()中,先从SystemMessage链表中处理系统消息,
然后从MessageQueue成员中处理用户消息。
处理用户消息时,run 是一个递归函数,每次调用处理一个消息,
处理逻辑通过调用actorCell的invoke方法实现,根据dispatcher
的throughput决定处理多少条消息,
根据dispatcher的throughputDeadlineTime决定处理多长时间,
长度和时间在处理完一条消息后检查一次。
final def invoke(messageHandle: Envelope): Unit = { val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout] try { currentMessage = messageHandle if (influenceReceiveTimeout) cancelReceiveTimeout() messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) case msg ⇒ receiveMessage(msg) } currentMessage = null // reset current message after successful invocation } catch handleNonFatalOrInterruptedException { e ⇒ handleInvokeFailure(Nil, e) } finally { if (influenceReceiveTimeout) checkReceiveTimeout // Reschedule receive timeout } } final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
对 PoisonKill, Terminate 系统消息的处理在 autoReceiveMessage 中,
对普通消息的处理在 receiveMessage 中,
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
可以看到behaviorStack 是一个 List[Actor.Receive],
type Receive = PartialFunction[Any, Unit]
其中Receive (PartialFunction[Any, Unit])函数就是我们写的对 message 的处理逻辑。
因为 Actor 支持通过 become/unbecome 切换形态,
所以behaviorStack.head就是当前的Receive处理逻辑。
对于ForkJoinPool这种executor,每次执行execute(mbox)时,实 际上都是先创建一个继承自ForkJoinTask的MailboxExecutionTask, 其中的exec方法调用mbox.run方法,因此每次执行都会创建一个ForkJoinTask对象。
还有一点,消息队列都是放到actor对应的mailbox中(以Envelope的形式封装消息本身和sender),
而执行的task对象会放到Executor的每个线程对应的工作队列中,task和消息分别使用不同的队列。
参考
https://doc.akka.io/docs/akka/snapshot/mailboxes.html
https://doc.akka.io/docs/akka/snapshot/actors.html#send-messages
http://spartan1.iteye.com/blog/1641322
相关文章推荐
- OpenStack建立实例完整过程源码详细分析(12)----依据AMQP通信架构实现消息发送机制解析之一
- OpenStack建立实例完整过程源码详细分析(15)----依据AMQP通信架构实现消息接收机制解析之二
- OpenStack建立实例完整过程源码详细分析(13)----依据AMQP通信架构实现消息发送机制解析之二
- OpenStack建立实例完整过程源码详细分析(14)----依据AMQP通信架构实现消息接收机制解析之一
- Android 带你从源码的角度解析Scroller的滚动实现原理
- [置顶] Spring-Session实现Session共享实现原理以及源码解析
- Android消息机制Handler的实现原理解析
- scala-akka实现简单的分布式RPC通信框架(主从监听,消息发送)
- Android 从源码的角度解析Scroller的滚动实现原理
- HashSet实现不重复储值原理-附源码解析
- spring培训第一讲-ioc实现原理源码解析
- BezierDemo源码解析-实现qq消息气泡拖拽消失的效果
- Kafka源码深度解析-序列8 -Consumer -Fetcher实现原理与offset确认机制
- 【React Native】从源码一步一步解析它的实现原理
- react native实现原理解析(从源码入手,nice)
- Android 带你从源码的角度解析Scroller的滚动实现原理
- Android 带你从源码的角度解析Scroller的滚动实现原理
- spring AOP 源码解析 及其实现原理
- 即时通信系统中如何实现:聊天消息加密,让通信更安全? 【低调赠送:QQ高仿版GG 4.5 最新源码】
- Spring AOP源码解析——AOP动态代理原理和实现方式