flume+kafka+zookeeper 单机实现实时数据的获取
2017-08-07 15:49
591 查看
之前在做大数据的时候,一直不知道数据是怎么上传到hdfs的,问了架构师用flume,自己也一直想玩一下flume,无奈没太多的时间,今天有点时间,就查找资料,搭建了一个单机环境下的日志监控。所有资料全部来源与网络,我只是做了一个简单的整合。
首先,第一步安装flume。
1.安装flume,首先要安装好jvm。
2.下载flume。地址 http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
3.解压项目,进入conf下面,默认的配置文件带一个后缀是.template,去掉这个后缀。
4.修改flume-env.sh 设置jdk的安装目录,
5.校验flume是否安装成功,可以进入bin目录下,输入:
查看是否输出版本信息,即为以成功。
6.测试flume的功能,修改配置文件:flume-conf.properties
安装kafka:(zookeeper的安装不再写了,也很简单,网上都是例子:http://blog.csdn.net/wo541075754/article/details/56483533)
1.下载kafka。地址:http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
2.解压,之后可以直接启动:
1.启动flume:
3.新建一个窗口:对
首先,第一步安装flume。
1.安装flume,首先要安装好jvm。
2.下载flume。地址 http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
3.解压项目,进入conf下面,默认的配置文件带一个后缀是.template,去掉这个后缀。
4.修改flume-env.sh 设置jdk的安装目录,
5.校验flume是否安装成功,可以进入bin目录下,输入:
flume-ng version
查看是否输出版本信息,即为以成功。
6.测试flume的功能,修改配置文件:flume-conf.properties
#Spool监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点: #1) 拷贝到spool目录下的文件不可以再打开编辑。 #2) spool目录下不可包含相应的子目录
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.channels = c1 a1.sources.r1.spoolDir = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/logs a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 # logser可以看做是flume服务的名称,每个flume都由sources、channels和sinks三部分组成 # sources可以看做是数据源头、channels是中间转存的渠道、sinks是数据后面的去向 logser.sources = src_launcherclick logser.sinks = kfk_launcherclick logser.channels = ch_launcherclick # source # 源头类型是TAILDIR,就可以实时监控以追加形式写入文件的日志 logser.sources.src_launcherclick.type = TAILDIR # positionFile记录所有监控的文件信息 logser.sources.src_launcherclick.positionFile = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log1/taildir_position.json # 监控的文件组 logser.sources.src_launcherclick.filegroups = f1 # 文件组包含的具体文件,也就是我们监控的文件 logser.sources.src_launcherclick.filegroups.f1 = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log/.* # interceptor # 写kafka的topic即可 logser.sources.src_launcherclick.interceptors = i1 i2 logser.sources.src_launcherclick.interceptors.i1.type=static logser.sources.src_launcherclick.interceptors.i1.key = type logser.sources.src_launcherclick.interceptors.i1.value = launcher_click logser.sources.src_launcherclick.interceptors.i2.type=static logser.sources.src_launcherclick.interceptors.i2.key = topic logser.sources.src_launcherclick.interceptors.i2.value = launcher_click # channel logser.channels.ch_launcherclick.type = memory logser.channels.ch_launcherclick.capacity = 10000 logser.channels.ch_launcherclick.transactionCapacity = 1000 # kfk sink # 指定sink类型是Kafka,说明日志最后要发送到Kafka logser.sinks.kfk_launcherclick.type = org.apache.flume.sink.kafka.KafkaSink # Kafka broker logser.sinks.kfk_launcherclick.brokerList = 10.0.5.203:9092 # Bind the source and sink to the channel logser.sources.src_launcherclick.channels = ch_launcherclick logser.sinks.kfk_launcherclick.channel = ch_launcherclick
安装kafka:(zookeeper的安装不再写了,也很简单,网上都是例子:http://blog.csdn.net/wo541075754/article/details/56483533)
1.下载kafka。地址:http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
2.解压,之后可以直接启动:
bin/kafka-server-start.sh config/server.properties &结下来整合:
1.启动flume:
./flume-ng agent -c . -f ../conf/flume-conf.properties -n logser -Dflume.root.logger=INFO2.重新打开一个窗口,启动kafka的消费模式,监听:launcher_click 主题。命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic launcher_click --from-beginning
3.新建一个窗口:对
/Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log这个目录下的文件进行追加,命令:
echo "spool test2s nihaoi www.baidu.com" >> /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log/spool_text1sd.log 这个时候,在kafka的监听窗口会打印: $bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic launcher_click --from-beginning Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. spool test2s spool test2s nihaoi www spool test2s nihaoi www.baidu.com 下面是用java监听输出数据: 1.maven <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> 2.代码: import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("launcher_click")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.println("内容是:"+ record.value()); } } }
参考资料: Flume1.5.0入门:安装、部署、及flume的案例:http://www.aboutyun.com/thread-8917-1-1.html kafka的安装 http://www.cnblogs.com/wangyangliuping/p/5546465.html flume实时日志分析 http://itindex.net/detail/56956-flume-kafka-sparkstreaming kafka的java代码来源: http://blog.csdn.net/lnho2015/article/details/51353936
相关文章推荐
- Storm之——Storm+Kafka+Flume+Zookeeper+MySQL实现数据实时分析(环境搭建篇)
- Storm之——Storm+Kafka+Flume+Zookeeper+MySQL实现数据实时分析(程序案例篇)
- Storm之——Storm+Kafka+Flume+Zookeeper+MySQL实现数据实时分析(案例测试篇)
- flume实现kafka到hdfs实时数据采集 - 有负载均衡策略
- SparkStreaming通过Flume获取数据(单机,push和poll两种方式)的实现
- 大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合
- Flume+Kafka+Storm+Redis构建大数据实时处理系统 - 大数据
- 大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合
- VC++6.0实现视频数据实时获取的探讨(作者:谭佐军 吴鹏飞)
- 大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合
- Flume+Kafka+Storm+Redis构建大数据实时处理系统 - 大数据
- 大数据架构入门总结(Flume + Kafka + ZooKeeper + Spark Streaming + Drools + ELK)
- [bigdata] flume+kafka+storm实现实时分析计算
- VC++6.0实现视频数据实时获取的探讨(转)
- Flume+Kafka+Zookeeper搭建大数据日志采集框架
- JAVA结合Oracle的Database Change Notification实现替代获取实时数据需要的刷库操作
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Storm Spark Flume ZooKeeper Kafka Redis MongoDB Java 机器学习 云计算 视频教程
- 大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合
- 大数据组件Shell工具分享(storm redis es kafka flume zookeeper)
- 大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合