您的位置:首页 > 大数据

getting start with storm 翻译 第二章 part-2

2013-08-14 11:35 351 查看

转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/9961747

Bolts

现在我们有了一个spout来读取文件并且每一行发射一个元组。我们需要建立两个bolt来处理元组(见图2-1)。这些bolts实现了backtype.storm.topology.IRichBolt接口。

Bolt最重要的方法是voidexecute(Tuple input),每收到一个元组调用一次。对于每个收到的元组,bolt会发射出一些元组。


bolt或者spout可以发射如所需一样多的元组。当nextTuple或execute方法被调用时,它们可能发射0个,1个或多个元组。你将在第五章了解到更多。

 

第一个bolt,WordNormalizer,负责获取行并且标准化行。它会将行分隔成单词,将单词转化成小写并且trim单词。

首先我们要声明bolt的输出参数:

public voiddeclareOutputFields(OutputFieldsDeclarerdeclarer)
{
declarer.declare(newFields("word"));
}

这里我们声明bolt将输出一个域,名为word。

接着我们实现public void execute(Tuple input)方法,在这里输入的元组将被处理:

public voidexecute(Tuple input)
{
String sentence =input.getString(0);
String[]words=
sentence.split(" ");
for(String word:words){
word =word.trim();
if(!word.isEmpty()){
word =word.toLowerCase();
//Emit the word
collector.emit(newValues(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}

第一行读取元组的值。值可以通过位置或者名字读取。值被处理然后使用collector对象发射。在每个元组被处理完成后,collector的ack()方法被调用来表明处理被成功的完成。如果该元组不能被处理,应该调用collector的fail()方法。

示例2-2包含这个类的完整代码。

Example 2-2.src/main/java/bolts/WordNormalizer.java

package bolts;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizerimplementsIRichBolt{
private OutputCollector collector;

         publicvoidcleanup(){}
/**
* The bolt will receive the line from the
* words file and process it to Normalize this line
*
* The normalize will be put the words in lower case
* and split the line to get all words in this
*/
public voidexecute(Tuple input)
{
String sentence =input.getString(0);
String[]words=
sentence.split(" ");
for(String word:words){
word =word.trim();
if(!word.isEmpty()){
word =word.toLowerCase();
//Emit the word
List a =newArrayList();
a.add(input);
collector.emit(a,newValues(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
public voidprepare(Map stormConf,TopologyContext
context,
OutputCollector collector) {
this.collector=collector;
}
/**
* The bolt will only emit the field "word"
*/
public voiddeclareOutputFields(OutputFieldsDeclarerdeclarer)
{
declarer.declare(newFields("word"));
}
}


在这个类中,我们看到了在一个单独的execute方法中发射多个元组的例子。如果方法收到句子This is the
Storm book,在一个单独的execute方法中,它将发射五个元组。

 

下一个bolt,WordCounter,负责为单词计数。当topology结束的时候(cleanup()方法被调用时),我们会显示每个单词的计数。

 


这是一个bolt不发射任何东西的示例。在这个例子中,数据被加到一个map中,但在实际中,bolt会将数据存到数据库中。

package bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class WordCounterimplementsIRichBolt{
Integer id;
String name;
Map<String,Integer>counters;
private OutputCollector collector;
/**
* At the end of the spout (when the cluster is shutdown
* We will show the word counters
*/
@Override
public voidcleanup(){
System.out.println("--
Word Counter ["+name+"-"+id+"]--");
for(Map.Entry<String,Integer>entry:
counters.entrySet()){
System.out.println(entry.getKey()+":
"+entry.getValue());
}
}
/**
* On each word We will count
*/
@Override
public voidexecute(Tuple input)
{
String str =input.getString(0);
/**
* If the word dosn't exist in the map we will create
* this, if not We will add 1
*/
if(!counters.containsKey(str)){
counters.put(str,1);
}else{
Integer c =counters.get(str)
+1;
counters.put(str,c);
}
//Set the tuple as Acknowledge
collector.ack(input);
}
/**
* On create
*/
@Override
public voidprepare(Map stormConf,TopologyContext
context,
OutputCollector collector) {
this.counters=newHashMap<String,Integer>();
this.collector=collector;
this.name=context.getThisComponentId();
this.id=context.getThisTaskId();
}
@Override
public voiddeclareOutputFields(OutputFieldsDeclarerdeclarer)
{}
}

execute方法是用map来收集和计数单词。当topology结束的时候,cleanup()方法被调用并且打印出计数的map。(这只是一个示例,通常来讲你应该在cleanup()方法里关闭有效的连接和其他资源当topology关闭的时候。)

主类

在主类中,你会建立topology和LocalCluster对象,它使你可以在本地测试和调试topology。与Config对象结合,LocalCluster允许你尝试不同的集群配置。例如,当一个全局或者类变量被不慎使用时,在使用不同数量的workers配置来测试你的topology时你会找到这个错误。(在第三章你将看到更多关于config对象。)


所有的topology结点应该可以在进程间没有数据共享的情形下独立运行(例如,没有全局或类变量),因为topology在实际的集群中运行时,这些进程可能运行在不同的机器上。

你将使用TopologyBuilder创建topology,topology告诉storm结点是怎么安排的并且它们之间怎样交换数据。

TopologyBuilder builder =newTopologyBuilder();
builder.setSpout("word-reader",newWordReader());
builder.setBolt("word-normalizer",newWordNormalizer()).shuffleGrouping("wordreader");
builder.setBolt("word-counter",newWordCounter()).shuffleGrouping("wordnormalizer");

spout和bolts通过shuffleGroupings连接起来。这种分组(grouping)告诉storm在源结点和目标结点之间以随机分布的方式发送消息。

接着,建立包含topology配置的Config对象,该配置在运行时会被与集群的配置合并并且通过prepare方法发送到所有结点。

Config conf =newConfig();
conf.put("wordsFile",args[0]);
conf.setDebug(true);

对将要被spout读取的文件名设置属性wordsFile,因为你在开发过程中所以debug属性为true。当debug为true时,storm打印结点间交换的所有消息,debug数据对于理解topology怎样运行是有用的。

正如前边提到的,你将使用LocalCluster来运行topology。在生产环境中,topology持续的运行,但对于这个例子你可以只运行topology几秒以便你可以查看结果。

LocalCluster cluster =newLocalCluster();
cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();

创建和运行topology使用createTopology和submitTopology,睡眠两秒(topology运行在不同的线程中),然后通过关闭集群来停止topology。

看示例2-3来把它放到一起。

Example 2-3.src/main/java/TopologyMain.java

import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;
public class TopologyMain{
public static voidmain(String[]args)throws
InterruptedException{
//Topology definition
TopologyBuilder builder =newTopologyBuilder();
builder.setSpout("word-reader",newWordReader());
builder.setBolt("word-normalizer",newWordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter",newWordCounter(),2)
.fieldsGrouping("word-normalizer",newFields("word"));
//Configuration
Config conf =newConfig();
conf.put("wordsFile",args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
LocalCluster cluster =newLocalCluster();
cluster.submitTopology("Getting-Started-Toplogie",conf,
builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}

实践一下

你将要运行你的第一个topology ! 如果你已经建立了文件src/main/resources/
words.txt并且其中每行有一个单词,你可以用这个命令运行topology:

         mvnexec:java -Dexec.mainClass="TopologyMain"-Dexec.args="src/main/resources/
words.txt"

例如,你可以用下边的word.txt文件:

Storm

test

are

great

is

an

Storm

simple

application

but

very

powerful

really

Storm

is

great

在日志中,你应该看到类似如下的输出:

is: 2

application: 1

but: 1

great: 1

test: 1

simple: 1

Storm: 3

really: 1

are: 1

great: 1

an: 1

powerful: 1

very: 1

在这个示例中,你只使用了每个结点的一个单例。但如果你有一个非常大的日志文件呢?你可以轻易的改变系统中结点的数量来并行工作。在这个例子中,你可以建立两个WordCounter的实例:

builder.setBolt("word-counter",newWordCounter(),2)
.shuffleGrouping("word-normalizer");

如果你重新运行程序,你将看到:

-- Word Counter [word-counter-2] --

application: 1

is: 1

great: 1

are: 1

powerful: 1

Storm: 3

-- Word Counter [word-counter-3] --

really: 1

is: 1

but: 1

great: 1

test: 1

simple: 1

an: 1

very: 1

牛逼啊!改变并行度这么容易(在实际中,当然,每个实例运行在独立的机器中)。但看起来似乎有个问题:单词is和great在每个WordCounter实例中各被计算了一次。为什么呢?当你使用shuffleGrouping的时候,你告诉storm以随机分布的方式将每条消息发送至你的bolt实例。在这个例子中,总是把相同的单词送到相同的WordCounter是更理想的。为了实现这个,你可以将shuffleGrouping("wordnormalizer")换成fieldsGrouping("word-normalizer",newFields("word"))。尝试一下并且重新运行程序来验证结果。你会在后续的章节看到更多关于分组和消息流的内容。

结论

我们已经讨论了storm本地操作模式和远程操作模式的不同,以及用storm开发的强大和简便。你也学到了更多关于storm的基本概念,这些概念我们将在接下来的章节深入解释。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Storm 大数据