Akka实现简易MapReduce-Scala版
2015-09-23 14:22
141 查看
import scala.collection.mutable.{HashMap, ListBuffer}

/**
 * A single word paired with an occurrence count.
 *
 * @param word  the token
 * @param count number of occurrences represented by this entry
 */
case class WordCount(var word: String, var count: Int)

/**
 * Message wrapping a buffer of [[WordCount]] entries.
 *
 * @param dataList the word/count entries
 */
case class MapData(var dataList: ListBuffer[WordCount])

/**
 * Message wrapping a word -> count table.
 *
 * @param reduceDataList counts keyed by word
 */
case class ReduceData(var reduceDataList: HashMap[String, Int])

/** Parameterless marker message. */
case class Result()
AggregateActor
package main.Actor

import akka.actor.Actor
import main.Bean.{Result, ReduceData}

import scala.collection.mutable

/**
 * Accumulates word counts across every [[ReduceData]] message it receives
 * and prints the running totals when it receives a [[Result]] message.
 */
class AggregateActor extends Actor {

  /** Running total per word; only touched from this actor's `receive`. */
  val resultData = new mutable.HashMap[String, Int]

  def receive = {
    case obj: ReduceData => aggregateProc(obj)
    case obj: Result => println(resultData.toString)
    case _ => println("Unhandled Message ...AggregateActor")
  }

  /**
   * Merges one partial result into the running totals.
   *
   * Each word's incoming count is added to the total already stored
   * (0 when the word has not been seen yet).
   *
   * @param obj partial word counts to merge
   */
  def aggregateProc(obj: ReduceData): Unit = {
    obj.reduceDataList.foreach { case (word, count) =>
      // Add the partial count itself; adding a constant 1 would only count
      // how many partial results mention the word.
      resultData(word) = resultData.getOrElse(word, 0) + count
    }
  }
}
MapActor
package main.Actor

import java.util

import akka.actor.Actor
import akka.actor.ActorRef
import main.Bean.{MapData, WordCount}

import scala.collection.mutable.ListBuffer

/**
 * Map stage: turns each incoming text message into a [[MapData]] of
 * `(word, 1)` entries and sends it to `reduceActor`.
 *
 * @param reduceActor recipient of the produced [[MapData]] messages
 */
class MapActor(val reduceActor: ActorRef) extends Actor {

  def receive = {
    case x: String => reduceActor ! mapProcess(x)
    case _ => println("Unhandled Message ...MapActor")
  }

  /**
   * Tokenizes `msg` and emits one `WordCount(token, 1)` per token.
   *
   * Tokens are separated by any run of whitespace (spaces, tabs, line
   * breaks), so multi-line text is split correctly and consecutive
   * separators do not yield empty words.
   *
   * @param msg text to tokenize; may be empty
   * @return the emitted entries, in input order
   */
  def mapProcess(msg: String): MapData = {
    val mapResult = ListBuffer[WordCount]()
    msg.split("\\s+").foreach { token =>
      // split still yields "" for empty input or leading whitespace
      if (token.nonEmpty) mapResult += WordCount(token, 1)
    }
    MapData(mapResult)
  }
}
MasterActor
package main.Actor

import akka.actor.{Props, Actor}
import main.Bean.Result

/**
 * Entry point of the pipeline. Creates the aggregate, reduce and map
 * actors as children, forwards text messages to the map actor and
 * [[Result]] requests to the aggregate actor.
 */
class MasterActor extends Actor {

  // Children are described with Props(classOf[...], args) rather than
  // Props(new ...): the by-name form would capture this actor instance
  // inside the Props closure.
  val aggreActor = context.actorOf(Props(classOf[AggregateActor]))
  val reduceActor = context.actorOf(Props(classOf[ReduceActor], aggreActor))
  val mapActor = context.actorOf(Props(classOf[MapActor], reduceActor))

  def receive = {
    case x: String => mapActor ! x
    case x: Result => aggreActor ! x
    case _ => println("Unhandled Message ...MasterActor")
  }
}
ReduceActor
package main.Actor

import akka.actor.Actor
import akka.actor.ActorRef
import main.Bean.{ReduceData, WordCount, MapData}

/**
 * Reduce stage: collapses each incoming [[MapData]] into a per-word count
 * table and sends it to `aggreActor` as a [[ReduceData]].
 *
 * @param aggreActor recipient of the produced [[ReduceData]] messages
 */
class ReduceActor(val aggreActor: ActorRef) extends Actor {

  def receive = {
    case obj: MapData => aggreActor ! reduceDue(obj)
    case _ => println("Unhandled Message ...ReduceActor")
  }

  /**
   * Sums the counts of all entries that share the same word.
   *
   * @param mapResult entries to combine
   * @return a table mapping each distinct word to the sum of its counts
   */
  def reduceDue(mapResult: MapData): ReduceData = {
    val reduceResult = new scala.collection.mutable.HashMap[String, Int]
    mapResult.dataList.foreach { entry: WordCount =>
      // Honour the entry's own count instead of assuming it is always 1.
      reduceResult(entry.word) = reduceResult.getOrElse(entry.word, 0) + entry.count
    }
    ReduceData(reduceResult)
  }
}
HelloAkka
package main

import akka.actor.{Props, ActorSystem}
import Actor._
import main.Bean.Result

import scala.io.Source

class HelloAkka {
}

/**
 * Command-line driver: starts the actor system, sends the content of each
 * input file to the master actor, then asks for the aggregated result.
 */
object HelloAkka {

  /** Input files used when no paths are given on the command line. */
  private val DefaultInputs = Seq(
    "C:\\Users\\ght\\Desktop\\构造数据.sql",
    "C:\\Users\\ght\\Desktop\\新建文本文档 (2).txt",
    "C:\\Users\\ght\\Desktop\\oracle调优.sql",
    "C:\\Users\\ght\\Desktop\\新建文本文档3.txt"
  )

  /** Reads a whole file into a string and always releases the file handle. */
  private def readFile(path: String): String = {
    val source = Source.fromFile(path)
    try source.mkString finally source.close()
  }

  /**
   * @param args optional list of input file paths; when empty, the
   *             built-in default list is used
   */
  def main(args: Array[String]): Unit = {
    val systemActor = ActorSystem("HelloAkka")
    try {
      val masterActor = systemActor.actorOf(Props[MasterActor], "master")
      val inputs = if (args.nonEmpty) args.toSeq else DefaultInputs
      inputs.foreach(path => masterActor ! readFile(path))
      // Crude wait for the pipeline to drain before requesting the result;
      // the actors do not signal completion.
      Thread.sleep(5000)
      masterActor ! new Result()
      // Give the aggregate actor time to print before shutting down.
      Thread.sleep(1000)
    } finally {
      // Shut down even if reading a file failed, so the JVM can exit.
      systemActor.shutdown()
    }
  }
}
相关文章推荐
- 电话号码正则
- Lnmp网站服务器搭建
- Java 集合体系详解——Set体系无序不重复集合
- 已知linux命令整理
- 二、Tiny4412开发板运行安卓系统
- leetcode decode ways
- 你应该知道的16个Linux服务器监控命令
- 【JavaScript】图片上传预览
- Qt信号-槽源码解析(二)
- Android 一组textview 点击之后更换背景 并保持不变
- Android学习笔记:最简纯素大白板
- XE8-indy10中TIdThread.Execute函数的源码与解读
- NPAPI运行流程
- yum和apt-get的区别
- nyoj 1087 摆方格
- jquery 排除重复
- C语言-----二维字符串数组内存图解
- Q&A: HOW TO FIX XCODE’S "USED AS THE NAME OF THE PREVIOUS PARAMETER" WARNING
- 一个常用的布局
- 单例模式