
Importing HBase Data into Hive

2017-02-21 17:53
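The Spark job below reads the HBase table wx_data through TableInputFormat, converts each Result into a row of string columns taken from the wx_cf column family, registers the resulting DataFrame as a temporary table, and inserts it into the Hive table huaxun.wx_data.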
package com.hx.data.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
/**
 * Reads the HBase table wx_data into a DataFrame and loads it into the Hive table huaxun.wx_data.
 * Created by Administrator on 2017/2/20.
 */
object HbaseUtils {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("hbase2hive")
    val sc = new SparkContext(config)
    val hict = new HiveContext(sc)
    import hict.implicits._

    // Build the HBase configuration and point it at the ZooKeeper quorum
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")
    // Set the table to scan
    conf.set(TableInputFormat.INPUT_TABLE, "wx_data")

    // Read the table as an RDD of (row key, Result) pairs
    val hbaserdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    hbaserdd.cache()

    // Turn each Result into a tuple of string columns and build a DataFrame
    val rdd = hbaserdd.map { case (_, result) =>
      // Read one qualifier from the wx_cf column family as a String (null if the cell is absent)
      def col(qualifier: String): String =
        Bytes.toString(result.getValue(Bytes.toBytes("wx_cf"), Bytes.toBytes(qualifier)))
      (Bytes.toString(result.getRow),
        col("agentID"), col("agentType"), col("content"), col("createTime"),
        col("event"), col("eventKey"), col("fromUserName"), col("id"),
        col("itemCount"), col("msgId"), col("msgType"), col("packageId"),
        col("toUserName"), col("type"))
    }.toDF("key", "agentid", "agenttype", "content", "createtime", "event", "eventkey",
      "fromusername", "id", "itemcount", "msgid", "msgtype", "packageid", "tousername", "types")

    // Register a temporary table and append its rows to the Hive table
    rdd.registerTempTable("wx_data_temp")
    hict.sql("insert into table huaxun.wx_data select * from wx_data_temp")
    sc.stop()
    // Alternative traversal kept from the original (commented out): iterate the RDD and read each column directly
    //    hbaserdd.foreach {
    //      case (_, result) =>
    //        val key = Bytes.toString(result.getRow)
    //        val agentID = Bytes.toString(result.getValue("wx_cf".getBytes(), "agentID".getBytes()))
    //        val agentType = Bytes.toString(result.getValue("wx_cf".getBytes(), "agentType".getBytes()))
    //        val content = Bytes.toString(result.getValue("wx_cf".getBytes(), "content".getBytes()))
    //        val createTime = Bytes.toString(result.getValue("wx_cf".getBytes(), "createTime".getBytes()))
    //        val event = Bytes.toString(result.getValue("wx_cf".getBytes(), "event".getBytes()))
    //        val eventKey = Bytes.toString(result.getValue("wx_cf".getBytes(), "eventKey".getBytes()))
    //        val fromUserName = Bytes.toString(result.getValue("wx_cf".getBytes(), "fromUserName".getBytes()))
    //        val id = Bytes.toString(result.getValue("wx_cf".getBytes(), "id".getBytes()))
    //        val itemCount = Bytes.toString(result.getValue("wx_cf".getBytes(), "itemCount".getBytes()))
    //        val msgId = Bytes.toString(result.getValue("wx_cf".getBytes(), "msgId".getBytes()))
    //        val msgType = Bytes.toString(result.getValue("wx_cf".getBytes(), "msgType".getBytes()))
    //        val packageId = Bytes.toString(result.getValue("wx_cf".getBytes(), "packageId".getBytes()))
    //        val toUserName = Bytes.toString(result.getValue("wx_cf".getBytes(), "toUserName".getBytes()))
    //        val types = Bytes.toString(result.getValue("wx_cf".getBytes(), "type".getBytes()))
    //        println("iterating hbaserdd: " + key)
    //    }
  }
}
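The insert above assumes the Hive table huaxun.wx_data already exists with a schema matching the temporary table. If it does not, it can be created once beforehand; the following is a minimal sketch whose schema is an assumption derived from the columns produced above (all STRING, since every cell is read with Bytes.toString):

// Hypothetical one-time setup, run before the job; adjust storage format and types as needed
hict.sql(
  """CREATE TABLE IF NOT EXISTS huaxun.wx_data (
    |  `key` STRING, agentid STRING, agenttype STRING, content STRING, createtime STRING,
    |  event STRING, eventkey STRING, fromusername STRING, id STRING, itemcount STRING,
    |  msgid STRING, msgtype STRING, packageid STRING, tousername STRING, types STRING
    |) STORED AS TEXTFILE""".stripMargin)

When submitting the job, the HBase client jars and hbase-site.xml usually need to be on the Spark classpath (for example via spark-submit --jars); the exact jar list depends on the HBase and Spark versions in use.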