您的位置:首页 > 编程语言 > Java开发

kafka生产者、消费者java示例

2016-08-02 14:14 369 查看


1. 生产者



import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal example that publishes a fixed string message to a Kafka topic
 * using the legacy {@code kafka.javaapi.producer.Producer} API.
 */
public class MyProducer {

    /**
     * Configures a producer for the broker at {@code localhost:9092}, sends the
     * same message to topic {@code mykafka} 99 times and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        // Create the producer object.
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            // Build the message: topic "mykafka", payload "test-kafka".
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("mykafka", "test-kafka");
            // The original loop tested i < 100 but never incremented i, so it
            // sent forever and never reached close(). Count explicitly instead.
            for (int i = 1; i < 100; i++) {
                // Send the message.
                producer.send(data);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the producer, including when send() fails.
            producer.close();
        }
    }
}



2. 消费者



import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Single-stream consumer example built on the legacy ZooKeeper-based
 * high-level consumer API. Runs as its own thread and prints every message
 * received on the configured topic.
 */
public class MyConsumer extends Thread {
    // Charset used to decode message payloads; explicit so the output does not
    // depend on the platform default. Assumes the producer encodes as UTF-8.
    private static final java.nio.charset.Charset UTF_8 =
            java.nio.charset.Charset.forName("UTF-8");

    // Connection to the consumer API.
    private final ConsumerConnector consumer;
    // Topic to consume.
    private final String topic;

    /**
     * Creates a consumer connector from {@link #createConsumerConfig()}.
     *
     * @param topic name of the topic this thread will consume
     */
    public MyConsumer(String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    /**
     * Builds the consumer configuration.
     *
     * @return configuration pointing at ZooKeeper on {@code localhost:2181}
     */
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper address and port to connect to. Several host:port pairs can
        // be listed, comma separated, e.g.
        // "localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181".
        // The 'zookeeper.connect' string identifies where to find one instance of Zookeeper in your cluster.
        // Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Group
        props.put("zookeeper.connect", "localhost:2181");

        // Consumer group id (The 'group.id' string defines the Consumer Group this process is consuming on behalf of.)
        props.put("group.id", "0");

        // ZooKeeper session timeout.
        // The 'zookeeper.session.timeout.ms' is how many milliseconds Kafka will wait for
        // ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.
        props.put("zookeeper.session.timeout.ms", "10000");

        // The 'zookeeper.sync.time.ms' is the number of milliseconds a ZooKeeper 'follower' can be behind the master before an error occurs.
        props.put("zookeeper.sync.time.ms", "200");

        // The 'auto.commit.interval.ms' setting is how often updates to the consumed offsets are written to ZooKeeper.
        // Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Opens one stream for the topic and prints each message, pausing one
     * second between messages. Stops when the iterator reports no further
     * messages or the thread is interrupted, then shuts the connector down.
     */
    @Override
    public void run() {
        // Ask for exactly one stream for this topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("*********Results********");
        try {
            // Once hasNext() returns false the stream is treated as finished;
            // the original kept polling an exhausted iterator forever.
            while (it.hasNext()) {
                // Print the received message.
                System.err.println(Thread.currentThread() + " get data:"
                        + new String(it.next().message(), UTF_8));
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Restore the flag and leave the loop so interrupt() actually stops
            // the thread instead of being swallowed.
            Thread.currentThread().interrupt();
        } finally {
            consumer.shutdown();
        }
    }

    /**
     * Starts one consumer thread on topic {@code mykafka}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyConsumer consumerThread = new MyConsumer("mykafka");
        consumerThread.start();
    }
}



3. 消费者的线程执行器实现

  首先建立一个处理消息的类 Consumer(实现 Runnable,每个实例负责消费一个 KafkaStream):



import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
/**
 * Worker that drains a single Kafka stream and prints each message, tagged
 * with the number of the thread slot it was created for.
 */
public class Consumer implements Runnable {

    // Explicit charset so decoding does not depend on the platform default.
    // Assumes message payloads are UTF-8 encoded text.
    private static final java.nio.charset.Charset UTF_8 =
            java.nio.charset.Charset.forName("UTF-8");

    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNumber;

    /**
     * @param a_stream       stream of raw key/value byte arrays to consume
     * @param a_threadNumber label printed in front of every message
     */
    public Consumer(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        threadNumber = a_threadNumber;
        stream = a_stream;
    }

    /**
     * Prints every message until the stream's iterator reports no more
     * elements, then prints a shutdown notice.
     */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + threadNumber + ": "
                    + new String(it.next().message(), UTF_8));
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}


  其次实现多线程的调用:ConsumerGroup 为每个 KafkaStream 向线程池提交一个 Consumer 任务。



import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.concurrent.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Runs several {@code Consumer} workers on a fixed thread pool, one per Kafka
 * stream of a single topic, using the legacy ZooKeeper-based high-level
 * consumer API.
 */
public class ConsumerGroup {
    private final ConsumerConnector consumer;
    private final String topic;
    // Created by run(); stays null until then.
    private ExecutorService executor;

    /**
     * @param a_zookeeper ZooKeeper connect string, e.g. {@code localhost:2181}
     * @param a_groupId   consumer group id
     * @param a_topic     topic to consume
     */
    public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /**
     * Shuts down the connector and, if {@link #run(int)} was called, the
     * thread pool, waiting up to five seconds for the workers to finish.
     */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        // The original called awaitTermination() unconditionally, which threw
        // a NullPointerException when run() had never created the executor.
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests {@code a_numThreads} streams for the topic and submits one
     * worker per returned stream to a fixed pool of the same size.
     *
     * @param a_numThreads number of streams and pool threads to create
     */
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Launch the thread pool.
        executor = Executors.newFixedThreadPool(a_numThreads);

        // Create one worker per stream; threadNumber is only a printed label.
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Consumer(stream, threadNumber));
            threadNumber++;
        }
    }

    /**
     * Builds the consumer configuration for the given ZooKeeper and group.
     *
     * @param a_zookeeper ZooKeeper connect string
     * @param a_groupId   consumer group id
     * @return the consumer configuration
     */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        // NOTE(review): 400 ms is far below the 10000 ms used by MyConsumer in
        // this article; confirm it is long enough for your ZooKeeper setup.
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    /**
     * Consumes topic {@code mykafka} with two threads for ten seconds, then
     * shuts everything down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String zooKeeper = "localhost:2181";
        String groupId = "0";
        String topic = "mykafka";
        int threads = 2;  // number of threads to start

        ConsumerGroup group = new ConsumerGroup(zooKeeper, groupId, topic);
        group.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            // Interrupted early: restore the flag and go straight to shutdown.
            Thread.currentThread().interrupt();
        }
        group.shutdown();
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: