Kafka初步学习总结
2016-01-11 00:00
681 查看
摘要: Kafka的一些基础概念
Kafka是什么?
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。它有着诸多特点,如支持分布式,消息可持久化,高吞吐量等。
Kafka的相关术语。
Topic:主题。就像微博,你关注了某类主题消息,那么你就会接收到该主题的消息。算是kafka消息源的一种分类。
Partition:一个topic有着一个或多个partition。每个partion都包含着消息。每个partition对应一个log文件,当有新数据添加的时候,是直接加到log文件尾部的,所以数据是有序的。
Sequence:序列。一个Partition又有着一个或多个Sequence。
Message:消息。Kafka中通信的最基本单位。每个Sequence包含着多个消息,这些消息在Partition中都是有序的,每个消息有着一个id(offset(偏移量))。
Proudcer:顾名思义,这就是生产者。生产者生产数据由消费者来消费。负责发布消息到Kafka broker。他决定将消息发往哪个主题,哪个Partition。
Consumer:消费者。向Kafka broker读取消息的客户端。
Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。每个组可消费多个partition,这些partition平摊到组里面的消费者上,所以,每个partition中的数据是有序的消费。
Broker:Kafka集群中的一台或多台服务器被叫做Broker,主要用来持久化消息数据。
3.Kafka的索引
我们知道建立索引可以使查询效率提高。可是kafka数据这么多,每个数据都建立索引是得不偿失的。所以kafka是稀疏索引。Topic分成了多个partition,每个partition有一个索引文件,每个索引文件记录了该partition内的sequence的第一个message的id和偏移量。这样在一定程度上提高了效率,而且因为分成了多个partition,所以用并发访问来提高了效率。
4.kafka的一些基本命令
(启动kafka之前,要启动zookeeper。)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test(创建一个名为test的主题,一个分区,一个副本)
bin/kafka-topics.sh --list --zookeeper localhost:2181(查看所有主题)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test(查看test主题详情)
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test(删除test主题)
5.Kafka的基本java代码
public class KafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "115.28.138.100:2181");
props.put("metadata.broker.list", "115.28.138.100:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
Producer<Integer, String> producer = new Producer<Integer, String>(config);
for(int i=0;i<100;i++){
KeyedMessage<Integer, String> message=new KeyedMessage<Integer, String>("test", "message"+i);
producer.send(message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("发送成功"+i);
}
producer.close();
}
}
public class KafkaConsumer {
public static void main(String[] args) {
Properties props=new Properties();
props.put("zookeeper.connect", "115.28.138.100:2181");
props.put("group.id", "g1");
ConsumerConfig consumerConfig=new ConsumerConfig(props);
ConsumerConnector connector=Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =connector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> kafkaStream=consumerMap.get("test").get(0);
ConsumerIterator<byte[], byte[]> consumerIterator=kafkaStream.iterator();
while(consumerIterator.hasNext()){
System.out.println(new String(consumerIterator.next().message()));
}
}
}
Kafka是什么?
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。它有着诸多特点,如支持分布式,消息可持久化,高吞吐量等。
Kafka的相关术语。
Topic:主题。就像微博,你关注了某类主题消息,那么你就会接收到该主题的消息。算是kafka消息源的一种分类。
Partition:一个topic有着一个或多个partition。每个partion都包含着消息。每个partition对应一个log文件,当有新数据添加的时候,是直接加到log文件尾部的,所以数据是有序的。
Sequence:序列。一个Partition又有着一个或多个Sequence。
Message:消息。Kafka中通信的最基本单位。每个Sequence包含着多个消息,这些消息在Partition中都是有序的,每个消息有着一个id(offset(偏移量))。
Proudcer:顾名思义,这就是生产者。生产者生产数据由消费者来消费。负责发布消息到Kafka broker。他决定将消息发往哪个主题,哪个Partition。
Consumer:消费者。向Kafka broker读取消息的客户端。
Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。每个组可消费多个partition,这些partition平摊到组里面的消费者上,所以,每个partition中的数据是有序的消费。
Broker:Kafka集群中的一台或多台服务器被叫做Broker,主要用来持久化消息数据。
3.Kafka的索引
我们知道建立索引可以使查询效率提高。可是kafka数据这么多,每个数据都建立索引是得不偿失的。所以kafka是稀疏索引。Topic分成了多个partition,每个partition有一个索引文件,每个索引文件记录了该partition内的sequence的第一个message的id和偏移量。这样在一定程度上提高了效率,而且因为分成了多个partition,所以用并发访问来提高了效率。
4.kafka的一些基本命令
(启动kafka之前,要启动zookeeper。)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test(创建一个名为test的主题,一个分区,一个副本)
bin/kafka-topics.sh --list --zookeeper localhost:2181(查看所有主题)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test(查看test主题详情)
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test(删除test主题)
5.Kafka的基本java代码
public class KafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "115.28.138.100:2181");
props.put("metadata.broker.list", "115.28.138.100:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
Producer<Integer, String> producer = new Producer<Integer, String>(config);
for(int i=0;i<100;i++){
KeyedMessage<Integer, String> message=new KeyedMessage<Integer, String>("test", "message"+i);
producer.send(message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("发送成功"+i);
}
producer.close();
}
}
public class KafkaConsumer {
public static void main(String[] args) {
Properties props=new Properties();
props.put("zookeeper.connect", "115.28.138.100:2181");
props.put("group.id", "g1");
ConsumerConfig consumerConfig=new ConsumerConfig(props);
ConsumerConnector connector=Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =connector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> kafkaStream=consumerMap.get("test").get(0);
ConsumerIterator<byte[], byte[]> consumerIterator=kafkaStream.iterator();
while(consumerIterator.hasNext()){
System.out.println(new String(consumerIterator.next().message()));
}
}
}
相关文章推荐
- Kafka 之 中级
- Linux下Kafka单机安装配置方法(图文)
- Kafka使用入门教程第1/2页
- Logstash 与Elasticsearch整合使用示例
- Kafka+Log4j实现日志集中管理
- Kafka深度解析
- Kafka设计解析(三)- Kafka High Availability (下)
- kafka+storm初探
- storm集群 + kafka单机性能测试
- flume、kafka、storm常用命令
- kafka 一些基本知识
- note of kafka learning (first part, before replication)
- flume部署
- Kafka集群安装
- Kafka的副本策略——《Learning Apache Kafka》学习笔记(原书第三章第4节)
- kafka性能测试
- kafka集群搭建
- kafka.common.ConsumerRebalanceFailedException-org.I0Itec.zkclient.exception.ZkNoNodeException
- kafka producer demo 生产者