
Flume: Introduction, Installation, Deployment, and Usage

Copyright notice: this is an original post by the author, released under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting. Original link: https://blog.csdn.net/greenplum_xiaofan/article/details/98894712

Table of Contents

  • 1. Flume Introduction
  • 2. Installation
  • 3. NetCat Source, Logger Sink
  • 4. Exec Source, HDFS Sink
  • 5. Spooling Directory Source, HDFS Sink
  • 6. Taildir Source, HDFS Sink

    1. Flume Introduction

    1.1 The Version to Download

    This article covers the CDH build of Flume, not the Apache release.
    Download and User Guide links:
    http://archive.cloudera.com/cdh5/cdh/5/
    http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html#
    Apache Flume (open source): http://flume.apache.org/

    The version to download:

    flume-ng-1.6.0-cdh5.7.0.tar.gz
    (matching the CDH version in use)
    NG: the 1.x line, which is what is mainly used today.
    OG: the 0.9.x line, which is essentially no longer used.
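    If you prefer to fetch the tarball from the command line, something like the following should work; the exact path under the CDH archive is an assumption based on the download page above, so verify the link there first:

    # assumed archive layout; check the download page if the URL has moved
    [hadoop@vm01 software]$ wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0.tar.gz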

    1.2 Flume Features

    Flume is a distributed, highly reliable, highly available system for collecting, aggregating, and moving large volumes of log data from many different sources into a central store. In short, it is a tool for log collection and aggregation.
    Logstash and Filebeat are the log shippers of the ES (Elastic) stack and are quite similar to Flume; Filebeat is the lightweight one and Logstash the heavyweight one. If your project already runs ES, they are worth considering.

    The three core Flume components:

    • Source (collects data from the data source): Flume ships with many source types, e.g. Taildir Source, NetCat, Exec, and Spooling Directory, and you can also write custom sources.
    • Channel (buffers the data handed over by the source): mainly Memory Channel and File Channel (the latter is common in production).
    • Sink (writes the data in the channel to the destination): e.g. HDFS (batch processing) or Kafka (stream processing).

    Agent: you can think of it as a Flume node, made up of the three components above. Every Flume agent is given a name of its own; we will see this in the configurations below.
    Event: the smallest unit of data Flume transfers. One event is one record, made up of a header part and a body part; the body holds the actual payload as a byte array.

    Event: { headers:{} body: 31 37 20 69 20 6C 6F 76 65 20 79 6F 75 0D       17 i love you. }

    1.3 Flume Agent Topologies

    Single agent:

    Chained agents (multi-hop; a configuration sketch follows below):

    Fan-in agents (consolidation, common in production):

    Multi-sink agents are also very common:
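    As a concrete illustration of the chained (multi-hop) topology, the usual pattern is an Avro sink on the upstream agent pointing at an Avro source on the downstream agent. The snippet below is only a sketch; the host name collector01 and port 4545 are made-up values:

    # upstream agent a1: its sink forwards events to the downstream agent over Avro
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = collector01
    a1.sinks.k1.port = 4545

    # downstream agent a2: its source receives the Avro events from upstream
    a2.sources.r1.type = avro
    a2.sources.r1.bind = 0.0.0.0
    a2.sources.r1.port = 4545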

    2. Installation

    [hadoop@vm01 software]$ tar -zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C ../app/

    Configure the environment variables:

    [hadoop@vm01 apache-flume-1.6.0-cdh5.7.0-bin]$ vi ~/.bash_profile
    export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
    export PATH=$FLUME_HOME/bin:$PATH
    
    [hadoop@vm01 apache-flume-1.6.0-cdh5.7.0-bin]$ source ~/.bash_profile

    Configure the flume-env.sh file:

    [hadoop@vm01 conf]$ pwd
    /home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin/conf
    
    [hadoop@vm01 conf]$ cp flume-env.sh.template  flume-env.sh
    [hadoop@vm01 conf]$ vi flume-env.sh
    export JAVA_HOME=/usr/java/jdk1.8.0_45
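    At this point you can sanity-check the installation with the version subcommand; it should report the 1.6.0-cdh5.7.0 build:

    [hadoop@vm01 conf]$ flume-ng version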

    3. NetCat Source, Logger Sink

    3.1 Configuration

    NetCat Source: listens on a specified network port; whenever an application writes data to that port, the source picks it up.
    Logger Sink: simply writes events to the console (the agent's log).
    The CDH User Guide documents every source configuration option in detail:
    http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html#netcat-source

    [hadoop@vm01 conf]$ vi example.conf
    # example.conf: A single-node Flume configuration
    # Name the components on this agent
    # a1 is the agent name; the other prefixes follow the same pattern
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    # capacity: the maximum number of events the channel can hold (use at least 100,000 in production)
    # transactionCapacity: the maximum number of events handled per transaction before a commit
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    # wire the three components together
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    3.2 Start and Test

    [hadoop@vm01 bin]$ pwd
    /home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin/bin
    
    # a1 is the agent name from your configuration
    # --conf points at the conf directory
    # --conf-file points at your agent configuration file
    # the last line sends INFO logs to the console for easier observation and can be omitted
    [hadoop@vm01 bin]$ flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/example.conf \
    -Dflume.root.logger=INFO,console

    Open a second terminal session and test with telnet, as sketched below.

    # skip this step if telnet is already installed on your system
    [root@vm01 ~]# yum install telnet-server
    [root@vm01 ~]# yum install telnet.*
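    Then connect to the port the netcat source listens on and type a line; the netcat source acknowledges each line with "OK" and the agent console logs it as an Event, similar to the example in section 1.2 (the message text below is just an illustration):

    [hadoop@vm01 ~]$ telnet localhost 44444
    hello flume
    OK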

    To exit telnet, press Ctrl+] to drop to the telnet prompt, then type quit:

    telnet> quit
    Connection closed.
    [hadoop@vm01 ~]$


    4. Exec Source, HDFS Sink

    The Exec source runs a command on the source side; here we use tail -F on a data file to collect its contents.
    Although this tailing approach can get log data into HDFS, if the tail -F process dies, data is lost, so it is not viable in production: there is no high availability.
    It also does nothing about the large number of small files this pipeline generates, so it is not highly reliable either.
    Finally, tail can only watch a single file, while in production you usually need to watch a whole directory, so it does not meet real requirements.

    [hadoop@vm01 conf]$ vi exec.conf
    
    # exec.conf: A single-node Flume configuration
    # Name the components on this agent
    exec-hdfs-agent.sources = exec-source
    exec-hdfs-agent.sinks = hdfs-sink
    exec-hdfs-agent.channels = memory-channel
    
    # Describe/configure the source
    exec-hdfs-agent.sources.exec-source.type = exec
    exec-hdfs-agent.sources.exec-source.command = tail -F /home/hadoop/data/test.log
    exec-hdfs-agent.sources.exec-source.shell = /bin/sh -c
    
    # Describe the sink
    exec-hdfs-agent.sinks.hdfs-sink.type = hdfs
    exec-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://vm01:9000/flume/exec
    exec-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = DataStream
    exec-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
    
    # Use a channel which buffers events in memory
    exec-hdfs-agent.channels.memory-channel.type = memory
    exec-hdfs-agent.channels.memory-channel.capacity = 1000
    exec-hdfs-agent.channels.memory-channel.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    exec-hdfs-agent.sources.exec-source.channels = memory-channel
    exec-hdfs-agent.sinks.hdfs-sink.channel = memory-channel

    Start the agent

    flume-ng agent \
    --name exec-hdfs-agent \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/exec.conf \
    -Dflume.root.logger=INFO,console

    Test

    [hadoop@vm01 data]$ echo "Hello Flume">>test.log
    [hadoop@vm01 data]$ echo "Hello Hadoop">>test.log
    
    [hadoop@vm01 ~]$ hdfs dfs -cat /flume/exec/*
    Hello Flume
    Hello Hadoop
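    While the HDFS sink is still writing, the current file keeps the default FlumeData file-name prefix and the default .tmp in-use suffix, and is renamed when it rolls; you can watch this with a plain listing (file names on your cluster will differ):

    [hadoop@vm01 ~]$ hdfs dfs -ls /flume/exec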

    5. Spooling Directory Source, HDFS Sink

    Spooling Directory Source: watches a specified directory; whenever an application drops a new file into it, the source picks the file up, parses its contents, and writes them to the channel. Once a file has been fully ingested, it is marked as completed.

    • Files written to HDFS should ideally be around 100 MB, slightly below the block size.
    • Normally rollInterval (time) and rollSize (size) control when an HDFS file is rolled; whichever fires first triggers the roll, and count-based rolling is disabled (rollCount = 0).
    • rollSize refers to the size before compression, so if the HDFS files are compressed, increase rollSize accordingly.
    • Once a file in the directory has been ingested into HDFS, it is renamed with a .COMPLETED suffix.
    • If a file has already been ingested and is modified afterwards, the source raises an error and stops; the same happens if a new file arrives with the same name as one already ingested.
    • Data written to HDFS can be partitioned by time; note that if no data arrives within a given time bucket, that time directory is not created.
    • Generated file names default to prefix + timestamp, and both are configurable.

    Although it can watch a directory, it cannot watch nested subdirectories recursively.
    And if Flume crashes while collecting, there is no guarantee that on restart it resumes from the line it had reached in the file it was reading.

    [hadoop@vm01 conf]$ vi spool.conf
    
    # spool.conf: A single-node Flume configuration
    # Name the components on this agent
    spool-hdfs-agent.sources = spool-source
    spool-hdfs-agent.sinks = hdfs-sink
    spool-hdfs-agent.channels = memory-channel
    
    # Describe/configure the source
    spool-hdfs-agent.sources.spool-source.type = spooldir
    spool-hdfs-agent.sources.spool-source.spoolDir = /home/hadoop/data/flume/spool/input
    
    # Describe the sink
    spool-hdfs-agent.sinks.hdfs-sink.type = hdfs
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://vm01:9000/flume/spool/%Y%m%d%H%M
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = gzip
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = wsk
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 100000000
    spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    spool-hdfs-agent.channels.memory-channel.type = memory
    spool-hdfs-agent.channels.memory-channel.capacity = 1000
    spool-hdfs-agent.channels.memory-channel.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    spool-hdfs-agent.sources.spool-source.channels = memory-channel
    spool-hdfs-agent.sinks.hdfs-sink.channel = memory-channel

    Start and Test

    [hadoop@vm01 data]$ mkdir -p flume/spool/input/
    flume-ng agent \
    --name spool-hdfs-agent \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/spool.conf \
    -Dflume.root.logger=INFO,console
    [hadoop@vm01 input]$ ll
    # once a file placed here has been fully ingested, it is renamed with a .COMPLETED suffix
    -rw-rw-r--. 1 hadoop hadoop 13 Aug  8 17:03 1.log.COMPLETED
    
    [hadoop@vm01 ~]$ hdfs dfs -ls /flume/spool
    # a time-partitioned directory is created based on the current time
    drwxr-xr-x   - hadoop supergroup          0 2019-08-08 17:04 /flume/spool/201908081704
    
    [hadoop@vm01 ~]$ hdfs dfs -ls /flume/spool/201908081704
    -rw-r--r--   3 hadoop supergroup         33 2019-08-08 17:05
    /flume/spool/201908081704/wsk.1565309095584.gz
    
    [hadoop@vm01 ~]$ hdfs dfs -text /flume/spool/201908081704/*
    hello hadoop

    6. Taildir Source, HDFS Sink

    • Taildir Source was introduced in Apache Flume 1.7, but CDH backported it into its Flume 1.6.
    • Taildir Source is a reliable source: it continuously writes the file offsets to a JSON file on disk. When Flume restarts, it reads that JSON file, recovers the offsets, and resumes from where it left off, so no data is lost (see the example below).
    • Taildir Source can watch multiple directories and files at the same time, but it cannot recursively collect from nested directories; that would require changing the source code.
    • To watch every file under a directory, make sure the filegroup pattern ends with .*
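    The position file (set via positionFile in the configuration below) is just a small JSON array; each entry records the inode, the current offset, and the path of a tailed file. The values here are purely illustrative:

    [hadoop@vm01 ~]$ cat /home/hadoop/data/flume/taildir/taildir_position/taildir_position.json
    [{"inode":131415,"pos":12,"file":"/home/hadoop/data/flume/taildir/input/1.log"}]
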
    [hadoop@vm01 conf]$ vi taildir.conf
    
    # taildir.conf: A single-node Flume configuration
    # Name the components on this agent
    taildir-hdfs-agent.sources = taildir-source
    taildir-hdfs-agent.sinks = hdfs-sink
    taildir-hdfs-agent.channels = memory-channel
    
    # Describe/configure the source
    taildir-hdfs-agent.sources.taildir-source.type = TAILDIR
    taildir-hdfs-agent.sources.taildir-source.filegroups = f1
    taildir-hdfs-agent.sources.taildir-source.filegroups.f1 = /home/hadoop/data/flume/taildir/input/.*
    taildir-hdfs-agent.sources.taildir-source.positionFile = /home/hadoop/data/flume/taildir/taildir_position/taildir_position.json
    
    # Describe the sink
    taildir-hdfs-agent.sinks.hdfs-sink.type = hdfs
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://vm01:9000/flume/taildir/%Y%m%d%H%M
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = gzip
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = wsk
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 100000000
    taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    taildir-hdfs-agent.channels.memory-channel.type = memory
    taildir-hdfs-agent.channels.memory-channel.capacity = 1000
    taildir-hdfs-agent.channels.memory-channel.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    taildir-hdfs-agent.sources.taildir-source.channels = memory-channel
    taildir-hdfs-agent.sinks.hdfs-sink.channel = memory-channel

    Start and Test

    [hadoop@vm01 flume]$ mkdir -p  taildir/input/
    flume-ng agent \
    --name taildir-hdfs-agent \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/taildir.conf \
    -Dflume.root.logger=INFO,console
    [hadoop@vm01 input]$ ll
    total 8
    -rw-rw-r--. 1 hadoop hadoop 12 Aug  8 17:24 1.log
    -rw-rw-r--. 1 hadoop hadoop 13 Aug  8 17:25 2.log
    
    [hadoop@vm01 ~]$ hdfs dfs -ls /flume/taildir
    # one time-partition directory per minute in which data arrived (1.log and 2.log were written in different minutes)
    drwxr-xr-x   - hadoop supergroup          0 2019-08-08 17:25 /flume/taildir/201908081724
    drwxr-xr-x   - hadoop supergroup          0 2019-08-08 17:25 /flume/taildir/201908081725
    
    [hadoop@vm01 ~]$ hdfs dfs -ls /flume/taildir/201908081724
    Found 1 items
    -rw-r--r--   3 hadoop supergroup         32 2019-08-08 17:25
    /flume/taildir/201908081724/wsk.1565310299113.gz
    [hadoop@vm01 ~]$ hdfs dfs -text /flume/taildir/201908081724/*
    hello flume
    
    [hadoop@vm01 ~]$ hdfs dfs -ls /flume/taildir/201908081725
    Found 2 items
    -rw-r--r--   3 hadoop supergroup         33 2019-08-08 17:25 /flume/taildir/201908081725/wsk.1565310307275.gz
    -rw-r--r--   3 hadoop supergroup        165 2019-08-08 17:26 /flume/taildir/201908081725/wsk.1565310357463.gz
    [hadoop@vm01 ~]$ hdfs dfs -text /flume/taildir/201908081725/wsk.1565310307275.gz
    hello hadoop
    [hadoop@vm01 ~]$ hdfs dfs -text /flume/taildir/201908081725/wsk.1565310357463.gz
    3210#"! U3hadoopvm01~hadoop/data/flume/taildir/input/2.log
    3210#"! U3hadoopvm01~hadoop/data/flume/taildir/input/1.log

    Now let's simulate a Flume crash while 1.log is still being written to, and check whether a restarted Flume resumes reading from where it stopped:
    first stop Flume,
    then write a few lines into 1.log,
    then start Flume again and check HDFS.

    # at this point the Flume agent is stopped
    [hadoop@vm01 input]$ echo "Welcome to reconnect" >> 1.log

    After starting Flume again, check HDFS:

    [hadoop@vm01 ~]$ hdfs dfs -ls /flume/taildir
    Found 3 items
    drwxr-xr-x   - hadoop supergroup          0 2019-08-08 17:25 /flume/taildir/201908081724
    drwxr-xr-x   - hadoop supergroup          0 2019-08-08 17:26 /flume/taildir/201908081725
    drwxr-xr-x   - hadoop supergroup          0 2019-08-08 17:34 /flume/taildir/201908081734
    [hadoop@vm01 ~]$ hdfs dfs -ls /flume/taildir/201908081734
    Found 1 items
    -rw-r--r--   3 hadoop supergroup         41 2019-08-08 17:35 /flume/taildir/201908081734/wsk.1565310883802.gz
    [hadoop@vm01 ~]$ hdfs dfs -text /flume/taildir/201908081734/*
    Welcome to reconnect
    [hadoop@vm01 ~]$