
Kafka + Storm + HDFS Integration in Practice

2015-05-06 11:18
In many application scenarios on a Hadoop platform we need both offline and real-time analysis of the data. Offline analysis is easy to implement with Hive, but Hive is not suitable for real-time requirements; for those we can use Storm, a real-time processing system that provides a computation model for stream-processing applications and is straightforward to program against. To unify offline and real-time computation, we usually want a single data source feeding both paths: the data then flows into the real-time system and the offline analysis system, and each processes it independently. To achieve this, the data source (for example, logs collected with Flume) can be connected directly to a message broker such as Kafka. Flume and Kafka are integrated so that Flume acts as the message Producer, publishing the produced data (log data, business request data, and so on) to Kafka; a Storm Topology then subscribes to these messages as the Consumer, and the Storm cluster handles the following two scenarios:

Analyze the data in real time directly with a Storm Topology.

Integrate Storm with HDFS, writing the processed messages to HDFS for offline analysis.

For real-time processing you only need to develop a Topology that meets your business requirements, so it is not discussed further here. Instead, this article walks through installing and configuring Kafka and Storm, and then integrating Kafka + Storm, Storm + HDFS, and finally Kafka + Storm + HDFS, to cover the requirements above. The software packages used in this practice are as follows:

zookeeper-3.4.5.tar.gz

kafka_2.9.2-0.8.1.1.tgz

apache-storm-0.9.2-incubating.tar.gz

hadoop-2.2.0.tar.gz

The operating system used for configuring and running the programs is CentOS 5.11.

Kafka Installation and Configuration

We use 3 machines to build the Kafka cluster:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3
Before installing the Kafka cluster, note that we do not use the ZooKeeper bundled with Kafka; instead we installed a standalone ZooKeeper cluster on the same 3 machines. Make sure that ZooKeeper cluster is up and running.
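For reference, a minimal sketch of a 3-node ensemble configuration (conf/zoo.cfg on each machine) is shown below; the dataDir value is an assumption, and each node also needs a myid file (containing 1, 2 or 3) under dataDir:

tickTime=2000
initLimit=5
syncLimit=2
dataDir=/tmp/zookeeper
clientPort=2181
server.1=h1:2888:3888
server.2=h2:2888:3888
server.3=h3:2888:3888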
First, prepare the Kafka installation files on h1 by running the following commands:

cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
Edit the configuration file /usr/local/kafka/config/server.properties and change the following entries:

broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
Note that by default Kafka uses ZooKeeper's root path /, which scatters Kafka's ZooKeeper data directly under the root. If other applications share the same ZooKeeper cluster, browsing the data in ZooKeeper becomes confusing, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting:

zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
In addition, the /kafka path must be created manually in ZooKeeper. Connect to any one of the ZooKeeper servers with the following commands:

cd /usr/local/zookeeper
bin/zkCli.sh
In the ZooKeeper shell, create the chroot path with:

create /kafka ''
From then on, every time you connect to the Kafka cluster (with the --zookeeper option) you must use the connection string that includes the chroot path, as shown later.
Then synchronize the configured installation files to the other nodes, h2 and h3:

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
Finally, configure the h2 and h3 nodes by running the following commands:

cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
and modify /usr/local/kafka/config/server.properties on each of them as follows:

broker.id=1  # set on h2
broker.id=2  # set on h3
A Kafka cluster requires each Broker's id to be unique across the whole cluster, so this value must be adjusted per node. (If you simulate a distributed Kafka cluster on a single machine by running multiple Broker processes, the Broker ids must still be unique, and some directory-related settings must be changed as well, as sketched below.)
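A hedged sketch of such per-Broker overrides, for a hypothetical second Broker process on the same machine (the port and log directory values are arbitrary examples, not taken from the setup above):

# second broker running on the same host
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1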
Start Kafka on each of the three cluster nodes h1, h2 and h3 by running:

bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
Check the logs or the process status to make sure the Kafka cluster started successfully.
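For example, a quick health check on any broker node might look like the following sketch (the server.log location assumes Kafka's default log4j configuration; adjust it if your layout differs):

jps                                      # the broker appears as a "Kafka" JVM process
tail -n 50 /usr/local/kafka/logs/server.log
bin/kafka-topics.sh --list --zookeeper h1:2181,h2:2181,h3:2181/kafka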
Now create a Topic named my-replicated-topic5 with 5 partitions and a replication factor of 3:

bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
Inspect the Topic we just created:

bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
The output looks like this:

Topic:my-replicated-topic5    PartitionCount:5    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic5    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 1    Leader: 0    Replicas: 1,0,2    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 2    Leader: 2    Replicas: 2,1,0    Isr: 2,0,1
    Topic: my-replicated-topic5    Partition: 3    Leader: 0    Replicas: 0,1,2    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 4    Leader: 2    Replicas: 1,2,0    Isr: 2,0,1
The meanings of Partition, Leader, Replicas and Isr above are:

Partition: the partition number
Leader   : the node responsible for all reads and writes of the given partition
Replicas : the list of nodes that replicate the log for this partition
Isr      : the "in-sync" replicas, the subset of replicas that are currently alive and eligible to become Leader
We can use Kafka's bundled bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts to demonstrate how to publish and consume messages.
In one terminal, start a Producer and publish messages to the my-replicated-topic5 Topic created above:

bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
In another terminal, start a Consumer and subscribe to the messages produced to the my-replicated-topic5 Topic:

bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
Type a line of text in the Producer terminal and press Enter; the line then shows up in the Consumer terminal as a consumed message.
You can also use Kafka's Producer and Consumer Java APIs to implement the message production and consumption logic in code.
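For instance, a minimal producer sketch against the 0.8.x Java producer API bundled with kafka_2.9.2-0.8.1.1 might look like this; the class name SimpleStringProducer and the sample message are made up for illustration:

package org.shirdrn.storm.examples;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleStringProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "h1:9092,h2:9092,h3:9092"); // broker list, not ZooKeeper
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // publish one line to the Topic created above
        producer.send(new KeyedMessage<String, String>("my-replicated-topic5", "hello storm"));
        producer.close();
    }
}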

Storm Installation and Configuration

The Storm cluster also depends on the ZooKeeper cluster, so make sure ZooKeeper is running properly. Installing and configuring Storm is fairly simple; we again use the same 3 machines:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3
First, install on the h1 node with the following commands:

cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
Then edit the configuration file conf/storm.yaml as follows:

storm.zookeeper.servers:
    - "h1"
    - "h2"
    - "h3"
storm.zookeeper.port: 2181

nimbus.host: "h1"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

storm.local.dir: "/tmp/storm"
Distribute the configured installation files to the other nodes:

scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
Finally, configure the h2 and h3 nodes with:

cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
The Storm master node runs Nimbus and the worker nodes run Supervisors. Start the Nimbus service on h1 and the Supervisor service on the worker nodes h2 and h3:

bin/storm nimbus &
bin/storm supervisor &
For easier monitoring, start the Storm UI so you can watch the running state of Storm Topologies from a web page, for example on h2:

bin/storm ui &
You can then visit http://h2:8080/ to check the status of your Topologies.

Integrating Kafka + Storm

Messages enter the Kafka broker in various ways, for example log data collected with Flume and temporarily routed through Kafka, and are then analyzed in real time by Storm. To do that we read messages from Kafka in a Storm Spout and hand them to concrete Bolt components for processing. In fact, the apache-storm-0.9.2-incubating release already ships an external module, storm-kafka, that integrates Kafka and can be used directly. The Maven dependencies I used are shown below:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Next, we develop a simple WordCount example that reads subscribed message lines from Kafka, splits them into words on whitespace, and then computes word frequencies. The Topology is implemented as follows:

package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka ->splitter] " + line);
            String[] words = line.split("\\s+");
            for (String word : words) {
                LOG.info("EMIT[splitter ->counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }

    }

    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter ->counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if (ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        @Override
        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // the Topic created in Kafka has 5 partitions, so set the spout parallelism to 5
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();

        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
When debugging locally (with LocalCluster), the program above takes no arguments; when submitting it to a real cluster, pass one argument, the host name of Nimbus.
Build a single jar containing the dependencies with Maven (do not bundle the Storm dependencies themselves), for example storm-examples-0.0.1-SNAPSHOT.jar. Because the Topology uses Kafka, before submitting it to the Storm cluster copy the following dependency jars into the lib directory on the Storm cluster nodes:

cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
Then submit the Topology we developed:

bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
You can monitor the Topology through the log files (under the logs/ directory) or the Storm UI. If everything is working, generate messages with the Kafka Producer used earlier and you will see our Storm Topology receive and process them in real time.
In the Topology code above there is one important configuration object, SpoutConfig, with the following property:

spoutConf.forceFromStart = false;
This setting determines whether, after the Topology stops because of a failure and is later restarted, the Spout re-reads the subscribed Topic from the beginning of the Kafka data source. If forceFromStart=true, Tuples that have already been processed are processed again; otherwise processing resumes from the last recorded position, so the Topic data in Kafka is not processed twice. The read position is recorded as consumer-side state rather than inside Kafka itself, as sketched below.
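A hedged sketch of where that state lives: with this storm-kafka version, the KafkaSpout commits consumed offsets to ZooKeeper under the zkRoot/id path configured above (/storm/word here). The exact child node names and the JSON payload may differ between storm-kafka versions, so treat the paths below as an illustration only:

bin/zkCli.sh -server h1:2181
ls /storm/word
get /storm/word/partition_0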

Integrating Storm + HDFS

The Storm real-time computation cluster consumes messages from the Kafka broker. Requirements that need real-time handling go through the real-time processing path, while requirements that need offline analysis are written to HDFS for later processing. The following Topology implements the latter:

package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

    public static class EventSpout extends BaseRichSpout {

        private static final Log LOG = LogFactory.getLog(EventSpout.class);
        private static final long serialVersionUID = 886149197481637894L;
        private SpoutOutputCollector collector;
        private Random rand;
        private String[] records;

        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            records = new String[] {
                    "10001ef2da82d4c8b49c44199655dc14f39f64.2.1HUAWEI G610-U00HUAWEI270:72:3c:73:8b:222014-10-13 12:36:35",
                    "10001ffb52739a29348a67952e47c12da54ef4.3GT-I9300samsung250:CC:F8:E4:22:E22014-10-13 12:36:02",
                    "10001ef2da82d4c8b49c44199655dc14f39f64.2.1HUAWEI G610-U00HUAWEI270:72:3c:73:8b:222014-10-13 12:36:35"
            };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            Date d = new Date(System.currentTimeMillis());
            String minute = df.format(d);
            String record = records[rand.nextInt(records.length)];
            LOG.info("EMIT[spout ->hdfs] " + minute + " : " + record);
            collector.emit(new Values(minute, record));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("minute", "record"));
        }

    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        // use " : " instead of the default "," as the field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter(" : ");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log");

        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("event-spout", new EventSpout(), 3);
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

        Config conf = new Config();

        String name = StormToHDFSTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }

}
In the processing logic above, the HdfsBolt can be configured in more detail through FileNameFormat, SyncPolicy and FileRotationPolicy (which controls when a new log file is rolled, for example after a fixed time interval or once a file reaches a configured size); see the storm-hdfs project for more options, such as the size-based rotation policy sketched below.
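For example, assuming the FileSizeRotationPolicy class shipped in the same storm-hdfs rotation package as TimedRotationPolicy, a size-based policy could be swapped in for the timed one used above (a sketch, not taken from the original code):

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;

// roll a new file whenever the current one reaches 128 MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(128.0f, Units.MB);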
When packaging the code above, note that reusing the Maven packaging configuration from storm-starter may cause errors when the Topology is deployed and run; use the maven-shade-plugin instead, configured as follows:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
Integrating Kafka + Storm + HDFS

Above we practiced integrating Kafka + Storm and Storm + HDFS separately. We can now replace the Spout of the latter with the Kafka Spout of the former: consume messages from Kafka, do some simple processing in Storm, and then write the data to HDFS so that it can be analyzed offline on the Hadoop platform. Below is a simple example that consumes messages from Kafka, processes them in Storm and writes them to HDFS:

package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

    public static class KafkaWordToUpperCase extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -5207232012035109026L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("RECV[kafka ->splitter] " + line);
            if (!line.isEmpty()) {
                String upperLine = line.toUpperCase();
                LOG.info("EMIT[splitter ->counter] " + upperLine);
                collector.emit(input, new Values(upperLine, upperLine.length()));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line", "len"));
        }

    }

    public static class RealtimeBolt extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -4115132557403913367L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("REALTIME: " + line);
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

        // Configure Kafka
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        // Configure HDFS bolt
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
        SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // configure & build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
        builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

        // submit topology
        Config conf = new Config();
        String name = DistributeWordTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            String nimbus = args[0];
            conf.put(Config.NIMBUS_HOST, nimbus);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }

}
In the code above, the Bolt named to-upper converts each received line to upper case and then emits a copy of the processed data to both downstream Bolts, hdfs-bolt and realtime, which handle it independently according to their needs (offline vs. real time).
After packaging, deploy and run this Topology on the Storm cluster:

bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
You can watch the Topology through the Storm UI and inspect the data generated on HDFS.
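For example, a quick way to check the output is to list the target directory on HDFS; the exact file names are generated by DefaultFileNameFormat, so the wildcard below is only an illustration:

hdfs dfs -ls /storm/
hdfs dfs -cat /storm/app_*.log | head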