
Integrating Flume with Kafka and HDFS

2016-12-20 13:26
Versions: Flume 1.7.0, Kafka 2.11-0.10.1.0 (Kafka 0.10.1.0 built for Scala 2.11), Hadoop 2.6.0

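I've been experimenting with Flume and Kafka recently. There are plenty of introductions to both online, so I won't repeat them here. Here is why I combine them: Flume collects and persists the messages, and Kafka transports them. When collecting a website's logs, Flume can monitor a single log file or a whole directory. Whenever new log lines appear, Flume persists them to HDFS and also sends them to Kafka. Kafka then distributes the messages to downstream consumers for processing, real-time computation, and so on.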
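To make the fan-out concrete, here is a toy Python model of what my configuration at the end of this post does. One source hands each event to a replicating selector, which copies it into two memory channels. Each channel is then drained by its own sink, one for Kafka and one for HDFS. MemoryChannel, replicate and drain are hypothetical names for illustration, not Flume APIs. Real Flume adds transactions, batching and retries on top of this.

```python
from collections import deque
from typing import Callable, Iterable, Optional


class MemoryChannel:
    """Toy stand-in for a Flume memory channel: a bounded FIFO queue.

    Hypothetical model for illustration; not Flume's MemoryChannel class.
    """

    def __init__(self, name: str, capacity: int) -> None:
        self.name = name
        self.capacity = capacity
        self.queue: deque = deque()

    def put(self, event: str) -> None:
        # A full channel rejects the event (real Flume would fail the transaction).
        if len(self.queue) >= self.capacity:
            raise RuntimeError(f"channel {self.name} is full")
        self.queue.append(event)

    def take(self) -> Optional[str]:
        # Return the oldest event, or None when the channel is empty.
        return self.queue.popleft() if self.queue else None


def replicate(event: str, channels: Iterable[MemoryChannel]) -> None:
    """Copy one event into every channel, like a replicating channel selector."""
    for channel in channels:
        channel.put(event)


def drain(channel: MemoryChannel, sink: Callable[[str], None]) -> int:
    """Hand every queued event to a sink callback; return how many were sent."""
    count = 0
    while (event := channel.take()) is not None:
        sink(event)
        count += 1
    return count


if __name__ == "__main__":
    c = MemoryChannel("c", capacity=1000)    # feeds the Kafka sink
    c1 = MemoryChannel("c1", capacity=1000)  # feeds the HDFS sink
    for line in ["GET /index.html 200", "GET /missing 404"]:
        replicate(line, [c, c1])

    sent_to_kafka, written_to_hdfs = [], []
    drain(c, sent_to_kafka.append)
    drain(c1, written_to_hdfs.append)
    print(sent_to_kafka == written_to_hdfs)  # True: both sinks see every line
```

The point of the model is that HDFS and Kafka each get their own copy of every event. A slow HDFS write therefore only backs up channel c1, not the Kafka path.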

The first step, then, is to set up a Flume environment and wire the whole pipeline together.

I drew a diagram of the setup. It isn't pretty, but it gets the idea across.



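When I searched online, I tried many different ways of configuring Flume, and most of them didn't work. I recommend configuring it from the documentation on the official website instead, using the Flume User Guide for your exact version.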

Problems I ran into:

Problem 1:
org.apache.flume.conf.ConfigurationException: Bootstrap Servers must be specified
at org.apache.flume.sink.kafka.KafkaSink.translateOldProps(KafkaSink.java:353)
at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:295)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:411)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


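Solution: specify the brokers with kafka.bootstrap.servers instead of metadata.broker.list. The KafkaSink in Flume 1.7 does not read metadata.broker.list, which is a key from older Kafka producer configurations. As a result, the sink finds no broker address and fails while translating the old properties; that is the translateOldProps frame at the top of the stack trace.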

Problem 2:
org.apache.flume.FlumeException: Unable to load sink type: org.apache.flume.plugins.KafkaSink, class: org.apache.flume.plugins.KafkaSink
at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:70)
at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:408)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.plugins.KafkaSink
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:68)
... 11 more


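Solution: Flume has changed since many online tutorials were written, so their configurations no longer fit. Don't copy them verbatim. Following the official documentation, I changed the sink type to org.apache.flume.sink.kafka.KafkaSink. The old class name, org.apache.flume.plugins.KafkaSink, comes from a third-party plugin that does not ship with Flume, which is why the ClassNotFoundException appears. Flume has included its own Kafka sink since version 1.6.0.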
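Both problems come down to configuration keys that belong to a different sink implementation, so a small pre-flight check can catch them before the agent starts. Below is a minimal Python sketch, assuming the agent is named agent as in my configuration. parse_flume_properties and check_kafka_sinks are hypothetical helpers, not part of Flume. The parser only handles the simple key = value lines used here. The check also accepts brokerList, which the Flume 1.7 user guide lists as the deprecated alias of kafka.bootstrap.servers.

```python
from typing import Dict, List

# The legacy class comes from an old third-party plugin and is not part of Flume.
LEGACY_KAFKA_SINK = "org.apache.flume.plugins.KafkaSink"
BUILTIN_KAFKA_SINK = "org.apache.flume.sink.kafka.KafkaSink"


def parse_flume_properties(text: str) -> Dict[str, str]:
    """Parse simple 'key = value' lines, skipping blank lines and # comments.

    Simplification: Java .properties also allows ':' separators, '!' comments
    and line continuations, which this sketch does not handle.
    """
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_kafka_sinks(props: Dict[str, str], agent: str = "agent") -> List[str]:
    """Report the two Kafka sink mistakes described above, one message per sink."""
    problems: List[str] = []
    for sink in props.get(f"{agent}.sinks", "").split():
        prefix = f"{agent}.sinks.{sink}."
        sink_type = props.get(prefix + "type")
        if sink_type == LEGACY_KAFKA_SINK:
            # Problem 2: the class is not on Flume's classpath.
            problems.append(f"{sink}: {LEGACY_KAFKA_SINK} is not shipped with Flume; "
                            f"use {BUILTIN_KAFKA_SINK}")
        elif sink_type == BUILTIN_KAFKA_SINK:
            # Problem 1: no broker address under a key the built-in sink reads.
            has_servers = (prefix + "kafka.bootstrap.servers" in props
                           or prefix + "brokerList" in props)
            if not has_servers:
                hint = ""
                if prefix + "metadata.broker.list" in props:
                    hint = " (metadata.broker.list is not read by this sink)"
                problems.append(f"{sink}: kafka.bootstrap.servers is missing{hint}")
    return problems


if __name__ == "__main__":
    broken = """
    agent.sinks = r
    agent.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.r.metadata.broker.list = localhost:9092
    """
    for problem in check_kafka_sinks(parse_flume_properties(broken)):
        print(problem)
```

Run against a configuration with only the old broker key, it prints "r: kafka.bootstrap.servers is missing (metadata.broker.list is not read by this sink)", the same problem Flume reported as "Bootstrap Servers must be specified".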

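Finally, here is my complete Flume configuration. One exec source fans out to two memory channels: channel c feeds the Kafka sink r, and channel c1 feeds the HDFS sink r1.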

# Name of the source
agent.sources = s
# Names of the channels
agent.channels = c c1
# Names of the sinks
agent.sinks = r r1
# For each one of the sources, the type is defined
# exec runs a command (here tail -F on one file); spooldir watches a directory
agent.sources.s.type = exec
agent.sources.s.command = tail -n +0 -F /opt/testLog/test.log
#agent.sources.s.type = spooldir
#agent.sources.s.spoolDir = /usr/log
#agent.sources.s.fileHeader = true
#agent.sources.s.batchSize = 100
# Bind the source to both channels; the default (replicating) channel
# selector copies every event into each of them
agent.sources.s.channels = c c1

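# Each sink's type must be defined
# Old third-party plugin class; fails with ClassNotFoundException on Flume 1.7:
#agent.sinks.r.type = org.apache.flume.plugins.KafkaSink
agent.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka brokers and topic (older configs used metadata.broker.list, which this sink ignores)
#agent.sinks.r.metadata.broker.list = localhost:9092
agent.sinks.r.kafka.bootstrap.servers = localhost:9092
agent.sinks.r.kafka.topic = testFlume
# Kafka producer settings are passed through with the kafka.producer. prefix;
# acks = 0 means the producer does not wait for any broker acknowledgement
agent.sinks.r.kafka.producer.acks = 0
# Settings left over from the old plugin; the built-in KafkaSink does not read them
#agent.sinks.r.partition.key=0
#agent.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
#agent.sinks.r.serializer.class=kafka.serializer.StringEncoder
#agent.sinks.r.request.required.acks=0
#agent.sinks.r.max.message.size=1000000
#agent.sinks.r.producer.type=sync
#agent.sinks.r.custom.encoding=UTF-8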
# Each channel's type is defined.
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
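# Channel capacity (maximum number of events stored)
#agent.channels.memoryChannel.capacity = 1000
# Transaction capacity (maximum number of events per transaction)
#agent.channels.memoryChannel.transactionCapacity = 100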

# HDFS sink configuration
agent.sinks.r1.type = hdfs
agent.sinks.r1.channel = c1
agent.sinks.r1.hdfs.path = hdfs://HDFSIP:8020/zy/flume/%y-%m-%d-%H
agent.sinks.r1.hdfs.filePrefix=events-
# File suffix
#agent.sinks.r1.hdfs.fileSuffix = .log
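# Round the event timestamp down to a 10-minute boundary before the time
# escapes in hdfs.path are filled in
agent.sinks.r1.hdfs.round = true
agent.sinks.r1.hdfs.roundValue = 10
agent.sinks.r1.hdfs.roundUnit = minute
# File format: the default is SequenceFile; DataStream writes uncompressed files
agent.sinks.r1.hdfs.fileType = DataStream
agent.sinks.r1.hdfs.writeFormat=Text
# Roll interval in seconds (0 = never roll based on time)
agent.sinks.r1.hdfs.rollInterval=0
# File size in bytes that triggers a roll (0 = never roll based on file size)
agent.sinks.r1.hdfs.rollSize=128000000
# Number of events written before a roll (0 = never roll based on number of events)
agent.sinks.r1.hdfs.rollCount=0
# Close a file after it has been idle for 60 seconds
agent.sinks.r1.hdfs.idleTimeout=60
# Use the agent's local time to replace the escape sequences in hdfs.path
# (instead of the timestamp in the event header)
agent.sinks.r1.hdfs.useLocalTimeStamp = true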
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity=1000
agent.channels.c1.keep-alive=30
agent.sinks.r.channel = c
agent.channels.c.type = memory
agent.channels.c.capacity = 1000
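The directory layout produced by hdfs.path can be checked by hand. With hdfs.round = true, roundValue = 10 and roundUnit = minute, Flume rounds the event time down to a 10-minute boundary before filling in the time escapes. My path only goes down to %H, so every event within one hour still lands in the same directory. The rounding only starts to matter once a minute escape such as %M is added. Below is a minimal Python model of that behaviour, assuming the escapes mean the same as their strftime counterparts (true for %y, %m, %d, %H and %M). hdfs_bucket and round_down_minutes are hypothetical helpers, not Flume code.

```python
from datetime import datetime


def round_down_minutes(ts: datetime, round_value: int) -> datetime:
    """Round ts down to a multiple of round_value minutes (roundUnit = minute)."""
    return ts.replace(minute=ts.minute - ts.minute % round_value,
                      second=0, microsecond=0)


def hdfs_bucket(ts: datetime, path_pattern: str, round_value: int = 10) -> str:
    """Model of hdfs.path: round the timestamp, then expand the time escapes.

    Only %y/%m/%d/%H/%M are modelled (strftime gives them the same meaning);
    other Flume escapes such as %{header} or %t are not handled here.
    """
    return round_down_minutes(ts, round_value).strftime(path_pattern)


if __name__ == "__main__":
    ts = datetime(2016, 12, 20, 13, 26, 45)  # local time, as useLocalTimeStamp = true
    print(hdfs_bucket(ts, "/zy/flume/%y-%m-%d-%H"))    # /zy/flume/16-12-20-13
    print(hdfs_bucket(ts, "/zy/flume/%y-%m-%d-%H%M"))  # /zy/flume/16-12-20-1320
```

So with %M added to the path, a new directory would appear every 10 minutes instead of every minute.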