您的位置:首页 > 编程语言 > Java开发

Kafka 生产者消费者 Java API 编程

2018-01-29 18:33 781 查看
我们先创建一个topic,然后启动生产者和消费者,进行消息通信,然后在使用Kafka API编程的方式实现,笔者使用的ZK和Kafka都是单节点,你也可以使用集群方式。

启动Zookeeper

zkServer.sh start


启动Kafka

kafka-server-start.sh $KAFKA_HOME/config/server.properties


创建topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_api


查看topic详细信息

[hadoop@Master ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_api
Topic:kafka_api PartitionCount:1    ReplicationFactor:1 Configs:
Topic: kafka_api    Partition: 0    Leader: 0   Replicas: 0 Isr: 0


启动生产者和消费者,测试消息通信

# 生产者
kafka
4000
-console-producer.sh --broker-list localhost:9092 --topic kafka_api




# 消费者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_api




Java API 编程实现

1.创建maven项目,pom.xml中引入kafka依赖

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.0</version>
</dependency>
</dependencies>


2.创建KafkaProperties类,配置Kafka相关属性

package com.bigdata.kafka;

/**
* Kafka 相关属性配置类
*/
public interface KafkaProperties {

// zookeeper连接,与server.properties中的zookeeper.connect属性一致,多个用逗号隔开,例如:zk01:2181,zk02:2181
public static final String ZK = "Master:2181";

// 如果是多个blocker,用逗号分隔即可,例如:kafka01::9092,kafka02:9093
public static final String BLOCK_LIST = "Master:9092";

// 主题
public static final String TOPIC = "kafka_api";
}


3.Kafka Producer API 开发

生产者API中常用的类如下

Producer:生产者

ProducerConfig:生产者对应的配置

KeyedMessage:封装的消息对象

创建KafkaProducer类,代码如下

package com.bigdata.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
* Kafka 生产者
*/
public class KafkaProducer extends Thread {

private String topic;

private Producer<Integer, String> producer;

public KafkaProducer(String topic) {
this.topic = topic;

Properties properties = new Properties();
properties.put("metadata.broker.list", KafkaProperties.BLOCK_LIST);
properties.put("serializer.class", "kafka.serializer.StringEncoder");

producer = new Producer<Integer, String>(new ProducerConfig(properties));
}

@Override
public void run() {
int messageNo = 1;

while(true) {
String message = "message_" + messageNo;

System.out.println("Send:" + message);

producer.send(new KeyedMessage<Integer, String>(topic, message));

messageNo ++;

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
new KafkaProducer(KafkaProperties.TOPIC).start();
}
}


运行上述代码,在控制台中使用命令启动一个消费者,观察控制台是否能接收到消息



4.Kafka Consumer API 开发

消费者API中常用的类如下

Consumer:消费者

ConsumerConnector:消费者连接器

ConsumerConfig:消费者对应的配置

KafkaStream:数据流

创建KafkaConsumer类,代码如下

package com.bigdata.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* Kafuka 消费者
*/
public class KafkaConsumer extends Thread{

private String topic;

public KafkaConsumer(String topic) {
this.topic = topic;
}

private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", KafkaProperties.ZK);
properties.setProperty("group.id", "testGroup");

return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}

@Override
public void run() {
// 创建Consumer
ConsumerConnector consumer = createConsumer();

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);

Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);

// 获取每次接受到的数据
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);

ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

// 不停地从stream中读取最新接收到的数据
while(iterator.hasNext()){
String message = String.valueOf(iterator.next().message());

System.out.println("message:" + message);
}
}

public static void main(String[] args) {
new KafkaConsumer(KafkaProperties.TOPIC).start();
}
}


运行生产者及消费者代码,观察控制台

生产者控制台(部分结果):

Send:message_5
Send:message_6
Send:message_7
Send:message_8
Send:message_9
Send:message_10


消费者控制台(部分结果):只接收最新的数据

message:message_7
message:message_8
message:message_9
message:message_10
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: