Kafka的Java操作
2015-10-08 22:41
441 查看
一、在pom.xml文件中添加依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
二、编写生产者Producer
三、编写消费者类
四、启动消费者、生产者类,并测试,会看到消费者接收到的消息,如下图所示:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
二、编写生产者Producer
package kafka; import java.util.Properties; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class Producer extends Thread{ private String topic; public Producer(String topic) { super(); this.topic = topic; } @Override public void run() { for (int i = 1; i <=100; i++) { kafka.javaapi.producer.Producer<String, String> createProducer = createProducer(); String messageStr = new String("Message_" + i); KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, messageStr); createProducer.send(message ); System.out.println("发送:" + messageStr); } } private kafka.javaapi.producer.Producer<String, String> createProducer() { Properties originalProps = new Properties(); //设置编码 originalProps.put("serializer.class", "kafka.serializer.StringEncoder"); //设置broker列表 originalProps.put("metadata.broker.list","192.168.215.90:9092"); ProducerConfig config = new ProducerConfig(originalProps); kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(config); return producer; } public static void main(String[] args) { new Producer("test").run(); } }
三、编写消费者类
Consumer
Consumer
package kafka;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class Consumerextends Thread{
private String topic = "";
public Consumer(String topic) {
this.topic = topic;
}
@Override
public void run() {
ConsumerConnector consumerConnector = createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap );
KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
while(iterator.hasNext()){
MessageAndMetadata<byte[], byte[]> it = iterator.next();
try {
System.out.println("接收到:" + new String(it.message(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
private ConsumerConnector createConsumer() {
Properties originalProps = new Properties();
originalProps.setProperty("group.id", "test");
originalProps.setProperty("zookeeper.connect", "192.168.215.90:2181,192.168.215.110:2181");
ConsumerConfig conf = new ConsumerConfig(originalProps);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
return consumerConnector;
}
public static void main(String[] args) {
new Consumer("test").run();
}
}
四、启动消费者、生产者类,并测试,会看到消费者接收到的消息,如下图所示:
相关文章推荐
- 重载VS覆盖
- springmvc随笔
- [转载]关于网传JDK1.7语法层次支持集合的问题
- java 学习日记---------简易学生信息管理系统
- Android Studio下的library转换成Eclipse下可使用的library
- Struts2中ActionName-validator.xml文件的配置
- java框架Spring学习(一)
- 第一部分 Java的类
- java生成随机验证码
- SAX和Dom解析XML的区别
- Java:调用构造器的具体处理步骤
- MyEclipse10.6 myeclipse2013下添加jadClipse反编译插件 .
- Spring学习(一)
- JAVA简单学习
- Java知识点总结
- Java并发编程:Callable、Future和FutureTask
- 在eclipse中集成SVN插件
- 大龄屌丝自学笔记--Java零基础到菜鸟--030
- JAVA语言规范和API网址
- LeetCode118 LeetCode119 LeetCode151 LeetCode202 Java