
Operating HBase Tables from Spark with Scala: Create, Read, Update, Delete

2015-12-26 15:50
Author: FuRenjie

Configure the dependencies in build.sbt (sbt requires a blank line between settings):

name := "test2"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(

 "org.apache.spark" % "spark-core" % "1.0.0",

  "org.apache.hbase" % "hbase" % "1.2.1-hadoop1",

  "org.apache.hbase" % "hbase-client" % "1.2.1-hadoop1",

  "org.apache.hbase" % "hbase-common" % "1.2.1-hadoop1",

  "org.apache.hbase" % "hbase-server" % "1.2.1-hadoop1"

)

version := "1.0"

Set an environment variable named SPARK_TEST_JAR to the JAR file(s) this project depends on; the code below passes it to SparkContext via System.getenv("SPARK_TEST_JAR").

————————————————————————————————————————

import org.apache.hadoop.hbase.client._

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.hbase._

import org.apache.spark._

/**

 * Created by gongxuan on 2/3/15.

 * hadoop1.2.1  scala2.10.4  hbase0.98.9  spark1.0.0

 *

 */

object HBaseTest {

  def main(args: Array[String]) {

    //create table test1; if it already exists, disable and delete it first (HBase requires disabling a table before deleting it)

    var table_name = "test1"

    val conf = HBaseConfiguration.create

    val admin = new HBaseAdmin(conf)

    if (admin.tableExists(table_name))

    {

      admin.disableTable(table_name)

      admin.deleteTable(table_name)

    }

    val htd = new HTableDescriptor(table_name)

    val hcd = new HColumnDescriptor("id")

    //add the column family to the table descriptor

    htd.addFamily(hcd)

    admin.createTable(htd)

    //put data to HBase table

    val tablename = htd.getName

    val table = new HTable(conf, tablename)

    val databytes = Bytes.toBytes("id")

    for (c <- 1 to 10) {

      val row = Bytes.toBytes("row" + c.toString)

      val p1 = new Put(row)

      p1.add(databytes, Bytes.toBytes(c.toString), Bytes.toBytes("value" + c.toString))

      table.put(p1)

    }

    //read each row back with a Get and print the Result
    for (c <- 1 to 10) {

      val g = new Get(Bytes.toBytes("row" + c.toString))

      println("Get:" + table.get(g))

    }

    //scan the whole table from Spark via TableInputFormat

    val config = HBaseConfiguration.create

    val sc = new SparkContext("local", "HBaseTest",

      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

    config.set(TableInputFormat.INPUT_TABLE, table_name)

// create an RDD over the HBase table through the Hadoop InputFormat API

    val hbaseRDD = sc.newAPIHadoopRDD(config, classOf[TableInputFormat],

      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

      classOf[org.apache.hadoop.hbase.client.Result])

    val count = hbaseRDD.count()

    println("HbaseRDD Count:" + count)

    hbaseRDD.cache()

//take() returns Array[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)]

//the key is an immutable ImmutableBytesWritable (the row key), the value is the HBase Result

//so res(0)._2, for example, returns the second element of the pair, the Result

    val res = hbaseRDD.take(count.toInt)

    for (j <- 1 to count.toInt) {

      var rs = res(j - 1)._2

//rs.raw returns every cell of the row, with type Array[org.apache.hadoop.hbase.KeyValue]

      var kvs = rs.raw

      for (kv <- kvs) //then iterate over each cell (KeyValue) in the row

        println("row:" + new String(kv.getRow()) +

          " cf:" + new String(kv.getFamily()) +

          " column:" + new String(kv.getQualifier()) +

          " value:" + new String(kv.getValue()))

    }

    //drop table

    admin.disableTable(table_name)

    admin.deleteTable(table_name)

  }

}  
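
The title promises create, read, update, and delete, but the listing above only ever deletes the whole table. Below is a minimal sketch of row-level update and delete against the same test1 table and "id" column family, assuming the final drop-table step is skipped so the table still exists; object and value names such as HBaseRowOps and "value1_updated" are illustrative only.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Delete, Get, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

object HBaseRowOps {
  def main(args: Array[String]) {
    val conf = HBaseConfiguration.create
    //assumes the test1 table and the "id" column family created above still exist
    val table = new HTable(conf, "test1")
    val cf = Bytes.toBytes("id")

    //update: a Put to an existing rowkey/qualifier simply writes a newer version of the cell
    val p = new Put(Bytes.toBytes("row1"))
    p.add(cf, Bytes.toBytes("1"), Bytes.toBytes("value1_updated"))
    table.put(p)

    //delete an entire row by rowkey
    table.delete(new Delete(Bytes.toBytes("row2")))

    //delete all versions of a single cell: column id:3 in row3
    val d = new Delete(Bytes.toBytes("row3"))
    d.deleteColumns(cf, Bytes.toBytes("3"))
    table.delete(d)

    //verify: row1 shows the new value, row2 comes back empty
    println("row1 after update: " + table.get(new Get(Bytes.toBytes("row1"))))
    println("row2 after delete: " + table.get(new Get(Bytes.toBytes("row2"))))

    table.close()
  }
}

Note that HBase has no separate update call: because cells are versioned, any Put to an existing row is the "update", and reads return the newest version by default.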