storm和kafka结合的一个小问题
2017-05-13 03:37
399 查看
参考文档中说到
打包上传到服务器,运行
Storm jar jarname CountTopology 回车,会看到他在等待数据传入。
这个时候运行kafka消费者程序,将数据输出,则会看到storm 会迅速输出数据和统计数目。
这里测试不写了。
正确的说法是:
是运行kafka生产者程序,将数据输入到storm,这时会看到storm 会迅速输出数据和统计数目。
因为这篇文字的开头说“这里的目标是kafka 负责生产数据,storm 消费数据并将结果输出”
参考文档
http://blog.csdn.net/looklook5/article/details/41749523
为对比,将原文放置如下
这里的目标是kafka 负责生产数据,storm 消费数据并将结果输出
https://github.com/wurstmeister/storm-kafka-0.8-plus
下载、解压以及将这个目录下的代码添加进项目
storm-kafka-0.8-plus-master\storm-kafka-0.8-plus-master\src\jvm
将kafka 和 storm 的JAR 添加进项目,作为依赖jar 包
然后添加com.netflix.curator 的相关包括client、framework和recipes
下载地址:http://maven.outofmemory.cn/com.netflix.curator/
最新的所有com.google.common类,下载地址
http://central.maven.org/maven2/com/google/guava/guava/18.0/guava-18.0.jar
这样storm-kafka-0.8-plus项目应该就不会报错了。
http://blog.csdn.net/looklook5/article/details/41248561
地址是:
https://github.com/wurstmeister/storm-kafka-0.8-plus-test/blob/master/src/main/java/storm/kafka/KafkaSpoutTestTopology.java
代码如下:
[java] view
plain copy
package storm.kafka;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
public class KafkaSpoutTestTopology {
public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestTopology.class);
public static class PrinterBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
LOG.info(tuple.toString());
}
}
private final BrokerHosts brokerHosts;
public KafkaSpoutTestTopology(String kafkaZookeeper) {
brokerHosts = new ZkHosts(kafkaZookeeper);
}
public StormTopology buildTopology() {
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "storm-sentence", "", "storm");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
return builder.createTopology();
}
public static void main(String[] args) throws Exception {
String kafkaZk = args[0];
KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk);
Config config = new Config();
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology();
if (args != null && args.length > 1) {
String name = args[1];
String dockerIp = args[2];
config.setNumWorkers(2);
config.setMaxTaskParallelism(5);
config.put(Config.NIMBUS_HOST, dockerIp);
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));
StormSubmitter.submitTopology(name, config, stormTopology);
} else {
config.setNumWorkers(2);
config.setMaxTaskParallelism(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, stormTopology);
}
}
}
这里清晰的写出了创建一个与kafka整合的storm Topology,观察main 函数,从上往下看:
下面是关于zookeeper的设定以及spout和bolt 的设定
[java] view
plain copy
String kafkaZk = args[0];
KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk);
StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology();
下面的语句中,storm-sentence是话题,下面的语句是要求在zookeeper 服务器中在根目录创建文件夹storm,用于kafka存放zookeeper相关数据
[java] view
plain copy
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, " storm-sentence ", "", "storm");
builder.setSpout("words", new KafkaSpout(kafkaConfig), 10); 这里是设定spout,负责从kafka消费数据,其中word 是spout 名称,KafkaSpout 由storm-kafka-0.8-plus 提供,10为并发数。
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words"); 这个是设定spout 接下去的bolt, PrinterBolt看名称应该负责打印bolt的数据的类。shuffleGrouping("words")表示数据是采用随机模式。后面接的数据来自与叫做words的spout
下面是设置Topology的相关设定
[java] view
plain copy
Config config = new Config(); 初始化一个storm设置
config.setNumWorkers(2); 这个代表分配2个Worker。
StormSubmitter.submitTopology(args[0], config, builder.createTopology()); 这个表示想Storm 服务器提交Topology任务,其中第一个参数是Topology的name.
config.setMaxTaskParallelism(3); 一个work的最大并发数为3
LocalCluster cluster = new LocalCluster(); 开启Storm本地模式
cluster.submitTopology("special-topology", config, builder.createTopology()); 在本地网模式下提交storm任务。
cluster.shutdown(); 关闭Storm本地模式。
下面是我修改后的脚本
[java] view
plain copy
import com.google.common.collect.ImmutableList;
import com.ks.bolt.CounterBolt;
import com.ks.bolt.DateCutBolt;
import com.ks.bolt.InsertMysqlBolt;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
public class CountTopology {
/**
* @param args
*/
public static void main(String[] args) {
try{
String kafkaZookeeper = "carl:2181,slave1:2181,slave2:2181";
BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "test", "/storm", "stormid");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.zkServers = ImmutableList.of("carl","slave1","slave2");
kafkaConfig.zkPort = 2181;
//kafkaConfig.forceFromStart = true;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(kafkaConfig), 2);
//*************************下面是所有处理逻辑,只关注这个*****************************
builder.setBolt("datecut", new CounterBolt(),1).shuffleGrouping("spout");
//*************************下面是所有处理逻辑,只关注这个*****************************
Config config = new Config();
config.setDebug(true);
if(args!=null && args.length > 0) {
config.setNumWorkers(2);
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
config.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("special-topology", config, builder.createTopology());
Thread.sleep(500000);
cluster.shutdown();
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
这里在本地模式下让他运行20秒钟自动结束,因为这个比较耗资源。注意以下这句,
[java] view
plain copy
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "test", "/storm", "stormid");
请记得在zookeeper 根目录下面创建文件夹storm,然后在storm 文件夹下面继续创建文件夹stormid 用于存放kafka信息数据
上面的Topology 设定了bolt 为CounterBolt,因此还要建一个CounterBolt的bolt 类。
这里设定了,运行jar包敲参数为提交到storm服务器,不敲参数则是运行storm本地模式。
[java] view
plain copy
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class CounterBolt extends BaseBasicBolt {
/**
*
*/
private static final long serialVersionUID = -5508421065181891596L;
private static long counter = 0;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("msg = "+tuple.getString(0)+" -------------counter = "+(counter++));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
这里很简单就是将bolt 获取的数据进行简单的输出,并统计接收到的数据条目数。这里继续BaseBasicBolt 类,因为这样开发会比较简单。因为这个是唯一的bolt,没有输出,因此在declareOutputFields 方法中不需要声明output。
System.out.println("msg = "+ tuple.getString(0)+"-------------counter = "+(counter++));
这里tuple就是这个bolt 从上一个spout获取的数据集合。
这里是控制台输出,因此请用本地模式进行调试。
打包上传到服务器,运行
Storm jar jarname CountTopology 回车,会看到他在等待数据传入。
这个时候运行kafka消费者程序,将数据输出,则会看到storm 会迅速输出数据和统计数目。
这里测试不写了。
打包上传到服务器,运行
Storm jar jarname CountTopology 回车,会看到他在等待数据传入。
这个时候运行kafka消费者程序,将数据输出,则会看到storm 会迅速输出数据和统计数目。
这里测试不写了。
正确的说法是:
是运行kafka生产者程序,将数据输入到storm,这时会看到storm 会迅速输出数据和统计数目。
因为这篇文字的开头说“这里的目标是kafka 负责生产数据,storm 消费数据并将结果输出”
参考文档
http://blog.csdn.net/looklook5/article/details/41749523
为对比,将原文放置如下
这里的目标是kafka 负责生产数据,storm 消费数据并将结果输出
一、wurstmeister/storm-kafka-0.8-plus
这里用的是引进别人家写的整合代码,因为使用的人也比较多,下面是项目地址https://github.com/wurstmeister/storm-kafka-0.8-plus
下载、解压以及将这个目录下的代码添加进项目
storm-kafka-0.8-plus-master\storm-kafka-0.8-plus-master\src\jvm
将kafka 和 storm 的JAR 添加进项目,作为依赖jar 包
然后添加com.netflix.curator 的相关包括client、framework和recipes
下载地址:http://maven.outofmemory.cn/com.netflix.curator/
最新的所有com.google.common类,下载地址
http://central.maven.org/maven2/com/google/guava/guava/18.0/guava-18.0.jar
这样storm-kafka-0.8-plus项目应该就不会报错了。
二、kafka 生产者的创建
在我的这篇文章里3.6、Producer Java API,有生产者的例子,可以拿来直接用。http://blog.csdn.net/looklook5/article/details/41248561
三、创建消费 kafka 数据的Topology
storm-kafka-0.8-plus 给我们写了个测试代码地址是:
https://github.com/wurstmeister/storm-kafka-0.8-plus-test/blob/master/src/main/java/storm/kafka/KafkaSpoutTestTopology.java
代码如下:
[java] view
plain copy
package storm.kafka;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
public class KafkaSpoutTestTopology {
public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestTopology.class);
public static class PrinterBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
LOG.info(tuple.toString());
}
}
private final BrokerHosts brokerHosts;
public KafkaSpoutTestTopology(String kafkaZookeeper) {
brokerHosts = new ZkHosts(kafkaZookeeper);
}
public StormTopology buildTopology() {
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "storm-sentence", "", "storm");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
return builder.createTopology();
}
public static void main(String[] args) throws Exception {
String kafkaZk = args[0];
KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk);
Config config = new Config();
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology();
if (args != null && args.length > 1) {
String name = args[1];
String dockerIp = args[2];
config.setNumWorkers(2);
config.setMaxTaskParallelism(5);
config.put(Config.NIMBUS_HOST, dockerIp);
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));
StormSubmitter.submitTopology(name, config, stormTopology);
} else {
config.setNumWorkers(2);
config.setMaxTaskParallelism(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, stormTopology);
}
}
}
这里清晰的写出了创建一个与kafka整合的storm Topology,观察main 函数,从上往下看:
下面是关于zookeeper的设定以及spout和bolt 的设定
[java] view
plain copy
String kafkaZk = args[0];
KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk);
StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology();
下面的语句中,storm-sentence是话题,下面的语句是要求在zookeeper 服务器中在根目录创建文件夹storm,用于kafka存放zookeeper相关数据
[java] view
plain copy
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, " storm-sentence ", "", "storm");
builder.setSpout("words", new KafkaSpout(kafkaConfig), 10); 这里是设定spout,负责从kafka消费数据,其中word 是spout 名称,KafkaSpout 由storm-kafka-0.8-plus 提供,10为并发数。
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words"); 这个是设定spout 接下去的bolt, PrinterBolt看名称应该负责打印bolt的数据的类。shuffleGrouping("words")表示数据是采用随机模式。后面接的数据来自与叫做words的spout
下面是设置Topology的相关设定
[java] view
plain copy
Config config = new Config(); 初始化一个storm设置
config.setNumWorkers(2); 这个代表分配2个Worker。
StormSubmitter.submitTopology(args[0], config, builder.createTopology()); 这个表示想Storm 服务器提交Topology任务,其中第一个参数是Topology的name.
config.setMaxTaskParallelism(3); 一个work的最大并发数为3
LocalCluster cluster = new LocalCluster(); 开启Storm本地模式
cluster.submitTopology("special-topology", config, builder.createTopology()); 在本地网模式下提交storm任务。
cluster.shutdown(); 关闭Storm本地模式。
下面是我修改后的脚本
[java] view
plain copy
import com.google.common.collect.ImmutableList;
import com.ks.bolt.CounterBolt;
import com.ks.bolt.DateCutBolt;
import com.ks.bolt.InsertMysqlBolt;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
public class CountTopology {
/**
* @param args
*/
public static void main(String[] args) {
try{
String kafkaZookeeper = "carl:2181,slave1:2181,slave2:2181";
BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "test", "/storm", "stormid");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.zkServers = ImmutableList.of("carl","slave1","slave2");
kafkaConfig.zkPort = 2181;
//kafkaConfig.forceFromStart = true;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(kafkaConfig), 2);
//*************************下面是所有处理逻辑,只关注这个*****************************
builder.setBolt("datecut", new CounterBolt(),1).shuffleGrouping("spout");
//*************************下面是所有处理逻辑,只关注这个*****************************
Config config = new Config();
config.setDebug(true);
if(args!=null && args.length > 0) {
config.setNumWorkers(2);
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
config.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("special-topology", config, builder.createTopology());
Thread.sleep(500000);
cluster.shutdown();
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
这里在本地模式下让他运行20秒钟自动结束,因为这个比较耗资源。注意以下这句,
[java] view
plain copy
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "test", "/storm", "stormid");
请记得在zookeeper 根目录下面创建文件夹storm,然后在storm 文件夹下面继续创建文件夹stormid 用于存放kafka信息数据
上面的Topology 设定了bolt 为CounterBolt,因此还要建一个CounterBolt的bolt 类。
这里设定了,运行jar包敲参数为提交到storm服务器,不敲参数则是运行storm本地模式。
四、创建数据输出的Bolt
这里实现一个十分简单的bolt 类[java] view
plain copy
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class CounterBolt extends BaseBasicBolt {
/**
*
*/
private static final long serialVersionUID = -5508421065181891596L;
private static long counter = 0;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("msg = "+tuple.getString(0)+" -------------counter = "+(counter++));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
这里很简单就是将bolt 获取的数据进行简单的输出,并统计接收到的数据条目数。这里继续BaseBasicBolt 类,因为这样开发会比较简单。因为这个是唯一的bolt,没有输出,因此在declareOutputFields 方法中不需要声明output。
System.out.println("msg = "+ tuple.getString(0)+"-------------counter = "+(counter++));
这里tuple就是这个bolt 从上一个spout获取的数据集合。
这里是控制台输出,因此请用本地模式进行调试。
打包上传到服务器,运行
Storm jar jarname CountTopology 回车,会看到他在等待数据传入。
这个时候运行kafka消费者程序,将数据输出,则会看到storm 会迅速输出数据和统计数目。
这里测试不写了。
相关文章推荐
- storm与kafka的结合(相同单词写到一个文件)
- hadoop和hive结合使用总结(关于一个稳定性的问题)
- Page.FindControl(string id) 与母版页结合后发现的一个问题
- storm、hbase、kafka整合过程中遇到的log4j冲突问题
- storm 集成kafka时遇见的问题
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- 做一个项目结合.net和oracle碰到的问题总结
- flume+kafka+storm整合02---问题
- storm与kafka结合
- storm 0.10.0 kafkaSpout 总是读取旧消息 offset丢失问题
- storm、hbase、kafka整合过程中遇到的log4j冲突问题
- png库结合zlib库使用出现的一个链接问题的解决
- storm-kafka常见问题(更新中)
- (三)storm-kafka源代码走读之怎样构建一个KafkaSpout
- storm-kafka数据流量问题
- 【原】storm源码之一个class解决nimbus单点问题
- storm源码之一个class解决nimbus单点问题【转】
- storm源码之一个class解决nimbus单点问题
- kafka启动过程中的一个小问题
- storm组件初始化问题及与spring的结合方式