kafka负载均衡相关资料收集(二)
2017-03-03 17:45
316 查看
【转】关于kafka producer 分区策略的思考
from:http://blog.csdn.net/ouyang111222/article/details/51086037
今天跑了一个简单的kafka produce程序,如下所示
发现其只向topic:user11中的某一个partiton中写数据。一下子感觉不对啊,kafka不是号称可以实现producer的消息均发吗?后来查了一下相关的参数:partitioner.class
在上面的程序中,我在producer中没有定义分区策略,也就是说程序采用默认的kafka.producer.DefaultPartitioner,来看看源码中是怎么定义的:
其核心思想就是对每个消息的key的hash值对partition数取模得到。再来看看我的程序中有这么一段:
来看看keyMessage:
由于上面生产者代码中没有传入key,所以程序调用:
但是如果key为null时会发送到哪个分区?我在实验的时候发现,每次运行生产者线程好像发送的分区都不太相同。具体的解释可以参考博文:http://colobu.com/2015/01/22/which-kafka-partition-will-keyedMessages-be-sent-to-if-key-is-null/
好的问题发现了该怎么解决呢?只需要在生产者线程中对每条消息指定key,如下:
看看效果:
然后修改配置即可:
其结果如下:
from:http://blog.csdn.net/ouyang111222/article/details/51086037
今天跑了一个简单的kafka produce程序,如下所示
public class kafkaProducer extends Thread{ private String topic; public kafkaProducer(String topic){ super(); this.topic = topic; } @Override public void run() { Producer producer = createProducer(); int i=0; while(true){ i++; String string = "hello"+i; producer.send(new KeyedMessage<Integer, String>(topic,string)); if(i==100){ break; } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } private Producer createProducer() { Properties properties = new Properties(); properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list", "ip1:9092,ip2:9092,ip3:9092"); return new Producer<Integer, String>(new ProducerConfig(properties)); } public static void main(String[] args) { new kafkaProducer("user11").start(); } }
发现其只向topic:user11中的某一个partiton中写数据。一下子感觉不对啊,kafka不是号称可以实现producer的消息均发吗?后来查了一下相关的参数:partitioner.class
1 partitioner.class
# 分区的策略 # 默认为kafka.producer.DefaultPartitioner,取模 partitioner.class = kafka.producer.DefaultPartitioner
在上面的程序中,我在producer中没有定义分区策略,也就是说程序采用默认的kafka.producer.DefaultPartitioner,来看看源码中是怎么定义的:
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner { private val random = new java.util.Random def partition(key: Any, numPartitions: Int): Int = { Utils.abs(key.hashCode) % numPartitions } }
其核心思想就是对每个消息的key的hash值对partition数取模得到。再来看看我的程序中有这么一段:
producer.send(new KeyedMessage<Integer, String>(topic,string))
来看看keyMessage:
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) { if(topic == null) throw new IllegalArgumentException("Topic cannot be null.") def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message) def this(topic: String, key: K, message: V) = this(topic, key, key, message) def partitionKey = { if(partKey != null) partKey else if(hasKey) key else null } def hasKey = key != null }
由于上面生产者代码中没有传入key,所以程序调用:
def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
但是如果key为null时会发送到哪个分区?我在实验的时候发现,每次运行生产者线程好像发送的分区都不太相同。具体的解释可以参考博文:http://colobu.com/2015/01/22/which-kafka-partition-will-keyedMessages-be-sent-to-if-key-is-null/
好的问题发现了该怎么解决呢?只需要在生产者线程中对每条消息指定key,如下:
producer.send(new KeyedMessage<String, String>(topic,String.valueOf(i),string));
看看效果:
2 自定义partitioner.class
如下所示为自定义的分区函数,分区函数实现了Partitioner接口public class PersonalPartition implements Partitioner{ public PersonalPartition(VerifiableProperties properties){ } public int partition(Object arg0, int arg1) { if(arg0==null){ return 0; } else{ return 1; } } }
然后修改配置即可:
properties.put("partitioner.class", "com.xx.kafka.PersonalPartition");
3 向指定的partition写入数据
当然,也可以向topic中指定的partition中写数据,如下代码为向”user11”中partition 1中写入数据:public class kafkaProducer extends Thread{ private String topic; public kafkaProducer(String topic){ super(); this.topic = topic; } @Override public void run() { KafkaProducer producer = createProducer(); int i=0; while(true){ i++; String string = "hello"+i; producer.send(new ProducerRecord(topic,1,null,string.getBytes())); if(i==10000){ break; } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } private KafkaProducer createProducer() { Properties properties = new Properties(); properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list", "ip1:9092,ip2:9092,ip3:9092"); return new KafkaProducer(properties); } }
其结果如下:
相关文章推荐
- kafka负载均衡相关资料收集(一)
- kafka负载均衡相关资料收集(三)
- DjangoWeb 开发相关资料收集
- 收集的一些FLASH相关的技术资料(补)
- AssetBundle机制相关资料收集
- TI DSP开发相关资料收集
- MPEG 编解码相关资料收集
- webkit 硬件渲染相关资料收集
- Selenium相关资料收集
- 我在网上收集的java相关资料
- opengl相关资料收集
- Flex2,FMS2,FLASH相关资料收集
- 版本控制工具-SVN相关资料收集整理
- tensorflow学习相关资料收集
- Mina TCP 编码解码相关资料收集
- Sublime Text相关资料收集
- IBM AIX操作系统相关资料收集汇总
- 人脸检测、对齐、识别相关资料收集
- 应用架构设计与分析相关资料的收集、总结与复习(动态更新)
- as相关资料收集