您的位置:首页 > 运维架构

Zeppelin中Interpreter插件开发

2016-07-22 18:02 525 查看
项目背景:

(1) 已有监控系统采用的OpenTSDB方案

(2) 目前一些大数据应用,尤其是基于spark streaming的流式应用,会实时计算生成一些指标数据,借用监控系统的存储。

(3) 需要前端展示实时分析结果,采用zeppelin展示方式,但是目前zeppelin不支持OpenTSDB后端引擎支持

So, 自己开发!

一 Interpreter插件流程

插播: 刚去访问官方发现0.6.0版本发布了! http://zeppelin.apache.org/docs/0.6.0/

(1) 下载Zeppelin源码

(2) 创建Zeppelin Maven工程的 Module



(3) 添加对zeppelin-interpreter插件包的依赖



由于Zeppelin运行环境已经有了该依赖包,所以我们再创建自定义Interpreter插件的时候只需要在代码中对其依赖,打包过程中不需要打包该包。所以使用provided依赖方式。

(4) 添加对OpenTSDB客户端操作API包的依赖



注意:该包为内部开发依赖包

(5) 创建实现类继承Zeppelin提供的抽象类org.apache.zeppelin.interpreter.Interpreter;

public class TsdInterpreter extends Interpreter

(6) 代码中注册当前插件

在实现类中添加以下代码实现当前插件的注册

static {

Interpreter.register("tsd", "tsd", TsdInterpreter.class.getName());
}

以tsd名称注册,那么Zeppelin前端在调用OpenTSDB查询的时候,只需要指定后端引擎名称%tsd即可。

(7) 实现核心抽象方法,即Zeppelin前端提交过来的命令

public InterpreterResult interpret(String cmd, InterpreterContext context)

cmd: 即在Zeppelin交互式界面编写的命令,不包含%tsd

context: 当前插件的上下文,主要包含插件的配置信息,例如操作OpenTSDB的时候就需要从上下文中获取OpenTSDB的IP和端口参数。

该方法实现的核心思想就是: 解析命令=>实例化OpenTSDB操作客户端=>操作OpenTSDB客户端进行数据查询=> 获取返回结果 封装成InterpreterResult对象

贴核心代码吧:

Properties intpProperty = getProperty();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String value = (String) intpProperty.get(key);

if (key.equals("tsd.host") ) {
host = value;
} else if (key.equals("tsd.port")) {
port = value;
}
}
propertiesUtil.setOpentsdbIp(host);
propertiesUtil.setPort(Integer.parseInt(port));

Scanner scanner = new Scanner(items[1]);
String start, end, metric, tagsStr;
if (scanner.hasNext())
start = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"1!Please enter the correct format!");
}

if (scanner.hasNext())
end = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"2!Please enter the correct format!");
}

if (scanner.hasNext())
metric = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"3!Please enter the correct format!");
}

if (scanner.hasNext())
tagsStr = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"4!Please enter the correct format!");
}

// cpid=tudou,busiid=*,code=1
String[] tagsStrs = tagsStr.split(",");
Map<String, String> tags = new HashMap<String, String>();
for (String s : tagsStrs) {
int index = s.indexOf('=');
if (index == -1)
continue;
String tagK = s.substring(0, index);
String tagV = s.substring(index + 1);
tags.put(tagK, tagV);
}

QueryService queryService = new QueryService();
try {
List<QueryResponseEntity> responses = queryService
.queryByMetric(start, end, metric, tags, null, "sum");

StringBuffer sb = new StringBuffer();
// Map<String, String> alldps = new HashMap<String, String>();

// build header
Set<String> keys = new HashSet<String>();
sb.append("time\t");
for (QueryResponseEntity st : responses) {
sb.append(st.getTags().toString() + "\t");
keys.addAll(st.getDps().keySet());
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1, "\n");

List<String> keys2 = new ArrayList<String>(keys);
Collections.sort(keys2);
// build lines
Iterator<String> it = keys2.iterator();

long t;
while (it.hasNext()) {
String key = it.next(); // 每一行的时间戳

t = Long.parseLong(key);
sb.append(sdf.format(new Date(t*1000)) + "\t");
for (QueryResponseEntity st : responses) {
Map<String, String> dps = st.getDps();
String value = dps.get(key);
if (value != null) {
sb.append(value + "\t");
} else {
sb.append(" \t");
}
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1,
"\n");
}
// sb.toString()

return new InterpreterResult(InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE, sb.toString());

二 插件部署

(1) 实现类的配置

在ZEPPELIN_HOME/conf/zeppelin-site.xml



(2) 拷贝OpenTSDB插件包

在ZEPPELIN_HOME/interpreter

创建文件夹tsd,将所有依赖包拷贝到该文件夹下



(3) 重启Zeppelin,在Zeppelin管理界面的 Interpreter中添加 TSD配置



三 实现效果

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息