您的位置:首页 > 其它

Flink per-Job模式InfluxdbReporter上报JobName

2021-02-05 20:54 323 查看

最近将Flink集群从1.6升级到1.8,主要是为了使用1.8的两个特性:一个是universal kafka ,另外一个是rocksdb ttl, 然后注意到1.8 提供了Influxdb 的reporter, 在最开始1.6使用的rest api方式主动请求对应的metric, 使用这种方式目前有两个弊端:

  • 前期使用metric比较少,自己通过开发图表展示,但是后期需要新的metric 都需要开发一次

  • 客户端使用轮询的方式去请求,如果任务比较多就会造成一定延时,并且实时平台在做高可用情况下,涉及定时的切换,给系统开发带来一定复杂性

面对这两个问题选择了influxdb+grafana的方式,也应该是很多公司选择的方案,当然也有选择Prometheus 的,接下来介绍一下将flink metric 上报influxdb部署的步骤:

  1. 将flink安装包下面opt目录的flink-metrics-influxdb-1.8.2.jar包拷贝至lib目录下面

  2. 在flink-conf.yaml 中增加配置,

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host: localhost  //influxdb服务所在的地址metrics.reporter.influxdb.port:8086//influxdb 端口metrics.reporter.influxdb.db: flink   //influxdb库名称metrics.reporter.influxdb.username: flink-metricsmetrics.reporter.influxdb.password: qwerty

至此部署已经完成,接下来提交一个任务到集群中去,在grafana做一些图表展示,在influxdb中会自动生成很多measurement也就是表,选择taskmanager_Status_JVM_CPU_Load_value 表查看,发现其写入的Tags只有host与tm_id的信息,查看jobmanager_Status_JVM_CPU_Load_value表,发现其写入的Tags只有host信息,

taskmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx",tm_id="container_e03_1573197073760_0134_01_000006"}jobmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx"}

如果这样,那么对于flink on yarn perjob模式就没有办法区别当前指标到底属于哪一个任务,然后查看其它表例如numReocrdsIn包含了job_name/job_id 信息,导致同一个任务的不同metric写入的tags不同是为什么呢?果断看下InfluxdbReporter源码,可以发现在InfluxdbReporter的继承类AbstractReporter中的notifyOfAddedMetric方法,每一个metric在被添加时其metric也就被确定了,其tags由MeasurementInfoProvider的getTags方法来获取:

privatestaticMap<String,String> getTags(MetricGroup group){// Keys are surrounded by brackets: remove them, transforming "<name>" to "name".Map<String,String> tags =newHashMap<>();for(Map.Entry<String,String> variable: group.getAllVariables().entrySet()){String name = variable.getKey();            tags.put(name.substring(1, name.length()-1), variable.getValue());}return tags;}

每一个metric的MetricGroup都是不同的,所以导致了上面观察到的现象,现在想每一个metric都包含job_name/job_id信息,我们可以将包含job_name/job_id 信息的提取出来添加到其他metric的tags的,需要改写一下源码,实现方式如下:

  1. 在AbstractReporter中定义如下变量

privatefinalString JOB_NAME_LABEL ="job_name";privatefinalString JOB_ID_LABEL ="job_id";protectedString jobName;protectedString jobId;protectedMap<String,String> jobInfo =newHashMap<>();

在AbstractReporter增加一个能够在InfluxdbReporter获取jobInfo方法

protectedAbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider){this.metricInfoProvider = metricInfoProvider;}

在notifyOfAddedMetric方中新增获取job_name/job_id的逻辑

MeasurementInfo measurementInfo =((MeasurementInfo) metricInfo);Map<String,String> tags = measurementInfo.getTags();if(StringUtils.isBlank(jobName)){            jobId = tags.get(JOB_NAME_LABEL);}else{            jobInfo.put(JOB_NAME_LABEL, jobName);}if(StringUtils.isBlank(jobId)){            jobName = tags.get(JOB_ID_LABEL);}else{            jobInfo.put(JOB_ID_LABEL, jobId);}

2. 看一下InfluxdbReporter的上报方法report中,其通过buildReport构造了BatchPoints,每个Point的构造又是通过MetricMapper的map方法构造,最终调用了MetricMapper的builder方法,那么就需要将jobInfo添加到tags中,改造如下:
InfluxdbReporter的buildReport方法中

report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey(),getJobInfo()));

那么需要在MetricMapper对应map方法中新增Map jobInfo参数,需要将这个参数传入到builder方法中

privatestaticPoint.Builder builder(MeasurementInfo info,Instant timestamp,Map<String,String> jobInfo){Map<String,String> tags=info.getTags();        tags.putAll(jobInfo);returnPoint.measurement(info.getName()).tag(tags).time(timestamp.toEpochMilli(),TimeUnit.MILLISECONDS);}

至此改造全部完成,然后重新打包上传,看一下结果

taskmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx",job_id="d9f2524eeb590a61628560d8677a1b23",job_name="test",tm_id="container_e03_1573197073760_0141_01_000003"}

已经满足我们预期的结果,接下来就可以通过配置job_name或者job_id条件筛选,查看想要的metric。


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: