Kafka 生产者消费者 Java API 编程
2018-01-29 18:33
781 查看
我们先创建一个topic,然后启动生产者和消费者,进行消息通信,然后在使用Kafka API编程的方式实现,笔者使用的ZK和Kafka都是单节点,你也可以使用集群方式。
启动Zookeeper
启动Kafka
创建topic
查看topic详细信息
启动生产者和消费者,测试消息通信
Java API 编程实现
1.创建maven项目,pom.xml中引入kafka依赖
2.创建KafkaProperties类,配置Kafka相关属性
3.Kafka Producer API 开发
生产者API中常用的类如下
Producer:生产者
ProducerConfig:生产者对应的配置
KeyedMessage:封装的消息对象
创建KafkaProducer类,代码如下
运行上述代码,在控制台中使用命令启动一个消费者,观察控制台是否能接收到消息
4.Kafka Consumer API 开发
消费者API中常用的类如下
Consumer:消费者
ConsumerConnector:消费者连接器
ConsumerConfig:消费者对应的配置
KafkaStream:数据流
创建KafkaConsumer类,代码如下
运行生产者及消费者代码,观察控制台
生产者控制台(部分结果):
消费者控制台(部分结果):只接收最新的数据
启动Zookeeper
zkServer.sh start
启动Kafka
kafka-server-start.sh $KAFKA_HOME/config/server.properties
创建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_api
查看topic详细信息
[hadoop@Master ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_api Topic:kafka_api PartitionCount:1 ReplicationFactor:1 Configs: Topic: kafka_api Partition: 0 Leader: 0 Replicas: 0 Isr: 0
启动生产者和消费者,测试消息通信
# 生产者 kafka 4000 -console-producer.sh --broker-list localhost:9092 --topic kafka_api
# 消费者 kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_api
Java API 编程实现
1.创建maven项目,pom.xml中引入kafka依赖
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.0</version> </dependency> </dependencies>
2.创建KafkaProperties类,配置Kafka相关属性
package com.bigdata.kafka; /** * Kafka 相关属性配置类 */ public interface KafkaProperties { // zookeeper连接,与server.properties中的zookeeper.connect属性一致,多个用逗号隔开,例如:zk01:2181,zk02:2181 public static final String ZK = "Master:2181"; // 如果是多个blocker,用逗号分隔即可,例如:kafka01::9092,kafka02:9093 public static final String BLOCK_LIST = "Master:9092"; // 主题 public static final String TOPIC = "kafka_api"; }
3.Kafka Producer API 开发
生产者API中常用的类如下
Producer:生产者
ProducerConfig:生产者对应的配置
KeyedMessage:封装的消息对象
创建KafkaProducer类,代码如下
package com.bigdata.kafka; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; /** * Kafka 生产者 */ public class KafkaProducer extends Thread { private String topic; private Producer<Integer, String> producer; public KafkaProducer(String topic) { this.topic = topic; Properties properties = new Properties(); properties.put("metadata.broker.list", KafkaProperties.BLOCK_LIST); properties.put("serializer.class", "kafka.serializer.StringEncoder"); producer = new Producer<Integer, String>(new ProducerConfig(properties)); } @Override public void run() { int messageNo = 1; while(true) { String message = "message_" + messageNo; System.out.println("Send:" + message); producer.send(new KeyedMessage<Integer, String>(topic, message)); messageNo ++; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { new KafkaProducer(KafkaProperties.TOPIC).start(); } }
运行上述代码,在控制台中使用命令启动一个消费者,观察控制台是否能接收到消息
4.Kafka Consumer API 开发
消费者API中常用的类如下
Consumer:消费者
ConsumerConnector:消费者连接器
ConsumerConfig:消费者对应的配置
KafkaStream:数据流
创建KafkaConsumer类,代码如下
package com.bigdata.kafka; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; /** * Kafuka 消费者 */ public class KafkaConsumer extends Thread{ private String topic; public KafkaConsumer(String topic) { this.topic = topic; } private ConsumerConnector createConsumer() { Properties properties = new Properties(); properties.setProperty("zookeeper.connect", KafkaProperties.ZK); properties.setProperty("group.id", "testGroup"); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } @Override public void run() { // 创建Consumer ConsumerConnector consumer = createConsumer(); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); // 获取每次接受到的数据 KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); // 不停地从stream中读取最新接收到的数据 while(iterator.hasNext()){ String message = String.valueOf(iterator.next().message()); System.out.println("message:" + message); } } public static void main(String[] args) { new KafkaConsumer(KafkaProperties.TOPIC).start(); } }
运行生产者及消费者代码,观察控制台
生产者控制台(部分结果):
Send:message_5 Send:message_6 Send:message_7 Send:message_8 Send:message_9 Send:message_10
消费者控制台(部分结果):只接收最新的数据
message:message_7 message:message_8 message:message_9 message:message_10
相关文章推荐
- kafka生产者和消费者的javaAPI的示例代码
- Kafka java api-消费者代码与消费分析、生产者消费者配置文件详解
- Kafka消费者JavaAPI Kafka生产者JavaAPI
- Kafka消费者生产者编程模型(二)
- Kafka生产者消费者java示例(包含Avro序列化)
- kafka生产者和消费者的javaAPI demo
- 生产者消费者模型的四种java编程例子
- Java并发编程之十六:生产者—消费者模型(含代码)
- 多线程编程之生产者消费者(java实现)
- Java多线程编程核心技术——生产者消费者模型
- 【Java并发编程】之十三:生产者—消费者模型(含代码)
- Java多线程编程——生产者-消费者模式(1)
- Kafka之Java API-生产者(Producers)
- Kafka java api-生产者代码、高性能吞吐
- kafka集群搭建和使用Java写kafka生产者消费者
- kafka集群搭建和使用Java写kafka生产者消费者
- kafka本地java示例生产者与消费者,非ZK版
- kafka集群搭建和使用Java写kafka生产者消费者
- Kafka消费者生产者编程模型(一)
- 【Java并发编程】之十三:生产者—消费者模型(含代码)(r)