您的位置:首页 > Web前端 > JavaScript

(一个)kafka-jstorm集群实时日志分析 它 ---------kafka实时日志处理

2015-07-01 18:34 661 查看
package com.doctor.logbackextend;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.commons.lang.RandomStringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* zookeeper 和kafka环境准备好。

本地端口号默认设置
*
* @author doctor
*
* @time   2014年10月24日 下午3:14:01
*/
public class KafkaAppenderTest {
private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class);

/** 先启动此測试方法,模拟log日志输出到kafka */
@Test
public void test_log_producer() {
while(true){
LOG.info("test_log_producer : "  + RandomStringUtils.random(3, "hello doctro,how are you,and you"));
}
}

/** 再启动此測试方法。模拟消费者获取日志,进而分析,此方法不过打印打控制台,不是log。防止模拟log測试方法数据混淆 */
@Test
public void test_comsumer(){
Properties props = new Properties();
props.put("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
props.put("group.id", "kafkatest-group");
//		props.put("zookeeper.session.timeout.ms", "400");
//		props.put("zookeeper.sync.time.ms", "200");
//		props.put("auto.commit.interval.ms", "1000");
ConsumerConfig paramConsumerConfig = new ConsumerConfig(props );
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(paramConsumerConfig );

Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("kafka-test", new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStream = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerStream.get("kafka-test");

for (KafkaStream<byte[], byte[]> stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext())
System.out.println(new String("test_comsumer: " + new String(it.next().message())));
}

}

}


为了实时日志处理互联网系统的日志,对于电商来说具有非常重要的意义,比方,淘宝购物时候,你浏览某些商品的时候。系统后台实时日志处理分析后,系统能够向用户实时推荐给用户相关商品。来引导用户的选择等等。

为了实时日志处理。我们选择kafka集群,日志的处理分析选择jstorm集群,至于jstorm处理的结果,你能够选择保存到数据库里。入hbase、mysql。maridb等。

系统的日志接口选择了slf4j,logback组合,为了让系统的日志可以写入kafka集群,选择扩展logback Appender。在logback里配置一下。就行自己主动输出日志到kafka集群。

kafka的集群安装,在此不介绍了,为了模拟真实性,zookeeper本地集群也安装部署了。

以下是怎样扩展logback Appender

package com.doctor.logbackextend;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;

public class KafkaAppender extends AppenderBase<ILoggingEvent> {

private String topic;
private String zookeeperHost;

private String broker;
private Producer<String, String> producer;
private Formatter formatter;

public String getBroker() {
return broker;
}

public void setBroker(String broker) {
this.broker = broker;
}
@Override
protected void append(ILoggingEvent eventObject) {
String message = this.formatter.formate(eventObject);
this.producer.send(new KeyedMessage<String, String>(this.topic, message));

}

@Override
public void start() {
if (this.formatter == null) {
this.formatter = new MessageFormatter();
}

super.start();
Properties props = new Properties();
props.put("zk.connect", this.zookeeperHost);
props.put("metadata.broker.list", this.broker);
props.put("serializer.class", "kafka.serializer.StringEncoder");

ProducerConfig config = new ProducerConfig(props);
this.producer = new Producer<String, String>(config);
}

@Override
public void stop() {
super.stop();
this.producer.close();
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getZookeeperHost() {
return zookeeperHost;
}

public void setZookeeperHost(String zookeeperHost) {
this.zookeeperHost = zookeeperHost;
}

public Producer<String, String> getProducer() {
return producer;
}

public void setProducer(Producer<String, String> producer) {
this.producer = producer;
}

public Formatter getFormatter() {
return formatter;
}

public void setFormatter(Formatter formatter) {
this.formatter = formatter;
}

/**
* 格式化日志格式
* @author doctor
*
* @time   2014年10月24日 上午10:37:17
*/
interface Formatter{
String formate(ILoggingEvent event);
}

public static class MessageFormatter implements Formatter{

@Override
public String formate(ILoggingEvent event) {

return event.getFormattedMessage();
}

}
}


对于日志的输出格式MessageFormatter没有特殊处理,由于仅仅是模拟一下,你能够制定你的格式,入json等。

在logback.xml的配置例如以下:

<appender name="kafka" class="com.doctor.logbackextend.KafkaAppender">
<topic>kafka-test</topic>
<!-- <zookeeperHost>127.0.0.1:2181</zookeeperHost> -->
<!-- <broker>127.0.0.1:9092</broker> -->
<zookeeperHost>127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183</zookeeperHost>
<broker>127.0.0.1:9092,127.0.0.1:9093</broker>
</appender>

<root level="all">
<appender-ref ref="stdout" />
<appender-ref ref="defaultAppender" />
<appender-ref ref="kafka" />
</root>


<zookeeperHost>
我本地启动了三个zookeer。依据配置。你能够知道是怎样配置的吧。

kafka集群的broker我配置了两个,都是在本地机器。

測试代码:

package com.doctor.logbackextend;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.commons.lang.RandomStringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* zookeeper 和kafka环境准备好。本地端口号默认设置
*
* @author doctor
*
* @time   2014年10月24日 下午3:14:01
*/
public class KafkaAppenderTest {
private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class);

/** 先启动此測试方法,模拟log日志输出到kafka */
@Test
public void test_log_producer() {
while(true){
LOG.info("test_log_producer : "  + RandomStringUtils.random(3, "hello doctro,how are you,and you"));
}
}

/** 再启动此測试方法,模拟消费者获取日志,进而分析,此方法不过打印打控制台,不是log。防止模拟log測试方法数据混淆 */
@Test
public void test_comsumer(){
Properties props = new Properties();
props.put("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
props.put("group.id", "kafkatest-group");
//		props.put("zookeeper.session.timeout.ms", "400");
//		props.put("zookeeper.sync.time.ms", "200");
//		props.put("auto.commit.interval.ms", "1000");
ConsumerConfig paramConsumerConfig = new ConsumerConfig(props );
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(paramConsumerConfig );

Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("kafka-test", new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStream = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerStream.get("kafka-test");

for (KafkaStream<byte[], byte[]> stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext())
System.out.println(new String("test_comsumer: " + new String(it.next().message())));
}

}

}


结果,明天再附上截图。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: