您的位置:首页 > 其它

Mesos源码分析(5): Mesos Master的启动之四

2016-07-24 20:24 387 查看
 

5. Create an instance of allocator.

 

代码如下



 

Mesos源码中默认的Allocator,即HierarchicalDRFAllocator的位置在$MESOS_HOME/src/master/allocator/mesos/hierarchical.hpp,而DRF中对每个Framework排序的Sorter位于$MESOS_HOME/src/master/allocator/sorter/drf/sorter.cpp,可以查看其源码了解它的工作原理。

 

HierarchicalDRF的基本原理

 

如何作出offer分配的决定是由资源分配模块Allocator实现的,该模块存在于Master之中。资源分配模块确定Framework接受offer的顺序,与此同时,确保在资源利用最大化的条件下公平地共享资源。

由于Mesos为跨数据中心调度资源并且是异构的资源需求时,资源分配相比普通调度将会更加困难。因此Mesos采用了DRF(主导资源公平算法 Dominant Resource Fairness)

Framework拥有的全部资源类型份额中占最高百分比的就是Framework的主导份额。DRF算法会使用所有已注册的Framework来计算主导份额,以确保每个Framework能接收到其主导资源的公平份额。

 

举个例子

 

考虑一个9CPU,18GBRAM的系统,拥有两个用户,其中用户A运行的任务的需求向量为{1CPU, 4GB},用户B运行的任务的需求向量为{3CPU,1GB},用户可以执行尽量多的任务来使用系统的资源。

在上述方案中,A的每个任务消耗总cpu的1/9和总内存的2/9,所以A的dominant resource是内存;B的每个任务消耗总cpu的1/3和总内存的1/18,所以B的dominant resource为CPU。DRF会均衡用户的dominant shares,执行3个用户A的任务,执行2个用户B的任务。三个用户A的任务总共消耗了{3CPU,12GB},两个用户B的任务总共消耗了{6CPU,2GB};在这个分配中,每一个用户的dominant share是相等的,用户A获得了2/3的RAM,而用户B获得了2/3的CPU。

以上的这个分配可以用如下方式计算出来:x和y分别是用户A和用户B的分配任务的数目,那么用户A消耗了{xCPU,4xGB},用户B消耗了{3yCPU,yGB},在图三中用户A和用户B消耗了同等dominant resource;用户A的dominant share为4x/18,用户B的dominant share为3y/9。所以DRF分配可以通过求解以下的优化问题来得到:

 

max(x,y)     #(Maximize allocations)

    subject to

        x + 3y <= 9         #(CPU constraint)

        4x + y <= 18         #(Memory Constraint)

            2x/9 = y/3     #(Equalize dominant shares)

 

最后解出x=3以及y=2,因而用户A获得{3CPU,12GB},B得到{6CPU, 2GB}。

 

HierarchicalDRF核心算法实现

 

HierachicalDRF的实现在Src/main/allocator/mesos/hierarchical.cpp中

 



 

不是每次把所有的资源都给所有的framework,而是根据资源分配算法,每个framework拿到的不同

 

void HierarchicalAllocatorProcess::allocate(

    const hashset<SlaveID>& slaveIds_)

{

  ++metrics.allocation_runs;

 
  // Compute the offerable resources, per framework:

  // (1) For reserved resources on the slave, allocate these to a

  // framework having the corresponding role.

  // (2) For unreserved resources on the slave, allocate these

  // to a framework of any role.

  hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;

 
  // NOTE: This function can operate on a small subset of slaves, we have to

  // make sure that we don't assume cluster knowledge when summing resources

  // from that set.

 
  vector<SlaveID> slaveIds;

  slaveIds.reserve(slaveIds_.size());

 
  // Filter out non-whitelisted and deactivated slaves in order not to send

  // offers for them.

  foreach (const SlaveID& slaveId, slaveIds_) {

    if (isWhitelisted(slaveId) && slaves[slaveId].activated) {

      slaveIds.push_back(slaveId);

    }

  }

 
  // Randomize the order in which slaves' resources are allocated.

  //

  // TODO(vinod): Implement a smarter sorting algorithm.

  std::random_shuffle(slaveIds.begin(), slaveIds.end());

 
  // Returns the __quantity__ of resources allocated to a quota role. Since we

  // account for reservations and persistent volumes toward quota, we strip

  // reservation and persistent volume related information for comparability.

  // The result is used to determine whether a role's quota is satisfied, and

  // also to determine how many resources the role would need in order to meet

  // its quota.

  //

  // NOTE: Revocable resources are excluded in `quotaRoleSorter`.

  auto getQuotaRoleAllocatedResources = [this](const
string& role) {

    CHECK(quotas.contains(role));

 
    // NOTE: `allocationScalarQuantities` omits dynamic reservation and

    // persistent volume info, but we additionally strip `role` here.

    Resources resources;

 
    foreach (Resource resource,

             quotaRoleSorter->allocationScalarQuantities(role)) {

      CHECK(!resource.has_reservation());

      CHECK(!resource.has_disk());

 
      resource.set_role("*");

      resources += resource;

    }

 
    return resources;

  };

 
  // Quota comes first and fair share second. Here we process only those

  // roles, for which quota is set (quota'ed roles). Such roles form a

  // special allocation group with a dedicated sorter.

  foreach (const SlaveID& slaveId, slaveIds) {

    foreach (const
string& role, quotaRoleSorter->sort()) {

      CHECK(quotas.contains(role));

 
      // If there are no active frameworks in this role, we do not

      // need to do any allocations for this role.

      if (!activeRoles.contains(role)) {

        continue;

      }

 
      // Get the total quantity of resources allocated to a quota role. The

      // value omits role, reservation, and persistence info.

      Resources roleConsumedResources = getQuotaRoleAllocatedResources(role);

 
      // If quota for the role is satisfied, we do not need to do any further

      // allocations for this role, at least at this stage.

      //

      // TODO(alexr): Skipping satisfied roles is pessimistic. Better

      // alternatives are:

      // * A custom sorter that is aware of quotas and sorts accordingly.

      // * Removing satisfied roles from the sorter.

      if (roleConsumedResources.contains(quotas[role].info.guarantee())) {

        continue;

      }

 
      // Fetch frameworks according to their fair share.

      foreach (const
string& frameworkId_, frameworkSorters[role]->sort()) {

        FrameworkID frameworkId;

        frameworkId.set_value(frameworkId_);

 
        // If the framework has suppressed offers, ignore. The unallocated

        // part of the quota will not be allocated to other roles.

        if (frameworks[frameworkId].suppressed) {

          continue;

        }

 
        // Only offer resources from slaves that have GPUs to

        // frameworks that are capable of receiving GPUs.

        // See MESOS-5634.

        if (!frameworks[frameworkId].gpuAware &&

            slaves[slaveId].total.gpus().getOrElse(0) > 0) {

          continue;

        }

 
        // Calculate the currently available resources on the slave.

        Resources available = slaves[slaveId].total - slaves[slaveId].allocated;

 
        // The resources we offer are the unreserved resources as well as the

        // reserved resources for this particular role. This is necessary to

        // ensure that we don't offer resources that are reserved for another

        // role.

        //

        // NOTE: Currently, frameworks are allowed to have '*' role.

        // Calling reserved('*') returns an empty Resources object.

        //

        // Quota is satisfied from the available non-revocable resources on the

        // agent. It's important that we include reserved resources here since

        // reserved resources are accounted towards the quota guarantee. If we

        // were to rely on stage 2 to offer them out, they would not be checked

        // against the quota guarantee.

        Resources resources =

          (available.unreserved() + available.reserved(role)).nonRevocable();

 
        // It is safe to break here, because all frameworks under a role would

        // consider the same resources, so in case we don't have allocatable

        // resources, we don't have to check for other frameworks under the

        // same role. We only break out of the innermost loop, so the next step

        // will use the same `slaveId`, but a different role.

        //

        // NOTE: The resources may not be allocatable here, but they can be

        // accepted by one of the frameworks during the second allocation

        // stage.

        if (!allocatable(resources)) {

          break;

        }

 
        // If the framework filters these resources, ignore. The unallocated

        // part of the quota will not be allocated to other roles.

        if (isFiltered(frameworkId, slaveId, resources)) {

          continue;

        }

 
        VLOG(2) << "Allocating " << resources << " on agent " << slaveId

                << " to framework " << frameworkId

                << " as part of its role quota";

 
        // NOTE: We perform "coarse-grained" allocation for quota'ed

        // resources, which may lead to overcommitment of resources beyond

        // quota. This is fine since quota currently represents a guarantee.

        offerable[frameworkId][slaveId] += resources;

        slaves[slaveId].allocated += resources;

 
        // Resources allocated as part of the quota count towards the

        // role's and the framework's fair share.

        //

        // NOTE: Revocable resources have already been excluded.

        frameworkSorters[role]->add(slaveId, resources);

        frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);

        roleSorter->allocated(role, slaveId, resources);

        quotaRoleSorter->allocated(role, slaveId, resources);

      }

    }

  }

 
  // Calculate the total quantity of scalar resources (including revocable

  // and reserved) that are available for allocation in the next round. We

  // need this in order to ensure we do not over-allocate resources during

  // the second stage.

  //

  // For performance reasons (MESOS-4833), this omits information about

  // dynamic reservations or persistent volumes in the resources.

  //

  // NOTE: We use total cluster resources, and not just those based on the

  // agents participating in the current allocation (i.e. provided as an

  // argument to the `allocate()` call) so that frameworks in roles without

  // quota are not unnecessarily deprived of resources.

  Resources remainingClusterResources = roleSorter->totalScalarQuantities();

  foreachkey (const
string& role, activeRoles) {

    remainingClusterResources -= roleSorter->allocationScalarQuantities(role);

  }

 
  // Frameworks in a quota'ed role may temporarily reject resources by

  // filtering or suppressing offers. Hence quotas may not be fully allocated.

  Resources unallocatedQuotaResources;

  foreachpair (const
string& name, const Quota& quota, quotas) {

    // Compute the amount of quota that the role does not have allocated.

    //

    // NOTE: Revocable resources are excluded in `quotaRoleSorter`.

    // NOTE: Only scalars are considered for quota.

    Resources allocated = getQuotaRoleAllocatedResources(name);

    const Resources required = quota.info.guarantee();

    unallocatedQuotaResources += (required - allocated);

  }

 
  // Determine how many resources we may allocate during the next stage.

  //

  // NOTE: Resources for quota allocations are already accounted in

  // `remainingClusterResources`.

  remainingClusterResources -= unallocatedQuotaResources;

 
  // To ensure we do not over-allocate resources during the second stage

  // with all frameworks, we use 2 stopping criteria:

  // * No available resources for the second stage left, i.e.

  // `remainingClusterResources` - `allocatedStage2` is empty.

  // * A potential offer will force the second stage to use more resources

  // than available, i.e. `remainingClusterResources` does not contain

  // (`allocatedStage2` + potential offer). In this case we skip this

  // agent and continue to the next one.

  //

  // NOTE: Like `remainingClusterResources`, `allocatedStage2` omits

  // information about dynamic reservations and persistent volumes for

  // performance reasons. This invariant is preserved because we only add

  // resources to it that have also had this metadata stripped from them

  // (typically by using `Resources::createStrippedScalarQuantity`).

  Resources allocatedStage2;

 
  // At this point resources for quotas are allocated or accounted for.

  // Proceed with allocating the remaining free pool.

  foreach (const SlaveID& slaveId, slaveIds) {

    // If there are no resources available for the second stage, stop.

    if (!allocatable(remainingClusterResources - allocatedStage2)) {

      break;

    }

 
    foreach (const
string& role, roleSorter->sort()) {

      foreach (const
string& frameworkId_,

               frameworkSorters[role]->sort()) {

        FrameworkID frameworkId;

        frameworkId.set_value(frameworkId_);

 
        // If the framework has suppressed offers, ignore.

        if (frameworks[frameworkId].suppressed) {

          continue;

        }

 
        // Only offer resources from slaves that have GPUs to

        // frameworks that are capable of receiving GPUs.

        // See MESOS-5634.

        if (!frameworks[frameworkId].gpuAware &&

            slaves[slaveId].total.gpus().getOrElse(0) > 0) {

          continue;

        }

 
        // Calculate the currently available resources on the slave.

        Resources available = slaves[slaveId].total - slaves[slaveId].allocated;

 
        // The resources we offer are the unreserved resources as well as the

        // reserved resources for this particular role. This is necessary to

        // ensure that we don't offer resources that are reserved for another

        // role.

        //

        // NOTE: Currently, frameworks are allowed to have '*' role.

        // Calling reserved('*') returns an empty Resources object.

        //

        // NOTE: We do not offer roles with quota any more non-revocable

        // resources once their quota is satisfied. However, note that this is

        // not strictly true due to the coarse-grained nature (per agent) of the

        // allocation algorithm in stage 1.

        //

        // TODO(mpark): Offer unreserved resources as revocable beyond quota.

        Resources resources = available.reserved(role);

        if (!quotas.contains(role)) {

          resources += available.unreserved();

        }

 
        // It is safe to break here, because all frameworks under a role would

        // consider the same resources, so in case we don't have allocatable

        // resources, we don't have to check for other frameworks under the

        // same role. We only break out of the innermost loop, so the next step

        // will use the same slaveId, but a different role.

        //

        // The difference to the second `allocatable` check is that here we also

        // check for revocable resources, which can be disabled on a per frame-

        // work basis, which requires us to go through all frameworks in case we

        // have allocatable revocable resources.

        if (!allocatable(resources)) {

          break;

        }

 
        // Remove revocable resources if the framework has not opted for them.

        if (!frameworks[frameworkId].revocable) {

          resources = resources.nonRevocable();

        }

 
        // If the resources are not allocatable, ignore. We can not break

        // here, because another framework under the same role could accept

        // revocable resources and breaking would skip all other frameworks.

        if (!allocatable(resources)) {

          continue;

        }

 
        // If the framework filters these resources, ignore.

        if (isFiltered(frameworkId, slaveId, resources)) {

          continue;

        }

 
        // If the offer generated by `resources` would force the second

        // stage to use more than `remainingClusterResources`, move along.

        // We do not terminate early, as offers generated further in the

        // loop may be small enough to fit within `remainingClusterResources`.

        const Resources scalarQuantity =

          resources.createStrippedScalarQuantity();

 
        if (!remainingClusterResources.contains(

                allocatedStage2 + scalarQuantity)) {

          continue;

        }

 
        VLOG(2) << "Allocating " << resources << " on agent " << slaveId

                << " to framework " << frameworkId;

 
        // NOTE: We perform "coarse-grained" allocation, meaning that we always

        // allocate the entire remaining slave resources to a single framework.

        //

        // NOTE: We may have already allocated some resources on the current

        // agent as part of quota.

        offerable[frameworkId][slaveId] += resources;

        allocatedStage2 += scalarQuantity;

        slaves[slaveId].allocated += resources;

 
        frameworkSorters[role]->add(slaveId, resources);

        frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);

        roleSorter->allocated(role, slaveId, resources);

 
        if (quotas.contains(role)) {

          // See comment at `quotaRoleSorter` declaration regarding

          // non-revocable.

          quotaRoleSorter->allocated(role, slaveId, resources.nonRevocable());

        }

      }

    }

  }

 
  if (offerable.empty()) {

    VLOG(1) << "No allocations performed";

  } else {

    // Now offer the resources to each framework.

    foreachkey (const FrameworkID& frameworkId, offerable) {

      offerCallback(frameworkId, offerable[frameworkId]);

    }

  }

 
  // NOTE: For now, we implement maintenance inverse offers within the

  // allocator. We leverage the existing timer/cycle of offers to also do any

  // "deallocation" (inverse offers) necessary to satisfy maintenance needs.

  deallocate(slaveIds_);

}
 

上面这段算法非常复杂,概况来说调用了三个Sorter,对所有的Framework进行排序,哪个先得到资源,哪个后得到资源。

 

在src/master/allocator/mesos/hierarchical.hpp中,有对三个重要的sorter的定义和注释,可以帮助了解sorter的逻辑。

 



 



 



 

 

总的来说分两大步:

先保证有quota的role

然后其他的资源没有quota的再分

在每一步Hierachical的意思是两层排序

一层是按照role排序

第二层是相同的role的不同Framework排序

每一层的排序都是按照计算的share进行排序来先给谁,再给谁

 

在src/master/allocator/sorter/drf/sorter.cpp中



 

 

double DRFSorter::calculateShare(const
string& name)

{

  double share = 0.0;

 
  // TODO(benh): This implementation of "dominant resource fairness"

  // currently does not take into account resources that are not

  // scalars.

 
  foreach (const
string& scalar, total_.scalarQuantities.names()) {

    // Filter out the resources excluded from fair sharing.

    if (fairnessExcludeResourceNames.isSome() &&

        fairnessExcludeResourceNames->count(scalar) > 0) {

      continue;

    }

 
    // We collect the scalar accumulated total value from the

    // `Resources` object.

    //

    // NOTE: Although in principle scalar resources may be spread

    // across multiple `Resource` objects (e.g., persistent volumes),

    // we currently strip persistence and reservation metadata from

    // the resources in `scalarQuantities`.

    Option<Value::Scalar> __total =

      total_.scalarQuantities.get<Value::Scalar>(scalar);

 
    CHECK_SOME(__total);

    const
double _total = __total.get().value();

 
    if (_total > 0.0) {

      double allocation = 0.0;

 
      // We collect the scalar accumulated allocation value from the

      // `Resources` object.

      //

      // NOTE: Although in principle scalar resources may be spread

      // across multiple `Resource` objects (e.g., persistent volumes),

      // we currently strip persistence and reservation metadata from

      // the resources in `scalarQuantities`.

      Option<Value::Scalar> _allocation =

        allocations[name].scalarQuantities.get<Value::Scalar>(scalar);

 
      if (_allocation.isSome()) {

        allocation = _allocation.get().value();

      }

 
      share = std::max(share, allocation / _total);

    }

  }

 
  return share / weights[name];

}
 

 

Quota, Reservation, Role, Weight

 

每个Framework可以有Role,既用于权限,也用于资源分配

可以给某个role在offerResources的时候回复Offer::Operation::RESERVE,来预订某台slave上面的资源。Reservation是很具体的,具体到哪台机器的多少资源属于哪个Role

Quota是每个Role的最小保证量,但是不具体到某个节点,而是在整个集群中保证有这么多就行了。

Reserved资源也算在Quota里面。

不同的Role之间可以有Weight

 

最后将resource交给每一个Framework

在allocate函数的最后,依次调用offerCallback来讲resource分配给每一个Framework



 



 

那offerCallback函数是什么时候注册进来的呢?

 



 

 

在Allocator的initialize函数中,OfferCallback被注册尽量,并且没过一段时间执行一次。

 

在Allocator初始化的时候,最后定义每allocationInterval运行一次

offerCallback是注册进来的函数,请记住。

 

6. flags.registry == "in_memory" or flags.registry == "replicated_log" 信息存储在内存,zk,本地文件夹

 

7. 选举和检测谁是Leader的对象初始化

 

Try<MasterContender*> contender_ = MasterContender::create(zk, flags.master_contender);

Try<MasterDetector*> detector_ = MasterDetector::create(zk, flags.master_detector);

 

8. 生成Master对象,启动Master线程

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: