您的位置:首页 > 其它

kafka producer 中partition 使用方式

2015-10-27 08:47 309 查看
为了更好的实现负载均衡和消息的顺序性,kafka的producer在分发消息时可以通过分发策略发送给指定的partition。实现分发的程序是需要制定消息的key值,而kafka通过key进行策略分发。

为了更好的弄清楚相关原理,我们从kafka本身提供的分发函数分析:

源代码如下:
private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
private val random = new java.util.Random

def partition(key: T, numPartitions: Int): Int = {
if(key == null)
{
println("key is null")
random.nextInt(numPartitions)
}
else
{
println("key is "+ key + " hashcode is "+key.hashCode)
math.abs(key.hashCode) % numPartitions
}
}
}


上述类对key进行了模版封装,因此key 可以提供Int,String等类型。

其中numPartitions是来自ZKBrokerPartitionInfo生成的数据,具体代码是:
val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)


(Tips:从上面可以看到,我们可以更多的扩展分区信息,多多利用zookeeper提供的信息,比如sortedBrokerPartitions等等)

很多时候我们可能要自己实现一个分区函数,具体的使用方式就是:

private Properties props = new Properties();

...

props.put("partitioner.class","***/***/TestPartition");//一定要写对路径和partitioner.class


具体的实现代码就是改装自DefaultPartitioner的java实现方式,一并贴上:
public class TestPartition implements Partitioner<String>{

public int partition(String key,int numPartitions)
{
//System.out.println("Fuck!!!!");
System.out.print("partitions number is "+numPartitions+"   ");
if (key == null) {
Random random = new Random();
System.out.println("key is null ");
return random.nextInt(numPartitions);
}
else {
int result = Math.abs(key.hashCode())%numPartitions; //很奇怪,
//hashCode 会生成负数,奇葩,所以加绝对值
System.out.println("key is "+ key+ " partitions is "+ result);
return result;
}
}
}


而发送消息使用方式:
List<String> messages = new java.util.ArrayList<String>();
String messageString = "test-message"+Integer.toString(messageNo);
messages.add(messageString);
//producer.send(new ProducerData<String, String>(topic,"test_key", messageStr));
ProducerData<String, String> data = new ProducerData<String, String>(topic, messageString, messages);
producer.send(data);


kafka官方文档中直接使用
ProducerData<String, String> data = new ProducerData<String, String>(topic, “xxx”, "XXX");
producer.send(data);


但是我没有实现,第三个参数用String会报错。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: