
spark-core_11: org.apache.spark.deploy.master.Master source code analysis 3 -- MasterWebUI(MasterRpcEndPoint, 8080) initializes the web UI

2018-03-26 21:35
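Continuing from the previous article.
Now let's analyze new MasterWebUI(MasterRpcEndPoint, 8080). Its main job is to wrap each page's HTML as scala.xml.Node values inside servlets, put those servlets into ServletContextHandlers, and hand the handlers to jetty.Server.
/**
 * Web UI server for the standalone master.
 * The concrete subclasses of the abstract class WebUI are MasterWebUI, HistoryServer, MesosClusterUI, SparkUI and WorkerWebUI; these are the classes that actually serve a web UI.
 */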
private[master]
class MasterWebUI(
    val master: Master,
    requestedPort: Int,
    customMasterPage: Option[MasterPage] = None)
  extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging
  with UIRoot {

  val masterEndpointRef = master.self
  val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

  val masterPage = customMasterPage.getOrElse(new MasterPage(this))
  initialize()
  /** Initialize all components of the server.
    * Initializes the web components of the Master UI, i.e. its ServletContextHandlers.
    * ===
    * MasterPage, ApplicationPage and HistoryNotFoundPage all extend WebUIPage, which declares
    * render(request: HttpServletRequest): Seq[Node] (abstract) and renderJson(request: HttpServletRequest): JValue
    * (which has a default implementation that subclasses may override).
    * Both methods are called by the handlers that attachPage() registers.
    */
  def initialize() {
    // MasterPage path: "/" (the primary constructor argument of WebUIPage is the page's path)
    val masterPage = new MasterPage(this)  // MasterPage renders the workers, active applications and active drivers
    // ApplicationPage path: /app
    attachPage(new ApplicationPage(this)) // ApplicationPage shows the active application named by the appId request parameter
    attachPage(new HistoryNotFoundPage(this)) // HistoryNotFoundPage path: /history/not-found
    attachPage(masterPage)
    // Serve static resources from resources/org/apache/spark/ui/static/
    attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(ApiRootResource.getServletHandler(this))
    // Hand the two kill requests (POST /app/kill, POST /driver/kill) to the master page's handlers, then
    // redirect to "/". The Master kills asynchronously, so apps and drivers can be stopped from the UI.
    attachHandler(createRedirectHandler(
      "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
    attachHandler(createRedirectHandler(
      "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
  }
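Each kill route combines three things: a method filter (POST only), a side effect (asking the Master to kill the app or driver) and a redirect back to "/". The following stdlib-only sketch models that control flow. Request, Response, redirectHandler and the "id" parameter are hypothetical stand-ins, not Spark's JettyUtils.createRedirectHandler or the servlet API, and the 302/405 status codes are assumptions about how a redirect and a rejected method would be reported.

```scala
object RedirectSketch {
  // Hypothetical, simplified stand-ins for HttpServletRequest / HttpServletResponse.
  final case class Request(method: String, path: String, params: Map[String, String])
  final case class Response(status: Int, location: Option[String])

  // Accept only the configured HTTP methods on srcPath, run the side effect
  // (for /app/kill: ask the Master to kill the app), then redirect to destPath.
  def redirectHandler(
      srcPath: String,
      destPath: String,
      beforeRedirect: Request => Unit,
      httpMethods: Set[String]): Request => Option[Response] = { req =>
    if (req.path != srcPath) None                                         // not this handler's route
    else if (!httpMethods.contains(req.method)) Some(Response(405, None)) // method not allowed
    else {
      beforeRedirect(req)
      Some(Response(302, Some(destPath)))                                 // send the browser back
    }
  }
}
```

Restricting the route to POST means that simply following a link (a GET request) cannot trigger a kill.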
1. Now look at new MasterPage(this). Its render() method builds the HTML with scala.xml.
/**
  * WebUIPage declares render(request: HttpServletRequest): Seq[Node], which subclasses must implement,
  * and renderJson(request: HttpServletRequest): JValue.
  * Note the member parameter prefix: it is the HTTP path of this page.
  * As the signatures suggest, a page receives a servlet request, handles it, and returns either Nodes or a JSON result.
  * Its subclasses include MasterPage, ApplicationPage and others.
  */
private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
  private val master = parent.masterEndpointRef

 ….
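  /** Index view listing applications and executors
    * A large block of HTML: fill in the variables it needs and wrap it in the common page header from
    * UIUtils.basicSparkPage, and you get a complete rendered page.
    * After MasterWebUI is constructed, its initialize() method passes this page to attachPage(), which calls
    * createServletHandler() (used to create Jetty's ServletContextHandler)
    * ==> and along the way createServlet() creates the servlet that invokes this method.
    */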
  def render(request: HttpServletRequest): Seq[Node] = {
    val state = getMasterState
    // Column headers of the worker table
    val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")
    val workers = state.workers.sortBy(_.id)
    val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
    val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)

    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
      "User", "State", "Duration")
    val activeApps = state.activeApps.sortBy(_.startTime).reverse
    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
    val completedApps = state.completedApps.sortBy(_.endTime).reverse
    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)

    val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
      "Memory", "Main Class")
    val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
    val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
    val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
    val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)

    // For now we only show driver information if the user has submitted drivers to the cluster.
    // This is until we integrate the notion of drivers and applications in the UI.
    def hasDrivers: Boolean = activeDrivers.length > 0 || completedDrivers.length > 0
    // In Scala, XML literals are first-class values, just like functions;
    // curly braces embed Scala expressions inside them
    val content =
        <div class="row-fluid">
          <div class="span12">
            <ul class="unstyled">
              <li><strong>URL:</strong> {state.uri}</li>
              {
                state.restUri.map { uri =>
                  <li>
                    <strong>REST URL:</strong> {uri}
                    <span class="rest-uri"> (cluster mode)</span>
                  </li>
                }.getOrElse { Seq.empty }
              }
              ...
            </ul>
          </div>
        </div>

        ...
        </div>;
    UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
  }
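Stripped of the markup, render() sorts each collection (newest first via sortBy(...).reverse) and hands headers, a row function and the data to UIUtils.listingTable. The sketch below illustrates that pattern with plain strings. It deliberately avoids scala.xml (a separate library since Scala 2.11) and uses a hypothetical App record instead of Spark's ApplicationInfo, so its listingTable is an illustration, not Spark's UIUtils implementation. Escaping is done by hand because string interpolation, unlike {...} embedding in an XML literal, escapes nothing.

```scala
object ListingTableSketch {
  // Hypothetical, simplified application record (Spark's real one is ApplicationInfo).
  final case class App(id: String, name: String, startTime: Long)

  // Minimal HTML escaping so that cell values cannot inject markup.
  private def esc(s: String): String =
    s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

  // String-based stand-in for UIUtils.listingTable(headers, rowGenerator, data):
  // one <th> per header, one <tr> per element, cells produced by the row function.
  def listingTable[T](headers: Seq[String], row: T => Seq[String], data: Seq[T]): String = {
    val head = headers.map(h => s"<th>${esc(h)}</th>").mkString
    val body = data.map(r => row(r).map(c => s"<td>${esc(c)}</td>").mkString("<tr>", "", "</tr>")).mkString
    s"<table><thead><tr>$head</tr></thead><tbody>$body</tbody></table>"
  }

  // Same ordering as activeApps in render(): newest application first.
  def activeAppsTable(apps: Seq[App]): String =
    listingTable(Seq("Application ID", "Name"), (a: App) => Seq(a.id, a.name),
      apps.sortBy(_.startTime).reverse)
}
```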
2. Back in MasterWebUI.initialize(), look at attachPage(masterPage).
/** Attach a page to this UI.
  * Subclasses of WebUIPage: MasterPage, ApplicationPage, HistoryNotFoundPage
  */
def attachPage(page: WebUIPage) {
  // pagePath is the URL path under which the servlet is reached
  val pagePath = "/" + page.prefix
  val renderHandler = createServletHandler(pagePath,
    (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
  val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
    (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
  attachHandler(renderHandler)
  attachHandler(renderJsonHandler)
  pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
    .append(renderHandler)
}
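The path arithmetic in attachPage() can be checked in isolation. This sketch keeps only that arithmetic plus a buffer of registered routes; Handler and the handlers buffer are hypothetical stand-ins for ServletContextHandler and WebUI's handler list. The prefixes "", "app" and "history/not-found" are the ones the comments in initialize() give for MasterPage, ApplicationPage and HistoryNotFoundPage.

```scala
import scala.collection.mutable.ArrayBuffer

object AttachPageSketch {
  // Hypothetical stand-in for a ServletContextHandler: its path and which page method it calls.
  final case class Handler(path: String, target: String)

  // Plays the role of the handler list that attachHandler() appends to.
  val handlers: ArrayBuffer[Handler] = ArrayBuffer.empty

  // Same path arithmetic as attachPage(): the HTML route is "/" + prefix, and the JSON
  // route is that path without a trailing "/" followed by "/json".
  def attachPage(prefix: String): Unit = {
    val pagePath = "/" + prefix
    handlers += Handler(pagePath, "render")
    handlers += Handler(pagePath.stripSuffix("/") + "/json", "renderJson")
  }
}
```

Registering the three pages in the order initialize() uses yields the routes /app, /app/json, /history/not-found, /history/not-found/json, / and /json; the stripSuffix("/") is what keeps the MasterPage JSON route at /json rather than //json.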
===> createServletHandler() is what creates the ServletContextHandler. Its first argument is the servlet's URL path. Its second argument, (request: HttpServletRequest) => page.render(request), is accepted as a ServletParams because of an implicit conversion defined in JettyUtils:
private[spark] object JettyUtils extends Logging {
  type Responder[T] = HttpServletRequest => T
  class ServletParams[T <% AnyRef](val responder: Responder[T],
    val contentType: String,
    val extractFn: T => String = (in: Any) => in.toString) {}

  implicit def htmlResponderToServlet(responder: Responder[Seq[Node]]): ServletParams[Seq[Node]] =
    new ServletParams(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
This conversion wraps the anonymous function in a ServletParams; the point is to end up with a ServletContextHandler that the Jetty server can use. The createServletHandler overload that takes a ServletParams is documented as "Create a context handler that responds to a request with the given path prefix".
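The implicit-conversion trick can be reproduced in a few lines. In this sketch a request is a plain Map and a page is a String; Request, ServletParams, handle and demo are hypothetical simplifications of the JettyUtils types, and the view bound T <% AnyRef is dropped because view bounds are deprecated in newer Scala versions.

```scala
import scala.language.implicitConversions

object ImplicitResponderSketch {
  // Hypothetical stand-ins: a request is a parameter map, an HTML page is a String.
  type Request = Map[String, String]
  type Responder[T] = Request => T

  // Simplified ServletParams: the responder plus how to label and serialise its result.
  final class ServletParams[T](val responder: Responder[T], val contentType: String, val extractFn: T => String)

  // Analogue of JettyUtils.htmlResponderToServlet: wraps a plain function, fixes the content
  // type to text/html and prepends the doctype when the result is written out.
  implicit def htmlResponderToServlet(responder: Responder[String]): ServletParams[String] =
    new ServletParams[String](responder, "text/html", html => "<!DOCTYPE html>" + html)

  // Analogue of createServletHandler(path, servletParams, ...): it only accepts ServletParams.
  def handle[T](path: String, params: ServletParams[T], req: Request): String =
    s"$path [${params.contentType}] " + params.extractFn(params.responder(req))

  // The call site passes a bare lambda; the compiler inserts htmlResponderToServlet.
  def demo(): String =
    handle("/app", (r: Request) => s"<p>${r("appId")}</p>", Map("appId" -> "app-1"))
}
```

Calling ImplicitResponderSketch.demo() returns "/app [text/html] <!DOCTYPE html><p>app-1</p>": the lambda never mentions ServletParams, yet handle receives one.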
def createServletHandler[T <% AnyRef](
    path: String,
    servletParams: ServletParams[T],
    securityMgr: SecurityManager,
    conf: SparkConf,
    basePath: String = ""): ServletContextHandler = {
  createServletHandler(path, createServlet(servletParams, securityMgr, conf), basePath)
}
/** Create a context handler that responds to a request with the given path prefix
  * This is where the ServletContextHandler is actually created; its context path is set with setContextPath().
  */
def createServletHandler(
    path: String,
    servlet: HttpServlet,
    basePath: String): ServletContextHandler = {
  val prefixedPath = if (basePath == "" && path == "/") {
    path
  } else {
    // Join basePath and path, then strip a trailing "/"
    (basePath + path).stripSuffix("/")
  }
  val contextHandler = new ServletContextHandler
  val holder = new ServletHolder(servlet) // ServletHolder wraps (holds) the servlet
  contextHandler.setContextPath(prefixedPath)
  contextHandler.addServlet(holder, "/")  // so the servlet is reachable at prefixedPath + "/"
  contextHandler
}
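The context-path rule is easy to exercise on its own. The sketch copies the prefixedPath expression from createServletHandler and adds a route() function as a simplified model of how a request could be matched against several context handlers (the longest matching context path wins, with "/" as the catch-all). ContextPathSketch and route() are assumptions made for illustration, not Jetty's ContextHandlerCollection code.

```scala
object ContextPathSketch {
  // Same rule as createServletHandler(path, servlet, basePath): a bare root stays "/",
  // anything else is basePath + path with one trailing "/" removed.
  def prefixedPath(basePath: String, path: String): String =
    if (basePath == "" && path == "/") path
    else (basePath + path).stripSuffix("/")

  // Simplified routing model (an assumption, reduced to the essentials): a context path
  // matches if it is "/", equals the request path, or is a "/"-delimited prefix of it;
  // the longest match wins.
  def route(contextPaths: Seq[String], requestPath: String): Option[String] =
    contextPaths
      .filter(cp => cp == "/" || requestPath == cp || requestPath.startsWith(cp + "/"))
      .sortBy(-_.length)
      .headOption
}
```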
===> Next, see how createServlet(servletParams, securityMgr, conf) creates the servlet:
def createServlet[T <% AnyRef](
    servletParams: ServletParams[T],
    securityMgr: SecurityManager,
    conf: SparkConf): HttpServlet = {
  // SPARK-10589 avoid frame-related click-jacking vulnerability, using X-Frame-Options
  // (see http://tools.ietf.org/html/rfc7034). By default allow framing only from the
  // same origin, but allow framing for a specific named URI.
  // Example: spark.ui.allowFramingFrom = https://example.com/
  // X-Frame-Options controls whether a page may be shown inside an iframe; SAMEORIGIN allows framing only by pages from the same site
  /**
    * Java code:
    *   response.addHeader("x-frame-options", "SAMEORIGIN");
    * Nginx configuration:
    *   add_header X-Frame-Options SAMEORIGIN
    * Apache configuration:
    *   Header always append X-Frame-Options SAMEORIGIN
    */
  val allowFramingFrom = conf.getOption("spark.ui.allowFramingFrom")
  val xFrameOptionsValue =
    allowFramingFrom.map(uri => s"ALLOW-FROM $uri").getOrElse("SAMEORIGIN")

  new HttpServlet {
    override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
      try {
        // SecurityManager handles user permissions and SSL; by default this check returns true
        if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
          // servletParams came from the implicit conversion: "text/html" is rendered as HTML,
          // "text/plain" as raw text, and "text/json" as JSON
          response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
          response.setStatus(HttpServletResponse.SC_OK) // response status
          // Calls (request: HttpServletRequest) => page.render(request), which yields a Seq[scala.xml.Node]
          val result = servletParams.responder(request)
          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
          response.setHeader("X-Frame-Options", xFrameOptionsValue)
          // scalastyle:off println
          // Write the result to the response; extractFn() turns the Seq[Node] into a string
          response.getWriter.println(servletParams.extractFn(result))
          // scalastyle:on println
        } else {
          response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
          response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
            "User is not authorized to access this page.")
        }
      } catch {
        case e: IllegalArgumentException =>
          response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
        case e: Exception =>
          logWarning(s"GET ${request.getRequestURI} failed: $e", e)
          throw e
      }
    }
    // SPARK-5983 ensure TRACE is not supported
    protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
      res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
    }
  }
}
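The branches of doGet() can be replayed without a servlet container. This is a hypothetical model: Resp replaces HttpServletResponse, the SparkConf is a plain Map, canView stands in for securityMgr.checkUIViewPermissions, and exceptions other than IllegalArgumentException simply propagate (the real code also logs them).

```scala
object ServletSketch {
  // Hypothetical, simplified stand-in for HttpServletResponse.
  final case class Resp(status: Int, headers: Map[String, String], body: String)

  private val NoCache = "Cache-Control" -> "no-cache, no-store, must-revalidate"

  // Same rule as createServlet(): ALLOW-FROM <uri> if spark.ui.allowFramingFrom is set,
  // otherwise SAMEORIGIN. The SparkConf is modelled as a plain Map.
  def xFrameOptions(conf: Map[String, String]): String =
    conf.get("spark.ui.allowFramingFrom").map(uri => s"ALLOW-FROM $uri").getOrElse("SAMEORIGIN")

  // Mirrors the branches of doGet(): 401 when the user may not view the UI, 400 when the
  // responder throws IllegalArgumentException, otherwise 200 with the serialised result.
  def doGet[T](
      user: String,
      canView: String => Boolean,
      contentType: String,
      responder: () => T,
      extractFn: T => String,
      conf: Map[String, String]): Resp =
    try {
      if (canView(user)) {
        val result = responder()
        val headers = Map(
          "Content-Type" -> s"$contentType;charset=utf-8",
          NoCache,
          "X-Frame-Options" -> xFrameOptions(conf))
        Resp(200, headers, extractFn(result))
      } else {
        Resp(401, Map(NoCache), "User is not authorized to access this page.")
      }
    } catch {
      case e: IllegalArgumentException => Resp(400, Map.empty, e.getMessage)
    }
}
```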
3. In MasterWebUI.initialize(), attachPage(masterPage) { ... attachHandler(renderHandler) ... } calls attachHandler(), which appends the returned ServletContextHandler to an ArrayBuffer[ServletContextHandler].
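====> That completes the MasterWebUI initialization code. Back in Master, webUi.bind() starts Jetty: it creates the Server instance, listens on the web port, and binds the handlers to the Jetty server.
V. How jetty.Server is started
/** Bind to the HTTP server behind this web interface. */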
def bind() {
  assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
  try {
    serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
    logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
  } catch {
    case e: Exception =>
      logError("Failed to bind %s".format(className), e)
      System.exit(1)
  }
}
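The attach-then-bind lifecycle (handlers collected during initialize(), a single bind() afterwards) can be sketched as follows. MiniWebUI and its String handlers are hypothetical; the start function plays the role of startJettyServer, a second bind() fails the same kind of assert as the real method, and the real error handling (logging plus System.exit(1)) is left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, stripped-down model of the WebUI lifecycle.
final class MiniWebUI(name: String) {
  val handlers = ArrayBuffer.empty[String]   // stands in for ArrayBuffer[ServletContextHandler]
  private var serverInfo: Option[Int] = None // stands in for Option[ServerInfo]; holds the bound port

  // Like attachHandler() before bind(): just remember the handler.
  def attachHandler(contextPath: String): Unit = handlers += contextPath

  // `start` plays the role of startJettyServer: it receives the handlers and returns the bound port.
  def bind(start: Seq[String] => Int): Int = {
    assert(serverInfo.isEmpty, s"Attempted to bind $name more than once!")
    val port = start(handlers.toList)
    serverInfo = Some(port)
    port
  }
}
```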
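  /**
   * Attempt to start a Jetty server bound to the supplied hostName:port using the given
   * context handlers. If the desired port number is contended, continues incrementing ports
   * until a free port is found. Returns the Jetty Server object, the chosen port, and a mutable
   * collection of handlers.
   */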
  def startJettyServer(
      hostName: String,
      port: Int,
      handlers: Seq[ServletContextHandler],
      conf: SparkConf,
      serverName: String = ""): ServerInfo = {

    addFilters(handlers, conf)
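    // ContextHandlerCollection routes each request to the context handler(s) whose context path matches the request path
    val collection = new ContextHandlerCollection
    /**
      * Compressing the content can greatly improve network bandwidth usage, but at the cost of memory and CPU cycles.
      * If this handler is used for static content, then use of efficient direct NIO may be prevented, so the gzip
      * mechanism of <code>org.eclipse.jetty.servlet.DefaultServlet</code> is advised instead.
      */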
    val gzipHandlers = handlers.map { h =>
      val gzipHandler = new GzipHandler
      gzipHandler.setHandler(h)
      gzipHandler
    }
    collection.setHandlers(gzipHandlers.toArray)

    // Bind to the given port, or throw a java.net.BindException if the port is occupied
    def connect(currentPort: Int): (Server, Int) = {
      // Create the server bound to an InetSocketAddress for the given host and port
      val server = new Server(new InetSocketAddress(hostName, currentPort))
      val pool = new QueuedThreadPool
      pool.setDaemon(true)
      server.setThreadPool(pool) // give the server a daemon thread pool
      val errorHandler = new ErrorHandler()
      errorHandler.setShowStacks(true)
      server.addBean(errorHandler)
      server.setHandler(collection)
      try {
        server.start()
        (server, server.getConnectors.head.getLocalPort)
      } catch {
        case e: Exception =>
          server.stop()
          pool.stop()
          throw e
      }
    }
    // Retries connect() on successive ports and returns the (server, server.getConnectors.head.getLocalPort) pair it produced
    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
    ServerInfo(server, boundPort, collection)
  }
}
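The port search that Utils.startServiceOnPort performs around connect() can be sketched with plain recursion. This is a simplified, hypothetical version: the retry budget is passed in as maxRetries (Spark reads it from its configuration), and special cases such as port 0 are not modelled.

```scala
import java.net.BindException

object PortRetrySketch {
  // Try startPort; on a BindException move to the next port, giving up (rethrowing)
  // once maxRetries additional attempts have failed.
  def startServiceOnPort[T](startPort: Int, maxRetries: Int)(startService: Int => (T, Int)): (T, Int) = {
    def attempt(offset: Int): (T, Int) = {
      val tryPort = startPort + offset
      try startService(tryPort)
      catch {
        case _: BindException if offset < maxRetries => attempt(offset + 1)
      }
    }
    attempt(0)
  }
}
```

With ports 8080 and 8081 already taken, a request for 8080 ends up on 8082, which matches the "continues incrementing ports" behaviour described in the startJettyServer Scaladoc.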
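That completes the Jetty server startup. Next, back in Master, we will look at how timed-out Workers are cleaned up and how ZooKeeper leader election works.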