apache kafka系列之源码分析走读-kafka内部模块分析
2014-08-03 20:30
369 查看
apache kafka中国社区QQ群:162272557
kafka整体结构分析:kafka源代码工程目录结构如下图:
下面只对core目录结构作说明,其他都是测试类或java客户端代码
admin --管理员模块,操作和管理topic,paritions相关,包含create,delete topic,扩展 patitions Api --该模块主要负责交互数据的组装,客户端与服务端交互数据编解码 client --该模块比较简单,就一个类,Producer读取kafka broker元数据信息, topic和partitions,以及leader cluster --该模块包含几个实体类,Broker,Cluster,Partition,Replica,解释他们之间关系: Cluster由多个broker组成,一个Broker包含多个partition,一个topic的所有 partitions分布在不同broker的中,一个Replica包含多个Partition。 common --通用模块,只包含异常类和错误验证 consumer --consumer处理模块,负责所有客户端消费者数据和逻辑处理 contoroller --负责中央控制器选举,partition的leader选举,副本分配,副本重新分配, partition和replica扩容。 javaapi --提供java的producer和consumer接口api log --Kafka文件存储模块,负责读写所有kafka的topic消息数据。 message --封装多个消息组成一个“消息集”或压缩消息集。 metrics --内部状态的监控模块 network --网络事件处理模块,负责处理和接收客户端连接 producer --producer实现模块,包括同步和异步发送消息。 serializer --序列化或反序列化当前消息 kafka --kafka门面入口类,副本管理,topic配置管理,leader选举实现(由contoroller模块调用)。 tools --一看这就是工具模块,包含内容比较多: a.导出对应consumer的offset值. b.导出LogSegments信息,当前topic的log写的位置信息. c.导出zk上所有consumer的offset值. d.修改注册在zk的consumer的offset值. f.producer和consumer的使用例子. utils --Json工具类,Zkutils工具类,Utils创建线程工具类,KafkaScheduler公共调度器类,公共日志类等等。
1.kafka启动类:kafka.scala
kafka为kafka broker的main启动类,其主要作用为加载配置,启动report服务(内部状态的监控),注册释放资源的钩子,以及门面入口类。
kafka类代码如下:
......
try {
val props = Utils.loadProps(args(0)) //加载配置文件
val serverConfig = new KafkaConfig(props)
KafkaMetricsReporter.startReporters(serverConfig.props) //启动report服务(内部状态的监控)
val kafkaServerStartble = new KafkaServerStartable(serverConfig) //kafka server核心入口类
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
//钩子程序,当jvm退出前,销毁所有资源
override def run() = {
kafkaServerStartble.shutdown
}
})
kafkaServerStartble.startup
kafkaServerStartble.awaitShutdown
}
......
KafkaServerStartble类包装了KafkaSever类,其实啥都没有做。只是调用包装类而已
KafkaSever类是kafka broker运行控制的核心入口类,它是采用门面模式设计的。
kafka中KafkaServer类,采用门面模式,是网络处理,io处理等得入口.
ReplicaManager
副本管理
KafkaApis 处理所有request的Proxy类,根据requestKey决定调⽤用具体的handler
KafkaRequestHandlerPool 处理request的线程池,请求处理池 <-- num.io.threads io线程数量
LogManager kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据
TopicConfigManager
监听此zk节点的⼦子节点/config/changes/,通过LogManager更新topic的配置信息,topic粒度配置管理,具体请查看topic级别配置
KafkaHealthcheck 监听zk session expire,在zk上创建broker信息,便于其他broker和consumer获取其信息
KafkaController
kafka集群中央控制器选举,leader选举,副本分配。
KafkaScheduler
负责副本管理和日志管理调度等等
ZkClient 负责注册zk相关信息.
BrokerTopicStats
topic信息统计和监控
ControllerStats 中央控制器统计和监控
KafkaServer部分主要代码如下:
...... def startup() { info("starting") isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) /* start scheduler */ kafkaScheduler.startup() /* setup zookeeper */ zkClient = initZk() /* start log manager */ logManager = createLogManager(zkClient) logManager.startup() socketServer = new SocketServer(config.brokerId, config.hostName, config.port, config.numNetworkThreads, config.queuedMaxRequests, config.socketSendBufferBytes, config.socketReceiveBufferBytes, config.socketRequestMaxBytes) socketServer.startup() replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) kafkaController = new KafkaController(config, zkClient) /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) Mx4jLoader.maybeLoad() replicaManager.startup() kafkaController.startup() topicConfigManager = new TopicConfigManager(zkClient, logManager) topicConfigManager.startup() /* tell everyone we are alive */ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() registerStats() startupComplete.set(true); info("started") } private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) ZkUtils.setupCommonPaths(zkClient) zkClient } /** * Forces some dynamic jmx beans to be registered on server startup. */ private def registerStats() { BrokerTopicStats.getBrokerAllTopicsStats() ControllerStats.uncleanLeaderElectionRate ControllerStats.leaderElectionTimer } .......
相关文章推荐
- apache kafka系列之源码分析走读-kafka内部模块分析
- apache kafka系列之源码分析走读-kafkaApi详解
- apache kafka系列之源码分析走读-server端网络架构分析
- apache kafka系列之源码分析走读-kafkaApi详解
- apache kafka系列之源码分析走读-server端网络架构分析
- apache kafka系列之源码分析走读-SocketServer分析
- twitter storm 源码走读之5 -- worker进程内部消息传递处理和数据结构分析
- apache kafka源码分析走读-Producer分析
- layui源码详细分析系列之流加载模块
- apache kafka源码分析走读-Producer分析
- nova创建虚拟机源码分析系列之七 传入参数转换成内部id
- layui源码详细分析系列之模块加载机制
- Backbone.js源码分析系列之Collection模块
- dubbo源码分析系列——dubbo-register-api模块源码分析
- twitter storm 源码走读之5 -- worker进程内部消息传递处理和数据结构分析
- apache kafka源码分析走读-ZookeeperConsumerConnector分析
- dubbo源码分析系列——dubbo-rpc-api模块源码分析
- dubbo源码分析系列——dubbo-cluster模块源码分析
- layui源码详细分析系列之富文本编辑器模块
- Apache Kafka源码分析 - kafka controller