利用Spark将DataFrame、Hive数据写入Oracle
2016-08-20 17:59
441 查看
本篇博文的主要内容:
1、分析Spark读写Oracle方法
2、DataFrame数据写入Oracle
3、Hive数据写入Oracle
DataFrame是在Spark1.3.0中推出的新的api,它借鉴了R语言DataFrame的一些优点,同时,扩充了Spark处理大规模结构化数据的能力。作为Spark初学者,在通过学习使用Spark读写RDBMS数据的过程中,遇到了一系统的问题,查阅了大量文章,得到了一一解决,现将Spark读写oracle数据库的总结分享给大家,与大家共勉。如果文章中有错误的地方,希望大家及时指正,谢谢合作!
1、Spark读写Oracle方法分析
目前,Spark在读写关系型数据表时提供了DataFrameReader和DataFrameWriter等类。在读取Oracle数据库时,通过测试(具体实现将在下面描述),可以根据官网提供的读取数据库方式获取数据。但是在写Oracle数据库时,会报“no
suitable driver found”异常。这是因为DataFrameWriter的jdbc方法目前还不能支撑写oracle数据库。(个人理解,目前作者尚未解决该问题)
考虑使用JdbcUtils.saveTable的方式写Oracle数据库,但是报“java.sql.SQLException:
ORA-00902: invalid datatype”异常。
查阅OracleDialect.scala源码
private case object OracleDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
// Handle NUMBER fields that have no precision/scale in special way
// because JDBC ResultSetMetaData converts this to 0 procision and -127 scale
// For more details, please see
// https://github.com/apache/spark/pull/8780#issuecomment-145598968
// and
// https://github.com/apache/spark/pull/8780#issuecomment-144541760
if (sqlType == Types.NUMERIC && size == 0) {
// This is sub-optimal as we have to pick a precision/scale in advance whereas the data
// in Oracle is allowed to have different precision/scale for each value.
Option(DecimalType(DecimalType.MAX_PRECISION, 10))
} else {
None
}
}
}
可以发现,OracleDialect 覆写了getCatalystType(该方法用于读取Oracle数据库时数据类型的转换),但是没有覆写getJDBCType(该方法用于写Oracle数据库时数据类型的转换),因此,在写Oracle数据库时,由于找不到可用的数据类型方言的转换,导致Oracle数据库无法识别某些数据类型。需要覆写getJDBCType方法,将数据类型转换为Oracle识别的类型即可。
1.1读取Oracle数据
利用sqlContext.read得到,通过load方法加载读取的表数据,load为lazy级别,当触发Spark Action时才会去Oracle中读取表数据。下面是读取Oracle数据的代码
val sqlContext = new HiveContext(sc)//加载oracle表数据val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
2、写入Oracle数据
采用JdbcUtils.saveTable方式将Spark数据写入Oracle,但是必须保证写入的表存在。
首先实例化一个JdbcDialect,并覆写getJDBCType方法,定义方言
注册方言,写入指定的oracle数据库表
其中,aDataFrame是通过查询关系数据库或Hive查询得到的DataFrame。注意,自定义DataFrame,需要定义好RDD数据Schema,并RDD转换成DataFrame,网上有许多资料,在此不作赘述。
下面附上个人的一些代码,主要是实现了读取Oracle数据,在Spark上计算,将计算结果写回Oracle数据库。
import java.util.Properties
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by Alex on 2016/3/22.
* This main Object is just for testing reading data from oracle database and computing, then writing the results to Hive
* and putting the results back oracle
*/
object TestOracle{
def main(args: Array[String]) {
//创建SparkContext
val sc = createSparkContext
//创建sqlContext用来连接oracle、Hive等
val sqlContext = new HiveContext(sc)
//加载oracle表数据,为lazy方式
val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
//hive切换test库
sqlContext.sql("use test")
//向spark注册一个临时表,在临时表上进行数据操作,提高效率,但需要考虑表的规模,以防出现OOM
//测试问题,直接从oracle上获取数据,当数据量较大时,集群计算时间就会显著增长。
jdbcDF.registerTempTable("temp_table1")
val noTotalId = sqlContext.sql("some sql")
// println("NO_TOTAL_ID_TMP Total count" + noTotalId.count())
// //注册NO_TOTAL_ID,以备下面计算使用
noTotalId.registerTempTable("temp_table2")
//未经过优化,后续会考虑加入分区、buket等优化策略,提高效率
val results = sqlContext.sql(" some sql")
// Drop temp tables registered to Spark
sqlContext.dropTempTable("temp_table1")
sqlContext.dropTempTable("temp_table2")
results.registerTempTable("resultDF")
//write to Hive
sqlContext.sql("insert into ods_incom_biaoma select * from resultDF")
//Read results from Hive can make the task efficient
val df2Oracle = sqlContext.sql("select * from ods_incom_biaoma")
//Registering the OracleDialect
JdbcDialects.registerDialect(OracleDialect)
val connectProperties = new Properties()
connectProperties.put("user", "user")
connectProperties.put("password", "password")
Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
//write back Oracle
//Note: When writing the results back orale, be sure that the target table existing
JdbcUtils.saveTable(df2Oracle, oracleDriverUrl, "ods_incom_biaoma", connectProperties)
sc.stop
}
val oracleDriverUrl = "oracleUrl"
val tableName = "aTable"
val jdbcMap = Map("url" -> oracleDriverUrl,
"user" -> "user",
"password" -> "password",
"dbtable" -> tableName,
"driver" -> "oracle.jdbc.driver.OracleDriver")
def createSparkContext: SparkContext = {
val conf = new SparkConf().setAppName("Data Integration checking and Computing").setMaster("spark://Master:7077")
//SparkConf parameters setting
//conf.set("spark.sql.autoBroadcastJoinThreshold", "50M")
/*spark.sql.codegen 是否预编译sql成java字节码,长时间或频繁的sql有优化效果*/
//conf.set("spark.sql.codegen", "true")
/*spark.sql.inMemoryColumnarStorage.batchSize 一次处理的row数量,小心oom*/
//conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
/*spark.sql.inMemoryColumnarStorage.compressed 设置内存中的列存储是否需要压缩*/
//conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
val sc = new SparkContext(conf)
sc
}
//overwrite JdbcDialect fitting for Oracle
val OracleDialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")
//getJDBCType is used when writing to a JDBC table
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
// case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
case _ => None
}
}
}
1、分析Spark读写Oracle方法
2、DataFrame数据写入Oracle
3、Hive数据写入Oracle
DataFrame是在Spark1.3.0中推出的新的api,它借鉴了R语言DataFrame的一些优点,同时,扩充了Spark处理大规模结构化数据的能力。作为Spark初学者,在通过学习使用Spark读写RDBMS数据的过程中,遇到了一系统的问题,查阅了大量文章,得到了一一解决,现将Spark读写oracle数据库的总结分享给大家,与大家共勉。如果文章中有错误的地方,希望大家及时指正,谢谢合作!
1、Spark读写Oracle方法分析
目前,Spark在读写关系型数据表时提供了DataFrameReader和DataFrameWriter等类。在读取Oracle数据库时,通过测试(具体实现将在下面描述),可以根据官网提供的读取数据库方式获取数据。但是在写Oracle数据库时,会报“no
suitable driver found”异常。这是因为DataFrameWriter的jdbc方法目前还不能支撑写oracle数据库。(个人理解,目前作者尚未解决该问题)
考虑使用JdbcUtils.saveTable的方式写Oracle数据库,但是报“java.sql.SQLException:
ORA-00902: invalid datatype”异常。
查阅OracleDialect.scala源码
private case object OracleDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
// Handle NUMBER fields that have no precision/scale in special way
// because JDBC ResultSetMetaData converts this to 0 procision and -127 scale
// For more details, please see
// https://github.com/apache/spark/pull/8780#issuecomment-145598968
// and
// https://github.com/apache/spark/pull/8780#issuecomment-144541760
if (sqlType == Types.NUMERIC && size == 0) {
// This is sub-optimal as we have to pick a precision/scale in advance whereas the data
// in Oracle is allowed to have different precision/scale for each value.
Option(DecimalType(DecimalType.MAX_PRECISION, 10))
} else {
None
}
}
}
可以发现,OracleDialect 覆写了getCatalystType(该方法用于读取Oracle数据库时数据类型的转换),但是没有覆写getJDBCType(该方法用于写Oracle数据库时数据类型的转换),因此,在写Oracle数据库时,由于找不到可用的数据类型方言的转换,导致Oracle数据库无法识别某些数据类型。需要覆写getJDBCType方法,将数据类型转换为Oracle识别的类型即可。
1.1读取Oracle数据
利用sqlContext.read得到,通过load方法加载读取的表数据,load为lazy级别,当触发Spark Action时才会去Oracle中读取表数据。下面是读取Oracle数据的代码
//创建SparkContext
val conf = new SparkConf().setAppName("Reading from Oracle").setMaster("spark://Master:7077")
val sc = new SparkContext(conf)
val oracleDriverUrl = "jdbc:oracle:thin:@//ip:1521/dbinstance" val jdbcMap = Map("url" -> oracleDriverUrl, "user" -> "username", "password" -> "userpassword", "dbtable" -> ""table_name, "driver" -> "oracle.jdbc.driver.OracleDriver")//创建sqlContext用来连接oracle、Hive等,由于HiveContext继承自SQLContext,因此,实例化HiveContext既可以操作Oracle,也可操作Hive
val sqlContext = new HiveContext(sc)//加载oracle表数据val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
2、写入Oracle数据
采用JdbcUtils.saveTable方式将Spark数据写入Oracle,但是必须保证写入的表存在。
首先实例化一个JdbcDialect,并覆写getJDBCType方法,定义方言
//overwrite JdbcDialect fitting for Oracle val OracleDialect = new JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle") //getJDBCType is used when writing to a JDBC table override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR)) case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC)) case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC)) case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC)) case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC)) case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC)) case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC)) case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC)) case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB)) case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC)) case _ => None } }
注册方言,写入指定的oracle数据库表
//Registering the OracleDialect JdbcDialects.registerDialect(OracleDialect) val connectProperties = new Properties() connectProperties.put("user", "GSPWJC") connectProperties.put("password", "GSPWJC") Class.forName("oracle.jdbc.driver.OracleDriver").newInstance() //write back Oracle //Note: When writing the results back orale, be sure that the target table existing JdbcUtils.saveTable(aDataFrame, oracleDriverUrl, "tableName", connectProperties)
其中,aDataFrame是通过查询关系数据库或Hive查询得到的DataFrame。注意,自定义DataFrame,需要定义好RDD数据Schema,并RDD转换成DataFrame,网上有许多资料,在此不作赘述。
下面附上个人的一些代码,主要是实现了读取Oracle数据,在Spark上计算,将计算结果写回Oracle数据库。
import java.util.Properties
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by Alex on 2016/3/22.
* This main Object is just for testing reading data from oracle database and computing, then writing the results to Hive
* and putting the results back oracle
*/
object TestOracle{
def main(args: Array[String]) {
//创建SparkContext
val sc = createSparkContext
//创建sqlContext用来连接oracle、Hive等
val sqlContext = new HiveContext(sc)
//加载oracle表数据,为lazy方式
val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
//hive切换test库
sqlContext.sql("use test")
//向spark注册一个临时表,在临时表上进行数据操作,提高效率,但需要考虑表的规模,以防出现OOM
//测试问题,直接从oracle上获取数据,当数据量较大时,集群计算时间就会显著增长。
jdbcDF.registerTempTable("temp_table1")
val noTotalId = sqlContext.sql("some sql")
// println("NO_TOTAL_ID_TMP Total count" + noTotalId.count())
// //注册NO_TOTAL_ID,以备下面计算使用
noTotalId.registerTempTable("temp_table2")
//未经过优化,后续会考虑加入分区、buket等优化策略,提高效率
val results = sqlContext.sql(" some sql")
// Drop temp tables registered to Spark
sqlContext.dropTempTable("temp_table1")
sqlContext.dropTempTable("temp_table2")
results.registerTempTable("resultDF")
//write to Hive
sqlContext.sql("insert into ods_incom_biaoma select * from resultDF")
//Read results from Hive can make the task efficient
val df2Oracle = sqlContext.sql("select * from ods_incom_biaoma")
//Registering the OracleDialect
JdbcDialects.registerDialect(OracleDialect)
val connectProperties = new Properties()
connectProperties.put("user", "user")
connectProperties.put("password", "password")
Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
//write back Oracle
//Note: When writing the results back orale, be sure that the target table existing
JdbcUtils.saveTable(df2Oracle, oracleDriverUrl, "ods_incom_biaoma", connectProperties)
sc.stop
}
val oracleDriverUrl = "oracleUrl"
val tableName = "aTable"
val jdbcMap = Map("url" -> oracleDriverUrl,
"user" -> "user",
"password" -> "password",
"dbtable" -> tableName,
"driver" -> "oracle.jdbc.driver.OracleDriver")
def createSparkContext: SparkContext = {
val conf = new SparkConf().setAppName("Data Integration checking and Computing").setMaster("spark://Master:7077")
//SparkConf parameters setting
//conf.set("spark.sql.autoBroadcastJoinThreshold", "50M")
/*spark.sql.codegen 是否预编译sql成java字节码,长时间或频繁的sql有优化效果*/
//conf.set("spark.sql.codegen", "true")
/*spark.sql.inMemoryColumnarStorage.batchSize 一次处理的row数量,小心oom*/
//conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
/*spark.sql.inMemoryColumnarStorage.compressed 设置内存中的列存储是否需要压缩*/
//conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
val sc = new SparkContext(conf)
sc
}
//overwrite JdbcDialect fitting for Oracle
val OracleDialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")
//getJDBCType is used when writing to a JDBC table
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
// case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
case _ => None
}
}
}
相关文章推荐
- 利用Spark将DataFrame、Hive数据写入Oracle
- spark 将dataframe数据写入Hive分区表
- spark 1.3.0 将dataframe数据写入Hive分区表
- spark 将dataframe数据写入Hive分区表
- 将DataFrame数据如何写入到Hive表中
- Spark RDD(DataFrame) 写入到HIVE的代码实现
- scala实战之spark源码修改(能够将DataFrame按字段增量写入mysql数据表)
- Spark DataFrame----一个用于大规模数据科学的API
- Hive数据分析——Spark是一种基于rdd(弹性数据集)的内存分布式并行处理框架,比于Hadoop将大量的中间结果写入HDFS,Spark避免了中间结果的持久化
- 利用sqoop导出hive数据到 oracle
- 基于Spark DataFrame的数据仓库框架
- Spark RDD/DataFrame map保存数据的两种方式
- spark dataframe和dataSet用电影点评数据实战
- Spark新年福音:一个用于大规模数据科学的API——DataFrame
- spark之DataFrame的json数据实战
- spark 读取hbase数据并转化为dataFrame
- Spark商业案例与性能调优实战100课》第11课:商业案例之通过纯粹通过DataFrame分析大数据电影点评系仿QQ和微信、淘宝等用户群分析与实战
- 利用sqoop 将 hive/hdfs数据 导入 Oracle中
- .net中利用oracle产品自带的数据访问组件(Oracle.DataAccess.dll)提升批量更新操作的执行效率
- 利用随机函数产生100个三位整数,将这些整数写入到数据文件data1.dat中