您的位置:首页 > 编程语言 > Java开发

springboot2.x +kafka使用和源码分析八(自定义分区器)

2020-01-11 18:26 621 查看

1:DefaultPartitioner默认分区器

如果producer没指定落地到指定partition中,Kafak通过默认的分区器对数据进行partition,默认的分区的规则是对key进行hash取值 % 分区数, 相同的key会分布到同一分区中。如果没指定key,则就按照轮询算法将消息均匀的分布在主题的可用分区上。

[code]/**
*
* 自定义分区器
*/
public class DefaultPartitioner implements Partitioner {

/**
*  线程安全topic计数器容器
*/
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

/**
* 获取配置定义属性
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {}

/**
* 核心分区方法
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

//获取该topic下partition
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

if (keyBytes == null) {
//没有指定key的话 按照轮询算法来算出partition

//获取下个
int nextValue = nextValue(topic);

//获取有效的分区数
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);

if (availablePartitions.size() > 0) {
//算出该topic下一个执行的partition
// 例如A topic存在有效的0,1,2 产生1-6 6条数据 那么该算法保证0partition会保存1,4数据 1partition会保存2,5 2partition会保存3,6
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {

// 如果没有分区数 对nextValue进行hash取值 % 分区数 随机分区
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// 如果指定key 对key进行hash取值 % 分区数
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
//获取当前topic以被执行的次数
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
//初始化
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);

if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

@Override
public void close() {}

}

如果使用key来进行partition分区,我们可以将一些特定的数据放到同一partiton中,由消费者组中某一个专门的consumer来进行数据处理。

2:自定义分区器

当默认分区器不满足于我们要求时,我们可以通过实现Partitioner接口自定义分区器,编写自己的分区规则

第一步:

[code]
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

/**
* @author fangyuan
* 自定义分区器
*/
public class MyPartitioner implements Partitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

//编写自己分区规则
int numPartitions =cluster.partitionCountForTopic(topic);

//我这里随便编写了个
// 让所有数据都写到0分区中
return 0;

}

@Override
public void close() {
}

@Override
public void configure(Map<String, ?> configs) {

}
}

第二步:替换默认分区器

[code]/**
* 自定义分区器 如果不指定则为默认分区器 DefaultPartitioner
*/
props.put("partitioner.class","com.demo.kafkaDemo.partitioner.MyPartitioner");

验证数据:

发现生产者生产的所有message全都落到到partition 0中了

Demo项目github地址:https://github.com/fangyuan94/kafkaDemo

  • 点赞 1
  • 收藏
  • 分享
  • 文章举报
F_Hello_World 发布了38 篇原创文章 · 获赞 47 · 访问量 1055 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: