您的位置:首页 > 其它

深入理解Spark 2.1 Core (十四):securityManager 类源码分析

2017-06-28 11:11 531 查看
securityManager主要用于权限设置,比如在使用yarn作为资源调度框架时,用于生成secret key进行登录。该类默认只用一个实例,所以的app使用同一个实例,下面是该类的所有源代码:

[java]
view plain
copy

private[spark] class SecurityManager(sparkConf: SparkConf)  
  extends Logging with SecretKeyHolder {  
  
  // key used to store the spark secret in the Hadoop UGI  
  private val sparkSecretLookupKey = "sparkCookie"  
  
  private val authOn = sparkConf.getBoolean("spark.authenticate", false)  
  // keep spark.ui.acls.enable for backwards compatibility with 1.0  
  private var aclsOn =  
    sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))  
  
  // admin acls should be set before view or modify acls  
  private var adminAcls: Set[String] =  
    stringToSet(sparkConf.get("spark.admin.acls", ""))  
  
  private var viewAcls: Set[String] = _  
  
  // list of users who have permission to modify the application. This should  
  // apply to both UI and CLI for things like killing the application.  
  private var modifyAcls: Set[String] = _  
  
  // always add the current user and SPARK_USER to the viewAcls  
  private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),  
    Utils.getCurrentUserName())  
  
  setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))  
  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))  
  
  private val secretKey = generateSecretKey()  
  logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +  
    "; ui acls " + (if (aclsOn) "enabled" else "disabled") +  
    "; users with view permissions: " + viewAcls.toString() +  
    "; users with modify permissions: " + modifyAcls.toString())  
  
  // Set our own authenticator to properly negotiate user/password for HTTP connections.  
  // This is needed by the HTTP client fetching from the HttpServer. Put here so its  
  // only set once.  
  if (authOn) {  
    Authenticator.setDefault(  
      new Authenticator() {  
        override def getPasswordAuthentication(): PasswordAuthentication = {  
          var passAuth: PasswordAuthentication = null  
          val userInfo = getRequestingURL().getUserInfo()  
          if (userInfo != null) {  
            val  parts = userInfo.split(":", 2)  
            passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())  
          }  
          return passAuth  
        }  
      }  
    )  
  }  
  
  // the default SSL configuration - it will be used by all communication layers unless overwritten  
  private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)  
  
  // SSL configuration for different communication layers - they can override the default  
  // configuration at a specified namespace. The namespace *must* start with spark.ssl.  
  val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))  
  val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))  
  
  logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")  
  logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")  
  
  val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {  
    val trustStoreManagers =  
      for (trustStore <- fileServerSSLOptions.trustStore) yield {  
        val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream()  
  
        try {  
          val ks = KeyStore.getInstance(KeyStore.getDefaultType)  
          ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)  
  
          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)  
          tmf.init(ks)  
          tmf.getTrustManagers  
        } finally {  
          input.close()  
        }  
      }  
  
    lazy val credulousTrustStoreManagers = Array({  
      logWarning("Using 'accept-all' trust manager for SSL connections.")  
      new X509TrustManager {  
        override def getAcceptedIssuers: Array[X509Certificate] = null  
  
        override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}  
  
        override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}  
      }: TrustManager  
    })  
  
    val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default"))  
    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)  
  
    val hostVerifier = new HostnameVerifier {  
      override def verify(s: String, sslSession: SSLSession): Boolean = true  
    }  
  
    (Some(sslContext.getSocketFactory), Some(hostVerifier))  
  } else {  
    (None, None)  
  }  
  
  /** 
   * Split a comma separated String, filter out any empty items, and return a Set of strings 
   */  
  private def stringToSet(list: String): Set[String] = {  
    list.split(',').map(_.trim).filter(!_.isEmpty).toSet  
  }  
  
  /** 
   * Admin acls should be set before the view or modify acls.  If you modify the admin 
   * acls you should also set the view and modify acls again to pick up the changes. 
   */  
  def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {  
    viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))  
    logInfo("Changing view acls to: " + viewAcls.mkString(","))  
  }  
  
  def setViewAcls(defaultUser: String, allowedUsers: String) {  
    setViewAcls(Set[String](defaultUser), allowedUsers)  
  }  
  
  def getViewAcls: String = viewAcls.mkString(",")  
  
  /** 
   * Admin acls should be set before the view or modify acls.  If you modify the admin 
   * acls you should also set the view and modify acls again to pick up the changes. 
   */  
  def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {  
    modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))  
    logInfo("Changing modify acls to: " + modifyAcls.mkString(","))  
  }  
  
  def getModifyAcls: String = modifyAcls.mkString(",")  
  
  /** 
   * Admin acls should be set before the view or modify acls.  If you modify the admin 
   * acls you should also set the view and modify acls again to pick up the changes. 
   */  
  def setAdminAcls(adminUsers: String) {  
    adminAcls = stringToSet(adminUsers)  
    logInfo("Changing admin acls to: " + adminAcls.mkString(","))  
  }  
  
  def setAcls(aclSetting: Boolean) {  
    aclsOn = aclSetting  
    logInfo("Changing acls enabled to: " + aclsOn)  
  }  
  
  /** 
   * Generates or looks up the secret key. 
   * 
   * The way the key is stored depends on the Spark deployment mode. Yarn 
   * uses the Hadoop UGI. 
   * 
   * For non-Yarn deployments, If the config variable is not set 
   * we throw an exception. 
   */  
  private def generateSecretKey(): String = {  
    if (!isAuthenticationEnabled) return null  
    // first check to see if the secret is already set, else generate a new one if on yarn  
    val sCookie = if (SparkHadoopUtil.get.isYarnMode) {  
      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)  
      if (secretKey != null) {  
        logDebug("in yarn mode, getting secret from credentials")  
        return new Text(secretKey).toString  
      } else {  
        logDebug("getSecretKey: yarn mode, secret key from credentials is null")  
      }  
      val cookie = akka.util.Crypt.generateSecureCookie  
      // if we generated the secret then we must be the first so lets set it so t  
      // gets used by everyone else  
      SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, cookie)  
      logInfo("adding secret to credentials in yarn mode")  
      cookie  
    } else {  
      // user must have set spark.authenticate.secret config  
      sparkConf.getOption("spark.authenticate.secret") match {  
        case Some(value) => value  
        case None => throw new Exception("Error: a secret key must be specified via the " +  
          "spark.authenticate.secret config")  
      }  
    }  
    sCookie  
  }  
  
  /** 
   * Check to see if Acls for the UI are enabled 
   * @return true if UI authentication is enabled, otherwise false 
   */  
  def aclsEnabled(): Boolean = aclsOn  
  
  /** 
   * Checks the given user against the view acl list to see if they have 
   * authorization to view the UI. If the UI acls are disabled 
   * via spark.acls.enable, all users have view access. If the user is null 
   * it is assumed authentication is off and all users have access. 
   * 
   * @param user to see if is authorized 
   * @return true is the user has permission, otherwise false 
   */  
  def checkUIViewPermissions(user: String): Boolean = {  
    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +  
      viewAcls.mkString(","))  
    !aclsEnabled || user == null || viewAcls.contains(user)  
  }  
  
  /** 
   * Checks the given user against the modify acl list to see if they have 
   * authorization to modify the application. If the UI acls are disabled 
   * via spark.acls.enable, all users have modify access. If the user is null 
   * it is assumed authentication isn't turned on and all users have access. 
   * 
   * @param user to see if is authorized 
   * @return true is the user has permission, otherwise false 
   */  
  def checkModifyPermissions(user: String): Boolean = {  
    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +  
      modifyAcls.mkString(","))  
    !aclsEnabled || user == null || modifyAcls.contains(user)  
  }  
  
  
  /** 
   * Check to see if authentication for the Spark communication protocols is enabled 
   * @return true if authentication is enabled, otherwise false 
   */  
  def isAuthenticationEnabled(): Boolean = authOn  
  
  /** 
   * Gets the user used for authenticating HTTP connections. 
   * For now use a single hardcoded user. 
   * @return the HTTP user as a String 
   */  
  def getHttpUser(): String = "sparkHttpUser"  
  
  /** 
   * Gets the user used for authenticating SASL connections. 
   * For now use a single hardcoded user. 
   * @return the SASL user as a String 
   */  
  def getSaslUser(): String = "sparkSaslUser"  
  
  /** 
   * Gets the secret key. 
   * @return the secret key as a String if authentication is enabled, otherwise returns null 
   */  
  def getSecretKey(): String = secretKey  
  
  // Default SecurityManager only has a single secret key, so ignore appId.  
  override def getSaslUser(appId: String): String = getSaslUser()  
  override def getSecretKey(appId: String): String = getSecretKey()  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐