
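Setting Up a JStorm Environment

2016-07-09 00:44
Before you start learning JStorm you need a cluster environment. This post walks through a single-machine JStorm setup that is meant for learning only. A production deployment follows much the same steps, but for that you should consult the JStorm community and its documentation.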

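I. Prerequisites

JStorm's core is implemented in Java, so it needs a Java runtime. Some of its scripts are written in Python, so Python is required as well.

1. Java environment: Java 1.6.0_35 is used here.

2. Python environment: Python 2.6.5 is used here.

If either one is not installed by default, see the relevant documentation (www.java.com, www.python.org).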
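To confirm which Java runtime will actually be picked up, a small program can print the version the JVM reports. This is only a sketch: the class name RuntimeCheck and the 1.6 baseline (taken from the version chosen in this post, not from an official JStorm requirement) are assumptions, and it says nothing about the Python side.

```java
public class RuntimeCheck {
    /**
     * Returns true when a version string such as "1.6.0_35" or "11.0.2"
     * denotes Java 1.6 or newer. Handles the legacy "1.x" numbering.
     */
    public static boolean isAtLeastJava6(String version) {
        String[] parts = version.split("[._-]");
        int major = Integer.parseInt(parts[0]);
        if (major == 1 && parts.length > 1) {
            major = Integer.parseInt(parts[1]); // "1.6.0_35" -> 6
        }
        return major >= 6;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        System.out.println("java.version = " + version);
        System.out.println("meets the 1.6 baseline used here: " + isAtLeastJava6(version));
    }
}
```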

II. Versions

zeromq-3.2.4
zookeeper-3.4.5
jstorm-0.7.1

III. Setting up JStorm

Like Storm, JStorm relies on zeromq/jzmq for its underlying message transport, and it uses zookeeper for shared state and coordination.

1. Install zeromq

wget http://download.zeromq.org/zeromq-3.2.4.tar.gz
tar zxf zeromq-3.2.4.tar.gz
cd zeromq-3.2.4
./configure
make
sudo make install
sudo ldconfig

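2. Install jzmq

A GitHub tarball unpacks into a directory named after the repository and commit rather than jzmq, so the archive is extracted into an explicitly created jzmq directory.

wget https://github.com/zeromq/jzmq/tarball/master -O jzmq.tar.gz
mkdir jzmq
tar zxf jzmq.tar.gz -C jzmq --strip-components=1
cd jzmq
./autogen.sh
./configure
make
sudo make install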

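3. Install zookeeper

zkServer.sh reads conf/zoo.cfg, and the release ships only conf/zoo_sample.cfg, so copy the sample first. Starting and then stopping the server is a quick way to confirm that the installation works.

wget http://apache.dataguru.cn/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
tar zxf zookeeper-3.4.5.tar.gz
cd zookeeper-3.4.5
cp conf/zoo_sample.cfg conf/zoo.cfg
./bin/zkServer.sh start
./bin/zkServer.sh stop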
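While the server is running (after zkServer.sh start and before stop), ZooKeeper answers the four-letter command "ruok" with "imok" on its client port. The sketch below sends that command over a plain socket. The class name ZkRuok is made up for illustration, and port 2181 is an assumption: it is the clientPort in ZooKeeper's sample configuration and may differ on your machine.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ZkRuok {
    /** Sends the four-letter command "ruok"; returns true only if the reply is "imok". */
    public static boolean isOk(String host, int port, int timeoutMs) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs);
            socket.getOutputStream().write("ruok".getBytes("US-ASCII"));
            socket.getOutputStream().flush();
            InputStream in = socket.getInputStream();
            byte[] reply = new byte[4];
            int read = 0;
            while (read < reply.length) {
                int n = in.read(reply, read, reply.length - read);
                if (n < 0) {
                    break; // server closed the connection early
                }
                read += n;
            }
            return "imok".equals(new String(reply, 0, read, "US-ASCII"));
        } catch (IOException e) {
            return false; // unreachable, timed out, or not a ZooKeeper server
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if close fails
            }
        }
    }

    public static void main(String[] args) {
        // 2181 is assumed: the clientPort from ZooKeeper's sample configuration.
        boolean ok = isOk("localhost", 2181, 2000);
        System.out.println(ok ? "ZooKeeper answered imok" : "ZooKeeper did not answer");
    }
}
```

A false result only means that no "imok" came back. It does not distinguish a stopped server from a wrong port.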

4. Install jstorm

wget http://42.121.19.155/jstorm/jstorm-0.7.1.zip
unzip jstorm-0.7.1.zip

Edit the configuration file conf/storm.yaml:

storm.zookeeper.servers:
    - "localhost"
nimbus.host: "localhost"
storm.zookeeper.root: "/jstorm"
storm.local.dir: "/tmp/jstorm"
drpc.servers:
    - "localhost"

On a development machine that is short of memory, starting nimbus may fail with:

Error occurred during initialization of VM
Could not reserve enough space for object heap

In that case, set smaller heap sizes in conf/storm.yaml:

nimbus.childopts: "-Xmx256m"
supervisor.childopts: "-Xmx256m"
worker.childopts: "-Xmx128m"

Adjust the sizes to suit your machine.
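One way to find a heap size the machine can actually provide is to start a trivial program with the same flag, for example java -Xmx256m HeapCheck. If the JVM cannot reserve that much, it stops with the same "Could not reserve enough space for object heap" error before the program runs. The sketch below is illustrative only: the class name HeapCheck and the helper parseXmx are hypothetical and not part of JStorm.

```java
public class HeapCheck {
    /** Converts an option such as "-Xmx256m" to bytes; accepts k, m, g suffixes or plain bytes. */
    public static long parseXmx(String opt) {
        if (!opt.startsWith("-Xmx") || opt.length() == 4) {
            throw new IllegalArgumentException("not an -Xmx option: " + opt);
        }
        String value = opt.substring(4).toLowerCase();
        char last = value.charAt(value.length() - 1);
        long unit = 1L;
        if (last == 'k') {
            unit = 1024L;
        } else if (last == 'm') {
            unit = 1024L * 1024L;
        } else if (last == 'g') {
            unit = 1024L * 1024L * 1024L;
        }
        String digits = (unit == 1L) ? value : value.substring(0, value.length() - 1);
        return Long.parseLong(digits) * unit;
    }

    public static void main(String[] args) {
        // The three values configured in conf/storm.yaml above.
        String[] opts = {"-Xmx256m", "-Xmx256m", "-Xmx128m"};
        long total = 0;
        for (String opt : opts) {
            total += parseXmx(opt);
        }
        System.out.println("configured heap for nimbus + supervisor + one worker: "
                + (total / (1024 * 1024)) + " MB");
        // maxMemory() reports roughly the -Xmx value this JVM was started with.
        System.out.println("max heap of this JVM: "
                + (Runtime.getRuntime().maxMemory() / (1024 * 1024)) + " MB");
    }
}
```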

5. UI

Prerequisite: Tomcat 7.0 or later.

Copy jstorm-ui-0.7.1.war into Tomcat's webapps directory.

6. Start JStorm

Start zookeeper: from the zookeeper directory, run bin/zkServer.sh start

Start Nimbus: from the JStorm directory, run bin/jstorm nimbus

Start Supervisor: from the JStorm directory, run bin/jstorm supervisor

Start Tomcat: from the Tomcat directory, run bin/startup.sh
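After starting the services in this order, a quick check is to see whether each one is listening on its port. The sketch below only tests TCP connectivity. The port numbers are assumptions: 2181 is the clientPort in ZooKeeper's sample configuration, 6627 is the Nimbus thrift port used by the HelloWorld topology later in this post, and 8080 is Tomcat's default HTTP connector. The supervisor is left out because no listening port for it is given here. The class name PortCheck is hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if close fails
            }
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; adjust them if your configuration differs.
        String[] names = {"zookeeper", "nimbus", "tomcat"};
        int[] ports = {2181, 6627, 8080};
        for (int i = 0; i < names.length; i++) {
            boolean up = isListening("localhost", ports[i], 1000);
            System.out.println(names[i] + " (port " + ports[i] + "): "
                    + (up ? "listening" : "not reachable"));
        }
    }
}
```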

IV. JStorm HelloWorld

1. Write the source code

This example is taken from GitHub.

HelloWorldTopology.java


package storm.cookbook;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * Author: ashrith
 * Date: 8/26/13
 * Time: 12:03 PM
 * Desc: set up the topology and submit it to either a local or remote Storm cluster depending on the arguments
 *       passed to the main method.
 */
public class HelloWorldTopology {
    /*
     * main class in which to define the topology and a LocalCluster object (enables you to test and debug the
     * topology locally). In conjunction with the Config object, LocalCluster allows you to try out different
     * cluster configurations.
     *
     * Create a topology using 'TopologyBuilder' (which will tell storm how the nodes are arranged and how they
     * exchange data)
     * The spout and the bolts are connected using 'ShuffleGroupings'
     *
     * Create a 'Config' object containing the topology configuration, which is merged with the cluster configuration
     * at runtime and sent to all nodes with the prepare method
     *
     * Create and run the topology using 'createTopology' and 'submitTopology'
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // Wire one spout (10 executors) to one bolt (1 executor) with shuffle grouping.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
        builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 1).shuffleGrouping("randomHelloWorld");

        Config conf = new Config();
        conf.put(Config.NIMBUS_HOST, "localhost");
        conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // args[0] is the topology name passed on the command line.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // No topology name supplied: nothing is submitted
            // (a local-mode run is not included in this listing).
        }
    }
}

HelloWorldSpout.java


package storm.cookbook;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

/**
 * Author: ashrith
 * Date: 8/21/13
 * Time: 8:33 PM
 * Desc: spout essentially emits a stream containing 1 of 2 sentences 'Other Random Word' or 'Hello World' based on
 *       random probability. It works by generating a random number upon construction and then generating subsequent
 *       random numbers to test against the original member variable's value. When it matches "Hello World" is emitted,
 *       during the remaining executions the other sentence is emitted.
 */
public class HelloWorldSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int referenceRandom;
    private static final int MAX_RANDOM = 10;

    public HelloWorldSpout() {
        final Random rand = new Random();
        referenceRandom = rand.nextInt(MAX_RANDOM);
    }

    /*
     * declareOutputFields() => you need to tell the Storm cluster which fields this Spout emits within the
     *  declareOutputFields method.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    /*
     * open() => The first method called in any spout is 'open'
     *           TopologyContext => contains all our topology data
     *           SpoutOutputCollector => enables us to emit the data that will be processed by the bolts
     *           conf => created in the topology definition
     */
    @Override
    public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /*
     * nextTuple() => Storm cluster will repeatedly call the nextTuple method which will do all the work of the spout.
     *  nextTuple() must release the control of the thread when there is no work to do so that the other methods have
     *  a chance to be called.
     */
    @Override
    public void nextTuple() {
        final Random rand = new Random();
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        if (instanceRandom == referenceRandom) {
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }
}

HelloWorldBolt.java


package storm.cookbook;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.Map;

/**
 * Author: ashrith
 * Date: 8/26/13
 * Time: 11:48 AM
 * Desc: This bolt will consume the produced Tuples from HelloWorldSpout and implement the required counting logic
 */
public class HelloWorldBolt extends BaseRichBolt {
    private int myCount = 0;

    /*
     * prepare() => on create
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    /*
     * execute() => most important method in the bolt is execute(Tuple input), which is called once per tuple received
     *  the bolt may emit several tuples for each tuple received
     */
    @Override
    public void execute(Tuple tuple) {
        String test = tuple.getStringByField("sentence");
        // Compare contents with equals(); == compares references and can fail
        // once the tuple has been serialized between workers.
        if ("Hello World".equals(test)) {
            myCount++;
            System.out.println("Found a Hello World! My Count is now: " + myCount);
        }
    }

    /*
     * declareOutputFields => This bolt emits nothing hence no body for declareOutputFields()
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
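The spout's random choice and the bolt's counting can be tried without a cluster. The sketch below mimics both in plain Java. It does not use the Storm API at all, and the class and method names (HelloWorldSim, nextSentence, isHelloWorld, countHelloWorld) are hypothetical. It compares strings with equals() so that the match depends on content, not on object identity.

```java
import java.util.Random;

public class HelloWorldSim {
    static final int MAX_RANDOM = 10;

    /** Mimics the spout: "Hello World" only when the new draw equals the reference number. */
    public static String nextSentence(Random rand, int referenceRandom) {
        return rand.nextInt(MAX_RANDOM) == referenceRandom ? "Hello World" : "Other Random Word";
    }

    /** Mimics the bolt's test, comparing string contents rather than references. */
    public static boolean isHelloWorld(String sentence) {
        return "Hello World".equals(sentence);
    }

    /** Picks a reference number, draws the given number of sentences, and counts the matches. */
    public static int countHelloWorld(Random rand, int draws) {
        int reference = rand.nextInt(MAX_RANDOM);
        int count = 0;
        for (int i = 0; i < draws; i++) {
            if (isHelloWorld(nextSentence(rand, reference))) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int draws = 1000;
        int count = countHelloWorld(new Random(), draws);
        System.out.println("Hello World emitted " + count + " times out of " + draws);
    }
}
```

With MAX_RANDOM set to 10, about one draw in ten should match, so the bolt's counter grows at roughly a tenth of the rate at which the spout emits tuples.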

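2. Submit the topology

Compile the sources above and package them as Helloworld.jar, then submit the jar to the jstorm cluster:

bin/jstorm jar Helloworld.jar storm.cookbook.HelloWorldTopology HelloWorld

The final argument, HelloWorld, is the topology name.

3. Check the topology's status

You can see how the topology is running through the UI, among other means.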

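V. Conclusion

This post briefly covered setting up a single-machine JStorm environment, so that beginners can get JStorm running and write a HelloWorld topology. Treat it only as a starting point for a production cluster; for detailed configuration, consult the relevant documentation.

VI. References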

[1]https://github.com/alibaba/jstorm/wiki

[2]https://github.com/nathanmarz/storm/wiki
http://hexiaoqiao.sinaapp.com/2014/06/09/jstorm%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA/