您的位置:首页 > 运维架构

20161108Hbase-Coprocessor

2016-11-08 17:12 127 查看

Hbase Coprocess 协处理器建立二级索引

1.需求场景

在项目中使用Hbase存储数据的话,有时候会需要用到二级索引,比如我们的数据表存储产品相关信息,存储格式如下:

row 1itemidId1shopidIdsellerId
row 2itemidId2shopidIdsellerId
我们知道Hbase是单索引的数据存储,当我们根据itemId来查询数据是木有问题的,因为我们这里的rowKey就是根据itemId生成的,但当我们有的场景下需要根据shopId来查询的话,Hbase就无能为力了,因为我们rowKey的生成跟shopId没有一毛钱关系,这个时候我们会想到能不能建立一个存储itemId和shopId的映射表,当需要根据shopId查询数据的时候,先查映射表,找到shopId对应的itemId,然后再根据itemId去查询数据表获取我们想要的数据。

2.Coprocessor

幸运的是,Hbase为了满足上述需求,在较新版本中提供了协处理器功能,协处理器可以理解为我们常用的关系型数据库中的触发器和存储过程,分别对应协处理器的Observer和Endpoint,其中Endpoint可以在服务端做一些统计聚合计算,将结果直接返回给用户,Observer提供了三个层面的接口,RegionObserver、MasterObserver和WALObserver,这里用到的二级索引只用到了RegionObserver.

3.demo

这里我们以itemId和shopId建立索引表为例,提供一个demo,为了满足我们使用二级索引表的需求,我们需要在往主数据表put数据之前,先往索引表put shopId和itemId的对应数据,在get数据表的时候,要先根据shopId从索引表拿到所有的itemIds,然后根据itemId去获取主数据表的数据。以下是代码示例:

public class Coprocessor extends BaseRegionObserver {

private static final Logger LOGGER = LoggerFactory.getLogger(Coprocessor.class);

public static final String ROW_KEY_FIELD = "row_key_field";
public static final String INDEX_FIELD = "index_field";
public static final String DEFAULT_DELIMITER = "_";
public static final String INDEX_FIELD_VALUE = "index_field_value";
public static final String INDEX_TABLE_DELIMITER = "_INDEX_";

@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final Durability durability) {

try {
if (null == put.getAttribute(ROW_KEY_FIELD)) {
LOGGER.warn("rowKeyField is null.");
return;
}

String keyField = new String(put.getAttribute(ROW_KEY_FIELD));
String keyFieldValue = null;
String srcTableName = e.getEnvironment().getRegionInfo().getTable().getNameAsString();
String[] indexFields = new String(put.getAttribute(INDEX_FIELD)).split(",");

List<Cell> cellKeyFieldList = put.get(Bytes.toBytes("cf"), Bytes.toBytes(keyField));
for (Cell cell : cellKeyFieldList) {
keyFieldValue = new String(CellUtil.cloneValue(cell));
}

for (String indexField : indexFields) {

String indexFieldValue = null;
Table indexTable = e.getEnvironment().getTable(TableName.valueOf("indexTableName"));

if (null == indexTable) {
LOGGER.warn("there is no index table named " + indexTableName);
return;
}

List<Cell> cellIndexFieldList = put.get(Bytes.toBytes("cf"), Bytes.toBytes(indexField));
for (Cell cell : cellIndexFieldList) {
indexFieldValue = new String(CellUtil.cloneValue(cell));
}

Put indexPut = new Put((indexFieldValue + DEFAULT_DELIMITER + keyFieldValue).getBytes());
indexPut.addColumn("cf".getBytes(), indexField.getBytes(), "".getBytes());

indexTable.put(indexPut);
indexTable.close();
}

} catch (Exception exception) {
LOGGER.error("prePut has error with problem ", exception);
}
}

@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results) {

try {
if (null != get.getAttribute(INDEX_FIELD)) {
String srcTableName = e.getEnvironment().getRegionInfo().getTable().getNameAsString();
String indexField = new String(get.getAttribute(INDEX_FIELD));

Table srcTable = e.getEnvironment().getTable(TableName.valueOf(srcTableName));
Table indexTable = e.getEnvironment().getTable(TableName.valueOf("indexTableName"));
if (null != indexTable) {
Scan scan = new Scan();
String keyFieldValue = new String(get.getAttribute(INDEX_FIELD_VALUE));
scan.setFilter(new PrefixFilter((keyFieldValue + DEFAULT_DELIMITER).getBytes()));
ResultScanner resultScanner = indexTable.getScanner(scan);

for (Result result : resultScanner) {
List<Get> getList = new ArrayList<>();
for (Cell cell : result.rawCells()) {
String rowKey = new String(CellUtil.cloneRow(cell));
rowKey = rowKey.replace(keyFieldValue + DEFAULT_DELIMITER, "");
Get srcGet = new Get(encodeRowKey(rowKey));
getList.add(srcGet);
}
for (Result getResult : srcTable.get(getList)) {
results.addAll(Arrays.asList(getResult.rawCells()));
}
}
indexTable.close();
srcTable.close();
e.bypass();//则会将以前的数据忽略掉,而是使用用户自定义的数据
e.complete();//告诉框架后续的操作可以被跳过,剩下没有被调用的协处理器也将被跳过,即表示该协处理器为最后一个处理器。
}

}

} catch (Exception exception) {
LOGGER.error("preScanOpen has error with problem ", exception);
}
}


4.部署加载

将该工程打包后放在Hbase集群可以访问到的HDFS文件系统中,然后用如下代码加载协处理器

static void addProcess(String copPath) throws IOException {
try {
admin.disableTable(TableName.valueOf("itemTable"));
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("itemTable"));
hTableDescriptor.addFamily(new HColumnDescriptor("cf"));
hTableDescriptor.addCoprocessor(Coprocessor.class.getName(), new Path(
"hdfs://hdfsname/coprocess/coprocess.jar"), 1001, null);
admin.modifyTable(TableName.valueOf"itemTable"), hTableDescriptor);
} catch (IOException e) {
e.printStackTrace();
} finally {
admin.enableTable(TableName.valueOf("itemTable"));
}

}


5.踩的坑

每次加载都需要将原来的协处理器完全卸载(可以用shell命令),否则加载不会生效,个人推测是因为协处理器作为一个Class被Hbase的CoprocessorClassLoader加载后会在内存维持它的存在,如果你加载一个同名的Coprocessor,它会视为已经存在这样一个Coprocessor,就不会更新你重新加载的,要么shell命令完全卸载,要么将后面的Coprocessor重命名。

参考资料

http://www.voidcn.com/blog/sima64/article/p-5790478.html

http://www.programgo.com/article/938497705/

http://www.ibm.com/developerworks/cn/opensource/os-cn-hbase-coprocessor1/index.html

http://www.ibm.com/developerworks/cn/opensource/os-cn-hbase-coprocessor2/index.html

http://blog.csdn.net/lifuxiangcaohui/article/details/39991183/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hbase 二级索引