Flink per-Job模式InfluxdbReporter上报JobName
最近将Flink集群从1.6升级到1.8,主要是为了使用1.8的两个特性:一个是universal kafka ,另外一个是rocksdb ttl, 然后注意到1.8 提供了Influxdb 的reporter, 在最开始1.6使用的rest api方式主动请求对应的metric, 使用这种方式目前有两个弊端:
前期使用metric比较少,自己通过开发图表展示,但是后期需要新的metric 都需要开发一次
客户端使用轮询的方式去请求,如果任务比较多就会造成一定延时,并且实时平台在做高可用情况下,涉及定时的切换,给系统开发带来一定复杂性
面对这两个问题选择了influxdb+grafana的方式,也应该是很多公司选择的方案,当然也有选择Prometheus 的,接下来介绍一下将flink metric 上报influxdb部署的步骤:
将flink安装包下面opt目录的flink-metrics-influxdb-1.8.2.jar包拷贝至lib目录下面
在flink-conf.yaml 中增加配置,
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host: localhost //influxdb服务所在的地址metrics.reporter.influxdb.port:8086//influxdb 端口metrics.reporter.influxdb.db: flink //influxdb库名称metrics.reporter.influxdb.username: flink-metricsmetrics.reporter.influxdb.password: qwerty
至此部署已经完成,接下来提交一个任务到集群中去,在grafana做一些图表展示,在influxdb中会自动生成很多measurement也就是表,选择taskmanager_Status_JVM_CPU_Load_value 表查看,发现其写入的Tags只有host与tm_id的信息,查看jobmanager_Status_JVM_CPU_Load_value表,发现其写入的Tags只有host信息,
taskmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx",tm_id="container_e03_1573197073760_0134_01_000006"}jobmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx"}
如果这样,那么对于flink on yarn perjob模式就没有办法区别当前指标到底属于哪一个任务,然后查看其它表例如numReocrdsIn包含了job_name/job_id 信息,导致同一个任务的不同metric写入的tags不同是为什么呢?果断看下InfluxdbReporter源码,可以发现在InfluxdbReporter的继承类AbstractReporter中的notifyOfAddedMetric方法,每一个metric在被添加时其metric也就被确定了,其tags由MeasurementInfoProvider的getTags方法来获取:
privatestaticMap<String,String> getTags(MetricGroup group){// Keys are surrounded by brackets: remove them, transforming "<name>" to "name".Map<String,String> tags =newHashMap<>();for(Map.Entry<String,String> variable: group.getAllVariables().entrySet()){String name = variable.getKey(); tags.put(name.substring(1, name.length()-1), variable.getValue());}return tags;}
每一个metric的MetricGroup都是不同的,所以导致了上面观察到的现象,现在想每一个metric都包含job_name/job_id信息,我们可以将包含job_name/job_id 信息的提取出来添加到其他metric的tags的,需要改写一下源码,实现方式如下:
在AbstractReporter中定义如下变量
privatefinalString JOB_NAME_LABEL ="job_name";privatefinalString JOB_ID_LABEL ="job_id";protectedString jobName;protectedString jobId;protectedMap<String,String> jobInfo =newHashMap<>();
在AbstractReporter增加一个能够在InfluxdbReporter获取jobInfo方法
protectedAbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider){this.metricInfoProvider = metricInfoProvider;}
在notifyOfAddedMetric方中新增获取job_name/job_id的逻辑
MeasurementInfo measurementInfo =((MeasurementInfo) metricInfo);Map<String,String> tags = measurementInfo.getTags();if(StringUtils.isBlank(jobName)){ jobId = tags.get(JOB_NAME_LABEL);}else{ jobInfo.put(JOB_NAME_LABEL, jobName);}if(StringUtils.isBlank(jobId)){ jobName = tags.get(JOB_ID_LABEL);}else{ jobInfo.put(JOB_ID_LABEL, jobId);}
2. 看一下InfluxdbReporter的上报方法report中,其通过buildReport构造了BatchPoints,每个Point的构造又是通过MetricMapper的map方法构造,最终调用了MetricMapper的builder方法,那么就需要将jobInfo添加到tags中,改造如下:
InfluxdbReporter的buildReport方法中
report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey(),getJobInfo()));
那么需要在MetricMapper对应map方法中新增Map jobInfo参数,需要将这个参数传入到builder方法中
privatestaticPoint.Builder builder(MeasurementInfo info,Instant timestamp,Map<String,String> jobInfo){Map<String,String> tags=info.getTags(); tags.putAll(jobInfo);returnPoint.measurement(info.getName()).tag(tags).time(timestamp.toEpochMilli(),TimeUnit.MILLISECONDS);}
至此改造全部完成,然后重新打包上传,看一下结果
taskmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx",job_id="d9f2524eeb590a61628560d8677a1b23",job_name="test",tm_id="container_e03_1573197073760_0141_01_000003"}
已经满足我们预期的结果,接下来就可以通过配置job_name或者job_id条件筛选,查看想要的metric。
- Flink JobManager HA模式部署(基于Standalone)
- Flink JobManager HA模式部署(基于Standalone)
- flink1.10版local模式提交job流程分析
- hadoop 配置文件 masters 以及 namenode, jobtracker, secondary namenode
- Hadoop系列之Reporter,Partitioner,JobConf, JobClient
- Unity3D在WebPlayer模式下的异常上报探索
- Flink源码第一篇:Flink之Job启动流程
- Flink的standalone 模式简单部署
- Flink job cluster on Kubernetes
- Spring自动装配模式二:byName的解析
- Flink 源码解析 —— Standalone session 模式启动流程
- Flink的Job启动TaskManager端(源码分析)
- unistd.h 中int access(const char * pathname, int mode); 判断进程能否以mode模式访问pathname文件(可以用来判断文件/目录是否存在)
- Unity3D在WebPlayer模式下的异常上报探索
- Nutch1.7的deploy模式在伪分布式环境上报错
- struts2配置常量<constant name="struts.devMode">,将值修改为true(开发模式有什么好处)的好处
- hadoop MapReduce运行异常:Unknown protocol to name node: org.apache.hadoop.mapred.JobSubmissionProtocol
- jenkins结点报错java.nio.file.AccessDeniedException: c:\jenkins.home\jobs\jobname\nextBuildNumber
- Hadoop学习笔记(老版本,YARN之前),MapReduce任务Namenode DataNode Jobtracker Tasktracker之间的关系
- HC-05蓝牙模块,在AT指令模式时输出AT+NAME?无应答问题