Kafka分区分配计算(分区器Partitions)
2017-12-03 18:49
295 查看
KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景下,业务逻辑需要控制每条消息落到合适的分区中,有些情形下则只要根据默认的分配规则即可。在KafkaProducer计算分配时,首先根据的是ProducerRecord中的partition字段指定的序号计算分区。读者有可能刚睡醒,看到这个ProducerRecord似曾相识,没有关系,先看段Kafka生产者的示例片段:
没错,ProducerRecord只是一个封装了消息的对象而已,ProducerRecord一共有5个成员变量,即:
在KafkaProducer的源码(1.0.0)中,计算分区时调用的是下面的partition()方法:
可以看出的确是先判断有无指明ProducerRecord的partition字段,如果没有指明,则再进一步计算分区。上面这段代码中的partitioner在默认情况下是指Kafka默认实现的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法实现如下:
由上源码可以看出partition的计算方式:
1. 如果key为null,则按照一种轮询的方式来计算分区分配
2. 如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。
KafkaProducer中还支持自定义分区分配方式,与org.apache.kafka.clients.producer.internals.DefaultPartitioner一样首先实现org.apache.kafka.clients.producer.Partitioner接口,然后在KafkaProducer的配置中指定partitioner.class为对应的自定义分区器(Partitioners)即可,即:
自定义DemoPartitioner主要是实现Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的计算方式,详细参考如下:
这个自定义分区器的实现比较简单,读者可以根据自身业务的需求来灵活实现分配分区的计算方式,比如:一般大型电商都有多个仓库,可以将仓库的名称或者ID作为Key来灵活的记录商品信息。
欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。
Producer<String,String> producer = new KafkaProducer<String,String>(properties); String message = "kafka producer demo"; ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message); try { producer.send(producerRecord).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
没错,ProducerRecord只是一个封装了消息的对象而已,ProducerRecord一共有5个成员变量,即:
private final String topic;//所要发送的topic private final Integer partition;//指定的partition序号 private final Headers headers;//一组键值对,与RabbitMQ中的headers类似,kafka0.11.x版本才引入的一个属性 private final K key;//消息的key private final V value;//消息的value,即消息体 private final Long timestamp;//消息的时间戳,可以分为Create_Time和LogAppend_Time之分,这个以后的文章中再表。
在KafkaProducer的源码(1.0.0)中,计算分区时调用的是下面的partition()方法:
/** * computes partition for given record. * if the record has partition returns the value otherwise * calls configured partitioner class to compute the partition. */ private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
可以看出的确是先判断有无指明ProducerRecord的partition字段,如果没有指明,则再进一步计算分区。上面这段代码中的partitioner在默认情况下是指Kafka默认实现的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法实现如下:
/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); }
由上源码可以看出partition的计算方式:
1. 如果key为null,则按照一种轮询的方式来计算分区分配
2. 如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。
KafkaProducer中还支持自定义分区分配方式,与org.apache.kafka.clients.producer.internals.DefaultPartitioner一样首先实现org.apache.kafka.clients.producer.Partitioner接口,然后在KafkaProducer的配置中指定partitioner.class为对应的自定义分区器(Partitioners)即可,即:
properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");
自定义DemoPartitioner主要是实现Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的计算方式,详细参考如下:
public class DemoPartitioner implements Partitioner { private final AtomicInteger atomicInteger = new AtomicInteger(0); @Override public void configure(Map<String, ?> configs) {} @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (null == keyBytes || keyBytes.length<1) { return atomicInteger.getAndIncrement() % numPartitions; } //借用String的hashCode的计算方式 int hash = 0; for (byte b : keyBytes) { hash = 31 * hash + b; } return hash % numPartitions; } @Override public void close() {} }
这个自定义分区器的实现比较简单,读者可以根据自身业务的需求来灵活实现分配分区的计算方式,比如:一般大型电商都有多个仓库,可以将仓库的名称或者ID作为Key来灵活的记录商品信息。
欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。
相关文章推荐
- Kafka分区分配策略(Partition Assignment Strategy)
- kafka分区及副本在broker的分配
- kafka分区及副本在broker的分配
- kafka分区及副本在broker的分配
- flume+kafka实现根据消息的标识分配到不同的分区
- Kafka分区分配策略(Partition Assignment Strategy)
- kafka扩容和分区重新分配
- 动态分区分配实验报告
- kafka 0.9.0.0 rebalance后部分分区不再读数据
- 动态分区的分配策略
- 动态分区的分配策略
- Storm+Kafka实时计算框架搭建
- 如何确定Kafka的分区数,key和consumer线程数,以及不消费问题解决
- storm集成kafka报错【KeeperErrorCode = NoNode for /kafka/brokers/topics/test/partitions】
- 动态存储器分配:内存动态分区分配方式的理解以及模拟(一)
- 盈利分配 方格计算
- kafka中的topic为什么要进行分区?
- Linpack使用Infiniband网络计算时内存分配问题
- 分区分配方法的主要缺点是什么,如何克服这一缺点?
- kafka发送消息分区策略详解