您的位置:首页 > 其它

1,rocketmq namesrv 第一章启动过程

2015-04-10 15:11 302 查看
    大家好,很高兴在这里跟大家分享下rocketmq源码实现,如有不对的地方欢迎指正。

Namesrv顾名思义就是名称服务,是没有状态可横向扩展的服务。废话不多说了,直接贴代码。。



1,入口函数NamesrvStartup.main0

        1.1 System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion)); 

              设置版本号属性

        1.2 if (null == System.getProperty(NettySystemConfig.SystemPropertySocketSndbufSize)) {

            NettySystemConfig.SocketSndbufSize = 2048;

        }

        如果缓冲区为空   默认为2M

        1.3 if (null == System.getProperty(NettySystemConfig.SystemPropertySocketRcvbufSize)) {

            NettySystemConfig.SocketRcvbufSize = 1024;

        }

        如果接受缓冲区为空 默认为1M

       1.4 PackageConflictDetect.detectFastjson();

       检测包冲突,因为rocketmq 配置都是用的fastJson作为序列化工具,所以最低使用的是1.2.3版本,如果版本过低会抛出异常

      1.5 Options options = ServerUtil.buildCommandlineOptions(new Options());

            commandLine =

                    ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),

                        new PosixParser());

            if (null == commandLine) {

                System.exit(-1);

                return null;

            }

         解析命令行参数,为用户提供了一个解释命令行的API不懂的可以查查 http://blog.csdn.net/faye0412/article/details/2949753
      1.6 final NamesrvConfig namesrvConfig = new NamesrvConfig();

              初始化Namesrv 全局配置文件

                                private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,

                                                                        System.getenv(MixAll.ROCKETMQ_HOME_ENV));

                               rocketmq home路径

                               private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv"

                                                                        + File.separator + "kvConfig.json";

                               KV配置持久化地址,具体含义讲到了在给大家说

      1.7 final NettyServerConfig nettyServerConfig = new NettyServerConfig();

             nettyServerConfig.setListenPort(9876); 设置namesrv监听端口

        Namesrv Netty服务全局配置,大家都知道rocketmq 底层通信使用的Netty4,下面给出各个参数的意义

                              private int listenPort = 8888;默认 监听端口

                              private int serverWorkerThreads = 8;Netty服务工作线程数量

                              private int serverCallbackExecutorThreads = 0;Netty服务异步回调线程池线程数量

                              private int serverSelectorThreads = 3;Netty Selector线程数量

                              private int serverOnewaySemaphoreValue = 256;控制单向的信号量

                              private int serverAsyncSemaphoreValue = 64;控制异步信号量

                              private int serverChannelMaxIdleTimeSeconds = 120;服务空闲心跳检测时间间隔 单位秒

                              private int serverSocketSndBufSize = NettySystemConfig.SocketSndbufSize;Netty发送缓冲区大小

                              private int serverSocketRcvBufSize = NettySystemConfig.SocketRcvbufSize;Netty接受缓冲区大小

                              private boolean serverPooledByteBufAllocatorEnable = false;是否使用Netty内存池

            

      1.8 if (commandLine.hasOption('c')) {

                String file = commandLine.getOptionValue('c');

                if (file != null) {

                    InputStream in = new BufferedInputStream(new FileInputStream(file));

                    properties = new Properties();

                    properties.load(in);

                    MixAll.properties2Object(properties, namesrvConfig);

                    MixAll.properties2Object(properties, nettyServerConfig);

                    System.out.println("load config properties file OK, " + file);

                    in.close();

                }

            }

            根据命令行参数,加载外部namesrvConfig及nettyServerConfig全局配置文件

      1.9 if (commandLine.hasOption('p')) {

                MixAll.printObjectProperties(null, namesrvConfig);

                MixAll.printObjectProperties(null, nettyServerConfig);

                System.exit(0);

            }

            打印namesrvConfig及nettyServerConfig配置信息

       2.0 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

                  将命令行指定的propertis文件 解析加载到namesrvConfig配置中

                   if (null == namesrvConfig.getRocketmqHome()) {

                             System.out.println("Please set the " + MixAll.ROCKETMQ_HOME_ENV

                                   + " variable in your environment to match the location of the RocketMQ installation");

                                System.exit(-2);

                       }

                     注意:这里必须要设置rocketMq home路径,因为要加载conf配置文件

      2.1  LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();

             JoranConfigurator configurator = new JoranConfigurator();

            configurator.setContext(lc);

            lc.reset();

            configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

            final Logger log = LoggerFactory.getLogger(LoggerName.NamesrvLoggerName);

            初始化logback日志工厂,rocketMq默认使用logback作为日志输出,不知道的可以去查查

       2.2 MixAll.printObjectProperties(log, namesrvConfig);

            MixAll.printObjectProperties(log, nettyServerConfig);

            打印namesrv配置参数

      2.3 final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

                 创建NamesrvController对象,namesrv  主要控制类

                
4000
this.namesrvConfig = namesrvConfig;设置namesrc配置

                 this.nettyServerConfig = nettyServerConfig;设置Netty配置

                 this.kvConfigManager = new KVConfigManager(this);创建KV配置管理 具体用到在解释

                 this.routeInfoManager = new RouteInfoManager();创建路由信息管理

                 this.brokerHousekeepingService = new BrokerHousekeepingService(this);创建broker连接事件处理服务

                boolean initResult = controller.initialize();初始化分为几步

       2.3.1 this.kvConfigManager.load();加载Kv配置文件

                          String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());

                          if (content != null) {

                                 KVConfigSerializeWrapper kvConfigSerializeWrapper =

                                                 KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);

                                if (null != kvConfigSerializeWrapper) {

                                   this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());

                                   log.info("load KV config table OK");

                                }

                            }

               rocketmq 默认使用json数据格式存储配置

               默认路径为:System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

              private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();

               如果配置kvConfigSerializeWrapper不为空则放入configTable容器里,

      2.3.2 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);初始化通信层

                         super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());设置父类信号量

                         this.serverBootstrap = new ServerBootstrap();创建 netty 启动类

                         this.nettyServerConfig = nettyServerConfig;设置netty 配置信息

                         this.channelEventListener = channelEventListener;设置扩展的一个时间监听

                         int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();获取异步线程数量

                         if (publicThreadNums <= 0) {

                                 publicThreadNums = 4; 如果小于等于0 默认设置为4

                          }

                          this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {

                                   private AtomicInteger threadIndex = new AtomicInteger(0);

                                   @Override

                                   public Thread newThread(Runnable r) {

                                              return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());

                                   }

                          });

                         创建 处理Callback应答线程池

                           this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {

                                   private AtomicInteger threadIndex = new AtomicInteger(0);

                                  @Override

                                   public Thread newThread(Runnable r) {

                                              return new Thread(r,

                                     String.format("NettyBossSelector_%d", this.threadIndex.incrementAndGet()));

                                   }

                            });

                          创建netty eventLoopGroupBoss 默认线程为1

                          this.eventLoopGroupWorker =

                              new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {

                                 private AtomicInteger threadIndex = new AtomicInteger(0);

                                 private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                                 @Override

                                 public Thread newThread(Runnable r) {

                                           return new Thread(r, String.format("NettyServerSelector_%d_%d", threadTotal,

                                                     this.threadIndex.incrementAndGet()));

                                   }

                          });

                           创建 netty eventLoopGroupWorker 默认线程为3

      2.3.3 this.remotingExecutor =

                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),

                    new ThreadFactoryImpl("RemotingExecutorThread_"));

                     初始化服务端网络请求处理线程池

      2.3.4 this.registerProcessor();

                   注册Name Server网络请求处理

      2.3.5 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                       @Override

                        public void run() {

                                        NamesrvController.this.routeInfoManager.scanNotActiveBroker();

                         }

                  }, 5, 10, TimeUnit.SECONDS);

                 NamesrvController初始化时启动线程定时调用RouteInfoManger的scanNotActiveBroker方法来定时清理不活动的broker(默认两分钟没有向namesrv发送心跳更新BrokerLiveInfo时间戳的),比较BrokerLiveInfo的时间戳,如果过期关闭channel连接

      2.3.6 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                       @Override

                       public void run() {

                               NamesrvController.this.kvConfigManager.printAllPeriodically();

                       }

                 }, 1, 10, TimeUnit.MINUTES);

                 启动线程定时调用printAllPeriodically方法打印KV配置信息包含变化的配置

      2.3.7 controller.start(); 调用NamesrvController.start()启动Netty及相关服务

      2.3.8 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//

                  nettyServerConfig.getServerWorkerThreads(), //

                  new ThreadFactory() {

                            private AtomicInteger threadIndex = new AtomicInteger(0);

                                 @Override

                                  public Thread newThread(Runnable r) {

                                                return new Thread(r, "NettyServerWorkerThread_" + this.threadIndex.incrementAndGet());

                                  }

                   });

                  创建默认的事件线程组,用于接收netty事件

      2.3.9 ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)

                             .channel(NioServerSocketChannel.class)使用NIO selector

                             .option(ChannelOption.SO_BACKLOG, 1024)
输入连接指示(对连接的请求)的最大队列长度被设置为 backlog 参数。如果队列满时收到连接指示,则拒绝该连接

                             .option(ChannelOption.SO_REUSEADDR, true) 设置了SO_REUSADDR的应用可以避免TCP 的 TIME_WAIT 状态 时间过长无法复用端口

                             .option(ChannelOption.SO_KEEPALIVE, false) 设置心跳参数 FALSE为不启用参数

                             .childOption(ChannelOption.TCP_NODELAY, true)开启TCP_NODELAY表示package被忽略size尽快地发送。

                             .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) socket发送缓冲区大小

                             .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) socket接收缓冲区大小

                             .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) localAddress方法用于绑定服务器地址和端口

                             .childHandler(new ChannelInitializer<SocketChannel>() {

                                     @Override

                                     public void initChannel(SocketChannel ch) throws Exception {

                                             ch.pipeline().addLast(

                                                     defaultEventExecutorGroup, 默认事件接收线程组

                                                     new NettyEncoder(), netty编码器

                                                     new NettyDecoder(),  netty解码器

                                                     new IdleStateHandler(0, 0, nettyServerConfig .getServerChannelMaxIdleTimeSeconds()), 空闲链路状态处理

                                                     new NettyConnetManageHandler(), 自实现空闲链路处理

                                                     new NettyServerHandler()); netty服务处理

                                           }

                                   });

      2.3.10 if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {

                                       childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

                  }

                  是否启用推外内存,默认不使用

      2.3.11 ChannelFuture sync = this.serverBootstrap.bind().sync();

                  绑定netty监听端口

      2.3.12 if (this.channelEventListener != null) {

                              this.nettyEventExecuter.start();

                  }

                  启动事件线程

      2.3.13 this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override

                public void run() {

                      try {

                               NettyRemotingServer.this.scanResponseTable();

                          }

                        catch (Exception e) {

                                   log.error("scanResponseTable exception", e);

                          }

                      }

                  }, 1000 * 3, 1000);

                 每隔1秒扫描下异步调用超时情况
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: