
Apache Kafka Source Code Analysis – Broker Server

2014-02-14 11:52

1. Kafka.scala

In Kafka's main entry point, a KafkaServerStartable is started up; KafkaServerStartable is just a thin wrapper around KafkaServer.

val kafkaServerStartble = new KafkaServerStartable(serverConfig)
kafkaServerStartble.startup
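
For context, the surrounding main method in Kafka.scala looks roughly like the sketch below (reconstructed from the 0.8-era source, so minor details may differ between versions): it loads server.properties, registers a shutdown hook, then starts the server and waits for it to exit.

object Kafka extends Logging {
  def main(args: Array[String]): Unit = {
    try {
      val props = Utils.loadProps(args(0))                    // read server.properties
      val serverConfig = new KafkaConfig(props)
      val kafkaServerStartble = new KafkaServerStartable(serverConfig)

      // attach a shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = kafkaServerStartble.shutdown
      })

      kafkaServerStartble.startup
      kafkaServerStartble.awaitShutdown
    }
    catch {
      case e: Throwable => fatal(e)
    }
    System.exit(0)
  }
}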


package kafka.server

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private var server: KafkaServer = null

  private def init() {
    server = new KafkaServer(serverConfig)
  }

  def startup() {
    try {
      server.startup()
    }
    catch {...}
  }
}

2. KafkaServer

KafkaServer represents a Kafka broker and is the core of Kafka.
Just by looking at which modules are started in startup() you can see what work the broker does; each one is analyzed in detail later.

package kafka.server

/**
 * Represents the lifecycle of a single Kafka broker. Handles all functionality required
 * to startup and shutdown a single Kafka node.
 */
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null
  var logManager: LogManager = null
  var kafkaHealthcheck: KafkaHealthcheck = null
  var topicConfigManager: TopicConfigManager = null
  var replicaManager: ReplicaManager = null
  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  var zkClient: ZkClient = null

  /**
   * Startup API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup() {
    /* start scheduler */
    kafkaScheduler.startup()

    /* setup zookeeper */
    zkClient = initZk()

    /* start log manager */
    logManager = createLogManager(zkClient)
    logManager.startup()

    socketServer = new SocketServer(config.brokerId,
                                    config.hostName,
                                    config.port,
                                    config.numNetworkThreads,
                                    config.queuedMaxRequests,
                                    config.socketSendBufferBytes,
                                    config.socketReceiveBufferBytes,
                                    config.socketRequestMaxBytes)
    socketServer.startup()

    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
    kafkaController = new KafkaController(config, zkClient)

    /* start processing requests */
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

    replicaManager.startup()
    kafkaController.startup()

    topicConfigManager = new TopicConfigManager(zkClient, logManager)
    topicConfigManager.startup()

    /* tell everyone we are alive */
    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
    kafkaHealthcheck.startup()
  }

2.1 KafkaScheduler

KafkaScheduler is used to run tasks in the background; it is implemented with ScheduledThreadPoolExecutor.

package kafka.utils

/**
 * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
 *
 * It has a pool of kafka-scheduler- threads that do the actual work.
 *
 * @param threads The number of threads in the thread pool
 * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
 * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
 */
@threadsafe
class KafkaScheduler(val threads: Int,
                     val threadNamePrefix: String = "kafka-scheduler-",
                     daemon: Boolean = true) extends Scheduler with Logging {
  @volatile private var executor: ScheduledThreadPoolExecutor = null

  override def startup() {
    this synchronized {
      executor = new ScheduledThreadPoolExecutor(threads)  // create the ScheduledThreadPoolExecutor
      executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
      executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
      executor.setThreadFactory(new ThreadFactory() {
        def newThread(runnable: Runnable): Thread =
          Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
      })
    }
  }

  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) = {
    val runnable = new Runnable {  // wrap fun in a Runnable
      def run() = {
        try {
          fun()
        } catch {...}
        finally {...}
      }
    }
    if (period >= 0)  // schedule it in the pool with the given delay
      executor.scheduleAtFixedRate(runnable, delay, period, unit)
    else
      executor.schedule(runnable, delay, unit)
  }
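
As a usage example (just a sketch: cleanupLogs and the interval values below are made up, the real call sites live in LogManager and other modules), a periodic background task is registered like this:

kafkaScheduler.schedule(name = "my-background-task",
                        fun = () => cleanupLogs(),      // any () => Unit task (hypothetical here)
                        delay = 30 * 1000L,             // first run after 30 seconds
                        period = 5 * 60 * 1000L,        // then every 5 minutes
                        unit = TimeUnit.MILLISECONDS)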

2.2 Zookeeper Client

Since Kafka relies on zookeeper for configuration management, a zkclient has to be created to communicate with the zookeeper cluster.
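
A sketch of what this initialization looks like, based on KafkaServer.initZk in 0.8 (the chroot handling in the real method is omitted here):

private def initZk(): ZkClient = {
  info("Connecting to zookeeper on " + config.zkConnect)
  val zkClient = new ZkClient(config.zkConnect,
                              config.zkSessionTimeoutMs,
                              config.zkConnectionTimeoutMs,
                              ZKStringSerializer)   // store znode data as UTF-8 strings
  ZkUtils.setupCommonPaths(zkClient)                // create /brokers, /consumers, ... if missing
  zkClient
}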

2.3 LogManager

The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
Apache Kafka Source Code Analysis – Log Management

2.4 ReplicaManager

The replica-related module newly added in 0.8.

Apache Kafka Replication Design – High level
Kafka Detailed Replication Design V3
Apache Kafka Source Code Analysis – ReplicaManager

2.5 Kafka SocketServer

First of all, the broker server is a socket server: all interaction with the broker happens by sending requests to its socket port.

socketServer = new SocketServer(config.brokerId...)

KafkaApis
This class encapsulates the handling logic for all requests.
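
A sketch of how that dispatch works (abbreviated from the 0.8 code; the full match covers more request keys):

def handle(request: RequestChannel.Request) {
  request.requestId match {
    case RequestKeys.ProduceKey      => handleProducerRequest(request)
    case RequestKeys.FetchKey        => handleFetchRequest(request)
    case RequestKeys.OffsetsKey      => handleOffsetRequest(request)
    case RequestKeys.MetadataKey     => handleTopicMetadataRequest(request)
    case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
    case RequestKeys.StopReplicaKey  => handleStopReplicaRequest(request)
    case requestId => throw new KafkaException("Unknown api code " + requestId)
  }
}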


/**
 * A thread that answers kafka requests.
 */
class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
  def run() {
    while (true) {
      try {
        val req = requestChannel.receiveRequest()  // receive a request from the socket channel
        if (req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        apis.handle(req)  // use KafkaApis to handle the request
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging {
  val threads = new Array[Thread](numThreads)  // the thread pool
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }

  def shutdown() {
    info("shutting down")
    for (handler <- runnables)
      handler.shutdown
    for (thread <- threads)
      thread.join
    info("shut down completely")
  }
}



2.6 OffsetManager

offsetManager = createOffsetManager()

It periodically cleans up expired offset data, i.e. the compact operation,

scheduler.schedule(name = "offsets-cache-compactor",
                   fun = compact,
                   period = config.offsetsRetentionCheckIntervalMs,
                   unit = TimeUnit.MILLISECONDS)

as well as some consumer-related offset operations; I won't go into detail, since we don't use the high level consumer.

2.7 KafkaController

kafkaController = new KafkaController(config, zkClient, brokerState)

Apache Kafka Source Code Analysis – Controller

Since 0.8, in order to manage replicas, one broker is used as the master, i.e. the controller, which coordinates replica consistency.
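
The election itself goes through zookeeper: whichever broker manages to create the ephemeral /controller znode first becomes the controller. A minimal sketch of that idea (the real code is kafka.server.ZookeeperLeaderElector; the elect helper below and writing the bare broker id are simplifications):

def elect(zkClient: ZkClient, brokerId: Int): Boolean = {
  try {
    // only one broker can create the ephemeral node; that broker becomes the controller
    ZkUtils.createEphemeralPathExpectConflict(zkClient, "/controller", brokerId.toString)
    true
  } catch {
    case e: ZkNodeExistsException => false   // someone else already won the election
  }
}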

2.8 TopicConfigManager

topicConfigManager = new TopicConfigManager(zkClient, logManager)

TopicConfigManager handles changes to topic configs. Besides the global configuration, Kafka also has what is called topic-level configuration.

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --config max.message.bytes=128000

For example you can set it as above; so how do these topic configs take effect?

Topic-level config is stored by default under

/brokers/topics/<topic_name>/config

But when there are many topics, to avoid creating too many watchers, a separate path

/brokers/config_changes

is used to trigger config changes.
So besides writing the config into the topic's config znode, the command above also creates a notification telling the watcher which topic's config has changed:

/brokers/config_changes/config_change_13321

And this notification carries a suffix (the sequence number), used to tell whether it has already been processed.
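
On the write side this is roughly what the alter command does (a simplified sketch of AdminUtils.changeTopicConfig; topicConfigPath below is just an illustrative helper for the topic's config znode):

def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Map[String, String]) {
  // 1. write the merged topic-level config under the topic's config znode
  ZkUtils.updatePersistentPath(zkClient, topicConfigPath(topic), Json.encode(configs))
  // 2. create a persistent *sequential* znode under the change-notification path; the
  //    auto-appended sequence number is the suffix mentioned above (e.g. config_change_13321)
  zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/config_change_",
                                      Json.encode(topic))
}

On the broker side, TopicConfigManager watches that path and processes the notifications: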

/**
 * Process the given list of config changes
 */
private def processConfigChanges(notifications: Seq[String]) {
  if (notifications.size > 0) {
    info("Processing config change notification(s)...")
    val now = time.milliseconds
    val logs = logManager.logsByTopicPartition.toBuffer
    val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
    for (notification <- notifications) {
      val changeId = changeNumber(notification)
      if (changeId > lastExecutedChange) {  // not processed yet
        val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
        val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
        if (jsonOpt.isDefined) {
          val json = jsonOpt.get
          val topic = json.substring(1, json.length - 1)  // hacky way to dequote; extracts the topic name from the notification
          if (logsByTopic.contains(topic)) {
            /* combine the default properties with the overrides in zk to create the new LogConfig */
            val props = new Properties(logManager.defaultConfig.toProps)
            props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
            val logConfig = LogConfig.fromProps(props)
            for (log <- logsByTopic(topic))
              log.config = logConfig  // the actual update of the log config
            info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
            purgeObsoleteNotifications(now, notifications)  // delete expired notifications (older than 10 minutes)
          }
        }
        lastExecutedChange = changeId
      }
    }
  }
}
Failover is also fine here: applying the same config multiple times is harmless anyway, and on every startup all non-expired notifications are processed once more.

Besides, a broker reloads the complete configuration from zk when it restarts, so that case is covered too; this mechanism mainly serves to update topic configs in real time.

2.9 KafkaHealthcheck

kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)

This one is very simple; as the comment says, it tells everyone "I am still alive"...

The implementation registers an ephemeral znode at

/brokers/[0...N] --> advertisedHost:advertisedPort

and re-registers it when the session expires, a typical zk pattern.
So you only need to watch this path to know whether the broker is still alive.
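
A simplified sketch of that registration (imports omitted; the real class also publishes more broker metadata, this just shows the ephemeral-znode-plus-session-listener pattern):

class KafkaHealthcheck(brokerId: Int, host: String, port: Int, zkClient: ZkClient) {
  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId

  def startup() {
    zkClient.subscribeStateChanges(new SessionExpireListener)  // get told when the session is lost
    register()
  }

  def register() {
    // ephemeral: the znode disappears automatically when this broker's zk session dies
    ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, host + ":" + port)
  }

  class SessionExpireListener extends IZkStateListener {
    def handleStateChanged(state: KeeperState) {}   // nothing to do
    def handleNewSession() = register()             // session expired and re-established: re-register
  }
}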

2.10 Controlled Shutdown

Before 0.8, broker startup and shutdown were simple: just initialize the components above, or stop them.

But since 0.8, with replicas added, a broker cannot simply shut itself down. It has to notify the controller first, and only after the controller finishes its work, such as migrating partition leaders or taking replicas offline, can the broker shut down.

private def controlledShutdown()

It is fairly long; the logic is to find the controller, send a ControlledShutdownRequest, and wait for the reply; if that fails, it falls back to an unclean shutdown.
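
A compressed sketch of that logic (illustrative only: the real method looks up the controller in zk on every attempt and (re)connects a BlockingChannel to it; here channel stands in for that connection and correlationId for the request counter):

private def controlledShutdown() {
  if (config.controlledShutdownEnable) {
    var remainingRetries = config.controlledShutdownMaxRetries
    var shutdownSucceeded = false
    while (!shutdownSucceeded && remainingRetries > 0) {
      remainingRetries -= 1
      // ask the controller to move all partition leadership off this broker
      channel.send(new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId))
      val response = ControlledShutdownResponse.readFrom(channel.receive().buffer)
      // done only when the controller reports no partitions still led by this broker
      shutdownSucceeded = response.errorCode == ErrorMapping.NoError &&
                          response.partitionsRemaining.size == 0
    }
    if (!shutdownSucceeded)
      warn("Proceeding to do an unclean shutdown as all the attempts at a controlled shutdown failed.")
  }
}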