您的位置:首页 > 运维架构 > 网站架构

第38课: BlockManager架构原理、运行流程图和源码解密

2017-06-06 07:32 477 查看
第38课:  BlockManager架构原理、运行流程图和源码解密

BlockManager是管理整个Spark运行时数据的读写,包含数据存储本身,在数据存储的基础之上进行数据读写。由于Spark是分布式的,所有BlockManager也是分布式的,BlockManager本身相对而言是一个比较大的模块,Spark中有非常多的模块:调度模块、资源管理模块等等。BlockManager是另外一个非常重要的模块,BlockManager本身源码量非常大。本节从BlockManager原理流程对BlockManager做深刻的理解。在Shuffle读写数据的时候, 我们需要读写BlockManager。因此BlockManager是至关重要的内容。
编写一个业务代码WordCount.scala,通过观察WordCount运行时候BlockManager的日志来理解BlockManager的运行。
WordCount.scala代码如下:
1.         package com.dt.spark.sparksql
2.          
3.         import org.apache.log4j.{Level,Logger}
4.         importorg.apache.spark.SparkConf
5.         importorg.apache.spark.SparkContext
6.         importorg.apache.spark.internal.config
7.         import org.apache.spark.rdd.RDD
8.          
9.         /**
10.        * 使用Scala开发本地测试的SparkWordCount程序
11.        *
12.        * @author DT大数据梦工厂
13.        *        新浪微博:http://weibo.com/ilovepains/
14.        */
15.      object WordCount {
16.        def main(args: Array[String]) {
17.          Logger.getLogger("org").setLevel(Level.ALL)
18.       
19.          /**
20.          * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
21.          * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
22.          * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
23.            * 只有1G的内存)的初学者       *
24.            */
25.          val conf = new SparkConf() //创建SparkConf对象
26.          conf.setAppName("Wow,My First SparkApp!") //设置应用程序的名称,在程序运行的监控界面可以看到名称
27.          conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群
28.          /**
29.            * 第2步:创建SparkContext对象
30.            * SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala、Java、Python、R等都必须有一个SparkContext
31.            * SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend
32.            * 同时还会负责Spark程序往Master注册程序等
33.            * SparkContext是整个Spark应用程序中最为至关重要的一个对象
34.            */
35.          val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息
36.          /**
37.            * 第3步:根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext来创建RDD
38.            * RDD的创建基本有三种方式:根据外部的数据来源(例如HDFS)、根据Scala集合、由其它的RDD操作
39.            * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
40.            */
41.          //val lines: RDD[String] =sc.textFile("D://Big_Data_Software//spark-1.6.0-bin-hadoop2.6//README.md",1) //读取本地文件并设置为一个Partion
42.          // val lines =sc.textFile("D://Big_Data_Software//spark-1.6.0-bin-hadoop2.6//README.md",1) //读取本地文件并设置为一个Partion
43.       
44.          val lines =sc.textFile("data/wordcount/helloSpark.txt", 1) //读取本地文件并设置为一个Partion
45.          /**
46.            * 第4步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
47.            * 第4.1步:讲每一行的字符串拆分成单个的单词
48.            */
49.       
50.          val words = lines.flatMap { line =>line.split(" ") } //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合
51.       
52.          /**
53.            * 第4步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
54.            * 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word =>(word, 1)
55.            */
56.          val pairs = words.map { word => (word,1) }
57.       
58.          /**
59.            * 第4步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
60.            * 第4.3步:在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数
61.            */
62.          val wordCountsOdered = pairs.reduceByKey(_+ _).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair =>(pair._2, pair._1)) //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
63.          wordCountsOdered.collect.foreach(wordNumberPair=> println(wordNumberPair._1 + " : " + wordNumberPair._2))
64.          while (true) {
65.       
66.          }
67.          sc.stop()
68.       
69.        }
70.      }
 
在IDEA中运行一个业务程序WordCount.scala,日志中显示:
l  SparkEnv: Registering MapOutputTracker,其中MapOutputTracker中数据的读写都和BlockManager关联。l  SparkEnv: Registering BlockManagerMaste,其中RegisteringBlockManagerMaster是BlockManagerMaster进行注册。l  DiskBlockManager: Created local directory C:\Users\dell\AppData\Local\Temp\blockmgr-...其中DiskBlockManager是管理磁盘存储的,里面有我们的数据。可以访问Temp目录下以blockmgr-开头的文件的内容。
 WordCount运行结果如下:
1.          Using Spark's default log4j profile:org/apache/spark/log4j-defaults.properties
2.         17/06/06 05:37:57 INFOSparkContext: Running Spark version 2.1.0
3.         ……
4.         17/06/06 05:38:01 INFOSparkEnv: Registering MapOutputTracker
5.         17/06/06 05:38:01 DEBUGMapOutputTrackerMasterEndpoint: init
6.         17/06/06 05:38:01 INFOSparkEnv: Registering BlockManagerMaster
7.         17/06/06 05:38:01 INFOBlockManagerMasterEndpoint: Usingorg.apache.spark.storage.DefaultTopologyMapper for getting topology information
8.         17/06/06 05:38:01 INFOBlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
9.         17/06/06 05:38:01 INFODiskBlockManager: Created local directory atC:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828872c98804
10.      17/06/06 05:38:01 DEBUG DiskBlockManager:Adding shutdown hook
11.      17/06/06 05:38:01 DEBUGShutdownHookManager: Adding shutdown hook
12.      17/06/06 05:38:01 INFOMemoryStore: MemoryStore started with capacity 637.2 MB
13.      17/06/06 05:38:02 INFOSparkEnv: Registering OutputCommitCoordinator
14.      17/06/06 05:38:02 DEBUGOutputCommitCoordinator$OutputCommitCoordinatorEndpoint: init
15.      ……..
 
从Application启动的角度来观察BlockManager:
(1)在Application启动的时候会在SparkEnv中注册BlockManagerMaster以及MapOutputTracker,其中
a)      BlockManagerMaster:对整个集群的 Block数据进行管理;
b)      MapOutputTrackerMaster:跟踪所有的Mapper的输出;
 
BlockManagerMaster中有1个引用driverEndpoint,isDriver判断是否运行在Driver上。BlockManagerMaster源码如下:
1.          private[spark]
2.         class BlockManagerMaster(
3.             var driverEndpoint: RpcEndpointRef,
4.             conf: SparkConf,
5.             isDriver: Boolean)
6.           extends Logging {
 
BlockManagerMaster注册给SparkEnv,SparkEnv在SparkContext中。
SparkContext.scala源码:
1.            ......
2.           private var _env: SparkEnv = _
3.         ……
4.           _env = createSparkEnv(_conf, isLocal,listenerBus)
5.             SparkEnv.set(_env)
 
进入createSparkEnv方法:
1.           private[spark] def createSparkEnv(
2.               conf: SparkConf,
3.               isLocal: Boolean,
4.               listenerBus: LiveListenerBus): SparkEnv ={
5.             SparkEnv.createDriverEnv(conf, isLocal,listenerBus, SparkContext.numDriverCores(master))
6.           }
 
进入SparkEnv.scala的createDriverEnv方法:
1.              private[spark]def createDriverEnv(
2.             ……
3.             create(
4.               conf,
5.               SparkContext.DRIVER_IDENTIFIER,
6.               bindAddress,
7.               advertiseAddress,
8.               port,
9.               isLocal,
10.            numCores,
11.            ioEncryptionKey,
12.            listenerBus = listenerBus,
13.            mockOutputCommitCoordinator =mockOutputCommitCoordinator
14.          )
15.        }
16.      ……
 
SparkEnv.scala的createDriverEnv中调用了create方法,判断是否是Driver,进入create方法源码:
1.           private def create(
2.               conf: SparkConf,
3.               executorId: String,
4.               bindAddress: String,
5.               advertiseAddress: String,
6.               port: Int,
7.               isLocal: Boolean,
8.               numUsableCores: Int,
9.               ioEncryptionKey: Option[Array[Byte]],
10.            listenerBus: LiveListenerBus = null,
11.            mockOutputCommitCoordinator:Option[OutputCommitCoordinator] = None): SparkEnv = {
12.       
13.          val isDriver = executorId ==SparkContext.DRIVER_IDENTIFIER
14.               ......
15.          if (isDriver) {
16.            conf.set("spark.driver.port",rpcEnv.address.port.toString)
17.          } else if (rpcEnv.address != null) {
18.            conf.set("spark.executor.port",rpcEnv.address.port.toString)
19.            logInfo(s"Settingspark.executor.port to: ${rpcEnv.address.port.toString}")
20.          }
21.       ......
22.         val mapOutputTracker = if (isDriver) {
23.            new MapOutputTrackerMaster(conf,broadcastManager, isLocal)
24.          } else {
25.            new MapOutputTrackerWorker(conf)
26.          }
27.      ……
28.      SparkContext.scala
29.      private[spark] valDRIVER_IDENTIFIER = "driver"
30.      ……
 
SparkEnv.scala的createDriverEnv中new出来一个MapOutputTrackerMaster,MapOutputTrackerMaster的源码如下:
1.          private[spark] classMapOutputTrackerMaster(conf: SparkConf,
2.             broadcastManager: BroadcastManager,isLocal: Boolean)
3.           extends MapOutputTracker(conf) {
4.         ……
 
然后我们看一下blockManagerMaster。在SparkEnv.scala中,new出来一个blockManagerMaster:
1.              val blockManagerMaster = newBlockManagerMaster(registerOrLookupEndpoint(
2.               BlockManagerMaster.DRIVER_ENDPOINT_NAME,
3.               new BlockManagerMasterEndpoint(rpcEnv,isLocal, conf, listenerBus)),
4.               conf, isDriver)
BlockManagerMaster对整个集群的 Block数据进行管理,Block是Spark数据管理的单位,跟数据存储没有关系,数据可能存在磁盘上,也可能存储在内存中,也可能存储在offline如Alluxio上。源码如下:
1.         private[spark]
2.         class BlockManagerMaster(
3.             var driverEndpoint: RpcEndpointRef,
4.             conf: SparkConf,
5.             isDriver: Boolean)
6.           extends Logging {
7.         …..
在构建BlockManagerMaster的时候,new出来一个BlockManagerMasterEndpoint,这是循环消息体:
1.             private[spark]
2.         classBlockManagerMasterEndpoint(
3.             override val rpcEnv: RpcEnv,
4.             val isLocal: Boolean,
5.             conf: SparkConf,
6.             listenerBus: LiveListenerBus)
7.           extends ThreadSafeRpcEndpoint with Logging {
 
(2)BlockManagerMasterEndpoint本身是一个消息体,会负责通过远程消息通信的方式去管理所有节点的BlockManager;
查看一下WordCount在IDEA中的运行日志,日志中显示BlockManagerMasterEndpoint:Registering block manager,向block manager进行注册:
1.         ……
2.          17/06/06 05:38:02 INFO BlockManager: Usingorg.apache.spark.storage.RandomBlockReplicationPolicy for block replicationpolicy
3.         17/06/06 05:38:02 INFOBlockManagerMaster: Registering BlockManager BlockManagerId(driver,192.168.93.1, 63572, None)
4.         17/06/06 05:38:02 DEBUGDefaultTopologyMapper: Got a request for 192.168.93.1
5.         17/06/06 05:38:02 INFOBlockManagerMasterEndpoint: Registering block manager 192.168.93.1:63572 with637.2 MB RAM, BlockManagerId(driver, 192.168.93.1, 63572, None)
6.         17/06/06 05:38:02 INFOBlockManagerMaster: Registered BlockManager BlockManagerId(driver,192.168.93.1, 63572, None)
7.         17/06/06 05:38:02 INFOBlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.93.1,63572, None)
8.         …….
 
(3)每启动一个ExecutorBackend都会实例化BlockManager并通过远程通信的方式注册给BlockManagerMaster;实质上是Executor中的BlockManager在启动的时候注册给了Driver上的BlockManagerMasterEndpoint;
(4)MemoryStore是BlockManager中专门负责内存数据存储和读写的类;
查看一下WordCount在IDEA中的运行日志,日志中显示MemoryStore: Block broadcast_0 stored as values in memory,数据存储在内存中。
1.          …….
2.         17/06/06 05:38:04 INFOMemoryStore: Block broadcast_0 stored as values in memory (estimated size 208.5KB, free 637.0 MB)
3.         17/06/06 05:38:04 INFOMemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size20.0 KB, free 637.0 MB)
4.         17/06/06 05:38:04 INFOBlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.93.1:63572(size: 20.0 KB, free: 637.2 MB)
5.         …….
 
Spark处理数据读写数据以block为单位的,MemoryStore将block数据存储在内存中。MemoryStore.scala源码:
1.          private[spark] class MemoryStore(
2.             conf: SparkConf,
3.             blockInfoManager: BlockInfoManager,
4.             serializerManager: SerializerManager,
5.             memoryManager: MemoryManager,
6.             blockEvictionHandler: BlockEvictionHandler)
7.           extends Logging {
8.         ……
 
(5)DiskStore是BlockManager中专门负责基于磁盘的数据存储和读写的类;
1.          private[spark] classDiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
2.         .......
 
(6)DiskBlockManager:管理Logical Block与Disk上的Physical Block之间的映射关系并负责磁盘的文件的创建、读写等;
查看一下WordCount在IDEA中的运行日志,日志中显示INFO DiskBlockManager: Created localdirectory,DiskBlockManager负责磁盘文件的管理。
1.          …..
2.         17/06/06 05:38:01 INFOBlockManagerMasterEndpoint: Usingorg.apache.spark.storage.DefaultTopologyMapper for getting topology information
3.         17/06/06 05:38:01 INFOBlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
4.         17/06/06 05:38:01 INFODiskBlockManager: Created local directory atC:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828872c98804
5.         17/06/06 05:38:01 DEBUGDiskBlockManager: Adding shutdown hook
6.         …….
DiskBlockManager负责管理逻辑级别和物理级别的映射关系,根据BlockID映射一个文件。在目录spark.local.dir或者SPARK_LOCAL_DIRS中,Block文件进行hash生成。通过createLocalDirs 生成本地目录。DiskBlockManager源码:
1.          private[spark] class DiskBlockManager(conf: SparkConf,deleteFilesOnStop: Boolean) extends Logging {
2.         ……
3.         private defcreateLocalDirs(conf: SparkConf): Array[File] = {
4.             Utils.getConfiguredLocalDirs(conf).flatMap{ rootDir =>
5.               try {
6.                 val localDir =Utils.createDirectory(rootDir, "blockmgr")
7.                 logInfo(s"Created local directoryat $localDir")
8.                 Some(localDir)
9.               } catch {
10.              case e: IOException =>
11.                logError(s"Failed to createlocal dir in $rootDir. Ignoring this directory.", e)
12.                None
13.            }
14.          }
15.        }
 
 
从Job运行的角度来观察BlockManager:
   查看一下WordCount.scala的运行日志:日志中显示INFOBlockManagerInfo: Added broadcast_0_piece0 in memory,将BlockManagerInfo的广播变量加入到内存中。
1.         ……
2.         17/06/06 05:38:04 INFOMemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size20.0 KB, free 637.0 MB)
3.         17/06/06 05:38:04 INFOBlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.93.1:63572(size: 20.0 KB, free: 637.2 MB)
4.         ……
BlockManagerInfo是Driver中为了管理所有的ExecutorBackend中的BlockManager中的元数据而设立的。
BlockManagerMasterEndpoint.scala源码:
1.          private[spark] class BlockManagerInfo(
2.             val blockManagerId: BlockManagerId,
3.             timeMs: Long,
4.             val maxMem: Long,
5.             val slaveEndpoint: RpcEndpointRef)
6.           extends Logging {
BlockManagerInfo中:集群中每启动一个节点,就有一个BlockManager,BlockManagerId标明是哪个BlockManager。slaveEndpoint进行消息通信的。
(1)首先通过MemoryStore存储广播变量;
(2)在Driver中是通过BlockManagerInfo来管理集群中每个ExecutorBackend中的BlockManager中的元数据信息的;
(3)当改变了具体的ExecutorBackend上的Block信息后就必须发消息给Driver中的BlockManagerMaster来更新相应的BlockManagerInfo;
(4)当执行第二个Stage的时候,第二个Stage会向Driver中的MapOutputTrackerMasterEndpoint发消息请求上一个Stage中相应的输出,此时MapOutputTrackerMaster会把上一个Stage的输出数据的元数据信息发送当前请求的 Stage;
以下是BlockManager工作原理和运行机制简图:
 


图 8- 5 BlockManager工作原理和运行机制简图
 
BlockManagerMasterEndpoint.scala中BlockManagerInfo的getStatus方法:
1.           defgetStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
其中的BlockStatus是一个case class:
1.            case class BlockStatus(storageLevel:StorageLevel, memSize: Long, diskSize: Long) {
2.           def isCached: Boolean = memSize + diskSize> 0
3.         }
BlockTransferService.scala是进行网络连接操作获取远程数据。
1.          private[spark]
2.         abstract classBlockTransferService extends ShuffleClient with Closeable with Logging {
 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: