HBASE数据导入HIVE
2017-02-21 17:53
218 查看
package com.hx.data.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, HBaseAdmin, Put}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
 * One-shot ETL job: copies the HBase table `wx_data` into the Hive table
 * `huaxun.wx_data`.
 *
 * Every row of the HBase table is read through [[TableInputFormat]]; the
 * listed qualifiers of the `wx_cf` column family are decoded to Strings,
 * turned into a DataFrame, and appended to Hive with an INSERT INTO over a
 * temporary table.
 *
 * Created by Administrator on 2017/2/20.
 */
object HbaseUtils {

  // The single column family that holds all WeChat message fields.
  private val ColumnFamily: Array[Byte] = Bytes.toBytes("wx_cf")

  // HBase qualifiers to extract, paired with their Hive column names, in
  // the exact order the Hive table `huaxun.wx_data` expects after the row key.
  private val Columns: Seq[(String, String)] = Seq(
    "agentID"      -> "agentid",
    "agentType"    -> "agenttype",
    "content"      -> "content",
    "createTime"   -> "createtime",
    "event"        -> "event",
    "eventKey"     -> "eventkey",
    "fromUserName" -> "fromusername",
    "id"           -> "id",
    "itemCount"    -> "itemcount",
    "msgId"        -> "msgid",
    "msgType"      -> "msgtype",
    "packageId"    -> "packageid",
    "toUserName"   -> "tousername",
    "type"         -> "types")

  /**
   * Decodes one cell of the `wx_cf` family as a String.
   *
   * Returns null when the cell is absent: `getValue` yields null and HBase's
   * `Bytes.toString` passes null through, which the DataFrame stores as a
   * SQL NULL — same behavior as the original inline expressions.
   */
  private def cell(result: Result, qualifier: String): String =
    Bytes.toString(result.getValue(ColumnFamily, Bytes.toBytes(qualifier)))

  /**
   * Entry point. Builds the Spark/Hive contexts, scans `wx_data` from HBase
   * and inserts the decoded rows into `huaxun.wx_data`.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("hbase2hive")
    val sc = new SparkContext(sparkConf)
    try {
      val hiveContext = new HiveContext(sc)
      import hiveContext.implicits._

      // HBase connection settings plus the source table to scan.
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set("hbase.zookeeper.quorum", "master")
      conf.set(TableInputFormat.INPUT_TABLE, "wx_data")

      // No cache(): the RDD is consumed exactly once below, so caching it
      // (as the original did) only wastes executor memory.
      val hbaseRdd = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[Result])

      // Row key first, then the 15 configured columns — a 16-ary tuple is
      // replaced by a Row-free Seq-based projection via a tuple of fixed
      // arity; keep the original tuple shape for toDF compatibility.
      val df = hbaseRdd.map { case (_, result) =>
        (Bytes.toString(result.getRow),
          cell(result, "agentID"),
          cell(result, "agentType"),
          cell(result, "content"),
          cell(result, "createTime"),
          cell(result, "event"),
          cell(result, "eventKey"),
          cell(result, "fromUserName"),
          cell(result, "id"),
          cell(result, "itemCount"),
          cell(result, "msgId"),
          cell(result, "msgType"),
          cell(result, "packageId"),
          cell(result, "toUserName"),
          cell(result, "type"))
      }.toDF("key" +: Columns.map(_._2): _*)

      // Stage through a temp table, then append into the Hive target table.
      df.registerTempTable("wx_data_temp")
      hiveContext.sql("insert into huaxun.wx_data select * from wx_data_temp")
    } finally {
      // Always release the SparkContext, even if the scan or insert fails
      // (the original leaked it on any exception).
      sc.stop()
    }
  }
}
相关文章推荐
- Hbase 学习(十一)使用hive往hbase当中导入数据
- sqoop:mysql和Hbase/Hive/Hdfs之间相互导入数据
- 使用Sqoop从MySQL导入数据到Hive和HBase 及近期感悟
- 教程 | 使用Sqoop从MySQL导入数据到Hive和HBase
- 从关系库导入数据到hive-hbase表中
- Hive表数据导入到Hbase
- Hive 数据导入HBase的2种方法详解
- spark 从HIVE读数据导入hbase中发生空指针(java.lang.NullPointerException)问题的解决
- sqoop:mysql和Hbase/Hive/Hdfs之间相互导入数据
- hive over hbase方式将微博用户数据导入hbase
- hive的数据导入与数据导出:(本地,云hdfs,hbase),列分隔符的设置,以及hdfs上传给pig如何处理
- Azure 云平台用 SQOOP 将 SQL server 2012 数据表导入 HIVE / HBASE
- Hive如何加载和导入HBase的数据
- 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- Hadoop数据工具sqoop,导入HDFS,HIVE,HBASE,导出到oracle
- sqoop向hdfs,hive,hbase导入数据
- 读取hive文件并将数据导入hbase
- 大数据基础(二)hadoop, mave, hbase, hive, sqoop在ubuntu 14.04.04下的安装和sqoop与hdfs,hive,mysql导入导出