Spark源码阅读笔记之DiskStore
2015-07-30 11:07
1051 查看
Spark源码阅读笔记之DiskStore
BlockManager底层通过BlockStore来对数据进行实际的存储。BlockStore是一个抽象类,有三种实现:DiskStore(磁盘级别的持久化)、MemoryStore(内存级别的持久化)和TachyonStore(Tachyon内存分布式文件系统级别的持久化)。DiskStore通过DiskBlockManager来实现Block和相应磁盘文件的映射关系,从而将Block存储到磁盘的文件中。DiskBlockManager根据YARN_LOCAL_DIRS或LOCAL_DIRS(yarn模式),SPARK_LOCAL_DIRS或spark.local.dir(其他模式,默认值System.getProperty(“java.io.tmpdir“))配置的本地根目录(可能有多个,以逗号分隔)来生成DiskStore存放Block的根目录(与配置的根目录对应,也有可能有多个):…/blockmgr-UUID.randomUUID.toString(yarn模式)或…/spark-UUID.randomUUID.toString/blockmgr-UUID.randomUUID.toString(其他模式)。同时DiskBlockManager会为每个根目录生成conf.getInt(“spark.diskStore.subDirectories“, 64)个子目录用来存放Block对应的文件,每个Block会根据它的name哈希到相应的子目录,然后以Block的name为文件名来生成文件存储。
Creates and maintains the logical mapping between logical blocks and physical on-disk locations. By default, one block is mapped to one file with a name given by its BlockId. However, it is also possible to have a block map to only a segment of a file, by calling mapBlockToFileSegment().
Block files are hashed among the directories listed in spark.local.dir (or in SPARK_LOCAL_DIRS, if it’s set).
DiskBlockManager属性
blockManager:BlockManager
subDirsPerLocalDir:Int = blockManager.conf.getInt(“spark.diskStore.subDirectories“, 64)
每个本地根目录生成子目录的个数,生成子目录是为了避免生成过多的索引节点
Create one local directory for each path mentioned in spark.local.dir; then, inside this directory, create multiple subdirectories that we will hash files into, in order to avoid having really large inodes at the top level.
localDirs:Array[File]
存放Block对应的File的本地根目录,根据根据YARN_LOCAL_DIRS或LOCAL_DIRS(yarn模式), SPARK_LOCAL_DIRS或spark.local.dir(其他模式,默认值System.getProperty(“java.io.tmpdir”))配置生成
subDirs:Array[File](localDirs.lenght)(subDirsPerLocalDir)
存放所有子目录的二维数组
DiskBlockManager方法
getFile(filename: String): File
根据文件名,取得文件。该方法先将filename哈希到相应的子目录(subDirs[hash % localDirs.length][(hash / localDirs.length) % subDirsPerLocalDir]),然后判断子目录是否存在,若不存在则生成
/** Looks up a file by hashing it into one of our local subdirectories. */ def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) val dirId = hash % localDirs.length val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // Create the subdirectory if it doesn't already exist var subDir = subDirs(dirId)(subDirId) if (subDir == null) { subDir = subDirs(dirId).synchronized { val old = subDirs(dirId)(subDirId) if (old != null) { old } else { val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) if (!newDir.exists() && !newDir.mkdir()) { throw new IOException(s"Failed to create local dir in $newDir.") } subDirs(dirId)(subDirId) = newDir newDir } } } new File(subDir, filename) }
getFile(blockId: BlockId): File = getFile(blockId.name)
根据BlockId取得相应的File
containsBlock(blockId: BlockId): Boolean = getFile(blockId.name).exists()
判断BlockId是否有存储在该本地磁盘
getAllFiles(): Seq[File]
取得存储的所有的文件
/** List all the files currently stored on disk by the disk manager. */ def getAllFiles(): Seq[File] = { // Get all the files inside the array of array of directories subDirs.flatten.filter(_ != null).flatMap { dir => val files = dir.listFiles() if (files != null) files else Seq.empty } }
getAllBlocks(): Seq[BlockId] = getAllFiles().map(f => BlockId(f.getName))
取得存储的所有Block的BlockId
createTempLocalBlock(): (TempLocalBlockId, File)
创建本地临时文件
/** Produces a unique block id and File suitable for storing local intermediate results. */ def createTempLocalBlock(): (TempLocalBlockId, File) = { var blockId = new TempLocalBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempLocalBlockId(UUID.randomUUID()) } (blockId, getFile(blockId)) }
createTempShuffleBlock(): (TempShuffleBlockId, File)
创建sort shuffle使用的临时文件
Produces a unique block id and File suitable for storing shuffled intermediate results. “
def createTempShuffleBlock(): (TempShuffleBlockId, File) = { var blockId = new TempShuffleBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } (blockId, getFile(blockId)) }
**DiskStore**属性
blockManager: BlockManager
diskManager: DiskBlockManager
minMemoryMapBytes:Long= blockManager.conf.getLong(
“spark.storage.memoryMapThreshold“, 2 * 1024L * 1024L)
对文件进行内存映射的阈值,即当文件大于该值时getBytes方法对文件进行内存映射,而不是直接将该文件的内容读取到字节缓存区。
DiskStore方法
def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult
将BlockId对应的字节缓存存储到磁盘
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = { // So that we do not modify the input offsets ! // duplicate does not copy buffer, so inexpensive val bytes = _bytes.duplicate() logDebug(s"Attempting to put block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) val channel = new FileOutputStream(file).getChannel while (bytes.remaining > 0) { channel.write(bytes) } channel.close() val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime)) PutResult(bytes.limit(), Right(bytes.duplicate())) }
putIterator(blockId: BlockId, values: Iterator[Any],level: StorageLevel,returnValues: Boolean): PutResult
将BlockId对应的Iterator数据存储到磁盘,该方法先将Iterator序列化,然后存储到相应的文件。
override def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult = { logDebug(s"Attempting to write values for block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) val outputStream = new FileOutputStream(file) try { try { blockManager.dataSerializeStream(blockId, outputStream, values) } finally { // Close outputStream here because it should be closed before file is deleted. outputStream.close() } } catch { case e: Throwable => if (file.exists()) { file.delete() } throw e } val length = file.length val timeTaken = System.currentTimeMillis - startTime logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(length), timeTaken)) if (returnValues) { // Return a byte buffer for the contents of the file val buffer = getBytes(blockId).get PutResult(length, Right(buffer)) } else { PutResult(length, null) } }
putArray(blockId: BlockId,values: Array[Any],level: StorageLevel,returnValues: Boolean): PutResult
将BlockId对应的Array数据存储到磁盘,该方法先将Array序列化,然后存储到相应的文件。
override def putArray( blockId: BlockId, values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult = { putIterator(blockId, values.toIterator, level, returnValues) }
getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer]
底层方法,读取文件中偏移为offset,长度为length的内容。该方法会判断length是否大于minMemoryMapBytes,若大于,则做内存映射,否则直接读取到字节缓存中。
private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { val channel = new RandomAccessFile(file, "r").getChannel try { // For small files, directly read rather than memory map if (length < minMemoryMapBytes) { val buf = ByteBuffer.allocate(length.toInt) channel.position(offset) while (buf.remaining() != 0) { if (channel.read(buf) == -1) { throw new IOException("Reached EOF before filling buffer\n" + s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") } } buf.flip() Some(buf) } else { Some(channel.map(MapMode.READ_ONLY, offset, length)) } } finally { channel.close() } }
getBytes(blockId: BlockId): Option[ByteBuffer]
读取存储在磁盘中与BlockId对应的内容。
override def getBytes(blockId: BlockId): Option[ByteBuffer] = { val file = diskManager.getFile(blockId.name) getBytes(file, 0, file.length) }
getBytes(segment: FileSegment): Option[ByteBuffer] = getBytes(segment.file, segment.offset, segment.length)
根据FileSegment读取内容,其中 FileSegment存放文件和要读取数据的偏移和大小:FileSegment(val file: File, val offset: Long, val length: Long)
getValues(blockId: BlockId): Option[Iterator[Any]]
读取BlockId对应的内容,并反序列化为Iterator。
override def getValues(blockId: BlockId): Option[Iterator[Any]] = { getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) }
getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]]
读取BlockId对应的内容,并根据自定义的Serializer反序列化为Iterator。
/** - A version of getValues that allows a custom serializer. This is used as part of the - shuffle short-circuit code. */ def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { // TODO: Should bypass getBytes and use a stream based implementation, so that // we won't use a lot of memory during e.g. external sort merge. getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) }
getSize(blockId: BlockId): Long = diskManager.getFile(blockId.name).length
得到存储在该本地磁盘的BlockId对应Block的大小。
remove(blockId: BlockId): Boolean
删除存储的BlockId对应的Block。
override def remove(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) // If consolidation mode is used With HashShuffleMananger, the physical filename for the block // is different from blockId.name. So the file returns here will not be exist, thus we avoid to // delete the whole consolidated file by mistake. if (file.exists()) { file.delete() } else { false } }
contains(blockId: BlockId): Boolean
判断是否存储BlockId对应的Block。
override def contains(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) file.exists() }
相关文章推荐
- Mysql配置
- Java中对List进行排序
- 报错,但不影响运行ERROR: JDWP Unable to get JNI 1.2 environment, jvm->GetEnv() return code = -2
- 链表的创建、查询、删除,插入;
- 遥感数据下载
- 七牛php上传下载类,集成官方文档的方法
- 打造你自己ajax上传图片
- 快速排序
- Java ConcurrentModificationException 异常分析与解决方案
- while和ssh使用遇到的坑
- 黑马程序员--反射
- ListView下拉加载更多练习
- android FileObserver的用法与避坑指南
- 设计模式——单例模式
- 伪类和伪元素的区别
- C#初级知识点整理及VS的简单使用
- Android 开发工具类 34_OpenFileUtil
- 从头开始写项目Makefile(六):参数传递、条件判断、include .
- 东软实训开始了
- cygwin的安装,vi的使用,gcc,g++的使用(转)