
Spark Core 2.0 ExecutionMemoryPool

2017-01-16 17:57
ExecutionMemoryPool implements the policies and bookkeeping for an adjustable-sized memory pool that is shared between tasks.

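It tries to give every task a reasonable share of memory, so that one task cannot grab a large amount first and force the other tasks to spill to disk repeatedly.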

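With N active tasks, each task is guaranteed to be able to acquire at least 1/(2N) of the pool before it has to spill, and it can hold at most 1/N.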
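
The two bounds can be worked out with a few lines of Scala. This is only a sketch: `FairShareBounds` and `bounds` are hypothetical names that do not exist in Spark, and it assumes the pool cannot grow, so the same pool size is used for both bounds (the real method uses `poolSize` for the minimum and `computeMaxPoolSize()` for the maximum).

```scala
object FairShareBounds {
  // Hypothetical helper, not part of Spark.
  // Returns (minPerTask, maxPerTask) in bytes for a pool of `poolSize`
  // bytes shared by `numActiveTasks` tasks, assuming a fixed-size pool.
  def bounds(poolSize: Long, numActiveTasks: Int): (Long, Long) = {
    require(numActiveTasks > 0, "need at least one active task")
    val minPerTask = poolSize / (2L * numActiveTasks) // guaranteed before spilling
    val maxPerTask = poolSize / numActiveTasks        // upper cap per task
    (minPerTask, maxPerTask)
  }
}
```

For a 1000-byte pool and 4 tasks this gives a guaranteed share of 125 bytes and a cap of 250 bytes per task.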

/**
* Implements policies and bookkeeping for sharing an adjustable-sized pool of memory between tasks.
*
* Tries to ensure that each task gets a reasonable share of memory, instead of some task ramping up
* to a large amount first and then causing others to spill to disk repeatedly.
*
* If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
* before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
* set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever this
* set changes. This is all done by synchronizing access to mutable state and using wait() and
* notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across
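* tasks was performed by the ShuffleMemoryManager.
*/

acquireMemory tries to acquire up to numBytes of memory for the given task and returns the number of bytes actually obtained, or 0 if nothing could be allocated.
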
/**
* Try to acquire up to `numBytes` of memory for the given task and return the number of bytes
* obtained, or 0 if none can be allocated.
*
* This call may block until there is enough free memory in some situations, to make sure each
* task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
* active tasks) before it is forced to spill. This can happen if the number of tasks increases
* but an older task had a lot of memory already.
*
* @param numBytes number of bytes to acquire
* @param taskAttemptId the task attempt acquiring memory
* @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
*                      one parameter (Long) that represents the desired amount of memory by
*                      which this pool should be expanded.
* @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
*                           at this given moment. This is not a field because the max pool
*                           size is variable in certain cases. For instance, in unified
*                           memory management, the execution pool can be expanded by evicting
*                           cached blocks, thereby shrinking the storage pool.
*
* @return the number of bytes granted to the task.
*/
private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    // Default callback does nothing; `()` is the Unit value (the bare name `Unit` is the companion object)
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

  // TODO: clean up this clunky method signature

  // Add this task to the taskMemory map just so we can keep an accurate count of the number
  // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
  if (!memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) = 0L
    // This will later cause waiting tasks to wake up and check numTasks again
    lock.notifyAll()
  }

  // Keep looping until we're either sure that we don't want to grant this request (because this
  // task would have more than 1 / numActiveTasks of the memory) or we have enough free
  // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
  // TODO: simplify this to limit each task to its own slot
  while (true) {
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)

    // In every iteration of this loop, we should first try to reclaim any borrowed execution
    // space from storage. This is necessary because of the potential race condition where new
    // storage blocks may steal the free execution memory that this task was waiting for.
    maybeGrowPool(numBytes - memoryFree)

    // Maximum size the pool would have after potentially growing the pool.
    // This is used to compute the upper bound of how much memory each task can occupy. This
    // must take into account potential free memory as well as the amount this pool currently
    // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
    // we did not take into account space that could have been freed by evicting cached blocks.
    val maxPoolSize = computeMaxPoolSize()
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)

    // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
    // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
    val toGrant = math.min(maxToGrant, memoryFree)

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
    // if we can't give it this much now, wait for other tasks to free up memory
    // (this happens if older tasks allocated lots of memory before N grew)
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
      lock.wait()
    } else {
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
  }
  0L  // Never reached
}
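
The decision made in each iteration of the loop can be isolated as a pure function, which makes the grant-or-wait rule easier to check by hand. The sketch below is an illustration, not Spark code: `GrantDecision`, `Outcome`, `Grant`, `Wait` and `decide` are hypothetical names, and the locking, the `maybeGrowPool` call and the `memoryForTask` update are deliberately left out.

```scala
object GrantDecision {
  // Hypothetical result type: either grant some bytes (possibly 0) or block.
  sealed trait Outcome
  final case class Grant(bytes: Long) extends Outcome
  case object Wait extends Outcome

  // Mirrors the arithmetic of one loop iteration in acquireMemory.
  def decide(
      numBytes: Long,       // bytes requested
      curMem: Long,         // bytes the task already holds
      memoryFree: Long,     // free bytes in the pool right now
      poolSize: Long,       // current pool size
      maxPoolSize: Long,    // pool size after potential growth
      numActiveTasks: Int): Outcome = {
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2L * numActiveTasks)
    // Cap the request so the task never exceeds 1 / N of the pool
    val maxToGrant = math.min(numBytes, math.max(0L, maxMemoryPerTask - curMem))
    // Cannot hand out more than is currently free
    val toGrant = math.min(maxToGrant, memoryFree)
    // Block only if the request is not fully satisfied AND the task
    // would still be below its guaranteed 1 / 2N share
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) Wait
    else Grant(toGrant)
  }
}
```

With a 1000-byte pool and two tasks, a task holding nothing that asks for 400 bytes when only 100 are free has to wait, because it is still under its 250-byte minimum. A task that already holds 300 bytes gets the 100 free bytes immediately, and a task already at the 500-byte cap gets a grant of 0 and is expected to spill.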

releaseMemory releases memory held by the given task.

/**
* Release `numBytes` of memory acquired by the given task.
*/
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
  val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
  // Never free more than the task actually holds
  val memoryToFree = if (curMem < numBytes) {
    logWarning(
      s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
        s"of memory from the $poolName pool")
    curMem
  } else {
    numBytes
  }
  if (memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) -= memoryToFree
    // Drop tasks that hold no memory so they no longer count as active
    if (memoryForTask(taskAttemptId) <= 0) {
      memoryForTask.remove(taskAttemptId)
    }
  }
  lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
}
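
The bookkeeping side of the release path can be sketched on an immutable map. `ReleaseBookkeeping` and `release` are hypothetical names used only for illustration. The real method mutates `memoryForTask` under the lock, logs a warning when the request is clamped, and wakes up waiters with `notifyAll()`, none of which is modelled here.

```scala
object ReleaseBookkeeping {
  // Hypothetical pure version of the release bookkeeping.
  // Returns the updated taskAttemptId -> bytes map.
  def release(
      memoryForTask: Map[Long, Long],
      taskAttemptId: Long,
      numBytes: Long): Map[Long, Long] = {
    val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
    // Clamp: a task can never release more than it holds
    val memoryToFree = math.min(numBytes, curMem)
    val remaining = curMem - memoryToFree
    // A task with nothing left is removed, so N shrinks for everyone else
    if (remaining <= 0) memoryForTask - taskAttemptId
    else memoryForTask.updated(taskAttemptId, remaining)
  }
}
```

Removing the entry matters because the number of keys in the map is the N used for the 1/(2N) and 1/N bounds.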


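The two callbacks passed to acquireMemory are defined inside UnifiedMemoryManager.acquireExecutionMemory:

/**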
* Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
*
* When acquiring memory for a task, the execution pool may need to make multiple
* attempts. Each attempt must be able to evict storage in case another task jumps in
* and caches a large block between the attempts. This is called once per attempt.
*/
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
  if (extraMemoryNeeded > 0) {
    // There is not enough free memory in the execution pool, so try to reclaim memory from
    // storage. We can reclaim any free memory from the storage pool. If the storage pool
    // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
    // the memory that storage has borrowed from execution.
    val memoryReclaimableFromStorage = math.max(
      storagePool.memoryFree,
      storagePool.poolSize - storageRegionSize)
    if (memoryReclaimableFromStorage > 0) {
      // Only reclaim as much space as is necessary and available:
      val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
        math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
      storagePool.decrementPoolSize(spaceToReclaim)
      executionPool.incrementPoolSize(spaceToReclaim)
    }
  }
}
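
How much the execution pool asks storage to give back can be computed in isolation. In this sketch, `ReclaimFromStorage` and `spaceToRequest` are hypothetical names. The function only returns the amount that would be passed to `freeSpaceToShrinkPool`; the amount actually reclaimed depends on what that call manages to free, which is not modelled.

```scala
object ReclaimFromStorage {
  // Hypothetical helper: the number of bytes the execution pool would
  // request from the storage pool for a given shortfall.
  def spaceToRequest(
      extraMemoryNeeded: Long,  // shortfall in the execution pool
      storageMemoryFree: Long,  // unused bytes in the storage pool
      storagePoolSize: Long,    // current size of the storage pool
      storageRegionSize: Long   // size storage is entitled to keep
  ): Long = {
    if (extraMemoryNeeded <= 0) {
      0L
    } else {
      // Either the free storage memory, or whatever storage has
      // borrowed beyond its region, whichever is larger
      val reclaimable =
        math.max(storageMemoryFree, storagePoolSize - storageRegionSize)
      if (reclaimable > 0) math.min(extraMemoryNeeded, reclaimable) else 0L
    }
  }
}
```

For example, if storage has a 400-byte region but has grown to 500 bytes with 30 bytes free, at most 100 bytes are reclaimable, so a 200-byte shortfall results in a request for 100 bytes.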

/**
* The size the execution pool would have after evicting storage memory.
*
* The execution memory pool divides this quantity among the active tasks evenly to cap
* the execution memory allocation for each task. It is important to keep this greater
* than the execution pool size, which doesn't take into account potential memory that
* could be freed by evicting storage. Otherwise we may hit SPARK-12155.
*
* Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
* in execution memory allocation across tasks. Otherwise, a task may occupy more than
* its fair share of execution memory, mistakenly thinking that other tasks can acquire
* the portion of storage memory that cannot be evicted.
*/
def computeMaxExecutionPoolSize(): Long = {
  maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}
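
A small numeric sketch shows why the formula caps the subtraction at `storageRegionSize`. `MaxExecutionPoolSize` and `compute` are hypothetical names, and the byte values in the examples are made up for illustration.

```scala
object MaxExecutionPoolSize {
  // Hypothetical standalone version of the formula above.
  // Storage memory inside its own region cannot be evicted by execution,
  // so only that part is subtracted from the total.
  def compute(
      maxMemory: Long,
      storageMemoryUsed: Long,
      storageRegionSize: Long): Long =
    maxMemory - math.min(storageMemoryUsed, storageRegionSize)
}
```

With 1000 bytes in total and a 500-byte storage region, execution can grow to 800 bytes when storage uses 200. When storage uses 700 bytes, the 200 bytes it borrowed beyond its region can be evicted, so the result is 500 rather than 300.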

UnifiedMemoryManager then hands both callbacks to the execution pool:

executionPool.acquireMemory(
  numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)