您的位置:首页 > 运维架构 > Tomcat

Flume 日志收集、使用Flume收集日志到HDFS

2017-12-02 18:29 489 查看
第一章 概览与架构

1.1 源、通道与接收器

Flume代理的架构如下图:输入叫做源,输出叫作接收器。通过提供类源与接收器之间的胶水。它们都运行在叫做代理的守护进程中。



源将事件写到一个或者多个通道中。

通道作为事件从源到接收器传递的保留区。

接收器只能从一个通道接收事件。

代理可能会有多个源、通道与接收器。

1.2 Flume事件

Flume传输的基本的数据负载叫做事件。事件由0个或者多个头与体组成。

头是一些键值对,可以看作与HTTP头完成相同的功能——传递与体不同的额外信息。

体是个字节数组,包含类实际的负载。例如如果输入文件由日志文件组成,那么该数组就非常类似于包含了单行文本的UTF-8编码的字符串。

Flume可能会自动添加头(比如,源添加了数据来自的主机名或者创建了事件的时间戳),不过体基本上不会受影响,除非在中途使用拦截器对其进行编辑。

1.3 拦截器、通道选择器与选择处理器

拦截器值的是数据流中的一个点,可以在这里检查和修改Flume事件。可以在【源创建事件】后/【接收器发送事件】前链接0个或者多个拦截器。类似于SpringAOP中的MethodInterceptor和Java Servlet中的ServletFilter。

通道选择器负责将数据从一个源转向一个或者多个通道上。Flume自带两个通道选择器,分别是复制通道选择器和多路通道选择器。复制通道选择器(默认的)只是将时间事件的副本放到每个通道上,前提是你已经配置好了多个通道。多路通道选择器会根据某些头信息将事件写到不同的通道中。

输入处理器为输入器创建故障恢复路径,或者是跨越一个通道的多个输入器创建负载均衡时间。

1.4 多层数据收集(多数据流与代理)

拦截器、通道选择器等的组合可以实现复杂的多层数据收集。

不同的数据源可以发送到同一个通道上,同一个数据源可以通过通道选择器转向一个或者多个通道上,由此构成了复杂的有向无环网状结构。

第二章 Flume快速起步

2.1 下载Flume

2.2 Flume配置文件概览

每个代理的配置都以如下3个参数开始:

agent.sources=

agent.chanels=

agent.sinks

分别对应源、通道和接收器

2.3 Hello,World

在flume的conf目录(~/apache-flume-1.5.0-bin/conf$)下新建一个hello.conf文件,内容如下:

#源:s1

agent.sources=s1

#通道:c1

agent.channels=c1

#接收器:k1

agent.sinks=k1

#源s1的类型为netcat,它会打开一个Socket监听事件(每个事件一行文本)。它需要#两个参数,分别是一个绑定IP与一个端口号。

agent.sources.s1.type=netcat

#源s1的绑定IP

agent.sources.s1.bind=0.0.0.0

#源s1的监听端口

agent.sources.s1.port=12345

#源s1的去向通道为通道c1

agent.sources.s1.channels=c1

#通道c1的类型为内存通道

agent.channels.c1.type=memory

#接收器k1的类型为日志

agent.sinks.k1.type=logger

#接收器k1的来向通道为c1

agent.sinks.k1.channel=c1

然后回到flume根目录(~/apache-flume-1.5.0-bin$),执行命令

./bin/flume-ng agent -n agent -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console -Duser.timezone=UTC -Dflume.monitoring.type=http -Dflume.monitoring.port=44444

其中:

-Dflume.root.logger=INFO,console 属性覆盖了conf/log4j.propertites中的rootlogger,使用console追加器。如果没有覆盖,输出会被写到log/flume.log文件中。

-Duser.timezone=UTC 属性设置了时区(后面会提到)

-Dflume.monitoring.type=http 属性指定了监控类型(后面会提到)

-Dflume.monitoring.port=44444 属性指定了监控端口(后面会提到)

会看到如下输出:

2017-12-02 06:04:10,625 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting

2017-12-02 06:04:10,634 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/hello.conf

2017-12-02 06:04:10,645 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: agent

2017-12-02 06:04:10,645 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1

2017-12-02 06:04:10,645 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1

2017-12-02 06:04:10,656 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent]

2017-12-02 06:04:10,657 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels

2017-12-02 06:04:10,666 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel c1 type memory

2017-12-02 06:04:10,670 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1

2017-12-02 06:04:10,671 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source s1, type netcat

2017-12-02 06:04:10,681 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: k1, type: logger

2017-12-02 06:04:10,683 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel c1 connected to [s1, k1]

2017-12-02 06:04:10,689 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:s1,state:IDLE}
}} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@20c5df70 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }

2017-12-02 06:04:10,697 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1

2017-12-02 06:04:10,754 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.

2017-12-02 06:04:10,756 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started

2017-12-02 06:04:10,760 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1

2017-12-02 06:04:10,761 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source s1

2017-12-02 06:04:10,762 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting

2017-12-02 06:04:10,775 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:12345]

2017-12-02 06:04:10,784 (conf-file-poller-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog

2017-12-02 06:04:10,831 (conf-file-poller-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] jetty-6.1.26

2017-12-02 06:04:10,853 (conf-file-poller-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@0.0.0.0:44444

然后另外打开一个终端,输入命令nc localhost 12345进入消息发送状态,然后输入一条消息,并按回车,会看到出现OK。此时,刚才开启flume的窗口出现:

2017-12-02 06:08:08,780 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }

截图如下:



第三章 通道

3.1     内存通道(非持久化通道)

事件存储在内存中的通道。

好处:接收速度快(内存的速度要比磁盘快几个数量级)

坏处:代理失败(硬件问题、断电、JVM崩溃、Flume重启等)会导致代理失败。

设置:

必须设置agent.channels.c1.type=memory

可选

capacity:int 默认值100 默认通道容量

transactionCapacity单个事务中可以写入的最大事件数量

byteCapacityBufferPercentage和byteCapacity,使用字节而非事件数量来作为调整内存通道大小的方式

keep-alive 通道已满的情况下,线程将数据写入到通道的等待时间,超时将放弃此数据

3.2     文件通道(持久化通道)

事件存储到本地系统中的通道。

优点:数据流不会出现缺口、支持加密

缺点:较慢

设置:

必须设置agent.channels.c1.type=file

可选

checkpointDir

dataDirs

用来设置Flume代理持有数据的位置

capacity

keep-alive

transactionCapacity

同上

checkpointInterval 两个检查点之间间隔的毫秒数

write-timeout 写入的超时时间

maxFileSize 最大文件大小

minimumRequiredSpace不想用作日志的空间数量

第四章     接收器与接收处理器

4.1     HDFS接收器

agent.sinks.k1.type=hdfs

4.1.1   路径与文件名

agent.sinks.k1.hdfs.path=/logs/chenjie/web/%Y%m%d       指定路径(Flume支持各种基于时间的转义序列)

此外还支持一种转移序列机制:在路径中使用Flume头值的功能,例如有个键为logType的头,那么就能使用:

agent.sinks.k1.hdfs.path=/logs/chenjie/web/%{logType}/%Y%m%d

agent.sinks.k1.hdfs.filePrefix=access   指定前缀

agent.sinks.k1.hdfs.fileSuffix=.log     指定后缀

4.1.2   文件转储

默认情况下,Flume会每隔30s、10个事件或者是1024字节来转储写入的文件。

如果希望每分钟转储一次:

agent.sinks.k1.hdfs.rollInterval=60

agent.sinks.k1.hdfs.rollCount=0

agent.sinks.k1.hdfs.rollSize=0

4.2     压缩编解码器

agent.sinks.k1.hdfs.codeC=gzip

4.3     事件序列化器

4.3.1   文本输出

4.3.2   带有头信息的文本

4.3.3   Apache Avro

4.3.4   文件类型

4.3.5   超时设置与线程池

4.4     接收器组

为了消除单点失败。

agent.sinkgroups=sg1

agent.sinkgroups.sg1.sinks=k1,k2

4.4.1   负载均衡

如果想要均衡地对k1和k2的流量进行负载均衡

processor.type load_balance循环选择/round_robin/random

4.4.2   故障恢复

在使用某个接收器时,如果其不可用,那么你希望能够使用其它的接收器。

agent.sinkgroups.sg1.sinks=k1,k2,k3

agent.sinkgroups.sg1.processor.type=failover

agent.sinkgroups.sg1.processor.priority.k1=10

agent.sinkgroups.sg1.processor.priority.k2=210

agent.sinkgroups.sg1.processor.priority.k3=30

数字任意,越小越优先

第五章     源与通道选择器

5.1     exec源

在flume的conf文件夹下新建一个exec.conf文件,内容如下:

#源:s1

agent.sources=s1

#通道:c1

agent.channels=c1

#接收器:k1

agent.sinks=k1

#源s1的类型为exec源

agent.sources.s1.type=exec

#源s1的exec源 命令

agent.sources.s1.command=tail -F /home/chenjie/app.log

#源s1的去向通道为通道c1

agent.sources.s1.channels=c1

#通道c1的类型为内存通道

agent.channels.c1.type=memory

#接收器k1的类型为日志

agent.sinks.k1.type=logger

#接收器k1的来向通道为c1

agent.sinks.k1.channel=c1

到flume根目录下启动:

./bin/flume-ng agent -n agent -c conf -f conf/exec.conf -Dflume.root.logger=INFO,console -Duser.timezone=UTC -Dflume.monitoring.type=http -Dflume.monitoring.port=44444

打开/home/chenjie/app.log文件,输入文本,看到flume窗口感知到变化。

2017-12-02 07:07:11,959 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 69 20 6C 6F 76 65 20 79 6F 75                   i love you }

截图如下



5.2     假脱机目录源

5.3     syslog源

用作操作系统级的机制来捕获和移动系统日志

5.3.1   syslog UDP源

5.3.2   syslog TCP源

5.3.3   多端口 syslog UDP源

5.4     通道选择器

第六章     拦截器、ETL与路由

6.1     拦截器

6.1.1   Timestamp

在头部添加时间戳

6.1.2   Host

6.1.3   Static

agent.sources.s1.interceptors.i2.type=static

agent.sources.s1.interceptors.i2.key=author

agent.sources.s1.interceptors.i2.value=CHENJIE

用于向每个处理的Flume事件插入任意单个的键值头

6.1.4   正则表达式过滤

根据内容体的内容来过滤事件

6.1.5   正则表达式抽取

将事件体的内容抽取出来并放到Flume头中以便可以通过通道选择器执行路由。

6.1.6   自定义拦截器

6.2     数据流分层

6.2.1   Avro源/接收器

6.2.2   命令行Avro

6.2.3   Log4J追加器

6.2.4   负载均衡Log4J追加器

6.3     路由

第7章 监控Flume

7.1     监控代理进程

7.1.1   Monit

7.1.2   Nagios

7.2     监控性能度量情况

7.2.1   Ganglia

7.2.2   内部HTTP服务器

-Dflume.monitoring.type=http 属性指定了监控类型(前面提到)

-Dflume.monitoring.port=44444 属性指定了监控端口(前面提到)

访问http://localhost:44444/metrics可以监控Flume

{"SINK.k1":{"ConnectionCreatedCount":"3","ConnectionClosedCount":"2","Type":"SINK","BatchCompleteCount":"0","BatchEmptyCount":"21","EventDrainAttemptCount":"19","StartTime":"1512191883957","EventDrainSuccessCount":"19","BatchUnderflowCount":"3","StopTime":"0","ConnectionFailedCount":"0"},"CHANNEL.c1":{"ChannelCapacity":"1000000","ChannelFillPercentage":"0.0","Type":"CHANNEL","ChannelSize":"0","EventTakeSuccessCount":"19","EventTakeAttemptCount":"43","StartTime":"1512191883951","EventPutAttemptCount":"19","EventPutSuccessCount":"19","StopTime":"0"},"SOURCE.s1":{"EventReceivedCount":"19","AppendBatchAcceptedCount":"0","Type":"SOURCE","EventAcceptedCount":"19","AppendReceivedCount":"0","StartTime":"1512191883960","AppendAcceptedCount":"0","OpenConnectionCount":"0","AppendBatchReceivedCount":"0","StopTime":"0"}}

7.2.3   自定义监控钩子   

最后,使用flume进行tomcat日志监控,并将结果写入HDFS的完整示例如下。

1、新建tomcat_log.conf

agent.sources=s1
agent.channels=c1
agent.sinks=k1
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /media/chenjie/0009418200012FF3/ubuntu/apache-tomcat-7.0.82/logs/localhost_access_log.2017-12-02.txt
agent.sources.s1.channels=c1

agent.channels.c1.type=file
agent.sinks.k1.type=hdfs
agent.sinks.k1.hdfs.path=/logs/chenjie/web/%Y%m%d
agent.sinks.k1.hdfs.rollInterval=60
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.rollSize=0
agent.sinks.k1.hdfs.filePrefix=access
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.writeType=text
agent.sinks.k1.channel=c1

agent.sources.s1.interceptors=i1 i2
agent.sources.s1.interceptors.i1.type=timestamp
agent.sources.s1.interceptors.i1.preserveExisting=true
agent.sources.s1.interceptors.i2.type=static
agent.sources.s1.interceptors.i2.key=author
agent.sources.s1.interceptors.i2.value=CHENJIE


2、启动Flume

./bin/flume-ng agent -n agent -c conf -f conf/tomcat_log.conf -Dflume.root.logger=INFO,console -Duser.timezone=UTC -Dflume.monitoring.type=http -Dflume.monitoring.port=44444


3、启动tomcat

4、在tomcat中访问部署在其中的web程序(tomcat会在日志文件中写入此操作)


127.0.0.1 - - [02/Dec/2017:16:44:49 +0800] "GET / HTTP/1.1" 200 11217
0:0:0:0:0:0:0:1 - - [02/Dec/2017:16:44:50 +0800] "GET /CJHadoopOnline/ HTTP/1.1" 200 281
0:0:0:0:0:0:0:1 - - [02/Dec/2017:16:55:03 +0800] "GET /CJHadoopOnline HTTP/1.1" 302 -
0:0:0:0:0:0:0:1 - - [02/Dec/2017:16:55:03 +0800] "GET /CJHadoopOnline/ HTTP/1.1" 200 281
0:0:0:0:0:0:0:1 - - [02/Dec/2017:16:55:17 +0800] "GET /CJHadoopOnline/login.jsp HTTP/1.1" 200 450


5、观察HDFS中的文件



6、使用程序读取HDFS中的文件

import org.apache.spark.{SparkConf, SparkContext}
object FlumeTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("CJDouban").setMaster("local")
val sc = new SparkContext(sparkConf)
val log = sc.textFile("hdfs://pc1:9000/logs/chenjie/web/20171202")
log.foreach(println)
}
}




      
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Flume 日志 Tomcat HDFS 收集