Spark 入门篇之spark&spark sql
2016-02-17 20:40
393 查看
Spark 入门篇
1 概述
Spark是一个通用的快速的大数据处理引擎,是类似于hadoop的map reduce大数据并行处理引擎。它的数据源可以是hdfs、cassandra、hbase等,除常规编程模式外,它还是支持sql使用方式。Spark支持streaming流式计算(秒级延迟)、机器学习库MLib、图计算GraphX、Bagel(Google的pregel图计算框架的实现)、SparkR等多种库,以用于各种复杂的数据处理的场景。基于spark的编程框架,编写简洁的数据处理脚本,通过spark shell等方式将任务提交到spark平台,spark即可完成大数据任务拆分以及处理,用户可以通过管理的页面来查看任务的处理状态。
Spark基于scala编写,目前spark框架API接口支持scala、java、python、R等语言。
2 Spark优点
Spark 于2012年推出,相对hadoop的map reduce框架,具备较多优点。优点具体如下:
1) 计算速度快,官方宣称:相对于hadoop,存储基于内存时,快100倍以上,数据存储基于磁盘时快10倍以上。
2) 编程简单
做迭代计算时,不需要像hadoop反复的写多个map reduce,更多和单机的过程式编程类似,代码简单很多。
提供了map(映射处理)、filter、count、reduce、join、group by等80种以上的计算算子,直接使用即可。
简单的已有算子支持的多轮迭代计算任务,一个脚本几行代码就搞定,相对于hadoop的多个map reduce类要简洁很多。
3) 可以在HDfs、hbase、cassandra、kafaka等多种数据源上运行。
3 Spark的主要概念
1) SparkContextSpark程序首先就要构建一个SparkContext,它告知程序如何访问spark集群。 SparkContext可以基于sparkConf这个包含了集群等配置信息的对象构建。默认从/conf配置文件夹中的配置文件中读取相关配置,如spark平台的地址、hdfs地址等。
Python api初始化样例如下
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
2) RDD(Resilient DistributedDatasets)
弹性分布式数据集,这个是spark这核心的抽象,它是spark的迭代计算过程中操作的分布式数据对象,是其主要的数据形态。RDD作为数据结构,本质上是一个只读的分区记录集合,一个RDD可以包含多个分区,每个分区就是一个dataset片段,RDD可以相互依赖。RDD是自容错的,如丢失,会自动重新计算生成。rdd在spark中可以有多重存储级别,可以默认纯cach,也可以在磁盘上。可以选择多副本模式,用于为在线服务提供即时查询的场景。
RDD可以由已有的数据集合生成,也可以由hdfs等外部数据文件或库生成。以下为从hdfs文件生成的Python代码样例:
distFile = sc.textFile("/directory/data.txt") //输入可以为目录或者模糊匹配的目录或文件 /*/*.txt
对于rdd支持的各类操作(过滤、映射、reduce等),多看api doc,一个页面都包括了,很简洁易懂。
4 Spark代码实例demo
提供python语言的代码demo。1、 开始写代码前,做好环境准备
可以自己搭建部署spark,也可以利用已有环境进行配置。
包括:
1) conf/spark-defaults.conf
配置spark.master、spark.ui.port等信息
2) 结合hdfs使用,配置hdfs的信息
可以将正确配置的hadoop-site.xml放到spark的conf目录下,主要包括hdfs地址以及hadoop的用户名密码等信息。
2、 示例代码
基于python构建,统计文件中词的数量map、reduce等计算
from pyspark import SparkContext, SparkConf #give your own hdfs path srcPath="/xx/xx" resPath="/xx/xx" appName="word count test" #init sparkContext conf = SparkConf().setAppName(appName) sc = SparkContext(conf=conf) #create rdd from hadoop txt file textRdd = sc.textFile(srcPath) #map text word line to (word,1) wordSplit = textRdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1)) #reducebyKey to get word count wordCounts = wordSplit.reduceByKey(lambda a, b: a+b) print wordCounts.collect() #save to hdfs wordCounts.saveAsTextFile(resPath)
3、 提交任务到spark平台
通过以下指令提交任务
./bin/spark-submit xx.py
4、 查看任务执行状态
到spark ui web界面上根据Appname 查看任务执行状态以及运行细节。
Web ui地址为:spark.master:spark.ui.port。
5 Spark sql主要概念
Spark支持以sql的方式来查询处理大数据,除了自己构建的spark table外,也支持访问hive的table。注意其支持嵌套sql。1、 SqlContext
Sql的上下文对象,基于SparkContext构建SqlContext
sqlContext = SQLContext(sc)
2、 DataFrames
DataFrames是一个以表列组织的分布式数据集,类似于关系数据库中表。可以从已有RDD、hive table等多个方式构建。
以下为从已有RDD构建DataFrames的代码样例:
#user is rdd created by row element, create DataFrame from user RDD schemaUser = sqlContext.inferSchema(user) //spark 1.3 后是接口createDataFrame() schemaUser.registerTempTable("user")
6 Spark sql代码实例
继续以上面的文件的词统计为例,通过sql获取出现次数top 10的词from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext, Row #define your hdfs data path srcPath="/xxx/xxx " resPath="/xxx/xxx " appName="word count test" #init sqlContext conf = SparkConf().setAppName(appName) sc = SparkContext(conf=conf) sqlContext = SQLContext(sc) #create rdd from hadoop txt file textRdd = sc.textFile(srcPath) #map text word line to (word,1) wordSplit = textRdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1)) #reducebyKey to get word count wordCounts = wordSplit.reduceByKey(lambda a, b: a+b) #create row based rdd rowRdd = wordCounts.map(lambda x: Row(word=x[0],wc=x[1])) wordFrames = sqlContext.inferSchema(rowRdd) wordFrames.registerTempTable("tword") top10Frames = sqlContext.sql("select word,wc FROM tword order by wc desc limit 10") print top10Frames.collect()
7 作者简介
作者信息: 汪恭正,2010年加入百度,现任百度资深研发工程师Github地址: https://github.com/neowgz
联系方式: wgongzheng@163.com
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- Spark初探
- Spark Streaming初探
- Spark本地开发环境搭建
- 搭建hadoop/spark集群环境
- spark内存概述
- Spark Shuffle之Hash Shuffle
- Spark Shuffle之Sort Shuffle
- Spark Shuffle之Tungsten Sort Shuffle
- 编译Spark 1.5.2
- 问题记录:[Ambari]
- 整合Kafka到Spark Streaming——代码示例和挑战