您的位置:首页 > 运维架构 > Apache

Apache Kafka

2015-11-30 15:09 706 查看
Apache kafka是分布式发布-订阅消息系统。

主要的组件:

·话题(Topic)

话题是特定类型的消息流
消息是字节的有效负载(payload)
而话题就是消息的分类名 或者 种子(Feed)名
已经发布的消息保存在一组服务器上面,它们被成为代理(Broker)或者Kafka集群


·生产者(Producer)

生产者是能够发布消息到话题的对象,任意对象!


·消费者(Consumer)

消费者可以订阅一个或者多个消息,并且从Broker中拉取数据,进而消费这些已经发布的消息。


架构如图所示



生产者代码

/**
* Instantiates a new Kafka producer.
*
* @param topic the topic
* @param directoryPath the directory path
*/
public KafkaMailProducer(String topic, String directoryPath) {
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
this.directoryPath = directoryPath;
}

public void run() {
Path dir = Paths.get(directoryPath);
try {
new WatchDir(dir).start();
new ReadDir(dir).start();
} catch (IOException e) {
e.printStackTrace();
}
}


消费者代码

public KafkaMailConsumer(String topic) {
consumer =
Kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}

/**
* Creates the consumer config.
*
* @return the consumer config
*/
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaMailProperties.zkConnect);
props.put("group.id", KafkaMailProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}

public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message()));
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息