storm学习小结三:编写拓扑实践
2014-09-30 13:59
281 查看
本文在linux环境下安装配置storm,并成功运行一个topology,完成加法操作。
一.storm的安装和配置
安装storm,需要依赖以几个系统:
1.zookeeper
zookeeper的安装,请自行查找资料;
2.java的安装
直接使用公司的某工具就可以安装完成,版本是6u45
3.jdk环境的配置(linux环境下)
从sun的官网下载jdk-6u43-linux-x64.bin,放置在任一目录下;
运行sh ./jdk-6u43-linux-x64.bin完成对jdk的安装
然后在~/.bashrc下加入:export JAVA_HOME=/home/work/rp-product/jdk1.6.0_43
并且运行 . ~/.bashrc
4.python的安装
也是直接使用公司的某工具就可以安装完成,版本是2.6
由于我的测试机在/usr/bin目录下已经有旧版的python二进制了,安装完成后,把他替换成新的。
5.zmp的安装
下载tar包:http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar -zxvf ./zeromq-2.1.7.tar.gz
cd zeromq-2.1.7.tar.gz
./configure
make
sudo make install
6.jzmq的安装
从网上下载个jzmq.zip的压缩包,我是在公司内网下载的。
unzip ./jzmq.zip
cd jzmq-master
./autogen.sh
./configure
make
sudo make install
7.storm的安装
下载tar包:https://github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
unzip storm-0.8.1.zip
cd storm-0.8.1
修改conf/storm.yaml,加入以下几行就可以了:
storm.zookeeper.servers:
- "xxx.xxx.xxx.xxx"
nimbus.host: "xxx.xxx.xxx.xxx"
我把所有服务都搭在一台机器上,所以zk和nimbus.host设置成同一台机器的ip。并且把storm-0.8.1目录复制成storm-0.8.1-nimbus和storm-0.8.1-supervisor两个目录,分别作为nimbus和supervisor的运行环境。
8.运行storm
cd storm-0.8.1-nimbus
nohup ./bin/storm nimbus &
nohup ./bin/storm ui & (不确定ui和nimbus运行的是同一个二进制有没有问题)
cd storm-0.8.1-supervisor
nohup ./bin/storm supervisor &
由于是用作测试,就先在同一台机器上运行nimbus、ui和supervisor
二.topology的编写
个人对java不熟,先写一个简单的拓扑,由spout发出一个如“2+3”这样的tuple,由bolt做加法计算。写拓扑前先搭建开发环境。
1.java环境搭建(windows环境)
去官网上下载一个java se的windows版本,我选择的是:jdk-6u45-windows-x64.exe
直接运行并安装到本地目录:D:\Program Files\Java\jdk1.6.0_45
添加以下环境变量:
path D:\Program Files\Java\jdk1.6.0_45
classpath D:\Program Files\Java\jdk1.6.0_45\lib
JAVA_HOME D:\Program Files\Java\jdk1.6.0_45
2.maven
使用maven这个项目管理工具的目的是为了方便的拉取写topology时需要的storm库,这也是storm官方推荐的一种做法。关于maven的使用,不在这里作介绍。
去官网下载一个maven的windows客户端,我下载的是这个:http://mirrors.hust.edu.cn/apache/maven/maven-3/3.0.5/binaries/apache-maven-3.0.5-bin.zip
将这个压缩包解压到任一个目录:D:\Program Files\apache-maven-3.0.5
添加以下环境变量:
M2_HOME D:\Program Files\apache-maven-3.0.5
path %M2_HOME%\bin
3.使用maven生成一个项目,用于编写topology
在cmd中选定某个工作目录后,运行:mvn archetype:create
-DgroupId=testapp.storm.topology -DartifactId=aplusb
这样,便会生成一个aplusb目录,并且该目录下有一个pom.xml文件,maven依靠他来构建项目。
4.修改pom.xml文件
修改pom.xml文件,加入对storm的依赖。
在<project> --> <repositories>标签下加入以下这个<repository>标签,用于表示storm依赖所在的仓库:
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
在<project> --> <dependencies>标签下加入以下这个<dependency>标签,表示对storm的依赖:
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.8.1</version>
</dependency>
5.编写topology的java代码:
代码是我从网上找的,做了一些修改,我直接贴上代码,文件是App\src\main\java\testapp\storm\topology\AplusbTopology.java
6.编写spout的代码
文件是App\src\main\java\testapp\storm\topology\AplusbSpout.java
7.编写bolt代码
文件是App\src\main\java\testapp\storm\topology\AplusbBolt.java
8.编译
直接使用maven来产生jar包,在App目录下运行:mvn package
这个操作会在App\target目录下产生jar包:App-1.0-SNAPSHOT
9.运行
把这个jar包拷贝到测试机上,使用storm工具来运行。
本地模式:/home/work/sandbox/strom-0.8.1-nimbus/bin/storm jar App-1.0-SNAPSHOT testapp.storm.topology.AplusbTopology
集群模式:/home/work/sandbox/strom-0.8.1-nimbus/bin/storm jar App-1.0-SNAPSHOT testapp.storm.topology.AplusbTopology testapp
本地模式用于调试,可以直接观察到一些调试信息;集群模式就是正常的模式了,我在代码里,把加法的结果打印在了/home/work/sandbox/output/output.txt文件里
一.storm的安装和配置
安装storm,需要依赖以几个系统:
1.zookeeper
zookeeper的安装,请自行查找资料;
2.java的安装
直接使用公司的某工具就可以安装完成,版本是6u45
3.jdk环境的配置(linux环境下)
从sun的官网下载jdk-6u43-linux-x64.bin,放置在任一目录下;
运行sh ./jdk-6u43-linux-x64.bin完成对jdk的安装
然后在~/.bashrc下加入:export JAVA_HOME=/home/work/rp-product/jdk1.6.0_43
并且运行 . ~/.bashrc
4.python的安装
也是直接使用公司的某工具就可以安装完成,版本是2.6
由于我的测试机在/usr/bin目录下已经有旧版的python二进制了,安装完成后,把他替换成新的。
5.zmp的安装
下载tar包:http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar -zxvf ./zeromq-2.1.7.tar.gz
cd zeromq-2.1.7.tar.gz
./configure
make
sudo make install
6.jzmq的安装
从网上下载个jzmq.zip的压缩包,我是在公司内网下载的。
unzip ./jzmq.zip
cd jzmq-master
./autogen.sh
./configure
make
sudo make install
7.storm的安装
下载tar包:https://github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
unzip storm-0.8.1.zip
cd storm-0.8.1
修改conf/storm.yaml,加入以下几行就可以了:
storm.zookeeper.servers:
- "xxx.xxx.xxx.xxx"
nimbus.host: "xxx.xxx.xxx.xxx"
我把所有服务都搭在一台机器上,所以zk和nimbus.host设置成同一台机器的ip。并且把storm-0.8.1目录复制成storm-0.8.1-nimbus和storm-0.8.1-supervisor两个目录,分别作为nimbus和supervisor的运行环境。
8.运行storm
cd storm-0.8.1-nimbus
nohup ./bin/storm nimbus &
nohup ./bin/storm ui & (不确定ui和nimbus运行的是同一个二进制有没有问题)
cd storm-0.8.1-supervisor
nohup ./bin/storm supervisor &
由于是用作测试,就先在同一台机器上运行nimbus、ui和supervisor
二.topology的编写
个人对java不熟,先写一个简单的拓扑,由spout发出一个如“2+3”这样的tuple,由bolt做加法计算。写拓扑前先搭建开发环境。
1.java环境搭建(windows环境)
去官网上下载一个java se的windows版本,我选择的是:jdk-6u45-windows-x64.exe
直接运行并安装到本地目录:D:\Program Files\Java\jdk1.6.0_45
添加以下环境变量:
path D:\Program Files\Java\jdk1.6.0_45
classpath D:\Program Files\Java\jdk1.6.0_45\lib
JAVA_HOME D:\Program Files\Java\jdk1.6.0_45
2.maven
使用maven这个项目管理工具的目的是为了方便的拉取写topology时需要的storm库,这也是storm官方推荐的一种做法。关于maven的使用,不在这里作介绍。
去官网下载一个maven的windows客户端,我下载的是这个:http://mirrors.hust.edu.cn/apache/maven/maven-3/3.0.5/binaries/apache-maven-3.0.5-bin.zip
将这个压缩包解压到任一个目录:D:\Program Files\apache-maven-3.0.5
添加以下环境变量:
M2_HOME D:\Program Files\apache-maven-3.0.5
path %M2_HOME%\bin
3.使用maven生成一个项目,用于编写topology
在cmd中选定某个工作目录后,运行:mvn archetype:create
-DgroupId=testapp.storm.topology -DartifactId=aplusb
这样,便会生成一个aplusb目录,并且该目录下有一个pom.xml文件,maven依靠他来构建项目。
4.修改pom.xml文件
修改pom.xml文件,加入对storm的依赖。
在<project> --> <repositories>标签下加入以下这个<repository>标签,用于表示storm依赖所在的仓库:
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
在<project> --> <dependencies>标签下加入以下这个<dependency>标签,表示对storm的依赖:
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.8.1</version>
</dependency>
5.编写topology的java代码:
代码是我从网上找的,做了一些修改,我直接贴上代码,文件是App\src\main\java\testapp\storm\topology\AplusbTopology.java
package testapp.storm.topology; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; public class AplusbTopology { public static void main(String[] args) { try { // 实例化TopologyBuilder类。 TopologyBuilder topologyBuilder = new TopologyBuilder(); // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 topologyBuilder.setSpout("AplusbSpout", new AplusbSpout(), 1); // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。 topologyBuilder.setBolt("AplusbBolt", new AplusbBolt(), 1).shuffleGrouping("AplusbSpout"); Config config = new Config(); config.setDebug(true); if (args != null && args.length > 0) { config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else { // 这里是本地模式下运行的启动代码。 config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("aplusb", config, topologyBuilder.createTopology()); } } catch (Exception e) { e.printStackTrace(); } } }
6.编写spout的代码
文件是App\src\main\java\testapp\storm\topology\AplusbSpout.java
package testapp.storm.topology; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; @SuppressWarnings("serial") public class AplusbSpout extends BaseRichSpout{ //用来发射数据的工具类 private SpoutOutputCollector collector; Random random=new Random(); /** * 初始化collector */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用 */ @Override public void nextTuple() { try { String msg = random.nextInt(100) + "+" + random.nextInt(100); // 调用发射方法 collector.emit(new Values(msg)); // 模拟等待100ms Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应 } }
7.编写bolt代码
文件是App\src\main\java\testapp\storm\topology\AplusbBolt.java
package testapp.storm.topology; import java.util.Random; import java.io.*; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; @SuppressWarnings("serial") public class AplusbBolt extends BaseBasicBolt { private File fileName = new File("/home/work/sandbox/output/output.txt"); public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); if (msg != null){ System.out.println(msg); String[] strarray=msg.split("\\+"); int result = Integer.parseInt(strarray[0]) + Integer.parseInt(strarray[1]); String result_str = Integer.toString(result); collector.emit(new Values(msg + "=" + result_str)); PrintWriter outFile = new PrintWriter(fileName); outFile.write(msg + "=" + result_str); outFile.flush(); outFile.close(); } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("result")); } }
8.编译
直接使用maven来产生jar包,在App目录下运行:mvn package
这个操作会在App\target目录下产生jar包:App-1.0-SNAPSHOT
9.运行
把这个jar包拷贝到测试机上,使用storm工具来运行。
本地模式:/home/work/sandbox/strom-0.8.1-nimbus/bin/storm jar App-1.0-SNAPSHOT testapp.storm.topology.AplusbTopology
集群模式:/home/work/sandbox/strom-0.8.1-nimbus/bin/storm jar App-1.0-SNAPSHOT testapp.storm.topology.AplusbTopology testapp
本地模式用于调试,可以直接观察到一些调试信息;集群模式就是正常的模式了,我在代码里,把加法的结果打印在了/home/work/sandbox/output/output.txt文件里
相关文章推荐
- 一种提升IT服务水平的方法论和最佳实践----- itil学习小结
- storm学习小结二:转载一个比较好的storm官方文档译文
- asp.net学习实践小结一
- jquery插件编写学习小结
- 软件工程(C编码实践篇)学习小结
- 《编写高质量代码 改善Python程序的91个建议》读后程序学习小结
- storm学习小结一:storm概述
- Devops学习实践(六) Eclipse集成TestNg,mock编写单元测试
- Storm学习笔记(Storm技术内幕语大数据实践书籍学习)
- 转行前端自我学习记录——freecodecamp项目实践小结
- LR学习笔记六 之 脚本编写实践过程
- Storm学习小结(二)——集成JDBC和Redis
- Android性能优化学习与实践小结
- Apache Strom学习笔记三:在storm集群上运行拓扑
- Web Server程序编写学习笔记(四)源代码
- XSLT 编写要点一(学习笔记)
- Page 的生命周期学习小结(翻译兼笔记)
- 学习C++小结
- STL学习小结(原创:桑英硕 )
- EJB中JNDI的逻辑名的使用及部署_{EJB之无状态会话Bean简单应用-学习与实践}续(inber原作)