
Flume + Kafka + ZooKeeper: real-time data collection on a single machine

2017-08-07 15:49
Back when I was working on big data, I never really knew how the data got uploaded to HDFS. I asked our architect, who said they used Flume, and I had been wanting to play with Flume myself but never had much time. Today I finally had some, so I looked up material and set up log monitoring in a single-machine environment. Everything here comes from resources found online; I have only done a simple integration of them.

First, step one: install Flume.

1. Before installing Flume, make sure a JDK is installed.

2. Download Flume from http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

3. Unpack the archive and go into the conf directory. The default configuration files carry a .template suffix; remove that suffix.
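For example (a sketch; these template file names are the ones shipped with the Flume 1.7.0 binary distribution):

tar -zxvf apache-flume-1.7.0-bin.tar.gz
cd apache-flume-1.7.0-bin/conf
cp flume-env.sh.template flume-env.sh
cp flume-conf.properties.template flume-conf.properties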

4. Edit flume-env.sh and set the JDK installation directory.
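For example, add a line like the following to flume-env.sh (the path is only a placeholder; point it at your own JDK):

export JAVA_HOME=/path/to/your/jdk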

5. To verify that Flume is installed correctly, go into the bin directory and run:

flume-ng version


If it prints the version information, the installation succeeded.

6. To test Flume, edit the configuration file flume-conf.properties:

# The spooldir source watches the configured directory for newly added files and reads the data out of them. Two caveats:
    #1) Files copied into the spool directory must not be opened and edited afterwards.
    #2) The spool directory must not contain subdirectories.


a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
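The a1 agent above is a self-contained spooldir-to-logger example. To try it out, a command along these lines should work (a sketch, assuming it is run from Flume's bin directory; drop a file into the spoolDir and the logger sink prints each line as an event):

./flume-ng agent -c ../conf -f ../conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

The second agent defined below, logser, is the one used for the Kafka pipeline later in this post.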

# logser can be thought of as the name of this Flume agent; every agent is made up of sources, channels and sinks
# sources are where the data comes from, channels are the intermediate staging area, and sinks are where the data goes next
logser.sources = src_launcherclick
logser.sinks = kfk_launcherclick
logser.channels = ch_launcherclick

# source
# The source type is TAILDIR, which monitors, in real time, log files that are written to in append mode
logser.sources.src_launcherclick.type = TAILDIR
# positionFile records the read position of every monitored file
logser.sources.src_launcherclick.positionFile = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log1/taildir_position.json
# The file group to monitor
logser.sources.src_launcherclick.filegroups = f1
# The concrete files contained in the group, i.e. the files we are monitoring
logser.sources.src_launcherclick.filegroups.f1 = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log/.*

# interceptors
# These static interceptors attach fixed headers to every event; the 'topic' header names the Kafka topic to write to
logser.sources.src_launcherclick.interceptors = i1 i2
logser.sources.src_launcherclick.interceptors.i1.type=static
logser.sources.src_launcherclick.interceptors.i1.key = type
logser.sources.src_launcherclick.interceptors.i1.value = launcher_click
logser.sources.src_launcherclick.interceptors.i2.type=static
logser.sources.src_launcherclick.interceptors.i2.key = topic
logser.sources.src_launcherclick.interceptors.i2.value = launcher_click

# channel
logser.channels.ch_launcherclick.type = memory
logser.channels.ch_launcherclick.capacity = 10000
logser.channels.ch_launcherclick.transactionCapacity = 1000

# kfk sink
# The sink type is Kafka, meaning the log data is ultimately sent to Kafka
logser.sinks.kfk_launcherclick.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker
logser.sinks.kfk_launcherclick.brokerList = 10.0.5.203:9092

# Bind the source and sink to the channel
logser.sources.src_launcherclick.channels = ch_launcherclick
logser.sinks.kfk_launcherclick.channel = ch_launcherclick
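One detail worth noting: no topic is configured on the Kafka sink itself. With Flume 1.7's Kafka sink, an event that carries a 'topic' header is routed to that topic, which is exactly what interceptor i2 arranges here, so every event ends up in launcher_click.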


Installing Kafka (I won't cover installing ZooKeeper here; it is straightforward and there are plenty of examples online, e.g. http://blog.csdn.net/wo541075754/article/details/56483533):

1. Download Kafka from http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz

2. Unpack it; the broker can then be started directly:

bin/kafka-server-start.sh config/server.properties &
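The broker needs a running ZooKeeper to register with. If you did not set up a standalone one (see the link above), the copy bundled with Kafka is enough for a single-machine test, and the topic used later can be created up front (a sketch, assuming the default config files and ports):

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic launcher_click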
Next, wire everything together:

1. Start Flume:

./flume-ng agent -c . -f ../conf/flume-conf.properties -n logser -Dflume.root.logger=INFO
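This assumes the command is run from Flume's bin directory. To see the agent's own log output in the terminal, the usual variant is to append ',console' to the logger setting (and pointing -c at the conf directory also lets Flume pick up flume-env.sh and log4j.properties), for example:

./flume-ng agent -c ../conf -f ../conf/flume-conf.properties -n logser -Dflume.root.logger=INFO,console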
2. Open a new terminal window and start a Kafka console consumer listening on the launcher_click topic:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic launcher_click --from-beginning


3. Open another window and append a line to a file under /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log (the directory monitored by the TAILDIR source):

echo "spool test2s nihaoi www.baidu.com" >> /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log/spool_text1sd.log
At this point the Kafka consumer window prints:
$bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic launcher_click --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
spool test2s
spool test2s nihaoi www
spool test2s nihaoi www.baidu.com
Below is a Java consumer that listens for and prints the data:
1. Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
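The 0.9.0.1 client matches the 0.9.0.0 broker downloaded above; with these older Kafka releases it is safest to keep the client version at or below the broker version to avoid protocol mismatches.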

2. Code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // Kafka broker to connect to
        props.put("group.id", "test");                       // consumer group id
        props.put("enable.auto.commit", "true");             // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("launcher_click")); // the topic written by the Flume sink
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.println("value: " + record.value());
        }
    }
}
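To try it, run the class from an IDE (or, for instance, with mvn compile exec:java -Dexec.mainClass=KafkaConsumerExample, assuming the exec plugin is available) and append more lines to the monitored log directory as in step 3; each new line should be printed by the consumer.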
References:
Flume 1.5.0 getting started: installation, deployment and examples: http://www.aboutyun.com/thread-8917-1-1.html
Installing Kafka: http://www.cnblogs.com/wangyangliuping/p/5546465.html
Real-time log analysis with Flume: http://itindex.net/detail/56956-flume-kafka-sparkstreaming
Source of the Kafka Java code: http://blog.csdn.net/lnho2015/article/details/51353936