您的位置:首页 > 其它

使用SBT开发Akka第一个案例源码解析MapActor、ReduceActor、AggregateActor

2015-09-11 10:25 453 查看
学习了使用SBT开发Akka第一个案例源码解析MapActor、ReduceActor、AggregateActor,应用MapActor对单词计数,发信息给ReduceActor,对信息进行local级的汇总,然后交给AggregateActor。

案例如下:

class MapActor(reduceActor: ActorRef) extend Actor{

val STOP_WORDS_LIST=List("a","is")

deg receive: Receive={

case message: string=>

reduceActor ! evaluateExprssion(message)

case _=>

}

def evaluateExpression(line:String):MapData={

var dataList=new ArrayList[word]

var parser:StringTokenizer=new StringTokenizer(line)

var defaultCount: Tnteger=1

while (parser.hasMoreTokens()){

var word:String=parser.nextToken().toLowerCase()

if (!STOP_WORDS_LIST.contains(word)){

dataList.add(new Word(word, defaultCount))

}

}

return new MapData(dataList)

}

}

class ReduceActor(aggregateActor: ActorRef) extend Actor{

def receive : Receive = {

case message : MapData =>

aggregateActor ! reduce(message.dataList)

case _ =>

}

def reduce(dataList : ArrayList[word]);ReduceData={

var reducedMap = new HashMap[Sting,Integer]

for (wc:word <-dataList) {

var word: String =wc.word

if (reduceMap.containsKey(word){

reduceMap.put(word,reduceMap.get(word)+1)

} else{ reduceMap.put(word,1) }

}

return new ReduceData(reduceMap)}

}

class AggregateActor extend Actor {

var finalReduceMap = new HashMap[string, Integer]

def receive: Receive ={

case message: ReduceData =>

aggregateInMemoryReduce(message.reduceDataMap)

case message : Result =>

System.out.println(finalReduceMap.toString())}

def aggregateInMemoryReduce(reducedList: HashMap[String,Integer]){

var count: Integer =0

for (key <- reduceList.keyset){

if ( finalReduceMap.containsKey(key)){

count = reducedList.get(key)

count+=finalReduceMao.get(key)

finalReducedMap.put(key,count)

}else{ finalReducedMap.put(key,reducedList.get(key))}

}

}

}

}

DT大数据梦工厂Scala视频于今日更新突破100集!风雨同舟,感谢各位大数据爱好者的持续学习和支持!(DT大数据梦工厂1至100集scala的所有视频、PPT和代码在百度云盘的链接: http://url.cn/fSFPjS

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: