
Notes on Spark-HBase data operations

2017-08-26 19:47
There are plenty of write-ups online about reading and writing HBase from a Spark program, so I won't list them one by one. What I want to share here is the approach I like to use when developing Spark programs that read from and write to HBase. My skills are limited, so corrections are welcome.

My HBase table layout is a simple rowkey + column family "INFO" + value.
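
For reference, a row in that layout would be written roughly like this (a minimal sketch on the HBase 1.x client API; the table name "demo", rowkey, qualifier and value are made up for illustration):

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Put}
import org.apache.hadoop.hbase.util.Bytes

def putSample(connection: Connection): Unit = {
  val table = connection.getTable(TableName.valueOf("demo"))
  // one row "row001" with a single column INFO:load = "0.75"
  val put = new Put(Bytes.toBytes("row001"))
  put.addColumn(Bytes.toBytes("INFO"), Bytes.toBytes("load"), Bytes.toBytes("0.75"))
  table.put(put)
  table.close()
}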

First, define a few helper methods.
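
The methods below assume the HBase 1.x client API plus the implicit Java-to-Scala collection conversions, which are what make .toList work on a ResultScanner; roughly these imports. (result.raw, getQualifier and getValue used below are deprecated in favour of rawCells() / CellUtil, but are still available on the 1.x client.)

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Get, Scan}
import scala.collection.JavaConversions._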

/**
 * Scan the whole table and return the data set
 * @param connection: Connection
 * @param tableName: String
 * @return Map[String, List[(String, String)]]  the result is rowKey -> List[(column, value)]
 */
def getResultMap(connection: Connection, tableName: String): Map[String, List[(String, String)]] = {
  connection.getTable(TableName.valueOf(tableName)).getScanner(new Scan).toList
    .map(result =>
      (new String(result.getRow), result.raw.toList.map(cv => (new String(cv.getQualifier), new String(cv.getValue))))
    ).toMap
}

/**
 * Get the result for a given rowkey
 * @param connection: Connection
 * @param tableName: String
 * @param key: String
 * @return Map[String, String]  the result is column -> value
 */
def getResultMap(connection: Connection, tableName: String, key: String): Map[String, String] = {
  val userTable = TableName.valueOf(tableName)
  val table = connection.getTable(userTable)
  val g = new Get(key.getBytes())
  table.get(g).raw.toList.map(res => (new String(res.getQualifier), new String(res.getValue))).toMap
}

/**
 * Get a List of all rowkeys, columns, or values across the table, depending on cate
 * @param connection
 * @param tableName
 * @param cate       one of ["R", "C", "V"] => fetch all rowkeys, all columns, or all values
 * @return
 */
def getOneOfResList(connection: Connection, tableName: String, cate: String): List[String] = {
  val resList = connection.getTable(TableName.valueOf(tableName)).getScanner(new Scan).toList
  cate match {
    case "R" => resList.map(res => new String(res.getRow))
    case "C" => resList.flatMap(res => res.raw.toList.map(cv => new String(cv.getQualifier)))
    case "V" => resList.flatMap(res => res.raw.toList.map(cv => new String(cv.getValue)))
    case _ => List()
  }
}

/**
 * Get a List of all columns or all values of a given rowkey, depending on cate
 * @param connection
 * @param tableName
 * @param cate       one of ["C", "V"] => fetch the columns or the values of that row
 * @param key        rowkey
 * @return
 */
def getOneOfResList(connection: Connection, tableName: String, cate: String, key: String): List[String] = {
  val table = connection.getTable(TableName.valueOf(tableName))
  val g = new Get(key.getBytes())
  val resList = table.get(g).raw.toList
  cate match {
    case "C" => resList.map(res => new String(res.getQualifier))
    case "V" => resList.map(res => new String(res.getValue))
    case _ => List()
  }
}
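
All of these methods take a Connection as their first argument; a minimal sketch of creating one with the HBase 1.x client API (the ZooKeeper quorum value is just a placeholder):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk-host1,zk-host2,zk-host3") // placeholder hosts
val connection = ConnectionFactory.createConnection(conf)
// remember to call connection.close() when you are done with it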


With these in place, you can transform the data flexibly in the main program as needed. Usage:

1. Scan a whole table into a Map or a List and convert it as needed; the result can then be used conveniently inside your various RDDs. The precondition is that these HBase tables hold little data. If a table is large, I'd advise against doing it this way:

// rowKey -> List[(column, value)] for the whole DEPLOY table
val DEPLOY$resultMap = getResultMap(connection, CTB("DEPLOY"))

// column -> rowKey
val DEPLOY$ag_node$map = DEPLOY$resultMap.flatMap(res => res._2.map(_._1 -> res._1))

// rowKey -> first column name, as Double
val LINEATTR$part_V$map = getResultMap(connection, CTB("LINEATTR")).map(res => res._1 -> res._2.head._1.toDouble)

// rowKey -> first value, as Double
val BALANCE$nodeft_ratio$map = getResultMap(connection, CTB("BALANCE")).map(res => res._1 -> res._2.map(_._2).head.toDouble)

// all rowkeys of the AG table
val a_g1_g2List = getOneOfResList(connection, PTB("AG"), "R")


2. When you need to fetch a batch of data by rowkey and iterate over it, you can do it like this:

someRdd.mapPartitions {
  partition =>
    //...
    partition.flatMap {
      //...
      val res = getResultMap(connection, PTB("LOAD"), ag).map {
        t =>
          val ft = t._1.split("@")(0)
          val time = t._1.split("@")(1)
          val fv = t._2.toDouble
          (ft, time, fv)
      }
      res
    }
}

someRdd.mapPartitions {
  partition =>
    //...
    partition.flatMap {
      //...
      val res = getOneOfResList(connection, PTB("LOADZP"), "C", agg)
        .map(x => (x.split("@")(0), x.split("@")(1), x.split("@")(2)))
        .map(x => ((line, x._1, x._2), x._3.toDouble))
      res
    }
}
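
One caveat for this pattern: an HBase Connection is not serializable, so it has to be created (and closed) inside mapPartitions on the executor side rather than captured from the driver. A minimal sketch of that, assuming someRdd is an RDD[String] of rowkeys and "LOAD" stands in for the real table name:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

someRdd.mapPartitions { partition =>
  // one connection per partition, created on the executor
  val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val out = partition.map { key =>
    // look up the row for this key with the helper defined above
    key -> getResultMap(connection, "LOAD", key)
  }.toList // materialize before closing the connection
  connection.close()
  out.iterator
}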
Tags: spark hbase big data