
Spark Source Code Reading Notes: MemoryStore

2015-07-27 10:26

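BlockManager delegates the actual storage of data to a BlockStore. BlockStore is an abstract class with three implementations: DiskStore (persistence on disk), MemoryStore (persistence in memory), and TachyonStore (persistence in Tachyon, an in-memory distributed file system).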

MemoryStore keeps blocks in memory either as deserialized arrays or as serialized byte buffers (ByteBuffer). In the words of the source comment:

Stores blocks in memory, either as Arrays of deserialized Java objects or as serialized ByteBuffers.

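MemoryStore keeps its data in a LinkedHashMap, so blocks can be visited in a predictable order: the order in which they were stored or, if the map is access-ordered (as the comment in ensureFreeSpace about getValue and getBytes changing the map suggests), least recently used first. It also tracks the total size of the blocks it currently holds. When there is not enough memory for a new block, it walks the existing blocks in that order and drops them from memory through the BlockManager, which writes a dropped block to disk if its storage level allows, until enough memory has been freed. If there is still not enough memory, the new block is written to disk when its storage level allows; otherwise nothing is stored. When the data arrives as an Iterator, it may come from disk or some other non-memory source, so unrolling it (converting it to Array[Any]) has to check whether enough memory is available.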
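To make the effect of the map's ordering concrete, here is a minimal sketch that is not taken from Spark. It uses java.util.LinkedHashMap with plain String keys and Long sizes; the names EvictionOrderSketch, selectVictims and demo are made up for illustration. It shows which blocks would be picked first under insertion order and under access order.

```scala
import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

object EvictionOrderSketch {
  // Hypothetical helper: walk the map in iteration order and pick
  // block ids until at least `needed` bytes are covered.
  def selectVictims(entries: LinkedHashMap[String, Long], needed: Long): Seq[String] = {
    val selected = ArrayBuffer.empty[String]
    var freed = 0L
    val it = entries.entrySet().iterator()
    while (freed < needed && it.hasNext) {
      val e = it.next()
      selected += e.getKey
      freed += e.getValue
    }
    selected.toList
  }

  // accessOrder = false: insertion order; accessOrder = true: least recently used first.
  def demo(accessOrder: Boolean): Seq[String] = {
    val entries = new LinkedHashMap[String, Long](32, 0.75f, accessOrder)
    entries.put("rdd_0_0", 100L)
    entries.put("rdd_0_1", 100L)
    entries.put("rdd_1_0", 100L)
    entries.get("rdd_0_0") // a read; it reorders the map only when accessOrder = true
    selectVictims(entries, 150L)
  }
}
```

With insertion order the two oldest blocks are selected; with access order the block that was just read moves to the end and is spared.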

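MemoryStore fields

blockManager: BlockManager

maxMemory: Long

The maximum amount of memory available to the store.

entries: LinkedHashMap[BlockId, MemoryEntry]

The map that holds block data, with BlockId as the key and MemoryEntry as the value; its iteration order determines the order in which blocks are visited.

accountingLock: Object

A lock that ensures only one thread at a time is putting and dropping blocks.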

Ensure only one thread is putting, and if necessary, dropping blocks at any given time

currentMemory: Long

The amount of memory currently used by stored blocks.

unrollMemoryMap: Map[Long, Long]

Records how much memory each thread is using to unroll an Iterator; the key is the thread ID and the value is the memory used, in bytes.

A mapping from thread ID to amount of memory used for unrolling a block (in bytes). All accesses of this map are assumed to have manually synchronized on accountingLock.


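maxUnrollMemory: Long

The amount of memory guaranteed for unrolling Iterators, computed as maxMemory * conf.getDouble("spark.storage.unrollFraction", 0.2). If an unroll runs out of memory while the total memory used for unrolling is still below maxUnrollMemory, blocks held in memory are dropped (to disk where their storage level allows) to free space.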

The amount of space ensured for unrolling values in memory, shared across all cores. This space is not reserved in advance, but allocated dynamically by dropping existing blocks.

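unrollMemoryThreshold: Long

The initial amount of memory each thread requests before unrolling an Iterator, set by conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024). When this is not enough, the thread requests more so that its reservation grows to 1.5 times the current estimated size; if that request fails and unroll memory has not yet reached maxUnrollMemory, blocks held in memory are dropped (to disk where their storage level allows) to free space.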

Initial memory to request before unrolling any block
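As a quick worked example of the two unroll settings, the sketch below redoes the arithmetic outside Spark. The object and method names are hypothetical; only the defaults (0.2 and 1024 * 1024) come from the notes above.

```scala
object UnrollBudgetSketch {
  // Defaults quoted in the notes above.
  val DefaultUnrollFraction = 0.2
  val DefaultUnrollMemoryThreshold: Long = 1024L * 1024L

  // maxUnrollMemory = maxMemory * unrollFraction, truncated to a Long.
  def maxUnrollMemory(maxMemory: Long, unrollFraction: Double = DefaultUnrollFraction): Long =
    (maxMemory * unrollFraction).toLong

  // How many threads could hold just their initial reservation within that budget.
  def initialReservationsThatFit(maxMemory: Long): Long =
    maxUnrollMemory(maxMemory) / DefaultUnrollMemoryThreshold
}
```

For a store with 1000 MB of maxMemory this gives an unroll budget of roughly 200 MB, which is enough for about 200 threads to hold their initial 1 MB reservation at the same time.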

MemoryEntry is defined as:

case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)


MemoryStore methods

Write methods:

putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult

Stores a block given as a byte buffer. If level.deserialized is true, the buffer is deserialized and stored as an array (Array[Any]); otherwise it is stored as is, as a ByteBuffer.

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  // Work on a duplicate - since the original input might be used elsewhere.
  val bytes = _bytes.duplicate()
  bytes.rewind()
  if (level.deserialized) {
    val values = blockManager.dataDeserialize(blockId, bytes)
    putIterator(blockId, values, level, returnValues = true)
  } else {
    val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
    PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
  }
}
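putBytes works on a duplicate of the incoming buffer. The following small sketch, which is independent of Spark (DuplicateSketch is a made-up name), illustrates what ByteBuffer.duplicate() does: the duplicate has its own position and limit but shares the underlying bytes, so rewinding it leaves the caller's buffer untouched while no data is copied.

```scala
import java.nio.ByteBuffer

object DuplicateSketch {
  // Returns (position of the original, position of the duplicate, first byte of the original).
  def demo(): (Int, Int, Byte) = {
    val original = ByteBuffer.allocate(4)
    original.put(1.toByte).put(2.toByte) // original's position is now 2

    val dup = original.duplicate() // shares the bytes, has its own position and limit
    dup.rewind()                   // only the duplicate's position goes back to 0

    dup.put(0, 9.toByte)           // absolute write through the duplicate, visible in the original
    (original.position(), dup.position(), original.get(0))
  }
}
```

The same property explains the "Doesn't actually copy the data" comments in the read methods further down.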


putArray(blockId: BlockId, values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult

Stores a block given as an array. If level.deserialized is false, the array is serialized and stored as a ByteBuffer; if it is true, it is stored as an array (Array[Any]).

override def putArray(
    blockId: BlockId,
    values: Array[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  if (level.deserialized) {
    val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
    val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
    PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
  } else {
    val bytes = blockManager.dataSerialize(blockId, values.iterator)
    val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
    PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
  }
}


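putIterator(blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult

Unrolls an Iterator[Any] into an Array[Any] and stores it in memory; the unrolling has to check whether enough memory is available. If it runs out of memory while the memory used for unrolling is still below maxUnrollMemory, blocks held in memory are dropped (to disk where their storage level allows) to free space. If that is still not enough, the block is written to disk when its storage level allows; otherwise a failure result is returned (PutResult(0, Left(iteratorValues), droppedBlocks)).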

override def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
}


Both the array path and the byte-buffer path end up in the lower-level tryToPut method:

tryToPut(blockId: BlockId, value: Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks

Stores value in memory (an array if deserialized is true, a byte buffer if it is false). The method first calls ensureFreeSpace to free enough memory. If that succeeds, the block is put into entries. Otherwise the block is handed to blockManager.dropFromMemory, which writes it to disk if its storage level allows (level.useDisk), and the method returns a failure result (ResultWithDroppedBlocks(false, droppedBlocks)).

Try to put in a set of values, if we can free up enough space. The value should either be an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size must also be passed by the caller.

Synchronize on accountingLock to ensure that all the put requests and its associated block dropping is done by only one thread at a time. Otherwise while one thread is dropping blocks to free memory for one block, another thread may use up the freed space for another block.

Return whether put was successful, along with the blocks dropped in the process.

private def tryToPut(
    blockId: BlockId,
    value: Any,
    size: Long,
    deserialized: Boolean): ResultWithDroppedBlocks = {

  var putSuccess = false
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  accountingLock.synchronized {
    val freeSpaceResult = ensureFreeSpace(blockId, size)
    val enoughFreeSpace = freeSpaceResult.success
    droppedBlocks ++= freeSpaceResult.droppedBlocks

    if (enoughFreeSpace) {
      val entry = new MemoryEntry(value, size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
        currentMemory += size
      }
      val valuesOrBytes = if (deserialized) "values" else "bytes"
      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      putSuccess = true
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      val data = if (deserialized) {
        Left(value.asInstanceOf[Array[Any]])
      } else {
        Right(value.asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
  }
  ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}


tryToPut returns a ResultWithDroppedBlocks:

case class ResultWithDroppedBlocks(
    success: Boolean,
    droppedBlocks: Seq[(BlockId, BlockStatus)])


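tryToPut calls ensureFreeSpace to free enough memory:

ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks

Walks entries and selects blocks that do not belong to the same RDD as the block being added, until the selection would free enough memory (actualFreeMemory + selectedMemory >= space), then drops the selected blocks from memory. If enough memory cannot be freed, nothing is dropped and a failure result is returned (ResultWithDroppedBlocks(success = false, droppedBlocks)).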

/**
 * Try to free up a given amount of space to store a particular block, but can fail if
 * either the block is bigger than our memory or it would require replacing another block
 * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
 * don't fit into memory that we want to avoid).
 *
 * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping
 * blocks. Otherwise, the freed space may fill up before the caller puts in their new value.
 *
 * Return whether there is enough free space, along with the blocks dropped in the process.
 */
private def ensureFreeSpace(
    blockIdToAdd: BlockId,
    space: Long): ResultWithDroppedBlocks = {
  logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")

  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  if (space > maxMemory) {
    logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit")
    return ResultWithDroppedBlocks(success = false, droppedBlocks)
  }

  // Take into account the amount of memory currently occupied by unrolling blocks
  val actualFreeMemory = freeMemory - currentUnrollMemory

  if (actualFreeMemory < space) {
    val rddToAdd = getRddId(blockIdToAdd)
    val selectedBlocks = new ArrayBuffer[BlockId]
    var selectedMemory = 0L

    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
          selectedBlocks += blockId
          selectedMemory += pair.getValue.size
        }
      }
    }

    if (actualFreeMemory + selectedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping")
      for (blockId <- selectedBlocks) {
        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one thread should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          val data = if (entry.deserialized) {
            Left(entry.value.asInstanceOf[Array[Any]])
          } else {
            Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
          }
          val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
          droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
        }
      }
      return ResultWithDroppedBlocks(success = true, droppedBlocks)
    } else {
      logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " +
        "from the same RDD")
      return ResultWithDroppedBlocks(success = false, droppedBlocks)
    }
  }
  ResultWithDroppedBlocks(success = true, droppedBlocks)
}
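The selection logic of ensureFreeSpace can be tried out in isolation with a toy model. The sketch below is a simplification written for these notes, not Spark code: Block, selectBlocksToDrop and their parameters are hypothetical, blocks are plain case classes, and nothing is actually dropped. It only reproduces the three outcomes: the block is too large, enough space can be selected, or the only candidates belong to the same RDD.

```scala
import scala.collection.mutable

object EnsureFreeSpaceSketch {
  // Hypothetical, simplified block: a name, an optional RDD id (None for non-RDD blocks), a size.
  final case class Block(name: String, rddId: Option[Int], size: Long)

  // Returns Some(names to drop) if `space` bytes can be made available, None otherwise.
  def selectBlocksToDrop(
      stored: Seq[Block], // in map iteration order
      maxMemory: Long,
      unrollMemory: Long,
      rddToAdd: Option[Int],
      space: Long): Option[Seq[String]] = {
    if (space > maxMemory) return None // larger than the whole store

    val actualFree = maxMemory - stored.map(_.size).sum - unrollMemory
    if (actualFree >= space) return Some(Seq.empty) // nothing has to go

    val selected = mutable.ArrayBuffer.empty[String]
    var selectedMemory = 0L
    val it = stored.iterator
    while (actualFree + selectedMemory < space && it.hasNext) {
      val b = it.next()
      // Never select a block of the RDD that the new block belongs to.
      if (rddToAdd.isEmpty || rddToAdd != b.rddId) {
        selected += b.name
        selectedMemory += b.size
      }
    }
    if (actualFree + selectedMemory >= space) Some(selected.toList) else None
  }
}
```

Note that, as in the original, the decision is all or nothing: if the selected blocks would not free enough space, none of them is dropped.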


When block data arrives as an Iterator, this overload of putIterator is called:

putIterator(blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean, allowPersistToDisk: Boolean): PutResult

Stores block data given as an Iterator in memory. If there is not enough memory to unroll it, the block is written to disk instead, provided that allowPersistToDisk is true and the block's storage level allows disk storage.

Attempt to put the given block in memory store.

There may not be enough space to fully unroll the iterator in memory, in which case we optionally drop the values to disk if

  (1) the block's storage level specifies useDisk, and

  (2) allowPersistToDisk is true.

One scenario in which allowPersistToDisk is false is when the BlockManager reads a block back from disk and attempts to cache it in memory. In this case, we should not persist the block back on disk again, as it is already in disk store.

private[storage] def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean,
    allowPersistToDisk: Boolean): PutResult = {
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
  unrolledValues match {
    case Left(arrayValues) =>
      // Values are fully unrolled in memory, so store them as an array
      val res = putArray(blockId, arrayValues, level, returnValues)
      droppedBlocks ++= res.droppedBlocks
      PutResult(res.size, res.data, droppedBlocks)
    case Right(iteratorValues) =>
      // Not enough space to unroll this block; drop to disk if applicable
      if (level.useDisk && allowPersistToDisk) {
        logWarning(s"Persisting block $blockId to disk instead.")
        val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
        PutResult(res.size, res.data, droppedBlocks)
      } else {
        PutResult(0, Left(iteratorValues), droppedBlocks)
      }
  }
}


Unrolling is done by unrollSafely:

unrollSafely(blockId: BlockId, values: Iterator[Any], droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Either[Array[Any], Iterator[Any]]

This method accounts for memory per thread. Before unrolling, the thread reserves an initial amount, memoryThreshold (unrollMemoryThreshold, i.e. conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)). It then checks periodically (once every 16 elements) whether the estimated size of the unrolled data has reached memoryThreshold. If it has, the thread requests currentSize * memoryGrowthFactor - memoryThreshold more bytes (memoryGrowthFactor is 1.5, so the reservation grows to 1.5 times the current size). If the request is refused and the memory used for unrolling is still below maxUnrollMemory, ensureFreeSpace is called to drop blocks from entries (to disk where their storage level allows), and the request is retried once; if it is refused again, unrolling stops.

Unroll the given block in memory safely. The safety of this operation refers to avoiding potential OOM exceptions caused by unrolling the entirety of the block in memory at once. This is achieved by periodically checking whether the memory restrictions for unrolling blocks are still satisfied, stopping immediately if not. This check is a safeguard against the scenario in which there is not enough free memory to accommodate the entirety of a single block. This method returns either an array with the contents of the entire block or an iterator containing the values of the block (if the array would have exceeded available memory).

def unrollSafely(
    blockId: BlockId,
    values: Iterator[Any],
    droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
  : Either[Array[Any], Iterator[Any]] = {

  // Number of elements unrolled so far
  var elementsUnrolled = 0
  // Whether there is still enough memory for us to continue unrolling this block
  var keepUnrolling = true
  // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
  val initialMemoryThreshold = unrollMemoryThreshold
  // How often to check whether we need to request more memory
  val memoryCheckPeriod = 16
  // Memory currently reserved by this thread for this particular unrolling operation
  var memoryThreshold = initialMemoryThreshold
  // Memory to request as a multiple of current vector size
  val memoryGrowthFactor = 1.5
  // Previous unroll memory held by this thread, for releasing later (only at the very end)
  val previousMemoryReserved = currentUnrollMemoryForThisThread
  // Underlying vector for unrolling the block
  var vector = new SizeTrackingVector[Any]

  // Request enough memory to begin unrolling
  keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)

  if (!keepUnrolling) {
    logWarning(s"Failed to reserve initial memory threshold of " +
      s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
  }

  // Unroll this block safely, checking whether we have exceeded our threshold periodically
  try {
    while (values.hasNext && keepUnrolling) {
      vector += values.next()
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        // If our vector's size has exceeded the threshold, request more memory
        val currentSize = vector.estimateSize()
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          // Hold the accounting lock, in case another thread concurrently puts a block that
          // takes up the unrolling space we just ensured here
          accountingLock.synchronized {
            if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
              // If the first request is not granted, try again after ensuring free space
              // If there is still not enough space, give up and drop the partition
              val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
              if (spaceToEnsure > 0) {
                val result = ensureFreeSpace(blockId, spaceToEnsure)
                droppedBlocks ++= result.droppedBlocks
              }
              keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
            }
          }
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    if (keepUnrolling) {
      // We successfully unrolled the entirety of this block
      Left(vector.toArray)
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, vector.estimateSize())
      Right(vector.iterator ++ values)
    }

  } finally {
    // If we return an array, the values returned do not depend on the underlying vector and
    // we can immediately free up space for other threads. Otherwise, if we return an iterator,
    // we release the memory claimed by this thread later on when the task finishes.
    if (keepUnrolling) {
      val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
      releaseUnrollMemoryForThisThread(amountToRelease)
    }
  }
}
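The growth pattern in unrollSafely is easier to follow with concrete numbers. The sketch below is a deliberately reduced model, not Spark's implementation: Budget and unroll are hypothetical names, every element is assumed to cost a fixed bytesPerElement (Spark estimates sizes with SizeTrackingVector instead), the result is a Vector rather than an Array, and the retry through ensureFreeSpace is left out.

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  // Hypothetical stand-in for the shared unroll budget; not Spark's API.
  final class Budget(var free: Long) {
    def reserve(n: Long): Boolean = if (n <= free) { free -= n; true } else false
  }

  // Unrolls `values`, checking the reservation once every `checkPeriod` elements.
  def unroll[A](
      values: Iterator[A],
      budget: Budget,
      bytesPerElement: Long,
      initialThreshold: Long,
      checkPeriod: Int = 16,
      growthFactor: Double = 1.5): Either[Vector[A], Iterator[A]] = {
    val buf = ArrayBuffer.empty[A]
    var threshold = initialThreshold
    var keepUnrolling = budget.reserve(initialThreshold)
    var n = 0
    while (values.hasNext && keepUnrolling) {
      buf += values.next()
      if (n % checkPeriod == 0) {
        val currentSize = buf.size * bytesPerElement
        if (currentSize >= threshold) {
          // Ask for enough to bring the reservation up to growthFactor * currentSize.
          val amountToRequest = (currentSize * growthFactor - threshold).toLong
          keepUnrolling = budget.reserve(amountToRequest)
          threshold += amountToRequest
        }
      }
      n += 1
    }
    // Left: fully unrolled. Right: what was unrolled so far followed by the rest of the input.
    if (keepUnrolling) Left(buf.toVector) else Right(buf.iterator ++ values)
  }
}
```

With 100 elements of 10 bytes each and an initial threshold of 100 bytes, the reservation grows from 100 to 255, 495 and then 975 bytes. With a budget of only 300 bytes the third request is refused and the caller gets back an iterator that still yields all 100 elements.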


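Read methods:

getBytes(blockId: BlockId): Option[ByteBuffer]

Returns the byte buffer for the block with the given BlockId. If deserialized is true, the block is held in memory in deserialized form and has to be serialized first; otherwise the stored buffer is returned directly (as a duplicate that shares the underlying bytes).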

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
  } else {
    Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
  }
}


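getValues(blockId: BlockId): Option[Iterator[Any]]

Returns the values of the block with the given BlockId. If deserialized is false, the block is held in memory in serialized form and has to be deserialized first; otherwise an iterator over the stored array is returned directly.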

override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(entry.value.asInstanceOf[Array[Any]].iterator)
  } else {
    val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
    Some(blockManager.dataDeserialize(blockId, buffer))
  }
}


Memory-related methods:

freeMemory: Long = maxMemory - currentMemory

The amount of free memory.

currentUnrollMemory: Long

The total memory currently used by all threads for unrolling Iterators.

/**
 * Return the amount of memory currently occupied for unrolling blocks across all threads.
 */
def currentUnrollMemory: Long = accountingLock.synchronized {
  unrollMemoryMap.values.sum
}


reserveUnrollMemoryForThisThread(memory: Long): Boolean

Reserves memory bytes for the current thread; returns true if the request is granted and false otherwise.

/**
 * Reserve additional memory for unrolling blocks used by this thread.
 * Return whether the request is granted.
 */
def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
  accountingLock.synchronized {
    val granted = freeMemory > currentUnrollMemory + memory
    if (granted) {
      val threadId = Thread.currentThread().getId
      unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
    }
    granted
  }
}


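releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit

Releases memory bytes held by the current thread; if memory is negative (the default is -1), all of the current thread's unroll memory is released.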

/**
 * Release memory used by this thread for unrolling blocks.
 * If the amount is not specified, remove the current thread's allocation altogether.
 */
def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
  val threadId = Thread.currentThread().getId
  accountingLock.synchronized {
    if (memory < 0) {
      unrollMemoryMap.remove(threadId)
    } else {
      unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
      // If this thread claims no more unroll memory, release it completely
      if (unrollMemoryMap(threadId) <= 0) {
        unrollMemoryMap.remove(threadId)
      }
    }
  }
}
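The reserve and release bookkeeping can be exercised on its own. The class below is a stripped-down model written for these notes (UnrollAccounting is a hypothetical name, and currentMemory is fixed at 0 because no blocks are stored); it keeps the same per-thread map and the same strict comparison as the methods above.

```scala
import scala.collection.mutable

// Hypothetical, stripped-down model of the unroll accounting; not Spark's class.
final class UnrollAccounting(maxMemory: Long) {
  private val lock = new Object
  private val unrollMemoryMap = mutable.HashMap.empty[Long, Long]
  private val currentMemory = 0L // bytes held by stored blocks; none in this sketch

  def freeMemory: Long = maxMemory - currentMemory

  def currentUnrollMemory: Long = lock.synchronized { unrollMemoryMap.values.sum }

  // Grants the request only if free memory strictly exceeds all reservations plus this one.
  def reserve(memory: Long): Boolean = lock.synchronized {
    val granted = freeMemory > currentUnrollMemory + memory
    if (granted) {
      val tid = Thread.currentThread().getId
      unrollMemoryMap(tid) = unrollMemoryMap.getOrElse(tid, 0L) + memory
    }
    granted
  }

  // A negative amount releases everything held by the current thread.
  def release(memory: Long = -1L): Unit = lock.synchronized {
    val tid = Thread.currentThread().getId
    if (memory < 0) {
      unrollMemoryMap.remove(tid)
    } else {
      val left = unrollMemoryMap.getOrElse(tid, memory) - memory
      if (left <= 0) unrollMemoryMap.remove(tid) else unrollMemoryMap(tid) = left
    }
    ()
  }
}
```

One consequence of the strict comparison is that a request for exactly the remaining free memory is refused: with 100 bytes in total and 40 already reserved, a request for 60 fails while a request for 59 succeeds.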


Other methods:

getSize(blockId: BlockId): Long = entries.synchronized { entries.get(blockId).size }

remove(blockId: BlockId): Boolean

Removes the block with the given BlockId from entries and updates currentMemory.

override def remove(blockId: BlockId): Boolean = {
  entries.synchronized {
    val entry = entries.remove(blockId)
    if (entry != null) {
      currentMemory -= entry.size
      logInfo(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
      true
    } else {
      false
    }
  }
}


clear()

Empties entries and resets currentMemory to 0.

override def clear() {
  entries.synchronized {
    entries.clear()
    currentMemory = 0
  }
  logInfo("MemoryStore cleared")
}


contains(blockId: BlockId): Boolean = entries.synchronized { entries.containsKey(blockId) }