Apache Kafka Source Code Analysis, Part 2: The Kafka Network Engine: Core Fields & Initialization

In the previous installment we used Thrift as a case study for the Reactor server pattern.

In fact, looking at the sources of Redis, Mina, Netty and Thrift (RMNT for short below), the recipe is always the same;

the underlying principle simply does not change much from project to project.

Now let's step into the world of Kafka's network engine!

-----------------------------------------------------------------------------------------

Kafka's network layer uses multiple threads and multiple Selectors, much like RMNT (Redis is a bit special; I won't keep pointing that out).

The core class is SocketServer. It holds one Acceptor per endpoint that accepts new connections, and each Acceptor owns several Processor threads. Every Processor has its own private Selector and is responsible for reading requests from its connections and writing the responses back.

Because each Processor owns its own Selector, a given socket stays inside that one Processor for its entire lifetime: the same thread reads its requests and also writes its responses back.
In one sentence: "I will see you through to the end!" :)
That is the sane choice anyway; it would be chaos if the same socket were handled by Selector A one moment and Selector B the next. A minimal sketch of such an IO thread follows.
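
To make the "one Selector per Processor" idea concrete, here is a minimal, self-contained sketch of such an IO thread (illustration only, not Kafka's actual Processor; the class and field names are made up):

import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

// Illustration only: each MiniProcessor owns a private Selector and handles
// every socket assigned to it for that socket's whole lifetime.
class MiniProcessor extends Runnable {
  private val selector = Selector.open()                        // this thread's own Selector
  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

  // called by the acceptor thread to hand over a freshly accepted socket
  def accept(channel: SocketChannel): Unit = {
    newConnections.add(channel)
    selector.wakeup()                                           // unblock select() so the socket gets registered
  }

  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      // register the sockets handed over by the acceptor
      var ch = newConnections.poll()
      while (ch != null) {
        ch.configureBlocking(false)
        ch.register(selector, SelectionKey.OP_READ)
        ch = newConnections.poll()
      }
      selector.select(300)                                      // wait for read/write readiness
      val keys = selector.selectedKeys().iterator()
      while (keys.hasNext) {
        val key = keys.next(); keys.remove()
        if (key.isReadable) {
          val buf = ByteBuffer.allocate(4096)
          key.channel().asInstanceOf[SocketChannel].read(buf)   // read the request bytes
          // ...decode, hand the request to the business pool, later write the response back...
        }
      }
    }
  }
}

The acceptor thread only ever calls accept() here; once a socket has been handed over, all of its reads and writes happen on this one thread.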

On top of the network threads, the broker also runs a pool of Handler threads, which the industry usually calls the business thread pool.

So if you interview somewhere else

and get asked how Netty handles time-consuming business logic, and you don't say "hand it off to a separate business thread pool", trust me, the interviewer will quietly write you off :)

Whatever you do, don't claim that Netty runs business logic on the IO threads; that is an instant loss of points,

and the Netty part of the interview is basically over at that point. A minimal sketch of the hand-off is below.
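
For the Netty case just mentioned, the usual recipe is to bind the slow handler to a separate EventExecutorGroup when adding it to the pipeline, so the IO event loop is never blocked. A minimal sketch (the handler name and pool size are made up):

import io.netty.channel.{ChannelHandlerContext, ChannelInitializer, SimpleChannelInboundHandler}
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.string.{StringDecoder, StringEncoder}
import io.netty.util.concurrent.DefaultEventExecutorGroup

object NettyBusinessPoolSketch {
  // a dedicated pool for slow business logic; the IO event loop stays free
  val businessGroup = new DefaultEventExecutorGroup(16)

  class SlowBusinessHandler extends SimpleChannelInboundHandler[String] {
    override def channelRead0(ctx: ChannelHandlerContext, msg: String): Unit = {
      // potentially slow work (DB lookup, disk IO, ...) runs on businessGroup here
      ctx.writeAndFlush(msg.reverse + "\n")
    }
  }

  val initializer = new ChannelInitializer[SocketChannel] {
    override def initChannel(ch: SocketChannel): Unit = {
      ch.pipeline()
        .addLast(new StringDecoder(), new StringEncoder())       // codec work stays on the IO thread
        .addLast(businessGroup, new SlowBusinessHandler())       // handler bound to the business pool
    }
  }
}

In Netty, the writeAndFlush issued from the business pool is transparently re-scheduled onto the channel's event loop. Kafka does the equivalent hand-back explicitly, through a RequestChannel, which is exactly what the next paragraph is about.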

---Note, though, that the results produced by the business thread pool have to be handed back to the IO threads, i.e. the Processor threads;

the two groups of threads talk to each other through a RequestChannel.

In Thrift, the hand-off is done by wrapping the FrameBuffer in a Runnable:

protected Runnable getRunnable(FrameBuffer frameBuffer) {
    return new Invocation(frameBuffer);
}

A FrameBuffer wraps one complete request as seen by the business logic;
think of it as the equivalent of a complete HTTP request body.

Technical background: TCP is a byte-stream protocol, so reassembling complete requests out of the stream is the application's job.
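
Because TCP only hands you a byte stream, somebody has to cut that stream into complete requests. A minimal sketch of the usual length-prefix framing, just to make the idea concrete (illustration only, not Thrift's or Kafka's actual framing code):

import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream}

// Illustration only: a 4-byte big-endian length prefix followed by the payload.
object LengthPrefixFraming {
  def writeFrame(out: OutputStream, payload: Array[Byte]): Unit = {
    val dos = new DataOutputStream(out)
    dos.writeInt(payload.length)   // frame header: how many bytes follow
    dos.write(payload)             // the complete request body
    dos.flush()
  }

  def readFrame(in: InputStream): Array[Byte] = {
    val dis = new DataInputStream(in)
    val len = dis.readInt()        // block until the 4-byte header has arrived
    val payload = new Array[Byte](len)
    dis.readFully(payload)         // block until the whole request body has arrived
    payload                        // only now do we hold one "FrameBuffer"-like unit
  }
}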


Core fields of SocketServer

Where the source lives:

find ./ -name SocketServer.*
./core/src/main/scala/kafka/network/SocketServer.scala

1) endpoints:

The set of EndPoints. A server usually has several NICs and therefore several IPs, and Kafka can listen on several ports at the same time;

one EndPoint defines the host, port, security protocol and so on (see the sample server.properties at the end of this list).

Each EndPoint gets its own Acceptor object.

This is a bit like ActiveMQ: ActiveMQ supports several wire protocols and opens a TCP listening port for each of them,
so a single ActiveMQ process ends up holding quite a few listening ports.

2) numProcessorThreads & totalProcessorThreads

numProcessorThreads is the number of Processor threads per endpoint.

And the second one? Since there can be several endpoints, totalProcessorThreads = number of endpoints * numProcessorThreads.

3) maxQueuedRequests: the maximum number of requests that may be buffered.

Think about it: in Thrift, how many can be buffered at most? :)

The Processor threads read from the sockets, turn the bytes into requests, and push those requests into this bounded queue.

4) maxConnectionsPerIp: the maximum number of connections allowed from a single IP.

Normally you would not expect a limit here; surely we don't want to turn clients away? (The default is effectively unlimited.)

5) maxConnectionsPerIpOverrides: per-IP overrides of that limit for specific hosts.

6) requestChannel: the queue

The queue that carries one logically complete Kafka request; think of HttpRequest as the HTTP counterpart of such a request body.

The Thrift counterpart, as shown earlier, is the Invocation Runnable wrapping a FrameBuffer, i.e. one complete request body reassembled out of the TCP byte stream.

No further explanation needed!

7) acceptors:

The collection of Acceptor objects, one per EndPoint. Self-explanatory!

8) processors:

The array of IO (Processor) threads. Self-explanatory!
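
To connect these fields back to the broker configuration, here is a hypothetical server.properties fragment (the listener addresses and the override values are made up; the numeric values match the defaults or the values quoted in the code comments below):

# One Acceptor per listener below, each with its own group of Processors (field 1).
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093

# numProcessorThreads, per endpoint (field 2); with the two listeners above,
# totalProcessorThreads = 2 * 3 = 6.
num.network.threads=3

# maxQueuedRequests (field 3).
queued.max.requests=500

# maxConnectionsPerIp (field 4); the default Int.MaxValue means effectively no limit.
max.connections.per.ip=2147483647

# maxConnectionsPerIpOverrides (field 5); per-IP exceptions to the limit above.
max.connections.per.ip.overrides=10.0.0.8:200,10.0.0.9:50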

=== That covers the core fields; now let's walk through SocketServer's initialization ===

First, as usual, set up a debugging environment on Linux.

1) Start the Kafka server.
2) Look at the launch command:
/root/myAllFiles/jdk1.8.0_111/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -cp /root/leveldb_0.9:/root/leveldb_0.9/*::/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/argparse4j-0.7.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/commons-lang3-3.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-api-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-file-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-json-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-runtime-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-transforms-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/guava-20.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-api-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-locator-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-utils-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-core-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-databind-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javassist-3.21.0-GA.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.annotation-api-1.2.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.servlet-api-3.1.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-client-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-common-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-core-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-guava-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0
/bin/../libs/jersey-media-jaxb-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-server-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-security-9.2.15.v20160210.jar:

3) Build the jdb debug command from it:
jdb -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -classpath /root/leveldb_0.9:/root/leveldb_0.9/*::/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/argparse4j-0.7.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/commons-lang3-3.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-api-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-file-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-json-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-runtime-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-transforms-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/guava-20.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-api-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-locator-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-utils-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-core-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-databind-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javassist-3.21.0-GA.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.annotation-api-1.2.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.servlet-api-3.1.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-client-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-common-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-core-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-guava-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-media-ja
xb-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-server-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-security-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/libs/*:  kafka.Kafka config/server.properties

If step 3 throws errors, you should be able to figure them out and fix them yourself :)

Good. Once it is running under jdb, our goal is to get a rough picture of how SocketServer executes; the details of each component will be covered later.

At startup, SocketServer walks over the EndPoints and starts the corresponding threads :)

To install the Scala plugin for your IDE, see http://www.cnblogs.com/xiyuan2016/p/6626825.html and http://scala-ide.org/download/prev-stable.html

------ Let's warm up with a first debugging attempt ------

Set a breakpoint first:

stop in kafka.network.SocketServer:54

Then start debugging; add the source folders to the source path if needed.
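
For reference, a jdb session then looks roughly like this (the breakpoint location and the printed line numbers depend on your exact Kafka version, so treat the output as illustrative):

stop in kafka.network.SocketServer:54
run
Breakpoint hit: "thread=main", kafka.network.SocketServer.<init>(), line=54 bci=0
where        (show the current call stack)
locals       (list the local variables in the current frame)
next         (execute the next line)
cont         (resume until the next breakpoint)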

(The original post shows a screenshot of the jdb session here.)

================ Now the real debugging ==================

Tip:

for how the configuration keys map onto these fields, see
kafka.server.KafkaConfig.scala

class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
  // only one listener by default
  private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
  private val numProcessorThreads = config.numNetworkThreads // 3 in our config file; val NumNetworkThreadsProp = "num.network.threads"
  private val maxQueuedRequests = config.queuedMaxRequests // default 500; val QueuedMaxRequestsProp = "queued.max.requests"
  private val totalProcessorThreads = numProcessorThreads * endpoints.size // 3 * 1 here

  private val maxConnectionsPerIp = config.maxConnectionsPerIp // default Int.MaxValue = 2147483647; val MaxConnectionsPerIpProp = "max.connections.per.ip"
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "

Next comes the construction of the RequestChannel:

val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)

Step into it:

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) // the inbound channel towards the business thread pool
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) // the return channels from the business thread pool
  for(i <- 0 until numProcessors) // one LinkedBlockingQueue per Processor
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
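
The shape of the hand-off is already visible in those two fields. A stripped-down sketch of the same pattern (this is not Kafka's actual API, only the idea; the String payloads stand in for RequestChannel.Request/Response):

import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue}

// Illustration only: one shared inbound queue, one outbound queue per IO thread.
class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[String](queueSize)
  private val responseQueues = Array.fill(numProcessors)(new LinkedBlockingQueue[String]())

  // Processor thread: enqueue a decoded request for the business pool.
  def sendRequest(request: String): Unit = requestQueue.put(request)

  // Handler (business) thread: take a request, and later hand the response
  // back to the Processor that owns the connection.
  def receiveRequest(): String = requestQueue.take()
  def sendResponse(processorId: Int, response: String): Unit =
    responseQueues(processorId).put(response)

  // Processor thread: drain responses that are ready to be written back (null if none).
  def receiveResponse(processorId: Int): String = responseQueues(processorId).poll()
}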

Back in SocketServer.scala:

  private val processors = new Array[Processor](totalProcessorThreads) // the IO thread pool, about to be built

  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() // the Acceptors
  private var connectionQuotas: ConnectionQuotas = _

  // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())

In other words: when a response from the business side is ready, the Selector of the owning IO thread must be woken up.
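
Why the wakeup? Because the Processor normally sits blocked inside Selector.select(); without a nudge, a response dropped into its queue would wait for the next network event before being written out. A tiny sketch of the mechanism (illustration only, not Kafka code):

import java.nio.channels.Selector

object WakeupSketch {
  val selector: Selector = Selector.open()

  // IO thread: blocks here, waiting for network readiness
  def pollLoop(): Unit = {
    while (true) {
      selector.select()          // may block indefinitely if the network is quiet
      // ...drain the response queue and register OP_WRITE for those connections...
    }
  }

  // Called from the response listener on the handler thread,
  // right after a Response has been queued for this processor.
  def wakeup(): Unit = selector.wakeup()   // makes the blocked select() return immediately
}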

Next, the startup method runs; this is the core of it.

Step completed: "thread=main", kafka.network.SocketServer.startup(), line=74 bci=0

Let's see what it does:

/**
 * Start the socket server
 */
def startup() {
  this.synchronized {
    // connection quotas
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    // a few TCP socket parameters
    val sendBufferSize = config.socketSendBufferBytes // 102400 in the config file; val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
    val recvBufferSize = config.socketReceiveBufferBytes // 102400 in the config file; val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
    val brokerId = config.brokerId // self-explanatory

    var processorBeginIndex = 0
    config.listeners.foreach { endpoint =>
      val listenerName = endpoint.listenerName
      val securityProtocol = endpoint.securityProtocol
      val processorEndIndex = processorBeginIndex + numProcessorThreads // every endpoint starts this many Processor threads

      for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol) // create the Processor threads

      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) // create the Acceptor thread
      acceptors.put(endpoint, acceptor)
      Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
      acceptor.awaitStartup()

      processorBeginIndex = processorEndIndex
    }
  }
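
To make the processorBeginIndex / processorEndIndex bookkeeping concrete, here is a toy illustration (not Kafka code; the listener strings are made up):

// Toy illustration: with two listeners and num.network.threads = 3, the processors
// array is split into indices 0..2 for the first endpoint and 3..5 for the second.
object ProcessorSlicing extends App {
  val numProcessorThreads = 3
  val listeners = Seq("PLAINTEXT://:9092", "SSL://:9093")      // hypothetical listeners
  var begin = 0
  for (listener <- listeners) {
    val end = begin + numProcessorThreads
    println(s"$listener -> processor indices ${begin until end}")
    begin = end
  }
}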

Shutdown is much simpler:

/**
 * Shutdown the socket server
 */
def shutdown() = { // the shutdown path
  info("Shutting down")
  this.synchronized {
    acceptors.values.foreach(_.shutdown) // stop the Acceptors
    processors.foreach(_.shutdown) // stop the Processors
  }
  info("Shutdown completed")
}


AbstractServerThread

Look at the following two declarations:

/**
* Thread that accepts and configures new connections. There is one of these per endpoint.
*/
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selector
*/
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

As you can see, both Acceptor and Processor extend AbstractServerThread.

/**
* A base class with some helper variables and methods
*/
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {

It is an abstract class that implements Runnable and provides the concrete startup/shutdown plumbing for the Acceptor and Processor threads!

Tip:
Stop for a second and recall how Netty does the same thing; looks familiar, doesn't it?
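
If it helps, here is the rough Netty counterpart of the same division of labour, purely as an analogy (a sketch, not Kafka code): the boss group plays the Acceptor role and the worker group plays the Processor role.

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel

object NettyAnalogySketch {
  def main(args: Array[String]): Unit = {
    val bossGroup = new NioEventLoopGroup(1)    // accepts new connections, like Kafka's Acceptor
    val workerGroup = new NioEventLoopGroup(3)  // per-connection IO, like Kafka's Processor threads
    try {
      val bootstrap = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(classOf[NioServerSocketChannel])
        .childHandler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit = () // register real handlers here
        })
      bootstrap.bind(9092).sync().channel().closeFuture().sync()
    } finally {
      bossGroup.shutdownGracefully()            // the counterpart of the shutdown() calls above
      workerGroup.shutdownGracefully()
    }
  }
}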

/**
* A base class with some helper variables and methods
*/
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
private val startupLatch = new CountDownLatch(1) // counted down once startup has completed

// `shutdown()` is invoked before `startupComplete` and `shutdownComplete` if an exception is thrown in the constructor
// (e.g. if the address is already in use). We want `shutdown` to proceed in such cases, so we first assign an open
// latch and then replace it in `startupComplete()`.
@volatile private var shutdownLatch = new CountDownLatch(0) // counted down once shutdown has completed

private val alive = new AtomicBoolean(true) // is this thread still supposed to be running?

def wakeup(): Unit

/**
* Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
*/
def shutdown(): Unit = {
alive.set(false)
wakeup()
shutdownLatch.await()
}

/**
* Wait for the thread to completely start up
*/
def awaitStartup(): Unit = startupLatch.await

/**
* Record that the thread startup is complete
*/
protected def startupComplete(): Unit = {
// Replace the open latch with a closed one
shutdownLatch = new CountDownLatch(1)
startupLatch.countDown()
}

/**
* Record that the thread shutdown is complete
*/
protected def shutdownComplete(): Unit = shutdownLatch.countDown()

/**
* Is the server still running?
*/
protected def isRunning: Boolean = alive.get

/**
* Close the connection identified by `connectionId` and decrement the connection count.
*/
def close(selector: KSelector, connectionId: String): Unit = { // close the socket and decrement the connection count
val channel = selector.channel(connectionId)
if (channel != null) {
debug(s"Closing selector connection $connectionId")
val address = channel.socketAddress
if (address != null)
connectionQuotas.dec(address)
selector.close(connectionId)
}
}

/**
* Close `channel` and decrement the connection count.
*/
def close(channel: SocketChannel): Unit = {
if (channel != null) {
debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
connectionQuotas.dec(channel.socket.getInetAddress)
swallowError(channel.socket().close())
swallowError(channel.close())
}
}
}

More details on Acceptor and Processor will follow in later installments!