apache kafka系列之源码分析走读-kafkaApi详解
2017-12-21 19:55
1066 查看
Kafka源码中数据交互流程
图1
kafka启动时做很多初始化运行环境工作,具体请参考:apache
kafka系列之源码分析走读-kafka内部模块分析
其中SockeServer类启动时,首先初始化NIO网络环境、启动监听、创建主线程、工作线程池、设置参数等等。
从上图1中可以看到整个交互过程中,kafka的所有逻辑处理和交互实际是交给KafkaApi类来处理的。
通过请求的类型,把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,
而是每个handler都是一个函数,混在KafkaApi类中。
kafka-0.8.1版本中定义了10种类型请求,请求类型说明如下:
下面是KafkaApi中handle方法代码:
[java] view
plain copy
def handle(request: RequestChannel.Request) {
try{
trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request) // producer
case RequestKeys.FetchKey => handleFetchRequest(request) // consumer
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成为leader或follower设置同步副本组信息
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) //shutdown broker
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
request.requestObj.handleError(e, requestChannel, request)
error("error when handling request %s".format(request.requestObj), e)
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
kafka中客户端与server端交互有多种类型,那它是怎么交互数据呢,格式是怎样?下面来揭开面纱。
请求交互二进制数据组成为:请求类型 + 请求数据。
3.2 FetchRequest二进制格式
3.3 OffsetRequest二进制格式
3.4 TopicMetadataRequest二进制格式
3.5 LeaderAndIsrRequest二进制格式
3.6 StopReplicaRequest二进制格式
3.7 UpdateMetadataRequest二进制格式
3.8 ControlledShutdownRequest二进制格式
3.9 OffsetCommitRequest二进制格式
3.10 OffsetFetchRequest二进制格式
图1
1.概述
kafka启动时做很多初始化运行环境工作,具体请参考:apachekafka系列之源码分析走读-kafka内部模块分析
其中SockeServer类启动时,首先初始化NIO网络环境、启动监听、创建主线程、工作线程池、设置参数等等。
从上图1中可以看到整个交互过程中,kafka的所有逻辑处理和交互实际是交给KafkaApi类来处理的。
通过请求的类型,把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,
而是每个handler都是一个函数,混在KafkaApi类中。
2. Request请求类别
kafka-0.8.1版本中定义了10种类型请求,请求类型说明如下:参数 | 说明(解释) | 请求二进制数据解码类 |
RequestKeys.ProduceKey | producer请求 | ProducerRequest |
RequestKeys.FetchKey | consumer请求 | FetchRequest |
RequestKeys.OffsetsKey | topic的offset请求 | OffsetRequest |
RequestKeys.MetadataKey | topic元数据请求 | TopicMetadataRequest |
RequestKeys.LeaderAndIsrKey | leader和isr信息更新请求 | LeaderAndIsrRequest |
RequestKeys.StopReplicaKey | 停止replica请求 | StopReplicaRequest |
RequestKeys.UpdateMetadataKey | 更新元数据请求 | UpdateMetadataRequest |
RequestKeys.ControlledShutdownKey | controlledShutdown请求 | ControlledShutdownRequest |
RequestKeys.OffsetCommitKey | commitOffset请求 | OffsetCommitRequest |
RequestKeys.OffsetFetchKey | consumer的offset请求 | OffsetFetchRequest |
[java] view
plain copy
def handle(request: RequestChannel.Request) {
try{
trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request) // producer
case RequestKeys.FetchKey => handleFetchRequest(request) // consumer
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成为leader或follower设置同步副本组信息
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) //shutdown broker
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
request.requestObj.handleError(e, requestChannel, request)
error("error when handling request %s".format(request.requestObj), e)
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
3.请求交互二进制数据格式
kafka中客户端与server端交互有多种类型,那它是怎么交互数据呢,格式是怎样?下面来揭开面纱。请求交互二进制数据组成为:请求类型 + 请求数据。
3.1 ProducerRequest二进制格式
3.2 FetchRequest二进制格式
3.3 OffsetRequest二进制格式
3.4 TopicMetadataRequest二进制格式
3.5 LeaderAndIsrRequest二进制格式
3.6 StopReplicaRequest二进制格式
3.7 UpdateMetadataRequest二进制格式
3.8 ControlledShutdownRequest二进制格式
3.9 OffsetCommitRequest二进制格式
3.10 OffsetFetchRequest二进制格式
相关文章推荐
- apache kafka系列之源码分析走读-kafka内部模块分析
- apache kafka系列之源码分析走读-kafka内部模块分析
- apache kafka系列之源码分析走读-kafkaApi详解
- apache kafka系列之源码分析走读-server端网络架构分析
- apache kafka系列之源码分析走读-server端网络架构分析
- apache kafka系列之源码分析走读-SocketServer分析
- Kafka java api-消费者代码与消费分析、生产者消费者配置文件详解
- dubbo源码分析系列——dubbo-rpc-api模块源码分析
- Apache Kafka源码分析 - kafka controller
- Java程序员从笨鸟到菜鸟之(五十二)细谈Hibernate(三)Hibernate常用API详解及源码分析--csdn 曹胜欢
- Hibernate(三)Hibernate常用API详解及源码分析
- apache kafka源码分析走读-Producer分析
- nova创建虚拟机源码分析系列之六 api入口create方法
- Hibernate学习(第三篇)——Hibernate常用API详解及源码分析
- 详解Node.js API系列 Module模块(2) 案例分析
- hadoop源码分析系列(七)——org.apache.hadoop.hdfs包完结篇——dataNode详解及总结
- SSH学习(十)Hibernate常用API详解及源码分析
- jQuery-1.9.1源码分析系列(十六)ajax——响应数据处理和api整理
- Hibernate常用API详解及源码分析 .
- jQuery-1.9.1源码分析系列(三) Sizzle选择器引擎——编译原理续(伪类选择器“PSEUDO”和子伪类选择器"CHILD"原子选择器详解)