您的位置:首页 > 其它

scala之Akka的Actor模型(下)

2014-09-26 10:22 302 查看
原文地址:http://my.oschina.net/jingxing05/blog/287462


ActorSystem(“companyname”)

相当于注册一家公司一样,负责:

通用配置 如:dispatchers, deployments, remote capabilities and addresses
创建Actor和搜索actor
通常一个应用一个Actorsystem


ActorRef

actOf会异步的启动一个Actor,并返回这个Actor的ActorRef,作为消息目的地,ActorRef的特征:

immutable
与Actor是一对一关系
可序列化和网络传输,以实现远程透明性


启动Actor

actor_system.actorOf(Props(new HelloActor("jingxing05")), name = "helloactor")

如果一个Actor有多个属性,可以通过如下方式设置其值:

发送适当的消息
放到构造函数中
重写preStart方法


Actor交互消息

消息必须是immutable的

通过 ! 给Actor发送消息

Actor的消息处理方式

def receive = { // 接受消息时隐式的传入了 发送者的 ActorRef 名为 sender

case "hello" =>

println("hello back to you")

send ! “done”

case _ => println("Huh ? ")

}


Actor的生命周期

construct Actor代码体 是constructor的一部分
preStart Actor刚启动后被call
receive
preRestart 待重启Actor知晓 重启原因
postRestart 新启动的Actor 立即调用 显示重启原因 在其中调 preStart
postStop Actor停止后 被call 用来 清理现场资源


停止Actor的方法

在Actor系统层 system.stop(actorRef)
在Actor里面context.stop(actorRef)
向Actor发送 PoisonPill消息
gracefuStop方法 + try + future技术

stop方法 等Actor执行完正在处理的消息 就停止,其后的消息丢失

PoisonPill跟普通消息一样进入Actor的邮箱队列,遇到PoisonPill消息 就停止,PoisonPill之后的消息丢失

停止Actor是异步的,有两步:1、挂起邮箱不再接受消息,并向下属Actor发送stop消息

2、处理下属Actor返回的终结消息,直到所有下属Actor都结束,然后 自行了断

3、如果子Actor没响应,则必须等待

丢失的没有处理的消息被送到 Actor系统的deadLetter Actor中

向Actor发送 Kill 消息会导致该Actor重启


监控Actor的死

使用Actor的context的watch方法可以监控该context生出的子Actor

当子Actor收到Kill或停止时 其上级Actor会收到Terminated(actorrefName)的消息,可进行处理

Actor抛出异常时,并不会发送Terminated消息,而是自动restart


搜索actor

一个actor系统中有很多actor可以使用ActorSystem实例或context的actorSelection方法进行搜索

四种实例

val kenny = system.actorSelection("/user/Parent/Kenny")
val kenny = context.actorSelection("../Kenny") //相对路径
val kenny = system.actorFor("akka://DeathWatchTest/user/Parent/Kenny")
val kenny = system.actorFor(Seq("user", "Parent", "Kenny"))
val kenny = system.actorFor(Seq("..", "Kenny"))


Future对象


使用future

可以包裹你想要执行的代码到future对象中

返回值可以是future

有三个回调可以使用 : onSuccess onFailure onComplete 都是偏函数 partial func

scala移位操作

左移:补充0, 右移:补充最高的符号位, 无符号右移则 补充0

可以调用几个返回future的代码

然后用for 表达式来join返回的结果,如下示例:

1: println("starting futures")
2: val result1 = Cloud.runAlgorithm(10)
3: val result2 = Cloud.runAlgorithm(20)
4: val result3 = Cloud.runAlgorithm(30)
5: println("before for-comprehension")
6: val result = for {
7:   r1 <- result1
8:   r2 <- result2
9:   r3 <- result3
10: } yield (r1 + r2 + r3)
11: println("before onSuccess")
12: result onSuccess {
13: case result => println(s"total = $result")
14: }


Future及其执行上下文ExecutionContext

Future[T]是包含并发计算的容器,在executioncontext分配给他的线程中,在某个不确定的时间开始执行,在不确定的时间返回T 或者Exception
Future计算完毕可以返回结果时视为completed,有三种情况:complete success failure
Future提供从其返回结果中提取值的接口,如三个回调函数,for、map、flatMap等
ExecutionContext可视为一个线程池,Future在这个池中的某个线程中执行
ExecutionContext.Implicits.global是默认的执行上下文,可以import
回调函数也是异步的,可能在不同的线程中执行,顺序不确定
多个Future可通过以下函数组合:map flatMap filter foreach recoverWith fallbackTo andThen


? and ask来发送消息并等待回复


在actor中定义一些状态,然后使用 become(somestate)方法转换状态


使用scala的parallel集合提高性能

适用于 顺序无关的计算,在测试使用和不用并行集合的性能后取舍

两种途径:1、调用集合的par方法转换为并行集合;2、使用ParXX集合类

不可变并行集合:

ParHashMap ParHashSet ParIterable ParMap

ParRange ParSeq ParSet ParVector

可变的并行集合: ParArray
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: