您的位置:首页 > 其它

使用scala的actor模型实现并发的例子

2016-07-29 17:09 579 查看
</pre><pre code_snippet_id="1793449" snippet_file_name="blog_20160729_3_9097610" name="code" class="java">/**
* Created by lengmengwuxie on 2016/7/29.
*/
import scala.actors._
import scala.actors.Actor._

//设计方式:
// 1,任何模拟对象在所有其他模拟对象完成对时间n的处理之前,都不应该处理时间n+1的事件
// 2,假设要实现所有模拟对象同步执行,使用一个“时钟”actor来跟踪当前的时间

//在选定的时间去发送Ping消息到所有actor 检查是否当前时间点的事件都已经被完成
case class Ping(time: Int)
//模拟对象确认已经处理完毕 返回Pong事件
case class Pong(time: Int, from: Actor)

//3,为了模拟对象能够切确知道自己已经完成了当前时间点的工作,而不是还需要等待与其他actor之间的
//消息反馈,需要增加两个限制:
//    a,模拟对象从不直接相互发送消息,而是相互安排事件日程;
//    b,从不向当前时间点提交时间,而是向未来至少比当前多一点的时间提交;
//因此我们需要一个工作项的日程表,这个日程表也可以在“时钟”actor上,时钟actor先为所有模拟对象
//发送当前时间点所有的工作项的请求之后,才发送Ping消息。

//工作项消息
case class WorkItem(time: Int, msg: Any, target: Actor)
//安排新工作的消息
case class AfterDelay(delay: Int, msg: Any, target: Actor)
//用于要求模型启动和停止的消息
case object Start
case object Stop

//时钟actor
class Clock extends Actor{
private var running = false
private var currentTime = 0
private var agenda: List[WorkItem] = List()
private var allSimulants: List[Actor] = List()
private var busySimulants: Set[Actor] = Set.empty

def add(sim: Simulant): Unit ={
allSimulants = sim :: allSimulants
}

def act(): Unit ={
loop{
if (running && busySimulants.isEmpty)
advance()
reactToOneMessage()
}
}

//时间前进
def advance(): Unit ={
//日程表为空 且模拟已经开始了则模拟需要退出
if (agenda.isEmpty && currentTime > 0){
println("** Agenda empty. Clock exiting at time " + currentTime + ".")
self ! Stop
return
}
currentTime += 1
println("Advacting to time" + currentTime)

processCurrentEvents()
for (sim <- allSimulants) //向所有工作中的模拟对象发送Ping
sim ! Ping(currentTime)

busySimulants = Set.empty ++ allSimulants
}

//处理所有在日程表里时间为currentTime的事件
private def processCurrentEvents():Unit = {
val todoNow = agenda.takeWhile(_.time <= currentTime) //获取agenda中所有时间等于currentTime的事件
agenda = agenda.drop(todoNow.length) //从agenda中去掉与todoNow包含的条目
for(WorkItem(time, msg, target) <- todoNow){
assert(time == currentTime)
target ! msg
}
}

//react事件处理
def reactToOneMessage(): Unit ={
react{
case AfterDelay(delay, msg, target) =>      //将新的条目添加到工作队列
val item = WorkItem(currentTime + delay, msg, target)
agenda = insert(agenda, item)
case Pong(time, sim) =>                     //从忙碌的模拟对象中去除一个模拟对象
assert(time == currentTime)
assert(busySimulants contains sim)
busySimulants -= sim
case Start => running = true                //让模拟开始
case Stop =>                                //让时钟停止
for (sim <- allSimulants)
sim ! Stop
exit()
}
}

def insert(ag: List[WorkItem], item: WorkItem): List[WorkItem] ={
if (ag.isEmpty || item.time > ag.head.time) item :: ag
else ag.head :: insert(ag.tail, item)
}
}

//不同的被模拟对象之间有的共同行为,将其定义为特质
//simulant是能够接受模拟消息Stop和Ping并于他们合作的任何actor
trait Simulant extends Actor {
val clock: Clock
def handleSimMessage(msg: Any)
def simStarting() {}

def act(): Unit = {
loop {
react {
case Stop => exit()
case Ping(time) =>
if (time == 1) simStarting()
clock ! Pong(time, self)
case msg => handleSimMessage(msg)
}
}
}
//模拟对象在创建时便启动运行,安全且方便,在接受到时钟消息之前不会做任何事
start()
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: