您的位置:首页 > 其它

spark使用

2016-05-11 13:49 323 查看


1. 概述

目标读者

本文档面向Spark应用开发人员,并要求用户具备一定的Java和Scala的开发经验。

简介

Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言(Scala/Java/Python)的应用开发。 适用以下场景:

l   数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。

l   迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。

l   数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。

l   流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。

l   查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。

基本概念

l   RDD

全名:弹性分布数据集(Resilient Distributed Dataset)

它是Spark的核心概念,指的是一个只读,可变分区的分布式数据集,该数据集的内容可以缓存在内存中,并在多次计算中间重用。

RDD的生成:

−  从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建 。

−  从父RDD转换得到新RDD。

−  从集合转换而来。

RDD的存储:

−  用户可以选择不同的存储级别(例如DISK_ONLY,MEMORY_AND_DISK)存储RDD以便重用。

−  当前RDD默认只存储于内存,当内存不足时,RDD也不会溢出到磁盘中。

l   Dependency(RDD的依赖)

父RDD与子RDD之间的逻辑关系,可分为窄依赖和宽依赖两种。

图  RDD的依赖



 

−  窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,或者两个父RDD的分区对应于一个子RDD的分区。上图中,map/filter和union属于第一类前者,对输入进行协同划分(co-partitioned)的join属于第二类后者。

−  宽依赖:指子RDD的分区依赖于父RDD的所有分区,子RDD必须要通过shuffle类操作实现父RDD的数据全部重新分配,如上图中的groupByKey和未经协同划分的join。

窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join非上文的join算子,而是指同步多个并行任务的barrier): 把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个
fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线 (pipeline)优化。

l   Transformation和Action(RDD的操作)

对RDD的操作包含Transformation(返回值还是一个RDD)和Action(返回值不是一个RDD)两种。RDD的操作流程如下图所示。其中Transformation操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformation操作时只会记录需要这样的操作,并不会去执行,需要等到有Action操作的时候才会真正启动计算过程进行计算。Action操作会返回结果或把RDD数据写到存储系统中。Action是触发Spark启动计算的动因。

图  RDD操作示例



 

然后,来看一个简单的例子,如下图所示。RDD看起来与Scala集合类型没有太大差别,但它们的数据和运行模型大相迥异。

a. textFile算子从HDFS读取日志文件,返回file(初始RDD)。

b. filter算子筛出带“ERROR”的行,赋给errors(新RDD)。filter算子为一个Transformation操作。

c. cache算子把它缓存下来以备未来使用。

d. count算子返回errors的行数。count算子为一个Action操作。

图1-3 Scala样例



 

Transformation操作可以分为如下几种类型:

−  视RDD的元素为简单元素。

输入输出一对一,且结果RDD的分区结构不变,主要是map。

输入输出一对多,且结果RDD的分区结构不变,如flatMap(map后由一个元素变为一个包含多个元素的序列,然后展平为一个个的元素)。

输入输出一对一,但结果RDD的分区结构发生了变化,如union(两个RDD合为一个,分区数变为两个RDD分区数之和)、coalesce(分区减少)。

从输入中选择部分元素的算子,如filter、distinct(去除重复元素)、subtract(本RDD有、它RDD无的元素留下来)和sample(采样)。

−  视RDD的元素为Key-Value对。

对单个RDD做一对一运算,如mapValues(保持源RDD的分区方式,这与map不同);

对单个RDD重排,如sort、partitionBy(实现一致性的分区划分,这个对数据本地性优化很重要);

对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;

对两个RDD基于key进行join和重组,如join、cogroup。



后三种操作都涉及重排,称为shuffle类操作。

Action操作可以分为如下几种:

−  生成标量,如count(返回RDD中元素的个数)、reduce、fold/aggregate(返回几个标量)、take(返回前几个元素)。

−  生成Scala集合类型,如collect(把RDD中的所有元素倒入 Scala集合类型)、lookup(查找对应key的所有值)。

−  写入存储,如与前文textFile对应的saveAsTextFile。

−  还有一个检查点算子checkpoint。当Lineage特别长时(这在图计算中时常发生),出错时重新执行整个序列要很长时间,可以主动调用 checkpoint把当前数据写入稳定存储,作为检查点。

l   Shuffle

Shuffle是Spark框架中一个特定的phase,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。

下图清晰地描述了Shuffle算法的整个流程。

图  算法流程



 

l   Spark Application的结构

Spark Application的结构可分为两部分:初始化SparkContext和主体程序。

−  初始化SparkContext:构建Spark Application的运行环境。

构建SparkContext对象,如:

new SparkContext(master, appName, [SparkHome], [jars]) 

 

参数介绍:

master:连接字符串,连接方式有local, yarn-cluster, yarn-client等

appName:构建的Application名称

SparkHome:集群中安装Spark的目录

jars:应用程序代码和依赖包

−  主体程序:处理数据

l   Spark shell命令

Spark基本shell命令,支持提交Spark应用。命令为:

./bin/spark-submit \ 
  --class <main-class> 
  --master <master-url> \ 
  ... # other options 
  <application-jar> \ 
  [application-arguments] 


 

参数解释:

--class:Spark应用的类名

--master:Spark用于所连接的master,如yarn-client,yarn-cluster等

application-jar:Spark应用的jar包的路径

application-arguments:提交Spark应用的所需要的参数(可以为空)。

l   Spark Web UI界面

用于监控正在运行的或者历史的Spark作业在Spark框架各个阶段的细节以及提供日志显示,帮助用户更细粒度地去开发、配置和调优作业。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: