spark-hbase数据操作心得
2017-08-26 19:47
387 查看
通过spark程序读写hbase的方法百度上太多了,这里我就不一一列举,这里我要分享的是我在开发spark程序对hbase读写时喜欢使用的方法,水平有限,还望指点。
我的hbase表结构是简单的rowkey + cfamily:”INFO” + value
首先,定义几个方法
然后就可以在主程序中根据需要灵活进行变换,用法:
1.扫描全表取得Map或者List并按需要进行转换,就可以轻松地去各个rdd中使用了,前提是这些hbase表的数据量不多的情况下可以使用如果hbase表的数据量大的时候,还是劝你别这么用:
2.当需要通过rowkey去获取一系列数据来遍历时,可以这样:
我的hbase表结构是简单的rowkey + cfamily:”INFO” + value
首先,定义几个方法
/** * 扫描全表,返回数据集 * @param connection: Connection * @param tableName: String * @return Map[String,List[(String,String)]] 返回值为rowKey->List[(column ,value)] */ def getResultMap(connection: Connection, tableName: String): Map[String, List[(String, String)]] = { connection.getTable(TableName.valueOf(tableName)).getScanner(new Scan).toList .map(result => (new String(result.getRow),result.raw.toList.map(cv => (new String(cv.getQualifier), new String(cv.getValue)))) ).toMap } /** * 通过rowkey,get对应的结果 * @param connection: Connection * @param tableName: String * @param key: String * @return Map[String, String] 返回值为column -> value */ def getResultMap(connection: Connection, tableName: String, key: String): Map[String, String] = { val userTable = TableName.valueOf(tableName) val table = connection.getTable(userTable) val g = new Get(key.getBytes()) table.get(g).raw.toList.map(res => (new String(res.getQualifier), new String(res.getValue))).toMap } /** * 根据cate获取对应所有数据List * @param connection * @param tableName * @param cate ["R","C","V"]=>代表要取所有的rowkey还是column还是value * @return */ def getOneOfResList(connection: Connection, tableName: String, cate: String): List[String] = { val resList = connection.getTable(TableName.valueOf(tableName)).getScanner(new Scan).toList cate match { case "R" => resList.map(res => new String(res.getRow)) case "C" => resList.map(res => res.raw.toList.flatMap(cv => new String(cv.getQualifier))) case "V" => resList.map(res => res.raw.toList.flatMap(cv => new String(cv.getValue))) case _ => List() } } /** * 根据cate与key(rowkey)获取对应rowkey的所有列的list或者值的list * @param connection * @param tableName * @param cate ["R","C","V"]=>代表要取所有的rowkey还是column还是value * @param key rowkey * @return */ def getOneOfResList(connection: Connection, tableName: String, cate: String, key: String): List[String] = { val table = connection.getTable(TableName.valueOf(tableName)) val g = new Get(key.getBytes()) val resList = table.get(g).raw.toList cate match { case "C" => resList.map(res => new String(res.getQualifier)) case "V" => resList.map(res => new String(res.getValue)) case _ => List() } }
然后就可以在主程序中根据需要灵活进行变换,用法:
1.扫描全表取得Map或者List并按需要进行转换,就可以轻松地去各个rdd中使用了,前提是这些hbase表的数据量不多的情况下可以使用如果hbase表的数据量大的时候,还是劝你别这么用:
val DEPLOY$resultMap = getResultMap(connection, CTB("DEPLOY")) val DEPLOY$ag_node$map = DEPLOY$resultMap.flatMap(res => res._2.map(_._1 -> res._1)) val LINEATTR$part_V$map = getResultMap(connection, CTB("LINEATTR")).map(res => res._1 -> res._2.head._1.toDouble) val BALANCE$nodeft_ratio$map = getResultMap(connection, CTB("BALANCE")).map(res => res._1 -> res._2.map(_._2).head.toDouble) val a_g1_g2List = getOneOfResList(connection, PTB("AG"), "R")
2.当需要通过rowkey去获取一系列数据来遍历时,可以这样:
someRdd.mapPartition{ case paritition=> //... partition.flatMap{ //... val res = getResultMap(connection, PTB("LOAD"), ag).map { t => val ft = t._1.split("@")(0) val time = t._1.split("@")(1) val fv = t._2.toDouble } } } someRdd.mapPartition{ case paritition=> //... partition.flatMap{ //... val res = getOneOfResList(connection, PTB("LOADZP"), "C", agg) .map(x => (x.split("@")(0), x.split("@")(1), x.split("@")(2))) .map(x => ((line, x._1, x._2), x._3.toDouble)) } }
相关文章推荐
- spark-hbase数据操作心得
- sparkOnHbase 解决spark读取hbase数据后不能分布式操作
- 从HBase数据库表中读取数据动态转为DataFrame格式,方便后续用Spark SQL操作(scala实现)
- Sql数据操作的若干心得
- Kafka+SparkStreaming解析Json数据并插入Hbase,包含部分业务逻辑
- HBASE--数据操作,MapReduce
- HBase学习心得之HBase原理&Java接口操作增删改查
- HBase基本数据操作详解【完整版,绝对精品】
- Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询
- mongodb数据导入hbase,spark读取hbase数据分析
- Spark操作HBase问题:java.io.IOException: Non-increasing Bloom keys
- spark 读取hbase数据并转化为dataFrame
- 用Spark查询HBase中的表数据
- HBase(六)HBase整合Hive,数据的备份与MR操作HBase
- spark 从HIVE读数据导入hbase中发生空指针(java.lang.NullPointerException)问题的解决
- 2018-08-21期 Hbase客户端API操作(数据查询,get方式)
- 大数据学习系列之九---- Hive整合Spark和HBase以及相关测试
- 【甘道夫】HBase基本数据操作详解【完整版,绝对精品】
- Spark 下操作 HBase(1.0.0 新 API)
- spark 操作 hbase