您的位置:首页 > 其它

Flume NG 学习笔记(四)Source配置

2017-03-15 15:19 756 查看
首先、这节水的东西就比较少了,大部分是例子。

一、Avro Source与Thrift Source

Avro端口监听并接收来自外部的Avro客户流的事件。当内置Avro 去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构。官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了。
Property NameDefaultDescription
channels
typeThe component type name, needs to be avro
bindhostname or IP address to listen on
portPort # to bind to
threadsMaximum number of worker threads to spawn
selector.type
selector.*
interceptorsSpace-separated list of interceptors
interceptors.*
compression-typenoneThis can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource
sslFALSESet this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”.
keystoreThis is the path to a Java keystore file. Required for SSL.
keystore-passwordThe password for the Java keystore. Required for SSL.
keystore-typeJKSThe type of the Java keystore. This can be “JKS” or “PKCS12”.
ipFilterFALSESet this to true to enable ipFiltering for netty
ipFilter.rulesDefine N netty ipFilter pattern rules with this config.
官网的例子就不放了,这边用实际例子显示。
[html] view plain copy
#配置文件avro_case2.conf 其实和第二节的pull.conf 一模一样

#Name the components on this agent

a1.sources= r1

a1.sinks= k1

a1.channels= c1

#Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 192.168.233.128

a1.sources.r1.port = 55555

#Describe the sink

a1.sinks.k1.type= logger

a1.sinks.k1.channel= c1

#Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity= 1000

a1.channels.c1.transactionCapacity= 100

#敲命令flume-ng agent -cconf -f conf/avro_case2.conf -n a1 -Dflume.root.logger=INFO,console成功与否就不说明,与第二节的pull.conf 同。。。 #然后在另一个终端进行测试flume-ng avro-client -cconf -H 192.168.233.128 -p 44444 -F /tmp/logs/test.log这个就是模拟第二节push代理费pull代理发数据,这里不写配置直接命令方式测试。


发送事件成功,这里和push代理不一样的是没有用spool,所以日志文件名不会被改名称。看接受终端显示


ok数据发送成功。ThriftSource 与Avro Source 基本一致。只要把source的类型改成thrift即可,例如a1.sources.r1.type = thrift比较简单,不做赘述。

二、Exec Source

ExecSource的配置就是设定一个Unix(Linux)命令,然后通过这个命令不断输出数据。如果进程退出,Exec Source也一起退出,不会产生进一步的数据。下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了。
Property NameDefaultDescription
channels
typeThe component type name, needs to be exec
commandThe command to execute
shellA shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle10000Amount of time (in millis) to wait before attempting a restart
restartFALSEWhether the executed cmd should be restarted if it dies
logStdErrFALSEWhether the command’s stderr should be logged
batchSize20The max number of lines to read and send to the channel at a time
selector.typereplicatingreplicating or multiplexing
selector.*Depends on the selector.type value
interceptorsSpace-separated list of interceptors
interceptors.*
下面是实际例子显示。
[html] view plain copy
#配置文件exec_case3.conf

#Name the components on this agent

a1.sources= r1

a1.sinks= k1

a1.channels= c1

#Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /tmp/logs/test.log

a1.sources.r1.channels = c1

#Describe the sink

a1.sinks.k1.type= logger

a1.sinks.k1.channel= c1

#Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity= 1000

a1.channels.c1.transactionCapacity= 100

这里我们用tail –F命令去一直都日志的尾部。#敲命令flume-ng agent -cconf -f conf/exec_case3.conf -n a1 -Dflume.root.logger=INFO,console这边会显示读取日志的所有数据




上图是日志,这边我们继续往日志里添加数据echo"looklook5" >> test.log ,会发现终端也在输出数据。

三、JMS Source

官网说:JMS Sourcereads messages from a JMS destination such as a queue or topic. Being a JMSapplication it should work with any JMS provider but has only been tested withActiveMQ.简单说的,官网JMSsource 就测试了ActiveMQ,其他的还没有。下面是官网的例子:a1.sources=r1a1.channels=c1a1.sources.r1.type=jmsa1.sources.r1.channels=c1a1.sources.r1.initialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactorya1.sources.r1.connectionFactory=GenericConnectionFactorya1.sources.r1.providerURL=tcp://mqserver:61616a1.sources.r1.destinationName=BUSINESS_DATAa1.sources.r1.destinationType=QUEUE下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了
Property NameDefaultDescription
channels
typeThe component type name, needs to be jms
initialContextFactoryInital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactoryThe JNDI name the connection factory shoulld appear as
providerURLThe JMS provider URL
destinationNameDestination name
destinationTypeDestination type (queue or topic)
messageSelectorMessage selector to use when creating the consumer
userNameUsername for the destination/provider
passwordFileFile containing the password for the destination/provider
batchSize100Number of messages to consume in one batch
converter.typeDEFAULTClass to use to convert messages to flume events. See below.
converter.*Converter properties.
converter.charsetUTF-8Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
介于这个源目前还不成熟,那我们等他成熟了再来研究吧,这里偷点懒。

四、Spooling Directory Source

Spooling Directory Source在第二节的时候已经讲过,这里复述一下:监测配置的目录下新增的文件,并将文件中的数据读取出来。其中,Spool Source有2个注意地方,第一个是拷贝到spool目录下的文件不可以再打开编辑,第二个是spool目录下不可包含相应的子目录。这个主要用途作为对日志的准实时监控。下面是官网给出的source的配置,加粗的参数是必选。可选项太多,这边就加一个fileSuffix,即文件读取后添加的后缀名,这个是可以更改。
Property NameDefaultDescription
channels
typeThe component type name, needs to be spooldir.
spoolDirThe directory from which to read files from.
fileSuffix.COMPLETEDSuffix to append to completely ingested files
下面给出例子,这个与第二节的push.conf 相似
[html] view plain copy
#配置文件:spool_case4.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type =spooldir

a1.sources.r1.spoolDir =/tmp/logs

a1.sources.r1.fileHeader= true

a1.sources.r1.channels =c1

# Describe the sink

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

# Use a channel which buffers events inmemory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

这里我们监控日志目录/tmp/logs#敲命令flume-ng agent -cconf -f conf/spool_case4.conf -n a1 -Dflume.root.logger=INFO,console


终端将数据都显示出来了。我们查看监控日志目录/tmp/logs


被读取的文件已经被加上后缀名,表示已经完成读取。

五、NetCat Source

Netcat source 在某一端口上进行侦听,它将每一行文字变成一个事件源,也就是数据是基于换行符分隔。它的工作就像命令nc -k -l [host] [port] 换句话说,它打开一个指定端口,侦听数据将每一行文字变成Flume事件,并通过连接通道发送。下面是官网给出的source的配置,加粗的参数是必选
Property NameDefaultDescription
channels
typeThe component type name, needs to be netcat
bindHost name or IP address to bind to
portPort # to bind to
max-line-length512Max line length per event body (in bytes)
ack-every-eventTRUERespond with an “OK” for every event received
selector.typereplicatingreplicating or multiplexing
selector.*Depends on the selector.type value
interceptorsSpace-separated list of interceptors
interceptors.*
实际例子话,第二节的第一个例子就是Netcat source,这里不演示了。

六、Sequence Generator Source

一个简单的序列发生器,不断产成与事件计数器0和1的增量开始。主要用于测试(官网说),这里也不做赘述。

七、Syslog Sources

读取syslog数据,并生成Flume 事件。 这个Source分成三类SyslogTCP Source、Multiport Syslog TCP Source(多端口)与SyslogUDP Source。其中TCP Source为每一个用回车(\ n)来分隔的字符串创建一个新的事件。而UDP Source将整个消息作为一个单一的事件。

7.1、Syslog TCPSource

这个是最初的Syslog Sources下面是官网给出的source的配置,加粗的参数是必选,这里可选我省略了。
Property NameDefaultDescription
channels
typeThe component type name, needs to be syslogtcp
hostHost name or IP address to bind to
portPort # to bind to
官网案例a1.sources=r1a1.channels=c1a1.sources.r1.type=syslogtcpa1.sources.r1.port=5140a1.sources.r1.host=localhosta1.sources.r1.channels=c1下面是实际的例子
[html] view plain copy
#配置文件:syslog_case5.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type =syslogtcp

a1.sources.r1.port =50000

a1.sources.r1.host =192.168.233.128

a1.sources.r1.channels =c1

# Describe the sink

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

# Use a channel which buffers events inmemory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

这里我们设置的侦听端口为192.168.233.128 50000#敲命令flume-ng agent -cconf -f conf/syslog_case5.conf -n a1 -Dflume.root.logger=INFO,console启动成功后打开另一个终端输入,往侦听端口送数据echo "hellolooklook5" | nc 192.168.233.128 50000然后看之前的终端,将会有如下显示:


数据已经发送过来了。

7.2 Multiport Syslog TCP Source

这是一个更新,更快,支持多端口版本的SyslogTCP Source。他不仅仅监控一个端口,还可以监控多个端口。官网配置基本差不多,就是可选配置比较多
Property NameDefaultDescription
channels
typeThe component type name, needs to be multiport_syslogtcp
hostHost name or IP address to bind to.
portsSpace-separated list (one or more) of ports to bind to.
portHeaderIf specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.
这里说明下需要注意的是这里ports设置已经取代tcp 的port,这个千万注意。还有portHeader这个可以与后面的interceptors 与 channel selectors自定义逻辑路由使用。下面是官网例子:a1.sources=r1a1.channels=c1a1.sources.r1.type=multiport_syslogtcpa1.sources.r1.channels=c1a1.sources.r1.host=0.0.0.0a1.sources.r1.ports=10001 10002 10003a1.sources.r1.portHeader=port下面是实际例子
[html] view plain copy
#配置文件:syslog_case6.conf

# Name thecomponents on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#Describe/configure the source

a1.sources.r1.type = multiport_syslogtcp

a1.sources.r1.ports = 50000 60000

a1.sources.r1.host = 192.168.233.128

a1.sources.r1.channels = c1

# Describe thesink

a1.sinks.k1.type= logger

a1.sinks.k1.channel = c1

# Use a channelwhich buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity= 1000

a1.channels.c1.transactionCapacity= 100

这里我们侦探192.168.233.128的2个端口50000与60000#敲命令flume-ng agent -cconf -f conf/syslog_case6.conf -n a1 -Dflume.root.logger=INFO,console启动成功后打开另一个终端输入,往侦听端口送数据echo "hellolooklook5" | nc 192.168.233.128 50000echo "hello looklook6"| nc 192.168.233.128 60000然后看之前的终端,将会有如下显示:



2个端口的数据已经发送过来了。

7.3 Syslog UDP Source

关于这个官网都懒的介绍了,其实就是与TCP不同的协议而已。官网配置与TCP一致,就不说了。下面是官网例子a1.sources=r1a1.channels=c1a1.sources.r1.type=syslogudpa1.sources.r1.port=5140a1.sources.r1.host=localhosta1.sources.r1.channels=c1下面是实际例子
[html] view plain copy
#配置文件:syslog_case7.conf

# Name thecomponents on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#Describe/configure the source

a1.sources.r1.type = syslogudp

a1.sources.r1.port = 50000

a1.sources.r1.host = 192.168.233.128

a1.sources.r1.channels = c1

# Describe thesink

a1.sinks.k1.type= logger

a1.sinks.k1.channel = c1

# Use a channelwhich buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity= 1000

a1.channels.c1.transactionCapacity= 100

#敲命令flume-ng agent -cconf -f conf/syslog_case7.conf -n a1 -Dflume.root.logger=INFO,console启动成功后打开另一个终端输入,往侦听端口送数据echo "hellolooklook5" | nc –u 192.168.233.128 50000#在启动的终端查看console输出


Ok,数据已经发送过来了 八、HTTP SourceHTTP Source是HTTP POST和GET来发送事件数据的,官网说GET应只用于实验。Flume 事件使用一个可插拔的“handler”程序来实现转换,它必须实现的HTTPSourceHandler接口。此处理程序需要一个HttpServletRequest和返回一个flume 事件列表。所有在一个POST请求发送的事件被认为是在一个事务里,一个批量插入flume 通道的行为。下面是官网给出的source的配置,加粗的参数是必选
Property NameDefaultDescription
typeThe component type name, needs to be http
portThe port the source should bind to.
bind0.0.0.0The hostname or IP address to listen on
handlerorg.apache.flume.source.http.JSONHandlerThe FQCN of the handler class.
官网例子a1.sources=r1a1.channels=c1a1.sources.r1.type=httpa1.sources.r1.port=5140a1.sources.r1.channels=c1a1.sources.r1.handler=org.example.rest.RestHandlera1.sources.r1.handler.nickname=random props下面是实际用例:

[html] view plain copy
#配置文件:http_case8.conf

#Name the components on this agent

a1.sources= r1

a1.sinks= k1

a1.channels= c1

#Describe/configure the source

a1.sources.r1.type= http

a1.sources.r1.port= 50000

a1.sources.r1.channels= c1

#Describe the sink

a1.sinks.k1.type= logger

a1.sinks.k1.channel = c1

#Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity= 1000

a1.channels.c1.transactionCapacity= 100

#敲命令flume-ng agent -cconf -f conf/http_case8.conf -n a1 -Dflume.root.logger=INFO,console启动成功后#我们用生成JSON 格式的POSTrequest发数据curl -X POST -d '[{"headers" :{"looklook1" : "looklook1 isheader","looklook2" : "looklook2 isheader"},"body" : "hello looklook5"}]' http://192.168.233.128:50000#在启动的终端查看console输出

这里headers与body都正常输出。 九、Twitter 1%firehose Source(实验的)官网警告,慎用,说不定下个版本就木有了这个实验source 是通过时搜索服务,从Twitter的1%样本信息中获取事件数据。需要Twitter开发者账号。好吧,对于400网站,我们无可奈何用不到,就不多解释了。
十、自定义Source一个自定义 Source其实是对Source接口的实现。当我们开始flume代理的时候必须将自定义 Source和相依赖的jar包放到代理的classpath下面。自定义 Source的type就是我们实现Source接口对应的类全路径。这里后面的内容里会详细介绍,这里不做赘述。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  address number listen