您的位置:首页 > 编程语言 > Java开发

java API 操作 hbase 数据库(这不是简单的例子,是可以用于运营系统的高性能源码)

2014-02-05 18:53 741 查看
这不是简单的例子,是可以用于运营系统的高性能源码(例:http://htok.net/web

package htok.tmp;

import htok.tools.*;
import htok.Path;
import htok.we.Bag;

import java.util.*;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.filter.*;

public class HtokHBase
{
private static Configuration conf = null;
private static TreeMap<String,ArrayList> HTS = null;

public HtokHBase(String host,String cport,String mport)
{
Configuration HBASE_CONFIG = new Configuration();
//与hbase/conf/hbase-site.xml中hbase.zookeeper.quorum配置的值相同
HBASE_CONFIG.set("hbase.zookeeper.quorum", host);
//与hbase/conf/hbase-site.xml中hbase.zookeeper.property.clientPort配置的值相同
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", cport);
//与hbase/conf/hbase-site.xml中hbase.master.port配置的值相同
HBASE_CONFIG.set("hbase.master.port", mport);
conf = HBaseConfiguration.create(HBASE_CONFIG);
}
public HtokHBase(String host,String cport)
{
this(host,cport,"60000");
}
public HtokHBase()
{
this("localhost","2181","60000");
}
//建表
public void newTable(String tableName, String[] familys) throws Exception
{
HBaseAdmin admin = new HBaseAdmin(conf);
if (!admin.tableExists(tableName))
{
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
for(int i=0; i<familys.length; i++)
{
tableDesc.addFamily(new HColumnDescriptor(familys[i]));
}
admin.createTable(tableDesc);
}
admin.close();
}
//强制建表
public void creatTable(String tableName, String[] familys) throws Exception
{
HBaseAdmin admin = new HBaseAdmin(conf);
if(admin.tableExists(tableName))
{
admin.disableTable(tableName);//删除表前应当丢弃该表
admin.deleteTable(tableName);
}
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
for(int i=0; i<familys.length; i++)
{
tableDesc.addFamily(new HColumnDescriptor(familys[i]));
}
admin.createTable(tableDesc);
admin.close();
}
//删除表
public void dropTable(String tableName) throws Exception
{
HBaseAdmin admin = new HBaseAdmin(conf);
try {
admin.disableTable(tableName);//删除表前应当丢弃该表
admin.deleteTable(tableName);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
}
admin.close();
}
//创建HTable
public HTable newHTable(String TableName)
{
try{
HTable table = new HTable(conf,TableName);
if(HTS==null)
HTS = new TreeMap<String,ArrayList>();
ArrayList<TreeMap> al = HTS.get(TableName);
if(al==null)
{
al = new ArrayList<TreeMap>();
HTS.put(TableName,al);
}
TreeMap<String,Object> tm = new TreeMap<String,Object>();
tm.put("htable",table);
tm.put("status","1");
tm.put("used",1);
al.add(tm);
return table;
}catch(Exception e){}
return null;
}
//取HTable
public HTable getHTable(String TableName)
{
if(HTS==null) return newHTable(TableName);
ArrayList<TreeMap> al = HTS.get(TableName);
if(al==null) return newHTable(TableName);
Iterator<TreeMap> itr = al.iterator();
while(itr.hasNext())
{
TreeMap<String,Object> tm = itr.next();
String status = (String)tm.get("status");
if(status.equals("0"))
{
int used = Integer.parseInt((String)tm.get("status"))+1;
tm.put("used",used);
tm.put("status","1");
return (HTable)tm.get("htable");
}
}
return newHTable(TableName);
}
//归还HTable
public void reHTable(String TableName,HTable table)
{
ArrayList al = (ArrayList)HTS.get(TableName);
Iterator itr = al.iterator();
while(itr.hasNext())
{
TreeMap<String,Object> tm = (TreeMap<String,Object>)itr.next();
HTable ht = (HTable)tm.get("htable");
if(ht.equals(table))
{
tm.put("status","0");
return;
}
}
try{table.close();}catch(Exception e){}
}
//清理过期的HTable
public void clearHTS()
{
Iterator it = HTS.entrySet().iterator();
while(it.hasNext())
{
Map.Entry me = (Map.Entry)it.next();
ArrayList al = (ArrayList)me.getValue();
Iterator itr = al.iterator();
while(itr.hasNext())
{
TreeMap tm = (TreeMap)itr.next();
String status = (String)tm.get("status");
if(status.equals("0"))
{
int used = (Integer)tm.get("used");
if(used>100)
{
HTable ht = (HTable)tm.get("htable");
try{ht.close();}catch(Exception e){}
itr.remove();
}
}
}
}
}
//取一条记录
public Bag get(String tableName, String rowKey) throws IOException
{
Bag bag = new Bag(Bag.BAG);
bag.setSuffix(-1);
bag.set("table",tableName);
bag.setSuffix(bag.getLength());
bag.set("rowkey",rowKey);
HTable table = getHTable(tableName);
Get get = new Get(rowKey.getBytes());
Result rs = table.get(get);
if(rs.list()!=null)
{
for(KeyValue kv : rs.list())
{
String family = Bytes.toString(kv.getFamily());
String qualifier = (Bytes.toString(kv.getQualifier())).trim();
if(qualifier.length()>0)
{
StringBuffer sb = new StringBuffer(family);
sb.append(":");
sb.append(qualifier);
family = sb.toString();
sb = null;
}
bag.set(family,Bytes.toString(kv.getValue()));
}
}
reHTable(tableName,table);
return bag;
}
//查询记录(默认2000条)
public Bag scan(String tableName) throws IOException
{
return scan(tableName,"","",0,null);
}
public Bag scan(String tableName,String StartRow,String StopRow) throws IOException
{
return scan(tableName,StartRow,StopRow,0,null);
}
public Bag select(String tableName,String StartRow,String StopRow,long num,String[] f) throws IOException
{
String[][] filters = new String[0][0];
filters[0] = f;
return scan(tableName,StartRow,StopRow,num,filters);
}
public Bag scan(String tableName,String StartRow,String StopRow,long num,String[][] filters) throws IOException
{
Bag bag = new Bag(Bag.BAG);
bag.setSuffix(-1);
bag.set("table",tableName);
HTable table = getHTable(tableName);
Scan s = new Scan();
if(num==0) num = 2000;//记录条数
s.setMaxResultSize(num);
if(StartRow.length()>0)s.setStartRow(Bytes.toBytes(StartRow));//从这条起
if(StopRow.length()>0)s.setStopRow(Bytes.toBytes(StopRow));//到这条止
if(filters!=null)//过滤条件
{
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for(int i=0;i<filters.length;i++)
{
SingleColumnValueFilter filter1;
if(filters[i][3].equalsIgnoreCase("is"))
filter1 = new SingleColumnValueFilter
(
Bytes.toBytes(filters[i][0]),
Bytes.toBytes(filters[i][1]),
CompareFilter.CompareOp.EQUAL,
Bytes.toBytes(filters[i][2])
);
else
filter1 = new SingleColumnValueFilter
(
Bytes.toBytes(filters[i][0]),
Bytes.toBytes(filters[i][1]),
CompareFilter.CompareOp.NOT_EQUAL,
Bytes.toBytes(filters[i][2])
);
list.addFilter(filter1);
}
s.setFilter(list);
}
ResultScanner rs = table.getScanner(s);
if(rs!=null)
{
for(Result r:rs)
{
bag.setSuffix(bag.getLength());
int i=0;
for(KeyValue kv : r.raw())
{
if(i==0)
bag.set("rowkey",new String(kv.getRow()));
String family = Bytes.toString(kv.getFamily());
String qualifier = (Bytes.toString(kv.getQualifier())).trim();
if(qualifier.length()>0)
{
StringBuffer sb = new StringBuffer(family);
sb.append(":");
sb.append(qualifier);
family = sb.toString();
sb = null;
}
bag.set(family,Bytes.toString(kv.getValue()));
}
}
}
reHTable(tableName,table);
return bag;
}
//添加记录
public void put(String tableName, String rowKey, String family, String qualifier, String value) throws Exception
{
HTable table = getHTable(tableName);
try{
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(family),Bytes.toBytes(qualifier),Bytes.toBytes(value));
table.put(put);
reHTable(tableName,table);
}catch(IOException e){reHTable(tableName,table);}
}
//添加多条记录时调用(开始)
private String tabname;
private HTable ht;
private Put put1;
private String key = "";
//到HTable对象
public void getHTable(String TableName,int WriteBuffer)
{
this.tabname = TableName;
this.ht = getHTable(TableName);
if(WriteBuffer>0)
{
try{
ht.setWriteBufferSize(WriteBuffer * 1024 * 1024);
ht.setAutoFlush(false);
}catch(Exception e){}
}
}
//添加记录
public void put(String rowKey, String family, String qualifier, String value) throws Exception
{
if(!this.key.equals(rowKey))
{
this.key = rowKey;
if(this.put1!=null)this.ht.put(put1);
this.put1 = new Put(Bytes.toBytes(this.key));
}
this.put1.add(Bytes.toBytes(family),Bytes.toBytes(qualifier),Bytes.toBytes(value));
}
//提交
public void commits()
{
try{
if(this.put1!=null)this.ht.put(put1);
this.ht.flushCommits();
this.key = "";
this.put1 = null;
reHTable(this.tabname,this.ht);
}catch(Exception e){reHTable(this.tabname,this.ht);}
ht = null;
}//添加多条记录时调用(结束)
//删除记录
public void del(String tableName, String rowKey) throws IOException
{
String[] rks = {rowKey};
del(tableName,rks);
}
public void del(String tableName, String[] rowKeys) throws IOException
{
HTable table = getHTable(tableName);
try{
List<Delete> list = new ArrayList<Delete>();
for(int i=0;i<rowKeys.length;i++)
{
Delete del = new Delete(rowKeys[i].getBytes());
list.add(del);
}
table.delete(list);
reHTable(tableName,table);
}catch(IOException e){reHTable(tableName,table);}
}
}
用法:

HtokHBase hb = new HtokHBase();
hb.getHTable(tablename,0);
Bag list = loa2.getBag("list");
for(int i=0;i<list.getLength();i++)
{
list.setSuffix(i);
hb.put(new StringBuffer(pPage.get("nick")).append("_").append(list.get("num_iid")).toString(),"base","title",list.get("title"));
}
hb.commits();
... ...

可根据实际需求稍加升级后直接使用
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息