
Hadoop Source Code Analysis: The YARN Server-Side Job Submission Flow

2017-02-17 00:17
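When the ResourceManager (RM) receives a job submission request from a client, it answers through an RPC server. On the other side, the client submits the job through an RPC client for ApplicationClientProtocol; the client-side flow is covered in "Hadoop Source Code Analysis: The YARN Client-Side Job Submission Flow". During submission, when the client asks for a new job ID, it calls the server-side getNewApplication and receives a GetNewApplicationResponse, which carries the application's ApplicationId and the scheduler's resource information. Note that the RM runs several RPC servers. The one that serves job submission is ClientRMService, which listens on port 8032 by default; the address can be changed through yarn.resourcemanager.address. Below is the server-side getNewApplication, from ClientRMService.java.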

//ClientRMService.java
public GetNewApplicationResponse getNewApplication(
        GetNewApplicationRequest request) throws YarnException {
    LOG.info("begin to getNewApplication");
    // Build the GetNewApplicationResponse object
    GetNewApplicationResponse response = recordFactory
            .newRecordInstance(GetNewApplicationResponse.class);
    // Set the application ID
    response.setApplicationId(getNewApplicationId());
    // Pick up min/max resource from scheduler...
    // Once the application ID is set, fill in the scheduler's resource limits.
    // These currently cover memory and CPU (vcores), and are controlled by
    // the configuration properties
    //   yarn.scheduler.minimum-allocation-mb  yarn.scheduler.minimum-allocation-vcores
    //   yarn.scheduler.maximum-allocation-mb  yarn.scheduler.maximum-allocation-vcores
    // The values are fixed when the scheduler starts. Only the maximum
    // capability is returned to the client here.
    response.setMaximumResourceCapability(scheduler
            .getMaximumResourceCapability());

    return response;
}
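The resource limits in the response can be shown with a small standalone Java sketch (Java 16+ for records). It is not YARN code. `Resource`, `NewAppResponse` and `SchedulerLimits` are hypothetical stand-ins, and the limits come from a plain `java.util.Properties` rather than a `YarnConfiguration`. The sketch reads the four `yarn.scheduler.*` properties named in the comments once, as a scheduler does at startup. Like `getNewApplication`, it hands back only the maximum.

```java
import java.util.Properties;

// Hypothetical stand-in for YARN's Resource: memory in MB plus virtual cores.
record Resource(long memoryMb, int vcores) {}

// Hypothetical stand-in for GetNewApplicationResponse.
record NewAppResponse(String applicationId, Resource maximumCapability) {}

// Hypothetical scheduler-side holder for the allocation limits.
final class SchedulerLimits {
    final Resource minimum;
    final Resource maximum;

    // Reads the four limits once, as a scheduler does when it starts.
    // A missing or non-numeric property raises NumberFormatException.
    SchedulerLimits(Properties conf) {
        minimum = new Resource(
                Long.parseLong(conf.getProperty("yarn.scheduler.minimum-allocation-mb")),
                Integer.parseInt(conf.getProperty("yarn.scheduler.minimum-allocation-vcores")));
        maximum = new Resource(
                Long.parseLong(conf.getProperty("yarn.scheduler.maximum-allocation-mb")),
                Integer.parseInt(conf.getProperty("yarn.scheduler.maximum-allocation-vcores")));
    }

    // Like getNewApplication: the response carries the ID and only the maximum.
    NewAppResponse newApplication(String applicationId) {
        return new NewAppResponse(applicationId, maximum);
    }
}
```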
The job ID is obtained through getNewApplicationId. It is computed from the cluster start timestamp and an incrementing counter.

ApplicationId getNewApplicationId() {
    LOG.info("begin to getNewApplicationId");
    ApplicationId applicationId = org.apache.hadoop.yarn.server.utils.BuilderUtils
            .newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(),
                    applicationCounter.incrementAndGet());
    LOG.info("Allocated new applicationId: " + applicationId.getId());
    return applicationId;
}
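The ID scheme reduces to a fixed cluster timestamp plus an atomically incremented counter. The sketch below uses a hypothetical `AppIdAllocator` class. It renders the ID in the familiar `application_<clusterTimestamp>_<id>` form, with the sequence number zero-padded to at least four digits. I believe `ApplicationId.toString()` uses the same format, but treat the exact formatting as an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical allocator: one fixed cluster start timestamp plus a counter,
// the same two ingredients getNewApplicationId combines.
final class AppIdAllocator {
    private final long clusterTimestamp;
    private final AtomicInteger applicationCounter = new AtomicInteger(0);

    AppIdAllocator(long clusterTimestamp) {
        this.clusterTimestamp = clusterTimestamp;
    }

    // incrementAndGet is atomic, so concurrent callers never get the same number.
    String newApplicationId() {
        int id = applicationCounter.incrementAndGet();
        // Sequence number zero-padded to at least four digits (assumed format).
        return String.format("application_%d_%04d", clusterTimestamp, id);
    }
}
```

The timestamp is taken when the RM starts. As a result, IDs from different RM runs stay distinct even though the counter starts from zero again.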
The functions that build the job ID:

//BuilderUtils.java
public static ApplicationId newApplicationId(RecordFactory recordFactory,
        long clusterTimeStamp, int id) {
    LOG.info("begin to newApplicationId");
    return ApplicationId.newInstance(clusterTimeStamp, id);
}

//ApplicationId.java
public static ApplicationId newInstance(long clusterTimestamp, int id) {
    //Log.info("newInstance");
    ApplicationId appId = Records.newRecord(ApplicationId.class);
    appId.setClusterTimestamp(clusterTimestamp);
    appId.setId(id);
    appId.build();
    return appId;
}
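Because `newInstance` fills in only two fields, an application ID behaves like a small immutable value. The record below is a hypothetical `AppId` stand-in, not Hadoop's `ApplicationId`. It has value equality and orders by cluster timestamp first and sequence number second. As far as I know, Hadoop's `ApplicationId.compareTo` orders the same way, but treat that as an assumption.

```java
import java.util.Comparator;

// Hypothetical immutable stand-in for ApplicationId: the two fields that
// newInstance sets, compared by timestamp first and sequence number second.
record AppId(long clusterTimestamp, int id) implements Comparable<AppId> {
    private static final Comparator<AppId> ORDER =
            Comparator.comparingLong(AppId::clusterTimestamp)
                      .thenComparingInt(AppId::id);

    @Override
    public int compareTo(AppId other) {
        return ORDER.compare(this, other);
    }

    // Same assumed textual form as the allocator: application_<ts>_<0000id>.
    @Override
    public String toString() {
        return String.format("application_%d_%04d", clusterTimestamp, id);
    }
}
```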
Once the client receives the response, it knows its job ID and the maximum resource allocation.
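Once it has the maximum capability, a client can reject an oversized ApplicationMaster request before submitting it. The sketch below uses a hypothetical `CapabilityCheck` helper: a request fits only if both its memory and its vcores are within the limit. This check does not replace the server side, which still validates the request itself (`validateResourceRequest` in `createAndPopulateNewRMApp`).

```java
// Hypothetical client-side helper: checks a requested container size against
// the maximum capability returned by getNewApplication.
final class CapabilityCheck {
    private CapabilityCheck() { }

    // A request fits only if memory AND vcores are both within the maximum.
    static boolean fits(long requestedMb, int requestedVcores,
                        long maxMb, int maxVcores) {
        return requestedMb <= maxMb && requestedVcores <= maxVcores;
    }

    // Fails fast instead of submitting a request above the advertised maximum.
    static void requireFits(long requestedMb, int requestedVcores,
                            long maxMb, int maxVcores) {
        if (!fits(requestedMb, requestedVcores, maxMb, maxVcores)) {
            throw new IllegalArgumentException(String.format(
                    "requested <%d MB, %d vcores> exceeds maximum <%d MB, %d vcores>",
                    requestedMb, requestedVcores, maxMb, maxVcores));
        }
    }
}
```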

Next comes the submission phase. Through the underlying RPC protocol, the client's call reaches the server-side ClientRMService:

//ClientRMService.java
public SubmitApplicationResponse submitApplication(
        SubmitApplicationRequest request) throws YarnException {

    LOG.info("begin to call submitApplication");
    // Get the submission context
    ApplicationSubmissionContext submissionContext = request
            .getApplicationSubmissionContext();
    // Get the application ID
    ApplicationId applicationId = submissionContext.getApplicationId();

    // ApplicationSubmissionContext needs to be validated for safety - only
    // those fields that are independent of the RM's configuration will be
    // checked here, those that are dependent on RM configuration are validated
    // in RMAppManager.
    // A series of safety checks follows
    String user = null;
    try {
        // Safety
        // Resolve the submitting user
        user = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException ie) {
        LOG.warn("Unable to get the current user.", ie);
        RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
                ie.getMessage(), "ClientRMService",
                "Exception in submitting application", applicationId);
        throw RPCUtil.getRemoteException(ie);
    }

    // Check whether app has already been put into rmContext,
    // If it is, simply return the response
    // Has this application ID already been submitted?
    if (rmContext.getRMApps().get(applicationId) != null) {
        LOG.info("This is an earlier submitted application: " + applicationId);
        return SubmitApplicationResponse.newInstance();
    }

    // Fall back to the default queue
    if (submissionContext.getQueue() == null) {
        submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
    }
    // Fall back to the default application name
    if (submissionContext.getApplicationName() == null) {
        submissionContext.setApplicationName(
                YarnConfiguration.DEFAULT_APPLICATION_NAME);
    }
    // Fall back to the default application type, or truncate an overlong one
    if (submissionContext.getApplicationType() == null) {
        submissionContext
                .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
    } else {
        if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
            submissionContext.setApplicationType(submissionContext
                    .getApplicationType().substring(0,
                            YarnConfiguration.APPLICATION_TYPE_LENGTH));
        }
    }

    try {
        // call RMAppManager to submit application directly
        // Start the actual submission
        rmAppManager.submitApplication(submissionContext,
                System.currentTimeMillis(), user);

        LOG.info("Application with id " + applicationId.getId() +
                " submitted by user " + user);
        RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
                "ClientRMService", applicationId);
    } catch (YarnException e) {
        LOG.info("Exception in submitting application with id " +
                applicationId.getId(), e);
        RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
                e.getMessage(), "ClientRMService",
                "Exception in submitting application", applicationId);
        throw e;
    }

    SubmitApplicationResponse response = recordFactory
            .newRecordInstance(SubmitApplicationResponse.class);
    return response;
}
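The normalization steps in submitApplication can be modeled on their own. In the hypothetical `SubmissionNormalizer` below, a mutable `Context` stands in for `ApplicationSubmissionContext`. The four constants are assumed values standing in for `YarnConfiguration.DEFAULT_QUEUE_NAME`, `DEFAULT_APPLICATION_NAME`, `DEFAULT_APPLICATION_TYPE` and `APPLICATION_TYPE_LENGTH`. A repeated ID gets the same early, success-looking answer as in `ClientRMService`. The final atomic registration plays the role of `putIfAbsent` in `RMAppManager`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SubmissionNormalizer {
    // Assumed values for the YarnConfiguration constants of the same role.
    static final String DEFAULT_QUEUE_NAME = "default";
    static final String DEFAULT_APPLICATION_NAME = "N/A";
    static final String DEFAULT_APPLICATION_TYPE = "YARN";
    static final int APPLICATION_TYPE_LENGTH = 20;

    enum Outcome { ACCEPTED, ALREADY_SUBMITTED }

    // Hypothetical mutable stand-in for ApplicationSubmissionContext.
    static final class Context {
        final String appId;
        String queue, name, type;

        Context(String appId, String queue, String name, String type) {
            this.appId = appId;
            this.queue = queue;
            this.name = name;
            this.type = type;
        }
    }

    // Stand-in for rmContext.getRMApps().
    private final Map<String, Context> rmApps = new ConcurrentHashMap<>();

    Outcome submit(Context ctx) {
        // Fast path, as in ClientRMService: a known ID is answered normally
        // and the context is left untouched.
        if (rmApps.get(ctx.appId) != null) {
            return Outcome.ALREADY_SUBMITTED;
        }
        if (ctx.queue == null) {
            ctx.queue = DEFAULT_QUEUE_NAME;
        }
        if (ctx.name == null) {
            ctx.name = DEFAULT_APPLICATION_NAME;
        }
        if (ctx.type == null) {
            ctx.type = DEFAULT_APPLICATION_TYPE;
        } else if (ctx.type.length() > APPLICATION_TYPE_LENGTH) {
            ctx.type = ctx.type.substring(0, APPLICATION_TYPE_LENGTH);
        }
        // Authoritative atomic check (the role putIfAbsent plays in
        // RMAppManager): a concurrent duplicate that passed the fast path fails here.
        if (rmApps.putIfAbsent(ctx.appId, ctx) != null) {
            throw new IllegalStateException(
                    "Application with id " + ctx.appId + " is already present!");
        }
        return Outcome.ACCEPTED;
    }
}
```

Answering a known ID with a normal response presumably makes resubmission harmless. For example, a client that retries after losing the first reply does not get an error.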
The job submission phase (RMAppManager.submitApplication):

//RMAppManager.java
protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime,
        String user) throws YarnException {

    LOG.info("begin to submitApplication");
    // Get the application ID
    ApplicationId applicationId = submissionContext.getApplicationId();

    // Create the app and register it (in the RM's app map and with the ACLs manager)
    RMAppImpl application =
            createAndPopulateNewRMApp(submissionContext, submitTime, user);
    ApplicationId appId = submissionContext.getApplicationId();

    if (UserGroupInformation.isSecurityEnabled()) {
        LOG.info("UserGroupInformation.isSecurityEnabled() is true.");
        Credentials credentials = null;
        try {
            credentials = parseCredentials(submissionContext);
            this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
                    credentials, submissionContext.getCancelTokensWhenComplete());
        } catch (Exception e) {
            LOG.warn("Unable to parse credentials.", e);
            // Sending APP_REJECTED is fine, since we assume that the
            // RMApp is in NEW state and thus we haven't yet informed the
            // scheduler about the existence of the application
            assert application.getState() == RMAppState.NEW;
            this.rmContext.getDispatcher().getEventHandler()
                    .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
            throw RPCUtil.getRemoteException(e);
        }
    } else {
        // Dispatcher is not yet started at this time, so these START events
        // enqueued should be guaranteed to be first processed when dispatcher
        // gets started.
        // Fire the app START event
        LOG.info("send event RMAppEventType.START");
        this.rmContext.getDispatcher().getEventHandler()
                .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
}
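The comment about START events assumes the dispatcher is a queue drained by a single thread. The sketch below is a simplified, hypothetical `MiniDispatcher` in the spirit of YARN's `AsyncDispatcher`, not its actual code. `handle` only enqueues, one worker thread processes events in FIFO order, and `stop` waits for everything queued before it. Events handed in before `start()` therefore wait in the queue and are processed first.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Simplified, hypothetical dispatcher: handle() only enqueues; a single
// worker thread drains the queue in FIFO order.
final class MiniDispatcher {
    // Sentinel task, compared by identity, that tells the worker to exit.
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Consumer<String> handler;
    private Thread worker;

    MiniDispatcher(Consumer<String> handler) {
        this.handler = handler;
    }

    // Safe to call before start(): the event just waits in the queue.
    void handle(String event) {
        queue.add(() -> handler.accept(event));
    }

    void start() {
        worker = new Thread(() -> {
            try {
                for (Runnable task = queue.take(); task != STOP; task = queue.take()) {
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "mini-dispatcher");
        worker.start();
    }

    // Must be called after start(). Queues the sentinel behind all pending
    // events and waits for the worker to reach it, so everything handled
    // before stop() has been processed when this returns.
    void stop() {
        queue.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```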
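The function below creates the application and adds it to the RM's collection of applications. If the insertion succeeds, the job has been submitted successfully.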

//RMAppManager.java
private RMAppImpl createAndPopulateNewRMApp(
        ApplicationSubmissionContext submissionContext,
        long submitTime, String user)
        throws YarnException {
    LOG.info("begin to createAndPopulateNewRMApp");
    ApplicationId applicationId = submissionContext.getApplicationId();
    validateResourceRequest(submissionContext);
    // Create RMApp
    // Build the app; submissionContext carries most of an app's information
    RMAppImpl application =
            new RMAppImpl(applicationId, rmContext, this.conf,
                    submissionContext.getApplicationName(), user,
                    submissionContext.getQueue(),
                    submissionContext, this.scheduler, this.masterService,
                    submitTime, submissionContext.getApplicationType(),
                    submissionContext.getApplicationTags());

    // Concurrent app submissions with same applicationId will fail here
    // Concurrent app submissions with different applicationIds will not
    // influence each other
    // Check again whether the app already exists; if not, put it into the map.
    // A successful put means the job has been submitted.
    if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
            null) {
        String message = "Application with id " + applicationId
                + " is already present! Cannot add a duplicate!";
        LOG.warn(message);
        throw RPCUtil.getRemoteException(message);
    }
    // Inform the ACLs Manager
    this.applicationACLsManager.addApplication(applicationId,
            submissionContext.getAMContainerSpec().getApplicationACLs());
    return application;
}
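The duplicate check in `createAndPopulateNewRMApp` is safe under concurrency because `rmContext.getRMApps()` is a `ConcurrentMap` and `putIfAbsent` is atomic. The hypothetical `RegistrationRace` helper below has several threads try to register the same ID at once and counts how many succeed. With `putIfAbsent`, exactly one succeeds. A separate `get` followed by `put` could let two threads both believe they had won.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: races `threads` registrations of one appId against a
// single map and returns how many of them succeeded.
final class RegistrationRace {
    static int countWinners(String appId, int threads) {
        ConcurrentMap<String, String> rmApps = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                String app = "RMApp#" + i; // each submitter builds its own app object
                // null from putIfAbsent means this thread's app was registered.
                results.add(pool.submit(() -> rmApps.putIfAbsent(appId, app) == null));
            }
            int winners = 0;
            for (Future<Boolean> f : results) {
                if (f.get()) {
                    winners++;
                }
            }
            return winners;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```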

An app carries quite a lot of information, as shown below:

//RMAppImpl.java
public class RMAppImpl implements RMApp, Recoverable {

    private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
    private static final String UNAVAILABLE = "N/A";

    // Immutable fields
    private final ApplicationId applicationId;
    private final RMContext rmContext;
    private final Configuration conf;
    private final String user;
    private final String name;
    private final ApplicationSubmissionContext submissionContext;
    private final Dispatcher dispatcher;
    private final YarnScheduler scheduler;
    private final ApplicationMasterService masterService;
    private final StringBuilder diagnostics = new StringBuilder();
    private final int maxAppAttempts;
    private final ReadLock readLock;
    private final WriteLock writeLock;
    private final Map<ApplicationAttemptId, RMAppAttempt> attempts
            = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
    private final long submitTime;
    private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
    private final String applicationType;
    private final Set<String> applicationTags;

    // Mutable fields
    private long startTime;
    private long finishTime = 0;
    private long storedFinishTime = 0;
    private RMAppAttempt currentAttempt;
    private String queue;
    private EventHandler handler;
    private static final AppFinishedTransition FINISHED_TRANSITION =
            new AppFinishedTransition();
    private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();

    // These states stored are only valid when app is at killing or final_saving.
    private RMAppState stateBeforeKilling;
    private RMAppState stateBeforeFinalSaving;
    private RMAppEvent eventCausingFinalSaving;
    private RMAppState targetedFinalState;
    private RMAppState recoveredFinalState;
    // ...
}
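Several of these fields (`currentAttempt`, `stateBeforeFinalSaving`, `targetedFinalState`) exist to drive `RMAppImpl`'s state machine, which the START event from `submitApplication` sets in motion. The table below is a heavily simplified, hypothetical model. It covers only the normal submission path and the rejection path. The state and event names follow `RMAppState` and `RMAppEventType` as I understand them in the Hadoop 2.x releases this article reads. The real transitions, which are built with a `StateMachineFactory`, vary between versions.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, heavily simplified model of the first RMAppImpl transitions.
final class AppLifecycle {
    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, FINAL_SAVING, FAILED }
    enum Event { START, APP_NEW_SAVED, APP_ACCEPTED, APP_REJECTED, APP_UPDATE_SAVED }

    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);

    private static void add(State from, Event on, State to) {
        TABLE.computeIfAbsent(from, s -> new EnumMap<Event, State>(Event.class)).put(on, to);
    }

    static {
        // Normal path: store the app, hand it to the scheduler, get accepted.
        add(State.NEW, Event.START, State.NEW_SAVING);
        add(State.NEW_SAVING, Event.APP_NEW_SAVED, State.SUBMITTED);
        add(State.SUBMITTED, Event.APP_ACCEPTED, State.ACCEPTED);
        // Rejection path: save the final state first, then land in FAILED.
        add(State.NEW, Event.APP_REJECTED, State.FINAL_SAVING);
        add(State.FINAL_SAVING, Event.APP_UPDATE_SAVED, State.FAILED);
    }

    private State current = State.NEW;

    State state() {
        return current;
    }

    // Applies an event; one with no entry for the current state is rejected,
    // much like an invalid transition in YARN's state machines.
    State handle(Event event) {
        Map<Event, State> row = TABLE.get(current);
        State next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " at " + current);
        }
        current = next;
        return current;
    }
}
```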