您的位置:首页 > 其它

【Storm】storm安装、配置、使用以及Storm单词计数程序的实例分析

2015-05-31 09:29 686 查看
2015-04-01 11:20 263人阅读 评论(0) 收藏 举报

storm

目录(?)[+]

前言:阅读笔记

storm和hadoop集群很像,hadoop运行mr,storm运行topologies。

mr和topologies最关键的不同点是:mr运行最终会结束,而topologies永远运行直到你kill。

storm集群有两种节点:master和worker。

master运行一个后台进程Nimbus,和hadoop的jobtracker相似。

Nimbus负责在集群中分发代码,为工作节点分配任务,并监控故障。

worker运行一个后台进程Supervisor。

supervisor监听分配来的任务,启动和停止worker进程去处理nimbus分配来的任务。

每个worker进程执行拓扑的一个子集;一个运行的拓扑结构由很多分布在不同机器的worker进程构成。



所有nimbus和supervisor之间的协调工作是有zk集群来做的。

此外,nimbus和supervisor是fail-fast和stateless;所有状态保存在zk或者本地磁盘。

守护进程可以是无状态的而且失效或重启时不会影响整个系统的健康。

运行storm

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

storm jar负责连接nimbus并且上传jar。

原始基本的storm提供了spouts和bolts做流转换。spouts和bolts是运行应用逻辑要实现的接口。

spout是流的源,读进数据并以流的形式发送出去;

bolt消费输入的流,处理或者以新的流发送出去。



Storm会自动重新分配失败的任务,并且storm保证不会有数据丢失,即使机器宕机



下载安装

http://storm.apache.org/downloads.html

1、依赖安装

yum install uuid -y

yum install e2fsprogs -y

yum install libuuid*

yum install libtool -y

yum install *c++* -y

yum install git -y

2、zk集群

/article/3601532.html

3、zeromq&jzmq

tar -xzvf zeromq-4.0.5.tar.gz

./autogen.sh

./configure && make && make install

jzmq

git clone git://github.com/nathanmarz/jzmq.git

./autogen.sh

./configure && make && make install

4、python

./configure

make

make install

rm -f /usr/bin/python

ln /usr/local/bin/python3.4 /usr/bin/python

python -V

vi /usr/bin/yum

#!/usr/bin/python 改为 #!/usr/bin/python2.4

配置运行

storm.yaml

storm.zookeeper.servers:
- 192.168.11.176
- 192.168.11.177
- 192.168.11.178
storm.zookeeper.port: 2181
nimbus.host: "192.168.11.176"

storm.local.dir: "/home/storm/workdir"

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

1、Nimbus: 在Storm主控节点上运行"bin/storm nimbus >/dev/null 2>&1 &"启动Nimbus后台程序,并放到后台执行;

2、Supervisor: 在Storm各个工作节点上运行"bin/storm supervisor >/dev/null 2>&1 &"启动Supervisor后台程序,并放到后台执行;

3、UI: 在Storm主控节点上运行"bin/storm ui >/dev/null 2>&1 &"启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbus host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。



单词计数程序

Spout

[java] view
plaincopyprint?





package com.cmcc.chiwei.storm;

import java.io.BufferedReader;

import java.io.FileNotFoundException;

import java.io.FileReader;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

private static final long serialVersionUID = 1L;

private SpoutOutputCollector collector;

private FileReader fileReader;

private boolean completed = false;

public boolean isDistributed() {

return false;

}

public void ack(Object msgId) {

System.out.println("OK:"+msgId);

}

public void close() {}

public void fail(Object msgId) {

System.out.println("FAIL:"+msgId);

}

/**

* The only thing that the methods will do It is emit each

* file line

*/

public void nextTuple() {

if(completed){

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

//Do nothing

}

return;

}

String str;

//Open the reader

BufferedReader reader = new BufferedReader(fileReader);

try{

//Read all lines

while((str = reader.readLine()) != null){

/**

* By each line emmit a new value with the line as a their

*/

this.collector.emit(new Values(str),str);

}

}catch(Exception e){

throw new RuntimeException("Error reading tuple",e);

}finally{

completed = true;

}

}

/**

* We will create the file and get the collector object

*/

public void open(Map conf, TopologyContext context,

SpoutOutputCollector collector) {

try {

this.fileReader = new FileReader(conf.get("wordsFile").toString());

} catch (FileNotFoundException e) {

throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");

}

this.collector = collector;

}

/**

* Declare the output field "word"

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("line"));

}

}

Bolt1

[java] view
plaincopyprint?





package com.cmcc.chiwei.storm;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

private static final long serialVersionUID = 1L;

public void cleanup() {}

/**

* The bolt will receive the line from the

* words file and process it to Normalize this line

*

* The normalize will be put the words in lower case

* and split the line to get all words in this

*/

public void execute(Tuple input, BasicOutputCollector collector) {

String sentence = input.getString(0);

String[] words = sentence.split(" ");

for(String word : words){

word = word.trim();

if(!word.isEmpty()){

word = word.toLowerCase();

collector.emit(new Values(word));

}

}

}

/**

* The bolt will only emit the field "word"

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

}

Bolt2

[java] view
plaincopyprint?





package com.cmcc.chiwei.storm;

import java.util.HashMap;

import java.util.Map;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

private static final long serialVersionUID = 1L;

Integer id;

String name;

Map<String, Integer> counters;

/**

* At the end of the spout (when the cluster is shutdown

* We will show the word counters

*/

@Override

public void cleanup() {

System.out.println("-- Word Counter ["+name+"-"+id+"] --");

for(Map.Entry<String, Integer> entry : counters.entrySet()){

System.out.println(entry.getKey()+": "+entry.getValue());

}

}

/**

* On create

*/

@Override

public void prepare(Map stormConf, TopologyContext context) {

this.counters = new HashMap<String, Integer>();

this.name = context.getThisComponentId();

this.id = context.getThisTaskId();

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {}

public void execute(Tuple input, BasicOutputCollector collector) {

String str = input.getString(0);

if(!counters.containsKey(str)){

counters.put(str, 1);

}else{

Integer c = counters.get(str) + 1;

counters.put(str, c);

}

}

}

Topology

[java] view
plaincopyprint?





package com.cmcc.chiwei.storm;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

public class TopologyMain {

public static void main(String[] args) throws InterruptedException {

//Topology创建拓扑,安排storm各个节点以及它们交换数据的方式

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word-reader",new WordReader());

builder.setBolt("word-normalizer", new WordNormalizer())

.shuffleGrouping("word-reader");

builder.setBolt("word-counter", new WordCounter(),1)

.fieldsGrouping("word-normalizer", new Fields("word"));

//Configuration

Config conf = new Config();

conf.put("wordsFile", args[0]);

conf.setDebug(false);

//Topology run

conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("Getting-Started-Topology", conf, builder.createTopology());

Thread.sleep(2000);

cluster.shutdown();

}

}

words.txt

[plain] view
plaincopyprint?





hello

world

storm flume hadoop hdfs

what's wrong flume ?

what's up hdfs ?

Hi,storm,what are you doing ?

运行结果

[plain] view
plaincopyprint?





OK:hello

OK:world

OK:storm flume hadoop hdfs

OK:what's wrong flume ?

OK:what's up hdfs ?

OK:Hi,storm,what are you doing ?

[plain] view
plaincopyprint?





-- Word Counter [word-counter-2] --

what's: 2

flume: 2

hdfs: 2

you: 1

storm: 1

up: 1

hello: 1

hadoop: 1

hi,storm,what: 1

are: 1

doing: 1

wrong: 1

?: 3

world: 1

分析内容:

spout

读取原始数据,为bolt提供数据。

bolt

从spout或其它bolt接收数据并处理,处理结果可作为其它bolt的数据源或最终结果。

nimbus

主节点的守护进程,负责为工作节点分发任务。

topology

拓扑结构,storm的一个任务单元。

define fields定义域,由spout和bolt提供,被bolt接收。

一个storm集群就是在一连串的bolt之间转换spout传过来的数据。

如:

spout读到一行文本,文本行传给一个bolt,按单词切割后传给另一个bolt,第二个bolt做计数累加。

Spout

open --> nextTuple

Bolt1

declareOutputFields --> execute

Bolt2

prepare --> execute --> cleanup

更详细的内容,将在后续慢慢讲解,我也在研究中。。。。。

望各位不吝指教!!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: