源码解析-Yarn-ResourceManager07-ShutdownHookManager
2019-05-17 10:56
316 查看
0x00 系列文章目录
- 源码走读-Yarn-ResourceManager01-基础概念
- 源码走读-Yarn-ResourceManager02-RM的启动-脚本
- 源码走读-Yarn-ResourceManager03-RM的启动之RM详解
- 源码走读-Yarn-ResourceManager04-RM调度之FairScheduler
- 源码走读-Yarn-ResourceManager05-MR任务提交-客户端侧分析
- 源码走读-Yarn-ResourceManager06-MR任务提交-服务端分析
- 源码走读-Yarn-ResourceManager07-ShutdownHookManager
- 源码走读-Yarn-ResourceManager08-总结
0x07 ShutdownHookManager
在yarn代码中可以看到很多地方都使用了以下代码来添加自己的ShutdownHook:
[code]ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY)
[/code]
1
2
3
- 1
- 2
- 3
以上是ResourceManager
main函数中注册的一个CompositeServiceShutdownHook。
这一章节来分析下ShutdownHookManager的设计。
7.1 注释
[code]/**
* The <code>ShutdownHookManager</code> enables running shutdownHook
* in a deterministic order, higher priority first.
*
* The JVM runs ShutdownHooks in a non-deterministic order or in parallel.
* This class registers a single JVM shutdownHook and run all the
* shutdownHooks registered to it (to this class) in order based on their
* priority.
*
*/
这段注释的意思大致是说ShutdownHookManager让注册进来的shutdownHook按顺序运行,并且高优先级先运行
这样设计的原因是如果各自注册shutdownHook的话,JVM是没有办法保证执行顺序。
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
7.2 ShutdownHookManager <clinit>
ShutdownHookManager在程序非kill -9 时的结束时会触发,执行下面注册侧的shutdownHook逻辑,具体如下:
[code] //首先,包含一个饿汉式的ShutdownHookManager自身的单例对象:
private static final ShutdownHookManager MGR = new ShutdownHookManager();
//这里是向JVM注册了一个ShutdownHook,由一个线程来处理已经注册的ShutdownHook方法。
//在执行shutdownHook处理前,会将原子类型的标志位shtdownInProgress设为true表名正在处理所有shutdownHook。
//而且,这个过程是按注册时的优先级来排序的。
static {
Runtime.getRuntime().addShutdownHook(
new Thread() {
@Override
public void run() {
MGR.shutdownInProgress.set(true);
//获取按优先级排序的ShutdownHook runnable list,然后挨个调用run方法
for (Runnable hook: MGR.getShutdownHooksInOrder()) {
try {
hook.run();
} catch (Throwable ex) {
LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
"' failed, " + ex.toString(), ex);
}
}
}
}
);
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
7.3 ShutdownHookManager <init>
[code] //定义了一个存放shutdownHook的是线程安全的HashSet,以HookEntry来进行了封装了ShutdownHook对象。
private Set<HookEntry> hooks =
Collections.synchronizedSet(new HashSet<HookEntry>());
//用原子类型的布尔值表示shutdownHook是否正常处理中
private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
//private to constructor to ensure singularity
private ShutdownHookManager() {
}
[/code]
1
2
3
4
5
6
7
8
9
10
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
以上就是ShutdownHookManager的实例构造器执行的内容。与之关联的判断shutdown是否在进行中方法如下:
[code] public boolean isShutdownInProgress() {
return shutdownInProgress.get();
}
[/code]
1
2
3
- 1
- 2
- 3
7.4 HookEntry
HookEntry的代码很简单,就是封装了ShutdownHook线程对象和优先级的一个类。
重写了equals方法,判断的标准是内部的名为hook的Runnable对象是否相同。
[code] /**
* Private structure to store ShutdownHook and its priority.
*/
private static class HookEntry {
Runnable hook;
int priority;
public HookEntry(Runnable hook, int priority) {
this.hook = hook;
this.priority = priority;
}
@Override
public int hashCode() {
return hook.hashCode();
}
@Override
public boolean equals(Object obj) {
boolean eq = false;
if (obj != null) {
if (obj instanceof HookEntry) {
eq = (hook == ((HookEntry)obj).hook);
}
}
return eq;
}
}
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
7.5 getShutdownHooksInOrder
[code] /**
* Returns the list of shutdownHooks in order of execution,
* Highest priority first.
* 按执行顺序来返回shutdownHook列表,高优先级在前
*/
List<Runnable> getShutdownHooksInOrder() {
List<HookEntry> list;
//这里就是获取了装有所有hooks的set对象的对象锁,转为list
synchronized (MGR.hooks) {
list = new ArrayList<HookEntry>(MGR.hooks);
}
//对hook list进行排序,高优先级排在前
Collections.sort(list, new Comparator<HookEntry>() {
//reversing comparison so highest priority hooks are first
@Override
public int compare(HookEntry o1, HookEntry o2) {
return o2.priority - o1.priority;
}
});
List<Runnable> ordered = new ArrayList<Runnable>();
for (HookEntry entry: list) {
ordered.add(entry.hook);
}
//最终返回这个已排好序的shutdownHook runnable list
return ordered;
}
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
7.6 提供给外部对象注册shutdownHook的方法
我们回顾一下之前RM中注册shutdownHook的代码:
[code]ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
[/code]
1
2
- 1
- 2
然后来看看用到的方法:
这个方法很简单,就是返回一个ShutdownHookManager的单例对象。
[code]public static ShutdownHookManager get() {
return MGR;
}
[/code]
1
2
3
- 1
- 2
- 3
下面这个方法也很简单,就是把shutdownHook runnable对象用HookEntry封装后放入hook set
[code]public void addShutdownHook(Runnable shutdownHook, int priority) {
if (shutdownHook == null) {
throw new IllegalArgumentException("shutdownHook cannot be NULL");
}
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
}
hooks.add(new HookEntry(shutdownHook, priority));
}
[/code]
1
2
3
4
5
6
7
8
9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
下面这个方法用于移除shutdownHook
[code]public boolean removeShutdownHook(Runnable shutdownHook) {
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook");
}
return hooks.remove(new HookEntry(shutdownHook, 0));
}
[/code]
1
2
3
4
5
6
- 1
- 2
- 3
- 4
- 5
- 6
7.7 CompositeService的例子
前面我们有看到这样一段代码:
[code]ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),SHUTDOWN_HOOK_PRIORITY);
[/code]
1
2
3
4
- 1
- 2
- 3
- 4
紧接着看下
CompositeServiceShutdownHook:
[code] public static class CompositeServiceShutdownHook implements Runnable {
private CompositeService compositeService;
public CompositeServiceShutdownHook(CompositeService compositeService) {
this.compositeService = compositeService;
}
@Override
public void run() {
ServiceOperations.stopQuietly(compositeService);
}
}
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
由前文所讲ShutdownHookManager触发机制,可得知当发生shutdownHook时会执行上面的run方法内容,所以我们赶紧看看
ServiceOperations.stopQuietly在干嘛:
[code] /**
* 停止服务。如果Service为空就啥都不做。这个方法被用来清理多个操作
* 返回一切可能的已捕获异常,但不会直接是个Throwalbe
*/
public static Exception stopQuietly(Service service) {
return stopQuietly(LOG, service);
}
[/code]
1
2
3
4
5
6
7
- 1
- 2
- 3
- 4
- 5
- 6
- 7
接着往下看
stopQuietly方法:
[code]/**
* 停止服务。如果Service为空就啥都不做。这个方法被用来清理多个操作
* 异常会被捕获然后以WARN级别输出,但不会直接是个Throwalbe
* 如果发生异常就返回异常,否则返回null
*/
public static Exception stopQuietly(Log log, Service service) {
try {
stop(service);
} catch (Exception e) {
log.warn("When stopping the service " + service.getName()
+ " : " + e,
e);
return e;
}
return null;
}
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
看了上面这个方法,应该都知道为什么叫
stopQuietly了吧?下面接着看
stop方法在干啥:
[code]public static void stop(Service service) {
if (service != null) {
service.stop();
}
}
[/code]
1
2
3
4
5
- 1
- 2
- 3
- 4
- 5
接着跟的话你会发现以上方法又回到了
AbstractService的
stop方法,然后其实又调用了
ResourceManager的
serviceStop方法:
[code]@Override
protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
if (fetcher != null) {
fetcher.stop();
}
if (configurationProvider != null) {
configurationProvider.close();
}
// 注意这里,其实就是调用RM的父类CompositeService的serviceStop方法
super.serviceStop();
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);
}
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
通过上面代码分析以及前文对
CompositeService的描述,我们知道最终会对所包含的所有
Service调用ServiceOperations.stopQuietly方法来关闭,这个方法的关闭流程是
Service.stop->
AbstractService.stop->
具体服务类的serviceStop(这里是ResourceManager)->
super.serviceStop()方式继续调用父类的serviceStop方法。一环套一环,设计思想值得学习借鉴。
7.8 小结
本章主要分析了关闭时的钩子管家,看看他是怎么管理多个ShutdownHook,其实思想很简单,绝对可以现学现用。下一章是最后一张,主要是一些对源码学习有帮助的辅助内容。请点击:源码走读-Yarn-ResourceManager08-总结
参考:
相关文章推荐
- spark学习-61-源代码:ShutdownHookManager虚拟机关闭钩子管理器
- spring data RedisCacheManager 源码查看与配置解析
- [mtk6572源码解析]ISms.aidl 生成的源码 和 SmsManagerEx 中被调用到的函数
- Hadoop中Yarnrunner里面submit Job以及AM生成 至Job处理过程源码解析
- Android 之使用LocalBroadcastManager,源码解析
- 研磨Hadoop源码(二)-yarn-ClientToAMTokenSecretManagerInRM
- LocalBroadcastManager源码解析
- Spring Security3源码分析(3)-authentication-manager标签解析
- Android源码解析之Binder守护进程Service Manager
- PackageManagerService启动流程源码解析
- minetest源码解析五:IGameDef、ItemDefManager、NodeDefManager类介绍
- CCActionManager和PageTurn3D源码解析
- Kafka源码深度解析-序列12 -Server核心组件之2-ReplicaManager核心数据结构与Replica同步原理
- YARN resourceManager解析
- iOS网络——AFNetworking AFURLSessionManager源码解析
- HTTP Client MultiThreadedHttpConnectionManager线程安全连接管理类源码解析
- Hadoop中Yarnrunner里面submit Job以及AM生成 至Job处理过程源码解析
- Hadoop源码解析之YARN客户端作业提交流程
- iOS源码解析—AFNetworking(AFNetworkReachabilityManager)
- LocationManagerService API的Hook解析