您的位置:首页 > 其它

用hbase(0.92版本以上)的协处理器实现快速返回查询结果总数

2014-10-28 10:06 405 查看
在0.92版本的hbase上添加了协处理器的功能,协处理器分为两大部分 endpoint和observer.

observer相当于一个钩子的作用,根据钩子运行的模块来划分,又分成三个

RegionObserver:用这个做数据操纵事件,其紧密的绑定到表的region


MasterObserver:处理集群级别的事件:管理操作和数据定义语言操作


WALObserver:预写日志处理


而endpoint可看作关系数据库中的存储过程,用户可自定义。

言归正传,如何配置并使用协处理器呢

本次只介绍用endpoint实现快速返回符合条件结果总数的方法。

1.配置

在$HBASE_HOME/conf/hbase-site.xml添加一个配置项。我用的0.94版本自带的实现为AggregateImplementation,具体如下

<property>

<name>hbase.coprocessor.region.classes</name>

<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>

</property>

若之前未配置此项,则配置完后,需要重启hbase方能生效。

2.客户端使用,直接上代码。

scan直接用查询结果所用的scan即可。

/**

* 获得符合条件结果总数

* @author wanglongyf2 2013-1-11 上午10:29:15

* @param scan

* @return

*/

private long getTotalNumber(Scan scan) {

AggregationClient aggregationClient = new AggregationClient(conf);

long rowCount = 0;

try {

<span style="color:#ff0000;">scan.addColumn(columnFamily, etimeQualifier);//必须有此句,或者用addFamily(),否则出错,异常包含 ci ****

</span> rowCount = aggregationClient.rowCount(tableName, null, scan);

} catch (Throwable e) {

LOG.fatal("getTotalNumber wrong. ");

e.printStackTrace();

}

return rowCount;

}

若要验证此结果总数和实际的结果数是否相同,则看下面,关键代码

scan.setStartRow(startRow);

scan.setStopRow(stopRow);

Filter filter = new SingleColumnValueFilter(columnFamily, qualifier,

CompareOp.GREATER, Bytes.toBytes(startTime));

scan.setFilter(filter);

long number = getTotalNumber(scan);

ResultScanner scanner = table.getScanner(scan);

Result res = scanner.next();

while(res != null) {

numberOfResults ++;

res = scanner.next();

}

if (numberOfResults != number) {

LOG.fatal(String.format("use aggregation %d and scanner %d gets inconsistant result. ",

number, numberOfResults));

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: