您的位置:首页 > 其它

storm与kafka的结合(相同单词写到一个文件)

2014-09-29 22:27 302 查看
创建工程,导入storm和kafka的jar包

主类

package cn.itcast.topo;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import cn.itcast.bout.WordSpliter;
import cn.itcast.bout.WriterBolt;
import cn.itcast.scheme.MessageScheme;

/**
 * Word-count topology that consumes lines from Kafka via storm's KafkaSpout,
 * splits each line into words, and writes identical words to the same file
 * (fields-grouping on "word" routes a given word to a single WriterBolt task).
 */
public class KafaTopo {

    public static void main(String[] args) {
        // Kafka topic the spout consumes from.
        String topic = "worldcount";
        // Root path in ZooKeeper under which the spout stores its consumer offsets.
        String zkRoot = "/kafka-strom";
        // Component id of the Kafka spout inside the topology.
        String spoutId = "KafkaSpout";

        // ZooKeeper ensemble that holds the Kafka broker metadata.
        ZkHosts zkHosts = new ZkHosts("storm01:2181,storm02:2181,storm03:2181");
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, zkRoot, spoutId);
        // Always start reading from the beginning of the topic, ignoring stored offsets.
        spoutConfig.forceFromStart = true;
        // Decode raw Kafka bytes into a single "msg" field (UTF-8 text).
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // storm ships a ready-made Kafka consumer spout.
        topologyBuilder.setSpout(spoutId, new KafkaSpout(spoutConfig));
        topologyBuilder.setBolt("world-spliter", new WordSpliter()).shuffleGrouping(spoutId);
        // Fields-grouping guarantees identical words always reach the same writer task.
        topologyBuilder.setBolt("writer", new WriterBolt(), 4)
                .fieldsGrouping("world-spliter", new Fields("word"));

        // Topology configuration.
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setNumAckers(0);
        // Submit to an in-process local cluster (development mode).
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("WordCount", conf, topologyBuilder.createTopology());
    }
}

序列化类(采用utf-8编码)

package cn.itcast.scheme;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {

    // Serialization id required because Scheme extends Serializable.
    private static final long serialVersionUID = 2785643173745033380L;

    /**
     * Deserializes the raw Kafka message bytes into a one-field tuple.
     *
     * <p>Uses {@code StandardCharsets.UTF_8} instead of the charset-name string,
     * so no {@code UnsupportedEncodingException} can occur. The original code
     * caught that exception and returned {@code null}, silently dropping the
     * tuple; that failure path is now impossible.
     *
     * @param bytes raw message payload read from Kafka
     * @return a single-element tuple containing the UTF-8 decoded message
     */
    @Override
    public List<Object> deserialize(byte[] bytes) {
        String msg = new String(bytes, StandardCharsets.UTF_8);
        return new Values(msg);
    }

    /**
     * Declares the single output field name for the decoded message.
     *
     * @return the field list {@code ("msg")}
     */
    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

bolt切分单词
package cn.itcast.bout;

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
* 切分每一行的单词
* @author Administrator
*
*/
/**
 * Splits each incoming line into lower-cased words and emits one tuple per word.
 * @author Administrator
 */
public class WordSpliter extends BaseBasicBolt {

    private static final long serialVersionUID = -1034769322377482248L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // First tuple field is the full text line produced by the spout's scheme.
        String[] tokens = input.getString(0).split(" ");
        for (String token : tokens) {
            String trimmed = token.trim();
            // Skip empty / whitespace-only fragments caused by repeated separators.
            if (StringUtils.isNotBlank(trimmed)) {
                collector.emit(new Values(trimmed.toLowerCase()));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // Downstream bolts group on this single "word" field.
        declare.declare(new Fields("word"));
    }

}

bolt单词写入到磁盘

package cn.itcast.bout;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
* 单词写入到磁盘
* @author Administrator
*
*/
/**
 * Writes each incoming word to a uniquely named file on disk, one word per line.
 *
 * <p>NOTE(review): the output directory {@code "E://"} is hard coded — presumably
 * a Windows dev box; consider making it configurable via the topology config.
 *
 * @author Administrator
 */
public class WriterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = -3550075953729137756L;

    // One output file per bolt task; opened in prepare(), closed in cleanup().
    private FileWriter writer = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // Random UUID file name so parallel tasks on one host never collide.
            writer = new FileWriter("E://" + UUID.randomUUID().toString());
        } catch (IOException e) {
            // Preserve the cause so the real failure shows up in worker logs
            // (the original threw a bare RuntimeException with no message/cause).
            throw new RuntimeException("Failed to open output file", e);
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getString(0);
        try {
            writer.write(s);
            writer.write("\n");
            // Flush per tuple: trades throughput for immediately visible output.
            writer.flush();
        } catch (IOException e) {
            throw new RuntimeException("Failed to write word: " + s, e);
        }
    }

    @Override
    public void cleanup() {
        // Release the file handle on topology shutdown; the original leaked it.
        if (writer != null) {
            try {
                writer.close();
            } catch (IOException ignored) {
                // Best effort during shutdown — nothing useful to do with the failure.
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // Terminal bolt — emits nothing downstream.
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  storm+kafka
相关文章推荐