
Storm Usage Example

2016-05-10 08:51
This example has a spout pick random strings from an array and send them to a bolt; that bolt converts each string to uppercase and passes it on to a second bolt, which appends the current date and writes the result to a file.

// Imports assume a pre-1.0 Storm release (backtype.storm.*); Storm 1.x renamed the packages to org.apache.storm.*.
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    // Data source: the spout picks a random entry from this array.
    String[] str = new String[]{"cloud", "web", "android", "ios", "java", "bigdata", "linux"};

    /*
     * Initialization method, called once when the spout instance is created.
     */
    @Override
    public void open(Map map, TopologyContext tc, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // Continuously emits tuple messages to the next component.
    // This is the core logic of the spout.
    @Override
    public void nextTuple() {
        // The data could just as well come from a Kafka queue; for simplicity we pick a random entry from the str array.
        Random r = new Random();
        int index = r.nextInt(str.length);
        String job = str[index];
        // Wrap the value in a tuple and emit it to the next component.
        collector.emit(new Values(job));
        // Sleep 500 ms after each emitted message.
        Utils.sleep(500);
    }

    // Declares the field name of the tuples this spout emits.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("orignname"));
    }

}


// Imports assume a pre-1.0 Storm release (backtype.storm.*); Storm 1.x renamed the packages to org.apache.storm.*.
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HandlerBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    // Business logic: convert the incoming word to uppercase.
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String job = tuple.getString(0); // the value emit()-ed by the spout's nextTuple()
        String job_upper = job.toUpperCase();

        collector.emit(new Values(job_upper)); // pass the result on to the next component
    }

    // Declares the field name of the tuples this bolt emits.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("job_upper"));
    }

}
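Because HandlerBolt names its output field "job_upper", the wiring in TopoMain (shown further below) could also route tuples by that field instead of shuffling them at random, so that identical words always reach the same downstream task and therefore the same output file. A hedged alternative for the datejob line, assuming the same component ids as in TopoMain:

// Alternative grouping, not in the original post: group by the declared field
// instead of shuffleGrouping.
builder.setBolt("datejob", new MoreBolt(), 4)
       .fieldsGrouping("upperjob", new Fields("job_upper"));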


// Imports assume a pre-1.0 Storm release (backtype.storm.*); Storm 1.x renamed the packages to org.apache.storm.*.
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class MoreBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;
    FileWriter fileWriter = null;

    // Called only once while the bolt is running: open the output file.
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            fileWriter = new FileWriter("/home/hadoop/storm/" + UUID.randomUUID());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Core processing logic of this bolt.
    // Called once for every tuple received.
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Date date = new Date();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String d = format.format(date);
        String job_upper = tuple.getString(0);
        String job_date = job_upper + d;
        try {
            fileWriter.write(job_date);
            fileWriter.write("\n");
            fileWriter.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // This bolt does not emit tuples to a downstream component, so no output fields are declared.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
    }

}
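MoreBolt never closes its FileWriter. BaseBasicBolt also inherits a cleanup() hook that could be overridden for this; a minimal sketch (not part of the original post; note that Storm only guarantees cleanup() runs in local mode, a cluster worker may be killed before it is called):

    // Added method for MoreBolt: close the output file when the bolt is shut down.
    @Override
    public void cleanup() {
        try {
            if (fileWriter != null) {
                fileWriter.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }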


// Imports assume a pre-1.0 Storm release (backtype.storm.*); Storm 1.x renamed the packages to org.apache.storm.*
// and its StormSubmitter additionally throws AuthorizationException.
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public class TopoMain {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        // Add the spout to the topology.
        // parallelism_hint = 4: run this component with 4 executors.
        // setNumTasks(8): run 8 tasks for this component, i.e. each executor runs 2 tasks.
        builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
        // Add the first bolt and have it consume the messages of the randomspout component.
        builder.setBolt("upperjob", new HandlerBolt(), 4).shuffleGrouping("randomspout");
        // Add the second bolt and have it consume the messages of the upperjob component.
        builder.setBolt("datejob", new MoreBolt(), 4).shuffleGrouping("upperjob");
        // Build the topology.
        StormTopology topology = builder.createTopology();

        // Parameters for running the topology on the cluster.
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(true);
        conf.setNumAckers(0);

        //conf.set("hbase.zookeeper.quorum", "lp5,lp6,lp7");

        // Submit the topology to the cluster.
        StormSubmitter.submitTopology("demo3", conf, topology);
    }

}
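
For a quick test without a Storm cluster, the same topology can be run in-process with LocalCluster instead of StormSubmitter. A minimal sketch (not part of the original post; the class name TopoLocalMain and the 30-second run time are illustrative, and the imports again assume backtype.storm.*):

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class TopoLocalMain {

    public static void main(String[] args) throws Exception {
        // Same wiring as TopoMain, but with a single executor per component for local testing.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("randomspout", new RandomWordSpout(), 1);
        builder.setBolt("upperjob", new HandlerBolt(), 1).shuffleGrouping("randomspout");
        builder.setBolt("datejob", new MoreBolt(), 1).shuffleGrouping("upperjob");

        Config conf = new Config();
        conf.setDebug(true);

        // Run inside the current JVM instead of submitting to a cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("demo3-local", conf, builder.createTopology());

        // Let the topology run for 30 seconds, then stop it.
        // Note that MoreBolt still writes to /home/hadoop/storm/, which must exist locally.
        Utils.sleep(30000);
        cluster.killTopology("demo3-local");
        cluster.shutdown();
    }

}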