
Spark Scala MySQL syntax

2016-01-28 17:05
https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html

https://github.com/yu-iskw/spark-dataframe-introduction/blob/master/doc/dataframe-introduction.md


You need to register a temporary table first:

// Register a temporary table for the schema
event.registerTempTable("event")

// Execute a Spark SQL query
context.sql("SELECT created_at, repo.name AS `repo.name`, actor.id, type FROM event WHERE type = 'PullRequestEvent'").limit(5).show()

Scala:

import java.sql.{PreparedStatement, Connection, DriverManager}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import java.util.Properties

object hoursAvg {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("RDDRelation").setMaster("spark://Master.Hadoop:7077")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Load the MySQL JDBC driver
    Class.forName("com.mysql.jdbc.Driver")

    val url = "jdbc:mysql://IP:port/db"

    val prop = new Properties()
    prop.setProperty("user", "greenwave")
    prop.setProperty("password", "green4irvine")

    // Read both tables over JDBC into DataFrames
    val accountstatsDF = sqlContext.read.jdbc(url, "AccountStats", prop)
    val accountDF = sqlContext.read.jdbc(url, "Account", prop)

    accountstatsDF.registerTempTable("accountstatsDF")
    accountDF.registerTempTable("accountDF")

    // Join the stats rows with the account table to pick up UtilityAccountID
    val selData = sqlContext.sql("select s.*, a.UtilityAccountID from accountstatsDF s, accountDF a where a.AccountID = s.AccountID")

    selData.registerTempTable("selData")

    // Split the UpdateDate timestamp into separate Date and Time columns
    val selData_date_time = sqlContext.sql("select AccountID, SubString(UpdateDate, 1, 10) as Date, SubString(UpdateDate, 12, 19) as Time, Period, StatsType, StatsVal, UtilityAccountID from selData")
    selData_date_time.registerTempTable("selData_date_time")

    // Daily average of heating hours per account and period
    val filter_heatinghours = selData_date_time.filter(selData_date_time("StatsType") === "EON_SH.heatinghours").groupBy("UtilityAccountID", "Date", "Period", "StatsType").avg("StatsVal")
    filter_heatinghours.registerTempTable("filter_heatinghours")

    val result_heatinghours = filter_heatinghours.select(filter_heatinghours("UtilityAccountID"), filter_heatinghours("Period"), filter_heatinghours("Date"), filter_heatinghours("StatsType"), filter_heatinghours("AVG(StatsVal)").as("StatsVal"))
    result_heatinghours.registerTempTable("temp_heatinghours")

    // Same aggregation for hot water hours
    val filter_hotwaterhours = selData_date_time.filter(selData_date_time("StatsType") === "EON_SH.hotwaterhours").groupBy("UtilityAccountID", "Date", "Period", "StatsType").avg("StatsVal")
    filter_hotwaterhours.registerTempTable("filter_hotwaterhours")

    val result_hotwaterhours = filter_hotwaterhours.select(filter_hotwaterhours("UtilityAccountID"), filter_hotwaterhours("Period"), filter_hotwaterhours("Date"), filter_hotwaterhours("StatsType"), filter_hotwaterhours("AVG(StatsVal)").as("StatsVal"))
    result_hotwaterhours.registerTempTable("temp_hotwaterhours")

    // Union the two result sets and write them back to MySQL
    val result = result_heatinghours.unionAll(result_hotwaterhours)

    val dfWriter = result.write.mode("overwrite")
    dfWriter.jdbc(url, "AccountStatsAggregate", prop)

  }
}
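
The heating and hot water branches above are identical apart from the StatsType value. A possible consolidation, shown here only as a sketch against the same names (`selData_date_time`, `url`, `prop`) and the same aggregate column name as the program above, filters both types in one pass. Note that `Column.isin` arrived in Spark 1.5; earlier releases spelled it `in`:

// Sketch only: both StatsType values handled in a single aggregation
val bothTypes = selData_date_time
  .filter(selData_date_time("StatsType").isin("EON_SH.heatinghours", "EON_SH.hotwaterhours"))
  .groupBy("UtilityAccountID", "Date", "Period", "StatsType")
  .avg("StatsVal")
  .withColumnRenamed("AVG(StatsVal)", "StatsVal")  // column name produced by avg(), as used above

bothTypes.write.mode("overwrite").jdbc(url, "AccountStatsAggregate", prop)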

sbt:

name := "hoursAvg"

version := "1.0"

// Keep spark-core on the same Spark version as spark-sql to avoid binary incompatibilities
val apacheSpark = "org.apache.spark" %% "spark-core" % "1.4.0"

val apacheSQL = "mysql" % "mysql-connector-java" % "5.1.37"

val apacheSSQL = "org.apache.spark" % "spark-sql_2.10" % "1.4.0"

lazy val commonSettings = Seq(
  organization := "com.gws",
  version := "0.1.0",
  scalaVersion := "2.10.4"
)

lazy val root = (project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "hoursAvg",
    libraryDependencies ++= Seq(
      apacheSQL,
      apacheSSQL,
      apacheSpark.
        exclude("com.esotericsoftware.kryo", "kryo").
        exclude("javax.activation", "activation").
        exclude("commons-logging", "commons-logging").
        exclude("commons-collections", "commons-collections").
        exclude("org.eclipse.jetty.orbit", "javax.transaction").
        exclude("org.eclipse.jetty.orbit", "javax.servlet").
        exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
        exclude("org.eclipse.jetty.orbit", "javax.activation")
    )
  )
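
To build and run, a typical sequence might look like the following; the jar paths are illustrative (the artifact name depends on your sbt settings), and the MySQL connector jar must be visible to both the driver and the executors:

sbt package
spark-submit --class hoursAvg \
  --master spark://Master.Hadoop:7077 \
  --jars /path/to/mysql-connector-java-5.1.37.jar \
  target/scala-2.10/hoursavg_2.10-0.1.0.jar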


                                            