Spark Source Code Series, Spark Core: The Storage Module
2016-01-04 16:22
1. Storage Module Architecture
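The Storage module consists of two parts:
Communication layer: implemented with a Master/Slave model. The Master and the Slaves exchange control information and status information.
Storage layer: the Storage module can store data in memory or on disk, and may also replicate it to remote nodes.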
2. Communication Layer
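BlockManager wraps a BlockManagerMaster, and the information it sends to the master is kept on the Driver as a BlockManagerInfo. Spark creates its own BlockManager on both the Driver and each Worker (Executor). These BlockManagers communicate through BlockManagerMaster, and all operations on the Storage module go through the BlockManager.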
The BlockManager object is created in the SparkEnv.create function:
```scala
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

// …………

val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
  numUsableCores)
```
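Before creating the endpoint, the code checks whether the current node is the Driver. If it is, the endpoint is created and registered locally. Otherwise, only a reference to the Driver's endpoint is created.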
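The driver/executor split works because endpointCreator is a by-name parameter (`=> RpcEndpoint`). The creator expression is evaluated only on the driver branch, so executors never construct a BlockManagerMasterEndpoint of their own. The minimal sketch below shows only that mechanism. `Endpoint`, `RegisterOrLookup` and the returned strings are hypothetical stand-ins, not Spark's RpcEndpoint/RpcEndpointRef API.

```scala
// Hypothetical stand-in for RpcEndpoint; only its name matters here.
final class Endpoint(val name: String)

object RegisterOrLookup {
  // Counts how many endpoints were actually constructed.
  var created = 0

  def newEndpoint(name: String): Endpoint = {
    created += 1
    new Endpoint(name)
  }

  // `creator` is by-name (=> Endpoint), so it is evaluated only on the driver branch.
  // The returned strings are hypothetical stand-ins for RpcEndpointRef values.
  def registerOrLookup(isDriver: Boolean, name: String, creator: => Endpoint): String =
    if (isDriver) s"local:${creator.name}"
    else s"driver-ref:$name"
}
```

On an executor (`isDriver = false`), `created` stays at 0 even though an endpoint-constructing expression was passed in.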
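After the BlockManager is created, it calls its initialize method. During initialization it registers itself with the Driver through BlockManagerMaster, and it hands the Driver a reference to its Slave Endpoint as part of that registration. In addition, if an external shuffle service is enabled (and this BlockManager is not the Driver's), the Executor's configuration is registered with the local shuffle server.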
```scala
def initialize(appId: String): Unit = {
  // …………
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}
```
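BlockManagerMaster wraps the registration request in a RegisterBlockManager message and sends it to the Driver. The Driver's BlockManagerMasterEndpoint then calls its register method, which checks the existing BlockManagerInfo entries and records the new BlockManager: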
```scala
private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
  val time = System.currentTimeMillis()
  if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(oldId) =>
        // A block manager of the same executor already exists, so remove it (assumed dead)
        logError("Got two different block manager registrations on same executor - "
          + s" will replace old one $oldId with new one $id")
        removeExecutor(id.executorId)
      case None =>
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxMemSize), id))

    blockManagerIdByExecutor(id.executorId) = id

    blockManagerInfo(id) = new BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
```
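The BlockManagerInfo object is stored in a Map (blockManagerInfo, keyed by BlockManagerId). A second map, blockManagerIdByExecutor, maps each executor to its current BlockManagerId.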
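The sketch below makes this bookkeeping concrete. It uses simplified case classes `BmId` and `BmInfo` (hypothetical) in place of BlockManagerId and BlockManagerInfo. One map holds the info per block manager, and a second map indexes block managers by executor. A second registration from the same executor replaces the old one. Spark does more cleanup through removeExecutor, whereas here the old entry is simply dropped.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for BlockManagerId and BlockManagerInfo.
final case class BmId(executorId: String, host: String, port: Int)
final case class BmInfo(id: BmId, registeredAtMs: Long, maxMem: Long)

final class BlockManagerRegistry {
  private val infoById = mutable.HashMap.empty[BmId, BmInfo]      // like blockManagerInfo
  private val idByExecutor = mutable.HashMap.empty[String, BmId]  // like blockManagerIdByExecutor

  def register(id: BmId, maxMem: Long): Unit = {
    if (!infoById.contains(id)) {
      // A different id for the same executor: assume the old one is dead and drop it.
      idByExecutor.get(id.executorId).foreach(oldId => infoById.remove(oldId))
      idByExecutor(id.executorId) = id
      infoById(id) = BmInfo(id, System.currentTimeMillis(), maxMem)
    }
  }

  def registered: Set[BmId] = infoById.keySet.toSet
}
```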
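In the communication layer, BlockManagerMaster controls the flow of messages. The messages are dispatched by pattern matching, and all message types are defined in BlockManagerMessages.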
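The sketch below shows the message-dispatch style. It uses a hypothetical sealed message hierarchy in place of the real BlockManagerMessages, whose messages carry more fields. Because the trait is sealed, the compiler can warn when a match does not cover every message type.

```scala
// Hypothetical message hierarchy in the spirit of BlockManagerMessages.
sealed trait MasterMessage
final case class RegisterBm(executorId: String, maxMem: Long) extends MasterMessage
final case class GetLocationsOf(blockName: String) extends MasterMessage
case object StopMaster extends MasterMessage

object MasterDispatcher {
  // One case per message type; a missing case triggers a compiler warning.
  def handle(msg: MasterMessage): String = msg match {
    case RegisterBm(exec, mem) => s"registered $exec with $mem bytes"
    case GetLocationsOf(name)  => s"looking up $name"
    case StopMaster            => "stopping"
  }
}
```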
3. Storage Layer
The smallest unit of storage in Spark Storage is the block, and all operations are performed on whole blocks.
The MemoryStore and DiskStore objects are created together with the BlockManager:
```scala
val diskBlockManager = new DiskBlockManager(this, conf)

private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
```
3.1 Disk Store
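The current Spark version splits Disk Store's responsibilities more finely. File handling has been extracted into DiskBlockManager, and DiskStore is responsible only for storing and reading data. Disk Store can be configured with multiple local directories, and Spark creates a folder under each of them. The folder name is a prefix followed by a random UUID; the default prefix in Utils.createDirectory is "spark", which gives spark-UUID. Following the "high cohesion, low coupling" principle, this kind of general-purpose utility code lives in Utils (call path: DiskStore.putBytes -> DiskBlockManager.createLocalDirs -> Utils.createDirectory). If the directory already exists or cannot be created, createDirectory retries with a fresh UUID, up to MAX_DIR_CREATION_ATTEMPTS times: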
```scala
def createDirectory(root: String, namePrefix: String = "spark"): File = {
  var attempts = 0
  val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
  var dir: File = null
  while (dir == null) {
    attempts += 1
    if (attempts > maxAttempts) {
      throw new IOException("Failed to create a temp directory (under " + root + ") after " +
        maxAttempts + " attempts!")
    }
    try {
      dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
      if (dir.exists() || !dir.mkdirs()) {
        dir = null
      }
    } catch { case e: SecurityException => dir = null; }
  }

  dir.getCanonicalFile
}
```
In DiskBlockManager, each block is stored as one file. A block is mapped to its file by hashing the block's name (blockId.name):
```scala
def getFile(filename: String): File = {
  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the subdirectory if it doesn't already exist
  val subDir = subDirs(dirId).synchronized {
    val old = subDirs(dirId)(subDirId)
    if (old != null) {
      old
    } else {
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdir()) {
        throw new IOException(s"Failed to create local dir in $newDir.")
      }
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }

  new File(subDir, filename)
}

def getFile(blockId: BlockId): File = getFile(blockId.name)
```
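Modulo arithmetic on the hash gives dirId (which local directory to use) and subDirId (which subdirectory inside it). The code then looks up subDir in subDirs. If it does not exist yet, it creates a new subdirectory named with subDirId in two-digit hex. Finally, getFile returns a File whose parent is subDir and whose name is the blockId's name. The file itself is only created later, when data is written to it.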
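The path arithmetic can be tried in isolation. The sketch below reimplements it with hypothetical names (`BlockPathSketch`, `locate`, and the `localDirN` labels). nonNegativeHash follows the same idea as Utils.nonNegativeHash, and 64 subdirectories per local directory is just an example value. For the block name "a" (hashCode 97) and 2 local directories, dirId = 97 % 2 = 1 and subDirId = (97 / 2) % 64 = 48, which is subdirectory "30" in hex.

```scala
object BlockPathSketch {
  // Same idea as Utils.nonNegativeHash: a non-negative hashCode,
  // mapping Int.MinValue (which has no positive counterpart) to 0.
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  // Returns (dirId, subdirectory name, relative path) for a block name.
  // "localDir<N>" is a hypothetical label for the N-th configured local directory.
  def locate(blockName: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, String, String) = {
    val hash = nonNegativeHash(blockName)
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    val subDirName = "%02x".format(subDirId)
    (dirId, subDirName, s"localDir$dirId/$subDirName/$blockName")
  }
}
```

Because the mapping depends only on the name, the same block always resolves to the same path, so no index from block to file has to be stored.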
Once the file has been determined, DiskStore writes the corresponding block into it:
```scala
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  val bytes = _bytes.duplicate()
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val channel = new FileOutputStream(file).getChannel
  Utils.tryWithSafeFinally {
    while (bytes.remaining > 0) {
      channel.write(bytes)
    }
  } {
    channel.close()
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
  PutResult(bytes.limit(), Right(bytes.duplicate()))
}
```
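Reading is simpler. DiskStore gets the file mapped to the blockId from DiskBlockManager and reads its contents. Files smaller than minMemoryMapBytes are read directly into a heap buffer, and larger ones are memory-mapped: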
```scala
private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    // For small files, directly read rather than memory map
    if (length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(length.toInt)
      channel.position(offset)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      Some(buf)
    } else {
      Some(channel.map(MapMode.READ_ONLY, offset, length))
    }
  } {
    channel.close()
  }
}

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  val file = diskManager.getFile(blockId.name)
  getBytes(file, 0, file.length)
}
```
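The write loop and the two read strategies can be reproduced with plain java.nio. The minimal sketch below uses a hypothetical `DiskRoundTrip` object and takes the minMemoryMapBytes threshold as a parameter; in Spark the threshold is configurable. It writes a ByteBuffer until nothing remains, then reads the file back either into a heap buffer or through a read-only memory map.

```scala
import java.io.{File, FileOutputStream, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object DiskRoundTrip {
  // Keep writing until the buffer is drained: FileChannel.write may write fewer bytes than remain.
  def write(file: File, bytes: ByteBuffer): Unit = {
    val buf = bytes.duplicate() // leave the caller's position untouched
    val channel = new FileOutputStream(file).getChannel
    try {
      while (buf.remaining() > 0) channel.write(buf)
    } finally channel.close()
  }

  // Files shorter than minMemoryMapBytes are copied into a heap buffer;
  // longer ones are memory-mapped read-only (the mapping stays valid after close).
  def read(file: File, minMemoryMapBytes: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      val length = channel.size()
      if (length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(length.toInt)
        while (buf.remaining() > 0) {
          if (channel.read(buf) == -1) throw new IOException(s"Reached EOF early in $file")
        }
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, 0, length)
      }
    } finally channel.close()
  }
}
```

Memory-mapping avoids copying large files into the heap, but each mapping has a setup cost, which is presumably why small files are read directly.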
3.2 Memory Store
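Compared with Disk Store, Memory Store is much simpler. It manages blocks with a LinkedHashMap whose key is the BlockId and whose value is the case class MemoryEntry. A MemoryEntry holds the data itself, its size, and whether the data is stored deserialized:

```scala
private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)

private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
```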
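The third constructor argument, true, puts java.util.LinkedHashMap into access-order mode. Iteration then runs from the least recently accessed entry to the most recently accessed one, which makes the map a natural basis for evicting old blocks first. A minimal demonstration follows; the object name is hypothetical.

```scala
import java.util.LinkedHashMap

object AccessOrderDemo {
  // Same constructor arguments as MemoryStore's `entries`: accessOrder = true.
  def orderAfterAccess(): List[String] = {
    val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_0_0", 100L)
    entries.put("rdd_0_1", 200L)
    entries.put("rdd_0_2", 300L)
    entries.get("rdd_0_0") // reading an entry moves it to the end of the iteration order
    val it = entries.keySet().iterator()
    val out = List.newBuilder[String]
    while (it.hasNext) out += it.next()
    out.result()
  }
}
```

After the `get`, rdd_0_0 is the most recently used block, so it comes last: rdd_0_1, rdd_0_2, rdd_0_0.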
A block can be stored in MemoryStore only if there is enough free memory for it. This is checked through a call to tryToPut:
```scala
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
  // Work on a duplicate - since the original input might be used elsewhere.
  lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
  val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
  val data =
    if (putAttempt.success) {
      assert(bytes.limit == size)
      Right(bytes.duplicate())
    } else {
      null
    }
  PutResult(size, data, putAttempt.droppedBlocks)
}
```
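Inside tryToPut, ensureFreeSpace is called to check whether there is enough memory (and, if possible, to free some up). Its result is stored in enoughFreeSpace. If there is enough space, the block is put into the LinkedHashMap. Otherwise, BlockManager is notified through dropFromMemory, and if the block's storage level allows disk, the block is written to disk.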
```scala
private def tryToPut(
    blockId: BlockId,
    value: () => Any,
    size: Long,
    deserialized: Boolean): ResultWithDroppedBlocks = {

  var putSuccess = false
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  accountingLock.synchronized {
    val freeSpaceResult = ensureFreeSpace(blockId, size)
    val enoughFreeSpace = freeSpaceResult.success
    droppedBlocks ++= freeSpaceResult.droppedBlocks

    if (enoughFreeSpace) {
      val entry = new MemoryEntry(value(), size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
        currentMemory += size
      }
      val valuesOrBytes = if (deserialized) "values" else "bytes"
      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      putSuccess = true
    } else {
      lazy val data = if (deserialized) {
        Left(value().asInstanceOf[Array[Any]])
      } else {
        Right(value().asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
    releasePendingUnrollMemoryForThisTask()
  }
  ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
```
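Below is a toy model of the same idea. It is not Spark's code and tracks block sizes only. It evicts the least recently used blocks until a new block fits, and it calls a `dropToDisk` callback, a hypothetical stand-in for blockManager.dropFromMemory. The callback fires for evicted blocks and for any block that can never fit. The real ensureFreeSpace is more careful; for instance, it skips blocks belonging to the same RDD as the block being stored.

```scala
import scala.collection.mutable

// Toy model of the tryToPut / ensureFreeSpace idea (not Spark's code): it tracks sizes only.
// `dropToDisk` is a hypothetical stand-in for blockManager.dropFromMemory.
final class ToyMemoryStore(capacity: Long, dropToDisk: String => Unit) {
  // Insertion-ordered map; re-inserting on access keeps the least recently used entry first.
  private val entries = mutable.LinkedHashMap.empty[String, Long]
  private var used = 0L

  def get(id: String): Option[Long] =
    entries.remove(id).map { size =>
      entries(id) = size // move to the most-recently-used end
      size
    }

  /** Returns true if the block ended up in memory. */
  def tryToPut(id: String, size: Long): Boolean = {
    entries.remove(id).foreach(old => used -= old) // replacing an existing block
    if (size > capacity) {
      dropToDisk(id) // can never fit: hand it straight to the disk path
      false
    } else {
      // Evict least recently used blocks until the new one fits.
      while (used + size > capacity) {
        val (victim, victimSize) = entries.head
        entries.remove(victim)
        used -= victimSize
        dropToDisk(victim)
      }
      entries(id) = size
      used += size
      true
    }
  }

  def blocks: List[String] = entries.keys.toList
}
```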
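Reading a block from Memory Store is also simple: look up the blockId in the LinkedHashMap and take its value. Deserialized entries are returned directly as an iterator, and serialized ones are deserialized first: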
```scala
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(entry.value.asInstanceOf[Array[Any]].iterator)
  } else {
    val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
    Some(blockManager.dataDeserialize(blockId, buffer))
  }
}
```
3.3 Putting and Getting Blocks
In practice, putting and getting a block is not that complicated. BlockManager wraps all of the details above, so callers only need its put and get functions. For example, putBytes delegates to doPut:

```scala
def putBytes(
    blockId: BlockId,
    bytes: ByteBuffer,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(bytes != null, "Bytes is null")
  doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}

private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None)
  : Seq[(BlockId, BlockStatus)] = {

  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")
  effectiveStorageLevel.foreach { level =>
    require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
  }

  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  val putBlockInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      oldBlockOpt.get
    } else {
      tinfo
    }
  }

  val startTimeMs = System.currentTimeMillis

  var valuesAfterPut: Iterator[Any] = null
  var bytesAfterPut: ByteBuffer = null
  var size = 0L

  val putLevel = effectiveStorageLevel.getOrElse(level)

  val replicationFuture = data match {
    case b: ByteBufferValues if putLevel.replication > 1 =>
      // Duplicate doesn't copy the bytes, but just creates a wrapper
      val bufferView = b.buffer.duplicate()
      Future {
        replicate(blockId, bufferView, putLevel)
      }(futureExecutionContext)
    case _ => null
  }

  putBlockInfo.synchronized {
    logTrace("Put for block %s took %s to get into synchronized block"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))

    var marked = false
    try {
      val (returnValues, blockStore: BlockStore) = {
        if (putLevel.useMemory) {
          (true, memoryStore)
        } else if (putLevel.useOffHeap) {
          (false, externalBlockStore)
        } else if (putLevel.useDisk) {
          (putLevel.replication > 1, diskStore)
        } else {
          assert(putLevel == StorageLevel.NONE)
          throw new BlockException(
            blockId, s"Attempted to put block $blockId without specifying storage level!")
        }
      }

      val result = data match {
        case IteratorValues(iterator) =>
          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        case ArrayValues(array) =>
          blockStore.putArray(blockId, array, putLevel, returnValues)
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      }
      size = result.size
      result.data match {
        case Left(newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
        case Right(newBytes) => bytesAfterPut = newBytes
        case _ =>
      }

      if (putLevel.useMemory) {
        result.droppedBlocks.foreach { updatedBlocks += _ }
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
      if (putBlockStatus.storageLevel != StorageLevel.NONE) {
        marked = true
        putBlockInfo.markReady(size)
        if (tellMaster) {
          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        }
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally {
      if (!marked) {
        blockInfo.remove(blockId)
        putBlockInfo.markFailure()
        logWarning(s"Putting block $blockId failed")
      }
    }
  }
  logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))

  if (putLevel.replication > 1) {
    data match {
      case ByteBufferValues(bytes) =>
        if (replicationFuture != null) {
          Await.ready(replicationFuture, Duration.Inf)
        }
      case _ =>
        val remoteStartTime = System.currentTimeMillis
        if (bytesAfterPut == null) {
          if (valuesAfterPut == null) {
            throw new SparkException(
              "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
          }
          bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
        }
        replicate(blockId, bytesAfterPut, putLevel)
        logDebug("Put block %s remotely took %s"
          .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
    }
  }

  BlockManager.dispose(bytesAfterPut)

  if (putLevel.replication > 1) {
    logDebug("Putting block %s with replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  } else {
    logDebug("Putting block %s without replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  }

  updatedBlocks
}
```
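doPut mainly does the following:
- Creates a BlockInfo object to hold the block's information.
- Locks the BlockInfo, then uses the Storage Level to decide whether to store the block in memory, off-heap (ExternalBlockStore) or on disk. Once the block has been written, the BlockInfo is marked ready (markReady), which releases threads waiting to read it. If the put fails, the BlockInfo is marked as failed and removed.
- Uses the block's replication factor to decide whether to send copies to remote nodes.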
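The store-selection step in doPut is a simple priority chain. The sketch below uses a hypothetical `Level` case class in place of StorageLevel, and the returned Boolean plays the role of doPut's returnValues flag. For a disk-only level, values are returned only when replication > 1, because the replication step needs them.

```scala
// Simplified StorageLevel flags (hypothetical case class, not Spark's class).
final case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean, replication: Int)

object StoreChoice {
  // Mirrors doPut's if/else chain: memory first, then off-heap, then disk.
  def choose(level: Level): (Boolean, String) =
    if (level.useMemory) (true, "memoryStore")
    else if (level.useOffHeap) (false, "externalBlockStore")
    else if (level.useDisk) (level.replication > 1, "diskStore")
    else throw new IllegalArgumentException("no storage level specified")
}
```

A level such as MEMORY_AND_DISK therefore goes to memoryStore first. The disk fallback happens later, inside MemoryStore, through dropFromMemory.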
get covers two cases: getting the block locally and getting it from a remote node.
```scala
def get(blockId: BlockId): Option[BlockResult] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
```
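get first tries the local node and returns the block if it is found there. Otherwise, it requests the block from remote nodes. Normally Spark assigns tasks according to where blocks live, so a task usually runs on a node that already holds its block, and getLocal finds it. When resources are limited, however, Spark may schedule a task on a node that does not hold the block, and the block must then be fetched with getRemote.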
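The local-then-remote order maps naturally onto Option.orElse, whose argument is evaluated only when the first Option is empty. The minimal sketch below uses hypothetical String keys and values and lookup functions supplied by the caller.

```scala
object GetFallback {
  // Option.orElse takes its argument by name, so `remote` runs only on a local miss.
  def get(
      key: String,
      local: String => Option[String],
      remote: String => Option[String]): Option[String] =
    local(key).orElse(remote(key))
}
```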
Let's start with getLocal, which in turn calls doGetLocal:
```scala
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      if (blockInfo.get(blockId).isEmpty) {
        logWarning(s"Block $blockId had been removed")
        return None
      }

      if (!info.waitForReady()) {
        logWarning(s"Block $blockId was marked as failure.")
        return None
      }

      val level = info.level
      logDebug(s"Level for block $blockId is $level")

      if (level.useMemory) {
        logDebug(s"Getting block $blockId from memory")
        val result = if (asBlockResult) {
          memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
        } else {
          memoryStore.getBytes(blockId)
        }
        result match {
          case Some(values) =>
            return result
          case None =>
            logDebug(s"Block $blockId not found in memory")
        }
      }

      if (level.useOffHeap) {
        logDebug(s"Getting block $blockId from ExternalBlockStore")
        if (externalBlockStore.contains(blockId)) {
          val result = if (asBlockResult) {
            externalBlockStore.getValues(blockId)
              .map(new BlockResult(_, DataReadMethod.Memory, info.size))
          } else {
            externalBlockStore.getBytes(blockId)
          }
          result match {
            case Some(values) =>
              return result
            case None =>
              logDebug(s"Block $blockId not found in ExternalBlockStore")
          }
        }
      }

      if (level.useDisk) {
        logDebug(s"Getting block $blockId from disk")
        val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
          case Some(b) => b
          case None =>
            throw new BlockException(
              blockId, s"Block $blockId not found on disk, though it should be")
        }
        assert(0 == bytes.position())

        if (!level.useMemory) {
          if (asBlockResult) {
            return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
              info.size))
          } else {
            return Some(bytes)
          }
        } else {
          if (!level.deserialized || !asBlockResult) {
            memoryStore.putBytes(blockId, bytes.limit, () => {
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              copyForMemory.put(bytes)
            })
            bytes.rewind()
          }
          if (!asBlockResult) {
            return Some(bytes)
          } else {
            val values = dataDeserialize(blockId, bytes)
            if (level.deserialized) {
              val putResult = memoryStore.putIterator(
                blockId, values, level, returnValues = true, allowPersistToDisk = false)
              putResult.data match {
                case Left(it) =>
                  return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
                case _ =>
                  throw new SparkException("Memory store did not return an iterator!")
              }
            } else {
              return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
            }
          }
        }
      }
    }
  } else {
    logDebug(s"Block $blockId not registered locally")
  }
  None
}
```
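doGetLocal first checks that the blockId is registered locally and has not been marked as failed. It then proceeds according to the block's Storage Level:
- level.useMemory and level.useOffHeap: fetch the block from memory or from the ExternalBlockStore. Off-heap storage uses the Tachyon distributed in-memory file system, which is not covered here because I have not used it in practice.
- level.useDisk:
  - level.useMemory == true: read the block from disk and also write it into memory, so that the next access can be served from memory. Then return the block.
  - level.useMemory == false: read the block from disk and return it.
If none of these finds the block, None is returned.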
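The decision order can be modelled without any Spark types. The toy sketch below uses plain in-memory maps for both the memory and disk tiers, and all names are hypothetical. It differs from doGetLocal in two ways: it returns None instead of throwing when a disk-level block is missing, and it skips the off-heap tier.

```scala
import scala.collection.mutable

// Toy model of doGetLocal's read order (hypothetical names throughout):
// memory first, then disk; a disk hit is copied into memory when the level
// also uses memory, so the next read is served from memory.
final class ToyLocalReader(useMemory: Boolean, useDisk: Boolean) {
  val memory = mutable.HashMap.empty[String, Array[Byte]]
  val disk = mutable.HashMap.empty[String, Array[Byte]]

  // Returns the bytes plus where they were read from ("Memory" or "Disk").
  def getLocal(id: String): Option[(Array[Byte], String)] = {
    val fromMemory = if (useMemory) memory.get(id).map(b => (b, "Memory")) else None
    fromMemory.orElse {
      if (!useDisk) None
      else disk.get(id).map { bytes =>
        if (useMemory) memory(id) = bytes.clone() // promote a copy into memory
        (bytes, "Disk")
      }
    }
  }
}
```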
Next comes getRemote, which calls doGetRemote:
```scala
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  val locations = Random.shuffle(master.getLocations(blockId))
  var numFetchFailures = 0
  for (loc <- locations) {
    logDebug(s"Getting remote block $blockId from $loc")
    val data = try {
      blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
    } catch {
      case NonFatal(e) =>
        numFetchFailures += 1
        if (numFetchFailures == locations.size) {
          throw new BlockFetchException(s"Failed to fetch block from" +
            s" ${locations.size} locations. Most recent failure cause:", e)
        } else {
          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $numFetchFailures)", e)
          null
        }
    }

    if (data != null) {
      if (asBlockResult) {
        return Some(new BlockResult(
          dataDeserialize(blockId, data),
          DataReadMethod.Network,
          data.limit()))
      } else {
        return Some(data)
      }
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}
```
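getRemote (through doGetRemote) first asks the master for all locations of the block and shuffles them randomly. It then requests the block from each location in turn through BlockTransferService. As soon as one remote node returns the block, the function returns without sending further requests. If the fetch fails at every location, a BlockFetchException is thrown.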
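The fetch loop amounts to "shuffle, try each location, fail only when all have failed". The minimal sketch below assumes a hypothetical `fetch` function that either returns bytes or throws. It uses scala.util.Try because, like the NonFatal match in doGetRemote, Try captures only non-fatal exceptions.

```scala
import scala.util.{Failure, Random, Success, Try}

object RemoteFetchSketch {
  // Try locations in random order and return the first successful fetch.
  // Throws only after every location has failed; returns None if there are no locations.
  def fetchFirst(
      locations: Seq[String],
      fetch: String => Array[Byte],
      rng: Random): Option[Array[Byte]] = {
    val shuffled = rng.shuffle(locations).toList

    @annotation.tailrec
    def loop(remaining: List[String], lastError: Option[Throwable]): Option[Array[Byte]] =
      remaining match {
        case Nil =>
          lastError match {
            case Some(e) =>
              throw new RuntimeException(s"Failed to fetch block from ${shuffled.size} locations", e)
            case None => None
          }
        case loc :: rest =>
          Try(fetch(loc)) match {
            case Success(data) => Some(data)
            case Failure(e)    => loop(rest, Some(e)) // move on to the next location
          }
      }

    loop(shuffled, None)
  }
}
```

Shuffling the locations spreads the load of popular blocks across all the nodes that hold them, instead of always hitting the first one the master lists.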
4. Mapping Between Partitions and Blocks
The core function in RDD is iterator:

```scala
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
```
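If the RDD's storage level is not NONE, the RDD is meant to be cached in BlockManager, so CacheManager's getOrCompute is called to compute it. This function is where partitions and blocks are tied together.
getOrCompute first constructs an RDDBlockId, which links a block to a partition. The name that RDDBlockId produces becomes the block's BlockId name, in the form rdd_<rdd.id>_<partition.index> (for example, rdd_2_5 for partition 5 of RDD 2).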
```scala
def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {

  val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")
  blockManager.get(key) match {
    case Some(blockResult) =>
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockResult.readMethod)
      existingMetrics.incBytesRead(blockResult.bytes)

      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }
    case None =>
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }

      try {
        logInfo(s"Partition $key not found, computing it")
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)

        if (context.isRunningLocally) {
          return computedValues
        }

        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)
      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}
```
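The naming scheme is easy to reproduce. The sketch below uses a hypothetical `RddBlockKey` case class in place of RDDBlockId. Spark's BlockId companion object parses names back with a similar regular expression, but the parse helper here is illustrative only.

```scala
// Hypothetical simplified stand-in for RDDBlockId: the name is derived from the RDD id and partition index.
final case class RddBlockKey(rddId: Int, splitIndex: Int) {
  def name: String = s"rdd_${rddId}_$splitIndex"
}

object RddBlockNames {
  private val RddName = "rdd_([0-9]+)_([0-9]+)".r

  // Illustrative inverse of `name`; returns None for non-RDD block names.
  def parse(name: String): Option[RddBlockKey] = name match {
    case RddName(rddId, split) => Some(RddBlockKey(rddId.toInt, split.toInt))
    case _                     => None
  }
}
```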
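getOrCompute then checks whether the block exists:
- If the block exists, this RDD partition has already been computed and stored in BlockManager, so it is read back without recomputation.
- If the block does not exist, the RDD's computeOrReadCheckpoint() is called to compute the partition, and the result is stored in BlockManager as a new block.
Note that computing and storing a block is blocking. If another thread needs the same block, it waits until this thread has finished loading it (tracked by the loading set in CacheManager).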
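The blocking behaviour can be sketched with a shared loading set guarded by synchronized, wait and notifyAll, the same primitives used in the finally block of getOrCompute. This toy cache is hypothetical and is not CacheManager itself. The first thread to request a key computes it while the other threads wait, and they then read the cached value.

```scala
import scala.collection.mutable

// Toy version of the `loading` set idea (hypothetical, not CacheManager itself):
// the first thread to ask for a key computes it; concurrent callers wait, then read the cache.
final class ToyCache[V](compute: String => V) {
  private val cache = mutable.HashMap.empty[String, V] // guarded by `loading`
  private val loading = mutable.HashSet.empty[String]   // keys currently being computed
  @volatile var computeCount = 0                        // written only while holding `loading`

  def getOrCompute(key: String): V = {
    val cached: Option[V] = loading.synchronized {
      // Another thread is computing this key: block until it calls notifyAll().
      while (loading.contains(key)) loading.wait()
      val hit = cache.get(key)
      if (hit.isEmpty) loading += key // this thread becomes the loader
      hit
    }
    cached.getOrElse {
      try {
        val value = compute(key)
        loading.synchronized {
          cache(key) = value
          computeCount += 1
        }
        value
      } finally {
        loading.synchronized {
          loading -= key
          loading.notifyAll() // wake up waiting readers
        }
      }
    }
  }
}
```

If the computation throws, the finally block still removes the key and wakes the waiters, so one of them retries instead of waiting forever.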