Java连接Kafka创建Consumer
2019-08-21 11:32
1101 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/RONE321/article/details/99945705
首先Maven依赖
<--==========KAFKA===========--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.2.0</version> </dependency>
新建KafkaConsumerSimple类
package com.aas.demo.Kafka; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class KafkaConsumerSimple { public static void main(String[] args) { // 设置配置信息 Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cdh01:9092,cdh02:9092,cdh03:9092"); // Just a user-defined string to identify the consumer group props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // Enable auto offset commit props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { //要消费的主题列表 配置可以多个 consumer.subscribe(Arrays.asList("testTopic")); while (true) { try { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { //数据处理部分 System.out.printf("Offset = %d\n", record.offset()); System.out.printf("Key = %s\n", record.key()); System.out.printf("Value = %s\n", record.value()); } } catch (Exception e) { e.printStackTrace(); } } } } }
然后先建立testTopic 再创建生产者,最后Run起来消费者
(关于生产者请查看上一篇博客)
控制台打印信息
相关文章推荐
- Java Api Consumer 连接启用Kerberos认证的Kafka
- [Kafka] - Kafka Java Consumer实现(二)
- elasticsearch__1__java操作之连接es,创建Mapping,保存数据
- kafka java连接操作
- Java操作ElasticSearch之创建客户端连接
- kafka入门2:java 创建及删除 topic
- kafka 创建消费者报错 consumer zookeeper is not a recognized option
- kafka java中发送数据、连接失败问题解决
- Kafka Consumer java api 配置
- Kafka系列(21)java消费者是如何管理Tcp连接的
- java JDBC编程——从属性文件读取信息,并创建到数据库的连接
- 用java代码手动控制kafkaconsumer偏移量
- kafka java中发送数据、连接失败问题解决
- kafka java中发送数据、连接失败问题解决
- java后台创建url连接,获取接口数据
- hbase-创建连接报错 java.lang.OutOfMemoryError: unable to create new native thread
- Java中通过方法创建一个http连接并请求(服务器间进行通信)
- JAVA连接mysql数据库,动态创建表以及动态插入数据
- webSocket.java创建连接和关闭连接
- kafka-java客户端连接