您的位置:首页 > 其它

Elasticsearch2.4学习(四)------源码分析之启动过程

2017-11-06 10:27 561 查看
http://blog.csdn.net/guo_jia_liang/article/details/53080005

本片文章主要分析Elasticsearch2.4的启动过程


一、Elasticsearch类

在源码的org.elasticsearch.bootstrap包中,有Elasticsearch类,该类用于启动Elasticsearch。如下给出这个类的main函数:

[java] view
plain copy

public static void main(String[] args) throws StartupError {

try {

Bootstrap.init(args);

} catch (Throwable t) {

// format exceptions to the console in a special way

// to avoid 2MB stacktraces from guice, etc.

throw new StartupError(t);

}

}


二、Bootstrap类

从Elasticsearch类的main函数可以看到,启动过程调用了Bootstrap类的init函数。

我们先到org.elasticsearch.bootstrap包中看一看Bootstrap类,有一个宏观了解。

[java] view
plain copy

final class Bootstrap {

private static volatile Bootstrap INSTANCE;

private volatile Node node;

private final CountDownLatch keepAliveLatch = new CountDownLatch(1);

private final Thread keepAliveThread;

/** creates a new instance */

Bootstrap() {

keepAliveThread = new Thread(new Runnable() {

@Override

public void run() {

try {

keepAliveLatch.await();

} catch (InterruptedException e) {

// bail out

}

}

}, "elasticsearch[keepAlive/" + Version.CURRENT + "]");

keepAliveThread.setDaemon(false);

// keep this thread alive (non daemon thread) until we shutdown

Runtime.getRuntime().addShutdownHook(new Thread() {

@Override

public void run() {

keepAliveLatch.countDown();

}

});

}

// 后面的代码省略......

}

BootStrap类主要包含node、keepAliveLatch、KeepAliveThread这三个实例。

再来看它的构造函数,主要就是实现KeepAliveThread线程的run方法,让这个线程调用keepAliveLatch的await方法,keepAliveLatch初始化为1。当keepAliveLatch没有减到0时,KeepAliveThread线程一直等待,这个线程其实相当于一个HeartBeat,用于表示Elasticsearch是否还活着。

定义完run方法后,将keepAliveThread设置为用户线程。

注意:此时KeepAliveThread只是定义好了,还没有start。

最后,加了一个关闭钩子,在JVM关闭之前,需要执行addShutdownHook中定义的线程的run方法。即执行keepAliveLatch的countDown方法,这时KeepAliveThread得以向后面执行,最终退出。具体内容可以参考Runtime.getRuntime().addShutdownHook关闭钩子介绍


三、Bootstrap类的init函数

[java] view
plain copy

/**

* This method is invoked by {@link Elasticsearch#main(String[])}

* to startup elasticsearch.

*/

static void init(String[] args) throws Throwable {

// Set the system property before anything has a chance to trigger its use

// 设置系统属性

System.setProperty("es.logger.prefix", "");

BootstrapCLIParser bootstrapCLIParser = new BootstrapCLIParser();

CliTool.ExitStatus status = bootstrapCLIParser.execute(args);

if (CliTool.ExitStatus.OK != status) {

exit(status.status());

}

// 创建Bootstrap实例

INSTANCE = new Bootstrap();

boolean foreground = !"false".equals(System.getProperty("es.foreground",

System.getProperty("es-foreground")));

// handle the wrapper system property, if its a service, don't run as a service

if (System.getProperty("wrapper.service", "XXX").equalsIgnoreCase("true")) {

foreground = false;

}

// 获取运行时环境

Environment environment = initialSettings(foreground);

// 从运行时环境获取配置

Settings settings = environment.settings();

LogConfigurator.configure(settings, true);

checkForCustomConfFile();

// 创建pid文件

if (environment.pidFile() != null) {

PidFile.create(environment.pidFile(), true);

}

// 如果设置了打开文件个数的上限,则打印相应的log信息

if (System.getProperty("es.max-open-files", "false").equals("true")) {

ESLogger logger = Loggers.getLogger(Bootstrap.class);

logger.info("max_open_files [{}]",

ProcessProbe.getInstance().getMaxFileDescriptorCount());

}

// warn if running using the client VM

// 如果启动时是HotSpot的Client VM,则打印log信息,提醒采用Sever VM,以获得更好的性能

if (JvmInfo.jvmInfo().getVmName().toLowerCase(Locale.ROOT).contains("client")) {

ESLogger logger = Loggers.getLogger(Bootstrap.class);

logger.warn("jvm uses the client vm, make sure to run `java` with the

server vm for best performance by adding `-server` to the

command line");

}

try {

if (!foreground) {

// 如果foreground是false则不打印日志,关闭系统输出

Loggers.disableConsoleLogging();

closeSystOut();

}

// fail if using broken version

// 检测JVM的版本

JVMCheck.check();

INSTANCE.setup(true, settings, environment);

INSTANCE.start();

if (!foreground) {

closeSysError();

}

} catch (Throwable e) {

// 省略异常处理部分代码......

}

}

以上代码中已经添加了详细的注释,按照先后顺序,主要是做了如下几件事情:

1、设置系统属性

2、获取BootStrap的实例INSTANCE

3、获取运行环境、创建pid文件

4、判断是否设置了打开文件的最大个数、判断是否以Client VM的方式启动了VM(具体内容可以参考JVM的Client VM
与 Server VM介绍),并打印相应log信息

5、判断是否打印系统日志

6、检测JVM版本

7、INSTANCE执行setup

8、INSTANCE执行start

如下我们针对setup和start两个过程进行详细说明。


四、Bootstrap类的setup函数

[java] view
plain copy

private void setup(boolean addShutdownHook, Settings settings,

Environment environment) throws Exception {

final ESLogger logger = Loggers.getLogger(Bootstrap.class);

final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);

// 获取bootstrap.mlockall的配置信息

final Boolean mlockall = settings.getAsBoolean("bootstrap.mlockall", null);

if (mlockall != null) {

// 如果配置了,则在过时日志中添加信息,提醒该配置项已经过时

deprecationLogger.deprecated("setting [bootstrap.mlockall] is deprecated;

use [bootstrap.memory_lock]");

}

// 获取bootstrap.memory_lock配置信息

final Boolean memoryLock = settings.getAsBoolean("bootstrap.memory_lock", null);

// both bootstrap.mlockall and bootstrap.memory_lock are set, refuse to start

if (mlockall != null && memoryLock != null) {

// 如果对bootstrap.mlockall和bootstrap.memory_lock都进行了配置则抛出异常

throw new IllegalArgumentException("both [bootstrap.mlockall] and

[bootstrap.memory_lock] configured,"

+ " just use [bootstrap.memory_lock]");

}

// 本地初始化

initializeNatives(environment.tmpFile(),

memoryLock != null ? memoryLock : mlockall != null ? mlockall : false,

settings.getAsBoolean("bootstrap.seccomp", true),

settings.getAsBoolean("bootstrap.ctrlhandler", true));

// initialize probes before the security manager is installed

initializeProbes();

if (addShutdownHook) {

// 添加关闭钩子,在VM关闭之前先关闭node

Runtime.getRuntime().addShutdownHook(new Thread() {

@Override

public void run() {

if (node != null) {

node.close();

}

}

});

}

// look for jar hell

// 查询JatHell(打印classpath等信息)

JarHell.checkJarHell();

// install SM after natives, shutdown hooks, etc.

setupSecurity(settings, environment);

Settings nodeSettings = Settings.settingsBuilder()

.put(settings)

.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true)

.build();

// build node

NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder().settings(nodeSettings);

node = nodeBuilder.build();

}

以上代码中已经添加了详细的注释,按照先后顺序,主要是做了如下几件事情:

1、检测bootstrap.mlockall和bootstrap.memory_lock信息,并打印相应log信息

2、本地初始化(检测是否为root用户启动、设置安全计算模式、对内存进行加锁、监听来自ctrl键的关闭事件)

3、初始化两种probes,分别是ProcessProbe和Osprobe,这两个probe将会提供给ES的stats api所需要的一些ES进程和OS层面的信息

3、添加关闭钩子,在VM关闭之前,关闭node

4、查询JarHell(打印classpath等信息)

5、安装security manager(SM)

6、build node

本地初始化的代码实现的功能在2中已经说明,代码就不展开了。下面说明build node的过程。


五、Node类的构造函数

我们先来看org.elasticsearch.node包中的NodeBuilder类的nodeBuilder.build()函数(只进行build不进行start)

[java] view
plain copy

/**

* Builds the node without starting it.

*/

public Node build() {

return new Node(settings.build());

}

调用了Node类的构造函数,到org.elasticsearch.node包中的Node类看看

[java] view
plain copy

public class Node implements Releasable {

private static final String CLIENT_TYPE = "node";

public static final String HTTP_ENABLED = "http.enabled";

private final Lifecycle lifecycle = new Lifecycle();

private final Injector injector;

private final Settings settings;

private final Environment environment;

private final PluginsService pluginsService;

private final Client client;

/**

* Constructs a node with the given settings.

*

* @param preparedSettings Base settings to configure the node with

*/

public Node(Settings preparedSettings) {

// 调用带参构造函数

this(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null),

Version.CURRENT, Collections.<Class<? extends Plugin>>emptyList());

}

protected Node(Environment tmpEnv, Version version,

Collection<Class<? extends Plugin>> classpathPlugins) {

// 省略部分代码......

logger.info("initializing ...");

// 省略部分代码......

boolean success = false;

try {

ModulesBuilder modules = new ModulesBuilder();

modules.add(new Version.Module(version));

modules.add(new CircuitBreakerModule(settings));

for (Module pluginModule : pluginsService.nodeModules()) {

modules.add(pluginModule);

}

modules.add(new PluginsModule(pluginsService));

modules.add(new SettingsModule(this.settings));

modules.add(new NodeModule(this));

modules.add(new NetworkModule(namedWriteableRegistry));

modules.add(new ScriptModule(this.settings));

modules.add(new EnvironmentModule(environment));

modules.add(new NodeEnvironmentModule(nodeEnvironment));

modules.add(new ClusterNameModule(this.settings));

modules.add(new ThreadPoolModule(threadPool));

modules.add(new DiscoveryModule(this.settings));

modules.add(new ClusterModule(this.settings));

modules.add(new RestModule(this.settings));

modules.add(new TransportModule(settings, namedWriteableRegistry));

if (settings.getAsBoolean(HTTP_ENABLED, true)) {

modules.add(new HttpServerModule(settings));

}

modules.add(new IndicesModule());

modules.add(new SearchModule());

modules.add(new ActionModule(false));

modules.add(new MonitorModule(settings));

modules.add(new GatewayModule(settings));

modules.add(new NodeClientModule());

modules.add(new ShapeModule());

modules.add(new PercolatorModule());

modules.add(new ResourceWatcherModule());

modules.add(new RepositoriesModule());

modules.add(new TribeModule());

pluginsService.processModules(modules);

injector = modules.createInjector();

// 省略部分代码......

success = true;

} finally {

// 省略部分代码......

}

logger.info("initialized");

}

}

这里使用了Guice的Injector进行注入与获取实例。elasticsearch里面的组件基本都是用上面的方式进行模块化管理,elasticsearch对guice进行了封装,通过ModulesBuilder类构建Elasticsearch的模块。

这里有必要简单介绍下Guice,不然看到这里好多人会看不懂。Guice是google开源的一个依赖注入的框架,比起spring更加的轻量级,关键点:

1、使用@injectguice会扫描inject注释,并对方法中出现的参数实例寻找对应注册的实例进行初始化。

2、bind接口将接口跟具体实现类绑定。

3、使用 Injector 引导应用程序。

因此可以看到每个module里面都有一个方法configure()用于将对象和实现类作绑定。

至此,INSTANCE.setup的过程已经解释清楚。


六、Bootstrap类的start函数

[java] view
plain copy

private void start() {

node.start();

keepAliveThread.start();

}

我们可以看到,这个时候才真正启动了node和keepAliveThread线程。

其中,keepAliveThread线程会在Elasticsearch退出之前一直等待着keepAliveLatch减为0。我们看看node.start函数做了哪些事情。


七、Node类的start函数

[java] view
plain copy

/**

* Start the node. If the node is already started, this method is no-op.

*/

public Node start() {

// 如果node已经启动,则直接返回

if (!lifecycle.moveToStarted()) {

return this;

}

ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));

logger.info("starting ...");

// hack around dependency injection problem (for now...)

injector.getInstance(Discovery.class).setRoutingService(

injector.getInstance(RoutingService.class));

for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {

injector.getInstance(plugin).start();

}

injector.getInstance(MappingUpdatedAction.class).setClient(client);

injector.getInstance(IndicesService.class).start();

injector.getInstance(IndexingMemoryController.class).start();

injector.getInstance(IndicesClusterStateService.class).start();

injector.getInstance(IndicesTTLService.class).start();

injector.getInstance(SnapshotsService.class).start();

injector.getInstance(SnapshotShardsService.class).start();

injector.getInstance(RoutingService.class).start();

injector.getInstance(SearchService.class).start();

injector.getInstance(MonitorService.class).start();

injector.getInstance(RestController.class).start();

// TODO hack around circular dependencies problems

injector.getInstance(GatewayAllocator.class).setReallocation(

injector.getInstance(ClusterService.class),

injector.getInstance(RoutingService.class));

injector.getInstance(ResourceWatcherService.class).start();

injector.getInstance(GatewayService.class).start();

// Start the transport service now so the publish address will be

// added to the local disco node in ClusterService

TransportService transportService = injector.getInstance(TransportService.class);

transportService.start();

injector.getInstance(ClusterService.class).start();

// 省略部分代码......

transportService.acceptIncomingRequests();

discoService.joinClusterAndWaitForInitialState();

if (settings.getAsBoolean("http.enabled", true)) {

injector.getInstance(HttpServer.class).start();

}

injector.getInstance(TribeService.class).start();

if (settings.getAsBoolean("node.portsfile", false)) {

if (settings.getAsBoolean("http.enabled", true)) {

HttpServerTransport http = injector.getInstance(HttpServerTransport.class);

writePortsFile("http", http.boundAddress());

}

TransportService transport = injector.getInstance(TransportService.class);

writePortsFile("transport", transport.boundAddress());

}

logger.info("started");

return this;

}

Node的启动其实就是node里每个组件的启动,同样的,分别调用不同的实例的start方法来启动这个组件。

到目前为止,整个Elasticsearch的启动就已经完成。

流程大致是Elasticsearch#main -》Bootstrap#init -》Bootstrap#setup&start
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: