hadoop2-HBase的Java API操作
2018-11-08 13:59
513 查看
HBase提供了丰富的Java API,同时客户端支持连接池(HTablePool)方式操作。下面我结合连接池来展示一下如何使用Java API操作HBase。
项目结构如下:
我使用的Hbase的版本是
hbase-0.98.9-hadoop2-bin.tar.gz
大家下载解压后,可以拿到里面lib目录下的jar文件,即上文项目结构中所示的hbase-lib依赖资源,将其加入工程的classpath即可。
接口类:
/hbase-util/src/com/b510/hbase/util/dao/HbaseDao.java
package com.b510.hbase.util.dao; import java.util.List; import org.apache.hadoop.hbase.client.HTableInterface; /** * @author Hongten * @created 7 Nov 2018 */ public interface HbaseDao { // initial table public HTableInterface getHTableFromPool(String tableName); // check if the table is exist public boolean isHTableExist(String tableName); // create table public void createHTable(String tableName, String[] columnFamilys); // insert new row public void addRow(String tableName, String rowKey, String columnFamily, String column, String value); // get row by row key public void getRow(String tableName, String rowKey); public void getAllRows(String tableName); // get rows by giving range public void getRowsByRange(String tableName, String startRowKey, String endRowKey); //delete row public void delRow(String tableName, String rowKey); //delete rows by row keys public void delRowsByRowKeys(String tableName, List<String> rowKeys); // auto flush data when close public void closeAutoFlush(HTableInterface table); // close table public void closeTable(HTableInterface table); // close pool connection public void closePoolConnection(); // delete table public void deleteHTable(String tableName); }
实现类:
/hbase-util/src/com/b510/hbase/util/dao/impl/HbaseDaoImpl.java
package com.b510.hbase.util.dao.impl;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

import com.b510.hbase.util.dao.HbaseDao;

/**
 * {@link HbaseDao} implementation backed by a shared {@link HTablePool}
 * (HBase 0.98 client API, hence the deprecation suppression).
 *
 * <p>Review fixes applied to the original version:
 * <ul>
 *   <li>success messages are now printed only when the operation actually
 *       succeeded (they used to print even after an {@code IOException});</li>
 *   <li>table handles and scanners are released in {@code finally} blocks so
 *       they no longer leak on exception paths;</li>
 *   <li>fixed the {@code "Vlaue"} typo in the console output.</li>
 * </ul>
 *
 * @author Hongten
 * @created 7 Nov 2018
 */
@SuppressWarnings("deprecation")
public class HbaseDaoImpl implements HbaseDao {

    // NOTE(review): static fields assigned from an instance constructor means
    // every new HbaseDaoImpl replaces the shared admin/pool; kept as-is to
    // preserve the public contract, but a singleton would be safer.
    private static Configuration conf = null;
    private static HBaseAdmin hAdmin;
    private static HTablePool pool;
    private static int defaultPoolSize = 5;

    /**
     * Connects to the cluster (ZooKeeper quorum is hard-coded) and creates the
     * table pool.
     *
     * @param poolSize maximum pooled table handles; values &lt;= 0 fall back to
     *                 the default of 5
     */
    public HbaseDaoImpl(int poolSize) {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
        try {
            hAdmin = new HBaseAdmin(conf);
            // the default pool size is 5.
            pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public HTableInterface getHTableFromPool(String tableName) {
        return pool.getTable(tableName);
    }

    @Override
    public boolean isHTableExist(String tableName) {
        try {
            return hAdmin.tableExists(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void createHTable(String tableName, String[] columnFamilys) {
        if (isHTableExist(tableName)) {
            System.out.println("The table [" + tableName + "] is existing already.");
            return;
        }
        HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        // The Hbase suggested the number of column family should be less than 3.
        // Normally, there only have 1 column family.
        for (String cfName : columnFamilys) {
            tableDescriptor.addFamily(new HColumnDescriptor(cfName));
        }
        try {
            hAdmin.createTable(tableDescriptor);
            // only report success when createTable did not throw
            System.out.println("The table [" + tableName + "] is created.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {
        if (!isHTableExist(tableName)) {
            System.out.println("The table [" + tableName + "] does not exist.");
            return;
        }
        HTableInterface table = getHTableFromPool(tableName);
        try {
            Put put = new Put(rowKey.getBytes());
            put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());
            table.put(put);
            System.out.println("Insert into table [" + tableName + "], Rowkey=[" + rowKey
                    + "], Column=[" + columnFamily + ":" + column + "], Value=[" + value + "].");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // always return the handle to the pool, even on failure
            closeTable(table);
        }
    }

    @Override
    public void getRow(String tableName, String rowKey) {
        if (!isHTableExist(tableName)) {
            System.out.println("The table [" + tableName + "] does not exist.");
            return;
        }
        HTableInterface table = getHTableFromPool(tableName);
        try {
            Result result = table.get(new Get(rowKey.getBytes()));
            for (Cell cell : result.rawCells()) {
                printCell(tableName, rowKey, cell);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }
    }

    @Override
    public void getAllRows(String tableName) {
        if (isHTableExist(tableName)) {
            scanHTable(tableName, new Scan());
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    /** Runs {@code scan} against {@code tableName} and prints every cell. */
    private void scanHTable(String tableName, Scan scan) {
        HTableInterface table = getHTableFromPool(tableName);
        ResultScanner results = null;
        try {
            results = table.getScanner(scan);
            for (Result result : results) {
                for (Cell cell : result.rawCells()) {
                    printCell(tableName, new String(CellUtil.cloneRow(cell)), cell);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // scanners hold server-side resources and must be closed explicitly
            if (results != null) {
                results.close();
            }
            closeTable(table);
        }
    }

    /** Prints one cell in the standard "Get from table [...]" format. */
    private void printCell(String tableName, String rowKey, Cell cell) {
        String columnFamily = new String(CellUtil.cloneFamily(cell));
        String columnName = new String(CellUtil.cloneQualifier(cell));
        String value = new String(CellUtil.cloneValue(cell));
        System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey
                + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=["
                + cell.getTimestamp() + "], Value=[" + value + "].");
    }

    @Override
    public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {
        if (!isHTableExist(tableName)) {
            System.out.println("The table [" + tableName + "] does not exist.");
            return;
        }
        Scan scan = new Scan();
        scan.setStartRow(startRowKey.getBytes());
        // Stop row is exclusive: the result does not include the stop row record.
        // (behaviour of hbase 0.98.9)
        scan.setStopRow(endRowKey.getBytes());
        scanHTable(tableName, scan);
    }

    @Override
    public void delRow(String tableName, String rowKey) {
        if (isHTableExist(tableName)) {
            deleteRow(getHTableFromPool(tableName), rowKey);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    /** Deletes one row and returns the table handle to the pool. */
    private void deleteRow(HTableInterface table, String rowKey) {
        try {
            table.delete(new Delete(rowKey.getBytes()));
            System.out.println("Delete from table [" + new String(table.getTableName())
                    + "], Rowkey=[" + rowKey + "].");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }
    }

    @Override
    public void delRowsByRowKeys(String tableName, List<String> rowKeys) {
        if (rowKeys != null && rowKeys.size() > 0) {
            for (String rowKey : rowKeys) {
                delRow(tableName, rowKey);
            }
        }
    }

    @Override
    public void deleteHTable(String tableName) {
        if (!isHTableExist(tableName)) {
            System.out.println("The table [" + tableName + "] does not exist.");
            return;
        }
        try {
            // a table must be disabled before it can be dropped
            hAdmin.disableTable(tableName.getBytes());
            hAdmin.deleteTable(tableName.getBytes());
            System.out.println("The table [" + tableName + "] is deleted.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void closeAutoFlush(HTableInterface table) {
        // buffer puts client-side; they are flushed when the table is closed
        table.setAutoFlush(false, false);
    }

    @Override
    public void closeTable(HTableInterface table) {
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void closePoolConnection() {
        try {
            pool.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
测试类:
/hbase-util/src/com/b510/hbase/util/dao/test/HbaseDaoTest.java
package com.b510.hbase.util.dao.test;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import com.b510.hbase.util.dao.HbaseDao;
import com.b510.hbase.util.dao.impl.HbaseDaoImpl;

/**
 * End-to-end walk-through of {@link HbaseDao}: create a table, insert a few
 * people, read them back by key / scan / range, delete rows, and finally drop
 * the table again. Requires a running HBase cluster.
 *
 * @author Hongten
 * @created 7 Nov 2018
 */
public class HbaseDaoTest {

    HbaseDao dao = new HbaseDaoImpl(4);

    public static final String tableName = "t_test";
    public static final String columnFamilyName = "cf1";
    public static final String[] CFs = { columnFamilyName };
    public static final String COLUMN_NAME_NAME = "name";
    public static final String COLUMN_NAME_AGE = "age";

    // fixture rows: { rowKey, name, age }
    private static final String[][] PEOPLE = {
            { "12345566", "Hongten", "22" },
            { "12345567", "Tom", "25" },
            { "12345568", "Jone", "30" },
            { "12345569", "Jobs", "24" },
    };

    @Test
    public void main() {
        createTable();
        addRow();
        getRow();
        getAllRows();
        getRowsByRange();
        delRow();
        delRowsByRowKeys();
        deleteHTable();
    }

    public void createTable() {
        System.out.println("=== create table ====");
        dao.createHTable(tableName, CFs);
    }

    public void addRow() {
        System.out.println("=== insert record ====");
        for (String[] person : PEOPLE) {
            dao.addRow(tableName, person[0], columnFamilyName, COLUMN_NAME_NAME, person[1]);
            dao.addRow(tableName, person[0], columnFamilyName, COLUMN_NAME_AGE, person[2]);
        }
    }

    public void getRow() {
        System.out.println("=== get record ====");
        dao.getRow(tableName, "12345566");
    }

    public void getAllRows() {
        System.out.println("=== scan table ====");
        dao.getAllRows(tableName);
    }

    public void getRowsByRange() {
        System.out.println("=== scan record by giving range ====");
        // stop row is exclusive, so this returns the '12345567' and '12345568' rows
        dao.getRowsByRange(tableName, "12345567", "12345569");
    }

    public void delRow() {
        System.out.println("=== delete record ====");
        dao.delRow(tableName, "12345568");
        // the range scan should now show only the '12345567' row
        getRowsByRange();
    }

    public void delRowsByRowKeys() {
        System.out.println("=== delete batch records ====");
        List<String> rowKeys = new ArrayList<String>();
        rowKeys.add("12345566");
        rowKeys.add("12345569");
        dao.delRowsByRowKeys(tableName, rowKeys);
        // '12345566' and '12345569' should no longer appear in the full scan
        getAllRows();
    }

    public void deleteHTable() {
        System.out.println("=== delete table ====");
        dao.deleteHTable(tableName);
    }
}
测试结果:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. === create table ==== The table [t_test] is created. === insert record ==== Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Vlaue=[Hongten]. Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Vlaue=[22]. Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Vlaue=[Tom]. Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Vlaue=[25]. Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Vlaue=[Jone]. Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Vlaue=[30]. Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Vlaue=[Jobs]. Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Vlaue=[24]. === get record ==== Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22]. Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten]. === scan table ==== Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22]. Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone]. Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Vlaue=[24]. 
Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Vlaue=[Jobs]. === scan record by giving range ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone]. === delete record ==== Delete from table [t_test], Rowkey=[12345568]. === scan record by giving range ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. === delete batch records ==== Delete from table [t_test], Rowkey=[12345566]. Delete from table [t_test], Rowkey=[12345569]. === scan table ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. === delete table ==== The table [t_test] is deleted.
源码下载:
========================================================
More reading,and english is important.
I'm Hongten
大哥哥大姐姐,觉得有用打赏点哦!你的支持是我最大的动力。谢谢。
Hongten博客排名在100名以内。粉丝过千。
Hongten出品,必是精品。
E | hongtenzone@foxmail.com B | http://www.cnblogs.com/hongten
========================================================
相关文章推荐
- hbase-0.98整合hadoop-2.6,附java操作代码
- HBase Java API使用操作例子
- HBase 6、用Phoenix Java api操作HBase
- 用JAVA的API操作HBASE
- JAVA操作HDFS API(hadoop)
- 在集群中Java 通过调用API操作HBase 0.98
- Hadoop之——Java操作HBase
- 利用javaApI【eclipse】操作HBase时,出现异常的解决方案
- Hadoop学习二(java api调用操作HDFS)
- Hbase的Java开发API操作实现
- Hadoop学习笔记(十六)---HBase JAVA API
- 从HDFS读取文件,把记录存到Hbase的java API操作
- java api操作hbase
- 4000 hadoop入门(三)之 javaAPI操作Hdfs,进行文件操作
- hbase快速入门-- java api 操作
- HBase 6、用Phoenix Java api操作HBase
- Hadoop系列-HDFS文件操作的JAVA API用法(七)
- 【十八掌●武功篇】第八掌:HBase之基本操作Java API
- HBase(0.96)新的Java API操作
- hadoop系列之五JavaAPI操作HDFS文本系统