springboot2.x +kafka使用和源码分析八(自定义分区器)
2020-01-11 18:26
621 查看
1:DefaultPartitioner默认分区器
如果producer没指定落地到指定partition中,Kafak通过默认的分区器对数据进行partition,默认的分区的规则是对key进行hash取值 % 分区数, 相同的key会分布到同一分区中。如果没指定key,则就按照轮询算法将消息均匀的分布在主题的可用分区上。
[code]/** * * 自定义分区器 */ public class DefaultPartitioner implements Partitioner { /** * 线程安全topic计数器容器 */ private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); /** * 获取配置定义属性 * @param configs */ @Override public void configure(Map<String, ?> configs) {} /** * 核心分区方法 */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取该topic下partition List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { //没有指定key的话 按照轮询算法来算出partition //获取下个 int nextValue = nextValue(topic); //获取有效的分区数 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { //算出该topic下一个执行的partition // 例如A topic存在有效的0,1,2 产生1-6 6条数据 那么该算法保证0partition会保存1,4数据 1partition会保存2,5 2partition会保存3,6 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // 如果没有分区数 对nextValue进行hash取值 % 分区数 随机分区 return Utils.toPositive(nextValue) % numPartitions; } } else { // 如果指定key 对key进行hash取值 % 分区数 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { //获取当前topic以被执行的次数 AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { //初始化 counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } @Override public void close() {} }
如果使用key来进行partition分区,我们可以将一些特定的数据放到同一partiton中,由消费者组中某一个专门的consumer来进行数据处理。
2:自定义分区器
当默认分区器不满足于我们要求时,我们可以通过实现Partitioner接口自定义分区器,编写自己的分区规则
第一步:
[code] import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.utils.Utils; import java.util.Map; /** * @author fangyuan * 自定义分区器 */ public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //编写自己分区规则 int numPartitions =cluster.partitionCountForTopic(topic); //我这里随便编写了个 // 让所有数据都写到0分区中 return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
第二步:替换默认分区器
[code]/** * 自定义分区器 如果不指定则为默认分区器 DefaultPartitioner */ props.put("partitioner.class","com.demo.kafkaDemo.partitioner.MyPartitioner");
验证数据:
发现生产者生产的所有message全都落到到partition 0中了
Demo项目github地址:https://github.com/fangyuan94/kafkaDemo
- 点赞 1
- 收藏
- 分享
- 文章举报
相关文章推荐
- springboot2.x +redis使用和源码分析二(RedisTemplate)
- springboot2.x +redis使用和源码分析一(springboot自动装配源码分析)
- springboot2.x +redis使用和源码分析三(序列化器)
- springboot源码分析10-ApplicationContextInitializer使用
- springboot源码分析16-spring boot监听器使用
- Springboot 2使用外部Tomcat源码分析
- springboot源码分析4-springboot之SpringFactoriesLoader使用
- spring boot实战(第十四篇)整合RabbitMQ源码分析前言
- SpringBoot源码解析:tomcat启动分析
- spring boot实战(第十篇)Spring boot Bean加载源码分析
- Spring Boot下Druid连接池的使用配置分析
- spring-boot-admin源码分析及单机监控spring-boot-monitor的实现(一)
- Spring Boot + Spring Security 防止用户在多处同时登录(一个用户同时只能登录一次)及源码分析
- SpringBoot-Mybatis框架使用与源码解析
- SpringBoot源码分析:spring的基本架构
- [Spring Boot] 1. Spring Boot启动过程源码分析
- Spring boot源码分析-yaml语言(9)
- spring boot实战(第十篇)Spring boot Bean加载源码分析
- spring boot 源码解析26-Liquibase使用及LiquibaseEndpoint解析
- Spring Boot 2.x使用swagger2.8.0生成在线API文档