flume+kafka+storm
2015-07-22 15:15
344 查看
[align=left]centos06.6+JDK1.7[/align]
[align=left]flume1.4+kafka2.10+storm0.9.3[/align]
[align=left]zookeeper3.4.6[/align]
[align=left]集群:[/align]
[align=left]192.168.80.133 x01[/align]
[align=left]192.168.80.134 x02[/align]
[align=left]1.两台机器上设置hostname和hosts[/align]
[align=left]。。。[/align]
[align=left]2.两台机器上安装JDK并设置环境变量[/align]
[align=left]3.下载安装zookeeper并设置环境变量[/align]
# example sakes.
dataDir=/data/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=x01:2888:3888
server.2=x02:2888:3888
复制代码
zkServer.sh startzkserver.sh status
[align=left]4.下载安装flume[/align]
[align=left]/article/6338829.html[/align]
[align=left]5.下载安装kafka[/align]
[align=left]/article/6338019.html[/align]
[align=left]6.整合flume和kafka[/align]
[align=left]下载整合插件flumeng-kafka-plugin:https://github.com/beyondj2ee/flumeng-kafka-plugin[/align]
[align=left]提取插件中的flume-conf.properties,修改后放到kafka的conf目录下[/align]
############################################
# producer config
###########################################
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
producer.sources.s.type = spooldir
producer.sources.s.spoolDir=/home/hadoop/testFlume
producer.sources.s.fileHeader=false
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=x01:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
复制代码
[align=left]将Plugin中的jar包拷贝到flume的lib目录中[/align]
[align=left]在/home/hadoop/testFlume中放入文件,在kafka中启用一个console的consumer来测试[/align]
bin/flume-ng agent -n producer -c conf -f conf/kafka.conf -Dflume.root.logger=DEBUG,console
bin/kafka-console-consumer.sh --zookeeper x01:2181 --topic test --from-beginning
[align=left]测试成功[/align]
[align=left]注意:如果让flume传输中文的话,文件编码最好是UTF-8,否则容易乱码导致flume死掉[/align]
[align=left]7.安装storm[/align]
[align=left]/article/6338035.html[/align]
[align=left]8.整合storm和kafka[/align]
将kafka的一些jar包复制到storm的lib目录中
cp kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/scala-library-2.10.1.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/metrics-core-2.2.0.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/snappy-java-1.0.5.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/log4j-1.2.15.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/slf4j-api-1.7.2.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/jopt-simple-3.2.jar apache-storm-0.9.3/lib/
把zookeeper的zookeeper-3.4.6.jar复制到storm的lib目录中
cp zookeeper-3.4.6/zookeeper-3.4.6.jar apache-storm-0.9.3/lib/
编写storm程序来测试
pom.xml
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
spout
package org.admln.flume_kafka_storm;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class KafkaSpout extends BaseRichSpout {
private static final long serialVersionUID = -9174998944310422274L;
private SpoutOutputCollector collector;
private ConsumerConnector consumer;
private String topic;
public KafkaSpout() {}
public KafkaSpout(String topic) {
this.topic = topic;
}
public void nextTuple() { }
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
public void ack(Object msgId) { }
public void activate() {
consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
Map<String,Integer> topickMap = new HashMap<String, Integer>();
topickMap.put(topic, 1);
System.out.println("*********Results********topic:"+topic);
Map<String, List<KafkaStream<byte[],byte[]>>> streamMap=consumer.createMessageStreams(topickMap);
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it =stream.iterator();
while(it.hasNext()){
String value =new String(it.next().message());
System.out.println("storm接收到来自kafka的消息------->" + value);
collector.emit(new Values(value), value);
}
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
// 设置zookeeper的链接地址
props.put("zookeeper.connect","x01:2181,x02:2181");
// 设置group id
props.put("group.id", "1");
// kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
props.put("auto.commit.interval.ms", "1000");
props.put("zookeeper.session.timeout.ms","10000");
return new ConsumerConfig(props);
}
public void close() { }
public void deactivate() { }
public void fail(Object msgId) { }
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
public Map<String, Object> getComponentConfiguration() {
System.out.println("getComponentConfiguration被调用");
topic="test";
return null;
}
}
bolt(wordsplitter)
package org.admln.flume_kafka_storm;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class KafkaWordSplitterBolt extends BaseRichBolt {
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String line = input.getString(0);
String[] words = line.split(",");
for(String word : words) { //这里除了提交一个传向下个bolt的list集,还把tuple提交了,这是collector的emit方法之一,为了下面的ack错误恢复
collector.emit(input, new Values(word, 1));
}
collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
bolt(wordcount)
package org.admln.flume_kafka_storm;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
public class KafkaWordCounterBolt extends BaseRichBolt {
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
private Map<String, AtomicInteger> counterMap;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counterMap = new HashMap<String, AtomicInteger>();
}
public void execute(Tuple input) {
String word = input.getString(0);
int count = input.getInteger(1);
AtomicInteger ai = this.counterMap.get(word);
if (ai == null) {
ai = new AtomicInteger();
this.counterMap.put(word, ai);
}
ai.addAndGet(count);
collector.ack(input);
}
public void cleanup() {
Iterator<Entry<String, AtomicInteger>> iter = this.counterMap
.entrySet().iterator();
while (iter.hasNext()) {
Entry<String, AtomicInteger> entry = iter.next();
System.out.println(entry.getKey() + "\t:\t" + entry.getValue().get());
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
topology
package org.admln.flume_kafka_storm;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class KafkaTopology {
public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(""), 1); //bolt1 是此bolt在这个图中的ID //2表示启用多少线程来运行,可以省略,省略的话则默认分配一个线程
builder.setBolt("bolt1", new KafkaWordSplitterBolt(), 2)
.shuffleGrouping("spout");
builder.setBolt("bolt2", new KafkaWordCounterBolt(), 2).fieldsGrouping(
"bolt1", new Fields("word"));
String name = KafkaTopology.class.getSimpleName();
if (args != null && args.length > 0) {
Config conf = new Config();
// 通过指定nimbus主机
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(2);
StormSubmitter.submitTopologyWithProgressBar(name, conf,
builder.createTopology());
} else {
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 1);
conf.put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("my-flume-kafka-storm-topology-integration",
conf, builder.createTopology());
}
}
}
可以直接在eclipse中本地运行也可以放到集群上运行
集群上
bin/storm jar flume-kafka-storm.jar org.admln.flume_kafka_storm.KafkaToplology x01
[align=left]flume1.4+kafka2.10+storm0.9.3[/align]
[align=left]zookeeper3.4.6[/align]
[align=left]集群:[/align]
[align=left]192.168.80.133 x01[/align]
[align=left]192.168.80.134 x02[/align]
[align=left]1.两台机器上设置hostname和hosts[/align]
[align=left]。。。[/align]
[align=left]2.两台机器上安装JDK并设置环境变量[/align]
[align=left]3.下载安装zookeeper并设置环境变量[/align]
# example sakes.
dataDir=/data/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=x01:2888:3888
server.2=x02:2888:3888
复制代码
zkServer.sh startzkserver.sh status
[align=left]4.下载安装flume[/align]
[align=left]/article/6338829.html[/align]
[align=left]5.下载安装kafka[/align]
[align=left]/article/6338019.html[/align]
[align=left]6.整合flume和kafka[/align]
[align=left]下载整合插件flumeng-kafka-plugin:https://github.com/beyondj2ee/flumeng-kafka-plugin[/align]
[align=left]提取插件中的flume-conf.properties,修改后放到kafka的conf目录下[/align]
############################################
# producer config
###########################################
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
producer.sources.s.type = spooldir
producer.sources.s.spoolDir=/home/hadoop/testFlume
producer.sources.s.fileHeader=false
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=x01:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
复制代码
[align=left]将Plugin中的jar包拷贝到flume的lib目录中[/align]
[align=left]在/home/hadoop/testFlume中放入文件,在kafka中启用一个console的consumer来测试[/align]
bin/flume-ng agent -n producer -c conf -f conf/kafka.conf -Dflume.root.logger=DEBUG,console
bin/kafka-console-consumer.sh --zookeeper x01:2181 --topic test --from-beginning
[align=left]测试成功[/align]
[align=left]注意:如果让flume传输中文的话,文件编码最好是UTF-8,否则容易乱码导致flume死掉[/align]
[align=left]7.安装storm[/align]
[align=left]/article/6338035.html[/align]
[align=left]8.整合storm和kafka[/align]
将kafka的一些jar包复制到storm的lib目录中
cp kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/scala-library-2.10.1.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/metrics-core-2.2.0.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/snappy-java-1.0.5.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/log4j-1.2.15.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/slf4j-api-1.7.2.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/jopt-simple-3.2.jar apache-storm-0.9.3/lib/
把zookeeper的zookeeper-3.4.6.jar复制到storm的lib目录中
cp zookeeper-3.4.6/zookeeper-3.4.6.jar apache-storm-0.9.3/lib/
编写storm程序来测试
pom.xml
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
spout
package org.admln.flume_kafka_storm;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class KafkaSpout extends BaseRichSpout {
private static final long serialVersionUID = -9174998944310422274L;
private SpoutOutputCollector collector;
private ConsumerConnector consumer;
private String topic;
public KafkaSpout() {}
public KafkaSpout(String topic) {
this.topic = topic;
}
public void nextTuple() { }
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
public void ack(Object msgId) { }
public void activate() {
consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
Map<String,Integer> topickMap = new HashMap<String, Integer>();
topickMap.put(topic, 1);
System.out.println("*********Results********topic:"+topic);
Map<String, List<KafkaStream<byte[],byte[]>>> streamMap=consumer.createMessageStreams(topickMap);
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it =stream.iterator();
while(it.hasNext()){
String value =new String(it.next().message());
System.out.println("storm接收到来自kafka的消息------->" + value);
collector.emit(new Values(value), value);
}
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
// 设置zookeeper的链接地址
props.put("zookeeper.connect","x01:2181,x02:2181");
// 设置group id
props.put("group.id", "1");
// kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
props.put("auto.commit.interval.ms", "1000");
props.put("zookeeper.session.timeout.ms","10000");
return new ConsumerConfig(props);
}
public void close() { }
public void deactivate() { }
public void fail(Object msgId) { }
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
public Map<String, Object> getComponentConfiguration() {
System.out.println("getComponentConfiguration被调用");
topic="test";
return null;
}
}
bolt(wordsplitter)
package org.admln.flume_kafka_storm;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class KafkaWordSplitterBolt extends BaseRichBolt {
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String line = input.getString(0);
String[] words = line.split(",");
for(String word : words) { //这里除了提交一个传向下个bolt的list集,还把tuple提交了,这是collector的emit方法之一,为了下面的ack错误恢复
collector.emit(input, new Values(word, 1));
}
collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
bolt(wordcount)
package org.admln.flume_kafka_storm;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
public class KafkaWordCounterBolt extends BaseRichBolt {
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
private Map<String, AtomicInteger> counterMap;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counterMap = new HashMap<String, AtomicInteger>();
}
public void execute(Tuple input) {
String word = input.getString(0);
int count = input.getInteger(1);
AtomicInteger ai = this.counterMap.get(word);
if (ai == null) {
ai = new AtomicInteger();
this.counterMap.put(word, ai);
}
ai.addAndGet(count);
collector.ack(input);
}
public void cleanup() {
Iterator<Entry<String, AtomicInteger>> iter = this.counterMap
.entrySet().iterator();
while (iter.hasNext()) {
Entry<String, AtomicInteger> entry = iter.next();
System.out.println(entry.getKey() + "\t:\t" + entry.getValue().get());
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
topology
package org.admln.flume_kafka_storm;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class KafkaTopology {
public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(""), 1); //bolt1 是此bolt在这个图中的ID //2表示启用多少线程来运行,可以省略,省略的话则默认分配一个线程
builder.setBolt("bolt1", new KafkaWordSplitterBolt(), 2)
.shuffleGrouping("spout");
builder.setBolt("bolt2", new KafkaWordCounterBolt(), 2).fieldsGrouping(
"bolt1", new Fields("word"));
String name = KafkaTopology.class.getSimpleName();
if (args != null && args.length > 0) {
Config conf = new Config();
// 通过指定nimbus主机
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(2);
StormSubmitter.submitTopologyWithProgressBar(name, conf,
builder.createTopology());
} else {
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 1);
conf.put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("my-flume-kafka-storm-topology-integration",
conf, builder.createTopology());
}
}
}
可以直接在eclipse中本地运行也可以放到集群上运行
集群上
bin/storm jar flume-kafka-storm.jar org.admln.flume_kafka_storm.KafkaToplology x01
相关文章推荐
- Codeforces469C——构造——Game
- http状态码 200 304 404 503等
- js中下载弹窗链接
- 智能指针总结及应用(一)
- hdu 5294 Tricks Device
- SecureCRT 常用命令大全
- .NET知识点构架
- hdu5288 多校
- Xshell连接Linux服务器总掉线
- Zookeeper Introduce Zookeeper原理介绍
- 几种线程池的实现算法分析
- nginx 1.9 + tomcat8 + nfs 配置集群环境
- android官方技术文档翻译——设计时布局属性
- java SpringMVC 邮件发送配置及Code
- Spring配置声明
- lazy instructor
- PHP empty()函数说明---用了N遍了就是记不住
- SpringJDBC的简单应用
- GRE写作必备句型
- 150722