您的位置:首页 > 编程语言 > Java开发

Kafka的Producer和Consumer的示例(使用java语言)

2017-08-24 23:24 465 查看
我使用的kafka版本是:0.7.2
jdk版本是:1.6.0_20
http://kafka.apache.org/07/quickstart.html官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的。

Producer Code

[java] view
plain copy

import java.util.*;  

import kafka.message.Message;  

import kafka.producer.ProducerConfig;  

import kafka.javaapi.producer.Producer;  

import kafka.javaapi.producer.ProducerData;  

  

public class ProducerSample {  

  

  

    public static void main(String[] args) {  

        ProducerSample ps = new ProducerSample();  

  

        Properties props = new Properties();
  1、配置文件设置

        props.put("zk.connect", "127.0.0.1:2181");  

        props.put("serializer.class", "kafka.serializer.StringEncoder");  

  

        ProducerConfig config = new ProducerConfig(props);  

2、带着配置文件创建Producer 

        Producer<String, String> producer = new Producer<String, String>(config); 

要发布的消息(任务)     topic
  消息任务 3、要发布的消息任务

        ProducerData<String, String> data = 

new ProducerData<String, String>("test-topic", "test-message2");  

        producer.send(data);  4、发布消息任务

        producer.close();  

    }  

}  


Consumer Code

[java] view
plain copy

import java.nio.ByteBuffer;  

import java.util.HashMap;  

import java.util.List;  

import java.util.Map;  

import java.util.Properties;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.Executors;  

import kafka.consumer.Consumer;  

import kafka.consumer.ConsumerConfig;  

import kafka.consumer.KafkaStream;  

import kafka.javaapi.consumer.ConsumerConnector;  

import kafka.message.Message;  

import kafka.message.MessageAndMetadata;  

  

public class ConsumerSample {  

  

    public static void main(String[] args) {  

       
//1、消费者配置文件设置

        Properties props = new Properties();  

        props.put("zk.connect", "localhost:2181");  

        props.put("zk.connectiontimeout.ms", "1000000");  

        props.put("groupid", "test_group");  

        //groupid和要消费的topic

  

       
//
2、根据配置文件创建与集群的连接

        ConsumerConfig consumerConfig = new ConsumerConfig(props);  

        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  

  
//3、事先说明有4个线程来消费test-topic 

        HashMap<String, Integer> map = new HashMap<String, Integer>();  

        map.put("test-topic", 4); 

//topicMessageStreams是信息流,根据信息流来创建管道s

      Map<String, List<KafkaStream<Message>>> topicMessageStreams =  

         consumerConnector.createMessageStreams(map);  // a map of (topic,#stream)

       
//4、创建四个 MessageStreams来消费(监听)topic(通过MessageStrings的get方法从topic中取数据,在消费者和生产者的配置文件中两者必须位于不同的topic中,否者不能访问同一topic中的数据),每个MessageString
对应一个线程。4个MessageStreams构成一捆管道topicMessageStreams,4个MessageStreams分别通向4个线程,得到4个stream, 

//5、从test-topic中获取消息任务 streams是创建的一捆管道,stream是管道

        List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");
 

  

     
  //6、声明一个4个线程的线程池,用于消费各个partition 

        ExecutorService executor = Executors.newFixedThreadPool(4); 

  

         
//7、消费消息任务

        for (final KafkaStream<Message> stream : streams) { 

//stream 是MessageAndMetadata类型,包含topic、message字段 

         
//8、 为每一个partition分配一个线程去消费

     executor.submit(new Runnable() {
          

                public void run() {  

                    for (MessageAndMetadata msgAndMetadata : stream) {  

                        // process message (msgAndMetadata.message())  

                        System.out.println("topic: " + msgAndMetadata.topic());  

                        Message message = (Message) msgAndMetadata.message();  

                        ByteBuffer buffer = message.payload();  

                 <span style="whitespace:pre">  </span>

byte[] bytes = new byte[message.payloadSize()];  

                        buffer.get(bytes);  

                        String tmp = new String(bytes);  

                        System.out.println("message content: " + tmp);  

                    }  

                }  

            });  

        }  

  

    }  

}  

分别启动zookeeper,kafka server之后,依次运行Producer,Consumer的代码

运行ProducerSample:



运行ConsumerSample:



由于本人不熟悉Java的多线程,将官方给的Consumer Code做点小改动,如下所示:

[java] view
plain copy

import java.nio.ByteBuffer;  

import java.util.HashMap;  

import java.util.List;  

import java.util.Map;  

import java.util.Properties;  

import kafka.consumer.Consumer;  

import kafka.consumer.ConsumerConfig;  

import kafka.consumer.KafkaStream;  

import kafka.javaapi.consumer.ConsumerConnector;  

import kafka.message.Message;  

import kafka.message.MessageAndMetadata;  

  

public class ConsumerSample2 {  

  

    public static void main(String[] args) {  

        // specify some consumer properties  

        Properties props = new Properties();  

        props.put("zk.connect", "localhost:2181");  

        props.put("zk.connectiontimeout.ms", "1000000");  

        props.put("groupid", "test_group");  

  

        // Create the connection to the cluster  

        ConsumerConfig consumerConfig = new ConsumerConfig(props);  

        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  

  

        HashMap<String, Integer> map = new HashMap<String, Integer>();  

        map.put("test-topic", 1);  

        Map<String, List<KafkaStream<Message>>> topicMessageStreams =  

                consumerConnector.createMessageStreams(map);  

        List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");  

  

        <strong>for (final KafkaStream<Message> stream : streams) {  

            for (MessageAndMetadata msgAndMetadata : stream) {  

                // process message (msgAndMetadata.message())  

                System.out.println("topic: " + msgAndMetadata.topic());  

                Message message = (Message) msgAndMetadata.message();  

                ByteBuffer buffer = message.payload();  

                byte[] bytes = new byte[message.payloadSize()];  

                buffer.get(bytes);  

                String tmp = new String(bytes);  

                System.out.println("message content: " + tmp);  

            }  

        }</strong>  

    }  

}  

我在Producer端又发送了一条“test-message2”的消息,Consumer收到了两条消息,如下所示:



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: