您的位置:首页 > 运维架构 > Apache

HBase源码分析之org.apache.hadoop.hbase.client.coprocessor包

2013-05-01 06:33 741 查看


Endpoint就类似RDBMS里面的Stored procedure了,可以直接在Region Server上执行我们的代码,它主要提供了能够操作单个Region的proxy和操作多个Regions的proxy,这样我们可以做一些sum,count的操作,不用再client端去做(数据量大,client端撑不了)。这里重点说一下Coprocessor。

客户端定义Coprocessor接口实现,并继承BaseEndpointCoprocessor类;调用HTable.coprocessorExec( Class<T> protocol, byte[] startKey, byte[] endKey,Batch.Call<T,R> callable, Batch.Callback<R> callback)

根据startKey,endKey结合region Meta表,找出相应region的startKeys,在客户端多线程(ExecutorService,有多少对应region,就分多少个线程)连接相应的regionserver中的Region,

在该Region区域执行客户端定义的Coprocessor实现,执行的结果用Futrue包装。

然后客户端统一遍历Futrue获取,返回每个region对应的结构Map<regionName,T>

Endpoint的基本使用方法

coprocessorExec所传入的4个参数,第一个参数为之前定义的RowCountProtocol 协议类的class,第二为startRow,第三个为endRow。最后一个参数是实现Batch.Call接口的匿名类,Batch.Call<RowCountProtocol, Long>() 中的RowCountProtocol和Long分别代表CoprocessorProtocol和返回值的类,它所实现的call函数会被自动调用。最后coprocessorExec所返回的map的key,value分别为region名和计算结果。

样例代码:

public interface RowCountProtocol extends CoprocessorProtocol{

public long getRowCount ();

public long getKeyValueCount ();

}

public class RowCountEndpoint extends BaseEndpointCoprocessor implements RowCountProtocol{

public long getRowCount (){

}

public long getKeyValueCount (){

}

}

Map<byte[], Pair<Long, Long>> results = table.coprocessorExec(

RowCountProtocol.class,null, null,new Batch.Call<RowCountProtocol, Pair<Long, Long>>() {

public Pair<Long, Long> call(RowCountProtocol counter)

throws IOException {

return new Pair(counter.getRowCount(),

counter.getKeyValueCount());
}
}
);

有三个方法对Endpoint进行设置:

a、启动全局aggregation,能过操纵所有的表上的数据。通过修改hbase-site.xml这个文件来实现,只需要添加如下代码:

<property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.RowCountEndpoint 
</value>
 </property>


b、启用表aggregation,只对特定的表生效。通过HBase Shell 来实现。

(1)disable指定表。hbase> disable 'mytable'

(2)添加aggregation hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.RowCountEndpoint ||'

(3)重启指定表 hbase> enable 'mytable'

c、HTableDescriptor htd=new HTableDescriptor("testTable");

htd.setValue("CORPROCESSOR$1",path.toString+"|"+ RowCountEndpoint .class.getCanonicalName()+"|"+Coprocessor.Priority.USER);

其中path为jar在HDFS中的路径
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐