您的位置:首页 > 其它

深入理解Spark 2.1 Core (十三):sparkEnv类源码分析

2017-06-28 11:09 567 查看
sparkEnv为运行的Spark实例(master,worker,executor等)持有运行环境相关的对象,sparkenv管理serializer, Akka actor system,
block manager, map output tracker等对象。sparkEnv主要被内部使用,后面可能仅供内部使用。sparkEnv最重要的方法是createDriverEnv方法,该方法有三个参数: conf: SparkConf,;isLocal:Boolean; listenerBus: LiveListenerBus。LiveListenerBus以监听器方式监听各种事件并处理。

[java]
view plain
copy

private[spark] def createDriverEnv(  
    conf: SparkConf,  
    isLocal: Boolean,  
    listenerBus: LiveListenerBus,  
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {  
  assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")  
  assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")  
  val hostname = conf.get("spark.driver.host")  
  val port = conf.get("spark.driver.port").toInt  
  create(  
    conf,  
    SparkContext.DRIVER_IDENTIFIER,  
    hostname,  
    port,  
    isDriver = true,  
    isLocal = isLocal,  
    listenerBus = listenerBus,  
    mockOutputCommitCoordinator = mockOutputCommitCoordinator  
  )  
}  

上述方法最后调用create方法来创建:主要创建securityManager、ActorSystem、mapOutputTracker、ShuffleManager、ShuffleMemoryManger、BlockTranferService、BlockManagerMaster,BlockManager、BroadCastManager、CacheManager、HttpFileServer、metricssystem:

[java]
view plain
copy

private def create(  
    conf: SparkConf,  
    executorId: String,  
    hostname: String,  
    port: Int,  
    isDriver: Boolean,  
    isLocal: Boolean,  
    listenerBus: LiveListenerBus = null,  
    numUsableCores: Int = 0,  
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {  
  
  // Listener bus is only used on the driver  
  if (isDriver) {  
    assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")  
  }  
  //创建安全管理器  
  val securityManager = new SecurityManager(conf)  
  
  // Create the ActorSystem for Akka and get the port it binds to.  
  //创建基于akka的分布式消息系统ActorSystem  
  val (actorSystem, boundPort) = {  
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName  
    AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)  
  }  
  
  // Figure out which port Akka actually bound to in case the original port is 0 or occupied.  
  if (isDriver) {  
    conf.set("spark.driver.port", boundPort.toString)  
  } else {  
    conf.set("spark.executor.port", boundPort.toString)  
  }  
  
  // Create an instance of the class with the given name, possibly initializing it with our conf  
  def instantiateClass[T](className: String): T = {  
    val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)  
    // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just  
    // SparkConf, then one taking no arguments  
    try {  
      cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)  
        .newInstance(conf, new java.lang.Boolean(isDriver))  
        .asInstanceOf[T]  
    } catch {  
      case _: NoSuchMethodException =>  
        try {  
          cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]  
        } catch {  
          case _: NoSuchMethodException =>  
            cls.getConstructor().newInstance().asInstanceOf[T]  
        }  
    }  
  }  
  
  // Create an instance of the class named by the given SparkConf property, or defaultClassName  
  // if the property is not set, possibly initializing it with our conf  
  def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {  
    instantiateClass[T](conf.get(propertyName, defaultClassName))  
  }  
  
  val serializer = instantiateClassFromConf[Serializer](  
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")  
  logDebug(s"Using serializer: ${serializer.getClass}")  
  
  val closureSerializer = instantiateClassFromConf[Serializer](  
    "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")  
  
  def registerOrLookup(name: String, newActor: => Actor): ActorRef = {  
    if (isDriver) {  
      logInfo("Registering " + name)  
      actorSystem.actorOf(Props(newActor), name = name)  
    } else {  
      AkkaUtils.makeDriverRef(name, conf, actorSystem)  
    }  
  }  
  //创建mapOutputTracker  
  val mapOutputTracker =  if (isDriver) {  
    new MapOutputTrackerMaster(conf)  
  } else {  
    new MapOutputTrackerWorker(conf)  
  }  
  
  // Have to assign trackerActor after initialization as MapOutputTrackerActor  
  // requires the MapOutputTracker itself  
  mapOutputTracker.trackerActor = registerOrLookup(  
    "MapOutputTracker",  
    new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))  
  
  // Let the user specify short names for shuffle managers  
  val shortShuffleMgrNames = Map(  
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",  
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")  
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")  
  val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)  
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)  
  
  val shuffleMemoryManager = new ShuffleMemoryManager(conf)  
  
  val blockTransferService =  
    conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {  
      case "netty" =>  
        new NettyBlockTransferService(conf, securityManager, numUsableCores)  
      case "nio" =>  
        new NioBlockTransferService(conf, securityManager)  
    }  
  
  val blockManagerMaster = new BlockManagerMaster(registerOrLookup(  
    "BlockManagerMaster",  
    new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)  
  
  // NB: blockManager is not valid until initialize() is called later.  
  val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,  
    serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,  
    numUsableCores)  
  
  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)  
  
  val cacheManager = new CacheManager(blockManager)  
  
  val httpFileServer =  
    if (isDriver) {  
      val fileServerPort = conf.getInt("spark.fileserver.port", 0)  
      val server = new HttpFileServer(conf, securityManager, fileServerPort)  
      server.initialize()  
      conf.set("spark.fileserver.uri",  server.serverUri)  
      server  
    } else {  
      null  
    }  
  
  val metricsSystem = if (isDriver) {  
    // Don't start metrics system right now for Driver.  
    // We need to wait for the task scheduler to give us an app ID.  
    // Then we can start the metrics system.  
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)  
  } else {  
    // We need to set the executor ID before the MetricsSystem is created because sources and  
    // sinks specified in the metrics configuration file will want to incorporate this executor's  
    // ID into the metrics they report.  
    conf.set("spark.executor.id", executorId)  
    val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)  
    ms.start()  
    ms  
  }  
  
  // Set the sparkFiles directory, used when downloading dependencies.  In local mode,  
  // this is a temporary directory; in distributed mode, this is the executor's current working  
  // directory.  
  val sparkFilesDir: String = if (isDriver) {  
    Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath  
  } else {  
    "."  
  }  
  
  // Warn about deprecated spark.cache.class property  
  if (conf.contains("spark.cache.class")) {  
    logWarning("The spark.cache.class property is no longer being used! Specify storage " +  
      "levels using the RDD.persist() method instead.")  
  }  
  
  val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {  
    new OutputCommitCoordinator(conf)  
  }  
  val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",  
    new OutputCommitCoordinatorActor(outputCommitCoordinator))  
  outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)  
  
  new SparkEnv(  
    executorId,  
    actorSystem,  
    serializer,  
    closureSerializer,  
    cacheManager,  
    mapOutputTracker,  
    shuffleManager,  
    broadcastManager,  
    blockTransferService,  
    blockManager,  
    securityManager,  
    httpFileServer,  
    sparkFilesDir,  
    metricsSystem,  
    shuffleMemoryManager,  
    outputCommitCoordinator,  
    conf)  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐