您的位置:首页 > 其它

Akka实现简易MapReduce-Scala版

2015-09-23 14:22 141 查看
import scala.collection.mutable.{HashMap, ListBuffer}

/** One word together with the number of times it has been counted. */
case class WordCount(
  var word: String,
  var count: Int
)

/** Output of the map stage: one [[WordCount]] entry per emitted word. */
case class MapData(
  var dataList: ListBuffer[WordCount]
)

/** Output of the reduce stage: partial counts keyed by word. */
case class ReduceData(
  var reduceDataList: HashMap[String, Int]
)

/** Marker message asking the pipeline to print the aggregated result. */
case class Result()


AggregateActor


package main.Actor

import akka.actor.Actor
import main.Bean.{Result, ReduceData}

import scala.collection.mutable

/**
 * Last stage of the pipeline: merges the partial word counts it receives
 * into one running total.
 *
 * Messages handled:
 *  - `ReduceData`: its counts are added into `resultData`.
 *  - `Result`: the accumulated totals are printed to standard output.
 *  - anything else: an "unhandled" notice is printed.
 */
class AggregateActor extends Actor {

  // Running totals keyed by word; within this class only aggregateProc mutates it.
  val resultData = new mutable.HashMap[String, Int]

  def receive = {
    case obj: ReduceData => aggregateProc(obj)
    case obj: Result => println(resultData.toString)
    case _ => println("Unhandled Message ...AggregateActor")
  }

  /**
   * Adds every (word, count) pair carried by `obj` to the running totals.
   *
   * @param obj partial counts produced by the reduce stage
   */
  def aggregateProc(obj: ReduceData) = {
    obj.reduceDataList.foreach { case (word, count) =>
      // Add the partial count itself. Adding a constant 1 here would only
      // count how many batches contained the word, not how often it occurred.
      resultData(word) = resultData.getOrElse(word, 0) + count
    }
  }
}




MapActor


package main.Actor

import java.util

import akka.actor.Actor
import akka.actor.ActorRef
import main.Bean.{MapData, WordCount}

import scala.collection.mutable.ListBuffer

/**
 * Map stage of the pipeline: turns an incoming text into a list of
 * (word, 1) entries and forwards it to the reduce actor.
 *
 * @param reduceActor actor that receives the resulting `MapData`
 */
class MapActor(val reduceActor: ActorRef) extends Actor {

  def receive = {
    case x: String => reduceActor ! mapProcess(x)
    case _ => println("Unhandled Message ...MapActor")
  }

  /**
   * Splits `msg` into words and emits one `WordCount(word, 1)` per word.
   *
   * Words are separated by any run of whitespace (spaces, tabs, line breaks),
   * because the input may be the full content of a multi-line file. Empty
   * tokens, which arise from leading whitespace or an empty message, are
   * dropped so that the empty string is never counted as a word.
   *
   * @param msg text to tokenize
   * @return a `MapData` holding the emitted entries in input order
   */
  def mapProcess(msg: String): MapData = {
    val mapResult = ListBuffer[WordCount]()
    msg.split("\\s+").foreach { token =>
      if (token.nonEmpty) mapResult += WordCount(token, 1)
    }
    new MapData(mapResult)
  }
}


MasterActor


package main.Actor

import akka.actor.{Props, Actor}
import main.Bean.Result

/**
 * Entry point of the pipeline. Creates the aggregate, reduce and map actors
 * as its children and routes incoming messages to them:
 *  - `String`: forwarded to the map actor as text to process.
 *  - `Result`: forwarded to the aggregate actor.
 *  - anything else: an "unhandled" notice is printed.
 */
class MasterActor extends Actor {
  // Children are wired back to front: each stage is handed the ActorRef of
  // the stage it sends to. Props(classOf[...], args) is used instead of
  // Props(new ...) so the child factory does not close over this actor.
  val aggreActor = context.actorOf(Props(classOf[AggregateActor]))
  val reduceActor = context.actorOf(Props(classOf[ReduceActor], aggreActor))
  val mapActor = context.actorOf(Props(classOf[MapActor], reduceActor))

  def receive = {
    case x: String => mapActor ! x
    case x: Result => aggreActor ! x
    case _ => println("Unhandled Message ...MasterActor")
  }

}


ReduceActor


package main.Actor

import akka.actor.Actor
import akka.actor.ActorRef
import main.Bean.{ReduceData, WordCount, MapData}

/**
 * Reduce stage of the pipeline: collapses the entries of one `MapData`
 * batch into per-word counts and forwards them to the aggregate actor.
 *
 * @param aggreActor actor that receives the resulting `ReduceData`
 */
class ReduceActor(val aggreActor: ActorRef) extends Actor {

  def receive = {
    case obj: MapData => aggreActor ! reduceDue(obj)
    case _ => println("Unhandled Message ...ReduceActor")
  }

  /**
   * Sums the counts of all entries in `mapResult` that share the same word.
   *
   * @param mapResult batch of word entries produced by the map stage
   * @return a `ReduceData` mapping each distinct word to its summed count
   */
  def reduceDue(mapResult: MapData): ReduceData = {
    val reduceResult = new scala.collection.mutable.HashMap[String, Int]
    mapResult.dataList.foreach { entry =>
      // Use the entry's own count rather than a hard-coded 1, so the result
      // stays correct if an upstream stage ever emits counts other than 1.
      reduceResult(entry.word) = reduceResult.getOrElse(entry.word, 0) + entry.count
    }
    new ReduceData(reduceResult)
  }
}


HelloAkka


package main

import akka.actor.{Props, ActorSystem}
import Actor._
import main.Bean.Result

import scala.io.Source

class HelloAkka {
}

/**
 * Command-line driver: starts the actor system, feeds four text files to the
 * master actor, asks for the result to be printed, and shuts down.
 */
object HelloAkka {

  /** Reads the whole file at `path` into a string and closes the file handle. */
  private def readFile(path: String): String = {
    val source = Source.fromFile(path)
    try source.mkString finally source.close()
  }

  def main(args: Array[String]): Unit = {

    val systemActor = ActorSystem("HelloAkka")
    val masterActor = systemActor.actorOf(Props[MasterActor], "master")

    masterActor ! readFile("C:\\Users\\ght\\Desktop\\构造数据.sql")
    masterActor ! readFile("C:\\Users\\ght\\Desktop\\新建文本文档 (2).txt")
    masterActor ! readFile("C:\\Users\\ght\\Desktop\\oracle调优.sql")
    masterActor ! readFile("C:\\Users\\ght\\Desktop\\新建文本文档3.txt")

    // Messages are processed asynchronously; this fixed wait gives the
    // pipeline time to finish before the result is requested.
    Thread.sleep(5000)

    masterActor ! new Result()

    // Allow the result to be printed before the system is stopped.
    Thread.sleep(1000)

    systemActor.shutdown()
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: