您的位置:首页 > 大数据

【十八掌●武功篇】第十二掌:Flume之Source、Channel、Sink

2017-12-02 07:31 302 查看
这一篇博文是【大数据技术●降龙十八掌】系列文章的其中一篇,点击查看目录:

大数据技术●降龙十八掌


系列文章:
【十八掌●武功篇】第十二掌:Flume之工作原理与使用

【十八掌●武功篇】第十二掌:Flume之Source、Channel、Sink

【十八掌●武功篇】第十二掌:Flume之安装和测试使用

一、 Source

Source是从Flume Agent外部接收数据的组件,接收数据后写入到一个或者多个Channel中。

1、 Source的配置

(1) Source必须的参数

参数说明
typesource的类型
channelssource的数据写入的channel列表,多个channel之间用空格隔开
(2) Source可选参数

参数说明
Interceptors一连串拦截器的名单
interceptors.\
#给agent1的source s1设置两个拦截器,命名为i1 i2
agent1.sources.s1.interceptors= i1 i2
#设置i1拦截器类型为host拦截器
agent1.sources.s1.interceptors.i1.type=host
#配置host拦截器的属性preserverExisting为true,如果headers中有host就不替换了
agent1.sources.s1.interceptors.i1.preserveExisting=true
#设置i2拦截器类型为静态拦截器
agent1.sources.s1.interceptors.i2.type=static
#指定i2静态拦截器的key为header
agent1.sources.s1.interceptors.i2.key=header
#指定i2静态拦截器的Value值
agent1.sources.s1.interceptors.i2.value=myValue
#指定s1 source的channel选择器类型为multiplexing(多路复用)
agent1.sources.s1.selector.type=multiplexing
#指定多路复用选择器参照的header key为mytype
agent1.sources.s1.selector.header=mytype
#指定到mytype的值为1时,选择channel1
agent1.sources.s1.selector.mapping.1=channel1
#指定到mytype的值为2时,选择channel2
agent1.sources.s1.seleector.mapping.2=channel2
#指定到mytype的值为其他时,选择channel2
agent1.sources.s1.seleector.default=channel2


2、 Sink-to-Source通信

Flume水平扩展非常简单,是因为很容易添加新的Agent,两个Agent之间通信一般采用avro sink – avro source或者thrift sink – thrift source。

(1) Avro Source

Avro Source可以从Avro Sink接收event对象,event对象是压缩过的。Avro Source配置如下:

参数默认值说明
type可以用缩写avro,也可以用全称org.apache.flume.source.AvroSource
bind监听的IP地址或者是主机名。
port监听的端口
threadsinfinity接收数据的最大线程数量
sslfalse是否启用ssl
keystore使用ssl的keystore的路径,如果启用ssl,这个属性是必填的
keystore-password打开keystore使用的密码,如果启用了ssl,这个是必填的
keystore-typeJKS0使用的keystore的类型
compression-type对数据解压用的压缩格式。如果是zlib,设为deflate,默认为none(不压缩) 如果是分批传输,数据是分批压缩的,而不是对每一个event压缩。
(2) Thrift Source

Avro Source只是支持Java的RPC调用,不能接受非JVM语言的数据,为了解决这个问题,Flume提供了Thrift Source,它支持跨语言通信。

Thrift Source配置如下:

参数默认值说明
typeThrift,全称是org.apache.flume.source.ThriftSource
bind监听的IP地址或者是主机名。
port监听的端口
threadsinfinity接收数据的最大线程数量
3、 HTTP Source

Flume自带的HTTP Source可以通过Http Post接受event数据,它表现的像web服务器一样接受Flume event对象数据,配置如下:

参数默认值说明
typehttp,全称是org.apache.flume.source.HttpSource
bind监听的IP地址或者是主机名。
port监听的端口
enableSSLfalse是否启用ssl
keystore使用ssl的keystore的路径,如果启用ssl,这个属性是必填的
keystore-password打开keystore使用的密码,如果启用了ssl,这个是必填的
handlerJSONHandlerHttp Source使用的处理程序类
handler.*处理程序类的参数
Post的Json数据格式应该为以下形式:

[
{
"headers": {
"event1Header1": "event1vale1",
"event1Header2": "event1vale2"
},
"body": "event1内容"
},
{
"headers": {
"event2Header1": "event2vale1",
"event2Header2": "event2vale2"
},
"body": "event2内容"
}
]


4、 Spooling Directory Source

Spooling Directory Source是监控某一个目录的Source,当目录中一旦有了新文件,就采集这个文件数据,采集完成后,可以修改文件名,在文件名后面添加一个后缀,表明已经采集过了。Spooling Directory Source的参数如下表所示:

参数默认值说明
typespooldir 全称是org.apache.flume.source.SpoolDirectorySource
spooldir监控的目录路径,子目录不会被扫描
batchSize100每一批的event的个数
ignorePattern\^$文件名符合这个正则表达式的文件将会被忽略,不会被扫描读取
deletePolicynever设置为never或者immediate,那么读取完这个文件后就会删除这个文件
fileSuffix.COMPLETED读取完的文件添加的后缀,这个属性是必填的
fileHeaderfalse文件名是否被添加到event的header上去
fileHeaderKeyfile如果文件名被添加到header,这个是指定key名称
trackerDir.flumespool存储元数据的目录,用来source中断时重启Source
deserializerline
deserializer.*

5、 Syslog Source

6、 Exec Source

Exec Source执行用户配置的命令,且基于命令的标准输出来生成event。配置参数如下:

参数默认值说明
typeexec 全称是org.apache.flume.source.ExecSource
commandSource运行的命令
shellSource运行的脚本
restartfalse当命令执行失败后是否重新执行命令
restartThrottle10000重新启动命令先等待的毫秒数
logStdErrfalse运行的命令标准错误输出是否被记录
bathSize20一批中event对象的个数
bathTimeout3000一批刷写的超时时间,当超时时间和bathSize任何一个达到条件就进行刷写

7、 JMS Source

JMS Source可以获取来自于Java消息服务队列的数据。

二、 Channel

Channel是位于Source和Sink之间的缓冲区,Channel允许Source和Sink运行在不同的速率上,Channel是保证Flume不丢失数据的关键,Source写入一个或者多个Channel中,再由一个或者多个Sink读取。

Channel本质上是事务性的,每次从Channel上写入或者读取数据都在事务的上下文中执行,只有当写事务提交后,Sink才能读取这个数据。

Flume自带两种Channel:Memory Channel和File Channel。

1、 Memory Channel

Memory Channel是将数据存储在内存中,可以支持很高的吞吐量,但是当程序崩溃、机器宕机或者重启时都有可能会造成数据丢失

配置如下:

参数默认值说明
typememory 全称是org.apache.flume.channel.MemoryChannel
capacity100Channel能保存的event对象的最大数量,channel中满的时候如果写入event,就会抛出异常。
transactionCapactiy100一个事务中event对象的最大数量,这个参数可以防止客户端一次提交的event对象数量过大,导致agent的内存溢出
byteCapacity进程中可用堆空间总量的80%Channel中event允许使用最大的的堆空间
byteCapacityBufferPercentage20
keep-alive3每次写入或者取走等待完成的最大时间周期,单位为秒

2、 File Channel

File Channel是将数据写入磁盘,所以在程序关闭或者宕机时不会造成数据丢失,另外只要磁盘空间可用,File Channel可以有非常大的容量。File Channel配置如下:

参数默认值说明
typefile 全称是org.apache.flume.channel.FileChannel
capacity1000000Channel能保存的event对象的最大数量
transactionCapactiy1000一个事务中event对象的最大数量
checkpointDir\~/flume/filechannel/checkpointChannel写出到检查点的目录
dataDirs\~/flume/filechannel/dataevent存储数据的目录,可以设置多个目录,以逗号隔开,多个目录可以挂在到多个磁盘,通过并行写入提高效率
useDualCheckpointsfalse是否启用备用检查点目录
backupCheckpointDir检查点备用目录路径,这个目录不能和主检查点目录和数据目录相同
checkpointInterval30检查点之间的间隔(单位是秒)
maxFileSize2146435071每个数据文件的最大大小,单位是字节,一旦达到这个大小,就重新创建一个文件
minimumRequiredSpace524288000磁盘最小保留的大小
keep-alive3

三、 Sink

Sink将数据从Channel里读取出来然后插入到其他外部数据存储中,Sink是完全事务性的,在从Channel批量移除数据之前,每个Sink用channel启动一个事务,批量event一旦成功写出到存储系统或者下一个Flume Agent,Sink就利用channel提交事务,事务一旦提交,该Channel从自己的内部缓冲区删除event。

1、 HDFS Sink

(1) hdfs path中的转义字符

hdfs path的路径中可以用转义字符表示,所有的转义字符都是以%开头,当HDFS Sink从Channel读取一个event对象,读取headers中的值,将路径中的转义字符替换。

%{topid}是值以headers中key为topid的参数值。

HDFS Sink中的时间戳转义字符:

转义字符替换值
%t毫秒值
%s
%H小时值,24小时制,共2位,前面补零
%I小时值,12小时制,共两位,前面补零
%M分钟值
%S秒数,共两位,不够补零
%k小时值,24小时制,但是不补零
%pAM或者PM
%z
%a周的简写,如Mon、Tue
%A周的全称,如Monday、Tuesday
%b月的简称,如Jan、Feb
%B月的全称,如果January、February
%d天,如01,12
%c天、日期和时间,如Sun Feb 9 14:05:45 2017
%m月,如01、12
%D以mm/dd/yy表示日期
%y当前世纪以来的年,如00、17
%Y当前年份,如2013、2017
(2) HDFS Sink配置

参数默认值说明
type应该为hdfs
hdfs.pathSink应该写入的HDFS路径,可以有转义字符
hdfs.filePrefixFlumeData生成的hdfs文件名的前缀
hdfs.fileSuffix生成hdfs文件名的后缀(不会自动添加.)
hdfs.inUsePrefixhdfs sink正在写入中的文件的前缀
hdfs.inUseSuffix.tmphdfs sink正在写入中的文件名的后缀
hdfs.rollInterval30设置每多少秒生成一个新的文件,如果是0,就不按照时间来生成文件
hdfs.rollSize1024设置文件大小达到多少后生成一个新的文件,如果是0就是不基于文件大小来生成新文件
hdfs.rollCount10设置每写多少个event生成一个新的文件。
hdfs.idleTimeout0
hdfs.batchSize100每一批写入hdfs的event数量
hdfs.codeC用来压缩文件的格式,可以是gzip、bzip2、lzo、lzop或snappy
hdfs.fileTypeSequenceFile使用的文件格式
hdfs.maxOpenFiles5000HDFS Sink一次可以保持打开文件的最大数量,如果打开的文件数量超过这个数,最老的文件会被关闭
hdfs.minBlockReplicashdfs文件最小副本数,如果不设置就使用hadoop中的设置数
hdfs.writeFormat文件记录的格式,比如:Text、Writable
hdfs.callTimeout10000每个HDFS操作的超时时间
hdfs.threadsPoolSize10线程池中的线程数量
hdfs.rollTimerPoolSize1
hdfs.kerberosPrincipal登录到kerberos key分布中心(KDC)所使用的kerbero主体
hdfs.kerberosKeytab登录到kerberos key分布中心(KDC)的密码
hdfs.proxyUserHDFS用哪个账号写入HDFS
hdfs.roundFALSE设置event的时间戳是否取整
hdfs.roundValue1
hdfs.roundUnitsecond
hdfs.timeZoneLocal Time
hdfs.useLocalTimeStampFALSE是否使用当前agent的本地时间作为时间戳
hdfs.closeTries0
hdfs.retryInterval180
serializerTEXT
serializer.*

2、 HBase Sink

HBase Sink有两种:HBase Sink和Async HBase Sink。HBase Sink是使用HBase API客户端来写数据到HBase,是阻塞的,Async HBase Sink使用非阻塞的且多线程方式写数据到HBase。所以Async HBase Sink性能会好一些,但是不安全,HBase Sink是安全的。

HBase Sink配置:

参数默认值说明
typeHBase Sink简称为hbase;Async HBase Sink简称为asynchbase
tableSink写入的数据的表,这个表必须已经存储,HBase Sink不会创建表
columnFamily写入的列族,列族必须已经存在,HBase Sink不会自动创建列族
batchSize100每一批event的数量, HBase Sink 和Async HBase Sink都是批量写event的,batchSize控制了每批event的个数
zookeeperQuorumHBase 使用的zookeeper服务列表,逗号隔开
znodeParent/hbaseZookeeper上HBase集群使用的父节点
serializer
serializer.*

3、 Avro Sink

参数默认值说明
typeThe component type name, needs to be avro. 应该配置为avro
hostnameThe hostname or IP address to bind to. Sink要写入的avro服务的Ip或者主机名
portThe port # to listen on. Sink写入Avro服务的端口号
batch-size100number of event to batch together for send. 每一批event对象的个数
connect-timeout20000Amount of time (ms) to allow for the first (handshake) request. 第一次连接握手的超时时间,单位毫秒
request-timeout20000Amount of time (ms) to allow for requests after the first. 一次写入调用的超时时间,单位毫秒
reset-connection-intervalnoneAmount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent. 一次断开连接后,等待多少时间后进行重新连接,单位秒。
compression-typenoneThis can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource 解压数据的压缩格式。
compression-level6The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression 如果启用了压缩,设置压缩级别1-9,数值越大压缩率越高
sslFALSESet to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”. 传输数据是否启用ssl
trust-all-certsFALSEIf this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection.
truststoreThe path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used.
truststore-passwordThe password for the specified truststore.
truststore-typeJKSThe type of the Java truststore. This can be “JKS” or other supported Java truststore type.
exclude-protocolsSSLv2Hello SSLv3Space-separated list of SSL/TLS protocols to exclude
maxIoWorkers2 * the number of available processors in the machineThe maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.
4、 Thrift Sink

5、 Elastic Searche Sink

6、 Rolling File Sink

7、 Logger Sink

8、 Kafka Sink

四、 拦截器

拦截器是插件式组件,设置在Source和Source写入数据的Channel之间,Source接收到的event在写入到对应的Channel之前,拦截器可以转换或者删除这些event,每个拦截器实例只处理同一个Source接收到的event。可以有多个拦截器,event会依次经过拦截器的处理,最后写入channel。

在配置中,必须给每个拦截器命名一个名称,每个拦截器都要使用type参数指定类型。例如:

#给agent1的source s1设置两个拦截器,命名为i1 i2
agent1.sources.s1.interceptors= i1 i2

#设置i1拦截器类型为host拦截器
agent1.sources.s1.interceptors.i1.type=host
#配置host拦截器的属性preserverExisting为true,如果headers中有host就不替换了
agent1.sources.s1.interceptors.i1.preserveExisting=true

#设置i2拦截器类型为静态拦截器
agent1.sources.s1.interceptors.i2.type=static
#指定i2静态拦截器的key为header
agent1.sources.s1.interceptors.i2.key=header
#指定i2静态拦截器的Value值
agent1.sources.s1.interceptors.i2.value=myValue


Flume内置了很多种拦截器:

1、 时间戳拦截器

时间戳拦截器是最常用的拦截器,这个拦截器会将时间戳添加到event的headers上,key为timestamp。preserveExisting参数如果为true时,如果headers中有key为timestamp的属性,就不替换其值了,如果preserverExisting参数为false,headers中没有key为timestamp的属性就添加上,如果有就更新其值。

时间戳拦截器参数配置:

参数默认值说明
typetimestamp
preserveExistingfalse如果preserverExisting参数为false,headers中没有key为timestamp的属性就添加上,如果有就更新其值
举例:

#给名为agent1的Agent里的mys1这个source设置拦截器,只有一个拦截器名为mytsInterceptor
agent1.sources.mys1.interceptors=mytsInterceptor

#设置mytsInterceptor这个拦截器的类型为timestamp
agent1.sources.mys1.interceptors.mytsInterceptor.type=timestamp

agent1.sources.mys1.interceptors.mytsInterceptor.preserveExisting=false


2、 host拦截器

host拦截器会插入服务器的IP地址或者主机名,Agent将这些内容写入到Flume的Headers中,Key的名称可以用hostHeader配置,默认key名为host。参数如下:

参数默认值说明
typehost
hostHeaderhostHeaders中key的值
useIPtrue如果设置为true,值为IP地址,否则是主机名
preserveExistingfalse如果preserverExisting参数为false,headers中没有对应的key属性就添加上,如果有就更新其值

3、 静态拦截器

静态拦截器是将指定的key和value写入Headers中。配置参数如下:

参数默认值说明
typestatic
keykey指定的key
valuevalue指定的value
preserveExistingfalse如果preserverExisting参数为false,headers中没有对应的key属性就添加上,如果有就更新其值

4、 正则过滤拦截器

正则过滤器可以用于过滤event,根据配置的正则表达式决定允许通过或者丢弃当前event。是对event body部分进行匹配,参数配置如下:

参数默认值说明
typeregex_filter
regex.*用于匹配event body的正则表达式
excludeEventsfalse为true时,匹配上正则表达式就会将event丢弃,为false时匹配上的event才会被通过

5、 Morphline拦截器

6、 UUID拦截器

UUID拦截器会给event生成一个UUID唯一标识符,放入Headers中,参数配置如下:

参数默认值说明
typeorg.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerNameidkey名称
prefixuuid前缀
preserveExistingfalse如果preserverExisting参数为false,headers中没有对应的key属性就添加上,如果有就更新其值

五、 Channel选择器

Channel选择器决定Source接收的一个特定事件写入哪些Channel中,它将选择的结果告知Channel处理器,然后由Channel处理器将event写入指定的Channel。

Flume的Channel选择器分为:replicating(复制选择器)和multiplexing(多路复用选择器)。

1、 复制选择器

复制选择器是将Source采集到的数据,给每一个它连接的Channel发送一份,如果发送给其中一个Channel失败,就会抛出异常、事务失败、进行全部重试,这个过程中有可能会造成数据重复。

Channel选择器还有一个配置参数optional,可以配置几个Channel(多个用空格隔开),这个参数是指定哪些Channel是可选的,如果发送给可选的Channel失败就会忽略,不会造成事务的失败。

2、 多路复用选择器

多路复用选择器是根据headers里的某一个key的值决定发送到哪个Channel里去,是个动态路由的选择过程。

实例:

#定义名为a1的agent中的source s1的类型为avro
a1.sources.s1.type=avro
#s1的对应的channel有 c1 c2 c3
a1.sources.s1.channel=c1 c2 c3
#定义source的Channel选择器类型为复制选择器
a1.sources.s1.selector.type=multiplexing
#定义Channel选择器选择的依据是key名为mystate的headers键值对
a1.sources.s1.selector.header=mystate
#mystate中值为1的发送到c1 channel中
a1.sources.s1.selector.mapping.1=c1
#mystate中值为2的发送到c2 channel中
a1.sources.s1.selector.mapping.2=c2
#mystate中值为3的发送到c3 channel中
a1.sources.s1.selector.mapping.3=c3
#mystate中值为其他的默认发送到c1 channel中
a1.sources.s1.selector.default=c1
#Channel选择器中可选的Channel是c3,向c3发送失败可以忽略
a1.sources.s1.selector.optional=c3


这一篇博文是【大数据技术●降龙十八掌】系列文章的其中一篇,点击查看目录:

大数据技术●降龙十八掌
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  大数据 flume