您的位置:首页 > 其它

用hbase(0.92版本以上)的协处理器实现快速返回查询结果总数

2013-01-14 16:19 369 查看
在0.92版本的hbase上添加了协处理器的功能,协处理器分为两大部分 endpoint和observer.

observer相当于一个钩子的作用,根据钩子运行的模块来划分,又分成三个

RegionObserver:用这个做数据操纵事件,其紧密的绑定到表的region


MasterObserver:处理集群级别的事件:管理操作和数据定义语言操作


WALObserver:预写日志处理


而endpoint可看作关系数据库中的存储过程,用户可自定义。

言归正传,如何配置并使用协处理器呢

本次只介绍用endpoint实现快速返回符合条件结果总数的方法。

1.配置

在$HBASE_HOME/conf/hbase-site.xml添加一个配置项。我用的0.94版本自带的实现为AggregateImplementation,具体如下

<property>

<name>hbase.coprocessor.region.classes</name>

<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>

</property>

若之前未配置此项,则配置完后,需要重启hbase方能生效。

2.客户端使用,直接上代码。

scan直接用查询结果所用的scan即可。

/**
* 获得符合条件结果总数
* @author wanglongyf2 2013-1-11 上午10:29:15
* @param scan
* @return
*/
private long getTotalNumber(Scan scan) {
AggregationClient aggregationClient = new AggregationClient(conf);
long rowCount = 0;
try {
scan.addColumn(columnFamily, etimeQualifier);//必须有此句,或者用addFamily(),否则出错,异常包含 ci ****
rowCount = aggregationClient.rowCount(tableName, null, scan);
} catch (Throwable e) {
LOG.fatal("getTotalNumber wrong. ");
e.printStackTrace();
}
return rowCount;
}


若要验证此结果总数和实际的结果数是否相同,则看下面,关键代码

scan.setStartRow(startRow);
scan.setStopRow(stopRow);
Filter filter = new SingleColumnValueFilter(columnFamily, qualifier,
CompareOp.GREATER, Bytes.toBytes(startTime));
scan.setFilter(filter);
long number = getTotalNumber(scan);
ResultScanner scanner = table.getScanner(scan);
Result res = scanner.next();
while(res != null) {
numberOfResults ++;
res = scanner.next();
}
if (numberOfResults != number) {
LOG.fatal(String.format("use aggregation %d and scanner %d gets inconsistant result. ",
number, numberOfResults));
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: