Kafka消息生产消费的一个java小案例(伪分布)
2016-05-03 21:16
369 查看
本文是传智播客hadoop八天-第七天学习笔记
个人感觉kafka有点像JMS的点对点模式,都是一个生产者一个(组)消费者,消息被一个(组)消费者消费以后,其他(组)消费者就无法查看消息。
生产者:
package cn.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class ProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("zk.connect", "localhost:2181"); props.put("metadata.broker.list", "localhost:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (int i = 1; i <= 1000; i++) { Thread.sleep(200); producer.send(new KeyedMessage<String, String>("order", "the message no is" + i)); } } }
消费者
package cn.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class ConsumerDemo { private static final String topic = "order"; private static final Integer threads = 1; public static void main(String[] args) { Properties props = new Properties(); //在这里使用zk.connect会报错,这是为什么呢?明明生产者用的就是zk.connect props.put("zookeeper.connect", "localhost:2181"); // 若干个消费者为一个组,这个组消费的消息别的组是无法查看的。 props.put("group.id", "1111"); // 偏移量,smallest指将指针指向topic最起始的位置 props.put("auto.offset.reset", "smallest"); ConsumerConfig config = new ConsumerConfig(props); // 创建java连接 ConsumerConnector consumer = Consumer .createJavaConsumerConnector(config); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //可以定义多个topic topicCountMap.put(topic, threads); topicCountMap.put("topic1", threads); topicCountMap.put("topic2", threads); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer .createMessageStreams(topicCountMap); //可以取出任意topic的消息 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); for (final KafkaStream<byte[], byte[]> kafkaStream : streams) { new Thread(new Runnable() { public void run() { for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) { String msg = new String(mm.message()); System.out.println(msg); } } }).start(); } } }
相关文章推荐
- Java八种基本数据类型所占字节
- 《java入门第一季》之参数引用
- 《java入门第一季》之参数引用
- Spring+Swagger文档无法排序问题解决
- Java多线程-新特征-信号量Semaphore
- Java:Comparator接口
- java--HibernateSynchronizer在MyEclipse中的配置
- 在eclipse操作hbase时,遇到的问题
- JAVA 的几种单例模式以及优缺点
- 数组相加
- 20160503-spring入门1
- Java核心技术(三) —— 对象与类(2)
- java常用集合及相关接口
- java关键字final
- Java中的自动装箱与拆箱
- IT十八掌作业_java基础第二天_进制转换原理和补码存储方式
- springcloud(第二篇)springcloud config 修改配置
- FreeMarker 一二事 - 静态模板结合spring展示
- java窗口事件
- JAVA中引用本身占用内存空间的问题