flume整合kafka和hdfs
2016-12-20 13:26
302 查看
flume版本:1.7.0 kafka版本:2.11-0.10.1.0 hadoop 版本:2.6.0
最近在玩这个flume和kafka这两个东西,网上有很多这方面的简介,我就不多说了,我的理解为啥要整合这两个在一起的,我的理解就是,flume作为消息的持久化,然后就是kafka来用于消息的传输,但我们搜集这个网站的日志的时候,我们就可以使用flume监控log的一个文件或者是一个目录,每当有新的log,flume就可以将其持久化到hdfs,然后将这个消息发给kafka,kafka在对消息进行分发,处理,实时计算等等。
既然要做,那么第一步就是搭建好flume的环境,以及整合整个框架。
我自己画了一个图,不是很好看,但是意思懂就行。
但我在网上找资料的时候,试过很多种方式来配置flume,但是都不怎么有效果。在这里建议使用官方网站上的文档来进行配置。
遇见过的问题:
解决方式:在配置的时候添加上bootstrap的指向,不是使用broke.list
问题2:
解决方式:由于flume的改版,原来的配置不是很合理,建议不要照抄网上的版本的配置,这里参考官网修改为:org.apache.flume.sink.kafka.KafkaSink
最后附加上我的flume的配置:
最近在玩这个flume和kafka这两个东西,网上有很多这方面的简介,我就不多说了,我的理解为啥要整合这两个在一起的,我的理解就是,flume作为消息的持久化,然后就是kafka来用于消息的传输,但我们搜集这个网站的日志的时候,我们就可以使用flume监控log的一个文件或者是一个目录,每当有新的log,flume就可以将其持久化到hdfs,然后将这个消息发给kafka,kafka在对消息进行分发,处理,实时计算等等。
既然要做,那么第一步就是搭建好flume的环境,以及整合整个框架。
我自己画了一个图,不是很好看,但是意思懂就行。
但我在网上找资料的时候,试过很多种方式来配置flume,但是都不怎么有效果。在这里建议使用官方网站上的文档来进行配置。
遇见过的问题:
org.apache.flume.conf.ConfigurationException: Bootstrap Servers must be specified at org.apache.flume.sink.kafka.KafkaSink.translateOldProps(KafkaSink.java:353) at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:295) at org.apache.flume.conf.Configurables.configure(Configurables.java:41) at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:411) at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102) at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
解决方式:在配置的时候添加上bootstrap的指向,不是使用broke.list
问题2:
org.apache.flume.FlumeException: Unable to load sink type: org.apache.flume.plugins.KafkaSink, class: org.apache.flume.plugins.KafkaSink at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:70) at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43) at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:408) at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102) at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.apache.flume.plugins.KafkaSink at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:68) ... 11 more
解决方式:由于flume的改版,原来的配置不是很合理,建议不要照抄网上的版本的配置,这里参考官网修改为:org.apache.flume.sink.kafka.KafkaSink
最后附加上我的flume的配置:
#设置source的名称 agent.sources = s #设置channels的名称 agent.channels = c c1 #设置sink的名称 agent.sinks = r r1 # For each one of the sources, the type is defined #exec 文件 spoolddir 文件夹 agent.sources.s.type = exec agent.sources.s.command = tail -n +0 -F /opt/testLog/test.log #agent.source.s.type = spooldir #agent.source.s.spoolDir = /usr/log #agent.source.s.fileHeader = true #agent.source.s.bathSize =100 # The channel can be defined as follows. agent.sources.s.channels = c c1 # Each sink's type must be defined #agent.sinks.r.type = org.apache.flume.plugins.KafkaSink agent.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink #Specify the channel the sink should use #agent.sinks.r.metadata.broker.list = localhost:9092 agent.sinks.r.kafka.bootstrap.servers = localhost:9092 agent.sinks.r.partition.key=0 agent.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition agent.sinks.r.serializer.class=kafka.serializer.StringEncoder agent.sinks.r.request.required.acks=0 agent.sinks.r.max.message.size=1000000 agent.sinks.r.producer.type=sync agent.sinks.r.custom.encoding=UTF-8 agent.sinks.r.kafka.topic = testFlume # Each channel's type is defined. # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel #channel存储容量 #agent.channels.memoryChannel.capacity = 1000 #事务容量 #agent.channels.memoryChannel.transactionCapacity = 100 #hdfsSink配置 agent.sinks.r1.type = hdfs agent.sinks.r1.channel = c1 agent.sinks.r1.hdfs.path = hdfs://HDFSIP:8020/zy/flume/%y-%m-%d-%H agent.sinks.r1.hdfs.filePrefix=events- #设置文件后缀名 #agent.sinks.r1.hdfs.fileSuffix = .log agent.sinks.r1.hdfs.round = true agent.sinks.r1.hdfs.roundValue = 10 agent.sinks.r1.hdfs.roundUnit = minute #文件格式 默认 seq文件, agent.sinks.r1.hdfs.fileType = DataStream agent.sinks.r1.hdfs.writeFormat=Text agent.sinks.r1.hdfs.rollInterval=0 #--触发roll操作的文件大小in bytes (0: never roll based on file size) agent.sinks.r1.hdfs.rollSize=128000000 #--在roll操作之前写入文件的事件数量(0 = never roll based on number of events) agent.sinks.r1.hdfs.rollCount=0 agent.sinks.r1.hdfs.idleTimeout=60 #--使用local time来替换转移字符 (而不是使用event header的timestamp) agent.sinks.r1.hdfs.useLocalTimeStamp = true agent.channels.c1.type = memory agent.channels.c1.capacity = 1000 agent.channels.c1.transactionCapacity=1000 agent.channels.c1.keep-alive=30 agent.sinks.r.channel = c agent.channels.c.type = memory agent.channels.c.capacity = 1000
相关文章推荐
- flume+kafka+storm+hdfs整合
- flume+kafka+storm+hdfs整合
- flume+kafka+storm+hdfs整合
- flume-ng+Kafka+Storm+HDFS+jdbc 实时系统搭建的完美整合
- flume-NG整合hdfs和kafka
- Kafka+Storm+HDFS整合实践
- Flume使用大全之kafka source-kafka channel-hdfs(kerberos认证,SSL加密)
- Flume 整合 Kafka 使用
- flume-ng+Kafka+Storm+HDFS 实时系统搭建
- Kafka学习笔记-Flume整合Kafka整合SparkStreaming
- flume+kafka整合
- Kafka+Storm+HDFS整合实践
- Kafka+Flume+Hive的整合
- Kafka+Flume+Hive的整合
- Kafka+Storm+HDFS整合实践
- flume-ng+Kafka+Storm+HDFS 实时系统搭建
- flume kafka sparkstreaming整合后集群报错org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti
- Flume-1.5.0+Kafka_2.9.2-0.8.1.1+Storm-0.9.2 分布式环境整合
- Kafka+Storm+HDFS整合实践
- Kafka+Storm+HDFS整合实践