RocketMQ Source Code Analysis: rocketmq-broker Startup (Part 2)

2018-08-06 23:11
Copyright notice: this blog is original technical content. Do not repost without permission. https://blog.csdn.net/yewandemty/article/details/81463165

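This article walks through the rocketmq-broker startup process in the following parts:
1. Brief introduction to the broker
1.1 initialize
1.2 start
2. Analysis of the broker startup code
2.1 rocketmq-broker startup entry point
2.2 Creating the BrokerController object with createBrokerController
2.3 Initialization via the initialize method
2.4 Starting the broker via start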

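1. Brief introduction to the broker

The broker is the core module that receives, processes, and persists messages. It accepts the messages sent by producers and serves the messages that consumers consume.
Broker startup consists of two main steps: initialize and start.

Figure 1. Broker startup flow chart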

1.1 initialize

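1) On startup, the broker reads the initialization parameters passed in from outside and also loads parameters from the configuration file. The parameters are stored in the corresponding configuration classes, which hold the following information:

  • Broker configuration: root directory, namesrv address, the broker's IP and name, number of message queues, sizes of the send and receive thread pools, and similar parameters.
  • Netty configuration: listen port, number of worker threads, semaphore count for asynchronous sends, and similar parameters.
  • Storage configuration: storage root directory, CommitLog settings, persistence (flush) strategy, and similar parameters.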
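
The idea of copying file properties onto a configuration object can be sketched as follows. This is a minimal illustration, not RocketMQ's own implementation: `DemoBrokerConfig`, `ConfigLoaderSketch`, and their fields are hypothetical names, and only `String` and `int` setters are handled.

```java
import java.io.StringReader;
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical config holder; the fields are illustrative, not RocketMQ's BrokerConfig.
class DemoBrokerConfig {
    private String brokerName = "DEFAULT_BROKER";
    private int listenPort = 10911;

    public String getBrokerName() { return brokerName; }
    public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
}

class ConfigLoaderSketch {
    // For every public setXxx(String) or setXxx(int), look up property "xxx" and apply it.
    static void applyProperties(Properties props, Object target) {
        try {
            for (Method m : target.getClass().getMethods()) {
                String name = m.getName();
                if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                    continue;
                }
                String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
                String value = props.getProperty(key);
                if (value == null) {
                    continue; // property absent: keep the default value
                }
                Class<?> type = m.getParameterTypes()[0];
                if (type == String.class) {
                    m.invoke(target, value);
                } else if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value.trim()));
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot apply properties", e);
        }
    }

    // Parses properties text (standing in for the file passed with -c) into a config object.
    static DemoBrokerConfig load(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (java.io.IOException e) {
            throw new IllegalStateException("cannot read properties", e);
        }
        DemoBrokerConfig config = new DemoBrokerConfig();
        applyProperties(props, config);
        return config;
    }

    public static void main(String[] args) {
        DemoBrokerConfig config = load("brokerName=broker-a\nlistenPort=10921\n");
        System.out.println(config.getBrokerName() + " listens on " + config.getListenPort());
    }
}
```

Properties missing from the input leave the defaults untouched, which is why one properties file can be applied to several configuration objects in turn.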

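2) Once the configuration is in place, the broker passes it to the BrokerController, which initializes and loads a number of managers:

  • topicManager (topicConfigManager in the code): manages the configuration of every topic stored in the broker
  • consumerOffsetManager: manages consumers' consumption progress
  • subscriptionGroupManager: manages subscription groups, including subscription permissions
  • messageStore: persists messages at the broker layer

3) After all managers have been loaded, the controller moves on to the next initialization step:

  • Starts the netty server, which receives messages
  • Initializes several thread pools, including sendMessageExecutor, pullMessageExecutor, adminBrokerExecutor, and clientManageExecutor, which handle message sending, message pulling, broker administration, and client management respectively. Their tasks are submitted to these thread pools so that they run concurrently.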
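
As a rough sketch of the thread pool setup described above, the following builds a fixed-size pool with named threads from plain JDK classes. `NamedPoolSketch` and its methods are hypothetical helpers; RocketMQ's `BrokerFixedThreadPoolExecutor` and `ThreadFactoryImpl` are not reproduced here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolSketch {
    // Names threads prefix1, prefix2, ... so that thread dumps show which pool a thread belongs to.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger index = new AtomicInteger(0);
        return runnable -> new Thread(runnable, prefix + index.incrementAndGet());
    }

    // Core size equals maximum size, so the pool is fixed; the bounded queue holds pending requests.
    static ThreadPoolExecutor fixedPool(int threads, int queueCapacity, String prefix) {
        return new ThreadPoolExecutor(
            threads, threads,
            1000 * 60, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueCapacity),
            namedFactory(prefix));
    }

    // Runs a trivial task on the pool and returns the name of the worker thread that ran it.
    static String runAndGetThreadName(ThreadPoolExecutor pool) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor sendPool = fixedPool(4, 10000, "SendMessageThread_");
        System.out.println("ran on " + runAndGetThreadName(sendPool));
        sendPool.shutdown();
    }
}
```

Giving each kind of request its own pool keeps a burst of one request type, for example message pulls, from starving the others.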

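1.2 start
Once the broker has initialized its configuration parameters, it can start.

• Starts the managers that were just initialized: topicManager, consumerOffsetManager, subscriptionGroupManager, messageStore
• Starts a scheduled task that reports the broker's own information to namesrv every 30 s
• Starts a thread that handles unused topics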
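
The periodic report to namesrv can be pictured with a plain `ScheduledExecutorService`. The sketch below is an assumption-laden illustration: `HeartbeatSketch` and the `report` callback are hypothetical, and the demo uses a short period instead of 30 s so that it finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatSketch {
    // Schedules `report` at a fixed rate and waits until it has been attempted `beats` times.
    // Returns true if that many attempts happened before the timeout.
    static boolean awaitHeartbeats(int beats, long periodMs, long timeoutMs, Runnable report) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(beats);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                report.run(); // stands in for "report broker info to the name server"
            } catch (Throwable t) {
                // Swallow and log: an exception escaping the task would cancel all later runs.
                System.err.println("report failed: " + t);
            } finally {
                latch.countDown();
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean ok = awaitHeartbeats(3, 100, 5000, () -> System.out.println("heartbeat sent"));
        System.out.println("three heartbeats observed: " + ok);
    }
}
```

Catching `Throwable` inside the task matters because `scheduleAtFixedRate` suppresses all subsequent executions once a run throws.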

2. Analysis of the broker startup code

2.1 rocketmq-broker startup entry point

    public class BrokerStartup {
    //...
    public static void main(String[] args) {
    start(createBrokerController(args));
    }
    //...
    }
• Lines 3-5: the broker's program entry point. It consists of two parts: createBrokerController(args) and start(brokerController).

2.2 Creating the BrokerController object with createBrokerController

    public static BrokerController createBrokerController(String[] args) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
    NettySystemConfig.socketSndbufSize = 131072;
    }
    
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
    NettySystemConfig.socketRcvbufSize = 131072;
    }
    
    try {
    //PackageConflictDetect.detectFastjson();
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
    new PosixParser());
    if (null == commandLine) {
    System.exit(-1);
    }
    
    final BrokerConfig brokerConfig = new BrokerConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    final NettyClientConfig nettyClientConfig = new NettyClientConfig();
    
    nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
    String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
    nettyServerConfig.setListenPort(10911);
    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    
    if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
    int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
    messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
    }
    
    if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
    configFile = file;
    InputStream in = new BufferedInputStream(new FileInputStream(file));
    properties = new Properties();
    properties.load(in);
    
    properties2SystemEnv(properties);
    MixAll.properties2Object(properties, brokerConfig);
    MixAll.properties2Object(properties, nettyServerConfig);
    MixAll.properties2Object(properties, nettyClientConfig);
    MixAll.properties2Object(properties, messageStoreConfig);
    
    BrokerPathConfigHelper.setBrokerConfigPath(file);
    in.close();
    }
    }
    
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
    
    if (null == brokerConfig.getRocketmqHome()) {
    System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
    + " variable in your environment to match the location of the RocketMQ installation");
    System.exit(-2);
    }
    
    String namesrvAddr = brokerConfig.getNamesrvAddr();
    if (null != namesrvAddr) {
    try {
    String[] addrArray = namesrvAddr.split(";");
    for (String addr : addrArray) {
    RemotingUtil.string2SocketAddress(addr);
    }
    } catch (Exception e) {
    System.out.printf(
    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
    namesrvAddr);
    System.exit(-3);
    }
    }
    
    switch (messageStoreConfig.getBrokerRole()) {
    case ASYNC_MASTER:
    case SYNC_MASTER:
    brokerConfig.setBrokerId(MixAll.MASTER_ID);
    break;
    case SLAVE:
    if (brokerConfig.getBrokerId() <= 0) {
    System.out.printf("Slave's brokerId must be > 0");
    System.exit(-3);
    }
    
    break;
    default:
    break;
    }
    
    messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
    
    if (commandLine.hasOption('p')) {
    Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
    MixAll.printObjectProperties(console, brokerConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    MixAll.printObjectProperties(console, nettyClientConfig);
    MixAll.printObjectProperties(console, messageStoreConfig);
    System.exit(0);
    } else if (commandLine.hasOption('m')) {
    Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
    MixAll.printObjectProperties(console, brokerConfig, true);
    MixAll.printObjectProperties(console, nettyServerConfig, true);
    MixAll.printObjectProperties(console, nettyClientConfig, true);
    MixAll.printObjectProperties(console, messageStoreConfig, true);
    System.exit(0);
    }
    
    log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    MixAll.printObjectProperties(log, brokerConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    MixAll.printObjectProperties(log, nettyClientConfig);
    MixAll.printObjectProperties(log, messageStoreConfig);
    
    final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
    
    boolean initResult = controller.initialize();
    if (!initResult) {
    controller.shutdown();
    System.exit(-3);
    }
    
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    
    @Override
    public void run() {
    synchronized (this) {
    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
    if (!this.hasShutdown) {
    this.hasShutdown = true;
    long beginTime = System.currentTimeMillis();
    controller.shutdown();
    long consumingTimeTotal = System.currentTimeMillis() - beginTime;
    log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
    }
    }
    }
    }, "ShutdownHook"));
    
    return controller;
    } catch (Throwable e) {
    e.printStackTrace();
    System.exit(-1);
    }
    
    return null;
    }
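• Line 2: sets the RocketMQ version information.
• Lines 4-10: checks whether the send and receive socket buffer sizes for remote communication have been set as system properties. If not, both default to 131072:
  NettySystemConfig.socketSndbufSize = 131072;
  NettySystemConfig.socketRcvbufSize = 131072;
• Lines 14-19: buildCommandlineOptions() builds the Options used to parse the command line. ServerUtil.buildCommandlineOptions() registers h (help) and n (namesrvAddr); the following buildCommandlineOptions(options) call adds c (configFile), p (printConfigItem), and m (printImportantConfig). parseCmdLine() parses the mqbroker startup command (on *nix systems), for example: nohup ./mqbroker -n 192.168.2.1:9876. If the parsed commandLine is null, the process exits.
• Lines 35-52: if the command line contains the -c option, the configuration file is read into a Properties object. MixAll.properties2Object() then copies the values into the brokerConfig, nettyServerConfig, nettyClientConfig, and messageStoreConfig objects. Finally, the configuration file path is saved in the brokerConfigPath variable of BrokerPathConfigHelper.
• Line 54: reads the remaining command-line options into Properties and stores them in the brokerConfig object.
• Lines 62-75: gets namesrvAddr from brokerConfig and converts each address to a SocketAddress. This acts as a validation step: if any address is malformed, the broker prints a hint and exits with code -3.
• Lines 77-91: sets brokerId according to the broker role (master or slave). For ASYNC_MASTER or SYNC_MASTER, brokerId is set to MixAll.MASTER_ID (0). For SLAVE, brokerId must be greater than 0; if brokerId <= 0, the broker prints an error and exits with code -3 (no exception is thrown).
• Lines 116-121: writes the properties of the brokerConfig, nettyServerConfig, nettyClientConfig, and messageStoreConfig objects to the broker log.
• Lines 122-128: creates the BrokerController from the brokerConfig, nettyServerConfig, nettyClientConfig, and messageStoreConfig objects, and registers the loaded properties with the controller's Configuration.
• Lines 136-159: Runtime.getRuntime().addShutdownHook() registers work to run before the JVM exits. Here the hook calls controller.shutdown(); the hasShutdown flag ensures the shutdown logic runs only once.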
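
The name server address check above can be sketched with JDK classes only. `NamesrvAddrSketch` is a hypothetical stand-in, not RocketMQ's `RemotingUtil.string2SocketAddress`; it assumes the same `host:port;host:port` format shown in the error message and performs no DNS lookup.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

class NamesrvAddrSketch {
    // Splits "host:port;host:port" and turns each entry into an unresolved socket address.
    // Throws IllegalArgumentException if an entry is not in host:port form.
    static List<InetSocketAddress> parse(String namesrvAddr) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String addr : namesrvAddr.split(";")) {
            int idx = addr.lastIndexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("expected host:port but got: " + addr);
            }
            String host = addr.substring(0, idx);
            // NumberFormatException is a subclass of IllegalArgumentException.
            int port = Integer.parseInt(addr.substring(idx + 1));
            // createUnresolved rejects ports outside 0..65535 with IllegalArgumentException.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Mirrors the startup check: any malformed entry invalidates the whole list.
    static boolean isValid(String namesrvAddr) {
        try {
            parse(namesrvAddr);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("127.0.0.1:9876;192.168.0.1:9876"));
        System.out.println(isValid("127.0.0.1"));
    }
}
```

Failing fast at startup is cheaper than discovering a bad address later, when the first registration attempt with the name server fails.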
2.3 Initialization via the initialize method

    public boolean initialize() throws CloneNotSupportedException {
    boolean result = this.topicConfigManager.load();
    
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();
    
    if (result) {
    try {
    this.messageStore =
    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
    this.brokerConfig);
    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
    //load plugin
    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
    } catch (IOException e) {
    result = false;
    log.error("Failed to initialize", e);
    }
    }
    
    result = result && this.messageStore.load();
    
    if (result) {
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
    NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
    fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
    this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
    this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getSendMessageThreadPoolNums(),
    this.brokerConfig.getSendMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.sendThreadPoolQueue,
    new ThreadFactoryImpl("SendMessageThread_"));
    
    this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getPullMessageThreadPoolNums(),
    this.brokerConfig.getPullMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.pullThreadPoolQueue,
    new ThreadFactoryImpl("PullMessageThread_"));
    
    this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getQueryMessageThreadPoolNums(),
    this.brokerConfig.getQueryMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.queryThreadPoolQueue,
    new ThreadFactoryImpl("QueryMessageThread_"));
    
    this.adminBrokerExecutor =
    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
    "AdminBrokerThread_"));
    
    this.clientManageExecutor = new ThreadPoolExecutor(
    this.brokerConfig.getClientManageThreadPoolNums(),
    this.brokerConfig.getClientManageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.clientManagerThreadPoolQueue,
    new ThreadFactoryImpl("ClientManageThread_"));
    
    this.consumerManageExecutor =
    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
    "ConsumerManageThread_"));
    
    this.registerProcessor();
    
    final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
    final long period = 1000 * 60 * 60 * 24;
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    try {
    BrokerController.this.getBrokerStats().record();
    } catch (Throwable e) {
    log.error("schedule record error.", e);
    }
    }
    }, initialDelay, period, TimeUnit.MILLISECONDS);
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    try {
    BrokerController.this.consumerOffsetManager.persist();
    } catch (Throwable e) {
    log.error("schedule persist consumerOffset error.", e);
    }
    }
    }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    try {
    BrokerController.this.consumerFilterManager.persist();
    } catch (Throwable e) {
    log.error("schedule persist consumer filter error.", e);
    }
    }
    }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    try {
    BrokerController.this.protectBroker();
    } catch (Throwable e) {
    log.error("protectBroker error.", e);
    }
    }
    }, 3, 3, TimeUnit.MINUTES);
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    try {
    BrokerController.this.printWaterMark();
    } catch (Throwable e) {
    log.error("printWaterMark error.", e);
    }
    }
    }, 10, 1, TimeUnit.SECONDS);
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
    @Override
    public void run() {
    try {
    log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
    } catch (Throwable e) {
    log.error("schedule dispatchBehindBytes error.", e);
    }
    }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    
    if (this.brokerConfig.getNamesrvAddr() != null) {
    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
    } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
    @Override
    public void run() {
    try {
    BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
    } catch (Throwable e) {
    log.error("ScheduledTask fetchNameServerAddr exception", e);
    }
    }
    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
    this.updateMasterHAServerAddrPeriodically = false;
    } else {
    this.updateMasterHAServerAddrPeriodically = true;
    }
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
    @Override
    public void run() {
    try {
    BrokerController.this.slaveSynchronize.syncAll();
    } catch (Throwable e) {
    log.error("ScheduledTask syncAll slave exception", e);
    }
    }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    } else {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
    @Override
    public void run() {
    try {
    BrokerController.this.printMasterAndSlaveDiff();
    } catch (Throwable e) {
    log.error("schedule printMasterAndSlaveDiff error.", e);
    }
    }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    }
    }
    
    return result;
    }
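• Lines 2-6: loads topicConfigManager, consumerOffsetManager, subscriptionGroupManager, and consumerFilterManager, and records in result whether loading succeeded.
• Lines 8-22: if the previous loads succeeded, builds the messageStore object. After the plugin context (MessageStorePluginContext) has been created, MessageStoreFactory.build() processes messageStore again, and finally load() loads the message store data.
• Lines 26-191: checks whether result is true. If it is not, the method returns false directly. If result = true:
  1) Creates the sendMessageExecutor, pullMessageExecutor, queryMessageExecutor, adminBrokerExecutor, clientManageExecutor, and consumerManageExecutor executors.
  2) registerProcessor() registers the request processors, together with the executors just created, on remotingServer and fastRemotingServer.
  3) Uses scheduledExecutorService (a ScheduledExecutorService) to run the following tasks periodically:
    • BrokerController.this.getBrokerStats().record(): records broker statistics, once every 24 hours
    • BrokerController.this.consumerOffsetManager.persist(): persists consumers' current offsets, every flushConsumerOffsetInterval
    • BrokerController.this.consumerFilterManager.persist(): persists the consumer filter data, every 10 s
    • BrokerController.this.protectBroker(): runs the broker protection check, every 3 minutes
    • BrokerController.this.printWaterMark(): prints the water mark, every second
    • BrokerController.this.getMessageStore().dispatchBehindBytes(): logs how many bytes dispatch lags behind the commit log, every 60 s
    • BrokerController.this.brokerOuterAPI.fetchNameServerAddr(): fetches the namesrv address every 2 minutes, only when namesrvAddr is not configured and fetchNamesrvAddrByAddressServer is enabled
    • BrokerController.this.slaveSynchronize.syncAll(): on a slave, synchronizes data from the master every 60 s; a master runs printMasterAndSlaveDiff() on the same schedule instead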
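
The `result = result && x.load()` chain at the top of initialize() relies on short-circuit evaluation: once one load fails, the remaining loaders are never called. A minimal sketch, with a hypothetical `Loader` class standing in for the managers:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LoadChainSketch {
    // Hypothetical stand-in for a manager that exposes load(); records each attempt.
    static final class Loader {
        final String name;
        final boolean succeeds;

        Loader(String name, boolean succeeds) {
            this.name = name;
            this.succeeds = succeeds;
        }

        boolean load(List<String> attempted) {
            attempted.add(name);
            return succeeds;
        }
    }

    // Same shape as initialize(): && skips load() as soon as result is false.
    static boolean loadAll(List<Loader> loaders, List<String> attempted) {
        boolean result = true;
        for (Loader loader : loaders) {
            result = result && loader.load(attempted);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> attempted = new ArrayList<>();
        boolean ok = loadAll(Arrays.asList(
            new Loader("topicConfig", true),
            new Loader("consumerOffset", false),
            new Loader("subscriptionGroup", true)), attempted);
        // subscriptionGroup is never attempted because consumerOffset failed first.
        System.out.println(ok + " " + attempted);
    }
}
```

In this run the third loader is skipped, so a failure early in the chain leaves later managers unloaded and initialize() returns false.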

2.4 Starting the broker via start

    public static BrokerController start(BrokerController controller) {
    try {
    controller.start();
    // ...
    return controller;
    } catch (Throwable e) {
    e.printStackTrace();
    System.exit(-1);
    }
    return null;
    }
• Line 3: starts the BrokerController. The controller's start() method is shown below.
    public void start() throws Exception {
    if (this.messageStore != null) {
    this.messageStore.start();
    }
    
    if (this.remotingServer != null) {
    this.remotingServer.start();
    }
    
    if (this.fastRemotingServer != null) {
    this.fastRemotingServer.start();
    }
    
    if (this.brokerOuterAPI != null) {
    this.brokerOuterAPI.start();
    }
    
    if (this.pullRequestHoldService != null) {
    this.pullRequestHoldService.start();
    }
    
    if (this.clientHousekeepingService != null) {
    this.clientHousekeepingService.start();
    }
    
    if (this.filterServerManager != null) {
    this.filterServerManager.start();
    }
    
    this.registerBrokerAll(true, false);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    try {
    BrokerController.this.registerBrokerAll(true, false);
    } catch (Throwable e) {
    log.error("registerBrokerAll Exception", e);
    }
    }
    }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
    //...
    }
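• Lines 2-28: starts each component that is not null: messageStore, remotingServer, fastRemotingServer, brokerOuterAPI, pullRequestHoldService, clientHousekeepingService, and filterServerManager.
• Lines 30-40: calls registerBrokerAll() once immediately, then schedules a task that, after an initial 10 s delay, reports the broker's information to namesrv every 30 s, along with handling unused topics.

That concludes the source code analysis of broker startup. This article will be revised as my understanding improves.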
