从HBase数据库表中读取数据动态转为DataFrame格式,方便后续用Spark SQL操作(scala实现)
2017-06-01 20:39
2076 查看
个人研究后,才发现HBase存储的都是字符串类型,大部分函数都是byte[]字节类型来操作,实现需要用到HBaseTableCatalog类,需要导入hbase-spark-***.jar相关jar包,下载链接:http://maven.wso2.org/nexus/content/repositories/Apache/org/apache/hbase/hbase-spark/2.0.0-SNAPSHOT/
个人实现如下:
个人实现如下:
package com.spark import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan, Result} import org.apache.spark._ import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.spark.HBaseContext import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog import org.apache.spark.sql.{DataFrame, SQLContext} import collection.JavaConverters._ /** * Created by 475222395@qq.com on 2017/6/1. */ object testHBase { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local") val sc = new SparkContext(sparkConf) val tablename = "zy-test" val conf = HBaseConfiguration.create() //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置 conf.set(HConstants.ZOOKEEPER_QUORUM, "IP:PORT") conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-unsecure"); val hbaseContext = new HBaseContext(sc, conf) //创建一个扫描对象 val scan = new Scan //val hbaseRdd = hbaseContext.hbaseRDD(TableName.valueOf(tablename), scan) // 建立一个数据库的连接 val conn = ConnectionFactory.createConnection(conf); val hAdmin = conn.getAdmin if (!hAdmin.tableExists(TableName.valueOf(tablename))) { println(tablename + " is not exist"); return } var resultSet = Set[String]() //获取表 val table = conn.getTable(TableName.valueOf(tablename)) // 扫描全表输出结果 val results = table.getScanner(scan) val it: java.util.Iterator[Result] = results.iterator() while(it.hasNext) { val result = it.next() val cells = result.rawCells() for(cell <- cells) { println("行建:" + new String(CellUtil.cloneRow(cell))) println("列族:" + new String(CellUtil.cloneFamily(cell))) println("列名:" + new String(CellUtil.cloneQualifier(cell))) println("值:" + new String(CellUtil.cloneValue(cell))) println("时间戳:" + cell.getTimestamp()) var str = Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) resultSet += str println("---------------------") } } println(resultSet) def handleSet(rs : Set[String]): String = { var str = "" rs.foreach(r => { val arr = r.split(":", 2) str += "\"" + r +"\":{\"cf\":\"" + arr(0) + "\", \"col\":\"" + arr(1) + "\", \"type\":\"string\"}," }) str.substring(0, str.length-1) } var columnsDes = handleSet(resultSet) println(columnsDes) def catalog = s"""{ |"table":{"namespace":"default", "name":"${tablename}"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, | ${columnsDes} |} |}""".stripMargin println("********" + catalog) val sqlContext = new SQLContext(sc); import sqlContext.implicits._ def withCatalog(cat: String): DataFrame = { sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog -> cat)) .format("org.apache.hadoop.hbase.spark") .load() } val df = withCatalog(catalog) df.show() // 关闭资源 results.close() table.close() conn.close() sc.stop() } }
相关文章推荐
- sparkSQL里 sql语句,dataframe,Thrift Server JDBC都可以实现对数据的查询,过滤等操作, 哪这3种情况分别是什么情况下使用
- Spark 读取Hbase表数据并实现类似groupByKey操作
- SPark SQL 从 DB 读取数据方法和方式 scala
- SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)
- 【利用Python进行数据分析——经验篇3】如何操作DataFrame中的列的数据格式(转为百分数、保留4位小数)
- 数据库操作_连接SQL Server数据库示例;连接ACCESS数据库;连接到 Oracle 数据库示例;SqlCommand 执行SQL命令示例;SqlDataReader 读取数据示例;使用DataAdapter填充数据到DataSet;使用DataTable存储数据库表;将数据库数据填充到 XML 文件;10 使用带输入参数的存储过程;11 使用带输入、输出参数的存储过程示;12 获得数据库中表的数目和名称;13 保存图片到SQL Server数据库示例;14 获得插入记录标识号;Exce
- 使用scala,java实现使用phenix读取hbase中数据
- spark学习-SparkSQL--11-scala版写的SparkSQL程序读取Hbase表注册成表SQL查询
- 通过自定义SparkSQL外部数据源实现SparkSQL读取HBase
- spark连接数据库操作(scala实现)
- SQL语句实现附加数据库,可以改数据库物理文件名,数据库名,非常方便
- 使用PL/SQL Developer工具来实现创建表空间、创建数据库、备份数据库、数据导出等操作
- mybatis实战之路,疯狂的数据库操作框架、动态sql实现CRUD及带条件的增CRUD
- spark 读取hbase数据并转化为dataFrame
- 使用PL/SQL Developer工具来实现创建表空间、创建数据库、备份数据库、数据导出等操作
- Spark 读取Hbase表数据并实现类似groupByKey操作
- spring多数据源的处理 mybatis实现跨库查询 实现Myibatis动态sql跨数据库的处理 Spring动态配置多数据源,即在大型应用中对数据进行切分,并且采用多个数据库实例进行管理,这样
- spark-sql读取映射hbase数据的hive外部表
- spark-sql读取映射hbase数据的hive外部表
- 两个Repeater嵌套实现动态菜单(ado.net+sql和xml+Linq两种读取数据方式)