您的位置:首页 > 数据库

Spark 入门篇之spark&spark sql

2016-02-17 20:40 393 查看

Spark 入门篇

1 概述

Spark是一个通用的快速的大数据处理引擎,是类似于hadoop的map reduce大数据并行处理引擎。它的数据源可以是hdfs、cassandra、hbase等,除常规编程模式外,它还是支持sql使用方式。Spark支持streaming流式计算(秒级延迟)、机器学习库MLib、图计算GraphX、Bagel(Google的pregel图计算框架的实现)、SparkR等多种库,以用于各种复杂的数据处理的场景。

基于spark的编程框架,编写简洁的数据处理脚本,通过spark shell等方式将任务提交到spark平台,spark即可完成大数据任务拆分以及处理,用户可以通过管理的页面来查看任务的处理状态。

Spark基于scala编写,目前spark框架API接口支持scala、java、python、R等语言。

2 Spark优点

Spark 于2012年推出,相对hadoop的map reduce框架,具备较多优点。

优点具体如下:

1) 计算速度快,官方宣称:相对于hadoop,存储基于内存时,快100倍以上,数据存储基于磁盘时快10倍以上。

2) 编程简单

做迭代计算时,不需要像hadoop反复的写多个map reduce,更多和单机的过程式编程类似,代码简单很多。

提供了map(映射处理)、filter、count、reduce、join、group by等80种以上的计算算子,直接使用即可。

简单的已有算子支持的多轮迭代计算任务,一个脚本几行代码就搞定,相对于hadoop的多个map reduce类要简洁很多。

3) 可以在HDfs、hbase、cassandra、kafaka等多种数据源上运行。

3 Spark的主要概念

1) SparkContext

Spark程序首先就要构建一个SparkContext,它告知程序如何访问spark集群。 SparkContext可以基于sparkConf这个包含了集群等配置信息的对象构建。默认从/conf配置文件夹中的配置文件中读取相关配置,如spark平台的地址、hdfs地址等。

Python api初始化样例如下

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)


2) RDD(Resilient DistributedDatasets)

弹性分布式数据集,这个是spark这核心的抽象,它是spark的迭代计算过程中操作的分布式数据对象,是其主要的数据形态。RDD作为数据结构,本质上是一个只读的分区记录集合,一个RDD可以包含多个分区,每个分区就是一个dataset片段,RDD可以相互依赖。RDD是自容错的,如丢失,会自动重新计算生成。rdd在spark中可以有多重存储级别,可以默认纯cach,也可以在磁盘上。可以选择多副本模式,用于为在线服务提供即时查询的场景。

RDD可以由已有的数据集合生成,也可以由hdfs等外部数据文件或库生成。以下为从hdfs文件生成的Python代码样例:

distFile = sc.textFile("/directory/data.txt") //输入可以为目录或者模糊匹配的目录或文件 /*/*.txt


对于rdd支持的各类操作(过滤、映射、reduce等),多看api doc,一个页面都包括了,很简洁易懂。

4 Spark代码实例demo

提供python语言的代码demo。

1、 开始写代码前,做好环境准备

可以自己搭建部署spark,也可以利用已有环境进行配置。

包括:

1) conf/spark-defaults.conf

配置spark.master、spark.ui.port等信息

2) 结合hdfs使用,配置hdfs的信息

可以将正确配置的hadoop-site.xml放到spark的conf目录下,主要包括hdfs地址以及hadoop的用户名密码等信息。

2、 示例代码

基于python构建,统计文件中词的数量map、reduce等计算

from pyspark import SparkContext, SparkConf
#give your own hdfs path
srcPath="/xx/xx"
resPath="/xx/xx"
appName="word count test"

#init sparkContext
conf = SparkConf().setAppName(appName)
sc = SparkContext(conf=conf)

#create rdd from hadoop txt file
textRdd = sc.textFile(srcPath)
#map text word line to (word,1)
wordSplit = textRdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1))
#reducebyKey to get word count
wordCounts = wordSplit.reduceByKey(lambda a, b: a+b)
print wordCounts.collect()
#save to hdfs
wordCounts.saveAsTextFile(resPath)


3、 提交任务到spark平台

通过以下指令提交任务

./bin/spark-submit   xx.py

4、 查看任务执行状态

到spark ui web界面上根据Appname 查看任务执行状态以及运行细节。

Web ui地址为:spark.master:spark.ui.port。

5 Spark sql主要概念

Spark支持以sql的方式来查询处理大数据,除了自己构建的spark table外,也支持访问hive的table。注意其支持嵌套sql。

1、 SqlContext

Sql的上下文对象,基于SparkContext构建SqlContext

sqlContext = SQLContext(sc)


2、 DataFrames

DataFrames是一个以表列组织的分布式数据集,类似于关系数据库中表。可以从已有RDD、hive table等多个方式构建。

以下为从已有RDD构建DataFrames的代码样例:

#user is rdd created by row element, create DataFrame from user RDD
schemaUser = sqlContext.inferSchema(user)   //spark  1.3 后是接口createDataFrame()
schemaUser.registerTempTable("user")


6 Spark sql代码实例

继续以上面的文件的词统计为例,通过sql获取出现次数top 10的词

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
#define your hdfs data path
srcPath="/xxx/xxx "
resPath="/xxx/xxx "
appName="word count test"

#init sqlContext
conf = SparkConf().setAppName(appName)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

#create rdd from hadoop txt file
textRdd = sc.textFile(srcPath)
#map text word line to (word,1)
wordSplit = textRdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1))
#reducebyKey to get word count
wordCounts = wordSplit.reduceByKey(lambda a, b: a+b)

#create row based rdd
rowRdd = wordCounts.map(lambda x: Row(word=x[0],wc=x[1]))
wordFrames = sqlContext.inferSchema(rowRdd)
wordFrames.registerTempTable("tword")
top10Frames = sqlContext.sql("select word,wc FROM tword order by wc desc limit 10")
print top10Frames.collect()


7 作者简介

作者信息: 汪恭正,2010年加入百度,现任百度资深研发工程师

Github地址https://github.com/neowgz
联系方式: wgongzheng@163.com
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark