您的位置:首页 > 运维架构

flume实现监控文件,并将文件内容传入kafka的,kafka在控制台实现消费

2018-12-08 17:44 218 查看

在flume的配置里建一个文件flume-kafka.conf
生产者产生的数据放在/home/hadoop/c.txt中
topic消费c.txt中的文件

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type=exec
#设置要监控的文件夹
a1.sources.s1.command=tail -F /home/hadoop/c.txt
a1.sources.s1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a1.sinks.k1.brokerList=hadoop01:9092
#设置Kafka的Topic
a1.sinks.k1.topic=test02
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.channel=c1

将生产者的java代码做成一个jar包
生产者代码

public class Test {
public static void main(String[] args) {
int i = 0;
while(true) {
i++;
System.out.println(	"测试数据"+i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

开启一个页面启动flume

bin/flume-ng a1 --conf-file  conf/flume-kafka.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console

另开一个页面启动消费者,前提是kafka集群要开启

bin/kafka-console-consumer.sh --zookeeper 192.168.147.136:2181,192.168.147.137:2181,192.168.147.138:2181 --topic test02 --from-beginning

再开一个页面启动生产者,jar包传到/hadoop/hadoop/下

java -cp /home/hadoop/Test-1.0-SNAPSHOT.jar com.ceshi.Test /home/hadoop/c.txt

可以看到消费者页面出现与生产者页面相同消息则测试成功。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: