Kafka的Producer和Consumer的示例(使用java语言)
2017-08-24 23:24
465 查看
我使用的kafka版本是:0.7.2
jdk版本是:1.6.0_20
http://kafka.apache.org/07/quickstart.html官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的。
Producer Code
[java] view
plain copy
import java.util.*;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
public class ProducerSample {
public static void main(String[] args) {
ProducerSample ps = new ProducerSample();
Properties props = new Properties();
1、配置文件设置
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
2、带着配置文件创建Producer
Producer<String, String> producer = new Producer<String, String>(config);
要发布的消息(任务) topic
消息任务 3、要发布的消息任务
ProducerData<String, String> data =
new ProducerData<String, String>("test-topic", "test-message2");
producer.send(data); 4、发布消息任务
producer.close();
}
}
Consumer Code
[java] view
plain copy
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
public class ConsumerSample {
public static void main(String[] args) {
//1、消费者配置文件设置
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
//groupid和要消费的topic
//
2、根据配置文件创建与集群的连接
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
//3、事先说明有4个线程来消费test-topic
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("test-topic", 4);
//topicMessageStreams是信息流,根据信息流来创建管道s
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(map); // a map of (topic,#stream)
//4、创建四个 MessageStreams来消费(监听)topic(通过MessageStrings的get方法从topic中取数据,在消费者和生产者的配置文件中两者必须位于不同的topic中,否者不能访问同一topic中的数据),每个MessageString
对应一个线程。4个MessageStreams构成一捆管道topicMessageStreams,4个MessageStreams分别通向4个线程,得到4个stream,
//5、从test-topic中获取消息任务 streams是创建的一捆管道,stream是管道
List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");
//6、声明一个4个线程的线程池,用于消费各个partition
ExecutorService executor = Executors.newFixedThreadPool(4);
//7、消费消息任务
for (final KafkaStream<Message> stream : streams) {
//stream 是MessageAndMetadata类型,包含topic、message字段
//8、 为每一个partition分配一个线程去消费
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata msgAndMetadata : stream) {
// process message (msgAndMetadata.message())
System.out.println("topic: " + msgAndMetadata.topic());
Message message = (Message) msgAndMetadata.message();
ByteBuffer buffer = message.payload();
<span style="whitespace:pre"> </span>
byte[] bytes = new byte[message.payloadSize()];
buffer.get(bytes);
String tmp = new String(bytes);
System.out.println("message content: " + tmp);
}
}
});
}
}
}
分别启动zookeeper,kafka server之后,依次运行Producer,Consumer的代码
运行ProducerSample:
运行ConsumerSample:
由于本人不熟悉Java的多线程,将官方给的Consumer Code做点小改动,如下所示:
[java] view
plain copy
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
public class ConsumerSample2 {
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("test-topic", 1);
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");
<strong>for (final KafkaStream<Message> stream : streams) {
for (MessageAndMetadata msgAndMetadata : stream) {
// process message (msgAndMetadata.message())
System.out.println("topic: " + msgAndMetadata.topic());
Message message = (Message) msgAndMetadata.message();
ByteBuffer buffer = message.payload();
byte[] bytes = new byte[message.payloadSize()];
buffer.get(bytes);
String tmp = new String(bytes);
System.out.println("message content: " + tmp);
}
}</strong>
}
}
我在Producer端又发送了一条“test-message2”的消息,Consumer收到了两条消息,如下所示:
jdk版本是:1.6.0_20
http://kafka.apache.org/07/quickstart.html官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的。
Producer Code
[java] view
plain copy
import java.util.*;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
public class ProducerSample {
public static void main(String[] args) {
ProducerSample ps = new ProducerSample();
Properties props = new Properties();
1、配置文件设置
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
2、带着配置文件创建Producer
Producer<String, String> producer = new Producer<String, String>(config);
要发布的消息(任务) topic
消息任务 3、要发布的消息任务
ProducerData<String, String> data =
new ProducerData<String, String>("test-topic", "test-message2");
producer.send(data); 4、发布消息任务
producer.close();
}
}
Consumer Code
[java] view
plain copy
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
public class ConsumerSample {
public static void main(String[] args) {
//1、消费者配置文件设置
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
//groupid和要消费的topic
//
2、根据配置文件创建与集群的连接
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
//3、事先说明有4个线程来消费test-topic
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("test-topic", 4);
//topicMessageStreams是信息流,根据信息流来创建管道s
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(map); // a map of (topic,#stream)
//4、创建四个 MessageStreams来消费(监听)topic(通过MessageStrings的get方法从topic中取数据,在消费者和生产者的配置文件中两者必须位于不同的topic中,否者不能访问同一topic中的数据),每个MessageString
对应一个线程。4个MessageStreams构成一捆管道topicMessageStreams,4个MessageStreams分别通向4个线程,得到4个stream,
//5、从test-topic中获取消息任务 streams是创建的一捆管道,stream是管道
List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");
//6、声明一个4个线程的线程池,用于消费各个partition
ExecutorService executor = Executors.newFixedThreadPool(4);
//7、消费消息任务
for (final KafkaStream<Message> stream : streams) {
//stream 是MessageAndMetadata类型,包含topic、message字段
//8、 为每一个partition分配一个线程去消费
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata msgAndMetadata : stream) {
// process message (msgAndMetadata.message())
System.out.println("topic: " + msgAndMetadata.topic());
Message message = (Message) msgAndMetadata.message();
ByteBuffer buffer = message.payload();
<span style="whitespace:pre"> </span>
byte[] bytes = new byte[message.payloadSize()];
buffer.get(bytes);
String tmp = new String(bytes);
System.out.println("message content: " + tmp);
}
}
});
}
}
}
分别启动zookeeper,kafka server之后,依次运行Producer,Consumer的代码
运行ProducerSample:
运行ConsumerSample:
由于本人不熟悉Java的多线程,将官方给的Consumer Code做点小改动,如下所示:
[java] view
plain copy
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
public class ConsumerSample2 {
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("test-topic", 1);
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");
<strong>for (final KafkaStream<Message> stream : streams) {
for (MessageAndMetadata msgAndMetadata : stream) {
// process message (msgAndMetadata.message())
System.out.println("topic: " + msgAndMetadata.topic());
Message message = (Message) msgAndMetadata.message();
ByteBuffer buffer = message.payload();
byte[] bytes = new byte[message.payloadSize()];
buffer.get(bytes);
String tmp = new String(bytes);
System.out.println("message content: " + tmp);
}
}</strong>
}
}
我在Producer端又发送了一条“test-message2”的消息,Consumer收到了两条消息,如下所示:
相关文章推荐
- Kafka的Producer和Consumer的示例(使用java语言)
- 本地Consumer和Producer无法使用远程Kafka服务器的处理办法
- 本地Consumer和Producer无法使用远程Kafka服务器的处理办法
- Springboot集成Kafka实现producer和consumer的示例代码
- Kafka 学习笔记(九)producer 和 consumer分别在两个虚拟机上
- 基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型
- 手动初始化kafka consumer、producer
- Kafka的Producer和Consumer源码学习
- Kafka 编写自己的producer、partitioner和consumer
- springboot kafka集成(实现producer和consumer)
- 漫游Kafka设计篇之Producer和Consumer
- 漫游Kafka设计篇之Producer和Consumer
- 使用kafka consumer api时,中文乱码问题
- storm集成kafka简单使用示例
- kafka消息服务的producer、broker、consumer的配置
- KafkaConsumer使用详解
- 漫游Kafka设计篇之Producer和Consumer(4)
- Python KafkaProducer and KafkaConsumer的开发模块
- Kafka(producer和consumer编程实践)
- kafka新的producer api使用