
【spark-1.5.1】Spark SQL and DataFrame

2016-01-26
DataFrames

I. Creating DataFrames

// sc is an existing SparkContext
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.json")
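
To verify the result, print the contents and the inferred schema; show() and printSchema() are standard DataFrame methods:

// Display the contents of the DataFrame in tabular form
df.show()

// Print the schema inferred from the JSON file (age is inferred as long)
df.printSchema()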


II. Interoperating with RDDs

Spark supports two ways of converting existing RDDs into DataFrames.

1. Inferring the schema using reflection

// sc is an existing SparkContext
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Needed for implicit conversions such as rdd.toDF()
import sqlContext.implicits._
// The case class defines the schema of the table
case class Person(name: String, age: Int)

/*
*[hadoop@hftclclw0001 resources]$ cat people.txt
*Michael, 29
*Andy, 30
*Justin, 19
*/

//people_rdd_1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:24
val people_rdd_1 = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(","))

//people_rdd_2: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[23] at map at <console>:26
val people_rdd_2  = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))

//people_df: org.apache.spark.sql.DataFrame = [name: string, age: int]
val people_df = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()

people_df.registerTempTable("people")

val teenagers = sqlContext.sql("select name, age from people where age >= 13 and age <= 19")

// Columns of a Row can be accessed by index, by name, or all at once as a Map
teenagers.map(t => "Name:" + t(0)).collect().foreach(println)
teenagers.map(t => "Name:" + t.getAs[String]("name")).collect().foreach(println)
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
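
The same query can also be written with the DataFrame API instead of a SQL string; a minimal sketch using the standard filter and select methods (teenagers_dsl is an illustrative name):

// Equivalent query via the DataFrame DSL; $"age" comes from sqlContext.implicits._
val teenagers_dsl = people_df.filter($"age" >= 13 && $"age" <= 19).select("name", "age")
teenagers_dsl.collect().foreach(println)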


2. Programmatically specifying the schema

a. Create an RDD of Rows from the original RDD.

b. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step a.

c. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.

// sc is an existing SparkContext
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

//a. Create an RDD from the text file
val people = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt")

val schemaString = "name age"

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

//b. Create the schema based on the schema string
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

//Convert records of the RDD (people) to Rows
val rowRdd = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

//c. Apply the schema to the RDD of Rows to create a DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRdd,schema)
peopleDataFrame.registerTempTable("people")

val results = sqlContext.sql("select name from people")

results.map(t=>"Name:" + t(0)).collect().foreach(println)
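
Because every field in schemaString was declared as StringType, age comes back as a string here. A minimal sketch of a mixed-type schema, assuming you want age as an integer (typedSchema, typedRowRdd and typedDf are illustrative names):

import org.apache.spark.sql.types.IntegerType

// Declare name as a string and age as an integer
val typedSchema = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))

// Row values must match the declared types, so convert age explicitly
val typedRowRdd = people.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val typedDf = sqlContext.createDataFrame(typedRowRdd, typedSchema)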


Data Sources
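
In Spark 1.5.1 the generic data sources API reads through sqlContext.read and writes through DataFrame.write; a minimal sketch, assuming the users.parquet sample that ships with the Spark examples (the output path is illustrative):

// Parquet is the default data source, so no format needs to be specified
val users_df = sqlContext.read.load("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/users.parquet")

// Select two columns and save them out as Parquet
users_df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")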