flume实现监控文件,并将文件内容传入kafka的,kafka在控制台实现消费
2018-12-08 17:44
218 查看
在flume的配置里建一个文件flume-kafka.conf
生产者产生的数据放在/home/hadoop/c.txt中
topic消费c.txt中的文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type=exec #设置要监控的文件夹 a1.sources.s1.command=tail -F /home/hadoop/c.txt a1.sources.s1.channels=c1 a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100 #设置Kafka接收器 a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #设置Kafka的broker地址和端口号 a1.sinks.k1.brokerList=hadoop01:9092 #设置Kafka的Topic a1.sinks.k1.topic=test02 #设置序列化方式 a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.channel=c1
将生产者的java代码做成一个jar包
生产者代码
public class Test { public static void main(String[] args) { int i = 0; while(true) { i++; System.out.println( "测试数据"+i); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
开启一个页面启动flume
bin/flume-ng a1 --conf-file conf/flume-kafka.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console
另开一个页面启动消费者,前提是kafka集群要开启
bin/kafka-console-consumer.sh --zookeeper 192.168.147.136:2181,192.168.147.137:2181,192.168.147.138:2181 --topic test02 --from-beginning
再开一个页面启动生产者,jar包传到/hadoop/hadoop/下
java -cp /home/hadoop/Test-1.0-SNAPSHOT.jar com.ceshi.Test /home/hadoop/c.txt
可以看到消费者页面出现与生产者页面相同消息则测试成功。
相关文章推荐
- Flume之监控文件内容变化
- 分布式日志收集框架Flume:监控一个文件实时采集新增的数据输出到控制台
- 【Flume】flume文件监控的source组件开发,增量传输文件内容,支持断点续传功能
- 3.数据采集 - 文件内容断点续采[flume1.6 + kafka0.10.2.0]
- 关于flume使用SpoolDir监控目录传入文件时报出java.nio.charset.MalformedInputException: Input length = 1,个人解决方案
- Flume应用案例之监控一个文件实时采集新增的数据输出到控制台
- zabbix 3.0+saltstack实现对日志文件内容监控
- Spark Streaming之:Flume监控目录下文件内容变化,然后Spark Streaming实时监听Flume,然后从其上拉取数据,并计算出结果
- FLume监控文件夹,将数据发送给Kafka以及HDFS的配置文件详解
- flume 1.7 新接口,监控文件内容变化,且监控目录内新增文件变化
- Java 实现自动监听并更新配置文件内容
- ReadDirectoryChangesW实现文件监控的封装类
- JSP实现下载的痛苦[word、excel、下载乱码] 彻底解决 下载文件内容及文件名乱码...
- java实现文件监控
- 心得6--Response介绍和如何实现动态文件内容的下载详细介绍
- 用递归的方式实现文件内容搜索(java)
- perl 实现a文件通过键值取b文件内容
- 编程实现DTD规范对XML文件内容 ...
- Hook SHFileOperation实现文件监控
- 读取数据库中内容实现文件的下载