您的位置:首页 > 数据库

spark2.0版本的 DataFrame、DataSet 与 Spark sql

2016-11-30 11:44 211 查看
参考:

http://www.cnblogs.com/seaspring/p/5804178.html
https://my.oschina.net/cjun/blog/655263?p={{currentPage%201}}
http://spark.apache.org/docs/latest/sql-programming-guide.html
1. 相关概念
1) RDD:spark中最基本的 弹性分布式数据集,提供了很多api 来操作数据集中的元素
2) DataFrame:spark的基于RDD的一种高级抽象,在RDD之上加入了scheme信息,给RDD的元素的每一列提供了 名称和数据类型 的标志;同 时它还提供了更多的api,可以实现类似于sql的操作;但是DataFrame也丢掉了RDD的优点:编译时类型检查和面向对象
3) DataSet:引入了Encoder,成功结合了RDD 和 DataFrame 的优点 (dataset从spark1.6开始引入的)
4) Hive:在hadoop发展过程中,提供给熟悉RDBMS但不了解MapReduce的技术人员的工具,是一种 SQL-ON-HADOOP的工具
5) Shark:也是一种 SQL-ON-HADOOP的工具,依赖于hive,但是效率比hive高
6) Hive on Spark:hive的发展计划,以spark为hive的底层引擎之一
7) Spark sql:脱胎于shark,兼容hive,但是效率更高
8) SqlContext:

能从不同的数据源加载数据,将数据转换成DataFrame;

也能将DataFrame转换成sqlContext自身中的表,使用sql来操作数据;

可以使用.sql()方法,直接查询表中的数据,将返回的数据封装成DataFrame;
9) HiveContext:继承自SQLContext,除了SQLContext中的功能之外,还具有直接操作hive库中表的数据,和HQL兼容
2. spark 2.0之前的老版本:

RDD的创建 依赖于 sparkcontext;

流处理 依赖于 streamingcontext

sql 依赖于 sqlcontext

hive 依赖于 hivecontext
spark 2.0 之后,这些都统一于 sparksession,sparksession 封装了 sparkcontext,sqlcontext,例如下面:

sparksession 的创建:val sparkSession : SparkSession = SparkSession.builder.master("sss").appName("sss").getOrCreate

支持hive的 sparksession 的创建:val sparkSession : SparkSession = SparkSession.builder.master("sss").appName("sss").enableHiveSupport.getOrCreate

获取sparkcontext:val sc : SparkContext = sparkSession.sparkContext

获取sqlcontext:val sqlContext : SQLContext = sparkSession.sqlContext
3. 相关api(参考 http://c.360webcache.com/cm=21cf76120887f4fcd52b0034ffa6a56c&q=spark2.0+parquet+DataFrames&u=http%3A%2F%2Fblog.csdn.net%2Fyhao2014%2Farticle%2Fdetails%2F52215966) 1) SparkSession 的创建:

val sparkSession : SparkSession = SparkSession.builder.

master("sss")

.appName("sss")

.config("spark.some.config.option", "some-value")

.getOrCreate
2) DataFrame 的创建:

① 从 csv 数据源 创建:参考 http://blog.csdn.net/lw_ghy/article/details/51480358
② 从 json 数据源 创建

val dataFrame : DataFrame = sparkSession.read.json("some json file")

③ 从 parquet 数据源 创建

val dataFrame : DataFrame = sparkSession.read.json("some parquet file")

④ 从 sql 创建

val dataFrame : DataFrame = sparkSession.sql("select ******")

⑤ 从 load 函数(数据源)建

默认数据源(默认是parquet): val dataFrame : DataFrame = sparkSession.read.load("some parquet file")

指定数据源: val dataFrame : DataFrame = sparkSession.read.format("json").json("some json file")

⑥ 从RDD 创建:参考 http://blog.csdn.net/ronaldo4511/article/details/53379526
case class People(name : String, age : Int)

def createDFWith_CaseClass = { //使用 case class 的方式创建

import sparkSession.implicits._

val rdd : RDD[People]= sparkSession.sparkContext.textFile(hdfsFile,2).map(line => line.split(",")).map(arr => People(arr(0),arr(1).trim.toInt))
val peopleRdd : DataFrame = rdd.toDF

}

def createDFWith_Scheme = { //使用 scheme 的方式创建

val schema = StructType(

Seq(

StructField("name",StringType,true)

,StructField("age",IntegerType,true)

)

)

import sparkSession.implicits._

val rowRDD = sparkSession.sparkContext.textFile(hdfsFile,2).map( x => x.split(",")).map( x => Row(x(0),x(1).trim().toInt))
val peopleRdd = sparkSession.createDataFrame(rowRDD,schema)

}
3) DataSet 的创建(同 上)

val dataSet : DataSet = rdd.toDS

4. 不同版本spark下的dataframe的差异

1) spark2.0 之前:

val df1 : DataFrame = sqlContex.createDataFrame(rddRow, schema)

2) spark2.0 之后:

val df : DataFrame = sparkSession.createDataFrame(rddRow, schema)

3) 虽然 df1 和 df2 都声明为 DataFrame 类型,但是在spark2.0之后,这里的DataFrame 实际上是DataSet,因为DataFrame被声明为Dataset[Row],如下:

package object sql {

// ...省略了不相关的代码

type DataFrame = Dataset[Row]

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark sql