您的位置:首页 > 编程语言 > Java开发

Hbase-1.1.1-java API

2015-07-21 15:32 369 查看
1.工具类

package com.lixin.stuty.hbase;

import java.io.IOException;

import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
/**
* hbase for version 1.1.1
* @author Administrator
*
*/
public class HBaseUtil {
public static final String ZK_QUORUM  = "hbase.zookeeper.quorum";
public static final String ZK_CLIENTPORT  = "hbase.zookeeper.property.clientPort";
private Configuration conf = HBaseConfiguration.create();
private Connection connection ;
private Admin admin;

public HBaseUtil(String zk_quorum) {
conf.set(ZK_QUORUM, zk_quorum);
init();
}

public HBaseUtil(String zk_quorum,String zk_clientPort) {
conf.set(ZK_QUORUM, zk_quorum);
conf.set(ZK_CLIENTPORT, zk_clientPort);
init();
}

private void init(){
try {
//Connection 的创建是个重量级的工作,线程安全,是操作hbase的入口
connection = ConnectionFactory.createConnection(conf);
admin  = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
public void close(){
try {
if(admin != null) admin.close();
if(connection!=null) connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 创建一个表
* @param table_name 表名称
* @param family_names 列族名称集合
* @throws IOException
*/
public void create(String table_name,String... family_names) throws IOException{
//获取TableName
TableName tableName = TableName.valueOf(table_name);
//table 描述
HTableDescriptor htabledes =  new HTableDescriptor(tableName);
for(String family_name : family_names){
//column 描述
HColumnDescriptor family = new HColumnDescriptor(family_name);
htabledes.addFamily(family);
}
admin.createTable(htabledes);
}
/**
* 增加一条记录
* @param table_name 表名称
* @param row    rowkey
* @param family 列族名称
* @param qualifier 列族限定符(可以为null)
* @param value 值
* @throws IOException
*/
public void addColumn(String table_name,String row, String family,String qualifier,String value) throws IOException{
//表名对象
TableName tableName = TableName.valueOf(table_name);
//表对象
Table table = connection.getTable(tableName);
// put对象 负责录入数据
Put put = new Put(row.getBytes());
put.addColumn(family.getBytes(), qualifier.getBytes(), value.getBytes());
table.put(put);
}
/**
* 判断表是否存在
*/
public boolean tableExist(String table_name) throws IOException{
return admin.tableExists(TableName.valueOf(table_name));
}
/**删除表*/
public void deleteTable(String table_name) throws IOException{
TableName tableName = TableName.valueOf(table_name);
if(admin.tableExists(tableName)){
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
}
/**
* 查询单个row的记录
* @param table_name 表明
* @param row  行键
* @param family  列族
* @param qualifier  列族成员
* @return
* @throws IOException
*/
public Cell[] getRow(String table_name,String row,String family,String qualifier) throws IOException{
Cell[] cells = null;
//check
if(StringUtils.isEmpty(table_name)||StringUtils.isEmpty(row)){
return null;
}
//Table
Table table = connection.getTable(TableName.valueOf(table_name));
Get get = new Get(row.getBytes());
//判断在查询记录时,是否限定列族和子列(qualifier).
if(StringUtils.isNotEmpty(family)&&StringUtils.isNotEmpty(qualifier)){
get.addColumn(family.getBytes(), qualifier.getBytes());
}
if(StringUtils.isNotEmpty(family)&&StringUtils.isEmpty(qualifier)){
get.addFamily(family.getBytes());
}
Result result = table.get(get);
cells = result.rawCells();
return cells;
}
/**
* 获取表中的所有记录,可以指定列族,列族成员,开始行键,结束行键.
* @param table_name
* @param family
* @param qualifier
* @param startRow
* @param stopRow
* @return
* @throws IOException
*/
public ResultScanner getScan(String table_name,String family,String qualifier,String startRow,String stopRow) throws IOException{
ResultScanner resultScanner = null;

//Table
Table table = connection.getTable(TableName.valueOf(table_name));
Scan scan = new Scan();
if(StringUtils.isNotBlank(family)&& StringUtils.isNotEmpty(qualifier)){
scan.addColumn(family.getBytes(), qualifier.getBytes());
}
if(StringUtils.isNotEmpty(family)&& StringUtils.isEmpty(qualifier)){
scan.addFamily(family.getBytes());
}
if(StringUtils.isNotEmpty(startRow)){
scan.setStartRow(startRow.getBytes());
}
if(StringUtils.isNotEmpty(stopRow)){
scan.setStopRow(stopRow.getBytes());
}
resultScanner = table.getScanner(scan);

return resultScanner;
}
}


2.测试:

package com.lixin.stuty.hbase;

import static org.junit.Assert.*;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;

public class HBaseUtilTest {
private HBaseUtil hu = null;
@Before
public void init(){
String zk_quorum  = "172.21.135.148";
String zk_clientPort = "2181";
hu = new HBaseUtil(zk_quorum, zk_clientPort);
}
@Test
public void testCreate() throws IOException {
String table_name = "users";
String[] fanily_names = new String[]{"user_id","address","info"};
hu.create(table_name, fanily_names);
hu.close();
}
@Test
public void testIsExist() throws IOException{
String table_name = "sitech";
System.out.println(hu.tableExist(table_name));
hu.close();
}
@Test
public void testDelete() throws IOException{
String table_name = "person1";
hu.deleteTable(table_name);
hu.close();
}
@Test
public void testGetRow() throws IOException{
String table_name = "users";
String row = "xiaoming";
String family = "address";
String qualifier = "";
Cell[] cells = hu.getRow(table_name, row, family, qualifier);
for(Cell cell : cells){
String recode_row = Bytes.toString(CellUtil.cloneRow(cell));
String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier1 = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(recode_row+"\t"+family1+"\t"+qualifier1+"\t"+value);
}
hu.close();
}
@Test
public void testGetScanner() throws IOException{
String table_name = "users";
String family = "address";
String qualifier = "city";
String startRow = "xiaoming";
String stopRow = "xiaoming";

ResultScanner resultScanner  = hu.getScan(table_name, family, qualifier, startRow, stopRow);
Iterator<Result> iterator = resultScanner.iterator();
while(iterator.hasNext()){
Result result = iterator.next();
Cell[] rawCells = result.rawCells();
for(Cell cell : rawCells){
String recode_row = Bytes.toString(CellUtil.cloneRow(cell));
String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier1 = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(recode_row+"\t"+family1+"\t"+qualifier1+"\t"+value);
}
}
hu.close();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: