
Reading the Kafka Producer Message-Sending Source Code

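2017-06-08 17:09
Today I read the message-sending part of the Kafka source code (version 0.8.2.1). These are my notes, starting with how the producer chooses a partition for each message: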

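1 Kafka's partitioning strategy

1.1 If a partition is specified, the message is sent to that partition.

1.2 If no partition is specified but a key is, a partition is chosen from the hash of the key.

If the key is a fixed value, every message goes to the same single partition. So avoid a constant key; if you must use one, either specify the partition explicitly (as in 1.1) or consider modifying the Kafka source so that data is spread evenly across partitions.

1.3 If neither a key nor a partition is specified, messages are sent to the partitions in round-robin fashion.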
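
The three rules above can be sketched roughly as follows. This is a simplified illustration, not Kafka's own partitioner: the class name PartitionChooser is hypothetical, Arrays.hashCode stands in for the hash Kafka actually applies to the key bytes, and the round-robin branch ignores whether a partition is currently available.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the three-way partition choice; not Kafka's Partitioner class. */
final class PartitionChooser {
    // Shared counter used only when neither partition nor key is given.
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * @param partition     explicit partition, or null if the caller did not set one
     * @param key           serialized key bytes, or null if the record has no key
     * @param numPartitions number of partitions of the topic
     */
    int choose(Integer partition, byte[] key, int numPartitions) {
        if (partition != null) {
            if (partition < 0 || partition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition: " + partition);
            return partition; // 1.1: an explicit partition always wins
        }
        if (key != null) {
            // 1.2: stand-in hash; mask the sign bit so the result is non-negative
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // 1.3: no key and no partition, so rotate through the partitions
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

Because the key branch is a pure function of the key bytes, a constant key always yields the same partition, which is the hot-partition problem described in 1.2.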



2 Sending is always asynchronous; the process is as follows

Three objects are involved:

2.1 RecordAccumulator

It maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches object.

Each partition maps to an ArrayDeque of RecordBatch.

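Sending a message with KafkaProducer.send ultimately calls RecordAccumulator.append, which adds the record to the batch queue of its partition.

If the RecordBatch is full, or a new RecordBatch was created, the Sender is woken up.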
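
A minimal sketch of this append-and-wake logic, under simplifying assumptions: partitions are identified by a plain String, a batch is "full" after a fixed number of records rather than a byte size, and all class names (AccumulatorSketch, Batch, AppendResult) are hypothetical stand-ins for the real types.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified accumulator: one deque of batches per partition. */
final class AccumulatorSketch {
    /** A batch that holds at most `capacity` records (the real limit is in bytes). */
    static final class Batch {
        final int capacity;
        int count = 0;
        Batch(int capacity) { this.capacity = capacity; }
        boolean isFull() { return count >= capacity; }
    }

    /** Tells the caller whether the sender thread should be woken up. */
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;
        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
        boolean shouldWakeSender() { return batchIsFull || newBatchCreated; }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchCapacity;

    AccumulatorSketch(int batchCapacity) { this.batchCapacity = batchCapacity; }

    AppendResult append(String topicPartition) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) { // ArrayDeque is not thread-safe, so guard each deque
            Batch last = dq.peekLast();
            boolean created = false;
            if (last == null || last.isFull()) {
                last = new Batch(batchCapacity); // no room left: start a new batch
                dq.addLast(last);
                created = true;
            }
            last.count++;
            // more than one batch queued means an older, full batch is still waiting
            boolean full = dq.size() > 1 || last.isFull();
            return new AppendResult(full, created);
        }
    }
}
```

The caller would wake the sender only when shouldWakeSender() returns true, so a record that lands in a half-empty existing batch does not trigger any I/O by itself.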



2.2 Sender

The background thread that handles the sending of produce requests to the Kafka cluster

Sender uses a KafkaClient to write the data held in the RecordAccumulator to the server in batches.

Sender's run() method is the thread's main loop: it keeps calling run(long now) until the producer is closed.
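
The shape of such a loop can be sketched as below. This is an illustration of the pattern only: SenderLoopSketch and its Step interface are hypothetical names, and the real Sender does more work on shutdown than this sketch shows.

```java
/** Hypothetical sketch of a sender-style I/O loop; not the actual Sender class. */
final class SenderLoopSketch implements Runnable {
    /** One iteration of work, given the current time in milliseconds. */
    interface Step {
        void runOnce(long nowMs);
    }

    private final Step step;
    private volatile boolean running = true; // volatile: close() is called from another thread

    SenderLoopSketch(Step step) { this.step = step; }

    @Override
    public void run() {
        // main loop: keep iterating until close() is called
        while (running) {
            try {
                step.runOnce(System.currentTimeMillis());
            } catch (RuntimeException e) {
                // a failure in one iteration must not kill the I/O thread
                System.err.println("Uncaught error in I/O loop: " + e);
            }
        }
    }

    void close() { running = false; }
}
```

In practice such a loop would be started on its own thread, for example with new Thread(loop, "sender-sketch").start(), and stopped by calling close().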



The logic inside run(long now) is as follows:

2.2.1 First, determine which nodes have data that is ready to send
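
A minimal sketch of such a readiness check, assuming just two conditions: the partition's batch is full, or it has waited at least the linger time. The real check considers more cases (for example retry backoff), and the names ReadyCheckSketch and PartitionState are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical readiness check: which leader nodes have sendable data? */
final class ReadyCheckSketch {
    /** Snapshot of one partition's oldest batch. */
    static final class PartitionState {
        final int leaderNode;    // id of the broker that leads this partition
        final boolean batchFull; // true if the batch cannot take more records
        final long waitedMs;     // how long the batch has been waiting
        PartitionState(int leaderNode, boolean batchFull, long waitedMs) {
            this.leaderNode = leaderNode;
            this.batchFull = batchFull;
            this.waitedMs = waitedMs;
        }
    }

    /** Returns the ids of nodes that lead at least one partition with sendable data. */
    static Set<Integer> readyNodes(Iterable<PartitionState> partitions, long lingerMs) {
        Set<Integer> ready = new TreeSet<>();
        for (PartitionState p : partitions) {
            boolean expired = p.waitedMs >= lingerMs; // lingered long enough
            if (p.batchFull || expired) {
                ready.add(p.leaderNode);
            }
        }
        return ready;
    }
}
```

Note that with a linger time of 0 every non-empty batch counts as expired, so every leader with pending data is ready immediately.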



2.2.2 Remove the Kafka nodes that cannot be sent to right now
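
A rough sketch of this filtering step. The canSend and retryDelayMs callbacks are hypothetical stand-ins for whatever the network client reports about a node's connection; the returned value plays the role of the notReadyTimeout that appears later in the pollTimeout calculation.

```java
import java.util.Iterator;
import java.util.Set;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;

/** Hypothetical sketch: drop nodes we cannot send to yet and remember when to retry. */
final class NodeFilterSketch {
    /**
     * Removes nodes that cannot accept a request right now and returns the shortest
     * delay (ms) after which one of the removed nodes may be worth retrying.
     * Returns Long.MAX_VALUE if no node was removed.
     */
    static long removeNotReady(Set<Integer> readyNodes,
                               IntPredicate canSend,
                               IntToLongFunction retryDelayMs) {
        long notReadyTimeout = Long.MAX_VALUE;
        Iterator<Integer> iter = readyNodes.iterator();
        while (iter.hasNext()) {
            int node = iter.next();
            if (!canSend.test(node)) {
                iter.remove(); // safe way to remove while iterating
                notReadyTimeout = Math.min(notReadyTimeout, retryDelayMs.applyAsLong(node));
            }
        }
        return notReadyTimeout;
    }
}
```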



2.2.3 Get the list of data to send

Loop over the partitions whose leader is this node.

For each partition, take the RecordBatch that belongs to it and add it to this node's List<RecordBatch>.
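
The grouping described above can be sketched as follows. This is a simplified illustration with hypothetical names (DrainSketch, leaderOf): partitions are plain Strings, batches are generic, and the sketch takes one batch per partition without enforcing any limit on the total request size.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of draining batches and grouping them by leader node. */
final class DrainSketch {
    /**
     * Takes the oldest batch of every partition led by a ready node.
     *
     * @param leaderOf   partition name to id of its leader node
     * @param batches    partition name to its queue of batches (oldest first)
     * @param readyNodes nodes we are allowed to send to right now
     * @return node id to the list of batches to send to that node
     */
    static <B> Map<Integer, List<B>> drain(Map<String, Integer> leaderOf,
                                           Map<String, Deque<B>> batches,
                                           Set<Integer> readyNodes) {
        Map<Integer, List<B>> byNode = new HashMap<>();
        for (Integer node : readyNodes) {
            List<B> ready = new ArrayList<>();
            for (Map.Entry<String, Integer> e : leaderOf.entrySet()) {
                if (!node.equals(e.getValue())) continue; // led by another node
                Deque<B> dq = batches.get(e.getKey());
                if (dq == null) continue;                 // nothing accumulated yet
                B first = dq.pollFirst();                 // oldest batch; null if empty
                if (first != null) ready.add(first);
            }
            byNode.put(node, ready);
        }
        return byNode;
    }
}
```

Grouping by node rather than by partition is what lets one request carry data for several partitions that share a leader.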



2.2.4 Assemble the request objects and send them to the Kafka nodes

Compute pollTimeout, then send the request objects to their respective Kafka nodes.

// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    log.trace("Created {} produce requests: {}", requests.size(), requests);
    pollTimeout = 0;
}

// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
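
The timeout rule in the excerpt above can be isolated into a small function to make its behavior easy to check. PollTimeoutSketch is a hypothetical name, and the parameters simply mirror the values used in the excerpt.

```java
/** Hypothetical helper that isolates the pollTimeout rule from the excerpt above. */
final class PollTimeoutSketch {
    /**
     * @param readyNodeCount        number of nodes that are ready and have sendable data
     * @param nextReadyCheckDelayMs time until some lingering or backing-off batch becomes sendable
     * @param notReadyTimeoutMs     time until a currently unreachable node may be retried
     */
    static long pollTimeout(int readyNodeCount, long nextReadyCheckDelayMs, long notReadyTimeoutMs) {
        if (readyNodeCount > 0) {
            return 0L; // data can go out now, so do not block in poll
        }
        // nothing to send yet: sleep until the earliest moment something could change
        return Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
    }
}
```

For example, with no ready nodes, a batch that becomes sendable in 100 ms and a node that can be retried in 50 ms, the loop would block for at most 50 ms.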


2.2.5 Process the responses

List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
for (ClientResponse response : responses) {
    if (response.wasDisconnected())
        handleDisconnect(response, now);
    else
        handleResponse(response, now);
}


2.3 KafkaClient

Its implementation class is NetworkClient, which exchanges data with the server over sockets.

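3 Kafka parameter configuration

batch.size: size in bytes of the buffer that holds one batch of records (corresponding class: MemoryRecords); default 16384.

buffer.memory: total memory the client may use to buffer records waiting to be sent (corresponding class: BufferPool); default 32 * 1024 * 1024L, i.e. 32 MB.

linger.ms: how long to delay sending. If it is set to 1 second, for example, records first go into the client buffer and are sent up to 1 second later; the default is 0, which means send immediately.

compression.type: the compression type; supports none, gzip, snappy and lz4; default none.

In theory, setting linger.ms above 0 improves message throughput, because more records are sent per request, at the cost of some added latency.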
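
As an illustration, these settings could be collected in a java.util.Properties object like this. The chosen values (5 ms linger, snappy compression) and the class name ProducerConfigSketch are examples only, not recommendations; a real producer also needs key and value serializer settings, which are omitted here.

```java
import java.util.Properties;

/** Hypothetical helper that gathers the producer settings discussed above. */
final class ProducerConfigSketch {
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // e.g. "localhost:9092" (example address)
        props.put("batch.size", "16384");                 // bytes per batch (the default)
        props.put("buffer.memory", "33554432");           // 32 * 1024 * 1024 bytes (the default)
        props.put("linger.ms", "5");                      // wait up to 5 ms to fill a batch (example value)
        props.put("compression.type", "snappy");          // one of none, gzip, snappy, lz4 (example value)
        return props;
    }
}
```

The resulting Properties would then be passed to the KafkaProducer constructor together with the serializer settings.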