您的位置:首页 > 编程语言 > Java开发

java 写的spark 控制hbase

2017-04-20 18:40 162 查看
package DAO;
import java.io.IOException;
import java.util.List;
import java.util.UUID;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkHBase {

private static final Logger LOG = LoggerFactory.getLogger(SparkHBase.class);

public static void createTable(Configuration conf,String tablename,String[] columnFamily) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
//HBaseAdmin admin=new HBaseAdmin(conf);

HBaseAdmin admin = (HBaseAdmin) ConnectionFactory.createConnection(conf).getAdmin();
if(admin.tableExists(tablename)){
LOG.info(tablename + " Table exists!");
}else{
HTableDescriptor tableDesc=new HTableDescriptor(TableName.valueOf(tablename));
for (int i = 0; i < columnFamily.length; i++) {
HColumnDescriptor columnDesc = new HColumnDescriptor(columnFamily[i]);
tableDesc.addFamily(columnDesc);
}
admin.createTable(tableDesc);
LOG.info(tablename + " create table success!");
}
admin.close();
}

public void dropTable(Configuration conf,String tableName) {
try {
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
admin.deleteTable(tableName);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

public static void addRow(Table table,String rowKey,String columnFamily,String key,String value) throws IOException{
Put rowPut=new Put(Bytes.toBytes(rowKey));
rowPut.addColumn(columnFamily.getBytes(),key.getBytes(),value.getBytes());
table.put(rowPut);
//        System.out.println("put '"+rowKey+"', '"+columnFamily+":"+key+"', '"+value+"'");
}

public static void putRow(HTable table,String rowKey,String columnFamily,String key,String value) throws IOException{
Put rowPut=new Put(Bytes.toBytes(rowKey));
rowPut.add(columnFamily.getBytes(),key.getBytes(),value.getBytes());
table.put(rowPut);
//    System.out.println("put '"+rowKey+"', '"+columnFamily+":"+key+"', '"+value+"'");
}

/**
* 鎵归噺娣诲姞鏁版�?
*
* @param list
* @throws IOException
*/

public void addDataBatch(HTable table,List<Put> list) {
try {
table.put(list);
} catch (RetriesExhaustedWithDetailsException e) {
LOG.error(e.getMessage());
} catch (IOException e) {
LOG.error(e.getMessage());
}
}

/**
* 鏌ヨ鍏ㄩ儴
*/
public void queryAll(HTable table) {
Scan scan = new Scan();
try {
ResultScanner results = table.getScanner(scan);
for (Result result : results) {
int i = 0;
for (KeyValue rowKV : result.list()) {
if (i++ == 0) {
System.out.print("rowkey:" + new String(rowKV.getRow()) + " ");
}
}

System.out.println();
}
} catch (IOException e) {
LOG.error(e.getMessage());
}
}

/**
* 鎸夋煇�?�楁鏌ヨ column = value 鐨勬暟鎹�?
<
f1ec
span style="color:#808080;">     *
* @param queryColumn
*            瑕佹煡璇㈢殑鍒楀�?
* @param value
*            杩囨护鏉′欢鍊�
* @param columns
*            杩斿洖鐨勫垪鍚嶉泦鍚�?
*/
public ResultScanner queryBySingleColumn(HTable table,String queryColumn, String value, String[] columns) {
if (columns == null || queryColumn == null || value == null) {
return null;
}

try {
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(queryColumn), Bytes.toBytes(queryColumn), CompareOp.EQUAL, new SubstringComparator(value));
Scan scan = new Scan();

for (String columnName : columns) {
scan.addColumn(Bytes.toBytes(columnName), Bytes.toBytes(columnName));
}

scan.setFilter(filter);
return table.getScanner(scan);
} catch (Exception e) {
LOG.error(e.getMessage());
}

return null;
}

public static Result getRow(Table table,String rowKey) throws IOException{
Get get=new Get(Bytes.toBytes(rowKey));
Result result=table.get(get);
//System.out.println("Get: "+result);
return result;
}

/**
* 鍦ㄦ寚�?�氱殑鏉′欢涓嬶紝鎸夋煇涓�瓧娈佃仛鍚�
* @param paramMap 鍙傛暟鏉′欢
* @param dimensionColumns 缁村�?
* @param aggregateColumn 鑱氬悎�?�楁�?
* @return 杩斿洖map锛宬ey 涓篸imensionColumns 缁村害鐩稿搴旂殑鏁版嵁锛寁alue 涓篴ggregateColumn 瀛楁�?�瑰簲鐨勫�
*/

public Map<String, Long> aggregateBySingleColumn(Map<String, String> paramMap, String[] dimensionColumns, String aggregateColumn) {
if (dimensionColumns == null || dimensionColumns.length == 0 || paramMap == null || aggregateColumn == null || aggregateColumn.equals("")) {
return null;
}

HTable table = null;
Map<String, Long> map = null;
try {
FilterList filterList = new FilterList();
Scan scan = new Scan();
//濞h濮炴潻鍥ㄦ姢閺夆�娆�
for (String paramKey : paramMap.keySet()) {
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(paramKey), Bytes.toBytes(paramKey), CompareOp.EQUAL, new SubstringComparator(paramMap.get(paramKey)));
filterList.addFilter(filter);
}
scan.setFilter(filterList);

//鐟曚礁鐫嶉悳鎵畱閸掞拷
for (String column : dimensionColumns) {
scan.addColumn(Bytes.toBytes(column), Bytes.toBytes(column));
}
scan.addColumn(Bytes.toBytes(aggregateColumn), Bytes.toBytes(aggregateColumn));

ResultScanner results = table.getScanner(scan);

//鐏忓棙鐓$拠銏㈢波閺嬫粍鏂侀崗顧砤p 娑擄�?
map = new ConcurrentHashMap<String, Long>();
for (Result result : results) {
//              String dimensionKey = "";
StringBuilder dimensionKey = new StringBuilder();
//閸欐牕锟�?
String value = new String(result.getValue(Bytes.toBytes(aggregateColumn), Bytes.toBytes(aggregateColumn)));
Long aggregateValue = value == null? 0 : Long.parseLong(value);

//閹峰吋甯碖ey
for (String column : dimensionColumns) {
dimensionKey.append("\t" + new String(result.getValue(Bytes.toBytes(column), Bytes.toBytes(column))));
}
dimensionKey = dimensionKey.deleteCharAt(0);

if(map.containsKey(dimensionKey)) {
map.put(dimensionKey.toString(), map.get(dimensionKey.toString()) + aggregateValue);
} else {
map.put(dimensionKey.toString(), aggregateValue);
}
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
return map;
}

//    public static void main(String[] args) throws Exception {
//     Configuration conf= Conn.getHbaseConf();
//    HTable table =new HTable(conf, "test");
//
//    try {
//
//       String[] familyColumn= new String[]{"test1","test2"};
//
//       createTable(conf,"test",familyColumn);
//       UUID uuid = UUID.randomUUID();
//       String s_uuid = uuid.toString();
//       SparkHBase.putRow(table, s_uuid, "uuid", "col1", s_uuid);
//       SparkHBase.getRow(table, s_uuid);
//
//
////            util.queryAll();
//    //     Map<String, String> paramMap = new HashMap<String, String>();
//    //     paramMap.put("stat_date", "2016-02-03");
//    //     Map<String, Long> map = SparkHBase.aggregateBySingleColumn(paramMap, new String[]{"date", "name"}, "pv");
//
//
//  //       for (String key : map.keySet()) {
//   //          System.out.println(key + "\t" + map.get(key));
//  //       }
//
//
//
//    } catch (Exception e) {
//       if(e.getClass().equals(MasterNotRunningException.class)){
//          System.out.println("MasterNotRunningException");
//
//       }
//       if(e.getClass().equals(ZooKeeperConnectionException.class)){
//          System.out.println("ZooKeeperConnectionException");
//
//       }
//       if(e.getClass().equals(IOException.class)){
//          System.out.println("IOException");
//       }
//       e.printStackTrace();
//    }finally{
//       if(null!=table){
//          table.close();
//       }
//    }

/**
System.out.println("00--------------");
//   if (admin.tableExists("table1")) {

//     System.out.println("001--------------");

//       admin.disableTable("table1");
//        admin.deleteTable("table1");
//      }
System.out.println("1--------------");

HTableDescriptor tableDescripter = new HTableDescriptor("table");

System.out.println("2--------------");
tableDescripter.addFamily(new HColumnDescriptor("one"));
tableDescripter.addFamily(new HColumnDescriptor("two"));
tableDescripter.addFamily(new HColumnDescriptor("three"));
System.out.println("3--------------");
//     admin.createTable(tableDescripter);

/**

//Put閹垮秳缍�?
HTable table = new HTable(conf, "user");
Put put = new Put(Bytes.toBytes("row6"));
put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("value6"));
table.put(put);
table.flushCommits();

//Delete閹垮秳缍�?
Delete delete = new Delete(Bytes.toBytes("row1"));
table.delete(delete);

table.close();

//Scan閹垮秳缍�?
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("0120140722"));
scan.setStopRow(Bytes.toBytes("1620140728"));
scan.addFamily(Bytes.toBytes("basic"));
scan.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"));

String tableName = "user";
conf.set(TableInputFormat.INPUT_TABLE, tableName);

ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(out);
// scan.write(dos);
String scanStr = Base64.encodeBytes(out.toByteArray());
IOUtils.closeQuietly(dos);
IOUtils.closeQuietly(out);
//妤傛澧楅張顒�讲娴犮儳鏁ゆ俊鍌欑瑓閺傜懓绱¢敍锟�
//ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
//String scanStr = Base64.encodeBytes(proto.toByteArray());
conf.set(TableInputFormat.SCAN, scanStr);

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = context
.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);

Long count = hBaseRDD.count();
System.out.println("count: " + count);

List<Tuple2<ImmutableBytesWritable, Result>> tuples = hBaseRDD
.take(count.intValue());
for (int i = 0, len = count.intValue(); i < len; i++) {
Result result = tuples.get(i)._2();
KeyValue[] kvs = result.raw();
for (KeyValue kv : kvs) {
System.out.println("rowkey:" + new String(kv.getRow()) + " cf:"
+ new String(kv.getFamily()) + " column:"
+ new String(kv.getQualifier()) + " value:"
+ new String(kv.getValue()));
}
}
*/
//    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐