Spark-SQL连接MySql关系型数据库
2015-09-23 11:58
706 查看
注意:程序中需要用到jdbc的jar包
本程序中使用的是mysql-connector-java-5.1.29.jar
1,从mysql数据库读取数据
完整程序如下:
其中properties为java.util.Properties类型,且最少需要设定一个user和一个password属性
程序运行结果:
2,DataFrame写入MySql
(1)创建新表,SaveMode.ErrorIfExists(default)
在程序中调用DataFrame对象的write方法会返回一个DataFrameWriter对象,调用该对象的jdbc方法,写入url地址,table表名和property参数,
示例2:查询number > 1000 且 content不等于x11的所有记录
如果是
相当于同时两个条件,查询到的是两个条件分别查询的结果
示例3:错误排序
可以看出,在predicates中的字符串,都会被程序拼接于select * from test where XXXXX后面,例如"number > 1000"
则完整的sql语句为select * from test where number > 1000.程序正常运行,
而"order by content desc"
拼接后的sql为select * from test where order by content decs.报错是理所当然的
4,根据某字段的上下限将查询结果分块
另一个重载的方法def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties)
本程序中使用的是mysql-connector-java-5.1.29.jar
1,从mysql数据库读取数据
完整程序如下:
import java.util.Properties import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkContext, SparkConf} /** * Created by root on 15-9-6. */ object SparkSqlJDBC { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("RDDRelation") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val property = new Properties() val url = "jdbc:mysql://localhost:3306/my_test_database" property.put("user","root") property.put("password", "root") val jdbcDF = sqlContext.read.jdbc(url,"spark_sql", property) jdbcDF.registerTempTable("records") sqlContext.sql("SELECT * FROM records").collect().foreach(println) } } 在Spark-SQL官方文档中,使用的是
val jdbcDF = sqlContext.load("jdbc", Map( "url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")) 在实际使用时发现,该方法已经Deprecated了.从api中看出,sqlContext.read方法得到一个DataFrame类型的返回值,DataFrame类型中有重载的jdbc方法,
程序中使用的是:defjdbc(url: String, table: String, properties: Properties):
其中properties为java.util.Properties类型,且最少需要设定一个user和一个password属性程序运行结果:
2,DataFrame写入MySql
(1)创建新表,SaveMode.ErrorIfExists(default)
在程序中调用DataFrame对象的write方法会返回一个DataFrameWriter对象,调用该对象的jdbc方法,写入url地址,table表名和property参数,
在上面程序最后写入下一行: jdbcDF.write.jdbc(url, "test", property)//将spark_sql表中的数据存入到一个新表test中,需要保证test表事先不存在 因为SaveMode默认为SaveMode.ErrorIfExists,表存在则报already exists的错误. 查看结果,可在数据库中看到新增一个test表,结构和数据与spark_sql完全一致 (2)插入数据到spark_sql表,SaveMode.Append jdbcDF.write.mode(SaveMode.Append).jdbc(url, "test", property) 使用SaveMode.Ignore,在表存在的情况下,不会执行插入操作.SaveMode.Overwrite会首先将表清空,然后重新插入数据 (3)update数据 暂无 (4)delete数据
暂无 3,有条件读取数据 spark_sql表中数据为:
有一个重载的 def jdbc( url: String, table: String, predicates: Array[String], coproperties: Properties) 按照源码中的说明:predicates中是每个分区中sql语句where后面的条件, 示例1:查询number > 1000的所有记录
val jdbcDF = sqlContext.read.jdbc(url,table, Array("number > 1000"), property) 结果如下:
示例2:查询number > 1000 且 content不等于x11的所有记录
val jdbcDF = sqlContext.read.jdbc(url,table, Array("number > 1000 and content <> 'x11'"), property) 结果如下:
如果是
val jdbcDF = sqlContext.read.jdbc(url,table, Array("number > 1000 and content <> 'x11'"), property) 结果如下:
相当于同时两个条件,查询到的是两个条件分别查询的结果
示例3:错误排序
val jdbcDF = sqlContext.read.jdbc(url,table, Array("order by content desc"), property) 程序报错: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'order by content desc' at line 1 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
示例4:正确排序
val jdbcDF = sqlContext.read.jdbc(url,table, Array("1=1 order by content desc"), property)
结果如下:
可以看出,在predicates中的字符串,都会被程序拼接于select * from test where XXXXX后面,例如"number > 1000"
则完整的sql语句为select * from test where number > 1000.程序正常运行,
而"order by content desc"
拼接后的sql为select * from test where order by content decs.报错是理所当然的
4,根据某字段的上下限将查询结果分块
另一个重载的方法def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties)
相关文章推荐
- MYSQL基础_聚合函数_分组和多表连接查询
- mysql并行复制功能
- MYSQL 数据类型
- mysql获取某个表的所有字段名
- mysql中enum的用法
- mysql 学习碎片
- sql执行效率检测 mysql explain
- mysql 存在update不存在insert
- spark安装mysql与hive
- MySQL多实例安装初探
- Mysql grant权限详解
- mysql集群一:主从复制,通过mysql-proxy做负载均衡
- Mac 10.10 以上启动MySQL
- mysql的1067错误 Unknown/unsupported storage engine: InnoDB
- mysql 查看配置文件
- mysql中主外键关系
- mysql 交互式连接和非交互式连接
- mysql wait_timeout和interactive_timeout总结
- mysql 超时设置
- mysql:Lock wait timeout exceeded,try restarting transaction