
Lecture 94: Hands-On with the First Akka Example — Study Notes on the MapActor, ReduceActor, and AggregateActor Code


Topics in this lecture:

1. Notes on the code implementation

2. MapActor implementation walkthrough

3. ReduceActor implementation walkthrough

4. AggregateActor implementation walkthrough

MapActor's main responsibility is to split the incoming string into words and to count each word once.

ReduceActor then counts how many times each word occurs (the local reduce).

AggregateActor keeps a global count across everything the MapActors and ReduceActors have processed.
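As a quick illustration before the code (a hypothetical sentence, not one from the lecture): for the input "this is a test and this works", MapActor would emit a MapData holding one WordCount per kept token, [(this,1), (test,1), (and,1), (this,1), (works,1)], with the stop words "is" and "a" dropped; ReduceActor would fold that into a ReduceData map {this=2, test=1, and=1, works=1}; AggregateActor would then merge that map into its global finalReducedMap.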

package akka.dt.app.java.actors;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.dt.app.java.messages.MapData;
import akka.dt.app.java.messages.WordCount;

public class MapActor extends UntypedActor {

    // Words in this list are treated as stop words and are never counted.
    private String[] STOP_WORDS = {"a", "is"};

    private List<String> STOP_WORDS_LIST = Arrays.asList(STOP_WORDS);

    private ActorRef reduceActor = null;

    public MapActor(ActorRef inReduceActor) { reduceActor = inReduceActor; }

    @Override
    public void onReceive(Object message) throws Exception {
        // The message MapActor receives is the sentence (a String) that MasterActor passed in via tell.
        if (message instanceof String) {
            String work = (String) message;
            // Map the words in the sentence into a MapData entity.
            MapData data = evaluateExpression(work);
            // Send the result on to the ReduceActor.
            reduceActor.tell(data);
        } else {
            unhandled(message);
        }
    }

    private MapData evaluateExpression(String line) {
        // dataList collects one WordCount (a word plus a count of 1) per kept token.
        List<WordCount> dataList = new ArrayList<WordCount>();
        // StringTokenizer splits the incoming string; the single-argument constructor
        // falls back to the default delimiters, as its source shows:
        //   public StringTokenizer(String str, String delim) { this(str, delim, false); }
        //   public StringTokenizer(String str) { this(str, " \t\n\r\f", false); }
        StringTokenizer parser = new StringTokenizer(line);
        // hasMoreTokens checks whether another token is available; each token is lower-cased.
        while (parser.hasMoreTokens()) {
            String word = parser.nextToken().toLowerCase();
            // Tokens listed in STOP_WORDS are skipped and not counted.
            if (!STOP_WORDS_LIST.contains(word)) {
                // Write the current token into dataList as a WordCount(word, 1).
                // The loop keeps appending, so a repeated word is simply written again.
                dataList.add(new WordCount(word, Integer.valueOf(1)));
            }
        }
        // Hand dataList to the MapData constructor and return the result.
        return new MapData(dataList);
    }
}
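Because the default delimiters only cover whitespace, punctuation stays attached to its word. The following standalone sketch (a hypothetical demo class, not part of the lecture code) makes that visible:

import java.util.StringTokenizer;

public class TokenizerDemo {
    public static void main(String[] args) {
        StringTokenizer parser = new StringTokenizer("Hi! My name is Rocky.");
        while (parser.hasMoreTokens()) {
            // Prints, one per line: hi!  my  name  is  rocky.
            System.out.println(parser.nextToken().toLowerCase());
        }
    }
}

So "rocky." and "rocky" would be counted as two different words, and the stop-word check compares whole tokens, meaning "is" is filtered out but "is." would not be.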

package akka.dt.app.java.actors;

import java.util.HashMap;
import java.util.List;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.dt.app.java.messages.MapData;
import akka.dt.app.java.messages.ReduceData;
import akka.dt.app.java.messages.WordCount;

// ReduceActor counts how many times each word occurs in the MapData it receives.
public class ReduceActor extends UntypedActor {

    private ActorRef aggregateActor = null;

    public ReduceActor(ActorRef inAggregateActor) { aggregateActor = inAggregateActor; }

    @Override
    public void onReceive(Object message) throws Exception {
        // ReduceActor receives MapData messages sent by MapActor.
        if (message instanceof MapData) {
            MapData mapData = (MapData) message;
            // Reduce the incoming data.
            ReduceData reduceData = reduce(mapData.getDataList());
            // Forward the result to the aggregate actor.
            aggregateActor.tell(reduceData);
        } else {
            unhandled(message);
        }
    }

    private ReduceData reduce(List<WordCount> dataList) {
        HashMap<String, Integer> reducedMap = new HashMap<String, Integer>();
        for (WordCount wordCount : dataList) {
            if (reducedMap.containsKey(wordCount.getWord())) {
                Integer value = reducedMap.get(wordCount.getWord());
                value++;
                reducedMap.put(wordCount.getWord(), value);
            } else {
                reducedMap.put(wordCount.getWord(), Integer.valueOf(1));
            }
        }
        return new ReduceData(reducedMap);
    }
}
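For a hypothetical dataList of [(hope,1), (hope,1), (like,1)], reduce would return a ReduceData wrapping the map {hope=2, like=1}.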

package akka.dt.app.java.actors;

import java.util.HashMap;
import java.util.Map;

import akka.actor.UntypedActor;
import akka.dt.app.java.messages.ReduceData;
import akka.dt.app.java.messages.Result;

public class AggregateActor extends UntypedActor {

    private Map<String, Integer> finalReducedMap = new HashMap<String, Integer>();

    @Override
    public void onReceive(Object message) throws Exception {
        // AggregateActor receives two kinds of messages: ReduceData and Result.
        if (message instanceof ReduceData) {
            ReduceData reduceData = (ReduceData) message;
            aggregateInMemoryReduce(reduceData.getReduceDataList());
        } else if (message instanceof Result) {
            // A Result message asks for the global counts, which are simply printed.
            System.out.println(finalReducedMap.toString());
        } else {
            unhandled(message);
        }
    }

    // Merge one local ReduceData map into the global totals across all ReduceActors.
    private void aggregateInMemoryReduce(Map<String, Integer> reducedList) {
        Integer count = null;
        for (String key : reducedList.keySet()) {
            if (finalReducedMap.containsKey(key)) {
                count = reducedList.get(key) + finalReducedMap.get(key);
                finalReducedMap.put(key, count);
            } else {
                finalReducedMap.put(key, reducedList.get(key));
            }
        }
    }
}
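To continue the hypothetical example: if finalReducedMap already holds {hope=2, like=1} and a new ReduceData arrives with {hope=1, so=4}, the merged totals become {hope=3, like=1, so=4}.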

package akka.dt.app.java.actors;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dt.app.java.messages.Result;

public class MasterActor extends UntypedActor {

    private ActorRef aggregateActor = getContext().actorOf(
            new Props(AggregateActor.class), "aggregate");

    private ActorRef reduceActor = getContext().actorOf(
            new Props(new UntypedActorFactory() {
                public UntypedActor create() { return new ReduceActor(aggregateActor); }
            }), "reduce");

    private ActorRef mapActor = getContext().actorOf(
            new Props(new UntypedActorFactory() {
                public UntypedActor create() { return new MapActor(reduceActor); }
            }), "map");

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            mapActor.tell(message); // sentences go to the MapActor first
        } else if (message instanceof Result) {
            aggregateActor.tell(message); // forward the Result request to the AggregateActor
        } else {
            unhandled(message);
        }
    }
}
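A note on the wiring above (this is how the older Akka 2.0/2.1 Java Props API used in this lecture behaves, not something specific to the example): new Props(SomeActor.class) only works for actors with a no-argument constructor, which is why AggregateActor can be created that way, while ReduceActor and MapActor, whose constructors take an ActorRef, are created through an UntypedActorFactory that calls the constructor explicitly. The creation order also matters: aggregateActor must exist before reduceActor, and reduceActor before mapActor, because each child receives the next actor in the pipeline as a constructor argument.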

// Messages: MapData, ReduceData, Result, WordCount

package akka.dt.app.java.messages;

import java.util.List;

// MapData holds the output of MapActor so it can be handed on to ReduceActor.
public class MapData {

    // The payload is a List of WordCount entries; WordCount is also a plain JavaBean.
    private List<WordCount> dataList;

    public List<WordCount> getDataList() { return dataList; }

    public MapData(List<WordCount> dataList) { this.dataList = dataList; }
}

package akka.dt.app.java.messages;

import java.util.HashMap;

// ReduceData carries the result of the local (per-sentence) word count.
public class ReduceData {

    private HashMap<String, Integer> reduceDataList;

    public HashMap<String, Integer> getReduceDataList() { return reduceDataList; }

    public ReduceData(HashMap<String, Integer> reduceDataList) { this.reduceDataList = reduceDataList; }
}

package akka.dt.app.java.messages;

public class Result {

}

// The incoming string is first handed to MapActor to be split into words, then to ReduceActor for the local count,
// and finally to AggregateActor for the global count.
// To read that global result, we send a Result message via MasterActor; Result itself is empty and carries no data.
// When MasterActor receives a Result message, it simply forwards it on to AggregateActor, which prints the totals.

WordCount is a plain JavaBean:

package akka.dt.app.java.messages;

public class WordCount {

private String word;

private Integer count;

public WordCount(String inWord, Integer inCount){

word = inWord;

count = inCount;

}

public String getWord(){return word;}

public Integer getCount(){return count;}

}

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dt.app.java.actors.MasterActor;
import akka.dt.app.java.messages.Result;

public class HelloAkka {

    public static void main(String[] args) throws Exception {

        ActorSystem _system = ActorSystem.create("HelloAkka");

        ActorRef master = _system.actorOf(new Props(MasterActor.class), "master");

        master.tell("Hi! My name is Rocky. I'm so so so so happy to be here. ");
        master.tell("Today, I'm going to read a news article for you. ");
        master.tell("I hope I hope you'll like it.");

        // Give the pipeline a moment to finish counting, then ask for the global result.
        Thread.sleep(500);
        master.tell(new Result());

        Thread.sleep(500);
        _system.shutdown();
    }
}
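With the three sentences above and the stop words {"a", "is"}, the final printout from AggregateActor should (if the reconstruction above is faithful to the lecture code) contain counts such as so=4, hope=2, i'm=2 and to=2, with punctuation still attached to tokens like hi! and rocky. because of the whitespace-only tokenization discussed earlier.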

In the general pattern, MasterActor passes the string to MapActor, which counts every word as 1; when MapActor finishes, it sends its result back to MasterActor, which passes it on to ReduceActor to count how often each word occurs; ReduceActor then sends its result back to MasterActor, and MasterActor finally hands all the ReduceActor results to AggregateActor for the global count.

This example takes a different route:

MasterActor sends the string to MapActor, which counts every word as 1 and then sends its result directly to ReduceActor. That is a local reduce, normally carried out within a single JVM, and the global aggregation of all ReduceActor results by AggregateActor likewise does not have to go back through MasterActor.

That is the internal processing logic: MasterActor is responsible for scheduling, while MapActor, ReduceActor, and AggregateActor are responsible for the computation.

This arrangement keeps the logic clearer and the processing more efficient.

The above are my study notes from Lecture 94 of the DT Big Data course by Wang Jialin.

DT Big Data WeChat public account: DT_Spark

Wang Jialin's QQ: 1740415547

Wang Jialin's WeChat: 18610086859

Video of Lecture 94:

51CTO: http://edu.51cto.com/lesson/id-74808.html

Tudou: http://www.tudou.com/programs/view/fwiUrVWSGeM/

Youku: http://v.youku.com/v_show/id_XMTMyOTA4Njk2MA==.html