您的位置:首页 > 其它

Flume笔记二之source,channel,sink

2018-03-25 17:42 453 查看

Flume笔记二之source,channel,sink

原创 2016年11月20日 20:10:20标签:
flume /
source /
channel /
sink
690

Source

rpc远程过程调用协议,客户机与服务机的调用模式需要对数据进行序列化。
         1:客户机将参数序列化并以二进制形式通过网络传输到服务器。
         2:服务器接收到后进行反序列化再调用方法获取返回值。
         3:服务器将返回值序列化后再通过网络传输给客户机。
         4:客户机接收到结果后再进行反序列化获取结果。
Avro source:
         Avro就是一种序列化形式,avrosource监听一个端口只接收avro序列化后的数据,其他类型的不接收。
         type:avrosource的类型,必须是avro。
         bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。
         port:绑定的本地的端口。
 
Thrif source:
         和avro一样是一种数据序列化形式,Thrifsource只采集thrift数据序列化后的数据
 
Exec source:
         采集linux命令的返回结果传输给channel
         type:source的类型:必须是exec。
        command:要执行命令。
        tail  –f  若文件被删除即使重新创建同名文件也不会监听
        tail  -F  只要文件同名就可以继续监听
         以上可以用在日志文件切割时的监听
 
 
JMS Source:
        Java消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;
 
Spooling Directory Source:通过文件夹里的新增的文件作为数据源的采集;
 
Kafka Source:从kafka服务中采集数据。
 
NetCat Source:绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入
        type:source的类型,必须是netcat。
       bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。
       port:绑定的本地的端口。
 
HTTP Source:监听HTTP POST和 GET产生的数据的采集
 

Chanel

         是一个数据存储池,中间通道,从source中接收数据再向sink目的地传输,如果sink写入失败会自动重写因此不会造成数据丢失。
         Memory:用内存存储,但服务器宕机会丢失数据。
                 Typechannel的类型:必须为memory
                 capacity:channel中的最大event数目
                 transactionCapacity:channel中允许事务的最大event数目
 
         File:使用文件存储数据不会丢失数据但会耗费io。
                 Typechannel的类型:必须为 file
                 checkpointDir :检查点的数据存储目录
                 dataDirs :数据的存储目录
                 transactionCapacity:channel中允许事务的最大event数目
 
         SpillableMemory Channel:内存文件综合使用,先存入内存达到阀值后flush到文件中。
                Typechannel的类型:必须为SPILLABLEMEMORY
                memoryCapacity:内存的容量event数
                overflowCapacity:数据存到文件的event阀值数
                checkpointDir:检查点的数据存储目录
                dataDirs:数据的存储目录
 
         Jdbc:使用jdbc数据源来存储数据。
         Kafka:使用kafka服务来存储数据。
 

Sink

         各种类型的目的地,接收channel写入的数据并以指定的形式表现出来。Sink有很多种类型。
                 type:sink的类型 必须是hdfs。
                 hdfs.path:hdfs的上传路径。
                 hdfs.filePrefix:hdfs文件的前缀。默认是:FlumeData
                 hdfs.rollInterval:间隔多久产生新文件,默认是:30(秒) 0表示不以时间间隔为准。
                 hdfs.rollSize:文件到达多大再产生一个新文件,默认是:1024(bytes)0表示不以文件大小为准。
                 hdfs.rollCount:event达到多大再产生一个新文件,默认是:10(个)0表示不以event数目为准。
                 hdfs.batchSize:每次往hdfs里提交多少个event,默认为100
                 hdfs.fileType:hdfs文件的格式主要包括:SequenceFile,DataStream ,CompressedStream,如果使用了CompressedStream就要设置压缩方式。
                 hdfs.codeC:压缩方式:gzip,bzip2, lzo, lzop, snappy
                 注:%{host}可以使用header的key。以及%Y%m%d来表示时间,但关于时间的表示需要在header里有timestamp这个key。
 
        Logger Sink将数据作为日志处理(根据flume中的设置的日志方式来显示)
                 要在控制台显示在运行agent的时候加入:-Dflume.root.logger=INFO,console。
                 type:sink的类型:必须是logger。
                 maxBytesToLog:打印body的最长的字节数 默认为16
 
 
        Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。
                 type:sink的类型:必须是 avro。
                 hostname:指定发送数据的主机名或者ip
                 port:指定发送数据的端口

实例

1:监听一个文件的增加变化,采集数据并在控制台打印。        在这个例子中我使用exec source,memory chanel,logger sink。可以看我的agent结构图



以下是我创建的exec_source.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
 
a1.sources.r1.type=exec 
a1.sources.r1.command=tail -F/usr/local/success.log
 
  a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
 
a1.sinks.k1.type=logger
 
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
 
执行命令:
bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &
 
然后更改/usr/local/success.log文件中的内容后可以看到flume采集到了文件的变化并在控制台上打印出来。文件初始内容hello和how are you,剩下的i am fine和ok为新增加内容。



2:监控一个文件变化并将其发送到另一个服务器上然后打印
这个例子可以建立在上一个例子之上,但是需要对flume的结构做一些修改,我使用avro序列化数据再发送到指定的服务器上。详情看结构图。



实际上flume可以进行多个节点关联,本例中我只使用131向139发送数据
131,139上都必须启动agent
服务器131配置
以下是我创建的exec_source_avro_sink.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
 
a1.sources.r1.type=exec 
a1.sources.r1.command=tail -F/usr/local/success.log
 
a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
 
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.79.139
a1.sinks.k1.port=42424
 
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
 
执行命令启动agent
bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&
 
139服务器配置
执行命令拷贝flume到139
scp -r apache-flume-1.7.0-bin/root@192.168.79.139:/usr/local/
修改exec_source_avro_sink.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
 
a1.sources.r1.type=avro 
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=42424
 
  a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
 
a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
 
执行命令启动agent
bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&
 
结果可以在139控制台上看到131中修改success.log的变化信息



3:avro-client实例
执行bin/flume-ng会提示有命令如下
help                     display this help text
agent                     run aFlume agent
avro-client               run anavro Flume client
version                   show Flume version info
 
avro-clinet是avro客户端,可以把本地文件以avro序列化方式序列化后发送到指定的服务器端口。本例就是将131的一个文件一次性的发送到139中并打印。
Agent结构图如下



131启动的是一个avro-client,它会建立连接,发送数据,断开连接,它只是一个客户端。
启动一个avro客户端
bin/flume-ngavro-client --conf conf/ --host 192.168.79.139 --port 42424 --filename/usr/local/success.log --headerFile /usr/local/kv.log
 
--headerFile是用来区分是哪个服务器发送的数据,kv.log中的内容会被发送到139,可以作为标识来使用。
 
139的avro_client.conf如下
a1.sources=r1
a1.channels=c1
a1.sinks=k1
 
a1.sources.r1.type=avro 
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=42424
 
 
a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
 
a1.sinks.k1.type=logger
 
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
 
启动agent
bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &
 
139控制台显示如下



可以看到headers的内容headers:{hostname=192.168.79.131}

注意:

1:Flume服务没有stop命令需要通过kill来杀掉进行,可以使用jps  -m来确认是那个agent的number
[root@shb01 conf]# jps -m
3610 Jps -m
3512 Application --conf-fileconf/exec_source.conf --name a1
 
2:修改flume的配置文件后如avro_client.conf,flume会自动重启
 
3:logger sink默认只显示16个字节
 
4:flume是以event为单位进行数据传输的,其中headers是一个map容器map<string,string>
Event: { headers:{hostname=192.168.79.131}body: 31 61                                           1a }
 
5:flume支持多节点关联但是sink和source的类型要一致,比如avro-client发送数据那么接收方的source也必须是avro否则会警告。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: