您的位置:首页 > 运维架构 > Apache

apache kafka系列之源码分析走读-kafka内部模块分析

2014-08-03 20:30 369 查看


apache kafka中国社区QQ群:162272557

kafka整体结构分析:

kafka源代码工程目录结构如下图:



下面只对core目录结构作说明,其他都是测试类或java客户端代码

admin      --管理员模块,操作和管理topic,paritions相关,包含create,delete topic,扩展	             		patitions
Api       --该模块主要负责交互数据的组装,客户端与服务端交互数据编解码

client    --该模块比较简单,就一个类,Producer读取kafka broker元数据信息,		topic和partitions,以及leader

cluster   --该模块包含几个实体类,Broker,Cluster,Partition,Replica,解释他们之间关系:		  Cluster由多个broker组成,一个Broker包含多个partition,一个topic的所有	  partitions分布在不同broker的中,一个Replica包含多个Partition。
common     --通用模块,只包含异常类和错误验证
consumer    --consumer处理模块,负责所有客户端消费者数据和逻辑处理
contoroller  --负责中央控制器选举,partition的leader选举,副本分配,副本重新分配,		partition和replica扩容。
javaapi      --提供java的producer和consumer接口api
log          --Kafka文件存储模块,负责读写所有kafka的topic消息数据。
message    --封装多个消息组成一个“消息集”或压缩消息集。
metrics    --内部状态的监控模块

network           --网络事件处理模块,负责处理和接收客户端连接
producer          --producer实现模块,包括同步和异步发送消息。
serializer         --序列化或反序列化当前消息

kafka           --kafka门面入口类,副本管理,topic配置管理,leader选举实现(由contoroller模块调用)。
tools           --一看这就是工具模块,包含内容比较多:			a.导出对应consumer的offset值.			b.导出LogSegments信息,当前topic的log写的位置信息.			c.导出zk上所有consumer的offset值.			d.修改注册在zk的consumer的offset值.			f.producer和consumer的使用例子.
utils		  --Json工具类,Zkutils工具类,Utils创建线程工具类,KafkaScheduler公共调度器类,公共日志类等等。


1.kafka启动类:kafka.scala

kafka为kafka broker的main启动类,其主要作用为加载配置,启动report服务(内部状态的监控),注册释放资源的钩子,以及门面入口类。

kafka类代码如下:

......

try {

val props = Utils.loadProps(args(0)) //加载配置文件

val serverConfig = new KafkaConfig(props)

KafkaMetricsReporter.startReporters(serverConfig.props) //启动report服务(内部状态的监控)

val kafkaServerStartble = new KafkaServerStartable(serverConfig) //kafka server核心入口类

// attach shutdown handler to catch control-c

Runtime.getRuntime().addShutdownHook(new Thread() {
//钩子程序,当jvm退出前,销毁所有资源


override def run() = {

kafkaServerStartble.shutdown

}

})

kafkaServerStartble.startup

kafkaServerStartble.awaitShutdown

}

......

KafkaServerStartble类包装了KafkaSever类,其实啥都没有做。只是调用包装类而已

KafkaSever类是kafka broker运行控制的核心入口类,它是采用门面模式设计的。



kafka中KafkaServer类,采用门面模式,是网络处理,io处理等得入口.

ReplicaManager
副本管理

KafkaApis 处理所有request的Proxy类,根据requestKey决定调⽤用具体的handler

KafkaRequestHandlerPool 处理request的线程池,请求处理池 <-- num.io.threads io线程数量

LogManager kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据

TopicConfigManager
监听此zk节点的⼦子节点/config/changes/,通过LogManager更新topic的配置信息,topic粒度配置管理,具体请查看topic级别配置

KafkaHealthcheck 监听zk session expire,在zk上创建broker信息,便于其他broker和consumer获取其信息

KafkaController
kafka集群中央控制器选举,leader选举,副本分配。

KafkaScheduler
负责副本管理和日志管理调度等等

ZkClient 负责注册zk相关信息.

BrokerTopicStats
topic信息统计和监控

ControllerStats 中央控制器统计和监控

KafkaServer部分主要代码如下:

......
def startup() {
info("starting")
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)

/* start scheduler */
kafkaScheduler.startup()

/* setup zookeeper */
zkClient = initZk()

/* start log manager */
logManager = createLogManager(zkClient)
logManager.startup()

socketServer = new SocketServer(config.brokerId,
config.hostName,
config.port,
config.numNetworkThreads,
config.queuedMaxRequests,
config.socketSendBufferBytes,
config.socketReceiveBufferBytes,
config.socketRequestMaxBytes)
socketServer.startup()

replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
kafkaController = new KafkaController(config, zkClient)

/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

Mx4jLoader.maybeLoad()

replicaManager.startup()

kafkaController.startup()

topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()

/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()

registerStats()
startupComplete.set(true);
info("started")
}

private def initZk(): ZkClient = {
info("Connecting to zookeeper on " + config.zkConnect)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}

/**
*  Forces some dynamic jmx beans to be registered on server startup.
*/
private def registerStats() {
BrokerTopicStats.getBrokerAllTopicsStats()
ControllerStats.uncleanLeaderElectionRate
ControllerStats.leaderElectionTimer
}
.......
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: