您的位置:首页 > 运维架构

YARN(Hadoop)学习笔记(4)

2014-12-04 17:23 344 查看
<span style="font-size:18px;">org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity包下的
LeafQueue#assignContainers(返回值类型为CSAssignment)阅读笔记

/**
 * Tries to assign a container on the given node to one of the applications
 * in activeApplications.
 *
 * <p>If the node already holds a reserved container, the call delegates to
 * assignReservedContainer for the application that owns the reservation and
 * returns that result. Otherwise the applications in activeApplications are
 * visited in iteration order and, for each one, its priorities in the order
 * getPriorities() yields them. The first assignment whose resource is
 * greater than Resources.none() is booked via allocateResource and returned.
 *
 * <p>The method is synchronized on this instance and additionally holds each
 * application's monitor while that application is examined.
 *
 * @param clusterResource resource value passed through to the limit checks
 *        and to the allocation helpers
 * @param node node on which a container may be assigned
 * @return the result of assignReservedContainer when the node has a
 *         reservation; otherwise the first non-empty assignment made, or
 *         NULL_ASSIGNMENT when assignToQueue rejects a request or no
 *         application received anything
 */
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node) {

// LOG.debug writes a log entry at debug level.
// if (LOG.isDebugEnabled()) { LOG.debug(...); } is a common idiom: the
// message string is only built when debug logging is actually enabled.
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " #applications=" + activeApplications.size());
}

// Check for reserved resources
// If the node has a reserved container, handle that reservation for the
// owning application and return immediately; the loop below is skipped.
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {

// ApplicationAttemptId denotes the particular attempt of an
// ApplicationMaster for a given ApplicationId
FiCaSchedulerApp application = getApplication(reservedContainer
.getApplicationAttemptId());
// NOTE(review): getApplication's result is locked without a null check —
// confirm it cannot return null for a reserved container.
synchronized (application) {
return assignReservedContainer(application, node,
reservedContainer, clusterResource);
}
}

// Try to assign containers to applications in order
// application is an element of the activeApplications collection
// (per the author's note, declared as Set<FiCaSchedulerApp> activeApplications)
for (FiCaSchedulerApp application : activeApplications) {

if (LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application "
+ application.getApplicationId());
application.showRequests();
}

// synchronized (object) { ... } : the statements in the block run while
// holding that object's monitor, so everything below is executed with
// the application locked.
synchronized (application) {
// Check if this resource is on the blacklist
// When isBlacklisted(application, node, LOG) is true, move on to the
// next application (presumably the application has blacklisted this
// node — confirm against SchedulerAppUtils).
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
continue;
}

// Schedule in priority order
// Collection<Priority> getPriorities()
for (Priority priority : application.getPriorities()) {
ResourceRequest anyRequest = application
.getResourceRequest(priority, ResourceRequest.ANY);
// No ResourceRequest.ANY request at this priority: try the next priority.
if (null == anyRequest) {
continue;
}

// Required resource
Resource required = anyRequest.getCapability();

// Do we need containers at this 'priority'?
// needContainers return (((starvation + requiredContainers)
// - reservedContainers) > 0)
if (!needContainers(application, priority, required)) {
continue;
}

// Compute user-limit & set headroom
// Note: We compute both user-limit & headroom with the
// highest
// priority request as the target.
// This works since we never assign lower priority requests
// before all higher priority ones are serviced.
Resource userLimit = computeUserLimitAndSetHeadroom(
application, clusterResource, required);

// Check queue max-capacity limit
// Author's note: boolean assignToQueue(clusterResource, required)
// compares potentialNewCapacity with absoluteMaxCapacity;
// if (potentialNewCapacity > absoluteMaxCapacity) it returns
// false, meaning nothing is assigned to the queue.
// A false result ends the whole method, not just this application.
if (!assignToQueue(clusterResource, required)) {
return NULL_ASSIGNMENT;
// Author's note: the resource carried by NULL_ASSIGNMENT is
// Resources.createResource(0, 0), with the signature
// createResource(int memory, int cores)
}

// Check user limit
// A false result stops examining this application's remaining
// priorities; the outer loop then continues with the next application.
if (!assignToUser(clusterResource, application.getUser(),
userLimit)) {
break;
}

// Inform the application it is about to get a scheduling
// opportunity
application.addSchedulingOpportunity(priority);

// Try to schedule
CSAssignment assignment = assignContainersOnNode(
clusterResource, node, application, priority, null);

// Did the application skip this node?
if (assignment.getSkipped()) {
// Don't count 'skipped nodes' as a scheduling
// opportunity!
// Undo the addSchedulingOpportunity call made above, then try the
// next priority.
application.subtractSchedulingOpportunity(priority);
continue;
}

// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
if (Resources.greaterThan(resourceCalculator,
clusterResource, assigned, Resources.none())) {

// Book-keeping
// Note: Update headroom to account for current
// allocation too...
allocateResource(clusterResource, application, assigned);

// Don't reset scheduling opportunities for non-local
// assignments
// otherwise the app will be delayed for each non-local
// assignment.
// This helps apps with many off-cluster requests
// schedule faster.
if (assignment.getType() != NodeType.OFF_SWITCH) {
application.resetSchedulingOpportunities(priority);
}

// Done
return assignment;
} else {
// Do not assign out of order w.r.t priorities
break;
}
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("post-assignContainers for application "
+ application.getApplicationId());
}
// NOTE(review): unlike the pre-assignment call above, showRequests() here
// runs outside the isDebugEnabled() guard — confirm whether that is intended.
application.showRequests();
}

// No application received a non-empty assignment on this node.
return NULL_ASSIGNMENT;

}</span>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop server yarn