Spark 2 Dataset Row/Column Operations and Execution Plans
2016-11-25 14:21
A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row objects, i.e. Dataset[Row].
Datasets are "lazy": computation is triggered only when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation required to produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan and generates an efficient physical plan for parallel, distributed execution.
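The laziness described above is easy to observe in the spark-shell: chaining transformations returns immediately and builds only a plan, while the first action triggers optimization and execution. A minimal sketch, assuming a SparkSession named spark and a hypothetical DataFrame df with an age column (not the sample data defined later):

import spark.implicits._

// Transformations are lazy: this line only constructs a logical plan,
// no Spark job is launched yet
val adults = df.filter($"age" > 21).select("gender", "age")

// An action forces the optimizer to produce a physical plan and run it
val n = adults.count()

This is why chaining many transformations is cheap: Catalyst sees the whole plan at once and can optimize across the entire chain before anything executes.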
Explanation of the sample data fields
1. Import common packages
2. Create a SparkSession and load the sample data
3. Operate on specified columns and rows
4. View the Spark SQL logical and physical execution plans
Explanation of the sample data fields
The sample data used below has nine fields: affairs, gender, age, yearsmarried, children, religiousness, education, occupation and rating.
1. Import common packages
import scala.math._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrameReader
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.DataFrameStatFunctions
2. Create a SparkSession and load the sample data
val spark = SparkSession.builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
  (0, "male", 37, 10, "no", 3, 18, 7, 4),
  (0, "female", 27, 4, "no", 4, 14, 6, 4),
  (0, "female", 32, 15, "yes", 1, 12, 1, 4),
  (0, "male", 57, 15, "yes", 5, 18, 6, 5),
  (0, "male", 22, 0.75, "no", 2, 17, 6, 3),
  (0, "female", 32, 1.5, "no", 2, 17, 5, 5),
  (0, "female", 22, 0.75, "no", 2, 12, 1, 3),
  (0, "male", 57, 15, "yes", 2, 14, 4, 4),
  (0, "female", 32, 15, "yes", 4, 16, 1, 2),
  (0, "male", 22, 1.5, "no", 4, 14, 4, 5))

val data = dataList.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")

data.printSchema()
root
 |-- affairs: double (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = false)
 |-- yearsmarried: double (nullable = false)
 |-- children: string (nullable = true)
 |-- religiousness: double (nullable = false)
 |-- education: double (nullable = false)
 |-- occupation: double (nullable = false)
 |-- rating: double (nullable = false)
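As noted in the introduction, a DataFrame is simply Dataset[Row]. The same data can also be viewed as a strongly typed Dataset by mapping each row onto a case class, which gives compile-time field access instead of string-based column lookups. A sketch under the assumption that a case class named Affair (our own name, not from the original) matches the schema above:

// Hypothetical case class mirroring the nine columns of `data`
case class Affair(affairs: Double, gender: String, age: Double,
                  yearsmarried: Double, children: String, religiousness: Double,
                  education: Double, occupation: Double, rating: Double)

// Convert the untyped DataFrame into a typed Dataset[Affair]
val ds: Dataset[Affair] = data.as[Affair]

// Fields are now accessed as ordinary Scala members, checked at compile time
ds.filter(_.age > 30).show(3)

If a column name or type does not match the case class, `as[Affair]` fails at analysis time rather than deep inside a job, which is one of the practical benefits of the typed API.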
3. Operate on specified columns and rows
// Show the first n records in the spark-shell
data.show(7)
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
|    0.0|  male|37.0|        10.0|      no|          3.0|     18.0|       7.0|   4.0|
|    0.0|female|27.0|         4.0|      no|          4.0|     14.0|       6.0|   4.0|
|    0.0|female|32.0|        15.0|     yes|          1.0|     12.0|       1.0|   4.0|
|    0.0|  male|57.0|        15.0|     yes|          5.0|     18.0|       6.0|   5.0|
|    0.0|  male|22.0|        0.75|      no|          2.0|     17.0|       6.0|   3.0|
|    0.0|female|32.0|         1.5|      no|          2.0|     17.0|       5.0|   5.0|
|    0.0|female|22.0|        0.75|      no|          2.0|     12.0|       1.0|   3.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
only showing top 7 rows

// Take the first n records
val data3 = data.limit(5)

// Filter rows
data.filter("age>50 and gender=='male' ").show
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
|    0.0|  male|57.0|        15.0|     yes|          5.0|     18.0|       6.0|   5.0|
|    0.0|  male|57.0|        15.0|     yes|          2.0|     14.0|       4.0|   4.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+

// All columns of the DataFrame
val columnArray = data.columns
columnArray: Array[String] = Array(affairs, gender, age, yearsmarried, children, religiousness, education, occupation, rating)

// Select certain columns
data.select("gender", "age", "yearsmarried", "children").show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
|  male|37.0|        10.0|      no|
|female|27.0|         4.0|      no|
|female|32.0|        15.0|     yes|
+------+----+------------+--------+
only showing top 3 rows

val colArray = Array("gender", "age", "yearsmarried", "children")
colArray: Array[String] = Array(gender, age, yearsmarried, children)

data.selectExpr(colArray:_*).show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
|  male|37.0|        10.0|      no|
|female|27.0|         4.0|      no|
|female|32.0|        15.0|     yes|
+------+----+------------+--------+
only showing top 3 rows

// Operate on specified columns, then sort
// data.selectExpr("gender", "age+1", "cast(age as bigint)").orderBy($"gender".desc, $"age".asc).show
data.selectExpr("gender", "age+1 as age1", "cast(age as bigint) as age2").sort($"gender".desc, $"age".asc).show
+------+----+----+
|gender|age1|age2|
+------+----+----+
|  male|23.0|  22|
|  male|23.0|  22|
|  male|38.0|  37|
|  male|58.0|  57|
|  male|58.0|  57|
|female|23.0|  22|
|female|28.0|  27|
|female|33.0|  32|
|female|33.0|  32|
|female|33.0|  32|
+------+----+----+
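The string expressions used above ("age>50 and gender=='male'", "age+1 as age1") are parsed at runtime; the same operations can be written with Column objects, which catch mistakes such as a bad operator at compile time. A sketch of the equivalent Column-based forms over the same data DataFrame (this is an alternative style, not what the original shell session ran):

import org.apache.spark.sql.functions._
import spark.implicits._

// Equivalent of data.filter("age>50 and gender=='male'")
data.filter($"age" > 50 && $"gender" === "male").show()

// Equivalent of the selectExpr + sort above, using Column expressions
data.select($"gender", ($"age" + 1).as("age1"), $"age".cast("bigint").as("age2"))
    .orderBy($"gender".desc, $"age".asc)
    .show()

Both styles compile down to the same Catalyst expressions, so the choice is mainly about whether you prefer SQL fragments or Scala-checked expressions.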
4. View the Spark SQL logical and physical execution plans
val data4 = data.selectExpr("gender", "age+1 as age1", "cast(age as bigint) as age2").sort($"gender".desc, $"age".asc)
data4: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [gender: string, age1: double ... 1 more field]

// View the physical execution plan
data4.explain()
== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
   +- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
      +- LocalTableScan [gender#20, age1#135, age2#136L, age#21]

// View both the logical and physical execution plans
data4.explain(extended = true)
== Parsed Logical Plan ==
'Sort ['gender DESC, 'age ASC], true
+- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L]
   +- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupation#26, _9#17 AS rating#27]
      +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17]

== Analyzed Logical Plan ==
gender: string, age1: double, age2: bigint
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
   +- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L, age#21]
      +- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupation#26, _9#17 AS rating#27]
         +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17]

== Optimized Logical Plan ==
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
   +- LocalRelation [gender#20, age1#135, age2#136L, age#21]

== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
   +- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
      +- LocalTableScan [gender#20, age1#135, age2#136L, age#21]

Note how the Optimized Logical Plan is shorter than the Analyzed one: since the source is a LocalRelation of literal tuples, Catalyst evaluated the projections eagerly and collapsed the two nested Project nodes into the LocalRelation itself, leaving only the Sort and the final Project to execute.
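Explain is also the easiest way to watch individual optimizer rules at work. For example, a filter written after a select is reordered below the projection (predicate pushdown) in the optimized plan. A small sketch over the same data DataFrame; the exact plan text varies by Spark version, so no expected output is shown:

// The filter appears *after* the select in our code...
data.select($"gender", $"age")
    .filter($"age" > 30)
    .explain(true)
// ...but in the Optimized Logical Plan, Catalyst pushes the age > 30
// predicate below the projection so fewer rows flow upward

Comparing the Parsed, Analyzed and Optimized sections of this output against each other is a good habit when debugging slow queries: it shows exactly which rewrites Catalyst did (or failed to do) for you.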