您的位置:首页 > 数据库 > Oracle

spark读取oracle的

2016-05-05 15:55 537 查看
刚开始使用spark-sql,首先看了一部分的源码。然后开始着手程序的编写。

在spark中使用jdbc:

在 Spark-env.sh 文件中加入:

export SPARK_CLASSPATH=

任务提交时加入:

spark-submit –master spark://master:7077 –jars ojdbc16.jar

一、

val rdd = sqlContext.read.format(“jdbc”).options(

Map(“url” -> “jdbc:oracle:thin:@192.168.1.116:1521:orcl”,”user”->sourceDBUser,”password”->sourceDBPwd,

“dbtable” -> s”( select fielsfromfiels from sourceDBName ) a”, “driver” -> driver,

“numPartitions” ->”14”)).load().cache()

jdbcDF.rdd.partitions.size = 1

此时无论你的numPartitions的值为多大,实际的partitions只有一个分区,

该操作的并发度为1,你所有的数据都会在一个partition中进行操作,意味着无论你给的资源有多少,只有一个task会执行任务,执行效率可想而之,并且在稍微大点的表中进行操作分分钟就会OOM。

当我用该方法时,并发度为1,记得当时读取的数据是8000W条数据,一直卡着,一会儿出现OOM。经过查看官网的sparksql部分,发现了该方法也可以指定字段分区。

val jdbcDF = sqlContext.read.format(“jdbc”).options(

Map(“url” -> “jdbc:oracle:thin:root/123456@//192.168.0.33:1521/orcl”,

“dbtable” -> “( select * from T_LG_GNLK ) a”, “driver” -> “oracle.jdbc.driver.OracleDriver”,

“numPartitions” ->”5”,”partitionColumn”->”OBJECTID”,”lowerBound”->”0”,”upperBound”->”80000000”)).load()

val size= jdbcDF.rdd.partitions.size

此时size = 5;虽然现在的分区为5,但是前几个task根本就没有读取数据,知道最后一个分区卡主,此时数据读出来全部集中在一个台机器,当时加班已经到凌晨2点了,当时意识也模糊了,看了会儿源码,碎叫去了。到第二天,才突然发现自己犯了一个比较严重的错误,握草,然后开始修改lowerBound和upperBound的值,这是发现前几个task开始读取数据了。握草,我犯了一个太低级的错误,在此偷偷的抽了自己一嘴巴。

在此过程中发现了spark1.4的一个小bug,就是下面的描述

jdbcDF: org.apache.spark.sql.DataFrame = [ID:decimal(0,-127), XX: string, XX: string, XX: string, XX: decimal(0,-127)]

注意:在1.6以前的版本中,当Oracle表字段为Number时,对应DataType为decimal,此时会由于scala的精度断言抛出异常——可以在stackoverflow网站查找该异常——应该是个bug,在1.6中应该已经解决。有兴趣可以试下下。——如果继续show数据的话,会抛出该异常。

二、

def getConnection() = {

Class.forName(“oracle.jdbc.driver.OracleDriver”).newInstance()

DriverManager.getConnection(“jdbc:oracle:thin:@192.168.0.33:1521:orcl”, “root”, “12

3456”)

}

val rdd = new JdbcRDD(sc,getConnection,

“SELECT * FROM LDZHS WHERE ? <= ID AND ID <= ?”,

Long.MinValue, Long.MaxValue, 2 ,(r: ResultSet) => { r.getInt(“id”)+”\t”+r.getString(“name”)}

)

该方法是很久之前的API了,现在的新版本中有了更好的方法。即上面提到的方法1。

以上的方法都是依据分区的列必须是number类型的,否则java.lang.NumberFormatException: For input string: “A”

根据Long类型字段分区,此时一定要选择好上下界,要不然很可能很多空的partition的。

三、根据任意类型字段分区

jdbc(

url: String,

table: String,

predicates: Array[String],

connectionProperties: Properties): DataFrame

val part =

Array(

“2016-01-01” -> “2016-02-01”,

“2016-02-01” -> “2016-03-01”,

“2016-03-01” -> “2016-04-01”,

“2016-04-01” -> “2016-05-01”,

“2016-05-01” -> “2016-06-01”,

“2016-06-01” -> “2016-12-30”

).map {

case (start, end) =>

s”cast(RZSJ as date) >= date ‘start′"+s"ANDcast(RZSJasdate)<=date′start' " + s"AND cast(RZSJ as date) <= date 'end’”

}

val driver1: String = “oracle.jdbc.driver.OracleDriver”

val props1 = new Properties()

props1.setProperty(“user”, “root”)

props1.setProperty(“password”, “123456”)

props1.setProperty(“driver”, driver)

val dataf = sqlContext.read.jdbc(“jdbc:oracle:thin:root/123456@192.168.0.33:1521/orcl”,”T_LG_GNLK”,part,props)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: