【spark-1.5.1】Spark SQL and DataFrame
2016-01-26 00:00
DataFrames
I. Create DataFrames
// sc: SparkContext is already provided by spark-shell
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.json")
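Once a DataFrame has been created, it can be inspected and queried through its own methods as well as through SQL. The following is a minimal sketch, assuming Spark 1.5.x (spark-core and spark-sql) on the classpath and a local master. The object `JsonDataFrameBasics`, the method `namesOlderThan`, and the inline JSON records are hypothetical. They stand in for the people.json path above so that the example runs on its own.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JsonDataFrameBasics {
  // Hypothetical helper: loads JSON records into a DataFrame, prints a few views
  // of it, and returns the names of people whose age is greater than minAge.
  def namesOlderThan(jsonRecords: Seq[String], minAge: Int): Seq[String] = {
    // Local master for a standalone run; inside spark-shell, reuse the shell's sc instead.
    val sc = new SparkContext(new SparkConf().setAppName("json-df-basics").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      // read.json also accepts an RDD[String] holding one JSON object per element.
      val df = sqlContext.read.json(sc.parallelize(jsonRecords))
      df.printSchema()                 // schema inferred from the records
      df.select("name").show()         // project a single column
      df.groupBy("age").count().show() // row count per distinct age (null is its own group)
      // Rows whose age is null do not satisfy the comparison and are filtered out.
      df.filter(df("age") > minAge).select("name").collect().map(_.getString(0)).toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample records; the first one has no "age" field.
    val records = Seq(
      """{"name":"Michael"}""",
      """{"name":"Andy", "age":30}""",
      """{"name":"Justin", "age":19}""")
    namesOlderThan(records, 21).foreach(println)
  }
}
```

In spark-shell, only the lines that use `sqlContext` and `df` are needed, since the shell already supplies `sc`.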
II. Interoperating with RDDs
Spark supports two ways of converting existing RDDs into DataFrames:
1. Inferring the schema using reflection
// sc: SparkContext is already provided by spark-shell
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// implicit conversions needed by .toDF()
import sqlContext.implicits._

case class Person(name: String, age: Int)

/*
 * [hadoop@hftclclw0001 resources]$ cat people.txt
 * Michael, 29
 * Andy, 30
 * Justin, 19
 */

// people_rdd_1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:24
val people_rdd_1 = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(","))

// people_rdd_2: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[23] at map at <console>:26
val people_rdd_2 = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

// people_df: org.apache.spark.sql.DataFrame = [name: string, age: int]
val people_df = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people_df.registerTempTable("people")
val teenagers = sqlContext.sql("select name, age from people where age >= 13 and age <= 19")

// access a column by index
teenagers.map(t => "Name:" + t(0)).collect().foreach(println)
// access a column by name
teenagers.map(t => "Name:" + t.getAs[String]("name")).collect().foreach(println)
// fetch several columns at once as a Map[String, Any]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
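The same teenager query can also be written with DataFrame Column expressions instead of an SQL string. This is a sketch under the same assumptions (Spark 1.5.x on the classpath, local master). `TeenagersWithDsl`, `teenNames`, and the in-memory lines imitating people.txt are illustrative names and data, not part of the original post. `Person` is declared at the top level of the file because a case class nested inside a method commonly fails schema inference with `toDF()`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Top-level case class (not nested inside a method) so Spark can infer the schema by reflection.
case class Person(name: String, age: Int)

object TeenagersWithDsl {
  // Hypothetical helper: builds a DataFrame from "name, age" lines via reflection on Person
  // and selects the teenagers with Column expressions rather than SQL.
  def teenNames(lines: Seq[String]): Seq[String] = {
    val sc = new SparkContext(new SparkConf().setAppName("teenagers-dsl").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._ // provides .toDF() and the $"column" syntax

      val people = sc.parallelize(lines)
        .map(_.split(","))
        .map(p => Person(p(0), p(1).trim.toInt))
        .toDF()

      // Equivalent of: select name, age from people where age >= 13 and age <= 19
      val teenagers = people.filter($"age" >= 13 && $"age" <= 19).select("name", "age")
      teenagers.collect().map(_.getAs[String]("name")).toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    teenNames(Seq("Michael, 29", "Andy, 30", "Justin, 19")).foreach(println)
}
```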
2. Programmatically specifying the schema
a. Create an RDD of Rows from the original RDD.
b. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step a.
c. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
// sc: SparkContext is already provided by spark-shell
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// the original RDD of text lines
val people = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt")

// the schema is encoded in a string
val schemastring = "name age"

// a. create an RDD of Rows from the original RDD
val rowRdd = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// b. create the schema based on the schema string (every field is a nullable string)
val schema = StructType(schemastring.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// c. apply the schema to the RDD of Rows to create the DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRdd, schema)

peopleDataFrame.registerTempTable("people")
val results = sqlContext.sql("select name from people")
results.map(t => "Name:" + t(0)).collect().foreach(println)
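The programmatic approach is not limited to StringType fields. The sketch below, again assuming Spark 1.5.x on the classpath and a local master, declares the schema field by field with an IntegerType age column. The object `TypedProgrammaticSchema` and its members are hypothetical names. The values in each Row must follow the schema's field order and types, which is why `toRow` converts the age with `toInt`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object TypedProgrammaticSchema {
  // b. Hypothetical typed schema: name is a string, age is an integer, both nullable.
  val schema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)))

  // a. Turn one "name, age" line into a Row whose values match the schema (String, Int).
  def toRow(line: String): Row = {
    val p = line.split(",")
    Row(p(0), p(1).trim.toInt)
  }

  def teenNames(lines: Seq[String]): Seq[String] = {
    val sc = new SparkContext(new SparkConf().setAppName("typed-schema").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      // c. Apply the schema to the RDD of Rows.
      val df = sqlContext.createDataFrame(sc.parallelize(lines).map(toRow), schema)
      df.registerTempTable("people")
      // Query the integer column with the teenager range predicate from the reflection example.
      sqlContext.sql("select name from people where age >= 13 and age <= 19")
        .collect().map(_.getString(0)).toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    teenNames(Seq("Michael, 29", "Andy, 30", "Justin, 19")).foreach(println)
}
```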
Data Sources