您的位置:首页 > 编程语言 > Java开发

Kafka的JAVA操作

2015-10-08 22:41 441 查看
一、在pom.xml文件中添加依赖

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.10</artifactId>

<version>0.8.2.0</version>

</dependency>

二、编写生产者Producer

package kafka;

import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Demo producer that publishes 100 numbered string messages
 * ({@code Message_1} .. {@code Message_100}) to a single Kafka topic.
 *
 * <p>The class extends {@link Thread}, but {@link #main(String[])} invokes
 * {@code run()} directly, so the demo executes on the calling thread.
 */
public class Producer extends Thread {

    /** Name of the topic every message is sent to. */
    private final String topic;

    /**
     * @param topic name of the Kafka topic to publish to
     */
    public Producer(String topic) {
        super();
        this.topic = topic;
    }

    /**
     * Sends the 100 demo messages and prints each one after it has been handed
     * to the producer.
     */
    @Override
    public void run() {
        // Build the producer once and reuse it for the whole batch. The previous
        // code built a fresh producer for every message and never closed any of them.
        kafka.javaapi.producer.Producer<String, String> producer = createProducer();
        try {
            for (int i = 1; i <= 100; i++) {
                String messageStr = "Message_" + i;
                KeyedMessage<String, String> message =
                        new KeyedMessage<String, String>(topic, messageStr);
                producer.send(message);
                System.out.println("发送:" + messageStr);
            }
        } finally {
            // Release the producer even if a send fails part-way through.
            producer.close();
        }
    }

    /**
     * Builds a producer configured with the string encoder and the hard-coded
     * broker address used by this demo.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    private kafka.javaapi.producer.Producer<String, String> createProducer() {
        Properties originalProps = new Properties();
        // Serializer used to encode message payloads.
        originalProps.put("serializer.class", "kafka.serializer.StringEncoder");
        // Broker list the producer connects to.
        originalProps.put("metadata.broker.list", "192.168.215.90:9092");
        ProducerConfig config = new ProducerConfig(originalProps);
        return new kafka.javaapi.producer.Producer<String, String>(config);
    }

    /** Runs the demo against the topic {@code test}. */
    public static void main(String[] args) {
        new Producer("test").run();
    }

}


三、编写消费者类Consumer


package kafka;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class Consumerextends Thread{

private String topic = "";

public Consumer(String topic) {
this.topic = topic;
}

@Override
public void run() {
ConsumerConnector consumerConnector = createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap );
KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
while(iterator.hasNext()){
MessageAndMetadata<byte[], byte[]> it = iterator.next();
try {
System.out.println("接收到:" + new String(it.message(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}

private ConsumerConnector createConsumer() {
Properties originalProps = new Properties();
originalProps.setProperty("group.id", "test");
originalProps.setProperty("zookeeper.connect", "192.168.215.90:2181,192.168.215.110:2181");
ConsumerConfig conf = new ConsumerConfig(originalProps);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
return consumerConnector;
}
public static void main(String[] args) {
new Consumer("test").run();
}

}


四、启动消费者、生产者类,并测试,会看到消费者接收到的消息,如下图所示:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: