
Flume Study 03: Collecting log4j Logs with Flume

2015-02-13 17:18
The previous posts in this series covered Flume basics, including its architecture and how to configure, start, and stop it. This post looks at how Flume receives log4j logs.

As mentioned earlier, log4j delivers its logs to Flume over Avro, so all of the configurations below are based on an Avro source.

Sending log4j logs to Flume's console

Edit the configuration file log4j-agent.properties:


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent.sources = so1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444
# keep-alive is the put/take timeout (in seconds) for the memory channel
agent.channels.c1.keep-alive = 30

# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.s1.type = logger

#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1

# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# agent.channels.memoryChannel.capacity = 100


Start the agent: ./start.sh log4j-agent.properties agent
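start.sh here is the wrapper script introduced in the earlier posts of this series. If you launch Flume the stock way instead, the equivalent flume-ng command would be roughly as follows (conf directory assumed); the -Dflume.root.logger=DEBUG,console flag is what makes the Avro source's DEBUG lines below visible:

./bin/flume-ng agent --conf conf --conf-file log4j-agent.properties --name agent -Dflume.root.logger=DEBUG,console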

Write a simple program that emits logs via log4j:

import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class WriteLog {

    protected static final Log logger = LogFactory.getLog("xxx");

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // log the current system timestamp once per second
            logger.info(new Date().getTime());
            Thread.sleep(1000);
        }
    }
}


So how do we configure log4j.xml? All it takes is one extra appender:

<appender name="flume-avro"
class="org.apache.flume.clients.log4jappender.Log4jAppender">
<param name="Hostname" value="localhost" />
<param name="Port" value="44444" />
<param name="UnsafeMode" value="true" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
</layout>
</appender>
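Note that Log4jAppender is not bundled with log4j itself; it ships in Flume's flume-ng-log4jappender artifact, which must be on the application's classpath. With Maven that would be something like this (version assumed to match your Flume install):

<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.5.2</version>
</dependency>

The appender also has to be referenced from a logger, e.g. the root logger:

<root>
    <priority value="info" />
    <appender-ref ref="flume-avro" />
</root>

UnsafeMode=true tells the appender to swallow send failures instead of throwing, so a Flume outage does not break the application.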


Now run the main method and watch Flume's output:

2015-02-13 16:40:33,027 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816832971"}, "body": {"bytes": "2015-02-13 16:40:32 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [289ms] 1423816832971"}}
2015-02-13 16:40:33,031 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816832971} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:34,052 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816834050"}, "body": {"bytes": "2015-02-13 16:40:34 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [1368ms] 1423816834050"}}
2015-02-13 16:40:34,053 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816834050} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:35,056 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816835053"}, "body": {"bytes": "2015-02-13 16:40:35 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [2371ms] 1423816835053"}}
2015-02-13 16:40:35,056 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816835053} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:36,059 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816836057"}, "body": {"bytes": "2015-02-13 16:40:36 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [3375ms] 1423816836057"}}
2015-02-13 16:40:36,059 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8, flume.client.log4j.logger.name=xxx, flume.client.log4j.timestamp=1423816836057} body: 32 30 31 35 2D 30 32 2D 31 33 20 31 36 3A 34 30 2015-02-13 16:40 }
2015-02-13 16:40:37,064 (New I/O  worker #1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:347)] Avro source so1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "xxx", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1423816837062"}, "body": {"bytes": "2015-02-13 16:40:37 [XXX.flume.test.log.WriteLog.main(WriteLog.java:15)] [4380ms] 1423816837062"}}


And that's log4j logging into Flume's console. Note that the logger sink prints only the first 16 bytes of the event body by default, which is why the body column stops at "2015-02-13 16:40".

Multi-node Flume output

Everything so far used a single Flume node. When the log volume is high, or many systems produce logs, the Flume layer needs multiple nodes. That is straightforward: install Flume on several machines, then configure log4j.xml like this:

<appender name="flume-load-balancing-avro"
class="org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender">
<param name="Hosts"
value="xxx:12306 xxx:12306 xxx:12306" />
<param name="Selector" value="ROUND_ROBIN" />
<param name="MaxBackoff" value="2000" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%l] [%rms] %m" />
</layout>
</appender>

<appender name="flume-async" class="org.apache.log4j.AsyncAppender">
<param name="BufferSize" value="256" />
<appender-ref ref="flume-load-balancing-avro" />
</appender>
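Wrapping the load-balancing appender in log4j's standard AsyncAppender keeps Flume's network I/O off the application threads: log calls just enqueue into the 256-event buffer, and a background thread ships them out. Your loggers should therefore reference the async appender rather than the Avro one directly, e.g.:

<root>
    <priority value="info" />
    <appender-ref ref="flume-async" />
</root>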


You can test for yourself that all the nodes are receiving events. The Selector parameter supports multiple strategies (ROUND_ROBIN as used here, or RANDOM); see my earlier post flume学习01-flume介绍 for details.

Sending log4j logs through Flume into HBase

Only the multi-node setup is covered here.

Edit the configuration file:


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent.sources = avroSource
agent.channels = memChannel
agent.sinks = hbaseSink

# For each one of the sources, the type is defined
agent.sources.avroSource.type = avro
agent.sources.avroSource.bind = 0.0.0.0
agent.sources.avroSource.port = 12306

# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.hbaseSink.type = hbase
agent.sinks.hbaseSink.table = flume
agent.sinks.hbaseSink.columnFamily = cf
agent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 1000
agent.channels.memChannel.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.avroSource.channels = memChannel
agent.sinks.hbaseSink.channel = memChannel


Apply the same configuration on every node and start the agents.
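One caveat: Flume's HBase sink does not create the table for you, so the table and column family from the sink configuration must already exist. Assuming the hbase shell, something like:

create 'flume', 'cf'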

log4j.xml is the same as the load-balancing configuration in the previous section: the flume-load-balancing-avro appender pointing at the agents on port 12306, wrapped in the flume-async appender.


Run the same Java main method from before and the logs will show up in HBase.
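You can check the result from the hbase shell; with SimpleHbaseEventSerializer each event becomes one row holding the body, plus a shared counter row:

scan 'flume', {LIMIT => 5}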

If you want the data written to HBase in your own layout, you need a custom HbaseEventSerializer. Here is a simple example:

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

import com.google.common.base.Charsets;

public class XXXHbaseEventSerializer implements HbaseEventSerializer {

    private static final Map<String, byte[]> LOG_LEVEL = new HashMap<String, byte[]>();
    static {
        LOG_LEVEL.put("10000", "DEBUG".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("20000", "INFO".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("30000", "WARN".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("40000", "ERROR".getBytes(Charsets.UTF_8));
        LOG_LEVEL.put("50000", "FATAL".getBytes(Charsets.UTF_8));
    }

    // column family
    private byte[] cf;
    // column qualifier for the log level
    private byte[] levelCol;
    // column qualifier for the message payload
    private byte[] plCol;
    // message payload
    private byte[] payload;
    // log level
    private byte[] level;
    // system identifier
    private String systemId;
    // log timestamp
    private String timestamp;

    private byte[] incCol;

    private byte[] incrementRow;

    public XXXHbaseEventSerializer() {
    }

    @Override
    public void configure(Context context) {
        this.levelCol = "l".getBytes(Charsets.UTF_8);
        this.plCol = "m".getBytes(Charsets.UTF_8);
        this.incCol = "iCol".getBytes(Charsets.UTF_8);
        this.incrementRow = "incRow".getBytes(Charsets.UTF_8);
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }

    @Override
    public void initialize(Event event, byte[] cf) {
        this.cf = cf;

        Map<String, String> headers = event.getHeaders();
        // log level: 10000 = DEBUG, 20000 = INFO, 30000 = WARN, 40000 = ERROR
        this.level = LOG_LEVEL.get(headers.get("flume.client.log4j.log.level"));
        // system ID (the log4j logger name)
        this.systemId = headers.get("flume.client.log4j.logger.name");
        // timestamp, e.g. 1421995677371
        this.timestamp = headers.get("flume.client.log4j.timestamp");
        // log message body
        this.payload = event.getBody();

        // Alternative: derive systemId from the body itself, expecting the
        // layout "<systemId>\t<payload>":
        // String body = new String(event.getBody(), Charsets.UTF_8);
        // int index = body.indexOf("\t");
        // if (index == -1) {
        //     this.systemId = "default";
        //     this.payload = body.getBytes(Charsets.UTF_8);
        // } else {
        //     this.systemId = body.substring(0, index);
        //     this.payload = body.substring(index + 1).getBytes(Charsets.UTF_8);
        // }
    }

    @Override
    public List<Row> getActions() throws FlumeException {
        List<Row> actions = new LinkedList<Row>();
        if (plCol != null) {
            try {
                byte[] rowKey = XXXRowKeyGenerator.getRowKey(systemId, timestamp);
                Put put = new Put(rowKey);
                put.add(cf, plCol, payload);
                put.add(cf, levelCol, level);
                actions.add(put);
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }

    @Override
    public List<Increment> getIncrements() {
        List<Increment> incs = new LinkedList<Increment>();
        if (incCol != null) {
            Increment inc = new Increment(incrementRow);
            inc.addColumn(cf, incCol, 1);
            incs.add(inc);
        }
        return incs;
    }

    @Override
    public void close() {
    }
}
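The XXXRowKeyGenerator above is a project-specific helper whose source is not shown. Purely as a hypothetical sketch, a generator that keeps rows for one system sorted by time while staying unique under concurrent writes could look like this:

import java.util.UUID;

import com.google.common.base.Charsets;

// Hypothetical stand-in for the row key generator used above (not shown
// in the original post).
public class XXXRowKeyGenerator {

    // Builds "<systemId>_<timestamp>_<random suffix>"; the suffix avoids
    // collisions when several events arrive in the same millisecond.
    public static byte[] getRowKey(String systemId, String timestamp) {
        String suffix = UUID.randomUUID().toString().substring(0, 8);
        return (systemId + "_" + timestamp + "_" + suffix).getBytes(Charsets.UTF_8);
    }
}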


Then change one line in the Flume configuration:

agent.sinks.hbaseSink.serializer = xxx.xxx.xxx.xxx.XXXHbaseEventSerializer
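For the agent to load this class, the jar containing XXXHbaseEventSerializer (and its row key generator) must be on Flume's classpath; the simplest way is to drop it into Flume's lib directory (the jar name below is just a placeholder):

cp xxx-flume-serializer.jar $FLUME_HOME/lib/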


Notes:

Configure Flume according to your own needs and read around; decide on your Flume topology first, then write the configuration to match it.

Flume offers a great many configuration options and is quite powerful; the Flume User Guide is the reference to consult.