(一)kafka-jstorm集群实时日志分析 之 ---------kafka实时日志处理
2014-10-25 23:55
337 查看
package com.doctor.logbackextend; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import org.apache.commons.lang.RandomStringUtils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * zookeeper 和kafka环境准备好。本地端口号默认设置 * * @author doctor * * @time 2014年10月24日 下午3:14:01 */ public class KafkaAppenderTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class); /** 先启动此测试方法,模拟log日志输出到kafka */ @Test public void test_log_producer() { while(true){ LOG.info("test_log_producer : " + RandomStringUtils.random(3, "hello doctro,how are you,and you")); } } /** 再启动此测试方法,模拟消费者获取日志,进而分析,此方法仅仅是打印打控制台,不是log,防止模拟log测试方法数据混淆 */ @Test public void test_comsumer(){ Properties props = new Properties(); props.put("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"); props.put("group.id", "kafkatest-group"); // props.put("zookeeper.session.timeout.ms", "400"); // props.put("zookeeper.sync.time.ms", "200"); // props.put("auto.commit.interval.ms", "1000"); ConsumerConfig paramConsumerConfig = new ConsumerConfig(props ); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(paramConsumerConfig ); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put("kafka-test", new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStream = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerStream.get("kafka-test"); for (KafkaStream<byte[], byte[]> stream : streams) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) System.out.println(new String("test_comsumer: " + new String(it.next().message()))); } } }
为了实时日志处理互联网系统的日志,对于电商来说具有很重要的意义,比如,淘宝购物时候,你浏览某些商品的时候,系统后台实时日志处理分析后,系统可以向用户实时推荐给用户相关商品,来引导用户的选择等等。
为了实时日志处理,我们选择kafka集群,日志的处理分析选择jstorm集群,至于jstorm处理的结果,你可以选择保存到数据库里,入hbase、mysql,maridb等。
系统的日志接口选择了slf4j,logback组合,为了让系统的日志能够写入kafka集群,选择扩展logback Appender,在logback里配置一下,就可以自动输出日志到kafka集群。
kafka的集群安装,在此不介绍了,为了模拟真实性,zookeeper本地集群也安装部署了。
下面是如何扩展logback Appender
package com.doctor.logbackextend; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.AppenderBase; public class KafkaAppender extends AppenderBase<ILoggingEvent> { private String topic; private String zookeeperHost; private String broker; private Producer<String, String> producer; private Formatter formatter; public String getBroker() { return broker; } public void setBroker(String broker) { this.broker = broker; } @Override protected void append(ILoggingEvent eventObject) { String message = this.formatter.formate(eventObject); this.producer.send(new KeyedMessage<String, String>(this.topic, message)); } @Override public void start() { if (this.formatter == null) { this.formatter = new MessageFormatter(); } super.start(); Properties props = new Properties(); props.put("zk.connect", this.zookeeperHost); props.put("metadata.broker.list", this.broker); props.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig config = new ProducerConfig(props); this.producer = new Producer<String, String>(config); } @Override public void stop() { super.stop(); this.producer.close(); } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getZookeeperHost() { return zookeeperHost; } public void setZookeeperHost(String zookeeperHost) { this.zookeeperHost = zookeeperHost; } public Producer<String, String> getProducer() { return producer; } public void setProducer(Producer<String, String> producer) { this.producer = producer; } public Formatter getFormatter() { return formatter; } public void setFormatter(Formatter formatter) { this.formatter = formatter; } /** * 格式化日志格式 * @author doctor * * @time 2014年10月24日 上午10:37:17 */ interface Formatter{ String formate(ILoggingEvent event); } public static class MessageFormatter implements Formatter{ @Override public String formate(ILoggingEvent event) { return event.getFormattedMessage(); } } }
对于日志的输出格式MessageFormatter没有特殊处理,因为只是模拟一下,你可以制定你的格式,入json等。
在logback.xml的配置如下:
<appender name="kafka" class="com.doctor.logbackextend.KafkaAppender"> <topic>kafka-test</topic> <!-- <zookeeperHost>127.0.0.1:2181</zookeeperHost> --> <!-- <broker>127.0.0.1:9092</broker> --> <zookeeperHost>127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183</zookeeperHost> <broker>127.0.0.1:9092,127.0.0.1:9093</broker> </appender> <root level="all"> <appender-ref ref="stdout" /> <appender-ref ref="defaultAppender" /> <appender-ref ref="kafka" /> </root>
<zookeeperHost>我本地启动了三个zookeer,根据配置,你可以知道是如何配置的吧。
kafka集群的broker我配置了两个,都是在本地机器。
测试代码:
package com.doctor.logbackextend; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import org.apache.commons.lang.RandomStringUtils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * zookeeper 和kafka环境准备好。本地端口号默认设置 * * @author doctor * * @time 2014年10月24日 下午3:14:01 */ public class KafkaAppenderTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class); /** 先启动此测试方法,模拟log日志输出到kafka */ @Test public void test_log_producer() { while(true){ LOG.info("test_log_producer : " + RandomStringUtils.random(3, "hello doctro,how are you,and you")); } } /** 再启动此测试方法,模拟消费者获取日志,进而分析,此方法仅仅是打印打控制台,不是log,防止模拟log测试方法数据混淆 */ @Test public void test_comsumer(){ Properties props = new Properties(); props.put("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"); props.put("group.id", "kafkatest-group"); // props.put("zookeeper.session.timeout.ms", "400"); // props.put("zookeeper.sync.time.ms", "200"); // props.put("auto.commit.interval.ms", "1000"); ConsumerConfig paramConsumerConfig = new ConsumerConfig(props ); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(paramConsumerConfig ); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put("kafka-test", new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStream = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerStream.get("kafka-test"); for (KafkaStream<byte[], byte[]> stream : streams) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) System.out.println(new String("test_comsumer: " + new String(it.next().message()))); } } }
明天再把结果截图附上。
相关文章推荐
- (一个)kafka-jstorm集群实时日志分析 它 ---------kafka实时日志处理
- (一)kafka-jstorm集群实时日志分析 之 ---------kafka实时日志处理-----续
- (二) kafka-jstorm集群实时日志分析 之 ---------jstorm集成spring
- (三 )kafka-jstorm集群实时日志分析 之 ---------jstorm集成spring 续(代码)
- 使用Flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【公安大数据】
- Storm实时数据分析的常用架构(组合):队列服务器+storm集群实时处理+mysql存储
- 通过Shell和Redis来实现集群业务中日志的实时收集分析
- wang-----利用Kafka, Cloudera Search以及Hue实现实时日志分析系统【extention】
- 新浪是如何分析处理32亿条实时日志的?
- fluentd结合kibana、elasticsearch实时搜索分析hadoop集群日志
- flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统
- 新浪技术分享:如何扛下32亿条实时日志的分析处理
- ELK+Kafka集群日志分析系统
- 玩儿透ELK日志分析集群搭建管理(rsyslog->kafka->Logstash->ES->Kibana)
- 通过shell和redis来实现集群业务中日志的实时收集分析 推荐
- fluentd结合kibana、elasticsearch实时搜索分析hadoop集群日志
- 利用Kafka, Cloudera Search以及Hue实现实时日志分析系统
- DockOne技术分享(十二):新浪是如何分析处理32亿条实时日志的?
- 基于 Kafka 和 ElasticSearch,LinkedIn是如何构建实时日志分析系统的