Flume Learning 03: Collecting log4j Logs with Flume
2015-02-13 17:18
The previous posts covered the Flume basics: its architecture and how to configure, start, and stop it. This post looks at how Flume receives log4j logs.
As mentioned earlier, log4j sends its logs to Flume over Avro, so all of the configurations below are based on an Avro source.
Sending log4j output to Flume's console
Modify the configuration file log4j-agent.properties:

```properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = so1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444

# Each sink's type must be defined
agent.sinks.s1.type = logger

# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
agent.channels.c1.keep-alive = 30

# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1
```
Start the agent:

```bash
./start.sh log4j-agent.properties agent
```
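Before wiring up log4j, you can sanity-check that the Avro source is listening with Flume's bundled avro-client. This is just a quick smoke test; the file path is an arbitrary example:

```bash
echo "hello flume" > /tmp/flume-test.log
flume-ng avro-client -H localhost -p 44444 -F /tmp/flume-test.log
```

The file's contents should appear as an event in the logger sink's output.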
Next, write a simple program that emits logs through log4j:
```java
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class WriteLog {
    protected static final Log logger = LogFactory.getLog("xxx");

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // Log the current system timestamp once per second
            logger.info(new Date().getTime());
            Thread.sleep(1000);
        }
    }
}
```
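The Log4jAppender used next ships separately from log4j itself, so the application also needs the flume-ng-log4jappender artifact (and its transitive dependencies) on its classpath. If you build with Maven, a dependency along these lines should work; the version shown is illustrative, match it to your Flume installation:

```xml
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.5.2</version>
</dependency>
```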
So how should log4j.xml be configured? All it takes is one extra appender:
```xml
<appender name="flume-avro" class="org.apache.flume.clients.log4jappender.Log4jAppender">
    <param name="Hostname" value="localhost" />
    <param name="Port" value="44444" />
    <param name="UnsafeMode" value="true" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
    </layout>
</appender>
```
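An appender that no logger references never fires, so the new appender still has to be attached somewhere. A minimal sketch, assuming you want every INFO-or-above message shipped, is to reference it from the root logger; adjust the level and appender list to your own setup:

```xml
<root>
    <priority value="info" />
    <appender-ref ref="flume-avro" />
</root>
```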
Now run that main method and watch Flume's output:
```
2015-02-13 16:40:33,027 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816832971"}, "body": {"bytes": "2015-02-13 16:40:32 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [289ms] 1423816832971"}}
2015-02-13 16:40:33,031 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816832971} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:34,052 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816834050"}, "body": {"bytes": "2015-02-13 16:40:34 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [1368ms] 1423816834050"}}
2015-02-13 16:40:34,053 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816834050} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
...
```
And that's log4j's output landing in Flume's console.
Multi-node Flume output
Everything so far ran on a single Flume node. If the log volume is large, or many systems are producing logs, the Flume layer needs multiple nodes. That's fairly simple: install Flume on several machines, then just adjust log4j.xml:

```xml
<appender name="flume-load-balancing-avro" class="org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender">
    <param name="Hosts" value="xxx:12306 xxx:12306 xxx:12306" />
    <param name="Selector" value="ROUND_ROBIN" />
    <param name="MaxBackoff" value="2000" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
    </layout>
</appender>

<appender name="flume-async" class="org.apache.log4j.AsyncAppender">
    <param name="BufferSize" value="256" />
    <appender-ref ref="flume-load-balancing-avro" />
</appender>
```
You can test for yourself whether all the nodes are doing work. Several selection algorithms are available here; for details, see my earlier post "flume学习01-flume介绍" (Flume Learning 01: Introduction to Flume).
Sending log4j logs through Flume into HBase
Here we only cover the multi-node variant. Modify the configuration file:
```properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = avroSource
agent.channels = memChannel
agent.sinks = hbaseSink

# For each one of the sources, the type is defined
agent.sources.avroSource.type = avro
agent.sources.avroSource.bind = 0.0.0.0
agent.sources.avroSource.port = 12306

# Each sink's type must be defined
agent.sinks.hbaseSink.type = hbase
agent.sinks.hbaseSink.table = flume
agent.sinks.hbaseSink.columnFamily = cf
agent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

# Each channel's type is defined.
agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 1000
agent.channels.memChannel.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.avroSource.channels = memChannel
agent.sinks.hbaseSink.channel = memChannel
```
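One thing to note: the HBase sink writes into an existing table and will not create it for you. Assuming the table and column family names above, create them from the HBase shell before starting the agents:

```
create 'flume', 'cf'
```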
Apply the same configuration on each of the nodes and start them all.
log4j.xml uses the same load-balancing appender setup as above:

```xml
<appender name="flume-load-balancing-avro" class="org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender">
    <param name="Hosts" value="xxx:12306 xxx:12306 xxx:12306" />
    <param name="Selector" value="ROUND_ROBIN" />
    <param name="MaxBackoff" value="2000" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
    </layout>
</appender>

<appender name="flume-async" class="org.apache.log4j.AsyncAppender">
    <param name="BufferSize" value="256" />
    <appender-ref ref="flume-load-balancing-avro" />
</appender>
```
Run the earlier Java main method again and you will see the logs land in HBase.
If you want the data written to HBase in a format of your own, implement a custom HbaseEventSerializer. Here is a simple example:
```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

import com.google.common.base.Charsets;

public class XXXHbaseEventSerializer implements HbaseEventSerializer {

    private static final Map<String, byte[]> LOG_LEVEL = new HashMap<String, byte[]>();
    static {
        LOG_LEVEL.put("10000", "DEBUG".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("20000", "INFO".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("30000", "WARN".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("40000", "ERROR".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("50000", "FATAL".getBytes(Charsets.UTF_8));
    }

    // Column family
    private byte[] cf;
    // Column qualifier for the log level
    private byte[] levelCol;
    // Column qualifier for the message payload
    private byte[] plCol;
    // Message payload
    private byte[] payload;
    // Log level
    private byte[] level;
    // System identifier
    private String systemId;
    // Log timestamp
    private String timestamp;
    private byte[] incCol;
    private byte[] incrementRow;

    public XXXHbaseEventSerializer() {
    }

    @Override
    public void configure(Context context) {
        this.levelCol = "l".getBytes(Charsets.UTF_8);
        this.plCol = "m".getBytes(Charsets.UTF_8);
        this.incCol = "iCol".getBytes(Charsets.UTF_8);
        this.incrementRow = "incRow".getBytes(Charsets.UTF_8);
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }

    @Override
    public void initialize(Event event, byte[] cf) {
        this.cf = cf;
        Map<String, String> headers = event.getHeaders();
        // Log level: 10000 = DEBUG, 20000 = INFO, 30000 = WARN, 40000 = ERROR
        this.level = LOG_LEVEL.get(headers.get("flume.client.log4j.log.level"));
        // System id (the log4j logger name)
        this.systemId = headers.get("flume.client.log4j.logger.name");
        // Timestamp, e.g. 1421995677371
        this.timestamp = headers.get("flume.client.log4j.timestamp");
        // Log body
        this.payload = event.getBody();
        // Alternative: derive systemId from the body, expected as "<systemId>\t<message>"
        // String body = new String(event.getBody(), Charsets.UTF_8);
        // int index = body.indexOf("\t");
        // if (index == -1) {
        //     this.systemId = "default";
        //     this.payload = body.getBytes(Charsets.UTF_8);
        // } else {
        //     this.systemId = body.substring(0, index);
        //     this.payload = body.substring(index + 1).getBytes(Charsets.UTF_8);
        // }
    }

    @Override
    public List<Row> getActions() throws FlumeException {
        List<Row> actions = new LinkedList<Row>();
        if (plCol != null) {
            try {
                byte[] rowKey = XXXRowKeyGenerator.getRowKey(systemId, timestamp);
                Put put = new Put(rowKey);
                put.add(cf, plCol, payload);
                put.add(cf, levelCol, level);
                actions.add(put);
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }

    @Override
    public List<Increment> getIncrements() {
        List<Increment> incs = new LinkedList<Increment>();
        if (incCol != null) {
            Increment inc = new Increment(incrementRow);
            inc.addColumn(cf, incCol, 1L);
            incs.add(inc);
        }
        return incs;
    }

    @Override
    public void close() {
    }
}
```
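The post does not show XXXRowKeyGenerator, so here is a purely hypothetical sketch of what such a generator might look like; the key scheme is illustrative, not the author's actual implementation. Keys sort by system id and then time, and the random suffix avoids collisions when two events share a timestamp:

```java
import java.util.UUID;

import com.google.common.base.Charsets;

// Hypothetical row key generator (not shown in the original post).
public class XXXRowKeyGenerator {

    public static byte[] getRowKey(String systemId, String timestamp) throws Exception {
        if (systemId == null || timestamp == null) {
            throw new Exception("systemId and timestamp are required");
        }
        // <systemId>_<timestamp>_<uuid> keeps rows for one system contiguous
        // and time-ordered while still guaranteeing uniqueness.
        String key = systemId + "_" + timestamp + "_" + UUID.randomUUID();
        return key.getBytes(Charsets.UTF_8);
    }
}
```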
Then change one line in the Flume configuration file:
```properties
agent.sinks.hbaseSink.serializer = xxx.xxx.xxx.xxx.XXXHbaseEventSerializer
```
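Flume can only instantiate the custom serializer if its jar is on the agent's classpath. One conventional way, assuming a standard Flume 1.x layout, is the plugins.d directory; the plugin and jar names below are placeholders:

```
$FLUME_HOME/plugins.d/
    xxx-serializer/
        lib/
            xxx-serializer.jar
```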
Notes:
- Configure Flume to fit your own needs, and read around as much as you can. Decide on your Flume topology first, then write the configuration to match it.
- Flume has a lot of configuration options and is quite powerful; refer to the Flume User Guide.