您的位置:首页 > 编程语言 > Java开发

Java连接HBASE数据库,创建一个表,删除一张表,修改表,数据插入,数据修改,数据删除,数据获取,显示表信息,过滤查询,分页查询,地理hash

2017-06-23 17:46 1501 查看
准备工作

1、创建Java的Maven项目

创建好的目录结构如下:



另外注意junit的版本,最好不要太高。最开始笔者使用的是junit 4.12,运行的时候会报错;把junit的版本改成4.7之后,问题解决了。

创建测试工具类HbaseDemo,为了保证能够让HBASE运行,需要最开始写出如下配置:

package toto.com.hbase;

import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;

/**
 * HBase client demo: holds the configuration and the connection that the
 * test methods of this class operate on.
 *
 * @author tuzq
 * @create 2017-06-22 20:48
 */
public class HbaseDemo {
    private Configuration conf = null;
    private Connection conn = null;

    /**
     * Builds the HBase configuration and opens a connection before each test.
     *
     * @throws Exception if the connection cannot be created
     */
    @Before
    public void init() throws Exception {
        conf = HBaseConfiguration.create();
        // Hand the configuration to the factory explicitly; otherwise the conf
        // field is never used and any setting applied to it would be ignored.
        conn = ConnectionFactory.createConnection(conf);
    }
}


2、创建一个表

/**
 * Creates table {@code t_user_info} with the column families
 * {@code base_info} and {@code extra_info}.
 *
 * @throws Exception if the table cannot be created
 */
@Test
public void testCreate() throws Exception {
    // Table descriptor
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t_user_info"));

    // Column family descriptor; the ROW bloom filter helps locate quickly
    // which file block a row lives in. Versions: min 1, max 3.
    HColumnDescriptor hcd1 = new HColumnDescriptor("base_info");
    hcd1.setBloomFilterType(BloomType.ROW).setVersions(1, 3);

    // Second column family, same settings
    HColumnDescriptor hcd2 = new HColumnDescriptor("extra_info");
    hcd2.setBloomFilterType(BloomType.ROW).setVersions(1, 3);

    // Register both column families on the table
    htd.addFamily(hcd1).addFamily(hcd2);

    // try-with-resources/finally so the admin handle and the connection are
    // released even when createTable fails (e.g. the table already exists).
    try (Admin admin = conn.getAdmin()) {
        admin.createTable(htd);
    } finally {
        conn.close();
    }
}


运行成功后的结果:



进入HBASE中,查看数据信息



3、删除一张表

/**
 * Disables and then deletes table {@code t_user_info2}.
 *
 * @throws Exception if the table cannot be disabled or deleted
 */
@Test
public void testDrop() throws Exception {
    TableName tableName = TableName.valueOf("t_user_info2");
    // Release the admin handle and the connection even if an operation fails.
    try (Admin admin = conn.getAdmin()) {
        // The table is disabled first, then deleted.
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
    } finally {
        conn.close();
    }
}


执行后的效果如下:



进入HBASE中查看效果:



4.其它操作api

/**
 * Modifies table {@code t_user_info}: switches the bloom filter of the
 * existing {@code extra_info} family to ROWCOL and adds a new
 * {@code other_info} family.
 *
 * @throws Exception if the table descriptor cannot be read or modified
 */
@Test
public void testModify() throws Exception {
    TableName tableName = TableName.valueOf("t_user_info");
    // Release the admin handle and the connection even if an operation fails.
    try (Admin admin = conn.getAdmin()) {
        // Modify an existing column family
        HTableDescriptor table = admin.getTableDescriptor(tableName);
        // Bytes.toBytes instead of String.getBytes(), which depends on the platform charset
        HColumnDescriptor f2 = table.getFamily(Bytes.toBytes("extra_info"));
        f2.setBloomFilterType(BloomType.ROWCOL);
        // Add a new column family
        table.addFamily(new HColumnDescriptor("other_info"));

        admin.modifyTable(tableName, table);
    } finally {
        conn.close();
    }
}

/**
 * Inserts (or overwrites) a batch of sample rows in {@code t_user_info}.
 *
 * @throws Exception if the batch cannot be written
 */
@Test
public void testPut() throws Exception {
    // Family and qualifier names are encoded once with Bytes.toBytes rather
    // than String.getBytes(), which depends on the platform charset.
    final byte[] baseInfo = Bytes.toBytes("base_info");
    final byte[] extraInfo = Bytes.toBytes("extra_info");
    final byte[] username = Bytes.toBytes("username");
    final byte[] password = Bytes.toBytes("password");
    final byte[] married = Bytes.toBytes("married");

    ArrayList<Put> puts = new ArrayList<Put>();

    Put put01 = new Put(Bytes.toBytes("user001"));
    put01.addColumn(baseInfo, username, Bytes.toBytes("zhangsan"));
    puts.add(put01);

    Put put02 = new Put(Bytes.toBytes("user001"));
    put02.addColumn(baseInfo, password, Bytes.toBytes("123456"));
    puts.add(put02);

    Put put03 = new Put(Bytes.toBytes("user002"));
    put03.addColumn(baseInfo, username, Bytes.toBytes("lisi"));
    put03.addColumn(extraInfo, married, Bytes.toBytes("false"));
    puts.add(put03);

    Put put04 = new Put(Bytes.toBytes("zhang_sh_01"));
    put04.addColumn(baseInfo, username, Bytes.toBytes("zhang01"));
    put04.addColumn(extraInfo, married, Bytes.toBytes("false"));
    puts.add(put04);

    Put put05 = new Put(Bytes.toBytes("zhang_sh_02"));
    put05.addColumn(baseInfo, username, Bytes.toBytes("zhang02"));
    put05.addColumn(extraInfo, married, Bytes.toBytes("false"));
    puts.add(put05);

    Put put06 = new Put(Bytes.toBytes("liu_sh_01"));
    put06.addColumn(baseInfo, username, Bytes.toBytes("liu01"));
    put06.addColumn(extraInfo, married, Bytes.toBytes("false"));
    puts.add(put06);

    Put put07 = new Put(Bytes.toBytes("zhang_bj_01"));
    put07.addColumn(baseInfo, username, Bytes.toBytes("zhang03"));
    put07.addColumn(extraInfo, married, Bytes.toBytes("false"));
    puts.add(put07);

    // NOTE(review): same row key as put07, so both writes target the same
    // cells. Possibly meant to demonstrate "modify", possibly a copy-paste
    // slip for "zhang_bj_02" - confirm with the author.
    Put put08 = new Put(Bytes.toBytes("zhang_bj_01"));
    put08.addColumn(baseInfo, username, Bytes.toBytes("zhang04"));
    put08.addColumn(extraInfo, married, Bytes.toBytes("false"));
    puts.add(put08);

    // Release the table handle and the connection even if the batch fails.
    try (Table table = conn.getTable(TableName.valueOf("t_user_info"))) {
        table.put(puts);
    } finally {
        conn.close();
    }
}

/**
 * Deletes column {@code base_info:password} of row {@code user001} in
 * {@code t_user_info}.
 *
 * @throws Exception if the delete fails
 */
@Test
public void testDel() throws Exception {
    // Release the table handle and the connection even if the delete fails.
    try (Table userInfo = conn.getTable(TableName.valueOf("t_user_info"))) {
        // Bytes.toBytes instead of String.getBytes(), which depends on the platform charset
        Delete delete = new Delete(Bytes.toBytes("user001"));
        delete.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("password"));
        userInfo.delete(delete);
    } finally {
        conn.close();
    }
}


/**
 * Reads row {@code user001} from {@code t_user_info} and prints every cell
 * as {@code family:qualifier:value}, one cell per line.
 *
 * @throws Exception if the row cannot be read
 */
@Test
public void testGet() throws Exception {
    // Release the table handle and the connection even if the read fails
    // (the original closed the table twice and never closed the connection).
    try (Table table = conn.getTable(TableName.valueOf("t_user_info"))) {
        Get get = new Get(Bytes.toBytes("user001"));
        Result result = table.get(get);
        CellScanner cellScanner = result.cellScanner();
        while (cellScanner.advance()) {
            Cell current = cellScanner.current();
            // The backing arrays are addressed with each component's own
            // offset and length. The original passed the family length as the
            // offset and printed the qualifier in place of the value.
            String family = Bytes.toString(
                    current.getFamilyArray(), current.getFamilyOffset(), current.getFamilyLength());
            String qualifier = Bytes.toString(
                    current.getQualifierArray(), current.getQualifierOffset(), current.getQualifierLength());
            String value = Bytes.toString(
                    current.getValueArray(), current.getValueOffset(), current.getValueLength());

            System.out.println(family + ":" + qualifier + ":" + value);
        }
    } finally {
        conn.close();
    }
}


运行结果:



/**
 * Scans all of {@code t_user_info} and prints, for every cell, the row key
 * on one line followed by {@code family:qualifier value} on the next. A
 * separator line is printed after each row.
 *
 * @throws Exception if the scan fails
 */
@Test
public void testScan() throws Exception {
    // Scanner, table and connection were never closed before; release them
    // even if the scan fails part-way.
    try (Table userInfo = conn.getTable(TableName.valueOf("t_user_info"));
         ResultScanner scanner = userInfo.getScanner(new Scan())) {
        for (Result result : scanner) {
            CellScanner cellScanner = result.cellScanner();
            while (cellScanner.advance()) {
                Cell current = cellScanner.current();
                // Bytes.toString instead of new String(byte[], int, int),
                // which depends on the platform charset.
                String row = Bytes.toString(
                        current.getRowArray(), current.getRowOffset(), current.getRowLength());
                String family = Bytes.toString(
                        current.getFamilyArray(), current.getFamilyOffset(), current.getFamilyLength());
                String qualifier = Bytes.toString(
                        current.getQualifierArray(), current.getQualifierOffset(), current.getQualifierLength());
                String value = Bytes.toString(
                        current.getValueArray(), current.getValueOffset(), current.getValueLength());

                System.out.println(row);
                System.out.print(family);
                System.out.print(":" + qualifier);
                System.out.println(" " + value);
            }
            System.out.println("-----------------------");
        }
    } finally {
        conn.close();
    }
}


运行结果:



其它代码(过滤,扫描,分页查询):

/**
* Filter demos. Uncomment the snippets below one at a time to see the
* effect of each filter on the scan output.
* @throws Exception
*/
@Test
public void testFilter() throws Exception {
// Prefix filter on the row key
/*Filter pf = new PrefixFilter(Bytes.toBytes("liu"));
testScan(pf);*/

// Row filter
/*RowFilter rf1 = new RowFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("user001")));
RowFilter rf2 = new RowFilter(CompareOp.EQUAL, new SubstringComparator("00"));
testScan(rf1);
System.out.println("**********");
testScan(rf2);*/

// Filter on the value of one specific column
/*SingleColumnValueFilter scvf = new SingleColumnValueFilter("base_info".getBytes(), "password".getBytes(), CompareOp.EQUAL, "123456".getBytes());
scvf.setFilterIfMissing(true);   // if the specified column is missing, filter the row out as well
testScan(scvf);*/

/*ByteArrayComparable comparator1 = new RegexStringComparator("^zhang");
ByteArrayComparable comparator2 = new SubstringComparator("ang");
SingleColumnValueFilter scvf = new SingleColumnValueFilter("base_info".getBytes(), "username".getBytes(), CompareOp.EQUAL, comparator1);
testScan(scvf);*/

// Filter on the column family name
/*FamilyFilter ff1 = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("inf")));
FamilyFilter ff2 = new FamilyFilter(CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
testScan(ff1);*/

// Filter on the column (qualifier) name
/*QualifierFilter qf = new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("password")));
QualifierFilter qf2 = new QualifierFilter(CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("us")));
testScan(qf);*/

// Unlike SingleColumnValueFilter, this returns only the matching column
/*ColumnPrefixFilter cf = new ColumnPrefixFilter("passw".getBytes());
testScan(cf);*/

/*byte[][] prefixes = new byte[][] { Bytes.toBytes("username"),Bytes.toBytes("password") };
MultipleColumnPrefixFilter mcf = new MultipleColumnPrefixFilter(prefixes);
testScan(mcf);*/

/*FamilyFilter ff2 = new FamilyFilter(CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
ColumnPrefixFilter cf = new ColumnPrefixFilter("passw".getBytes());
FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
filterList.addFilter(ff2);
filterList.addFilter(cf);
testScan(filterList);*/
}

/**
 * Scans {@code t_user_info} with the given filter and prints, for every
 * cell, the row key on one line followed by {@code family:qualifier value}
 * on the next. A separator line is printed after each row.
 *
 * <p>The shared connection is left open because this helper can be called
 * several times within one test.
 *
 * @param filter filter applied to the scan
 * @throws Exception if the scan fails
 */
public void testScan(Filter filter) throws Exception {
    Scan scan = new Scan();
    scan.setFilter(filter);

    // Scanner and table were never closed before; release them even if the
    // scan fails part-way.
    try (Table userInfo = conn.getTable(TableName.valueOf("t_user_info"));
         ResultScanner scanner = userInfo.getScanner(scan)) {
        for (Result result : scanner) {
            CellScanner cellScanner = result.cellScanner();
            while (cellScanner.advance()) {
                Cell current = cellScanner.current();
                // Bytes.toString instead of new String(byte[], int, int),
                // which depends on the platform charset.
                String row = Bytes.toString(
                        current.getRowArray(), current.getRowOffset(), current.getRowLength());
                String family = Bytes.toString(
                        current.getFamilyArray(), current.getFamilyOffset(), current.getFamilyLength());
                String qualifier = Bytes.toString(
                        current.getQualifierArray(), current.getQualifierOffset(), current.getQualifierLength());
                String value = Bytes.toString(
                        current.getValueArray(), current.getValueOffset(), current.getValueLength());

                System.out.println(row);
                System.out.print(family);
                System.out.print(":" + qualifier);
                System.out.println(" " + value);
            }
            System.out.println("-----------------------");
        }
    }
}

/**
 * Paged scan over {@code t_user_info}: repeatedly scans with a
 * {@link PageFilter} of size 3, restarting each scan just after the last
 * row key seen, until a scan returns no rows. Prints every result and the
 * total row count.
 *
 * @throws Exception if a scan fails or the thread is interrupted while pausing
 */
@Test
public void pageScan() throws Exception {
    final byte[] POSTFIX = new byte[] { 0x00 };
    Filter filter = new PageFilter(3);   // number of rows requested per page
    byte[] lastRow = null;
    int totalRows = 0;

    // Table and connection were never closed before, and the scanner leaked
    // on failure; release all of them even if a page fails.
    try (Table table = conn.getTable(TableName.valueOf("t_user_info"))) {
        while (true) {
            Scan scan = new Scan();
            scan.setFilter(filter);
            if (lastRow != null) {
                // Start row for this page: the last row key plus a trailing
                // zero byte, so the scan resumes after the row already seen.
                byte[] startRow = Bytes.add(lastRow, POSTFIX);
                scan.setStartRow(startRow);
            }

            int localRows = 0;
            try (ResultScanner scanner = table.getScanner(scan)) {
                Result result;
                while ((result = scanner.next()) != null) {
                    System.out.println(localRows++ + ":" + result);
                    totalRows++;
                    lastRow = result.getRow();
                }
            }

            // An empty page means the table is exhausted.
            if (localRows == 0) {
                break;
            }
            // Pause between pages; presumably only to make the paging visible in the console.
            Thread.sleep(2000);
        }
    } finally {
        conn.close();
    }
    System.out.println("total rows:" + totalRows);
}

/**
 * Command-line entry point: opens the connection via {@code init()} and
 * runs the unfiltered {@code testScan()}.
 *
 * @param args unused
 * @throws Exception if initialisation or the scan fails
 */
public static void main(String[] args) throws Exception {
    final HbaseDemo hbaseDemo = new HbaseDemo();
    hbaseDemo.init();
    hbaseDemo.testScan();
}


地理hash的代码:

import java.util.BitSet;
import java.util.HashMap;

/**
 * Geohash-style encoder/decoder over a fixed bounding box (latitude 0..60,
 * longitude 70..140) instead of the whole globe.
 *
 * <p>Each axis is bisected {@code numbits} times. The longitude and latitude
 * bits are interleaved, longitude first, and every 5 bits become one
 * character of the base-32 alphabet in {@link #digits}.
 */
public class Geohash {

    /** Bits of precision per axis. */
    private static final int numbits = 6 * 5;

    /** Length of a full-precision hash: two axes, 5 bits per character. */
    private static final int HASH_LENGTH = numbits * 2 / 5;

    // Bounding box. China spans roughly 73E..135E and 4N..53N.
    private static final double LAT_MIN = 0;
    private static final double LAT_MAX = 60;
    private static final double LON_MIN = 70;
    private static final double LON_MAX = 140;

    final static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j', 'k', 'm', 'n', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z' };

    /** Reverse index: alphabet character to its 5-bit value. */
    final static HashMap<Character, Integer> lookup = new HashMap<Character, Integer>();
    static {
        int i = 0;
        for (char c : digits) {
            lookup.put(c, i++);
        }
    }

    /**
     * Decodes a hash produced by {@link #encode(double, double)}.
     *
     * <p>Hashes shorter than full precision are accepted; characters beyond
     * full precision are ignored.
     *
     * @param geohash hash made of characters from {@link #digits}
     * @return {@code {latitude, longitude}}, the centre of the cell the hash denotes
     * @throws IllegalArgumentException if the hash contains a character outside the alphabet
     */
    public double[] decode(String geohash) {
        StringBuilder bits = new StringBuilder();
        for (char c : geohash.toCharArray()) {
            Integer value = lookup.get(c);
            if (value == null) {
                throw new IllegalArgumentException(
                        "Invalid geohash character '" + c + "' in \"" + geohash + "\"");
            }
            // Adding 32 forces a leading 1, so the binary string always has 6
            // digits; dropping that digit leaves the value zero-padded to 5 bits.
            bits.append(Integer.toString(value + 32, 2).substring(1));
        }

        BitSet lonset = new BitSet();
        BitSet latset = new BitSet();
        int lonCount = 0;
        int latCount = 0;
        int usable = Math.min(bits.length(), numbits * 2);
        for (int i = 0; i < usable; i++) {
            boolean isSet = bits.charAt(i) == '1';
            if ((i & 1) == 0) {
                // even positions carry longitude bits
                lonset.set(lonCount++, isSet);
            } else {
                // odd positions carry latitude bits
                latset.set(latCount++, isSet);
            }
        }

        double lon = decode(lonset, lonCount, LON_MIN, LON_MAX);
        double lat = decode(latset, latCount, LAT_MIN, LAT_MAX);

        return new double[] { lat, lon };
    }

    /**
     * Narrows {@code [floor, ceiling]} by one bisection per bit and returns
     * the centre of the resulting interval.
     *
     * <p>The bit count is passed explicitly because {@link BitSet#length()}
     * stops at the highest set bit and would silently drop trailing zero bits.
     */
    private double decode(BitSet bs, int bitCount, double floor, double ceiling) {
        for (int i = 0; i < bitCount; i++) {
            double mid = (floor + ceiling) / 2;
            if (bs.get(i)) {
                floor = mid;
            } else {
                ceiling = mid;
            }
        }
        return (floor + ceiling) / 2;
    }

    /**
     * Encodes a coordinate as a fixed-length hash of {@code HASH_LENGTH} characters.
     *
     * <p>Coordinates outside the bounding box are not rejected; they saturate
     * to the nearest edge cell.
     *
     * @param lat latitude, expected in 0..60
     * @param lon longitude, expected in 70..140
     * @return the hash, left-padded with {@code '0'} to full length
     */
    public String encode(double lat, double lon) {
        BitSet latbits = getBits(lat, LAT_MIN, LAT_MAX);
        BitSet lonbits = getBits(lon, LON_MIN, LON_MAX);
        StringBuilder bits = new StringBuilder(numbits * 2);
        for (int i = 0; i < numbits; i++) {
            bits.append(lonbits.get(i) ? '1' : '0');
            bits.append(latbits.get(i) ? '1' : '0');
        }
        String hash = base32(Long.parseLong(bits.toString(), 2));

        // base32(long) drops leading zero digits. Restore them so every
        // character stays aligned with its 5-bit group when decoding.
        StringBuilder padded = new StringBuilder(HASH_LENGTH);
        for (int i = hash.length(); i < HASH_LENGTH; i++) {
            padded.append(digits[0]);
        }
        return padded.append(hash).toString();
    }

    /** Bisects {@code [floor, ceiling]} {@code numbits} times; a set bit means the value is in the upper half. */
    private BitSet getBits(double lat, double floor, double ceiling) {
        BitSet buffer = new BitSet(numbits);
        for (int i = 0; i < numbits; i++) {
            double mid = (floor + ceiling) / 2;
            if (lat >= mid) {
                buffer.set(i);
                floor = mid;
            } else {
                ceiling = mid;
            }
        }
        return buffer;
    }

    /**
     * Renders {@code i} in base 32 using the {@link #digits} alphabet.
     *
     * @param i value to render
     * @return the digits without leading zeros, prefixed with {@code '-'} for negative values
     */
    public static String base32(long i) {
        char[] buf = new char[65];
        int charPos = 64;
        boolean negative = (i < 0);
        // Work on the negative side so Long.MIN_VALUE needs no special case.
        if (!negative) {
            i = -i;
        }
        while (i <= -32) {
            buf[charPos--] = digits[(int) (-(i % 32))];
            i /= 32;
        }
        buf[charPos] = digits[(int) (-i)];

        if (negative) {
            buf[--charPos] = '-';
        }
        return new String(buf, charPos, (65 - charPos));
    }

    /**
     * Prints the hashes of two nearby points and one distant point.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Geohash geohash = new Geohash();
        // Arguments are (lat, lon). The previous samples used latitudes above
        // 60, outside the supported box, so they saturated to the edge cell.
        String encode1 = geohash.encode(39.9042, 116.4074);
        String encode2 = geohash.encode(39.9045, 116.4080);
        String encode3 = geohash.encode(31.2304, 121.4737);
        System.out.println(encode1);
        System.out.println(encode2);
        System.out.println(encode3);
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐