storm与kafka的结合(相同单词写到一个文件)
2014-09-29 22:27
302 查看
创建工程,导入storm和kafka的jar包
主类
序列化类(采用utf-8编码)
bolt切分单词
package cn.itcast.bout;
import org.apache.commons.lang.StringUtils;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that splits each incoming line into individual words.
 *
 * <p>Emits one tuple per non-blank word, lower-cased, on the single
 * output field {@code "word"}.
 *
 * @author Administrator
 */
public class WordSpliter extends BaseBasicBolt {

    private static final long serialVersionUID = -1034769322377482248L;

    /**
     * Splits the first field of the input tuple (one line of text) on
     * whitespace and emits each non-blank word, lower-cased.
     *
     * @param input     tuple whose field 0 is a line of text
     * @param collector collector used to emit one tuple per word
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        // Split on runs of whitespace; the previous single-space split left
        // tabs and multiple spaces embedded inside "words".
        String[] words = line.split("\\s+");
        for (String word : words) {
            word = word.trim();
            if (StringUtils.isNotBlank(word)) {
                // Locale.ROOT gives locale-independent lower-casing
                // (avoids e.g. the Turkish dotless-i surprise).
                word = word.toLowerCase(java.util.Locale.ROOT);
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * Declares the single output field {@code "word"}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("word"));
    }
}
bolt单词写入到磁盘
package cn.itcast.bout;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/**
 * Terminal bolt that appends each incoming word to a uniquely named
 * file on disk, one word per line.
 *
 * @author Administrator
 */
public class WriterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = -3550075953729137756L;

    // Per-task output file writer; opened in prepare(), closed in cleanup().
    private FileWriter writer = null;

    /**
     * Opens a uniquely named output file for this bolt instance.
     *
     * @throws RuntimeException if the file cannot be opened
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // NOTE(review): FileWriter uses the platform default charset —
            // consider an explicit UTF-8 writer if words may be non-ASCII.
            writer = new FileWriter("E://" + UUID.randomUUID().toString());
        } catch (IOException e) {
            // Preserve the cause so the real failure shows up in worker logs.
            throw new RuntimeException("Failed to open output file", e);
        }
    }

    /**
     * Writes the tuple's first field followed by a newline and flushes.
     *
     * @throws RuntimeException if the write fails
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getString(0);
        try {
            writer.write(s);
            writer.write("\n");
            // Flush per tuple so the file stays current even if the worker dies.
            writer.flush();
        } catch (IOException e) {
            throw new RuntimeException("Failed to write word: " + s, e);
        }
    }

    /**
     * Releases the file handle when the bolt is shut down; the original
     * code leaked it.
     */
    @Override
    public void cleanup() {
        if (writer != null) {
            try {
                writer.close();
            } catch (IOException ignored) {
                // Best-effort close during shutdown; nothing useful to do.
            }
        }
    }

    /**
     * Terminal bolt — declares no output fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }
}
主类
package cn.itcast.topo; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import cn.itcast.bout.WordSpliter; import cn.itcast.bout.WriterBolt; import cn.itcast.scheme.MessageScheme; public class KafaTopo { public static void main(String[] args) { //storm的spout作为kafka的消费者,声明要消费的主题数据 String topic = "worldcount"; //声明kafka将本身的信息注册到zookeeper的配置 String zkRoot = "/kafka-strom"; //消息源 String spoutId = "KafkaSpout"; //指定zookeeper的地址 ZkHosts brokerHosts = new ZkHosts("storm01:2181,storm02:2181,storm03:2181"); SpoutConfig spoutConfig = new SpoutConfig(brokerHosts,topic , zkRoot, spoutId); spoutConfig.forceFromStart = true; spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); //拓扑构建器 TopologyBuilder builder = new TopologyBuilder(); //storm提供了对kafka的支持类,KafkaSpout builder.setSpout(spoutId, new KafkaSpout(spoutConfig)); builder.setBolt("world-spliter", new WordSpliter()).shuffleGrouping(spoutId); builder.setBolt("writer", new WriterBolt(),4).fieldsGrouping("world-spliter", new Fields("word")); //配置对想 Config config = new Config(); config.setNumWorkers(4); config.setNumAckers(0); //本地提交模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("WordCount", config, builder.createTopology()); } }
序列化类(采用utf-8编码)
package cn.itcast.scheme;

import java.nio.charset.StandardCharsets;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Scheme that decodes raw Kafka message bytes into a single UTF-8
 * string field named {@code "msg"}.
 */
public class MessageScheme implements Scheme {

    private static final long serialVersionUID = 2785643173745033380L;

    /**
     * Decodes the message bytes as UTF-8.
     *
     * <p>Uses {@link StandardCharsets#UTF_8}, which is guaranteed to exist,
     * instead of the charset-name overload — this removes the impossible
     * {@code UnsupportedEncodingException} and the {@code null} return the
     * original fell through to (a Scheme returning null would NPE
     * downstream in the spout).
     *
     * @param bytes raw message payload
     * @return a single-field Values containing the decoded string
     */
    @Override
    public List<Object> deserialize(byte[] bytes) {
        return new Values(new String(bytes, StandardCharsets.UTF_8));
    }

    /**
     * Declares the single output field {@code "msg"}.
     */
    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
bolt切分单词
package cn.itcast.bout;
import org.apache.commons.lang.StringUtils;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that splits each incoming line into individual words.
 *
 * <p>Emits one tuple per non-blank word, lower-cased, on the single
 * output field {@code "word"}.
 *
 * @author Administrator
 */
public class WordSpliter extends BaseBasicBolt {

    private static final long serialVersionUID = -1034769322377482248L;

    /**
     * Splits the first field of the input tuple (one line of text) on
     * whitespace and emits each non-blank word, lower-cased.
     *
     * @param input     tuple whose field 0 is a line of text
     * @param collector collector used to emit one tuple per word
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        // Split on runs of whitespace; the previous single-space split left
        // tabs and multiple spaces embedded inside "words".
        String[] words = line.split("\\s+");
        for (String word : words) {
            word = word.trim();
            if (StringUtils.isNotBlank(word)) {
                // Locale.ROOT gives locale-independent lower-casing
                // (avoids e.g. the Turkish dotless-i surprise).
                word = word.toLowerCase(java.util.Locale.ROOT);
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * Declares the single output field {@code "word"}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("word"));
    }
}
bolt单词写入到磁盘
package cn.itcast.bout;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/**
 * Terminal bolt that appends each incoming word to a uniquely named
 * file on disk, one word per line.
 *
 * @author Administrator
 */
public class WriterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = -3550075953729137756L;

    // Per-task output file writer; opened in prepare(), closed in cleanup().
    private FileWriter writer = null;

    /**
     * Opens a uniquely named output file for this bolt instance.
     *
     * @throws RuntimeException if the file cannot be opened
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // NOTE(review): FileWriter uses the platform default charset —
            // consider an explicit UTF-8 writer if words may be non-ASCII.
            writer = new FileWriter("E://" + UUID.randomUUID().toString());
        } catch (IOException e) {
            // Preserve the cause so the real failure shows up in worker logs.
            throw new RuntimeException("Failed to open output file", e);
        }
    }

    /**
     * Writes the tuple's first field followed by a newline and flushes.
     *
     * @throws RuntimeException if the write fails
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getString(0);
        try {
            writer.write(s);
            writer.write("\n");
            // Flush per tuple so the file stays current even if the worker dies.
            writer.flush();
        } catch (IOException e) {
            throw new RuntimeException("Failed to write word: " + s, e);
        }
    }

    /**
     * Releases the file handle when the bolt is shut down; the original
     * code leaked it.
     */
    @Override
    public void cleanup() {
        if (writer != null) {
            try {
                writer.close();
            } catch (IOException ignored) {
                // Best-effort close during shutdown; nothing useful to do.
            }
        }
    }

    /**
     * Terminal bolt — declares no output fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }
}
相关文章推荐
- storm和kafka结合的一个小问题
- 从一个名为 in_file.txt 的文本文件中读取单词,然后把每个词写到一个名为out_file.txt的输出文件中 并且每个词之间用空格分开
- 从零开始学java之IO流 使用字节流将多个相同格式的文本、视频、音乐写到一个文件中
- Shell: 如何计算一个文本文件中有多少个相同的单词??
- Storm结合kafka参数配置详解+代码示例(累计单词出现的次数)
- java生成一个XML文件,并且将该文件压缩成ZIP格式后再写到硬盘上
- 在一个目录及其所有子目录下所有的文件中查找某个单词
- C# Visual Studio 2005中将一个TextBox中的内容通过FileStream写到文件中
- 练习2-3:编一个程序用来打开文件并统计文件中以空格隔开的单词数目
- 在某一个目录中寻找与指定的文件内容相同的文件
- 统计一个文件的单词的个数
- 统计一个文件的单词的个数(2)
- [导入]给定一个英文原文,统计文件里面一共有多少个不同的英文单词
- 两台硬件和软件配置完全相同的机器A和B,现在要用系统自带的Copy功能把A上的一个文件,复制到B上。在哪台机器上执行程序,效率更高?
- 面试题,编写一个程序,将a.txt文件中的单词与b.txt文件中的单词交替合并到c.txt文件中,a.txt文件中的单词用回车符分隔,b.txt文件中用回车或空格进行分隔。
- 一种将压缩包中与另外一个目录中相同的文件提取出来的方法
- 把命令行参数中的前一个文件名标识的文件, 复制到后一个文件名标识的文件中, 如命令行中只有一个文件名则把该文件写到标准输出文件(显示器)中
- 给定一个英文原文,统计文件里面一共有多少个不同的英文单词
- 一个通过散列在文件夹里找出相同文件工具(译)
- 查找两个文件中相同的单词stl