您的位置:首页 > 编程语言 > Java开发

使用Java API对HBase进行CRUD操作

2013-09-10 08:50 330 查看
package test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;

public class CRUD {

private static Configuration config;
private static String tableName = "test";
private static HBaseAdmin hBaseAdmin;
private static HTablePool pool;
private static HTable table;

public static void setup() throws MasterNotRunningException, ZooKeeperConnectionException {
config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "10.10.4.55");
config.set("hbase.zookeeper.property.clientPort", "2181");

hBaseAdmin = new HBaseAdmin(config);
pool = new HTablePool(config, 1000);
table = (HTable) pool.getTable(tableName);
}

public static void createTable() throws IOException {
System.out.println("start create table......");
if (hBaseAdmin.tableExists(tableName)) {
hBaseAdmin.disableTable(tableName);
hBaseAdmin.deleteTable(tableName);
System.out.println(tableName + " is exist delete!!!");
}
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
hTableDescriptor.addFamily(new HColumnDescriptor("c1"));
hTableDescriptor.addFamily(new HColumnDescriptor("c2"));

hBaseAdmin.createTable(hTableDescriptor);
System.out.println("create table over......");
}

public static void dropTable() throws IOException {
System.out.println("start drop table......");
if (hBaseAdmin.tableExists(tableName)) {
hBaseAdmin.disableTable(tableName);
hBaseAdmin.deleteTable(tableName);
System.out.println(tableName + " is exist delete!!!");
}
System.out.println("end drop table......");
}

public static void insert() throws IOException {
System.out.println("start insert......");

Put put = new Put("r1".getBytes());
put.add("c1".getBytes(), "a".getBytes(), "r1c1a".getBytes());
put.add("c1".getBytes(), "b".getBytes(), "r1c1b".getBytes());
put.add("c2".getBytes(), "a".getBytes(), "r1c2a".getBytes());
put.add("c2".getBytes(), "b".getBytes(), "r1c2b".getBytes());
table.put(put);
Put put2 = new Put("r2".getBytes());
put2.add("c1".getBytes(), "a".getBytes(), "r2aaa".getBytes());
put2.add("c1".getBytes(), "a".getBytes(), "r2c1a".getBytes());
put2.add("c2".getBytes(), "b".getBytes(), "r2bbb".getBytes());
put2.add("c2".getBytes(), "b".getBytes(), "r2c2b".getBytes());
table.put(put2);

Put put3 = new Put("r3".getBytes());
put3.add("c1".getBytes(), "a".getBytes(), "c1a".getBytes());
put3.add("c1".getBytes(), "b".getBytes(), "r3c1b".getBytes());
put3.add("c2".getBytes(), "a".getBytes(), "r3c2b".getBytes());
put3.add("c2".getBytes(), "b".getBytes(), "r3c2b".getBytes());

table.put(put3);
Put put4 = new Put("r4".getBytes());
put4.add("c1".getBytes(), "a".getBytes(), "c1a".getBytes());
put4.add("c1".getBytes(), "b".getBytes(), "r4c1b".getBytes());
put4.add("c2".getBytes(), "a".getBytes(), "r4c2b".getBytes());
put4.add("c2".getBytes(), "b".getBytes(), "r4c2b".getBytes());
table.put(put4);
System.out.println("end insert......");
}

public static void deleteByRowkey(String rowkey) throws IOException {
System.out.println("start deleteByRowkey......");
List list = new ArrayList();
Delete delete = new Delete(rowkey.getBytes());
list.add(delete);
table.delete(list);
System.out.println("end deleteByRowkey......");
}

public static void findAll() throws IOException {
System.out.println("\n\n-----------------findAll() start-------------");
HTablePool pool = new HTablePool(config, 1000);
HTable table = (HTable) pool.getTable(tableName);

ResultScanner rs = table.getScanner(new Scan());
for (Result r : rs) {
System.out.println("rowkey:" + new String(r.getRow(), "utf-8"));
for (KeyValue keyValue : r.raw()) {
System.out.println(new String(keyValue.getFamily()) + ":"
+ new String(keyValue.getQualifier()) + "="
+ new String(keyValue.getValue()));
}
}
System.out.println("-----------------findAll() end-------------\n\n");
}

public static void getByRowkey(String rowkey) throws IOException {
Get scan = new Get(rowkey.getBytes());
Result r = table.get(scan);
System.out.println("rowkey:" + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println(new String(keyValue.getFamily()) + ":"
+ new String(keyValue.getQualifier()) + "="
+ new String(keyValue.getValue()));
}
}

public static void findByColumn(String columnName, String fualifier, String columnValue) throws IOException {
System.out.println("\n\n---------findByColumn-----  " + columnName
+ ":" + fualifier + "=" + columnValue);
Filter filter = new SingleColumnValueFilter(Bytes.toBytes(columnName),
Bytes.toBytes(fualifier), CompareOp.EQUAL, Bytes.toBytes(columnValue));
Scan s = new Scan();
s.setFilter(filter);
ResultScanner rs = table.getScanner(s);
for (Result r : rs) {
System.out.println("rowkey:" + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println(new String(keyValue.getFamily()) + ":"
+ new String(keyValue.getQualifier()) + "="
+ new String(keyValue.getValue()));
}
}
System.out.println("findByColumn over-----\n\n");
}

public static void findByColumns(Map colMap) throws IOException {
System.out.println("-----findByColumns(map)---");
List filters = new ArrayList();
for (Entry entry : colMap.entrySet()) {
String colName = entry.getKey().trim();
String colVal = entry.getValue().trim();
String[] familyQualifier = colName.split(":");
String strFamily = familyQualifier[0];
String strQualifier = familyQualifier[1];
System.out.println(strFamily + ":" + strQualifier + "=" + colVal);
Filter filter = new SingleColumnValueFilter(Bytes.toBytes(strFamily),
Bytes.toBytes(strQualifier), CompareOp.EQUAL, Bytes.toBytes(colVal));
filters.add(filter);
}

FilterList filterList = new FilterList(filters);
Scan scan = new Scan();
scan.setFilter(filterList);
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
System.out.println("-----------------");
System.out.println("rowkey:" + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println(new String(keyValue.getFamily()) + ":"
+ new String(keyValue.getQualifier()) + "="
+ new String(keyValue.getValue()));
}
}
rs.close();
}

public static void cleanup() throws IOException {
table.close();
pool.close();
hBaseAdmin.close();
}

public static void main(String[] args) throws IOException {
setup();

createTable();

insert();

findAll();

deleteByRowkey("r1");
findAll();

getByRowkey("r2");

findByColumn("c1", "a", "c1a");

findAll();
Map colMap = new HashMap();
colMap.put("c1:a", "c1a");
colMap.put("c1:b", "r4c1b");
findByColumns(colMap);

cleanup();

}

}


输出结果为:

start create table......
test is exist delete!!!
create table over......
start insert......
end insert......

-----------------findAll() start-------------
rowkey:r1
c1:a=r1c1a
c1:b=r1c1b
c2:a=r1c2a
c2:b=r1c2b
rowkey:r2
c1:a=r2c1a
c2:b=r2c2b
rowkey:r3
c1:a=c1a
c1:b=r3c1b
c2:a=r3c2b
c2:b=r3c2b
rowkey:r4
c1:a=c1a
c1:b=r4c1b
c2:a=r4c2b
c2:b=r4c2b
-----------------findAll() end-------------

start deleteByRowkey......
end deleteByRowkey......

-----------------findAll() start-------------
rowkey:r2
c1:a=r2c1a
c2:b=r2c2b
rowkey:r3
c1:a=c1a
c1:b=r3c1b
c2:a=r3c2b
c2:b=r3c2b
rowkey:r4
c1:a=c1a
c1:b=r4c1b
c2:a=r4c2b
c2:b=r4c2b
-----------------findAll() end-------------

rowkey:r2
c1:a=r2c1a
c2:b=r2c2b

---------findByColumn-----  c1:a=c1a
rowkey:r3
c1:a=c1a
c1:b=r3c1b
c2:a=r3c2b
c2:b=r3c2b
rowkey:r4
c1:a=c1a
c1:b=r4c1b
c2:a=r4c2b
c2:b=r4c2b
findByColumn over-----

-----------------findAll() start-------------
rowkey:r2
c1:a=r2c1a
c2:b=r2c2b
rowkey:r3
c1:a=c1a
c1:b=r3c1b
c2:a=r3c2b
c2:b=r3c2b
rowkey:r4
c1:a=c1a
c1:b=r4c1b
c2:a=r4c2b
c2:b=r4c2b
-----------------findAll() end-------------

-----findByColumns(map)---
c1:b=r4c1b
c1:a=c1a
-----------------
rowkey:r4
c1:a=c1a
c1:b=r4c1b
c2:a=r4c2b
c2:b=r4c2b


注:

1. HBase更新操作同新增,如对同一RowKey、同一列名的值进行新增操作会覆盖原来的信息(这里有时间版本的概念)。

2. 使用SingleColumnValueFilter来做查找时要注意,如果某列上值为空也会查找出来,可以通过setFilterIfMissing(true);来让列值上为空的不返回。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: