您的位置:首页 > 其它

storm学习小结三:编写拓扑实践

2014-09-30 13:59 281 查看
本文在linux环境下安装配置storm,并成功运行一个topology,完成加法操作。

一.storm的安装和配置
    安装storm,需要依赖以几个系统:

1.zookeeper
    zookeeper的安装,请自行查找资料;

2.java的安装
    直接使用公司的某工具就可以安装完成,版本是6u45

3.jdk环境的配置(linux环境下)
    从sun的官网下载jdk-6u43-linux-x64.bin,放置在任一目录下;
    运行sh ./jdk-6u43-linux-x64.bin完成对jdk的安装
    然后在~/.bashrc下加入:export JAVA_HOME=/home/work/rp-product/jdk1.6.0_43
    并且运行 . ~/.bashrc

4.python的安装
    也是直接使用公司的某工具就可以安装完成,版本是2.6
    由于我的测试机在/usr/bin目录下已经有旧版的python二进制了,安装完成后,把他替换成新的。

5.zmp的安装
    下载tar包:http://download.zeromq.org/zeromq-2.1.7.tar.gz
    tar -zxvf ./zeromq-2.1.7.tar.gz
    cd zeromq-2.1.7.tar.gz
    ./configure
    make
    sudo make install

6.jzmq的安装
    从网上下载个jzmq.zip的压缩包,我是在公司内网下载的。
    unzip ./jzmq.zip
    cd jzmq-master
    ./autogen.sh
    ./configure
    make
    sudo make install

7.storm的安装
    下载tar包:https://github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
    unzip storm-0.8.1.zip
    cd storm-0.8.1
    修改conf/storm.yaml,加入以下几行就可以了:

    storm.zookeeper.servers:
        - "xxx.xxx.xxx.xxx"
    nimbus.host: "xxx.xxx.xxx.xxx"
    
    我把所有服务都搭在一台机器上,所以zk和nimbus.host设置成同一台机器的ip。并且把storm-0.8.1目录复制成storm-0.8.1-nimbus和storm-0.8.1-supervisor两个目录,分别作为nimbus和supervisor的运行环境。

8.运行storm
    cd storm-0.8.1-nimbus
    nohup ./bin/storm nimbus &
    nohup ./bin/storm ui & (不确定ui和nimbus运行的是同一个二进制有没有问题)   

    cd storm-0.8.1-supervisor
    nohup ./bin/storm supervisor & 
    由于是用作测试,就先在同一台机器上运行nimbus、ui和supervisor
    
二.topology的编写
    个人对java不熟,先写一个简单的拓扑,由spout发出一个如“2+3”这样的tuple,由bolt做加法计算。写拓扑前先搭建开发环境。

1.java环境搭建(windows环境)
    去官网上下载一个java se的windows版本,我选择的是:jdk-6u45-windows-x64.exe
    直接运行并安装到本地目录:D:\Program Files\Java\jdk1.6.0_45
    添加以下环境变量:
    path                D:\Program Files\Java\jdk1.6.0_45
    classpath        D:\Program Files\Java\jdk1.6.0_45\lib
    JAVA_HOME    D:\Program Files\Java\jdk1.6.0_45

2.maven
    使用maven这个项目管理工具的目的是为了方便的拉取写topology时需要的storm库,这也是storm官方推荐的一种做法。关于maven的使用,不在这里作介绍。
    去官网下载一个maven的windows客户端,我下载的是这个:http://mirrors.hust.edu.cn/apache/maven/maven-3/3.0.5/binaries/apache-maven-3.0.5-bin.zip
    将这个压缩包解压到任一个目录:D:\Program Files\apache-maven-3.0.5
    添加以下环境变量:
    M2_HOME        D:\Program Files\apache-maven-3.0.5
    path                   %M2_HOME%\bin

3.使用maven生成一个项目,用于编写topology
    在cmd中选定某个工作目录后,运行:mvn archetype:create
-DgroupId=testapp.storm.topology -DartifactId=aplusb
    这样,便会生成一个aplusb目录,并且该目录下有一个pom.xml文件,maven依靠他来构建项目。

4.修改pom.xml文件
    修改pom.xml文件,加入对storm的依赖。
    在<project> --> <repositories>标签下加入以下这个<repository>标签,用于表示storm依赖所在的仓库:
    <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
    </repository>
    在<project> --> <dependencies>标签下加入以下这个<dependency>标签,表示对storm的依赖:
    <dependency>  
        <groupId>storm</groupId>  
        <artifactId>storm</artifactId>  
        <version>0.8.1</version> 
    </dependency>

5.编写topology的java代码:
    代码是我从网上找的,做了一些修改,我直接贴上代码,文件是App\src\main\java\testapp\storm\topology\AplusbTopology.java

package testapp.storm.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class AplusbTopology {
public static void main(String[] args) {
try {
// 实例化TopologyBuilder类。
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
topologyBuilder.setSpout("AplusbSpout", new AplusbSpout(), 1);
// 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。
topologyBuilder.setBolt("AplusbBolt", new AplusbBolt(), 1).shuffleGrouping("AplusbSpout");
Config config = new Config();
config.setDebug(true);
if (args != null && args.length > 0) {
config.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
} else {
// 这里是本地模式下运行的启动代码。
config.setMaxTaskParallelism(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("aplusb", config, topologyBuilder.createTopology());
}

} catch (Exception e) {
e.printStackTrace();
}
}
}


6.编写spout的代码
    文件是App\src\main\java\testapp\storm\topology\AplusbSpout.java

package testapp.storm.topology;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

@SuppressWarnings("serial")
public class AplusbSpout extends BaseRichSpout{
//用来发射数据的工具类
private SpoutOutputCollector collector;

Random random=new Random();

/**
* 初始化collector
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

/**
* 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用
*/
@Override
public void nextTuple() {
try {
String msg = random.nextInt(100) + "+" + random.nextInt(100);
// 调用发射方法
collector.emit(new Values(msg));
// 模拟等待100ms
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
* 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应
}

}


7.编写bolt代码
    文件是App\src\main\java\testapp\storm\topology\AplusbBolt.java

package testapp.storm.topology;

import java.util.Random;
import java.io.*;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

@SuppressWarnings("serial")
public class AplusbBolt extends BaseBasicBolt {

private File fileName = new File("/home/work/sandbox/output/output.txt");

public void execute(Tuple input, BasicOutputCollector collector) {
try {
String msg = input.getString(0);
if (msg != null){
System.out.println(msg);
String[] strarray=msg.split("\\+");
int result = Integer.parseInt(strarray[0]) + Integer.parseInt(strarray[1]);
String result_str = Integer.toString(result);
collector.emit(new Values(msg + "=" + result_str));
PrintWriter outFile = new PrintWriter(fileName);
outFile.write(msg + "=" + result_str);
outFile.flush();
outFile.close();
}

} catch (Exception e) {
e.printStackTrace();
}
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("result"));
}

}


8.编译
    直接使用maven来产生jar包,在App目录下运行:mvn package
    这个操作会在App\target目录下产生jar包:App-1.0-SNAPSHOT

9.运行
    把这个jar包拷贝到测试机上,使用storm工具来运行。
    本地模式:/home/work/sandbox/strom-0.8.1-nimbus/bin/storm jar App-1.0-SNAPSHOT testapp.storm.topology.AplusbTopology
    集群模式:/home/work/sandbox/strom-0.8.1-nimbus/bin/storm jar App-1.0-SNAPSHOT testapp.storm.topology.AplusbTopology testapp

    本地模式用于调试,可以直接观察到一些调试信息;集群模式就是正常的模式了,我在代码里,把加法的结果打印在了/home/work/sandbox/output/output.txt文件里
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: