
Flume Log4j Appender: Collecting Log4j Logs with Flume

2016-01-06
Abstract: Using the Flume Log4j Appender to collect Log4j logs with Flume and forward them to Kafka.

A simple example of collecting Log4j logs with Flume and sending them to Kafka:

Step 1: Configure and start Flume. The Flume configuration file is as follows:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type=avro
a1.sources.r1.bind=localhost
a1.sources.r1.port=4444

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList =192.168.1.12:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Producer properties are passed through via the sink's "kafka." prefix,
# so they must carry the full a1.sinks.k1. prefix to take effect
a1.sinks.k1.kafka.producer.type = sync
a1.sinks.k1.kafka.partitioner.class = org.apache.flume.plugins.SinglePartition

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
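
With the configuration in place, the agent can be started with the standard `flume-ng` launcher. The config file name below (`conf/flume-kafka.conf`) is an assumption; use whatever path you saved the configuration above to:

```shell
# Start agent "a1" (the name must match the prefix used in the config file).
# conf/flume-kafka.conf is a placeholder for wherever the config was saved.
bin/flume-ng agent \
  --conf conf \
  --conf-file conf/flume-kafka.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console
```

The `-Dflume.root.logger=INFO,console` flag keeps the agent's own log on the console, which makes it easy to see the Avro source binding to port 4444.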


Step 2: Test project:

Create a project and add flume-ng-log4jappender-1.6.0-jar-with-dependencies.jar to its classpath.
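
If the project is built with Maven, the same appender can be pulled in as a dependency instead of a hand-copied jar (coordinates below are the standard ones for Flume 1.6; the article uses the standalone jar-with-dependencies build, which bundles the Avro client):

```xml
<dependency>
  <groupId>org.apache.flume.flume-ng-clients</groupId>
  <artifactId>flume-ng-log4jappender</artifactId>
  <version>1.6.0</version>
</dependency>
```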



1. Java code:

package com.ls.flume;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class FlumeLogAppender {

    private static final Log logger = LogFactory.getLog(FlumeLogAppender.class);

    public static void main(String[] args) {
        int i = 0;
        while (true) {
            // Each call is routed through the Log4j appender to the Flume Avro source
            logger.info("Hello world! This is a test message " + i);
            System.out.println(i++);
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2. log4j.properties

log4j.category.com.ls=INFO,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = localhost
log4j.appender.flume.Port = 4444
log4j.appender.flume.UnsafeMode = false
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n
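
To verify that messages actually arrive in Kafka, the console consumer that ships with 0.8-era Kafka (the version this Flume 1.6 sink targets) can tail the topic. Assuming ZooKeeper runs alongside the broker at 192.168.1.12:2181:

```shell
# Tail the "test" topic from the beginning; each logged line from the
# Java program should appear here once Flume's batch is flushed
bin/kafka-console-consumer.sh \
  --zookeeper 192.168.1.12:2181 \
  --topic test \
  --from-beginning
```

Note that with `batchSize = 20` in the sink configuration, messages may only show up after enough events accumulate or the batch is otherwise flushed.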

Step 3: Errors encountered:

The following SLF4J warning is printed when the program runs, though it does not affect the result:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Fix: add slf4j-nop-1.7.12.jar to the classpath.
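
For a Maven build, the equivalent dependency is:

```xml
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-nop</artifactId>
  <version>1.7.12</version>
</dependency>
```

The NOP binding simply silences the warning by discarding SLF4J output; if you want SLF4J calls routed to the same Log4j configuration instead, `slf4j-log4j12` is the usual alternative binding.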