您的位置:首页 > 其它

Kafka初步学习总结

2016-01-11 00:00 681 查看
摘要: Kafka的一些基础概念

Kafka是什么?

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。它有着诸多特点,如支持分布式,消息可持久化,高吞吐量等。

Kafka的相关术语。



Topic:主题。就像微博,你关注了某类主题消息,那么你就会接收到该主题的消息。算是kafka消息源的一种分类。

Partition:一个topic有着一个或多个partition。每个partion都包含着消息。每个partition对应一个log文件,当有新数据添加的时候,是直接加到log文件尾部的,所以数据是有序的。

Sequence:序列。一个Partition又有着一个或多个Sequence。

Message:消息。Kafka中通信的最基本单位。每个Sequence包含着多个消息,这些消息在Partition中都是有序的,每个消息有着一个id(offset(偏移量))。

Proudcer:顾名思义,这就是生产者。生产者生产数据由消费者来消费。负责发布消息到Kafka broker。他决定将消息发往哪个主题,哪个Partition。

Consumer:消费者。向Kafka broker读取消息的客户端。

Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。每个组可消费多个partition,这些partition平摊到组里面的消费者上,所以,每个partition中的数据是有序的消费。

Broker:Kafka集群中的一台或多台服务器被叫做Broker,主要用来持久化消息数据。

3.Kafka的索引

我们知道建立索引可以使查询效率提高。可是kafka数据这么多,每个数据都建立索引是得不偿失的。所以kafka是稀疏索引。Topic分成了多个partition,每个partition有一个索引文件,每个索引文件记录了该partition内的sequence的第一个message的id和偏移量。这样在一定程度上提高了效率,而且因为分成了多个partition,所以用并发访问来提高了效率。

4.kafka的一些基本命令

(启动kafka之前,要启动zookeeper。)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test(创建一个名为test的主题,一个分区,一个副本)
bin/kafka-topics.sh --list --zookeeper localhost:2181(查看所有主题)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test(查看test主题详情)
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test(删除test主题)

5.Kafka的基本java代码

public class KafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "115.28.138.100:2181");
props.put("metadata.broker.list", "115.28.138.100:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");

ProducerConfig config = new ProducerConfig(props);
Producer<Integer, String> producer = new Producer<Integer, String>(config);
for(int i=0;i<100;i++){
KeyedMessage<Integer, String> message=new KeyedMessage<Integer, String>("test", "message"+i);
producer.send(message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("发送成功"+i);
}
producer.close();

}
}

public class KafkaConsumer {
public static void main(String[] args) {
Properties props=new Properties();
props.put("zookeeper.connect", "115.28.138.100:2181");
props.put("group.id", "g1");
ConsumerConfig consumerConfig=new ConsumerConfig(props);
ConsumerConnector connector=Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =connector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> kafkaStream=consumerMap.get("test").get(0);
ConsumerIterator<byte[], byte[]> consumerIterator=kafkaStream.iterator();
while(consumerIterator.hasNext()){
System.out.println(new String(consumerIterator.next().message()));
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Kafka