Apache Kafka源码剖析:第4篇 业务线程池的原理
2017-08-13 00:00
281 查看
下面,我们来讲解一个请求是如何被业务线程池处理的!
Processor线程与Handler业务线程之间传递数据是通过RequestChannel里的RequestQueue来实现的。
响应则是通过responseQueues队列来实现的。
每个Processor线程对应着一个responseQueue.读到的请求放到requestQueue中,Handler业务线程从这个队里拿出请求进行处理;
业务线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程负责发送给客户端。
看一下核心概念:
1)requestQueue
注意这是全局唯一的1个对象,跟Thrift也是走这条路!
2)responseQueues
3)numProcessors
IO线程的个数
4)queueSize
请求的最大个数
5)responseListeners
监听器列表
在socketserver的初始化过程中,有注册监听器
因为TCP是字节流协议,所以要处理提取完整请求,结果就是RequestChannel.Request对象
供业务线程使用
这里聊一聊另外1个话题,我们看到这里使用了异步跨线程处理的方式,如何保证请求的顺序性呢?
之前的文章提过了,拿到一个请求体后,就取消对OP_READ的注册,导致就算有第2个请求过来我也装聋作哑,等把响应体传给IO线程后,再处理新的请求,此时多个响应会在IO线程那边排队,再通过socket发送出去,很简单!
Kafka网络层的设计原理和实现就介绍到这里了。很多框架都采用这种模式。
希望读者能够通过这几篇文章理解Kafka的网络端实现!
Processor线程与Handler业务线程之间传递数据是通过RequestChannel里的RequestQueue来实现的。
响应则是通过responseQueues队列来实现的。
每个Processor线程对应着一个responseQueue.读到的请求放到requestQueue中,Handler业务线程从这个队里拿出请求进行处理;
业务线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程负责发送给客户端。
这个跟netty和thrift如出一辙!
看一下核心概念:
1)requestQueue
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)//构造与业务线程池的通道 /** Send a request to be handled, potentially blocking until there is room in the queue for the request */ def sendRequest(request: RequestChannel.Request) { requestQueue.put(request) }
注意这是全局唯一的1个对象,跟Thrift也是走这条路!
2)responseQueues
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)//构造业务线程池的返回通道 /** Send a response back to the socket server to be sent over the network */ def sendResponse(response: RequestChannel.Response) { responseQueues(response.processor).put(response) for(onResponse <- responseListeners) onResponse(response.processor) }
3)numProcessors
IO线程的个数
4)queueSize
请求的最大个数
5)responseListeners
监听器列表
在socketserver的初始化过程中,有注册监听器
// register the processor threads for notification of responses requestChannel.addResponseListener(id => processors(id).wakeup())
因为TCP是字节流协议,所以要处理提取完整请求,结果就是RequestChannel.Request对象
供业务线程使用
这里聊一聊另外1个话题,我们看到这里使用了异步跨线程处理的方式,如何保证请求的顺序性呢?
之前的文章提过了,拿到一个请求体后,就取消对OP_READ的注册,导致就算有第2个请求过来我也装聋作哑,等把响应体传给IO线程后,再处理新的请求,此时多个响应会在IO线程那边排队,再通过socket发送出去,很简单!
Kafka网络层的设计原理和实现就介绍到这里了。很多框架都采用这种模式。
希望读者能够通过这几篇文章理解Kafka的网络端实现!
相关文章推荐
- 深入剖析线程池(继承关系、源码原理解析、小例子等)
- 并发编程之 源码剖析 线程池 实现原理
- 并发编程之 源码剖析 线程池 实现原理
- Apache Kafka源码剖析:第5篇 业务API处理
- struts2拦截器的实现原理及源码剖析
- 【编译原理】一个词法分析器源码的剖析
- Spark2.2 Worker、Driver和Executor向Master注册原理剖析图解及源码
- Java HashMap实现原理 源码剖析
- Spark源码之路(二):Master原理剖析与源码分析
- Java程序员从笨鸟到菜鸟之(四十六)细谈struts2(八)拦截器的实现原理及源码剖析
- 哈希表(亦称散列表)HashMap源码读后感,实现原理剖析
- Master原理剖析和源码分析
- Apache Kafka源码剖析:第2篇 Kafka网络引擎: 核心字段&初始化
- Apache common-pool, common-dbcp源码解读与对象池原理剖析
- Java HashMap实现原理 源码剖析
- CacheManager原理剖析与源码分析
- Spring源码剖析——核心IOC容器原理
- struts2拦截器的实现原理及源码剖析
- 唯一插件化Replugin源码及原理深度剖析--唯一Hook点原理
- Java HashMap实现原理 源码剖析