kafka默认分区机制源码解析
2018-12-10 18:37
176 查看
版权声明:转载请注明原创作者名字与联系方式,方便交流,链接 https://blog.csdn.net/qq_43348337/article/details/84944339
如果key为null,则按照一种轮询的方式来计算分区分配
(第一次调用时,获得一个数,将他存入topicCounterMap中,下次调用时,取出该数,并counter.getAndIncrement(),这样每次获得的数都不一样,用他取代key的作用)
如果key不为null,则使用Hash算法(murmur的hash算法:非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配;
(key的hash值取模,除以分区数量,取余数)
(toPositive(Utils.murmur2(keyBytes)) % numPartitions;)
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap(); public DefaultPartitioner() { } public void configure(Map<String, ?> configs) { } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); //获取分区数量 if (keyBytes == null) { //如果key==null int nextValue = this.nextValue(topic); //获取下一个值 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) {//一种容错机制,获取可用分区大于0时 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { return Utils.toPositive(nextValue) % numPartitions; //用该值替换key的作用,返回分区 } } else { return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);//从map中获取counter if (null == counter) {//如果counter为空, counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());//则初始化一个counter AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);//并将该counter存入map中 if (currentCounter != null) { counter = currentCounter;// } } return counter.getAndIncrement();//自增并返回一个counter } public void close() { } }
每天都会看博客,欢迎留言或者qq:1816912102
相关文章推荐
- Kafka源码深度解析-序列8 -Consumer -Fetcher实现原理与offset确认机制
- Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——命令篇
- Android事件分发机制完全解析,带你从源码的角度彻底理解(下)
- kafka源码解析之十二KafkaController(中篇)
- 解析从源码分析常见的基于Array的数据结构动态扩容机制的详解
- Android事件分发机制完全解析,带你从源码的角度彻底理解(下)
- "Java消息回收机制"之源码+图文完全解析
- Kafka Java API 之Producer源码解析
- Kakfa揭秘 Day4 Kafka中分区深度解析
- Kafka源码深度解析-序列3 -Producer -Java NIO
- 安卓5.1源码解析 : RecyclerView解析 从绘制流程,ViewHolder复用机制,LayoutManger,ItemAnimator等流程全面讲解
- Android事件分发机制完全解析,带你从源码的角度彻底理解(上) (出自郭霖老师)
- kafka源码解析之二kafka内部的专业术语
- (九)Tomcat源码解析 - WebApp类加载机制原理分析
- Kafka分区机制介绍与示例
- 【spring源码学习】spring的事件发布监听机制源码解析
- Android事件分发机制完全解析,带你从源码的角度彻底理解(下)
- Android异步消息处理机制完全解析,带你从源码的角度彻底理解
- Android事件分发机制完全解析,带你从源码的角度彻底理解
- Mybatis工作机制源码分析—初始化—mapper配置文件解析