
Dynamically converting data read from an HBase table into a DataFrame, for convenient follow-up Spark SQL queries (Scala implementation)

2017-06-01 20:39
After digging into this myself, I found that HBase stores everything as raw bytes, so most of its API operates on byte[] values rather than typed columns. The dynamic conversion below relies on the HBaseTableCatalog class, which means you need the hbase-spark-***.jar on the classpath. Download link: http://maven.wso2.org/nexus/content/repositories/Apache/org/apache/hbase/hbase-spark/2.0.0-SNAPSHOT/
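For reference, here is a minimal sbt sketch for wiring in that dependency. The resolver name and the version coordinates below are assumptions based on the snapshot link above, so adjust them to whatever jar you actually download:

// build.sbt (sketch; coordinates are assumptions, adjust to your environment)
resolvers += "wso2-apache" at "http://maven.wso2.org/nexus/content/repositories/Apache/"

libraryDependencies ++= Seq(
  "org.apache.hbase" % "hbase-spark" % "2.0.0-SNAPSHOT", // provides HBaseContext and HBaseTableCatalog
  "org.apache.hbase" % "hbase-client" % "1.2.0"          // assumed client version; match your cluster
)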

My implementation is as follows:

package com.spark

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan, Result}
import org.apache.spark._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
* Created by 475222395@qq.com on 2017/6/1.
*/
object testHBase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val tablename = "zy-test"
    val conf = HBaseConfiguration.create()
    // Set the ZooKeeper quorum here. Alternatively you can put hbase-site.xml on the
    // classpath, but setting it explicitly in code is recommended.
    conf.set(HConstants.ZOOKEEPER_QUORUM, "IP:PORT")
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-unsecure")
    // Set up the HBase/Spark integration context (used by the hbase-spark datasource below)
    val hbaseContext = new HBaseContext(sc, conf)
    // Create a scan object
    val scan = new Scan
    //val hbaseRdd = hbaseContext.hbaseRDD(TableName.valueOf(tablename), scan)

    // Open a connection to HBase
    val conn = ConnectionFactory.createConnection(conf)
    val hAdmin = conn.getAdmin
    if (!hAdmin.tableExists(TableName.valueOf(tablename))) {
      println(tablename + " does not exist")
      return
    }

    var resultSet = Set[String]()
    // Get the table
    val table = conn.getTable(TableName.valueOf(tablename))
    // Scan the whole table, printing every cell and collecting each cf:qualifier pair
    val results = table.getScanner(scan)
    val it: java.util.Iterator[Result] = results.iterator()
    while (it.hasNext) {
      val result = it.next()
      val cells = result.rawCells()
      for (cell <- cells) {
        println("Row key: " + Bytes.toString(CellUtil.cloneRow(cell)))
        println("Column family: " + Bytes.toString(CellUtil.cloneFamily(cell)))
        println("Qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)))
        println("Value: " + Bytes.toString(CellUtil.cloneValue(cell)))
        println("Timestamp: " + cell.getTimestamp())
        val str = Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell))
        resultSet += str
        println("---------------------")
      }
    }
    println(resultSet)

    // Build the "columns" fragment of the catalog JSON from the discovered cf:qualifier pairs
    def handleSet(rs: Set[String]): String = {
      rs.map { r =>
        val arr = r.split(":", 2)
        "\"" + r + "\":{\"cf\":\"" + arr(0) + "\", \"col\":\"" + arr(1) + "\", \"type\":\"string\"}"
      }.mkString(",")
    }
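    // For example, if the scan discovered resultSet = Set("cf1:col1", "cf2:col2"),
    // handleSet returns (on one line):
    //   "cf1:col1":{"cf":"cf1", "col":"col1", "type":"string"},"cf2:col2":{"cf":"cf2", "col":"col2", "type":"string"}
    // Using the full "cf:qualifier" string as the DataFrame column name keeps columns
    // with the same qualifier in different families distinct.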

    val columnsDes = handleSet(resultSet)
    println(columnsDes)

    // Assemble the full HBaseTableCatalog JSON: the row key mapping plus every discovered column
    def catalog = s"""{
                     |"table":{"namespace":"default", "name":"${tablename}"},
                     |"rowkey":"key",
                     |"columns":{
                     |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
                     | ${columnsDes}
                     |}
                     |}""".stripMargin

println("********" + catalog)
val sqlContext = new SQLContext(sc);
import sqlContext.implicits._

def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog -> cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
val df = withCatalog(catalog)
df.show()

// 关闭资源
results.close()
table.close()
conn.close()

sc.stop()
}
}
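Once the DataFrame is built, the dynamically generated catalog pays off: the HBase table can be queried directly with Spark SQL. A minimal sketch, assuming the scan discovered a column cf1:col1 (a placeholder name, not from the actual table):

// Register the DataFrame for SQL (Spark 1.x API, matching the SQLContext used above)
df.registerTempTable("zy_test")
// Column names contain a colon (cf:qualifier), so back-quote them in SQL;
// "cf1:col1" is an assumed example column.
val filtered = sqlContext.sql("SELECT rowkey, `cf1:col1` FROM zy_test WHERE `cf1:col1` IS NOT NULL")
filtered.show()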