
SparkSQL Study Notes (Two Ways to Create a Schema in SparkSQL)

There are two ways to create a Schema:

1. Define a case class and let Spark infer the Schema via reflection (the case class approach).

2. Define the Schema through the programmatic interface and apply it to an RDD (the createDataFrame approach).


Dependencies:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.1</version>
</dependency>


Approach 1:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Approach 1: define a case class and let Spark infer the Schema via reflection
case class Person(name: String, age: Int)

object SparkSqlDemo1 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("sparksqldemo1").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Each line of test.txt is expected to hold "name age", separated by a space
    val rddpeople = sc.textFile("test.txt").map(_.split(" ")).map(p => Person(p(0), p(1).trim().toInt))
    // Implicit conversions; this is used to implicitly convert an RDD to a DataFrame
    import sqlContext.implicits._
    val df = rddpeople.toDF()
    df.registerTempTable("people")

    // Caching and uncaching a table:
    //sqlContext.cacheTable("people")
    //sqlContext.uncacheTable("people")
    //sqlContext.sql("CACHE TABLE people")
    //sqlContext.sql("UNCACHE TABLE people")
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")

    // DSL (Domain Specific Language)
    // In the DSL, a Scala symbol such as 'age refers to a column of the underlying table;
    // Spark's execution engine implicitly converts these identifiers into expressions.
    // The API also exposes many DSL-style methods such as where(), select(), limit(), etc.;
    // see the DSL submodule of the Catalyst module for details.
    // val teenagers = df.where('age >= 10).select('name)
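    // A hedged alternative (my own addition, not from the original post): the same
    // filter written with column references, which avoids the Symbol-to-Column conversion:
    // val teenagers = df.filter(df("age") >= 10 && df("age") <= 19).select("name")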
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

    sc.stop()
  }
}
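For reference, assuming test.txt contains space-separated "name age" lines (sample data of my own, not from the original post) such as:

Michael 29
Andy 30
Justin 19

the query above would print only the rows whose age falls in [10, 19], i.e. "Name: Justin".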

Approach 2:

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

// Approach 2: define the Schema through the programmatic interface and apply it to an RDD
object SparkSqlDemo2 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparksqldemo2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Build the schema from a string of field names; every field is a nullable StringType
    val schemaString = "name age"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))
    // Convert each input line into a Row that matches the schema
    val rowRDD = sc.textFile("test.txt").map(_.split(" ")).map(p => Row(p(0), p(1).trim))
    val peopleDF = sqlContext.createDataFrame(rowRDD, schema)
    peopleDF.registerTempTable("people")
    sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
      .map(t => "Name: " + t(0)).collect().foreach(println)

    sc.stop()
  }
}
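Note that the programmatic schema above declares both columns as StringType, so the age comparison in the SQL query relies on Spark's implicit casting. A minimal sketch (my own variation, not from the original post) that gives age a proper integer type instead:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

// Sketch: declare age as IntegerType so "age >= 13" compares integers, not strings
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Row values must now match the declared types: age is converted with toInt
val typedRowRDD = sc.textFile("test.txt")
  .map(_.split(" "))
  .map(p => Row(p(0), p(1).trim.toInt))

val typedDF = sqlContext.createDataFrame(typedRowRDD, typedSchema)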

