您的位置:首页 > 其它

第36课:Kafka源码解读SocketServer下的Acceptor、Processor、Handler

2016-07-03 09:26 1091 查看
第36课:Kafka源码解读SocketServer下的Acceptor、Processor、Handler

1 socketServer下的NIO

2 Acceptor、Processor、Handler

/**

 * An NIO socket server. The threading model is

 *   1 Acceptor thread that handles new connections

 *   Acceptor has N Processor threads that each have their own selector and read requests from sockets

 *   M Handler threads that handle requests and produce responses back to the processor threads for writing.

 */

class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {

 

 

/**
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable)
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")

// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}

 

 

 

 

 

/*
* Create a server socket to listen for connections on.
*/
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
val socketAddress =
if(host == null || host.trim.isEmpty)
new InetSocketAddress(port)
else
new InetSocketAddress(host, port)
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
try {
serverChannel.socket.bind(socketAddress)
info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
} catch {
case e: SocketException =>
throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
}
serverChannel
}

 

 

 

 

/*
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
socketChannel.socket().setSendBufferSize(sendBufferSize)

debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
.format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
socketChannel.socket.getSendBufferSize, sendBufferSize,
socketChannel.socket.getReceiveBufferSize, recvBufferSize))

processor.accept(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
close(socketChannel)
}
}

 

 

 

 

/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selector
*/
private[kafka] class Processor(val id: Int,
time: Time,
maxRequestSize: Int,
requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
connectionsMaxIdleMs: Long,
protocol: SecurityProtocol,
channelConfigs: java.util.Map[String, _],
metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
case Array(local, remote) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
BrokerEndPoint.parseHostPort(remote).map { case (remoteHost, remotePort) =>
ConnectionId(localHost, localPort, remoteHost, remotePort)
}
}
case _ => None
}
}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: