Spark测量系统 MetricsSystem
2016-12-30 17:26
288 查看
Spark测量系统,由指定的instance创建,由source、sink组成,周期性地从source获取指标然后发送到sink,其中instance、source、sink的概念如下:
Instance:指定了谁在使用测量系统,在spark中有一些列如master、worker、executor、client driver这些角色,这些角色创建测量系统用于监控spark状态,目前在spark中已经实现的角色包括master、worker、executor、driver、applications
Source:指定了从哪里收集测量数据。在Spark测量系统中有两种来源:
(1) Spark内部指标,比如MasterSource、WorkerSource等,这些源将会收集Spark组件的内部状态
(2) 普通指标,比例JvmSource,通过配置文件进行配置
Sink:指定了往哪里输出测量数据
private[spark] class MetricsSystem private (
val instance: String,
conf: SparkConf,
securityMgr: SecurityManager)
extends Logging {
private[this] val confFile = conf.get("spark.metrics.conf", null)
private[this] val metricsConfig = new MetricsConfig(Option(confFile))
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()
MetricsSystem的启动过程包括以下步骤:
1) 注册Sources;
2) 注册Sinks;
3) 给Sinks增加Jetty的ServletContextHandler。
注册Sources
registerSources方法用于注册Sources,告诉测量系统从哪里收集测量数据,注册Sources的过程分为以下步骤:
1) 从metricsConfig获取Driver的Properties。
2) 用正则匹配Driver的Properties中以source.开头的属性。然后将属性中的Source反射得到的实例加入ArrayBuffer[Source]
3) 将每个souce的metricsRegistry注册到ConcurrentMap metrics。
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
注册Sinks
registerSinks方法用于注册Sinks,即告诉测量系统MetricsSystem往哪里输出测量数据,注册Sinks的步骤如下:
1) 从Driver的Properties中用正则匹配以sink.开头的属性。
2) 将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
val sink = Class.forName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception => {
logError("Sink class " + classPath + " cannot be instantialized")
throw e
}
}
}
}
}
给Sinks增加Jetty的ServletContextHandler
为了能够在SparkUI访问到测量数据,所以需要给Sinks增加Jetty的ServletContextHandler,主要用MetricsSystem得getServletHandlers方法实现。
/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
def getServletHandlers: Array[ServletContextHandler] = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers).getOrElse(Array())
}
参考文章链接:http://blog.sina.com.cn/s/blog_9ca9623b0102weqi.html
Instance:指定了谁在使用测量系统,在spark中有一些列如master、worker、executor、client driver这些角色,这些角色创建测量系统用于监控spark状态,目前在spark中已经实现的角色包括master、worker、executor、driver、applications
Source:指定了从哪里收集测量数据。在Spark测量系统中有两种来源:
(1) Spark内部指标,比如MasterSource、WorkerSource等,这些源将会收集Spark组件的内部状态
(2) 普通指标,比例JvmSource,通过配置文件进行配置
Sink:指定了往哪里输出测量数据
private[spark] class MetricsSystem private (
val instance: String,
conf: SparkConf,
securityMgr: SecurityManager)
extends Logging {
private[this] val confFile = conf.get("spark.metrics.conf", null)
private[this] val metricsConfig = new MetricsConfig(Option(confFile))
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()
MetricsSystem的启动过程包括以下步骤:
1) 注册Sources;
2) 注册Sinks;
3) 给Sinks增加Jetty的ServletContextHandler。
注册Sources
registerSources方法用于注册Sources,告诉测量系统从哪里收集测量数据,注册Sources的过程分为以下步骤:
1) 从metricsConfig获取Driver的Properties。
2) 用正则匹配Driver的Properties中以source.开头的属性。然后将属性中的Source反射得到的实例加入ArrayBuffer[Source]
3) 将每个souce的metricsRegistry注册到ConcurrentMap metrics。
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
注册Sinks
registerSinks方法用于注册Sinks,即告诉测量系统MetricsSystem往哪里输出测量数据,注册Sinks的步骤如下:
1) 从Driver的Properties中用正则匹配以sink.开头的属性。
2) 将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
val sink = Class.forName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception => {
logError("Sink class " + classPath + " cannot be instantialized")
throw e
}
}
}
}
}
给Sinks增加Jetty的ServletContextHandler
为了能够在SparkUI访问到测量数据,所以需要给Sinks增加Jetty的ServletContextHandler,主要用MetricsSystem得getServletHandlers方法实现。
/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
def getServletHandlers: Array[ServletContextHandler] = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers).getOrElse(Array())
}
参考文章链接:http://blog.sina.com.cn/s/blog_9ca9623b0102weqi.html
相关文章推荐
- Spark源码剖析——SparkContext的初始化(九)_启动测量系统MetricsSystem
- spark学习-44-Spark的测量系统MetricsSystem
- SparkContext的初始化(季篇)——测量系统、ContextCleaner及环境更新
- bootchart工具在Android系统开机测量中的应用
- bootchart工具在Android系统开机测量中的应用
- MLBase:Spark生态圈里的分布式机器学习系统
- 集群计算系统Spark——安装
- bootchart工具在Android系统开机测量中的应用
- 基于 C8051F 的智能测量系统
- Spark:一个高效的分布式计算系统
- 存储系统----测量性能(1)
- 温(湿)度测量系统主机简介
- Spark:一个高效的分布式计算系统
- Spark:一个高效的分布式计算系统
- Spark:一个高效的分布式计算系统
- 【转】Spark:一个高效的分布式计算系统
- TE维氏硬度图像测量系统
- 18、深入理解计算机系统笔记:测量程序执行时间
- bootchart工具在Android系统开机测量中的应用(二)之问题解决
- Spark:一个高效的分布式计算系统