您的位置:首页 > 运维架构 > Apache

Apache Kafka源码剖析:第4篇 业务线程池的原理

2017-08-13 00:00 281 查看
下面,我们来讲解一个请求是如何被业务线程池处理的!

Processor线程与Handler业务线程之间传递数据是通过RequestChannel里的RequestQueue来实现的。

响应则是通过responseQueues队列来实现的。

每个Processor线程对应着一个responseQueue.读到的请求放到requestQueue中,Handler业务线程从这个队里拿出请求进行处理;

业务线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程负责发送给客户端。

这个跟netty和thrift如出一辙!

看一下核心概念:

1)requestQueue

private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)//构造与业务线程池的通道

/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
def sendRequest(request: RequestChannel.Request) {
requestQueue.put(request)
}

注意这是全局唯一的1个对象,跟Thrift也是走这条路!

2)responseQueues

private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)//构造业务线程池的返回通道

/** Send a response back to the socket server to be sent over the network */
def sendResponse(response: RequestChannel.Response) {
responseQueues(response.processor).put(response)
for(onResponse <- responseListeners)
onResponse(response.processor)
}

3)numProcessors

IO线程的个数

4)queueSize

请求的最大个数

5)responseListeners

监听器列表

在socketserver的初始化过程中,有注册监听器

// register the processor threads for notification of responses
requestChannel.addResponseListener(id => processors(id).wakeup())


因为TCP是字节流协议,所以要处理提取完整请求,结果就是RequestChannel.Request对象

供业务线程使用

这里聊一聊另外1个话题,我们看到这里使用了异步跨线程处理的方式,如何保证请求的顺序性呢?

之前的文章提过了,拿到一个请求体后,就取消对OP_READ的注册,导致就算有第2个请求过来我也装聋作哑,等把响应体传给IO线程后,再处理新的请求,此时多个响应会在IO线程那边排队,再通过socket发送出去,很简单!

Kafka网络层的设计原理和实现就介绍到这里了。很多框架都采用这种模式。

希望读者能够通过这几篇文章理解Kafka的网络端实现!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Kafka