您的位置:首页 > 运维架构

使用Thrift API监控Storm集群和Topology

2015-04-13 11:03 148 查看
如要监控Storm集群和运行在其上的Topology,该如何做呢?

Storm已经为你考虑到了,Storm支持Thrift的C/S架构,在部署Nimbus组件的机器上启动一个Thrift Server进程来提供服务,我们可以通过编写一个Thrift Client来请求Thrift Server,来获取你想得到的集群和Topology的相关数据,来接入监控平台,如Zabbix等,我目前使用的就是Zabbix。

整体的流程已经清楚了,下面就来实践吧。

1 安装Thrift

由于我们要使用Thrift来编译Storm的源代码来获得Thrift Client相关的Java源代码,所以需要先安装Thrift,这里选取的版本为0.9.2。

到官网下载好安装包:http://thrift.apache.org/

编译安装:configure && make && make install

验证:thrift --version

如果打印出Thrift version 0.9.2,代表安装成功。

2 编译Thrift Client代码

首先下载Storm源代码,这里使用最新的0.9.3版本:http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.3/apache-storm-0.9.3-src.tar.gz

解压后进行编译:thrift -gen java apache-storm-0.9.3/storm-core/src/storm.thrift

在当前目录下出现gen-java文件夹,此文件夹下就是Thrift Client的Java源代码了。

3 使用Thrift Client API

然后创建一个Maven项目来进行执行监控数据的获取。

项目生成一个Jar文件,输入一些命令和自定义参数,然后输出结果。

以命令行的形式进行调用,这样可以方便的接入监控系统,当然使用形式可以根据自身情况施行。

创建好后,把gen-java生成的代码拷贝进来。

在pom.xml里引入Thrift对应版本的库:

[html] view
plaincopy





<dependency>

<groupId>org.apache.thrift</groupId>

<artifactId>libthrift</artifactId>

<version>0.9.2</version>

</dependency>

首先写一些Thrift相关的辅助类。

ClientInfo.java

[java] view
plaincopy





package com.damacheng009.storm.monitor.thrift;

import org.apache.thrift.protocol.TBinaryProtocol;

import org.apache.thrift.transport.TFramedTransport;

import org.apache.thrift.transport.TSocket;

import backtype.storm.generated.Nimbus;

/**

* 代表一个Thrift Client的信息

* @author jb-xingchencheng

*

*/

public class ClientInfo {

private TSocket tsocket;

private TFramedTransport tTransport;

private TBinaryProtocol tBinaryProtocol;

private Nimbus.Client client;

public TSocket getTsocket() {

return tsocket;

}

public void setTsocket(TSocket tsocket) {

this.tsocket = tsocket;

}

public TFramedTransport gettTransport() {

return tTransport;

}

public void settTransport(TFramedTransport tTransport) {

this.tTransport = tTransport;

}

public TBinaryProtocol gettBinaryProtocol() {

return tBinaryProtocol;

}

public void settBinaryProtocol(TBinaryProtocol tBinaryProtocol) {

this.tBinaryProtocol = tBinaryProtocol;

}

public Nimbus.Client getClient() {

return client;

}

public void setClient(Nimbus.Client client) {

this.client = client;

}

}

ClientManager.java

[java] view
plaincopy





package com.damacheng009.storm.monitor.thrift;

import org.apache.thrift.protocol.TBinaryProtocol;

import org.apache.thrift.transport.TFramedTransport;

import org.apache.thrift.transport.TSocket;

import org.apache.thrift.transport.TTransportException;

import backtype.storm.generated.Nimbus;

/**

* Thrift Client管理类

* @author jb-xingchencheng

*

*/

public class ClientManager {

public static ClientInfo getClient(String nimbusHost, int nimbusPort) throws TTransportException {

ClientInfo client = new ClientInfo();

TSocket tsocket = new TSocket(nimbusHost, nimbusPort);

TFramedTransport tTransport = new TFramedTransport(tsocket);

TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);

Nimbus.Client c = new Nimbus.Client(tBinaryProtocol);

tTransport.open();

client.setTsocket(tsocket);

client.settTransport(tTransport);

client.settBinaryProtocol(tBinaryProtocol);

client.setClient(c);

return client;

}

public static void closeClient(ClientInfo client) {

if (null == client) {

return;

}

if (null != client.gettTransport()) {

client.gettTransport().close();

}

if (null != client.getTsocket()) {

client.getTsocket().close();

}

}

}

然后就可以写自己的逻辑去获取集群和拓扑的数据了,Storm提供的UI界面上展示的数据基本都可以获取到,这里只举出一个简单的例子,我们想获得某个拓扑发生异常的次数,和发生的异常的堆栈。剩下的项目你可以随意的定制。

下面是入口类:

Main.java

[java] view
plaincopy





package com.damacheng009.storm.monitor;

import com.damacheng009.storm.monitor.logic.Logic;

/**

* 入口类

* @author jb-xingchencheng

*

*/

public class Main {

// NIMBUS的信息

public static String NIMBUS_HOST = "192.168.180.36";

public static int NIMBUS_PORT = 6627;

/**

* 命令格式 CMD(命令) [ARG0] [ARG1] ...(更多参数)

* @param args

*/

public static void main(String[] args) {

if (args.length < 3) {

return;

}

NIMBUS_HOST = args[0];

NIMBUS_PORT = Integer.parseInt(args[1]);

String cmd = args[2];

String result = "-1";

if (cmd.equals("get_topo_exp_size")) {

String topoName = args[3];

result = Logic.getTopoExpSize(topoName);

} else if (cmd.equals("get_topo_exp_stack_trace")) {

String topoName = args[3];

result = Logic.getTopoExpStackTrace(topoName);

}

System.out.println(result);

}

}

测试的时候把具体的HOST和PORT改一下即可。
然后是具体的逻辑类。

Logic.java

[java] view
plaincopy





package com.damacheng009.storm.monitor.logic;

import java.util.Date;

import java.util.List;

import java.util.Set;

import com.damacheng009.storm.monitor.Main;

import com.damacheng009.storm.monitor.thrift.ClientInfo;

import com.damacheng009.storm.monitor.thrift.ClientManager;

import backtype.storm.generated.ClusterSummary;

import backtype.storm.generated.ErrorInfo;

import backtype.storm.generated.TopologyInfo;

import backtype.storm.generated.TopologySummary;

public class Logic {

/**

* 取得某个拓扑的异常个数

* @param topoName

* @return

*/

public static String getTopoExpSize(String topoName) {

ClientInfo client = null;

int errorTotal = 0;

try {

client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);

ClusterSummary clusterSummary = client.getClient().getClusterInfo();

List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();

for (TopologySummary ts : topoSummaryList) {

if (ts.getName().equals(topoName)) {

TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());

Set<String> errorKeySet = topologyInfo.getErrors().keySet();

for (String errorKey : errorKeySet) {

List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);

errorTotal += listErrorInfo.size();

}

break;

}

}

return String.valueOf(errorTotal);

} catch (Exception e) {

return "-1";

} finally {

ClientManager.closeClient(client);

}

}

/**

* 返回某个拓扑的异常堆栈

* @param topoName

* @return

*/

public static String getTopoExpStackTrace(String topoName) {

ClientInfo client = null;

StringBuilder error = new StringBuilder();

try {

client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);

ClusterSummary clusterSummary = client.getClient().getClusterInfo();

List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();

for (TopologySummary ts : topoSummaryList) {

if (ts.getName().equals(topoName)) {

TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());

// 得到错误信息

Set<String> errorKeySet = topologyInfo.getErrors().keySet();

for (String errorKey : errorKeySet) {

List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);

for (ErrorInfo ei : listErrorInfo) {

// 发生异常的时间

long expTime = (long) ei.getError_time_secs() * 1000;

// 现在的时间

long now = System.currentTimeMillis();

// 由于获取的是全量的错误堆栈,我们可以设置一个范围来获取指定范围的错误,看情况而定

// 如果超过5min,那么就不用记录了,因为5min检查一次

if (now - expTime > 1000 * 60 * 5) {

continue;

}

error.append(new Date(expTime) + "\n");

error.append(ei.getError() + "\n");

}

}

break;

}

}

return error.toString().isEmpty() ? "none" : error.toString();

} catch (Exception e) {

return "-1";

} finally {

ClientManager.closeClient(client);

}

}

}

最后打成一个Jar包,就可以跑起来接入监控系统了,如在Zabbix中,可以把各个监控项设置为自定义的item,在Zabbix Client中配置命令行来运行Jar取得数据。

接下来的测试过程先略过。

对于Storm监控的实践,目前就是这样了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: