您的位置:首页 > Web前端 > CSS

Spark测量系统 MetricsSystem

2016-12-30 17:26 288 查看
Spark测量系统,由指定的instance创建,由source、sink组成,周期性地从source获取指标然后发送到sink,其中instance、source、sink的概念如下:
Instance:指定了谁在使用测量系统,在spark中有一些列如master、worker、executor、client driver这些角色,这些角色创建测量系统用于监控spark状态,目前在spark中已经实现的角色包括master、worker、executor、driver、applications

Source:指定了从哪里收集测量数据。在Spark测量系统中有两种来源:

(1) Spark内部指标,比如MasterSource、WorkerSource等,这些源将会收集Spark组件的内部状态

(2) 普通指标,比例JvmSource,通过配置文件进行配置

Sink:指定了往哪里输出测量数据

private[spark] class MetricsSystem private (
val instance: String,
conf: SparkConf,
securityMgr: SecurityManager)
extends Logging {

private[this] val confFile = conf.get("spark.metrics.conf", null)
private[this] val metricsConfig = new MetricsConfig(Option(confFile))

private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()

MetricsSystem的启动过程包括以下步骤:

1) 注册Sources;

2) 注册Sinks;

3) 给Sinks增加Jetty的ServletContextHandler。

 

注册Sources

registerSources方法用于注册Sources,告诉测量系统从哪里收集测量数据,注册Sources的过程分为以下步骤:

1) 从metricsConfig获取Driver的Properties。

2) 用正则匹配Driver的Properties中以source.开头的属性。然后将属性中的Source反射得到的实例加入ArrayBuffer[Source]

3) 将每个souce的metricsRegistry注册到ConcurrentMap metrics。

private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}

注册Sinks

registerSinks方法用于注册Sinks,即告诉测量系统MetricsSystem往哪里输出测量数据,注册Sinks的步骤如下:

1) 从Driver的Properties中用正则匹配以sink.开头的属性。

2) 将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
val sink = Class.forName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception => {
logError("Sink class " + classPath + " cannot be instantialized")
throw e
}
}
}
}
}

给Sinks增加Jetty的ServletContextHandler

为了能够在SparkUI访问到测量数据,所以需要给Sinks增加Jetty的ServletContextHandler,主要用MetricsSystem得getServletHandlers方法实现。
/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
def getServletHandlers: Array[ServletContextHandler] = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers).getOrElse(Array())
}


参考文章链接:http://blog.sina.com.cn/s/blog_9ca9623b0102weqi.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: