
Writing DataFrame and Hive Data to Oracle with Spark

Main contents of this post:

1. An analysis of how Spark reads from and writes to Oracle

2. Writing DataFrame data to Oracle

3. Writing Hive data to Oracle

DataFrame is a new API introduced in Spark 1.3.0. It borrows some of the strengths of R's data frames and extends Spark's ability to process large-scale structured data. As a Spark beginner, I ran into a series of problems while learning to read and write RDBMS data with Spark. After consulting a lot of articles I resolved them one by one, and here I share my summary of reading and writing Oracle with Spark. If you spot mistakes in this post, please point them out. Thank you!

1. Analysis of how Spark reads from and writes to Oracle

Spark currently provides classes such as DataFrameReader and DataFrameWriter for reading and writing relational tables. For reading from Oracle, testing (the concrete implementation is described below) shows that data can be fetched in the way the official documentation describes for reading databases. When writing to Oracle, however, a "no suitable driver found" exception is thrown. This is because the jdbc method of DataFrameWriter cannot yet handle writing to an Oracle database (my own understanding; I have not yet solved this problem).
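For reference, the write path being referred to looks roughly like the following minimal sketch; someDF, the URL and the table name are placeholders, and on the Spark version discussed here this call fails with the exception above.

// Minimal sketch of the DataFrameWriter.jdbc call described above;
// someDF, the URL and the table name are hypothetical placeholders.
val props = new java.util.Properties()
props.put("user", "username")
props.put("password", "userpassword")

// On the Spark version used in this post, this write fails with
// "java.sql.SQLException: No suitable driver found"
someDF.write.jdbc("jdbc:oracle:thin:@//ip:1521/dbinstance", "TARGET_TABLE", props)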

I then considered writing to Oracle with JdbcUtils.saveTable, but that raised "java.sql.SQLException: ORA-00902: invalid datatype".

Looking at the OracleDialect.scala source:

private case object OracleDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // Handle NUMBER fields that have no precision/scale in special way
    // because JDBC ResultSetMetaData converts this to 0 procision and -127 scale
    // For more details, please see
    // https://github.com/apache/spark/pull/8780#issuecomment-145598968
    // and
    // https://github.com/apache/spark/pull/8780#issuecomment-144541760
    if (sqlType == Types.NUMERIC && size == 0) {
      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
      // in Oracle is allowed to have different precision/scale for each value.
      Option(DecimalType(DecimalType.MAX_PRECISION, 10))
    } else {
      None
    }
  }
}

As you can see, OracleDialect overrides getCatalystType (used to convert data types when reading from Oracle) but does not override getJDBCType (used to convert data types when writing to Oracle). So when writing to Oracle, no dialect mapping is available for some data types and Oracle cannot recognize them. The fix is to override getJDBCType and map the Spark data types to types Oracle understands.

1.1 Reading data from Oracle

Use sqlContext.read to obtain a reader and load the table data with the load method. load is lazy: the table is only read from Oracle when a Spark action is triggered. The code for reading Oracle data follows.

// Create the SparkContext
val conf = new SparkConf().setAppName("Reading from Oracle").setMaster("spark://Master:7077")
val sc = new SparkContext(conf)
val oracleDriverUrl = "jdbc:oracle:thin:@//ip:1521/dbinstance"

val jdbcMap = Map("url" -> oracleDriverUrl,
  "user" -> "username",
  "password" -> "userpassword",
  "dbtable" -> "table_name",
  "driver" -> "oracle.jdbc.driver.OracleDriver")

// Create the sqlContext used to connect to Oracle, Hive, etc.
// Since HiveContext extends SQLContext, a HiveContext instance can work with both Oracle and Hive
val sqlContext = new HiveContext(sc)

// Load the Oracle table (lazy)
val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
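Because load is lazy, the table rows are only pulled from Oracle when an action runs. As a minimal sketch using the jdbcDF defined above:

// The read is only planned here; rows are fetched from Oracle
// when an action such as count() or show() is triggered
println("row count: " + jdbcDF.count())
jdbcDF.show(10)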

2. Writing data to Oracle

Use JdbcUtils.saveTable to write Spark data to Oracle; note that the target table must already exist.
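Since saveTable does not create the table for you, you can create it up front over a plain JDBC connection. A minimal sketch, reusing oracleDriverUrl and the credentials from the reading example, with a hypothetical table name and columns:

import java.sql.DriverManager

// Hypothetical DDL: adjust the table name and columns to match the DataFrame schema
Class.forName("oracle.jdbc.driver.OracleDriver")
val conn = DriverManager.getConnection(oracleDriverUrl, "username", "userpassword")
try {
  val stmt = conn.createStatement()
  stmt.execute("CREATE TABLE TARGET_TABLE (ID NUMBER(16), NAME VARCHAR2(255))")
  stmt.close()
} finally {
  conn.close()
}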

First instantiate a JdbcDialect and override its getJDBCType method to define the dialect:

// Override JdbcDialect to fit Oracle
val OracleDialect = new JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")

  // getJDBCType is used when writing to a JDBC table
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
    case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
    case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
    case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
    case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
    case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
    case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
    case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
    case _ => None
  }
}


Then register the dialect and write to the specified Oracle table:

// Register the OracleDialect
JdbcDialects.registerDialect(OracleDialect)

val connectProperties = new Properties()
connectProperties.put("user", "GSPWJC")
connectProperties.put("password", "GSPWJC")
Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()

// Write back to Oracle
// Note: when writing the results back to Oracle, make sure the target table already exists
JdbcUtils.saveTable(aDataFrame, oracleDriverUrl, "tableName", connectProperties)


Here, aDataFrame is a DataFrame obtained by querying the relational database or Hive. Note that to build your own DataFrame you need to define a schema for the RDD data and then convert the RDD into a DataFrame; there are many resources online, so only a brief sketch is given below.
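A minimal sketch of building a DataFrame from an RDD with an explicit schema, reusing the sc and sqlContext defined earlier; the column names and types are hypothetical:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Hypothetical two-column schema; adjust names and types to your data
val schema = StructType(Seq(
  StructField("ID", IntegerType, nullable = false),
  StructField("NAME", StringType, nullable = true)
))

// Build Rows from an RDD and convert them into a DataFrame via the SQLContext/HiveContext
val rowRDD = sc.parallelize(Seq((1, "a"), (2, "b"))).map { case (id, name) => Row(id, name) }
val aDataFrame = sqlContext.createDataFrame(rowRDD, schema)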

Finally, here is some of my own code. It reads Oracle data, computes on it in Spark, and writes the results back to the Oracle database.

import java.util.Properties

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by Alex on 2016/3/22.
 * This object tests reading data from an Oracle database, computing on it,
 * writing the results to Hive, and putting the results back into Oracle.
 */
object TestOracle {

  def main(args: Array[String]) {

    // Create the SparkContext
    val sc = createSparkContext

    // Create the sqlContext used to connect to Oracle, Hive, etc.
    val sqlContext = new HiveContext(sc)

    // Load the Oracle table (lazy)
    val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load

    // Switch Hive to the test database
    sqlContext.sql("use test")

    // Register a temporary table with Spark and operate on it for better efficiency,
    // but keep the table size in mind to avoid OOM.
    // In testing, fetching data directly from Oracle makes cluster computation time
    // grow noticeably as the data volume increases.
    jdbcDF.registerTempTable("temp_table1")

    val noTotalId = sqlContext.sql("some sql")
    // println("NO_TOTAL_ID_TMP total count: " + noTotalId.count())

    // Register NO_TOTAL_ID for the computation below
    noTotalId.registerTempTable("temp_table2")

    // Not yet optimized; partitioning, bucketing, etc. may be added later to improve efficiency
    val results = sqlContext.sql(" some sql")

    // Drop the temp tables registered with Spark
    sqlContext.dropTempTable("temp_table1")
    sqlContext.dropTempTable("temp_table2")

    results.registerTempTable("resultDF")

    // Write to Hive
    sqlContext.sql("insert into ods_incom_biaoma select * from resultDF")

    // Reading the results back from Hive keeps the task efficient
    val df2Oracle = sqlContext.sql("select * from ods_incom_biaoma")

    // Register the OracleDialect
    JdbcDialects.registerDialect(OracleDialect)

    val connectProperties = new Properties()
    connectProperties.put("user", "user")
    connectProperties.put("password", "password")
    Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()

    // Write back to Oracle
    // Note: when writing the results back to Oracle, make sure the target table already exists
    JdbcUtils.saveTable(df2Oracle, oracleDriverUrl, "ods_incom_biaoma", connectProperties)

    sc.stop
  }

  val oracleDriverUrl = "oracleUrl"

  val tableName = "aTable"

  val jdbcMap = Map("url" -> oracleDriverUrl,
                    "user" -> "user",
                    "password" -> "password",
                    "dbtable" -> tableName,
                    "driver" -> "oracle.jdbc.driver.OracleDriver")

  def createSparkContext: SparkContext = {
    val conf = new SparkConf().setAppName("Data Integration checking and Computing").setMaster("spark://Master:7077")

    // SparkConf parameter settings
    // conf.set("spark.sql.autoBroadcastJoinThreshold", "50M")
    /* spark.sql.codegen: whether to pre-compile SQL into Java bytecode; helps long-running or frequent SQL */
    // conf.set("spark.sql.codegen", "true")
    /* spark.sql.inMemoryColumnarStorage.batchSize: number of rows processed per batch; watch out for OOM */
    // conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
    /* spark.sql.inMemoryColumnarStorage.compressed: whether the in-memory columnar storage is compressed */
    // conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")

    val sc = new SparkContext(conf)
    sc
  }

  // Override JdbcDialect to fit Oracle
  val OracleDialect = new JdbcDialect {

    override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")

    // getJDBCType is used when writing to a JDBC table
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
      case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
      case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
      case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
      case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
      case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
      case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
      case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
      case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
      case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
      case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
      case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
      // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
      case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
      case _ => None
    }
  }
}