java客户端连接kafka简单测试
2017-10-16 18:31
651 查看
java客户端连接kafka简单测试
本案例kafka版本是kafka_2.11-0.9.0.1,用java来实现kafka生产者、消费者的示例在测试的过程中遇到的特别的问题以及解决办法,其他小问题就不一一列举了。
1 . 使用kafka-clients进行测试,maven依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
需要注意:kafka-clients的版本必须和kafka安装的版本一致
2 . Consumer测试类收不到消息的原因:
①. 先开启consumer测试类,等待poll数据。
②. 开启producer测试类,发送数据,并确保发送成功。
③. 正常情况下这样就可以收到数据了。
生产者示例
public class ProducerTest { private void execMsgSend() throws Exception{ Properties props = new Properties(); props.put("bootstrap.servers", "阿里云外网Ip:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> procuder = new KafkaProducer<String, String>(props); String topic = "test"; for (int i = 1; i <= 10; i++) { String value = " this is another message_" + i; ProducerRecord<String,String> record = new ProducerRecord<String, String>(topic,i+"",value); procuder.send(record,new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset()); } }); System.out.println(i+" ---- success"); Thread.sleep(1000); } System.out.println("send message over."); procuder.close(); } public static void main(String[] args) throws Exception{ ProducerTest test1 = new ProducerTest(); test1.execMsgSend(); } }
消费者示例
public class Consumer { public static void main(String[] s){ Properties props = new Properties(); props.put("bootstrap.servers", "阿里云外网Ip:9092"); props.put("group.id", "1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { System.out.println("poll start..."); ConsumerRecords<String, String> records = consumer.poll(100); int count = records.count(); System.out.println("the numbers of topic:" + count); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } }
相关文章推荐
- Kafka集群搭建及简单使用(控制台与Java客户端)
- socket连接 java服务器端 C#客户端进行交互 简单例子
- kafka集群部署以及java客户端测试
- kafka集群部署以及java客户端测试
- kafka集群部署以及java客户端测试
- RabbitMQ学习之Java客户端连接测试(二)
- 什么是socket?什么是socket的长、短连接?java如何简单实现socket客户端和服务器?
- kafka集群部署以及java客户端测试
- kafka集群部署以及java客户端测试
- kafka集群部署以及java客户端测试
- kafka集群部署以及java客户端测试
- 简单的JAVA TCP/IP连接测试
- kafka集群部署以及java客户端测试
- kafka集群部署以及java客户端测试
- RabbitMQ学习之Java客户端带权限连接测试
- kafka集群部署以及java客户端测试
- 安装kafka到window上,编写kafka java客户端连接kafka
- kafka集群部署以及java客户端测试
- java客户端连接MongoDB数据库的简单使用
- kafka集群部署以及java客户端测试