kafka消息中间件及java示例
2016-09-26 16:38
330 查看
kafka是一个消息中间件,用于各个系统之间传递消息,并且消息可持久化!
可以认为是队列模型,也可以看作是生产者消费着模型;
简单的生产者消费者客户端代码如下:
Consumer.java
可以认为是队列模型,也可以看作是生产者消费着模型;
简单的生产者消费者客户端代码如下:
package cn.outofmemory.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; public class KafkaConsumer { private final ConsumerConnector consumer; public KafkaConsumer() { Properties props = new Properties(); //zookeeper 配置 props.put("zookeeper.connect", "192.168.91.231:2181"); //group 代表一个消费组 props.put("group.id", "jd-group"); //zk连接超时 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); //序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } public void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("panteng", new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get("panteng").get(0); ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()) System.out.println(it.next().message()); } public void stop(){ try { consumer.shutdown(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } public static void main(String[] args) { new KafkaConsumer().consume(); } }
Consumer.java
相关文章推荐
- Java消息中间件--ActiveMq,RabbitMQ,Kafka
- 利用Kafka发送/消费消息-Java示例
- Java使用kafka发送和生产消息的示例
- kafka+storm整合 java示例
- kafka java示例
- kafka java示例
- 分布式消息系统Kafka Java客户端代码
- JMS与Java消息中间件
- 东方通消息中间件(TongLINK/Q)开发实例和java客户端实现
- kafka集群和zookeeper集群的部署,kafka的java代码示例
- java后端系统架构之消息队列篇:kafka的实验
- kafka与传统的消息中间件对比
- flex+blazeds+java后台消息推送(简单示例)
- windows平台下,采用kafka作为消息中间件,consumer和producer采用java api (环境搭建)
- 【linux】RocketMQ:一个纯java的开源消息中间件--开发测试环境搭建
- kafka消息demo的java实现
- flex+blazeds+java后台消息推送(简单示例)
- java actor模型和消息传递简单示例
- kafka java示例
- java实现Kafka的消费者示例