Building a Secondary Index with HBase Coprocessors
2016-11-08 17:12
1. Requirement Scenario
When using HBase to store data, you sometimes need a secondary index. Suppose a table stores product information in the following layout, with itemId as the row key:

row 1 | itemId1 | shopId | sellerId |
row 2 | itemId2 | shopId | sellerId |

To query all items belonging to a given shopId, we need a secondary index on shopId.
2. Coprocessor
Fortunately, newer versions of HBase provide coprocessors to cover exactly this need. A coprocessor is analogous to the triggers and stored procedures of a relational database, which correspond to the two coprocessor types: Observer and Endpoint. An Endpoint runs aggregation-style computations on the server side and returns the result directly to the client. Observers come in three flavors: RegionObserver, MasterObserver, and WALObserver. The secondary index built here only needs a RegionObserver.
3. Demo
Here we build an index table over itemId and shopId as an example. To keep the index consistent, before each put to the main table we first put the shopId-to-itemId mapping into the index table; on a get by shopId, we first fetch all itemIds from the index table, then read the rows from the main table by itemId. Code example (the index table name "indexTableName" is a placeholder, and encodeRowKey is an application-specific helper not shown here):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Coprocessor extends BaseRegionObserver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Coprocessor.class);

    public static final String ROW_KEY_FIELD = "row_key_field";
    public static final String INDEX_FIELD = "index_field";
    public static final String DEFAULT_DELIMITER = "_";
    public static final String INDEX_FIELD_VALUE = "index_field_value";
    public static final String INDEX_TABLE_DELIMITER = "_INDEX_";

    private static final String INDEX_TABLE_NAME = "indexTableName";

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
                       final WALEdit edit, final Durability durability) {
        try {
            if (null == put.getAttribute(ROW_KEY_FIELD)) {
                LOGGER.warn("rowKeyField is null.");
                return;
            }
            String keyField = new String(put.getAttribute(ROW_KEY_FIELD));
            String keyFieldValue = null;
            String[] indexFields = new String(put.getAttribute(INDEX_FIELD)).split(",");
            // Extract the value of the row-key field from the Put itself.
            List<Cell> cellKeyFieldList = put.get(Bytes.toBytes("cf"), Bytes.toBytes(keyField));
            for (Cell cell : cellKeyFieldList) {
                keyFieldValue = new String(CellUtil.cloneValue(cell));
            }
            for (String indexField : indexFields) {
                String indexFieldValue = null;
                Table indexTable = e.getEnvironment().getTable(TableName.valueOf(INDEX_TABLE_NAME));
                if (null == indexTable) {
                    LOGGER.warn("there is no index table named " + INDEX_TABLE_NAME);
                    return;
                }
                List<Cell> cellIndexFieldList = put.get(Bytes.toBytes("cf"), Bytes.toBytes(indexField));
                for (Cell cell : cellIndexFieldList) {
                    indexFieldValue = new String(CellUtil.cloneValue(cell));
                }
                // Index row key: <indexFieldValue>_<keyFieldValue>, e.g. "shopId1_itemId1".
                Put indexPut = new Put((indexFieldValue + DEFAULT_DELIMITER + keyFieldValue).getBytes());
                indexPut.addColumn("cf".getBytes(), indexField.getBytes(), "".getBytes());
                indexTable.put(indexPut);
                indexTable.close();
            }
        } catch (Exception exception) {
            LOGGER.error("prePut has error with problem ", exception);
        }
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
                         final List<Cell> results) {
        try {
            if (null != get.getAttribute(INDEX_FIELD)) {
                String srcTableName = e.getEnvironment().getRegionInfo().getTable().getNameAsString();
                Table srcTable = e.getEnvironment().getTable(TableName.valueOf(srcTableName));
                Table indexTable = e.getEnvironment().getTable(TableName.valueOf(INDEX_TABLE_NAME));
                if (null != indexTable) {
                    Scan scan = new Scan();
                    String keyFieldValue = new String(get.getAttribute(INDEX_FIELD_VALUE));
                    // All index rows for this value share the prefix "<value>_".
                    scan.setFilter(new PrefixFilter((keyFieldValue + DEFAULT_DELIMITER).getBytes()));
                    ResultScanner resultScanner = indexTable.getScanner(scan);
                    for (Result result : resultScanner) {
                        List<Get> getList = new ArrayList<>();
                        for (Cell cell : result.rawCells()) {
                            String rowKey = new String(CellUtil.cloneRow(cell));
                            rowKey = rowKey.replace(keyFieldValue + DEFAULT_DELIMITER, "");
                            // encodeRowKey: application-specific row-key encoding, not shown here.
                            Get srcGet = new Get(encodeRowKey(rowKey));
                            getList.add(srcGet);
                        }
                        for (Result getResult : srcTable.get(getList)) {
                            results.addAll(Arrays.asList(getResult.rawCells()));
                        }
                    }
                    indexTable.close();
                    srcTable.close();
                    e.bypass();   // discard the default result and use the cells we assembled
                    e.complete(); // skip remaining coprocessors; treat this one as the last in the chain
                }
            }
        } catch (Exception exception) {
            LOGGER.error("preGetOp has error with problem ", exception);
        }
    }
}
```
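The row-key scheme that prePut and preGetOp must agree on can be isolated as plain string logic: prePut composes the index row as `<indexFieldValue>_<keyFieldValue>`, and preGetOp recovers the main-table row key by stripping the `<indexFieldValue>_` prefix. A minimal sketch of just that logic (the class and method names here are mine, not part of the coprocessor above):

```java
public class IndexRowKey {
    static final String DELIMITER = "_";

    // Compose an index-table row key, e.g. shopId "shop1" + itemId "item42" -> "shop1_item42".
    static String compose(String indexFieldValue, String keyFieldValue) {
        return indexFieldValue + DELIMITER + keyFieldValue;
    }

    // Recover the main-table row key by stripping the "<indexFieldValue>_" prefix,
    // mirroring the rowKey.replace(...) call in preGetOp above.
    static String mainRowKey(String indexRowKey, String indexFieldValue) {
        return indexRowKey.replace(indexFieldValue + DELIMITER, "");
    }

    public static void main(String[] args) {
        String indexRow = compose("shop1", "item42");
        System.out.println(indexRow);                      // shop1_item42
        System.out.println(mainRowKey(indexRow, "shop1")); // item42
    }
}
```

Note that String.replace removes every occurrence of the pattern, so if "_" can appear inside your IDs, stripping only the leading prefix (e.g. with substring) would be safer than the replace-based approach used in the demo.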
4. Deployment and Loading
Package the project as a jar and put it on an HDFS path that the HBase cluster can reach, then register the coprocessor with code like the following (admin is an org.apache.hadoop.hbase.client.Admin instance):

```java
static void addProcess(Admin admin, String copPath) throws IOException {
    try {
        admin.disableTable(TableName.valueOf("itemTable"));
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("itemTable"));
        hTableDescriptor.addFamily(new HColumnDescriptor("cf"));
        // copPath is the jar location on HDFS, e.g. "hdfs://hdfsname/coprocess/coprocess.jar";
        // 1001 is the coprocessor priority.
        hTableDescriptor.addCoprocessor(Coprocessor.class.getName(), new Path(copPath), 1001, null);
        admin.modifyTable(TableName.valueOf("itemTable"), hTableDescriptor);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        admin.enableTable(TableName.valueOf("itemTable"));
    }
}
```
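Before re-running the loading code above with a new jar, the existing coprocessor attribute must first be removed from the table. From the HBase shell that looks roughly like this (the attribute name coprocessor$1 depends on how many coprocessors the table already carries; check the output of describe 'itemTable' first):

```
disable 'itemTable'
alter 'itemTable', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable 'itemTable'
```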
5. Pitfalls
Every reload requires fully unloading the previous coprocessor first (this can be done from the HBase shell); otherwise the new version does not take effect. My guess is that once the coprocessor class has been loaded by HBase's CoprocessorClassLoader, it is kept in memory; if you then load a coprocessor with the same name, HBase treats it as already present and never picks up the newly loaded one. Either unload it completely via the shell, or rename the new coprocessor class.
References
http://www.voidcn.com/blog/sima64/article/p-5790478.html
http://www.programgo.com/article/938497705/
http://www.ibm.com/developerworks/cn/opensource/os-cn-hbase-coprocessor1/index.html
http://www.ibm.com/developerworks/cn/opensource/os-cn-hbase-coprocessor2/index.html
http://blog.csdn.net/lifuxiangcaohui/article/details/39991183/