spark core 2.0 ExecutionMemoryPool
2017-01-16 17:57
148 查看
为在各任务之间共享的大小可调节的内存池实施一些策略,并且做做一些记帐操作。
尽量保证每个任务有一个合理的内存分配,不要出现一些任务先占用大量内存,然后导致其它任务不断溢出到磁盘上。
如果有N个任务,保证每个任务在溢出前至少获得1/2N的内存,至少有1/N。
释放指定任务的内存。
尽量保证每个任务有一个合理的内存分配,不要出现一些任务先占用大量内存,然后导致其它任务不断溢出到磁盘上。
如果有N个任务,保证每个任务在溢出前至少获得1/2N的内存,至少有1/N。
/** * Implements policies and bookkeeping for sharing an adjustable-sized pool of memory between tasks. * * Tries to ensure that each task gets a reasonable share of memory, instead of some task ramping up * to a large amount first and then causing others to spill to disk repeatedly. * * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever this * set changes. This is all done by synchronizing access to mutable state and using wait() and * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across * tasks was performed by the ShuffleMemoryManager.为指定的任务尽量获得numBytes数量的内存,并且返回获得的内存字节数,如果没有分配,则返回0.
/** * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes * obtained, or 0 if none can be allocated. * * This call may block until there is enough free memory in some situations, to make sure each * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of * active tasks) before it is forced to spill. This can happen if the number of tasks increase * but an older task had a lot of memory already. * * @param numBytes number of bytes to acquire * @param taskAttemptId the task attempt acquiring memory * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in * one parameter (Long) that represents the desired amount of memory by * which this pool should be expanded. * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool * at this given moment. This is not a field because the max pool * size is variable in certain cases. For instance, in unified * memory management, the execution pool can be expanded by evicting * cached blocks, thereby shrinking the storage pool. * * @return the number of bytes granted to the task. */ private[memory] def acquireMemory( numBytes: Long, taskAttemptId: Long, maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized { assert(numBytes > 0, s"invalid number of bytes requested: $numBytes") // TODO: clean up this clunky method signature // Add this task to the taskMemory map just so we can keep an accurate count of the number // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory` if (!memoryForTask.contains(taskAttemptId)) { memoryForTask(taskAttemptId) = 0L // This will later cause waiting tasks to wake up and check numTasks again lock.notifyAll() } // Keep looping until we're either sure that we don't want to grant this request (because this // task would have more than 1 / numActiveTasks of the memory) or we have enough free // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)). // TODO: simplify this to limit each task to its own slot while (true) { val numActiveTasks = memoryForTask.keys.size val curMem = memoryForTask(taskAttemptId) // In every iteration of this loop, we should first try to reclaim any borrowed execution // space from storage. This is necessary because of the potential race condition where new // storage blocks may steal the free execution memory that this task was waiting for. maybeGrowPool(numBytes - memoryFree) // Maximum size the pool would have after potentially growing the pool. // This is used to compute the upper bound of how much memory each task can occupy. This // must take into account potential free memory as well as the amount this pool currently // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management, // we did not take into account space that could have been freed by evicting cached blocks. val maxPoolSize = computeMaxPoolSize() val maxMemoryPerTask = maxPoolSize / numActiveTasks val minMemoryPerTask = poolSize / (2 * numActiveTasks) // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)) // Only give it as much memory as is free, which might be none if it reached 1 / numTasks val toGrant = math.min(maxToGrant, memoryFree) // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking; // if we can't give it this much now, wait for other tasks to free up memory // (this happens if older tasks allocated lots of memory before N grew) if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) { logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free") lock.wait() } else { memoryForTask(taskAttemptId) += toGrant return toGrant } } 0L // Never reached }
释放指定任务的内存。
/** * Release `numBytes` of memory acquired by the given task. */ def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized { val curMem = memoryForTask.getOrElse(taskAttemptId, 0L) var memoryToFree = if (curMem < numBytes) { logWarning( s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " + s"of memory from the $poolName pool") curMem } else { numBytes } if (memoryForTask.contains(taskAttemptId)) { memoryForTask(taskAttemptId) -= memoryToFree if (memoryForTask(taskAttemptId) <= 0) { memoryForTask.remove(taskAttemptId) } } lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed }
/** * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. * * When acquiring memory for a task, the execution pool may need to make multiple * attempts. Each attempt must be able to evict storage in case another task jumps in * and caches a large block between the attempts. This is called once per attempt. */ def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { if (extraMemoryNeeded > 0) { // There is not enough free memory in the execution pool, so try to reclaim memory from // storage. We can reclaim any free memory from the storage pool. If the storage pool // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim // the memory that storage has borrowed from execution. val memoryReclaimableFromStorage = math.max( storagePool.memoryFree, storagePool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) storagePool.decrementPoolSize(spaceToReclaim) executionPool.incrementPoolSize(spaceToReclaim) } } } /** * The size the execution pool would have after evicting storage memory. * * The execution memory pool divides this quantity among the active tasks evenly to cap * the execution memory allocation for each task. It is important to keep this greater * than the execution pool size, which doesn't take into account potential memory that * could be freed by evicting storage. Otherwise we may hit SPARK-12155. * * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness * in execution memory allocation across tasks, Otherwise, a task may occupy more than * its fair share of execution memory, mistakenly thinking that other tasks can acquire * the portion of storage memory that cannot be evicted. */ def computeMaxExecutionPoolSize(): Long = { maxMemory - math.min(storagePool.memoryUsed, storageRegionSize) } executionPool.acquireMemory( numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
相关文章推荐
- Java MongoDB : upset document(MongoCollection替换更新document) replaceOne
- codeforces 696D Legen...(AC自动机+矩阵快速幂)
- 公司裁员
- Windows 免安装(zip)mysql的各种坑
- LCT
- 毕设论文
- Android开发之利用MQTT协议实现消息的即时推送
- 在opencv3中的机器学习算法
- C#泛型编程
- Linux┊关于gcc、glibc和binutils模块之间的关系
- Nginx HTTP负载均衡和反向代理配置
- UEditor富文本使用
- Google浏览器无法打开axure原型
- cos距离与欧式距离
- 协程与多线程的区别
- 走进AngularJs(一)angular基本概念的认识与实战
- RepositoryItemComboBox获得焦点弹出下拉框
- lightoj 1102 - Problem Makes Problem(组合数+逆元)
- C#二维码生成代码
- [FFMPEG硬件加速]Intel® Media Server Studio