您的位置:首页 > 其它

linkedin高吞吐量分布式消息系统kafka使用手记

2013-11-28 15:48 661 查看
kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。

支持通过kafka服务器和消费机集群来分区消息。

支持Hadoop并行数据加载。

设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。

kakfa的consumer使用拉的方式工作。

安装kafka

下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

> tar xzf kafka-.tgz

> cd kafka-

> ./sbt update

> ./sbt package

启动zkserver:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动server:

bin/kafka-server-start.sh config/server.properties

就是这么简单。

使用kafka

import java.util.Arrays;  
import java.util.List;  
import java.util.Properties;  
import kafka.javaapi.producer.SyncProducer;  
import kafka.javaapi.message.ByteBufferMessageSet;  
import kafka.message.Message;  
import kafka.producer.SyncProducerConfig;  
  
...  
  
Properties props = new Properties();  
props.put(“zk.connect”, “127.0.0.1:2181”);  
props.put("serializer.class", "kafka.serializer.StringEncoder");  
ProducerConfig config = new ProducerConfig(props);  
Producer<String, String> producer = new Producer<String, String>(config);  
  
Send a single message  
  
// The message is sent to a randomly selected partition registered in ZK  
ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");  
producer.send(data);      
  
producer.close();  

这样就是一个标准的producer。

consumer的代码

// specify some consumer properties  
Properties props = new Properties();  
props.put("zk.connect", "localhost:2181");  
props.put("zk.connectiontimeout.ms", "1000000");  
props.put("groupid", "test_group");  
  
// Create the connection to the cluster  
ConsumerConfig consumerConfig = new ConsumerConfig(props);  
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
  
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =   
    consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));  
List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");  
  
// create list of 4 threads to consume from each of the partitions   
ExecutorService executor = Executors.newFixedThreadPool(4);  
  
// consume the messages in the threads  
for(final KafkaMessageStream<Message> stream: streams) {  
  executor.submit(new Runnable() {  
    public void run() {  
      for(Message message: stream) {  
        // process message  
      }   
    }  
  });  
}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息