kafka 消费者代码示例
2016-10-28 00:27
288 查看
kafka 消费者代码示例
最近在公司项目中使用kafka,主要的功能是从kafka消费数据,并且将数据以对象的方式写入自定义的日志文件中,每天生产一个日志文件。一开始使用高版本的kafka(0.9.0.0)
maven配置
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.0</version> </dependency>
public void process() { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfig.consumerProperties()); List<String> topicList = new ArrayList<>(); topicList.add(kafkaConfig.getTopic()); consumer.subscribe(topicList); try { while (flag) { System.out.println("定时执行consumer、、、"); long start = System.currentTimeMillis(); ConsumerRecords<String, String> records = consumer.poll(kafkaConfig.getPollTime()); for (ConsumerRecord<String, String> record : records){ ClientRequestsModel model = (ClientRequestsModel) JsonUtils.json2Object(record.value(), ClientRequestsModel.class); System.out.println("**********"); saveResult(model); } System.out.println("本次耗时:" + (System.currentTimeMillis() - start)); } System.out.println("这里是while循环外面"); } finally { consumer.close(); } }
最后发现上面的代码并不能完美的解决问题,首先while循环一直执行,导致System.out.println(“这里是while循环外面”);这句代码不会执行到,因为while在这里会阻塞;
其次这里用的是kafka0.9的版本,自kafka0.8.x版本后kafka的offset的管理就提交给broker了,并没有提交给zookeeper,导致在使用kafka monitor监控数据的时候不显示consumer的offset信息,虽然kafka中国社区群(162272557)里面的大神说kafka monitor可以通过配置来让其支持高版本的kafka,但是毕竟kafka monitor是运维人员搭建的还要麻烦运维实在不好;
另外一个最不好的地方是与业务上有关联,应该要脱离业务,将kafka独立出来。
为了解决问题2决定采用低版本的kafka
<!--kafka配置--> <!--高版本的kafka处理消息--> <!--<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.0</version> </dependency>--> <!--低版本的kafka处理消息--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.2</version> </dependency>
为了解决问题1和3,在社区大神的指导下进行了重构,代码如下:
对外消费方法
/** * 旧版本0.8.2.2处理消息方法 */ public void kafkaProcess() { ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(kafkaConfig.createAdPvConsumerConfig()); String topic = kafkaConfig.getAdPvtopic(); String topic2 = kafkaConfig.getAdDevRegtopic(); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); int localConsumerCount = 1; topicCountMap.put(topic, localConsumerCount); topicCountMap.put(topic2, localConsumerCount); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector .createMessageStreams(topicCountMap); for (Map.Entry<String, List<KafkaStream<byte[], byte[]>>> kafkaStream : consumerMap.entrySet()) { new KafkaHandler(kafkaStream.getValue()).run(); } }
KafkaHandler类
package com.coocaa.salad.stat.kafka; import kafka.consumer.KafkaStream; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by silence on 2016/10/18. * Desc : */ public class KafkaHandler extends Thread { List<KafkaStream<byte[], byte[]>> kafkaStreams; public KafkaHandler(List<KafkaStream<byte[], byte[]>> kafkaStreams) { super(); this.kafkaStreams = kafkaStreams; } ExecutorService service = Executors.newFixedThreadPool(2); public void run() { try { Iterator<KafkaStream<byte[], byte[]>> iterator = kafkaStreams.iterator(); while (iterator.hasNext()) { KafkaStream<byte[], byte[]> next = iterator.next(); service.submit(new MessageHandler(next)); } } catch (Exception e) { e.printStackTrace(); } } }
MessageHandler 类
package com.coocaa.salad.stat.kafka; import com.alibaba.fastjson.JSON; import com.coocaa.salad.core.entity.AdScheduleEntity; import com.coocaa.salad.stat.entity.AdOrderEntity; import com.coocaa.salad.stat.entity.CustomerEntity; import com.coocaa.salad.stat.logFile.MyLogger; import com.coocaa.salad.stat.model.ClientRequestsModel; import com.coocaa.salad.stat.model.StatisticsModel; import com.coocaa.salad.stat.service.ConstantService; import com.origin.eurybia.utils.JsonUtils; import com.origin.eurybia.utils.StringUtils; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.message.MessageAndMetadata; import org.apache.log4j.Logger; import org.slf4j.LoggerFactory; /** * Created by silence on 2016/10/18. * Desc : */ public class MessageHandler extends Thread { private static final Logger logger = Logger.getLogger("messageHandler"); private static final org.slf4j.Logger log = LoggerFactory.getLogger(MessageHandler.class); KafkaStream<byte[], byte[]> kafkaStreams; public MessageHandler(KafkaStream<byte[], byte[]> kafkaStreams) { super(); this.kafkaStreams = kafkaStreams; } public void run() { //处理kafka数据 ConsumerIterator<byte[], byte[]> streamIterator = kafkaStreams.iterator(); ClientRequestsModel model = new ClientRequestsModel(); while (streamIterator.hasNext()) { log.info("开始处理kafka数据。。。"); MessageAndMetadata<byte[], byte[]> record = streamIterator.next(); String message = new String(record.message()); log.info("{} topic 的数据,写入{}日志文件中。。。", record.topic(), record.topic()); if (record.topic().equals("adPv")) { model = (ClientRequestsModel) JsonUtils.json2Object(message, ClientRequestsModel.class); //保存数据到日志文件中 } else if (record.topic().equals("adDevReg")) { MyLogger.myInfo2(logger, message); } } }
上面的MessageHandler 是同时消费两个topic,可以根据自己的需要进行设置。MyLogger.myInfo2(logger, message);是将信息写入自定义的日志级别的文件中,通过log4j写日志的方式写文件,可以快速并且高效的写入,写入速度还是挺快的。推荐使用。关于如何使用自定义log4j网上有例子,也可以看我另一篇博客。
至此,以上的三个问题都已经解决,但是对于使用低版本的kafka,内心是拒绝的,可是没有办法,至今kafka还没有发布1.0,高版本不稳定。期待新版本的发布。
相关文章推荐
- kafka 官方示例代码--消费者
- kafka生产者、消费者代码示例
- kafka生产者和消费者的javaAPI的示例代码
- kafka生产者消费者示例代码
- kafka生产者与消费者java代码示例
- 【问底】Michael G. Noll:整合Kafka到Spark Streaming——代码示例和挑战
- java写kafka的生产者与消费者代码
- kafka生产者、消费者java示例
- kafka的Java客户端示例代码(kafka_2.11-0.8.2.2)
- java实现Kafka的消费者示例
- kafka增加SSL认证的Producer客户端代码示例
- SpringBoot整合SpringKafka实现消费者史上最简代码实现
- kafka消费数据存入elasticsearch代码示例
- 生产者消费者问题代码示例
- Kafka使用Java客户端进行访问的示例代码
- spark streaming 接收 kafka 数据java代码WordCount示例
- 整合Kafka到Spark Streaming——代码示例和挑战
- Java线程同步:生产者-消费者 模型(代码示例)
- kafka Kafka JAVA客户端代码示例
- Java线程同步:生产者-消费者 模型(代码示例)