您的位置:首页 > 其它

源码解析-Yarn-ResourceManager07-ShutdownHookManager

2019-05-17 10:56 316 查看

 

0x00 系列文章目录

  1. 源码走读-Yarn-ResourceManager01-基础概念
  2. 源码走读-Yarn-ResourceManager02-RM的启动-脚本
  3. 源码走读-Yarn-ResourceManager03-RM的启动之RM详解
  4. 源码走读-Yarn-ResourceManager04-RM调度之FairScheduler
  5. 源码走读-Yarn-ResourceManager05-MR任务提交-客户端侧分析
  6. 源码走读-Yarn-ResourceManager06-MR任务提交-服务端分析
  7. 源码走读-Yarn-ResourceManager07-ShutdownHookManager
  8. 源码走读-Yarn-ResourceManager08-总结

0x07 ShutdownHookManager

在yarn代码中可以看到很多地方都使用了以下代码来添加自己的ShutdownHook:

[code]ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY)
[/code]
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

以上是ResourceManager

main
函数中注册的一个CompositeServiceShutdownHook。

这一章节来分析下ShutdownHookManager的设计。

7.1 注释

[code]/**
* The <code>ShutdownHookManager</code> enables running shutdownHook
* in a deterministic order, higher priority first.
*
* The JVM runs ShutdownHooks in a non-deterministic order or in parallel.
* This class registers a single JVM shutdownHook and run all the
* shutdownHooks registered to it (to this class) in order based on their
* priority.
*
*/

这段注释的意思大致是说ShutdownHookManager让注册进来的shutdownHook按顺序运行,并且高优先级先运行
这样设计的原因是如果各自注册shutdownHook的话,JVM是没有办法保证执行顺序。
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

7.2 ShutdownHookManager <clinit>

ShutdownHookManager在程序非kill -9 时的结束时会触发,执行下面注册侧的shutdownHook逻辑,具体如下:

[code]  //首先,包含一个饿汉式的ShutdownHookManager自身的单例对象:
private static final ShutdownHookManager MGR = new ShutdownHookManager();

//这里是向JVM注册了一个ShutdownHook,由一个线程来处理已经注册的ShutdownHook方法。
//在执行shutdownHook处理前,会将原子类型的标志位shtdownInProgress设为true表名正在处理所有shutdownHook。
//而且,这个过程是按注册时的优先级来排序的。
static {
Runtime.getRuntime().addShutdownHook(
new Thread() {
@Override
public void run() {
MGR.shutdownInProgress.set(true);
//获取按优先级排序的ShutdownHook runnable list,然后挨个调用run方法
for (Runnable hook: MGR.getShutdownHooksInOrder()) {
try {
hook.run();
} catch (Throwable ex) {
LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
"' failed, " + ex.toString(), ex);
}
}
}
}
);
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

7.3 ShutdownHookManager <init>

[code]  //定义了一个存放shutdownHook的是线程安全的HashSet,以HookEntry来进行了封装了ShutdownHook对象。
private Set<HookEntry> hooks =
Collections.synchronizedSet(new HashSet<HookEntry>());

//用原子类型的布尔值表示shutdownHook是否正常处理中
private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);

//private to constructor to ensure singularity
private ShutdownHookManager() {
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

以上就是ShutdownHookManager的实例构造器执行的内容。与之关联的判断shutdown是否在进行中方法如下:

[code]  public boolean isShutdownInProgress() {
return shutdownInProgress.get();
}
[/code]
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

7.4 HookEntry

HookEntry的代码很简单,就是封装了ShutdownHook线程对象和优先级的一个类。

重写了equals方法,判断的标准是内部的名为hook的Runnable对象是否相同。

[code]  /**
* Private structure to store ShutdownHook and its priority.
*/
private static class HookEntry {
Runnable hook;
int priority;

public HookEntry(Runnable hook, int priority) {
this.hook = hook;
this.priority = priority;
}

@Override
public int hashCode() {
return hook.hashCode();
}

@Override
public boolean equals(Object obj) {
boolean eq = false;
if (obj != null) {
if (obj instanceof HookEntry) {
eq = (hook == ((HookEntry)obj).hook);
}
}
return eq;
}

}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

7.5 getShutdownHooksInOrder

[code]  /**
* Returns the list of shutdownHooks in order of execution,
* Highest priority first.
* 按执行顺序来返回shutdownHook列表,高优先级在前
*/
List<Runnable> getShutdownHooksInOrder() {
List<HookEntry> list;
//这里就是获取了装有所有hooks的set对象的对象锁,转为list
synchronized (MGR.hooks) {
list = new ArrayList<HookEntry>(MGR.hooks);
}
//对hook list进行排序,高优先级排在前
Collections.sort(list, new Comparator<HookEntry>() {

//reversing comparison so highest priority hooks are first
@Override
public int compare(HookEntry o1, HookEntry o2) {
return o2.priority - o1.priority;
}
});
List<Runnable> ordered = new ArrayList<Runnable>();
for (HookEntry entry: list) {
ordered.add(entry.hook);
}
//最终返回这个已排好序的shutdownHook runnable list
return ordered;
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

7.6 提供给外部对象注册shutdownHook的方法

我们回顾一下之前RM中注册shutdownHook的代码:

[code]ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
[/code]
  • 1
  • 2
  • 1
  • 2

然后来看看用到的方法:

这个方法很简单,就是返回一个ShutdownHookManager的单例对象。

[code]public static ShutdownHookManager get() {
return MGR;
}
[/code]
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

下面这个方法也很简单,就是把shutdownHook runnable对象用HookEntry封装后放入hook set

[code]public void addShutdownHook(Runnable shutdownHook, int priority) {
if (shutdownHook == null) {
throw new IllegalArgumentException("shutdownHook cannot be NULL");
}
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
}
hooks.add(new HookEntry(shutdownHook, priority));
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

下面这个方法用于移除shutdownHook

[code]public boolean removeShutdownHook(Runnable shutdownHook) {
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook");
}
return hooks.remove(new HookEntry(shutdownHook, 0));
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

7.7 CompositeService的例子

前面我们有看到这样一段代码:

[code]ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),SHUTDOWN_HOOK_PRIORITY);

[/code]
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

紧接着看下

CompositeServiceShutdownHook

[code] public static class CompositeServiceShutdownHook implements Runnable {

private CompositeService compositeService;

public CompositeServiceShutdownHook(CompositeService compositeService) {
this.compositeService = compositeService;
}

@Override
public void run() {
ServiceOperations.stopQuietly(compositeService);
}
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

由前文所讲ShutdownHookManager触发机制,可得知当发生shutdownHook时会执行上面的run方法内容,所以我们赶紧看看

ServiceOperations.stopQuietly
在干嘛:

[code] /**
* 停止服务。如果Service为空就啥都不做。这个方法被用来清理多个操作
* 返回一切可能的已捕获异常,但不会直接是个Throwalbe
*/
public static Exception stopQuietly(Service service) {
return stopQuietly(LOG, service);
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

接着往下看

stopQuietly
方法:

[code]/**
* 停止服务。如果Service为空就啥都不做。这个方法被用来清理多个操作
* 异常会被捕获然后以WARN级别输出,但不会直接是个Throwalbe
* 如果发生异常就返回异常,否则返回null
*/
public static Exception stopQuietly(Log log, Service service) {
try {
stop(service);
} catch (Exception e) {
log.warn("When stopping the service " + service.getName()
+ " : " + e,
e);
return e;
}
return null;
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

看了上面这个方法,应该都知道为什么叫

stopQuietly
了吧?下面接着看
stop
方法在干啥:

[code]public static void stop(Service service) {
if (service != null) {
service.stop();
}
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

接着跟的话你会发现以上方法又回到了

AbstractService
stop
方法,然后其实又调用了
ResourceManager
serviceStop
方法:

[code]@Override
protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
if (fetcher != null) {
fetcher.stop();
}
if (configurationProvider != null) {
configurationProvider.close();
}
// 注意这里,其实就是调用RM的父类CompositeService的serviceStop方法
super.serviceStop();
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);
}
[/code]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

通过上面代码分析以及前文对

CompositeService
的描述,我们知道最终会对所包含的所有
Service
调用ServiceOperations.stopQuietly方法来关闭,这个方法的关闭流程是
Service.stop
->
AbstractService.stop
->
具体服务类的serviceStop(这里是ResourceManager)
->
super.serviceStop()
方式继续调用父类的serviceStop方法。一环套一环,设计思想值得学习借鉴。

7.8 小结

本章主要分析了关闭时的钩子管家,看看他是怎么管理多个ShutdownHook,其实思想很简单,绝对可以现学现用。下一章是最后一张,主要是一些对源码学习有帮助的辅助内容。请点击:源码走读-Yarn-ResourceManager08-总结

 

参考:

https://blog.csdn.net/baichoufei90/article/details/82670677

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: