Zeppelin中Interpreter插件开发
2016-07-22 18:02
525 查看
项目背景:
(1) 已有监控系统采用的OpenTSDB方案
(2) 目前一些大数据应用,尤其是基于spark streaming的流式应用,会实时计算生成一些指标数据,借用监控系统的存储。
(3) 需要前端展示实时分析结果,采用zeppelin展示方式,但是目前zeppelin不支持OpenTSDB后端引擎支持
So, 自己开发!
由于Zeppelin运行环境已经有了该依赖包,所以我们再创建自定义Interpreter插件的时候只需要在代码中对其依赖,打包过程中不需要打包该包。所以使用provided依赖方式。
注意:该包为内部开发依赖包
static {
Interpreter.register("tsd", "tsd", TsdInterpreter.class.getName());
}
以tsd名称注册,那么Zeppelin前端在调用OpenTSDB查询的时候,只需要指定后端引擎名称%tsd即可。
cmd: 即在Zeppelin交互式界面编写的命令,不包含%tsd
context: 当前插件的上下文,主要包含插件的配置信息,例如操作OpenTSDB的时候就需要从上下文中获取OpenTSDB的IP和端口参数。
该方法实现的核心思想就是: 解析命令=>实例化OpenTSDB操作客户端=>操作OpenTSDB客户端进行数据查询=> 获取返回结果 封装成InterpreterResult对象。
贴核心代码吧:
Properties intpProperty = getProperty();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String value = (String) intpProperty.get(key);
if (key.equals("tsd.host") ) {
host = value;
} else if (key.equals("tsd.port")) {
port = value;
}
}
propertiesUtil.setOpentsdbIp(host);
propertiesUtil.setPort(Integer.parseInt(port));
Scanner scanner = new Scanner(items[1]);
String start, end, metric, tagsStr;
if (scanner.hasNext())
start = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"1!Please enter the correct format!");
}
if (scanner.hasNext())
end = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"2!Please enter the correct format!");
}
if (scanner.hasNext())
metric = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"3!Please enter the correct format!");
}
if (scanner.hasNext())
tagsStr = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"4!Please enter the correct format!");
}
// cpid=tudou,busiid=*,code=1
String[] tagsStrs = tagsStr.split(",");
Map<String, String> tags = new HashMap<String, String>();
for (String s : tagsStrs) {
int index = s.indexOf('=');
if (index == -1)
continue;
String tagK = s.substring(0, index);
String tagV = s.substring(index + 1);
tags.put(tagK, tagV);
}
QueryService queryService = new QueryService();
try {
List<QueryResponseEntity> responses = queryService
.queryByMetric(start, end, metric, tags, null, "sum");
StringBuffer sb = new StringBuffer();
// Map<String, String> alldps = new HashMap<String, String>();
// build header
Set<String> keys = new HashSet<String>();
sb.append("time\t");
for (QueryResponseEntity st : responses) {
sb.append(st.getTags().toString() + "\t");
keys.addAll(st.getDps().keySet());
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1, "\n");
List<String> keys2 = new ArrayList<String>(keys);
Collections.sort(keys2);
// build lines
Iterator<String> it = keys2.iterator();
long t;
while (it.hasNext()) {
String key = it.next(); // 每一行的时间戳
t = Long.parseLong(key);
sb.append(sdf.format(new Date(t*1000)) + "\t");
for (QueryResponseEntity st : responses) {
Map<String, String> dps = st.getDps();
String value = dps.get(key);
if (value != null) {
sb.append(value + "\t");
} else {
sb.append(" \t");
}
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1,
"\n");
}
// sb.toString()
return new InterpreterResult(InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE, sb.toString());
创建文件夹tsd,将所有依赖包拷贝到该文件夹下
(1) 已有监控系统采用的OpenTSDB方案
(2) 目前一些大数据应用,尤其是基于spark streaming的流式应用,会实时计算生成一些指标数据,借用监控系统的存储。
(3) 需要前端展示实时分析结果,采用zeppelin展示方式,但是目前zeppelin不支持OpenTSDB后端引擎支持
So, 自己开发!
一 Interpreter插件流程
插播: 刚去访问官方发现0.6.0版本发布了! http://zeppelin.apache.org/docs/0.6.0/(1) 下载Zeppelin源码
(2) 创建Zeppelin Maven工程的 Module
(3) 添加对zeppelin-interpreter插件包的依赖
由于Zeppelin运行环境已经有了该依赖包,所以我们再创建自定义Interpreter插件的时候只需要在代码中对其依赖,打包过程中不需要打包该包。所以使用provided依赖方式。
(4) 添加对OpenTSDB客户端操作API包的依赖
注意:该包为内部开发依赖包
(5) 创建实现类继承Zeppelin提供的抽象类org.apache.zeppelin.interpreter.Interpreter;
public class TsdInterpreter extends Interpreter(6) 代码中注册当前插件
在实现类中添加以下代码实现当前插件的注册static {
Interpreter.register("tsd", "tsd", TsdInterpreter.class.getName());
}
以tsd名称注册,那么Zeppelin前端在调用OpenTSDB查询的时候,只需要指定后端引擎名称%tsd即可。
(7) 实现核心抽象方法,即Zeppelin前端提交过来的命令
public InterpreterResult interpret(String cmd, InterpreterContext context)cmd: 即在Zeppelin交互式界面编写的命令,不包含%tsd
context: 当前插件的上下文,主要包含插件的配置信息,例如操作OpenTSDB的时候就需要从上下文中获取OpenTSDB的IP和端口参数。
该方法实现的核心思想就是: 解析命令=>实例化OpenTSDB操作客户端=>操作OpenTSDB客户端进行数据查询=> 获取返回结果 封装成InterpreterResult对象。
贴核心代码吧:
Properties intpProperty = getProperty();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String value = (String) intpProperty.get(key);
if (key.equals("tsd.host") ) {
host = value;
} else if (key.equals("tsd.port")) {
port = value;
}
}
propertiesUtil.setOpentsdbIp(host);
propertiesUtil.setPort(Integer.parseInt(port));
Scanner scanner = new Scanner(items[1]);
String start, end, metric, tagsStr;
if (scanner.hasNext())
start = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"1!Please enter the correct format!");
}
if (scanner.hasNext())
end = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"2!Please enter the correct format!");
}
if (scanner.hasNext())
metric = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"3!Please enter the correct format!");
}
if (scanner.hasNext())
tagsStr = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"4!Please enter the correct format!");
}
// cpid=tudou,busiid=*,code=1
String[] tagsStrs = tagsStr.split(",");
Map<String, String> tags = new HashMap<String, String>();
for (String s : tagsStrs) {
int index = s.indexOf('=');
if (index == -1)
continue;
String tagK = s.substring(0, index);
String tagV = s.substring(index + 1);
tags.put(tagK, tagV);
}
QueryService queryService = new QueryService();
try {
List<QueryResponseEntity> responses = queryService
.queryByMetric(start, end, metric, tags, null, "sum");
StringBuffer sb = new StringBuffer();
// Map<String, String> alldps = new HashMap<String, String>();
// build header
Set<String> keys = new HashSet<String>();
sb.append("time\t");
for (QueryResponseEntity st : responses) {
sb.append(st.getTags().toString() + "\t");
keys.addAll(st.getDps().keySet());
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1, "\n");
List<String> keys2 = new ArrayList<String>(keys);
Collections.sort(keys2);
// build lines
Iterator<String> it = keys2.iterator();
long t;
while (it.hasNext()) {
String key = it.next(); // 每一行的时间戳
t = Long.parseLong(key);
sb.append(sdf.format(new Date(t*1000)) + "\t");
for (QueryResponseEntity st : responses) {
Map<String, String> dps = st.getDps();
String value = dps.get(key);
if (value != null) {
sb.append(value + "\t");
} else {
sb.append(" \t");
}
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1,
"\n");
}
// sb.toString()
return new InterpreterResult(InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE, sb.toString());
二 插件部署
(1) 实现类的配置
在ZEPPELIN_HOME/conf/zeppelin-site.xml(2) 拷贝OpenTSDB插件包
在ZEPPELIN_HOME/interpreter创建文件夹tsd,将所有依赖包拷贝到该文件夹下
(3) 重启Zeppelin,在Zeppelin管理界面的 Interpreter中添加 TSD配置
三 实现效果
相关文章推荐
- bash: /usr/bin/autocrorder: /usr/bin/python^M: bad interpreter: No such file or directory
- 大数据实验室(大数据基础培训)——Zeppelin的安装、配置及基础使用
- opentsdb 分布式时间序列数据库安装实践
- spark standalone模式 zeppelin安装
- zeppelin入门使用
- spark自带示例一
- Marklogic search development
- Marklogic search development -2.1 Understanding the Search API
- MarkLogic中的Score和Relevance (一)
- 流式计算框架:Storm VS Spark Streaming
- 浅聊大数据
- 大数据,你准备好了未?
- Spark1.4.0让你透视整个Spark分布式执行
- BloomReach业务调查笔记
- 大数相乘
- Spark相关的projects
- Opentsdb设计之道
- opentsdb简介
- UVa 10033 Interpreter
- Hive In Oozie Workflow