spark点点滴滴 —— 认识spark sql的DataFrame和DataSet
2017-03-31 15:59
543 查看
概述
spark的DataFrames和DataSets是spark SQL中的关键概念,相比于RDD,DataFrame更能描述数据类型,因此是spark sql的基础类型,同时在spark 2.0.x及其以后的版本中,spark的机器学习也会逐渐替换成基于DataFrame的api,所有我们有必要了解spark的DataFrame相关概念。spark sql在spark框架中的位置:
我们可以看到,spark sql是建立在spark框架之上的一个大数据关系型数据处理系统,和impala类似。
其核心部分有两个:DataFrame API和Catalyst引擎,SparkSQL的优化器系统Catalyst和大多数当前的大数据SQL处理引擎设计基本相同(Impala、Presto、Hive(Calcite)等),本篇我们重点看看DataFrame的基本原理和使用。
DataFrame基本原理
和RDD的比较
我们知道,RDD是spark早期很重要的一个概念,是数据的immutable distributed的集合,由不同节点上的partition组成。DataFrame和RDD类似,也是数据的不可变分布式集合。不同的是,数据被组织成带名字的列,就像关系型数据库中的表。是一种有结构的高级别抽象,与之相应的提供了一种领域特定语言(DSL)API来操作这些分布式数据。DataFrame直观上很像是RDDs的加强版,它和RDDs在数据存储上最大的区别就在于,DataFrame是有Schema的,通俗的讲,就是上图中蓝色框住的那个表头。不要小看这一点,对于复杂的数据类型,DataFrame的这种结构可以使编程大大简化。
在spark2.0后,DataFrame的API和DataSet的API合并统一了,现在只需要处理DataSet相关API即可。
DataSet
从spark 2.0开始,两种独立的API特点:strongly-typed API 和untyped API。从概念上来说,将DataFrame作为 一般对象Dataset[Row]的集合的别名,而DataSet是strongly-typed JVM对象的集合,即java和scala中的类。不同语言对应抽象类型如下:
Language | Main Abstraction |
---|---|
Scala | Dataset[T] & DataFrame (alias for Dataset[Row]) |
Java | Dataset[T] |
Python* | DataFrame |
R* | DataFrame |
为什么用DataFrame
在RDDs中,数据集中的每项数据都是一个整体,因为你无法得知其内部的结构,这也就使得你对数据项的操作能力很弱,当你想获得数据项内部的部分信息的时候,你需要手动将object按照你预先设定的数据格式进行分割,麻烦,且容易出错。而使用DataFrame,意味着你可以直接获得数据项的内部数据结构,并且由于DataFrame的Schema的存在,数据项的转换也都将是类型安全的,这对于较为复杂的数据计算程序的调试是十分有利的,很多数据类型不匹配的问题都可以在编译阶段就被检查出来,而对于不合法的数据文件,DataFrame也具备一定分辨能力。而另一方面,我们注意到,当RDD被切割出“列”并加上“表头”变成DataFrame之后,就意味着DataFrame要支持比RDD更加细粒度的查询,而这种Table式的结构,很容易就可以让我们联想到数据库中数据表,同时DataFrame API也支持使用者对DataFrame进行数据库那样的关联、聚合、筛选等查询操作。
对DataFrame的查询,都是在RDDs数据集上做得直接映射,。你可以把DataFrame中的数据,理解为“逻辑数据”,而“物理数据”实际上在RDDs中。
DataFrame性能
为了能够更快更好的定位到数据,甚至于更好的利用内存与磁盘中的存储空间,DataFrame中的数据在内存和磁盘中的排列也必须更为考究,才能够在不损失性能的前提下提供这些操作。Spark SQL团队给出的方案是:按列压缩存储。而列式存储则是将行拆开,将一列的数据放在一起,同时不同列可以存放在不同的位置(由于天然利于纵向分表,所以在超大数据集的存储上,列式存储也具有一定优势)。通常情况下,我们查询一个数据并不需要检查一行数据中的每个列条目,但是在行式存储中,必须要扫描全部数据集才能够筛选出我们想要的那条数据,既然我们检索的项目很可能只是“Id”一项而已,那为什么要去管其他列呢?特别是在磁盘上,磁头访问数据的方式是线性的,如果只想根据“Id”进行筛选,即便只是上面那个只有两列的数据表,磁头移动的距离也要超过列式存储的好几倍。不过相应的,列式存储中“更新”“插入“”查询“等操作会比较麻烦,但是由于DataFrame和RDDs一样都是Immutable的,所以恰好规避了这一问题。
最终,DataFrame API中这种支持对列进行访问的形式,要比RDDs API的数据访问粒度更为细腻,这也就意味着数据工程师可以根据“列”的性质,来为列建立索引,从而避免遍历所有的数据项。
DataSet的操作
我们先使用spark-shell来简单看下DataSet的结构:我们将json数据文件people.json上传到hdfs上/XXX/spark/people.json,内容如下:
{"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
在spark-shell中执行结果如下
:
更多DataSet上的api可以参考这里。
如何构建DataSet
和class类对应转换
在spark-shell上执行效果如下图:从RDD转换
构造文件people.txt,内容如下:Michael, 29 Andy, 30 Justin, 19
上次到hdfs上,执行效果:
编程指定
scala代码如下:import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema)
【参考资料】A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets
【参考资料】进化的Spark, 从DataFrame说起
相关文章推荐
- Spark SQL DataFrame/Dataset介绍
- spark结构化数据处理:Spark SQL、DataFrame和Dataset
- spark-SQL的DataFrame和DataSet
- spark第六篇:Spark SQL, DataFrame and Dataset Guide
- SparkSQL中SQL、DataFrame和DataSet方式的静态类型安全和运行时类型安全
- Spark SQL and DataFrame Guide
- Spark SQL and DataFrame Guide(1.4.1)——之Data Sources
- Spark2 Dataset DataFrame空值null,NaN判断和处理
- Spark-Sql之DataFrame实战详解
- Apache Spark 2.0三种API的传说:RDD、DataFrame和Dataset
- sparkSQL:dataframe
- Spark 2.0 DataFrame map操作中Unable to find encoder for type stored in a Dataset.问题的分析与解决
- sparkSql DataFrame操作
- Spark SQL and DataFrame Guide
- 异常用户发现(Spark MLlib+Spark SQL+DataFrame)
- spark sql和DataFrame本质
- Spark-SQL之DataFrame操作大全
- 本地模式使用JAVA SACLA 开发 Spark SQL DataFrame
- DataFrame和SparkSql使用区别
- sparkSQL中 DataSet 和 DataFram区别