您的位置:首页 > 编程语言 > Java开发

JAVA操作Hbase管理类HbaseManage基本详细操作

2017-05-24 16:51 295 查看
基本内容包括一些常见table操作,row操作,column,family操作以及一些过滤器的操作(filter)

话不多说直接上代码:

package hbase;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
* @author Administrator
* HBase的查询实现只提供两种方式:
1、按指定RowKey获取唯一一条记录,get方法(org.apache.hadoop.hbase.client.Get)
2、按指定的条件获取一批记录,scan方法(org.apache.Hadoop.Hbase.client.Scan)
*/
public class HbaseManage {
private static Configuration conf = null;

private static Connection connection = null;
// 初始化hbase
static {
// windows环境下 解决hadoop调用本地库问题
System.setProperty("hadoop.home.dir",
"C:/Users/Administrator/Downloads/hadoop-common-2.2.0-bin-master");
conf = HBaseConfiguration.create();// 使用默认的classpath下的hbase-site.xml配置
if (connection == null) {
try {
connection = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
}

// 判断表是否存在
private static boolean isExistTable(String tableName) throws IOException {
boolean exist = false;
// HBaseAdmin hAdmin = new HBaseAdmin(conf);
Admin admin = connection.getAdmin();
// exist = hAdmin.tableExists(tableName);
exist = admin.tableExists(TableName.valueOf(tableName));
return exist;
}

/**
* 插入数据
* @param tableName
* @param rowkey
* @param columnFamily
* @param column
* @param value
* @throws IOException
*/
public static void putData(String tableName, String rowkey,
String columnFamily, String column, String value)
throws IOException {
// HTable table = new HTable(conf, tableName);
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowkey));
// put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
// Bytes.toBytes(column));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
Bytes.toBytes(value));

table.put(put);
// table.put(List<Put>);
}

/**
* 获取指定行的所有数据
* @param tableName 表名
* @param row 行键key
* @param columnFamily 列族
* @param column 列名
* @throws IOException
*/
public static void getData(String tableName,String rowKey,String columnFamily,String column) throws IOException{
// HTable table = new HTable(conf, tableName); 写法已过时
Table table = connection.getTable(TableName.valueOf(tableName));
// Scan scan = new Scan();
// ResultScanner result = table.getScanner(scan);
Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);
byte[] rb = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
String value = new String(rb,"UTF-8");
System.out.println(value);
}

/**
* 获取指定表的所有数据
* @param tableName 表名
* @throws IOException
*/
public static void scanTable(String tableName) throws IOException{
// HTable table = new HTable(hbaseConfiguration, tableName);
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
List<Cell> cells= result.listCells();
for (Cell cell : cells) {
byte[] rb = cell.getValueArray();
String row = Bytes.toString(result.getRow());
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println("[row:"+row+"],[family:"+family+"],[qualifier:"+qualifier+"],[value:"+value+"]");
}
}
}

/**
* 通过rowkey查找数据
* @param tableName
* @param rowKey
* @throws IOException
*/
public static void getDataByRow (String tableName, String rowKey) throws IOException{
// HTable table = new HTable(conf, tableName);
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(rowKey.getBytes());
Result rs = table.get(get);
for(Cell cell : rs.listCells()){
System.out.print(Bytes.toString(CellUtil.cloneRow(cell))+" ");
System.out.print(Bytes.toString(CellUtil.cloneFamily(cell))+" ");
System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell))+" ");
System.out.print(Bytes.toString(CellUtil.cloneValue(cell))+" ");
System.out.println(cell.getTimestamp());
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
System.out.println(Bytes.toString(cell.getQualifierArray()));
}
}

/**
* 通过列簇获取 下面所有的列的数据
* @param tableName
* @param family
* @throws IOException
*/
public static void getDataByFamilyColumn(String tableName,String family,String column) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
if (!column.isEmpty()) {
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
}else {
scan.addFamily(Bytes.toBytes(family));
}
ResultScanner resultScanner = table.getScanner(scan);
int count = 0;
for (Result result : resultScanner) {
List<Cell> cells= result.listCells();
for (Cell cell : cells) {
byte[] rb = cell.getValueArray();
String row = Bytes.toString(result.getRow());
String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println("[row:"+row+"],[family:"+family1+"],[qualifier:"+qualifier+"]"
+ ",[value:"+value+"],[time:"+cell.getTimestamp()+"]");
}
count++;
}
System.out.println("count ===== "+count);
}

public static void deleteByRow(String tableName,String row) throws IOException{
Table table = connection.getTable(TableName.valueOf("user"));
Delete del = new Delete(Bytes.toBytes("zhangsan"));
table.delete(del);
}

public static void getDataByRowFilter(String tableName,String prefixRow) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setMaxVersions();
PrefixFilter pf = new PrefixFilter(Bytes.toBytes(prefixRow));
scan.setFilter(pf);
ResultScanner resultScanner = table.getScanner(scan);

for (Result result : resultScanner) {
List<Cell> cells= result.listCells();
for (Cell cell : cells) {
byte[] rb = cell.getValueArray();
String row = Bytes.toString(result.getRow());
String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println("[row:"+row+"],[fami
dd9c
ly:"+family1+"],[qualifier:"+qualifier+"]"
+ ",[value:"+value+"],[time:"+cell.getTimestamp()+"]");
}
}
}

public static void getDataBySingleCol(String tableName,String family,String column,String cvalue) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
// scan.setMaxVersions();
SingleColumnValueFilter scvf= new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column),
CompareOp.EQUAL,Bytes.toBytes(cvalue));
scvf.setFilterIfMissing(true); //默认为false, 没有此列的数据也会返回 ,为true则只返回name=lisi的数据
scan.setFilter(scvf);
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
List<Cell> cells= result.listCells();
for (Cell cell : cells) {
byte[] rb = cell.getValueArray();
String row = Bytes.toString(result.getRow());
String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println("[row:"+row+"],[family:"+family1+"],[qualifier:"+qualifier+"]"
+ ",[value:"+value+"],[time:"+cell.getTimestamp()+"]");
}
}
}

public static ResultScanner getDataSingleColumnValueFilter(String tableName,String family,String column,String value) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
SingleColumnValueFilter scvf= new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column),
CompareOp.EQUAL,Bytes.toBytes(value));

// SingleColumnValueFilter scvf1= new SingleColumnValueFilter(Bytes.toBytes("account"), Bytes.toBytes("name"),
// CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes("zhangsan"))); //匹配完整字节数组
// new BinaryPrefixComparator(value) //匹配字节数组前缀
// new RegexStringComparator(expr) // 正则表达式匹配
// new SubstringComparator(substr)// 子字符串匹配

scvf.setFilterIfMissing(true); //默认为false, 没有此列的数据也会返回 ,为true则只返回name=lisi的数据
Scan scan = new Scan();
scan.setFilter(scvf);
ResultScanner resultScanner = table.getScanner(scan);
return resultScanner;
}

public static ResultScanner getDataFamilyFilter(String tableName,String family) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
FamilyFilter ff = new FamilyFilter(CompareOp.EQUAL ,
new BinaryComparator(Bytes.toBytes(family))); //表中不存在account列族,过滤结果为空
// new BinaryPrefixComparator(value) //匹配字节数组前缀
// new RegexStringComparator(expr) // 正则表达式匹配
// new SubstringComparator(substr)// 子字符串匹配
Scan scan = new Scan();
// 通过scan.addFamily(family) 也可以实现此操作
scan.setFilter(ff);
ResultScanner resultScanner = table.getScanner(scan);
return resultScanner;
}
public static ResultScanner getDataQualifierFilter(String tableName,String qualifier) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
QualifierFilter ff = new QualifierFilter(
CompareOp.EQUAL , new BinaryComparator(Bytes.toBytes(qualifier)));
// new BinaryPrefixComparator(value) //匹配字节数组前缀
// new RegexStringComparator(expr) // 正则表达式匹配
// new SubstringComparator(substr)// 子字符串匹配
Scan scan = new Scan();
// 通过scan.addFamily(family) 也可以实现此操作
scan.setFilter(ff);
ResultScanner resultScanner = table.getScanner(scan);
return resultScanner;
}

public static ResultScanner getDataColumnPrefixFilter(String tableName,String qualifier) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
ColumnPrefixFilter ff = new ColumnPrefixFilter(Bytes.toBytes(qualifier));
Scan scan = new Scan();
// 通过QualifierFilter的 newBinaryPrefixComparator也可以实现
scan.setFilter(ff);
ResultScanner resultScanner = table.getScanner(scan);
return resultScanner;
}
/**
* 返回 多列内容, 以 cols里内的 byte[]为前缀
* @param tableName
* @param cols
* @return
* @throws IOException
*/
public static ResultScanner getDataMultipleColumnPrefixFilter(String tableName,String... cols) throws IOException{
byte[][] prefixes = new byte[cols.length][] ;
for (int i = 0; i < cols.length; i++) {
prefixes[i] = cols[i].getBytes();
}
Table table = connection.getTable(TableName.valueOf(tableName));
//返回所有行中以name或者age打头的列的数据
MultipleColumnPrefixFilter ff = new MultipleColumnPrefixFilter(prefixes);
Scan scan = new Scan();
scan.setFilter(ff);
ResultScanner rs = table.getScanner(scan);
return rs;
}

/**
* 返回所有列中从a 到 d 打头的范围的数据,
* @param tableName
* @param startCol
* @param endCol
* @return
* @throws IOException
*/
public static ResultScanner getDataColumnRangeFilter(String tableName,String startCol,String endCol) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
byte[] startColumn = Bytes.toBytes(startCol);
byte[] endColumn = Bytes.toBytes(endCol);
//返回所有列中从a到d打头的范围的数据,
ColumnRangeFilter ff = new ColumnRangeFilter(startColumn, true, endColumn, true);
Scan scan = new Scan();
scan.setFilter(ff);
ResultScanner rs = table.getScanner(scan);
return rs;
}

/**
* 返回匹配 row的内容
* @param tableName
* @param row
* @return
* @throws IOException
*/
public static ResultScanner getDataRowFilter(String tableName,String row) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
RowFilter rf = new RowFilter(CompareOp.EQUAL ,
new SubstringComparator(row));
// new BinaryPrefixComparator(value) //匹配字节数组前缀
// new RegexStringComparator(expr) // 正则表达式匹配
// new SubstringComparator(substr)// 子字符串匹配
Scan scan = new Scan();
scan.setFilter(rf);
ResultScanner rs = table.getScanner(scan);
return rs;
}

/**
* pageFilter
* @param tableName
* @return
* @throws IOException
*/
public static ResultScanner getDataPageFilter(String tableName) throws IOException{
Table table = connection.getTable(TableName.valueOf("user"));
PageFilter pf = new PageFilter(2L);
Scan scan = new Scan();
scan.setFilter(pf);
scan.setStartRow(Bytes.toBytes("zhangsan_"));
ResultScanner rs = table.getScanner(scan);
return rs;
}

public static ResultScanner getDataSkipFilter(String tableName) throws IOException{
Table table = connection.getTable(TableName.valueOf("user"));
SkipFilter sf = new SkipFilter(new ValueFilter(CompareOp.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("zhangsan"))));
Scan scan = new Scan();
scan.setFilter(sf);
ResultScanner rs = table.getScanner(scan);
return rs;
}

public static ResultScanner getDataFirstKeyOnlyFilter(String tableName) throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
FirstKeyOnlyFilter fkof = new FirstKeyOnlyFilter();
Scan scan = new Scan();
scan.setFilter(fkof);
ResultScanner rs = table.getScanner(scan);
return rs;
}

public static void main(String args[]) throws Exception {

scanTable("user");
// getDataByRow("user", "zhangsan_1495527850759");
// putData("user", "wangwu_"+System.currentTimeMillis(), "userid", "id", "009");
// getDataByFamilyColumn("user", "userid","id");
// scanTable("user");
// getDataByRowFilter("user", "zhangsan");
// putData("user", "zhangsan_1495527850824", "account", "country", "china");
// for (Result result : getDataFirstKeyOnlyFilter("user")) {
// List<Cell> cells= result.listCells();
// for (Cell cell : cells) {
// String row = Bytes.toString(result.getRow());
// String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
// String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
// String value = Bytes.toString(CellUtil.cloneValue(cell));
// System.out.println("[row:"+row+"],[family:"+family1+"],[qualifier:"+qualifier+"]"
// + ",[value:"+value+"],[time:"+cell.getTimestamp()+"]");
// }
// }

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: