您的位置:首页 > 其它

kafka默认分区机制源码解析

2018-12-10 18:37 176 查看
版权声明:转载请注明原创作者名字与联系方式,方便交流,链接 https://blog.csdn.net/qq_43348337/article/details/84944339

如果key为null,则按照一种轮询的方式来计算分区分配
(第一次调用时,获得一个数,将他存入topicCounterMap中,下次调用时,取出该数,并counter.getAndIncrement(),这样每次获得的数都不一样,用他取代key的作用)

如果key不为null,则使用Hash算法(murmur的hash算法:非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配;
(key的hash值取模,除以分区数量,取余数)
(toPositive(Utils.murmur2(keyBytes)) % numPartitions;)

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

public DefaultPartitioner() {
}

public void configure(Map<String, ?> configs) {
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();  //获取分区数量
if (keyBytes == null) { //如果key==null
int nextValue = this.nextValue(topic); //获取下一个值
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {//一种容错机制,获取可用分区大于0时
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;  //用该值替换key的作用,返回分区
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);//从map中获取counter
if (null == counter) {//如果counter为空,
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());//则初始化一个counter
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);//并将该counter存入map中
if (currentCounter != null) {
counter = currentCounter;//
}
}
return counter.getAndIncrement();//自增并返回一个counter
}
public void close() {
}
}

每天都会看博客,欢迎留言或者qq:1816912102

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: