Akka实战:分散、聚合模式
2015-11-26 00:16
417 查看
分散与聚合:简单说就是一个任务需要拆分成多个小任务,每个小任务执行完后再把结果聚合在一起返回。
代码 http://git.oschina.net/yangbajing/akka-action
传入一个关键词:
可选传入一个超时参数:
若超时到返回实际已爬取数据,则任务应继续运行直到所以数据抓取完成,并存库
NewsTask:接收请求,并设置超时时间
SearchPageTask:执行实际的新闻抓取操作(本实例将使用TimeUnit模拟抓取耗时)
在
在收到
SearchPageTask
代码 http://git.oschina.net/yangbajing/akka-action
实例背景
本实例来自一个真实的线上产品,现将其需求简化如下:传入一个关键词:
key,根据
key从网上抓取相关新闻
可选传入一个超时参数:
duration,设置任务到期时必须反回数据(返回实际已抓取数据)
若超时到返回实际已爬取数据,则任务应继续运行直到所以数据抓取完成,并存库
设计
根据需求,一个简化的分散、聚合模式可以使用两个actor来实现。
NewsTask:接收请求,并设置超时时间
SearchPageTask:执行实际的新闻抓取操作(本实例将使用TimeUnit模拟抓取耗时)
实现
NewsTaskhttps://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/NewsTask.scala
override def metricPreStart(): Unit = { context.system.scheduler.scheduleOnce(doneDuration, self, TaskDelay) } override def metricReceive: Receive = { case StartFetchNews => _receipt = sender() (0 until NewsTask.TASK_SIZE).foreach { i => context.actorOf(SearchPageTask.props(self), "scatter-" + i) ! SearchPage(key) } case GetNewsItem(newsItem) => _newses ::= newsItem if (_newses.size == NewsTask.TASK_SIZE) { logger.debug(s"分散任务,${NewsTask.TASK_SIZE}个已全部完成") if (_receipt != null) { _receipt ! NewsResult(key, _newses) _receipt = null } self ! PoisonPill } case TaskDelay => if (_receipt != null) { _receipt ! NewsResult(key, _newses) _receipt = null } }
metricPreStart方法中设置定时方法,调用时间为从代码运行开始到
doneDuration时间为止。定时被触发时将向当前
Actor发送一个
TaskDelay消息。
在
metricReceive方法中,分别对
StartFetchNews、
GetNewsItem、
TaskDelay三个消息进行操作。
在收到
StartFetchNews消息时,actor首先保存发送者actor的引用(结果将返回到此actor)。再根据
TASK_SIZE生成相应子任务
GetNewsItem消息的处理中,每收到一个消息就将其添加到
_newses列表中。并判断当
_newses个数等于
TASK_SIZE时(所有子任务已完成)将结果发送给
_receipt。
self ! PoisonPill,这句代码停止actor自身。它将把“毒药”发送到
NewsTaskActor的接收邮箱队列中。
TaskDelay消息被触发时,将直接返回已完成的新闻
_newses。返回数据后并不终止当前还未运行完任务。
SearchPageTask
https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/SearchPageTask.scala
override def metricReceive: Receive = { case SearchPage(key) => // XXX 模拟抓取新闻时间 TimeUtils.sleep(Random.nextInt(20).seconds) val item = NewsItem( "http://newssite/news/" + self.path.name, "测试新闻" + self.path.name, self.path.name, TimeUtils.now().toString, "内容简介", "新闻正文") taskRef ! GetNewsItem(item) context.stop(self) }
SearchPageTask的代码逻辑就比较易懂了,这里使用
sleep来模拟实际抓取新闻时的耗时。生成结果后返回数据给`taskRef`,并终止自己。
执行测试
./sbt akka-action > testOnly me.yangbajing.akkaaction.scattergather.ScatterGatherTest
总结
这是一个简单的Akka实例,实现了任务分发与结果聚合。提供了一种在指定时间内返回部份有效数据,同时任务继续执行的方式。这种分散、聚合的模式在实际生产中很常用,比如对多种数据源的整合,或某些需要长时间运行同时对返回数据完整性无强制要求的情况等。
MetricActor演示了怎么自定义
Actor,并为其提供一些侦测点的方式。以后有时间会写篇详文介绍。
相关文章推荐
- Windows下Scala环境搭建
- Windows7下安装Scala 2.9.2教程
- XML 文件解析--含Unicode字符的XML文件
- Play! Akka Flume实现的完整数据收集
- 分分钟掌握快速排序(Java / Scala 实现)
- Scala极速入门
- Spark初探
- Scala实现REST操作
- Scala method call syntax
- 关于Scala多重继承的菱形问题
- Scala 高阶函数(high-order function)剖析
- 用Spray构建RESTful接口
- Spray.io搭建Rest服务
- Spray.io搭建Rest — 支持Twirl模板并部署
- akka-rpc(基于akka的rpc实现)
- 搭建hadoop/spark集群环境
- Akka (actors) remote example
- 函数定义
- 使用 Scala 写WordContext程序