第36课:Kafka源码解读SocketServer下的Acceptor、Processor、Handler
2016-07-03 09:26
1091 查看
第36课:Kafka源码解读SocketServer下的Acceptor、Processor、Handler
1 socketServer下的NIO
2 Acceptor、Processor、Handler
/**
* An NIO socket server. The threading model is
* 1 Acceptor thread that handles new connections
* Acceptor has N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
1 socketServer下的NIO
2 Acceptor、Processor、Handler
/**
* An NIO socket server. The threading model is
* 1 Acceptor thread that handles new connections
* Acceptor has N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
/** * Accept loop that checks for new connection attempts */ def run() { serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) startupComplete() try { var currentProcessor = 0 while (isRunning) { try { val ready = nioSelector.select(500) if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { try { val key = iter.next iter.remove() if (key.isAcceptable) accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) } } } }
/* * Create a server socket to listen for connections on. */ private def openServerSocket(host: String, port: Int): ServerSocketChannel = { val socketAddress = if(host == null || host.trim.isEmpty) new InetSocketAddress(port) else new InetSocketAddress(host, port) val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) serverChannel.socket().setReceiveBufferSize(recvBufferSize) try { serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort)) } catch { case e: SocketException => throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e) } serverChannel }
/* * Accept a new connection */ def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] val socketChannel = serverSocketChannel.accept() try { connectionQuotas.inc(socketChannel.socket().getInetAddress) socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) socketChannel.socket().setSendBufferSize(sendBufferSize) debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]" .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress, socketChannel.socket.getSendBufferSize, sendBufferSize, socketChannel.socket.getReceiveBufferSize, recvBufferSize)) processor.accept(socketChannel) } catch { case e: TooManyConnectionsException => info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count)) close(socketChannel) } }
/** * Thread that processes all requests from a single connection. There are N of these running in parallel * each of which has its own selector */ private[kafka] class Processor(val id: Int, time: Time, maxRequestSize: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, connectionsMaxIdleMs: Long, protocol: SecurityProtocol, channelConfigs: java.util.Map[String, _], metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private object ConnectionId { def fromString(s: String): Option[ConnectionId] = s.split("-") match { case Array(local, remote) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) => BrokerEndPoint.parseHostPort(remote).map { case (remoteHost, remotePort) => ConnectionId(localHost, localPort, remoteHost, remotePort) } } case _ => None } }
相关文章推荐
- APICloud css样式大全
- Maven发布web项目到tomcat
- CSCI 512 Lecture-11 Corners
- document.documentElement
- SQL SERVER Z自定义函数
- 专题四总结
- 一、JSP、servlet、SQL三者之间的数据传递(前台与后台数据交互)
- Chapter 5. MDI窗体、菜单栏
- Hello,Structs!
- linux 总结
- Java面向对象概念补充
- 191. Number of 1 Bits
- 2.1 MySQL启动及登录
- hdu 1062 Text Reverse
- The Skyline Problem
- 核心动画实现书本翻页效果加载动画
- Android创建组合控件(摘自《Android5.0开发范例代码大全》)
- JS获取当前日期时间及JS日期格式化
- Android网络编程
- nutch使用