您的位置:首页 > 运维架构 > Apache

apache kafka系列之源码分析走读-kafkaApi详解

2017-12-21 19:55 1066 查看
Kafka源码中数据交互流程



图1


1.概述

kafka启动时做很多初始化运行环境工作,具体请参考:apache
kafka系列之源码分析走读-kafka内部模块分析

其中SockeServer类启动时,首先初始化NIO网络环境、启动监听、创建主线程、工作线程池、设置参数等等。

从上图1中可以看到整个交互过程中,kafka的所有逻辑处理和交互实际是交给KafkaApi类来处理的。

通过请求的类型,把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,

而是每个handler都是一个函数,混在KafkaApi类中。


2. Request请求类别

kafka-0.8.1版本中定义了10种类型请求,请求类型说明如下:

参数
说明(解释)
请求二进制数据解码类
RequestKeys.ProduceKey
producer请求
ProducerRequest
RequestKeys.FetchKey
consumer请求
FetchRequest
RequestKeys.OffsetsKey
topic的offset请求
OffsetRequest
RequestKeys.MetadataKey
topic元数据请求
TopicMetadataRequest
RequestKeys.LeaderAndIsrKey
leader和isr信息更新请求
LeaderAndIsrRequest
RequestKeys.StopReplicaKey
停止replica请求
StopReplicaRequest
RequestKeys.UpdateMetadataKey
更新元数据请求
UpdateMetadataRequest
RequestKeys.ControlledShutdownKey
controlledShutdown请求
ControlledShutdownRequest
RequestKeys.OffsetCommitKey
commitOffset请求
OffsetCommitRequest
RequestKeys.OffsetFetchKey
consumer的offset请求
OffsetFetchRequest
下面是KafkaApi中handle方法代码:

[java] view
plain copy

def handle(request: RequestChannel.Request) {  

  try{  

    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)  

    request.requestId match {  

      case RequestKeys.ProduceKey => handleProducerRequest(request)  // producer  

      case RequestKeys.FetchKey => handleFetchRequest(request)       // consumer  

      case RequestKeys.OffsetsKey => handleOffsetRequest(request)  

      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)  

      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成为leader或follower设置同步副本组信息  

      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)  

      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)  

      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)  //shutdown broker  

      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)  

      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)  

      case requestId => throw new KafkaException("Unknown api code " + requestId)  

    }  

  } catch {  

    case e: Throwable =>  

      request.requestObj.handleError(e, requestChannel, request)  

      error("error when handling request %s".format(request.requestObj), e)  

  } finally  

    request.apiLocalCompleteTimeMs = SystemTime.milliseconds  

}  


3.请求交互二进制数据格式

kafka中客户端与server端交互有多种类型,那它是怎么交互数据呢,格式是怎样?下面来揭开面纱。

请求交互二进制数据组成为:请求类型 + 请求数据。




3.1 ProducerRequest二进制格式



3.2 FetchRequest二进制格式



3.3 OffsetRequest二进制格式



3.4 TopicMetadataRequest二进制格式



3.5 LeaderAndIsrRequest二进制格式



3.6 StopReplicaRequest二进制格式



3.7 UpdateMetadataRequest二进制格式



3.8 ControlledShutdownRequest二进制格式



3.9 OffsetCommitRequest二进制格式



3.10 OffsetFetchRequest二进制格式

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: