2.sparkSQL--DataFrames与RDDs的相互转换
2017-03-25 00:13
477 查看
Spark SQL支持两种RDDs转换为DataFrames的方式
使用反射获取RDD内的Schema
当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
通过编程接口指定Schema
通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema。
原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6613755.html
微信:intsmaze
使用反射获取Schema(Inferring the Schema Using Reflection)
spark shell中不需要导入sqlContext.implicits._是因为spark shell默认已经自动导入了。
打包提交到yarn集群:
通过编程接口指定Schema(Programmatically Specifying the Schema)
当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:
从原来的RDD创建一个Row格式的RDD.
创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema.
通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema.
将程序打成jar包,上传到spark集群,提交Spark任务
在maven项目的pom.xml中添加Spark SQL的依赖
使用反射获取RDD内的Schema
当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
通过编程接口指定Schema
通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema。
原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6613755.html
微信:intsmaze
使用反射获取Schema(Inferring the Schema Using Reflection)
import org.apache.spark.sql.{DataFrameReader, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object InferringSchema { def main(args: Array[String]) { //创建SparkConf()并设置App名称 val conf = new SparkConf().setAppName("SQL-intsmaze") //SQLContext要依赖SparkContext val sc = new SparkContext(conf) //创建SQLContext val sqlContext = new SQLContext(sc) //从指定的地址创建RDD val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(",")) //创建case class //将RDD和case class关联 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //导入隐式转换,如果不导入无法将RDD转换成DataFrame //将RDD转换成DataFrame import sqlContext.implicits._ val personDF = personRDD.toDF //注册表 personDF.registerTempTable("intsmaze") //传入SQL val df = sqlContext.sql("select * from intsmaze order by age desc limit 2") //将结果以JSON的方式存储到指定位置 df.write.json("hdfs://192.168.19.131:9000/personresult") //停止Spark Context sc.stop() } } //case class一定要放到外面 case class Person(id: Int, name: String, age: Int)
spark shell中不需要导入sqlContext.implicits._是因为spark shell默认已经自动导入了。
打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \ --master yarn \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 2 \ --queue default \ /home/hadoop/sparksql-1.0-SNAPSHOT.jar
通过编程接口指定Schema(Programmatically Specifying the Schema)
当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:
从原来的RDD创建一个Row格式的RDD.
创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema.
通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema.
import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.types._ import org.apache.spark.{SparkContext, SparkConf} object SpecifyingSchema { def main(args: Array[String]) { //创建SparkConf()并设置App名称 val conf = new SparkConf().setAppName("SQL-intsmaze") //SQLContext要依赖SparkContext val sc = new SparkContext(conf) //创建SQLContext val sqlContext = new SQLContext(sc) //从指定的地址创建RDD val personRDD = sc.textFile(args(0)).map(_.split(",")) //通过StructType直接指定每个字段的schema val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true) ) ) //将RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) //将schema信息应用到rowRDD上 val personDataFrame = sqlContext.createDataFrame(rowRDD, schema) //注册表 personDataFrame.registerTempTable("intsmaze") //执行SQL val df = sqlContext.sql("select * from intsmaze order by age desc ") //将结果以JSON的方式存储到指定位置 df.write.json(args(1)) //停止Spark Context sc.stop() } }
将程序打成jar包,上传到spark集群,提交Spark任务
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \ --master yarn \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 2 \ --queue default \ /home/hadoop/sparksql-1.0-SNAPSHOT.jar \ hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \ --master yarn \ --deploy-mode client \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 2 \ --queue default \ /home/hadoop/sparksql-1.0-SNAPSHOT.jar \ hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
在maven项目的pom.xml中添加Spark SQL的依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.2</version> </dependency>
相关文章推荐
- Spark SQL RDD与DataFrames相互转换
- Spark RDDs vs DataFrames vs SparkSQL
- Spark SQL and DataFrames
- Spark SQL,DataFrames and DataSets Guide官方文档翻译
- Spark Sql 编程式结构DataType转换 代码类小结
- 《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南
- spark2.2官方教程笔记-Spark SQL, DataFrames and Datasets向导
- Spark(六):SparkSQLAndDataFrames对结构化数据集与非结构化数据的处理
- SparkSQL DataFrames操作
- Spark SQL概述,DataFrames,创建DataFrames的案例,DataFrame常用操作(DSL风格语法),sql风格语法
- pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换
- Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN
- [Spark2.0]Spark SQL, DataFrames 和Datasets指南
- Spark SQL, DataFrames 和 Datasets 指南
- Spark -9:Spark SQL, DataFrames and Datasets 编程指南
- 在Apache Spark 2.0中使用 DataFrames 和 SQL
- SparkSQL(Spark-1.4.0)实战系列(一)——DataFrames基础
- Spark SQL and DataFrame Guide(1.4.1)——之DataFrames
- Spark SQL, DataFrames and Datasets(Spark-2.1.1)指南
- Apache Spark 2.0中使用DataFrames和SQL