您的位置:首页 > 数据库 > MySQL

3、SqlContext读取mysql数据,产生一个Dataframe

2016-11-29 00:00 127 查看
#Spark 不仅可以从HDFS中读取数据,也可以从mysql等关系数据库中读取数据。
//通过sqlContext读取mysql中指定表的数据。
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "emp", "user" -> "root", "password" -> "815325")
).load()

#完整代码:
package com.liufu.org.sql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by liufu on 2016/11/18.
*/
object SparkSqlContext {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("dataframTest").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//通过sqlContext读取mysql中指定表的数据。
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "emp", "user" -> "root", "password" -> "815325")
).load()

//将JdbcDF注册成内存中的表,然后通过sqlContext运行SQL语句来进行操作。
//这样数据就像是从HDFS读取出来之后,结合case class样例类形成的内存表一样了。
jdbcDF.registerTempTable("emp")
val resultDF: DataFrame = sqlContext.sql("select eno,ename from emp where eno >= 7900")
resultDF.show()

/**
* 这里可以对DF进行各种操作之后,最后写入到HDFS或者写回Mysql
*/
//创建Properties存储数据库相关属性
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "815325")

//如果数据库中没有这个表,那么他也会创建一张表(很强大)
resultDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata","result",prop)

sc.stop()
}
}

#总结:把数据加载到DataFrame之后,可以将它注册成一张表,这样就可以利用sqlContext运行SQL语句,对表信息进行各种操作,操作完成后还可以写回到Mysql中
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐