spark scala mysql 语法
2016-01-28 17:05
561 查看
https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html
need to register a temporary table:
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Reads the AccountStats and Account tables from MySQL over JDBC, computes
 * the average StatsVal per (UtilityAccountID, Date, Period, StatsType) for
 * the heating-hours and hot-water-hours stats types, and writes the union
 * of both results back to MySQL (table AccountStatsAggregate, overwritten).
 */
object hoursAvg {

  /**
   * Average StatsVal per (UtilityAccountID, Date, Period, StatsType) for
   * the rows of `df` whose StatsType equals `statsType`.
   *
   * BUG FIX: the original filtered with a column taken from a *different*
   * DataFrame (`selData("StatsType")`) while filtering `selData_date_time`;
   * the filter column must come from the DataFrame being filtered.
   */
  private def avgByStatsType(df: DataFrame, statsType: String): DataFrame = {
    val grouped = df.filter(df("StatsType") === statsType)
      .groupBy("UtilityAccountID", "Date", "Period", "StatsType")
      .avg("StatsVal")
    // NOTE(review): "AVG(StatsVal)" is the aggregate column name the original
    // code relied on; this name is Spark-version-dependent — confirm against
    // the deployed Spark release.
    grouped.select(
      grouped("UtilityAccountID"),
      grouped("Period"),
      grouped("Date"),
      grouped("StatsType"),
      grouped("AVG(StatsVal)").as("StatsVal"))
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("RDDRelation")
      .setMaster("spark://Master.Hadoop:7077")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    try {
      // Ensure the MySQL JDBC driver is registered on the driver JVM.
      Class.forName("com.mysql.jdbc.Driver")
      val url = "jdbc:mysql://IP:port/db"
      val prop = new Properties()
      // SECURITY(review): credentials are hard-coded in source; move them to
      // configuration (spark-submit --conf or an external properties file).
      prop.setProperty("user", "greenwave")
      prop.setProperty("password", "green4irvine")

      val accountStatsDF = sqlContext.read.jdbc(url, "AccountStats", prop)
      val accountDF = sqlContext.read.jdbc(url, "Account", prop)
      accountStatsDF.registerTempTable("accountstatsDF")
      accountDF.registerTempTable("accountDF")

      // BUG FIX: the original SQL string had all spaces stripped
      // ("selects.*,a.UtilityAccountIDfrom...") and could not parse.
      // Join AccountStats rows with the owning account's UtilityAccountID.
      val selData = sqlContext.sql(
        "select s.*, a.UtilityAccountID " +
          "from accountstatsDF s, accountDF a " +
          "where a.AccountID = s.AccountID")
      selData.registerTempTable("selData")

      // Split UpdateDate into a Date part (chars 1-10) and a Time part.
      val selDataDateTime = sqlContext.sql(
        "select AccountID, SubString(UpdateDate, 1, 10) as Date, " +
          "SubString(UpdateDate, 12, 19) as Time, Period, StatsType, " +
          "StatsVal, UtilityAccountID from selData")

      // The original registered several intermediate temp tables that no
      // later statement referenced; those dead registrations are dropped.
      val resultHeating = avgByStatsType(selDataDateTime, "EON_SH.heatinghours")
      val resultHotWater = avgByStatsType(selDataDateTime, "EON_SH.hotwaterhours")

      // Union both stats types and overwrite the aggregate table.
      val result = resultHeating.unionAll(resultHotWater)
      result.write.mode("overwrite").jdbc(url, "AccountStatsAggregate", prop)
    } finally {
      sc.stop() // release cluster resources even if the job fails
    }
  }
}
sbt:
name := "hoursAvg"

version := "1.0"

// BUG FIX: the original mixed spark-core 1.2.0 with spark-sql 1.4.0;
// Spark modules must share one release or they are binary-incompatible.
val sparkVersion = "1.4.0"
val apacheSpark = "org.apache.spark" %% "spark-core" % sparkVersion
val apacheSQL = "mysql" % "mysql-connector-java" % "5.1.37"
// spark-sql is pinned to the Scala 2.10 artifact, matching scalaVersion below.
val apacheSSQL = "org.apache.spark" % "spark-sql_2.10" % sparkVersion

lazy val commonSettings = Seq(
  organization := "com.gws",
  version := "0.1.0",
  scalaVersion := "2.10.4"
)

lazy val root = (project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "hoursAvg",
    libraryDependencies ++= Seq(
      apacheSQL,
      apacheSSQL,
      // Exclude transitive artifacts that clash on the assembly classpath.
      apacheSpark.
        exclude("com.esotericsoftware.kryo", "kryo").
        exclude("javax.activation", "activation").
        exclude("commons-logging", "commons-logging").
        exclude("commons-collections", "commons-collections").
        exclude("org.eclipse.jetty.orbit", "javax.transaction").
        exclude("org.eclipse.jetty.orbit", "javax.servlet").
        exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
        exclude("org.eclipse.jetty.orbit", "javax.activation")
    )
  )
https://github.com/yu-iskw/spark-dataframe-introduction/blob/master/doc/dataframe-introduction.md
need to register a temporary table:
// Register a temporary table for the schema: event.registerTempTable("event") // Execute a Spark SQL query: context.sql("SELECT created_at, repo.name AS `repo.name`, actor.id, type FROM event WHERE type = 'PullRequestEvent'").limit(5).show() scala:
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Reads the AccountStats and Account tables from MySQL over JDBC, computes
 * the average StatsVal per (UtilityAccountID, Date, Period, StatsType) for
 * the heating-hours and hot-water-hours stats types, and writes the union
 * of both results back to MySQL (table AccountStatsAggregate, overwritten).
 */
object hoursAvg {

  /**
   * Average StatsVal per (UtilityAccountID, Date, Period, StatsType) for
   * the rows of `df` whose StatsType equals `statsType`.
   *
   * BUG FIX: the original filtered with a column taken from a *different*
   * DataFrame (`selData("StatsType")`) while filtering `selData_date_time`;
   * the filter column must come from the DataFrame being filtered.
   */
  private def avgByStatsType(df: DataFrame, statsType: String): DataFrame = {
    val grouped = df.filter(df("StatsType") === statsType)
      .groupBy("UtilityAccountID", "Date", "Period", "StatsType")
      .avg("StatsVal")
    // NOTE(review): "AVG(StatsVal)" is the aggregate column name the original
    // code relied on; this name is Spark-version-dependent — confirm against
    // the deployed Spark release.
    grouped.select(
      grouped("UtilityAccountID"),
      grouped("Period"),
      grouped("Date"),
      grouped("StatsType"),
      grouped("AVG(StatsVal)").as("StatsVal"))
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("RDDRelation")
      .setMaster("spark://Master.Hadoop:7077")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    try {
      // Ensure the MySQL JDBC driver is registered on the driver JVM.
      Class.forName("com.mysql.jdbc.Driver")
      val url = "jdbc:mysql://IP:port/db"
      val prop = new Properties()
      // SECURITY(review): credentials are hard-coded in source; move them to
      // configuration (spark-submit --conf or an external properties file).
      prop.setProperty("user", "greenwave")
      prop.setProperty("password", "green4irvine")

      val accountStatsDF = sqlContext.read.jdbc(url, "AccountStats", prop)
      val accountDF = sqlContext.read.jdbc(url, "Account", prop)
      accountStatsDF.registerTempTable("accountstatsDF")
      accountDF.registerTempTable("accountDF")

      // BUG FIX: the original SQL string had all spaces stripped
      // ("selects.*,a.UtilityAccountIDfrom...") and could not parse.
      // Join AccountStats rows with the owning account's UtilityAccountID.
      val selData = sqlContext.sql(
        "select s.*, a.UtilityAccountID " +
          "from accountstatsDF s, accountDF a " +
          "where a.AccountID = s.AccountID")
      selData.registerTempTable("selData")

      // Split UpdateDate into a Date part (chars 1-10) and a Time part.
      val selDataDateTime = sqlContext.sql(
        "select AccountID, SubString(UpdateDate, 1, 10) as Date, " +
          "SubString(UpdateDate, 12, 19) as Time, Period, StatsType, " +
          "StatsVal, UtilityAccountID from selData")

      // The original registered several intermediate temp tables that no
      // later statement referenced; those dead registrations are dropped.
      val resultHeating = avgByStatsType(selDataDateTime, "EON_SH.heatinghours")
      val resultHotWater = avgByStatsType(selDataDateTime, "EON_SH.hotwaterhours")

      // Union both stats types and overwrite the aggregate table.
      val result = resultHeating.unionAll(resultHotWater)
      result.write.mode("overwrite").jdbc(url, "AccountStatsAggregate", prop)
    } finally {
      sc.stop() // release cluster resources even if the job fails
    }
  }
}
sbt:
name := "hoursAvg"

version := "1.0"

// BUG FIX: the original mixed spark-core 1.2.0 with spark-sql 1.4.0;
// Spark modules must share one release or they are binary-incompatible.
val sparkVersion = "1.4.0"
val apacheSpark = "org.apache.spark" %% "spark-core" % sparkVersion
val apacheSQL = "mysql" % "mysql-connector-java" % "5.1.37"
// spark-sql is pinned to the Scala 2.10 artifact, matching scalaVersion below.
val apacheSSQL = "org.apache.spark" % "spark-sql_2.10" % sparkVersion

lazy val commonSettings = Seq(
  organization := "com.gws",
  version := "0.1.0",
  scalaVersion := "2.10.4"
)

lazy val root = (project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "hoursAvg",
    libraryDependencies ++= Seq(
      apacheSQL,
      apacheSSQL,
      // Exclude transitive artifacts that clash on the assembly classpath.
      apacheSpark.
        exclude("com.esotericsoftware.kryo", "kryo").
        exclude("javax.activation", "activation").
        exclude("commons-logging", "commons-logging").
        exclude("commons-collections", "commons-collections").
        exclude("org.eclipse.jetty.orbit", "javax.transaction").
        exclude("org.eclipse.jetty.orbit", "javax.servlet").
        exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
        exclude("org.eclipse.jetty.orbit", "javax.activation")
    )
  )
相关文章推荐
- mysql使用索引扫描来做排序
- MySQL冒泡排名
- mysql用户授权及撤销
- MYSQL存储过程怎么写
- MySQL函数
- mysql随机获取十条数据
- MYSQL使用问题汇集
- 安装Mysql遇到的问题: libaio.so.1()(64bit) is needed by MySQL ***
- MySQL数据库事务隔离级别(Transaction Isolation Level)
- MSSQL和MYSQL数据交互
- mysql特殊操作
- mysql select 时间戳转标准时间写法
- MySql5.6 Window超详细安装教程
- spark 连接 mysql 数据库
- 21-修改表给mysql添加外键
- mysql json字段的使用与意义
- MySQL两个表联合查询并按时间排序
- Mysql 数据库缓存cache功能
- MySQL数据库性能优化之缓存参数优化
- mysql 区分大小写查询