MQTT---HiveMQ源码详解(二)结构与启动
2017-03-07 23:24
507 查看
源博客地址:http://blog.csdn.net/pipinet123
examples是一些示例组网场景的示例配置
persistence存放着所有持久化信息的文件、以及备份文件。包含client_session_subscriptions、client_sessions、outgoing_message_flow、incomming_message_flow、publish_payloads、queued_messages、retained_messages等。
MQTT交流群:221405150
目录结构
在官网中也有更详细的介绍,下面我只对目录结构做一个简单介绍即可,感兴趣的朋友可以参考官网文档.http://www.hivemq.com/docs/hivemq/latest/#installationbin
包含hivemq.jar以及一些启动脚本conf
包含config.xml、logback.xml以及plugin的配置文件examples是一些示例组网场景的示例配置
data
metadata存放版本信息(加密过)persistence存放着所有持久化信息的文件、以及备份文件。包含client_session_subscriptions、client_sessions、outgoing_message_flow、incomming_message_flow、publish_payloads、queued_messages、retained_messages等。
diagnostics
存放着诊断模式下诊断信息,包括系统信息、网络接口信息、jvm信息、插件信息等等。方便开发者排查问题。license
存放hivemq授权license文件。log
存放日志plugins
第三方插件目录启动
既然它是一个java程序,那么我们就从它的main方法开始我们的hivemq源码之路。main
public class HiveMQServer { private static final Logger LOGGER = LoggerFactory.getLogger(HiveMQServer.class); private final NettyServer nettyServer; private final ClusterConfigurationService clusterConfigurationService; private final PluginBrokerCallbackHandler pluginBrokerCallbackHandler; private final PluginInformationStore pluginInformationStore; private final Provider<ClusterJoiner> clusterJoinerProvider; @Inject HiveMQServer(NettyServer nettyServer, ClusterConfigurationService clusterConfigurationService, PluginBrokerCallbackHandler pluginBrokerCallbackHandler, PluginInformationStore pluginInformationStore, Provider<ClusterJoiner> clusterJoinerProvider) { this.nettyServer = nettyServer; this.clusterConfigurationService = clusterConfigurationService; this.pluginBrokerCallbackHandler = pluginBrokerCallbackHandler; this.pluginInformationStore = pluginInformationStore; this.clusterJoinerProvider = clusterJoinerProvider; } public void start() throws InterruptedException, ExecutionException { //启动netty server this.nettyServer.start().sync(); //通知OnBrokerStart事件 fireOnBrokerStart(); //加入cluster joinCluster(); //启动对应承载在netty上的Listener,并打印出这些Listener启动结果信息。请参考Linstener配置请参考http://www.hivemq.com/docs/hivemq/latest/#configuration-chapter ListenableFuture<List<ListenerStartResult>> startFuture = this.nettyServer.startListeners(); List<ListenerStartResult> startResults = startFuture.get(); new ListenerStartResultLogger(startResults).log(); } private void joinCluster() { //根据配置确定是否加入cluster if (!this.clusterConfigurationService.isEnabled()) { return; } try { //使用ClusterJoiner类进行连接jgroup,组成cluster。 ClusterJoiner clusterJoiner = this.clusterJoinerProvider.get(); ListenableFuture<Void> future = clusterJoiner.join(); future.get(); } catch (Exception e) { if (e.getCause() instanceof DuplicateOrInvalidLicenseException) { LOGGER.error("Found duplicate or invalid license file in the cluster. Shutting down HiveMQ"); } else if (e.getCause() instanceof DifferentConfigurationException) { LOGGER.error("The configuration of this HiveMQ instance is different form the other instances in the cluster. Shutting down HiveMQ"); } else { LOGGER.error("Could not join cluster. Shutting down HiveMQ.", e); } if (e.getCause() instanceof UnrecoverableException) { throw ((UnrecoverableException) e.getCause()); } throw new UnrecoverableException(false); } } //通知对应plugin broker已经启动 private void fireOnBrokerStart() { LOGGER.trace("Calling all OnBrokerStart Callbacks"); printPluginInformations(); this.pluginBrokerCallbackHandler.onStart(); } public static void main(String[] args) throws InterruptedException, ExecutionException { LOGGER.info("Starting HiveMQ Server"); long startTime = System.nanoTime(); //初始化SystemInformation,可以通过环境变量来分别设置conf、plugins、log、license等目录。 //请参考hivemq spi SystemInformation LOGGER.trace("Initializing HiveMQ home directory"); HiveMQSystemInformation systemInformation = new HiveMQSystemInformation(true); //创建MetricRegistry //请参考开源框架Metrics LOGGER.trace("Creating MetricRegistry"); MetricRegistry metricRegistry = new MetricRegistry(); //增加统计Listener metricRegistry.addListener(new StatisticsListener()); //初始化日志 LOGGER.trace("Initializing Logging"); LogConfigurator.init(systemInformation.getConfigFolder(), metricRegistry); //增加未处理异常拦截,并对其进行优雅处理 LOGGER.trace("Initializing Exception handlers"); RecoverableExceptionHandler.init(); //初始化ConfigurationService,并读取conf/config.xml文件,加载用户配置 //请参考hivemq spi ConfigurationService, LOGGER.trace("Initializing configuration"); HiveMQConfigurationService hiveMQConfigurationService = HiveMQConfigurationServiceFactory.create(systemInformation); //创建Clusterid提供者。 ClusterIdProducer clusterIdProducer = new ClusterIdProducer(); if (hiveMQConfigurationService.clusterConfiguration().isEnabled()) { LOGGER.info("This node's cluster-ID is {}", clusterIdProducer.get()); } //根据原有版本,判断是否需要做持久化数据的migration,如需要进行migration,因为可以配置每个数据的使用策略(file/memory),所以每个数据分别进行migration LOGGER.trace("Checking for migrations"); Map<MigrationType, Set<String>> neededMigrations = Migrations.getNeededMigrations(systemInformation); Injector injector = null; if (neededMigrations.size() > 0) { LOGGER.warn("HiveMQ has been updated, migrating persistent data to new version !"); neededMigrations.keySet().forEach(type -> LOGGER.debug("{} needs to be migrated", type)); //因为migration也是依赖guice来做容器,所以migration也会创建一个injector injector = Bootstrap.createInjector(systemInformation, hiveMQConfigurationService, clusterIdProducer); Migrations.start(injector, neededMigrations); } //升级完成,将升级的最新版本信息,持久化到文件中,以便下次启动进行判断 Migrations.finish(systemInformation, hiveMQConfigurationService); //初始化guice LOGGER.trace("Initializing Guice"); injector = Bootstrap.createInjector(systemInformation, metricRegistry, hiveMQConfigurationService, clusterIdProducer, injector); //从guice中获得HiveMQServer实例,并启动它 HiveMQServer server = injector.getInstance(HiveMQServer.class); server.start(); //对EXodus日志级别做修改 LogConfigurator.addXodusLogModificator(); LOGGER.info("Started HiveMQ in {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); //hivemq版本升级检查器,会连接hivemq官网判断是否有新版本升级。可以在配置文件中设置不检查 UpdateChecker updateChecker = injector.getInstance(UpdateChecker.class); updateChecker.start(); } //根据加载出来的所有plugin打印plugin信息 //请参考hivemq spi @Information private void printPluginInformations() { Set<PluginInformation> pluginInformations = this.pluginInformationStore.getPluginInformations(); pluginInformations.forEach(pluginInformation -> LOGGER.info("Loaded Plugin {} - v{}", pluginInformation.getName(), pluginInformation.getVersion()) ); } }
Bootstrap & Guice Modules
它是采用Guice作为di框架,那么我们就从Bootstrap开始看它包含了哪些Module以及简单介绍下这些Module主要是注入哪些对应处理代码。public class Bootstrap { private static final Logger LOGGER = LoggerFactory.getLogger(Bootstrap.class); public static Injector createInjector(SystemInformation systemInformation, MetricRegistry metricRegistry, HiveMQConfigurationService hiveMQConfigurationService, ClusterIdProducer clusterIdProducer, Injector injector) { //根据系统变量判断是否开启诊断模式 if (!Boolean.parseBoolean(System.getProperty("diagnosticMode"))) { LOGGER.trace("Turning Guice stack traces off"); System.setProperty("guice_include_stack_traces", "OFF"); } //加载所有PluginModule //请参考hivemq spi PluginModule //后续会专门讲解plugin是如何加载的 List<PluginModule> pluginModules = new PluginBootstrap().create(systemInformation.getPluginFolder()); ImmutableList.Builder<AbstractModule> builder = ImmutableList.builder(); builder.add( //系统信息 new SystemInformationModule(systemInformation), //注册cache的生命周期范围 new ScopeModule(), //增加@PostConstruct、@PreDestroy注解处理 new LifecycleModule(), //配置的Module new ConfigurationModule(hiveMQConfigurationService, clusterIdProducer), //netty所有handler、以及listenser等module new NettyModule(), //内部module new InternalModule(), //plugin callback module,主要处理plugin注册cabllback后回调 new PluginCallbackModule(), //为方法增加cache的module new MethodCacheModule(), //持久化module new PersistenceModule(injector), //统计的module new MetricModule(metricRegistry), //流量监控module new TrafficShapingModule(), //cluster module new ClusterModule(), //plugin提供service的module new ServiceModule(pluginModules), //license的解析、验证、限制module new LicensingModule(), //更新hivemq程序的module new UpdateModule(), //诊断模式module new DiagnosticModule()); builder.addAll(pluginModules); return Guice.createInjector(Stage.PRODUCTION, builder.build()); } //创建数据升级的Injector,这个较上面的module加载的少点而已。 public static Injector createInjector(SystemInformation systemInformation, HiveMQConfigurationService hiveMQConfigurationService, ClusterIdProducer clusterIdProducer) { ImmutableList.Builder<AbstractModule> builder = ImmutableList.builder(); builder.add( new SystemInformationModule(systemInformation), new ConfigurationModule(hiveMQConfigurationService, clusterIdProducer), new BridgeModule(), new ScopeModule(), new LifecycleModule()); return Guice.createInjector(Stage.PRODUCTION, builder.build()); } }
相关文章推荐
- MQTT---HiveMQ源码详解(五)Netty-启动与Listeners加载
- MQTT---HiveMQ源码详解(十四)Persistence-LocalPersistence
- MQTT---HiveMQ源码详解(十六)TopicTree
- MQTT---HiveMQ源码详解(十一)Netty-Throttling
- MQTT---HiveMQ源码详解(一)概览
- MQTT---HiveMQ源码详解(十七)Cluster-Consistent Hashing Ring & Node Lifecycle
- MQTT---HiveMQ源码详解(三)配置加载
- MQTT---HiveMQ源码详解(外传)为什么使用Xodus
- MQTT---HiveMQ源码详解(十八)Cluster-kryo与Serializer
- MQTT---HiveMQ源码详解(十二)Netty-MQTT消息、事件处理(流程)
- MQTT---HiveMQ源码详解(八)Netty-WebSocket
- MQTT---HiveMQ源码详解(十三)Netty-MQTT消息、事件处理(源码举例解读)
- MQTT---HiveMQ源码详解(十九)Cluster-Request/Response
- MQTT---HiveMQ源码详解(四)插件加载
- MQTT---HiveMQ源码详解(九)Netty-Codec
- MQTT---HiveMQ源码详解(二十)Cluster-Replicate/VectorClock
- MQTT---HiveMQ源码详解(二十一)完结篇
- MQTT---HiveMQ源码详解(七)Netty-SSL/NoSSL
- MQTT---HiveMQ源码详解(六)Netty-Handlers总览
- MQTT---HiveMQ源码详解(十)Netty-Statistics