spark2.0版本的 DataFrame、DataSet 与 Spark sql
2016-11-30 11:44
211 查看
参考:
http://www.cnblogs.com/seaspring/p/5804178.html
https://my.oschina.net/cjun/blog/655263?p={{currentPage%201}}
http://spark.apache.org/docs/latest/sql-programming-guide.html
1. 相关概念
1) RDD:spark中最基本的 弹性分布式数据集,提供了很多api 来操作数据集中的元素
2) DataFrame:spark的基于RDD的一种高级抽象,在RDD之上加入了scheme信息,给RDD的元素的每一列提供了 名称和数据类型 的标志;同 时它还提供了更多的api,可以实现类似于sql的操作;但是DataFrame也丢掉了RDD的优点:编译时类型检查和面向对象
3) DataSet:引入了Encoder,成功结合了RDD 和 DataFrame 的优点 (dataset从spark1.6开始引入的)
4) Hive:在hadoop发展过程中,提供给熟悉RDBMS但不了解MapReduce的技术人员的工具,是一种 SQL-ON-HADOOP的工具
5) Shark:也是一种 SQL-ON-HADOOP的工具,依赖于hive,但是效率比hive高
6) Hive on Spark:hive的发展计划,以spark为hive的底层引擎之一
7) Spark sql:脱胎于shark,兼容hive,但是效率更高
8) SqlContext:
能从不同的数据源加载数据,将数据转换成DataFrame;
也能将DataFrame转换成sqlContext自身中的表,使用sql来操作数据;
可以使用.sql()方法,直接查询表中的数据,将返回的数据封装成DataFrame;
9) HiveContext:继承自SQLContext,除了SQLContext中的功能之外,还具有直接操作hive库中表的数据,和HQL兼容
2. spark 2.0之前的老版本:
RDD的创建 依赖于 sparkcontext;
流处理 依赖于 streamingcontext
sql 依赖于 sqlcontext
hive 依赖于 hivecontext
spark 2.0 之后,这些都统一于 sparksession,sparksession 封装了 sparkcontext,sqlcontext,例如下面:
sparksession 的创建:val sparkSession : SparkSession = SparkSession.builder.master("sss").appName("sss").getOrCreate
支持hive的 sparksession 的创建:val sparkSession : SparkSession = SparkSession.builder.master("sss").appName("sss").enableHiveSupport.getOrCreate
获取sparkcontext:val sc : SparkContext = sparkSession.sparkContext
获取sqlcontext:val sqlContext : SQLContext = sparkSession.sqlContext
3. 相关api(参考 http://c.360webcache.com/cm=21cf76120887f4fcd52b0034ffa6a56c&q=spark2.0+parquet+DataFrames&u=http%3A%2F%2Fblog.csdn.net%2Fyhao2014%2Farticle%2Fdetails%2F52215966) 1) SparkSession 的创建:
val sparkSession : SparkSession = SparkSession.builder.
master("sss")
.appName("sss")
.config("spark.some.config.option", "some-value")
.getOrCreate
2) DataFrame 的创建:
① 从 csv 数据源 创建:参考 http://blog.csdn.net/lw_ghy/article/details/51480358
② 从 json 数据源 创建
val dataFrame : DataFrame = sparkSession.read.json("some json file")
③ 从 parquet 数据源 创建
val dataFrame : DataFrame = sparkSession.read.json("some parquet file")
④ 从 sql 创建
val dataFrame : DataFrame = sparkSession.sql("select ******")
⑤ 从 load 函数(数据源)建
默认数据源(默认是parquet): val dataFrame : DataFrame = sparkSession.read.load("some parquet file")
指定数据源: val dataFrame : DataFrame = sparkSession.read.format("json").json("some json file")
⑥ 从RDD 创建:参考 http://blog.csdn.net/ronaldo4511/article/details/53379526
case class People(name : String, age : Int)
def createDFWith_CaseClass = { //使用 case class 的方式创建
import sparkSession.implicits._
val rdd : RDD[People]= sparkSession.sparkContext.textFile(hdfsFile,2).map(line => line.split(",")).map(arr => People(arr(0),arr(1).trim.toInt))
val peopleRdd : DataFrame = rdd.toDF
}
def createDFWith_Scheme = { //使用 scheme 的方式创建
val schema = StructType(
Seq(
StructField("name",StringType,true)
,StructField("age",IntegerType,true)
)
)
import sparkSession.implicits._
val rowRDD = sparkSession.sparkContext.textFile(hdfsFile,2).map( x => x.split(",")).map( x => Row(x(0),x(1).trim().toInt))
val peopleRdd = sparkSession.createDataFrame(rowRDD,schema)
}
3) DataSet 的创建(同 上)
val dataSet : DataSet = rdd.toDS
4. 不同版本spark下的dataframe的差异
1) spark2.0 之前:
val df1 : DataFrame = sqlContex.createDataFrame(rddRow, schema)
2) spark2.0 之后:
val df : DataFrame = sparkSession.createDataFrame(rddRow, schema)
3) 虽然 df1 和 df2 都声明为 DataFrame 类型,但是在spark2.0之后,这里的DataFrame 实际上是DataSet,因为DataFrame被声明为Dataset[Row],如下:
package object sql {
// ...省略了不相关的代码
type DataFrame = Dataset[Row]
}
http://www.cnblogs.com/seaspring/p/5804178.html
https://my.oschina.net/cjun/blog/655263?p={{currentPage%201}}
http://spark.apache.org/docs/latest/sql-programming-guide.html
1. 相关概念
1) RDD:spark中最基本的 弹性分布式数据集,提供了很多api 来操作数据集中的元素
2) DataFrame:spark的基于RDD的一种高级抽象,在RDD之上加入了scheme信息,给RDD的元素的每一列提供了 名称和数据类型 的标志;同 时它还提供了更多的api,可以实现类似于sql的操作;但是DataFrame也丢掉了RDD的优点:编译时类型检查和面向对象
3) DataSet:引入了Encoder,成功结合了RDD 和 DataFrame 的优点 (dataset从spark1.6开始引入的)
4) Hive:在hadoop发展过程中,提供给熟悉RDBMS但不了解MapReduce的技术人员的工具,是一种 SQL-ON-HADOOP的工具
5) Shark:也是一种 SQL-ON-HADOOP的工具,依赖于hive,但是效率比hive高
6) Hive on Spark:hive的发展计划,以spark为hive的底层引擎之一
7) Spark sql:脱胎于shark,兼容hive,但是效率更高
8) SqlContext:
能从不同的数据源加载数据,将数据转换成DataFrame;
也能将DataFrame转换成sqlContext自身中的表,使用sql来操作数据;
可以使用.sql()方法,直接查询表中的数据,将返回的数据封装成DataFrame;
9) HiveContext:继承自SQLContext,除了SQLContext中的功能之外,还具有直接操作hive库中表的数据,和HQL兼容
2. spark 2.0之前的老版本:
RDD的创建 依赖于 sparkcontext;
流处理 依赖于 streamingcontext
sql 依赖于 sqlcontext
hive 依赖于 hivecontext
spark 2.0 之后,这些都统一于 sparksession,sparksession 封装了 sparkcontext,sqlcontext,例如下面:
sparksession 的创建:val sparkSession : SparkSession = SparkSession.builder.master("sss").appName("sss").getOrCreate
支持hive的 sparksession 的创建:val sparkSession : SparkSession = SparkSession.builder.master("sss").appName("sss").enableHiveSupport.getOrCreate
获取sparkcontext:val sc : SparkContext = sparkSession.sparkContext
获取sqlcontext:val sqlContext : SQLContext = sparkSession.sqlContext
3. 相关api(参考 http://c.360webcache.com/cm=21cf76120887f4fcd52b0034ffa6a56c&q=spark2.0+parquet+DataFrames&u=http%3A%2F%2Fblog.csdn.net%2Fyhao2014%2Farticle%2Fdetails%2F52215966) 1) SparkSession 的创建:
val sparkSession : SparkSession = SparkSession.builder.
master("sss")
.appName("sss")
.config("spark.some.config.option", "some-value")
.getOrCreate
2) DataFrame 的创建:
① 从 csv 数据源 创建:参考 http://blog.csdn.net/lw_ghy/article/details/51480358
② 从 json 数据源 创建
val dataFrame : DataFrame = sparkSession.read.json("some json file")
③ 从 parquet 数据源 创建
val dataFrame : DataFrame = sparkSession.read.json("some parquet file")
④ 从 sql 创建
val dataFrame : DataFrame = sparkSession.sql("select ******")
⑤ 从 load 函数(数据源)建
默认数据源(默认是parquet): val dataFrame : DataFrame = sparkSession.read.load("some parquet file")
指定数据源: val dataFrame : DataFrame = sparkSession.read.format("json").json("some json file")
⑥ 从RDD 创建:参考 http://blog.csdn.net/ronaldo4511/article/details/53379526
case class People(name : String, age : Int)
def createDFWith_CaseClass = { //使用 case class 的方式创建
import sparkSession.implicits._
val rdd : RDD[People]= sparkSession.sparkContext.textFile(hdfsFile,2).map(line => line.split(",")).map(arr => People(arr(0),arr(1).trim.toInt))
val peopleRdd : DataFrame = rdd.toDF
}
def createDFWith_Scheme = { //使用 scheme 的方式创建
val schema = StructType(
Seq(
StructField("name",StringType,true)
,StructField("age",IntegerType,true)
)
)
import sparkSession.implicits._
val rowRDD = sparkSession.sparkContext.textFile(hdfsFile,2).map( x => x.split(",")).map( x => Row(x(0),x(1).trim().toInt))
val peopleRdd = sparkSession.createDataFrame(rowRDD,schema)
}
3) DataSet 的创建(同 上)
val dataSet : DataSet = rdd.toDS
4. 不同版本spark下的dataframe的差异
1) spark2.0 之前:
val df1 : DataFrame = sqlContex.createDataFrame(rddRow, schema)
2) spark2.0 之后:
val df : DataFrame = sparkSession.createDataFrame(rddRow, schema)
3) 虽然 df1 和 df2 都声明为 DataFrame 类型,但是在spark2.0之后,这里的DataFrame 实际上是DataSet,因为DataFrame被声明为Dataset[Row],如下:
package object sql {
// ...省略了不相关的代码
type DataFrame = Dataset[Row]
}
相关文章推荐
- spark2.0版本RDD、DataFrame、DataSet介绍
- [Spark SQL] SparkSession、DataFrame 和 DataSet 练习
- spark sql定义RDD、DataFrame与DataSet
- 第43课:Spark 2.0编程实战之SparkSession、DataFrame、DataSet开发实战
- Spark 2.1 -- spark SQL , Dataframe 和DataSet 指南
- Spark 2.1 -- spark SQL , Dataframe 和DataSet 指南
- SparkSQL------SQL,DataFrame,DataSet
- 初识Spark2.0之Spark SQL
- Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming
- Spark-SQL与hive整合【版本spark1.6.0+hive0.14】--Standalone模式
- [Spark2.0]Spark SQL, DataFrames 和Datasets指南
- 第97课: 使用Spark Streaming+Spark SQL+mysql 实现在线动态计算出特定时间窗口下的不同种类商品中的热门商品排名(详细内幕版本)
- 第114课:SparkStreaming+Kafka+Spark SQL+TopN+Mysql+KafkaOffsetMonitor电商广告点击综合案例实战(详细内幕版本)
- 【转载】Spark SQL 1.3.0 DataFrame介绍、使用
- Spark 2.0介绍:Dataset介绍和使用
- sparkSQL中 DataSet 和 DataFram区别
- 【转】Spark-Sql版本升级对应的新特性汇总
- Spark SQL 之 DataFrame
- spark sql 功能测试及总结 (1.4.1版本)
- spark中的dataframe与sparksql的实例