您的位置:首页 > 编程语言 > Python开发

Spark1.4.1 编程指南 (Scala/Java/Python)

2015-11-07 13:32 656 查看
官网原文地址:http://spark.apache.org/docs/1.4.1/programming-guide.html

一 概述

二 连接Spark

三 初始化Spark
1 使用命令行

四 弹性分布式数据集RDD
1 并行化集合

2 外部数据集

3 RDD操作
I 基本操作

II 向Spark传递函数

III 理解 closures

IV 使用键值对

V 转化操作

VI 启动 Actions

VII Shuffle 操作

4 RDD持久化
I 应该选择哪个存储级别

II 删除数据

五 共享变量
1 广播变量

2 累加器

六 在集群上部署

七 单元测试

八 从10之前版本的Spark迁移

九 还有什么要做的

一. 概述

从高层次上来看,每一个 Spark 应用都包含一个驱动程序 (driver program),用于执行用户的 main 函数以及在集群上运行各种并行操作。Spark 提供的主要抽象是弹性分布式数据集 (resilient distributed dataset) (RDD),这是一个包含诸多元素、被划分到不同节点上进行并行处理的数据集合。RDD 通过打开 HDFS(或其他 Hadoop 支持的文件系统)上的一个文件、在驱动程序中打开一个已有的 Scala 集合或由其他 RDD 转换操作得到。用户可以要求 Spark 将 RDD 持久化到内存中,这样就可以有效地在并行操作中复用。另外,在节点发生错误时 RDD 可以自动恢复

Spark 提供的另一个抽象是可以在并行操作中使用的共享变量。在默认情况下,当 Spark将一个函数转化成许多任务在不同的节点上运行的时候,对于所有在函数中使用的变量,每一个任务都会得到一个副本。有时,某一个变量需要在任务之间或任务与驱动程序之间共享。Spark 支持两种共享变量:广播变量,用来将一个值缓存到所有节点的内存中;累加器,只能用于累加,比如计数器和求和。

这篇指南将展示这些特性在 Spark 支持的语言中是如何使用的。如果你打开了 Spark 的交互命令行——bin/spark-shell 的 Scala 命令行或 bin/pyspark 的 Python 命令行都可以——那么这篇文章你学习起来将是很容易的。

二. 连接Spark

Scala

Spark 1.4.1 需要搭配使用 Scala 2.10。编写 Scala 应用程序,你需要使用兼容的 Scala 版本 (比如 2.10.X)。

编写一个 Scala 应用程序时,你需要在 Maven 中添加 Spark 依赖。Spark 依赖可以从 Maven 中心库获取:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.4.1


另外,如果你想访问一个 HDFS 集群,需要根据你的 HDFS 版本,添加 hadoop-client 依赖。一些常见的 HDFS 版本标签已经在 第三方发行版 (third party distributions) 页面中列出。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>


最后,你需要将一些 Spark 的类导入你的程序中。添加下列语句:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


(在 Spark 1.3.0 之前,你需要明确地导入 org.apache.spark.SparkContext._ 使得必要的隐式转换生效 )

Java

Spark 1.4.1 工作在 Java 6 或更高版本之上。如果你使用 Java 8,Spark 支持用 lambda 表达式 简化函数编写,除此之外你还可以使用 org.apache.spark.api.java.function 包中的类。

用 Java 编写 Spark 应用时,你需要添加 Spark 依赖,Spark 依赖可以从 Maven 中心库获取:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.4.1


另外,如果你想访问一个 HDFS 集群时,需要根据你的 HDFS 版本,添加 hadoop-client 依赖。一些常见的 HDFS 版本标签已经在 第三方发行版 页面中列出。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>


最后,你需要将一些 Spark 的类导入到你的程序中。添加下列语句:

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.SparkConf


Python

Spark1.4.1 只支持 Python2.6 或更高的版本(但不支持Python3)。它使用了标准的CPython 解释器,所以诸如 NumPy 一类的 C 库也是可以使用的。

通过 Spark 目录下的 bin/spark-submit 脚本你可以在 Python 中运行 Spark 应用。这个脚本会载入 Spark 的 Java/Scala 库然后让你将应用提交到集群中。你可以执行bin/pyspark 来打开 Python 的交互命令行。

如果你希望访问 HDFS 上的数据,你需要为你使用的 HDFS 版本建立一个PySpark 连接。常见的 HDFS 版本标签都已经列在了这个第三方发行版页面。

最后,你需要将一些Spark的类import到你的程序中。加入如下这行:

from pyspark import SparkContext, SparkConf


三. 初始化Spark

Scala

Spark 程序需要做的第一件事情,就是创建一个 SparkContext 对象,它将告诉 Spark 如何访问一个集群。而要创建一个 SparkContext 对象,你首先要创建一个 SparkConf 对象,该对象包含了你的应用程序信息。

在每个 JVM 中只能有一个 SparkContext 处于 active 状态。在创建一个新的 SparkContext 之前,你必须先 stop() 在 active 状态下的 SparkContext。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)


Java

Spark 程序需要做的第一件事情,就是创建一个 JavaSparkContext 对象,它将告诉 Spark 如何访问一个集群。而要创建一个 SparkContext 对象,你首先要创建一个 SparkConf 对象,该对象包含了你的应用程序信息。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);


Python

在一个Spark程序中要做的第一件事就是创建一个SparkContext对象来告诉Spark如何连接一个集群。为了创建SparkContext,你首先需要创建一个SparkConf对象,这个对象会包含你的应用的一些相关信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)


(Scala / Java / Python)其中,appName 参数是你的应用程序的名字,会在集群的 Web UI 界面上显示。 master 参数用于指定 Spark, Mesos or YARN cluster URL ,或一个特殊的字符串 “local”,来指定在本地 (local) 模式下运行。实际上,当一个集群上运行时,一般不会在程序中对 master 参数进行硬编码,而是在 使用 spark-submit 脚本启动应用程序 时传入 master 参数。然而,在本地测试或者单元测试时,你可以传入 “local”,在进程中运行 Spark。

3.1 使用命令行

Scala

在 Spark shell 中,一个特殊的解释器感知 (interpreter-aware) 的 SparkContext 已经为你创建好了,变量名为 sc 。创建你自己的 SparkContext 是不会生效的。你可以使用 –master 选项来设置 SparkContext 所连接的 master,也可以使用逗号分隔的文件列表的形式,通过
--jars
选项将 JARs 添加到 classpath 中。例如,在四核的 CPU 上运行 bin/spark-shell,使用:

$ ./bin/spark-shell --master local[4]


或者,另外在 classpath 中添加 code.jar ,使用:

$ ./bin/spark-shell --master local[4] --jars code.jar


使用 Maven 协调器将依赖包括进来。

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"


运行
spark-shell -- help
可获取命令选项的完整列表。spark-shell 脚本运行的幕后,是调用了更通用的 spark-submit 脚本

Python

在 PySpark 命令行中,一个特殊的解释器感知的 SparkContext 变量已经建立好了,变量名叫做sc。创建你自己的 SparkContext 不会起作用。你可以通过使用
--master
命令行参数来设置这个上下文连接的 master 主机,你也可以通过
--py-files
参数传递一个用逗号隔开的列表来将Python 的 .zip、.egg 或 .py 文件添加到运行时路径中。你还可以通过
--package
参数传递一个用逗号隔开的 maven 列表来给这个命令行会话添加依赖(比如Spark的包)。任何额外的包含依赖包的仓库(比如 SonaType)都可以通过传
--repositorys
参数来添加进去。Spark 包的所有 Python 依赖(列在这个包的 requirements.txt 文件中)在必要时都必须通过pip手动安装。

比如,使用四核来运行bin/pyspark应当输入这个命令:

$ ./bin/pyspark --master local[4]


又比如,把code.py文件添加到搜索路径中(为了能够import在程序中),应当使用这条命令:

$ ./bin/pyspark --master local[4] --py-files code.py


想要了解命令行选项的完整信息请执行
pyspark --help
命令。在这些场景下,pyspark 会触发一个更通用的 spark-submit 脚本

IPython 这个加强的 Python 解释器中运行 PySpark 也是可行的。PySpark 可以在1.0.0或更高版本的 IPython 上运行。为了使用 IPython,必须在运行 bin/pyspark 时将PYSPARK_DRIVER_PYTHON 变量设置为 ipython,就像这样:

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark


你还可以通过设置 PYSPARK_DRIVER_PYTHON_OPTS 环境变量来定制 ipython 命令。比如,启动支持 PyLab plot 的IPython Notebook

$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark


在 IPython Notebook 服务器启动之后,你可以从 “Files” 标签中创建新的 “Pyhton 2” notebook 。在 notebook 中,在你开始从 IPython notebook 中启动 Spark之前, 你可以输入命令
%pylab inline
作为你 notebook 的一部分。

四. 弹性分布式数据集(RDD)

Spark 的核心概念是 弹性分布式数据集 (resilient distributed dataset , RDD),这是一个有容错机制并可以被并行操作的元素集合。目前有两种方式可以创建 RDDs :并行化 ( parallelizing ) 一个已经存在于驱动程序 (driver program) 中的集合 (collection),或者引用外部存储系统上的一个数据集,比如一个共享文件系统,HDFS, HBase,或者任何提供了 Hadoop InputFormat 的数据源。

4.1 并行化集合

Scala

并行集合,是通过对存在于驱动程序中的集合 (Collection)( 一个 Scala Seq),调用
SparkContext's paralleize
方法来构建的。构建时会拷贝集合中的元素,创建一个可以被并行操作的分布式数据集。例如,这里演示了如何创建一个包含数字 1 到 5 的并行集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)


一旦创建了分布式数据集 (distData),就可以对其执行并行操作。例如,我们可以调用 distData.reduce( (a,b) => a+b ) 来累加数组的元素。后续我们会进一步地描述对分布式数据集的操作。

Java

并行集合,是通过对存在于驱动程序中集合 (Collection),调用 JavaSparkContext’s parallelize 方法来构建的。构建时会拷贝集合中的元素,创建一个可以被并行操作的分布式数据集。例如,这里演示了如何创建一个包含数字 1 到 5 的并行集合。

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);


一旦创建了分布式数据集 (distData),就可以对其执行并行操作。例如,我们可以调用
distData.reduce((a, b) -> a + b)
来累加列表的元素。后续我们会进一步地描述对分布式数据集的操作。

Note: 在本指南中,我们会经常使用简洁的 Java 8 的 lambda 语法来指明 Java 函数。而在 Java 的旧版本中,你可以实现 org.apache.spark.api.java.function 包中的接口。下面我们将在 把函数传递到 Spark 中描述更多细节。

Python

并行集合是通过在驱动程序中一个现有的迭代器或集合上调用 SparkContext 的 parallelize 方法建立的。为了创建一个能够并行操作的分布数据集,集合中的元素都会被拷贝。比如,以下语句创建了一个包含1到5的并行化集合:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)


分布数据集(distData)被建立起来之后,就可以进行并行操作了。比如,我们可以调用disData.reduce(lambda a, b: a+b)来对元素进行叠加。在后文中我们会描述分布数据集上支持的操作。

(Scala / Java / Python) 并行集合的一个重要参数是分片数 (the number of slices),表示数据集切分的分数,Spark 将在集群上为每一个分片数据起一个任务,典型情况下,你希望集群的每个 CPU 分布 2-4 个分布 (slices)。通常,Spark 会尝试基于集群状况自动设置分片数。然而,你也可以进行手动设置,通过将分片数作为第二个参数传递给 paralleize 方法来实现。(例如:sc.parallelize(data,10))。注意:在一些代码中会使用术语 slices (分区的一个同义词)来提供向后兼容性。

4.2 外部数据集

Spark 可以从 Hadoop 支持的任何存储源中构建出分布式数据集,包括你的本地文件系统,HDFS,Cassandre,HBase,Amazon S3,等。Spark 支持 text files, SequenceFiles,以及其他任何一种 Hadoop InputFormat

Text file RDDs 的创建可以使用
SparkContext's textFile
方法。该方法接受一个文件的 URI 地址 (或者是机器上的一个本地路径,或者是一个 hdfs://, s3n://, 等 URI) 作为参数,并读取文件的每一行数据,放入集合中。下面是一个调用例子:

Scala

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08


Java

JavaRDD<String> distFile = sc.textFile("data.txt");


一旦创建完成,就可以在 distFile 上执行数据集操作。例如,要想对所有行的长度进行求和,我们可以通过如下的 map 和 reduce 操作来完成:

Scala

distFile.map(s => s.length).reduce((a, b) => a + b)


Java

distFile.map(s -> s.length()).reduce((a, b) -> a + b).


Spark 读文件时的一些注意事项:

如果文件使用文件系统上的路径,要保证在 worker 节点上这个文件也能够通过这个路径访问。可以将文件拷贝到所有的 workers 节点上,或者使用 network-mounted 共享文件系统

包括 textFile 在内的所有基于文件的 Spark 读入方法,都支持将文件夹、压缩文件、包含通配符的路径作为参数。比如,以下代码都是合法的:

textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")


textFile 方法也可以传入第二个可选参数来控制文件的分片数量。默认情况下,Spark 会为文件的每一个块(在HDFS中块的大小默认是 64 MB)创建一个分片。但是你也可以通过传入一个更大的值来要求Spark 建立更多的分片。注意,分片的数量绝不能小于文件块的数量

除了文本文件之外,Spark 的 Scala API 还支持多种其他数据格式:

SparkContext.wholeTextFiles 可以让你读取包含多个小 text files 的目录,并且每个文件对应返回一个 (filename, content) 对。而对应的 textFile 方法,文件的每一行对应返回一条记录 (record)

对于 SequenceFiles ,使用
SparkContext’s sequenceFile[K, V]
方法,其中 K 和 V 分别对应文件中 key 和 values 的类型。这些类型必须是 Hadoop’s Writable 接口的子类,如 IntWritable 和 Text 。另外,Spark 允许你使用一些常见 Writables 的原生类型 (native types);例如,
sequenceFile[Int, String]
会自动的转换成类型 IntWritables 和 Texts

对于其他的 Hadoop InputFormats ,你可以使用 SparkContext.hadoopRDD 方法,它可以接受一个任意类型的 JobConf 和输入格式类,Key 类 和 value 类。像 Hadoop Job 设置输入源那样去设置这些参数即可,对于基于 “new” MapReduce API (org.apache.hadoop.mapreduce) 的 InputFormats ,你也可以使用 SparkContext.newAPIHadoopRDD

RDD.saveAsObjectFile 和 SparkContext.objectFile 支持由序列化的 Java 对象组成的简单格式来保存 RDD。虽然这不是一种像 Avro 那样有效的序列化格式,但是它提供了一种可以存储任何 RDD 的简单方式

除了文本文件之外,Spark 的 Java API 还支持多种其他数据格式:

JavaSparkContext.wholeTextFiles 可以让你读取包含多个小 text files 的目录,并且每个文件对应返回一个 (filename, content) 对。而对应的 textFile 方法,文件的每一行对应返回一条记录 (record)

对于 SequenceFiles ,使用
SparkContext’s sequenceFile[K, V]
方法,其中 K 和 V 分别对应文件中 key 和 values 的类型。这些类型必须是 Hadoop’s Writable 接口的子类,如 IntWritable 和 Text

对于其他的 Hadoop InputFormats ,你可以使用 JavaSparkContext.hadoopRDD 方法,它可以接受一个任意类型的 JobConf 和输入格式类,Key 类 和 value 类。像 Hadoop Job 设置输入源那样去设置这些参数即可,对于基于 “new” MapReduce API (org.apache.hadoop.mapreduce) 的 InputFormats ,你也可以使用 JavaSparkContext.newAPIHadoopRDD

JavaRDD.saveAsObjectFile 和 JavaSparkContext.objectFile 支持由序列化的 Java 对象组成的简单格式来保存 RDD。虽然这不是一种像 Avro 那样有效的序列化格式,但是它提供了一种可以存储任何 RDD 的简单方式

Python

PySpark 可以从 Hadoop 支持的任何存储源中构建出分布式数据集,包括你的本地文件系统,HDFS,Cassandre,HBase,Amazon S3,等。Spark 支持 text files, SequenceFiles,以及其他任何一种 Hadoop InputFormat

Text file RDDs 的创建可以使用
SparkContext's textFile
方法。该方法接受一个文件的 URI 地址 (或者是机器上的一个本地路径,或者是一个 hdfs://, s3n://, 等 URI) 作为参数,并读取文件的每一行数据,放入集合中。下面是一个调用例子:

>>> distFile = sc.textFile("data.txt")


一旦创建完成,就可以在 distFile 上执行数据集操作。例如,要想对所有行的长度进行求和,我们可以通过如下的 map 和 reduce 操作来完成:

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)


Spark 读文件时的一些注意事项:

如果文件使用文件系统上的路径,要保证在 worker 节点上这个文件也能够通过这个路径访问。可以将文件拷贝到所有的 workers 节点上,或者使用 network-mounted 共享文件系统

包括 textFile 在内的所有基于文件的 Spark 读入方法,都支持将文件夹、压缩文件、包含通配符的路径作为参数。比如,以下代码都是合法的:

textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")


textFile 方法也可以传入第二个可选参数来控制文件的分片数量。默认情况下,Spark 会为文件的每一个块(在HDFS中块的大小默认是 64 MB)创建一个分片。但是你也可以通过传入一个更大的值来要求Spark 建立更多的分片。注意,分片的数量绝不能小于文件块的数量

除了文本文件之外,Spark的 Python API 还支持多种其他数据格式:

SparkContext.wholeTextFiles 能够读入包含多个小文本文件的目录,然后为每一个文件返回一个(文件名,内容)对。这是与textFile方法为每一个文本行返回一条记录相对应的

RDD.saveAsPickleFile 和 SparkContext.pickleFile 支持将 RDD 以串行化的 Python 对象格式存储起来。串行化的过程中会以默认 10 个一批的数量批量处理

序列文件和其他Hadoop输入输出格式

Note: 这个特性目前仍处于试验阶段,被标记为 Experimental,目前只适用于高级用户。这个特性在未来可能会被基于 Spark SQL 的读写支持所取代,因为 Spark SQL 是更好的方式。

可写类型支持

PySpark 的 SequenceFile 支持加载 Java 中的键值 (key-value) 对 RDD,可以将 Writables 转换为基本的 Java 类型,并且通过 Pyrolite ,在结果 Java 对象中执行 pickles 序列化操作。当将一个键值对的 RDD 保存为 SequenceFile 时,PySpark 会对其进行反操作。它会 unpickles Python 的对象为 Java 对象,然后再将它们转换成 Writables。下标中的 Writables 会被自动地转换:

可写类型Python类型
Textunicode str
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableNone
MapWritabledict
数组不支持自动转换 (开箱, out-of-the-box)处理。当读或写数组时,用户需要制定自定义的 ArrayWritable 子类。当写数组时,用户也需要制定自定义的转换器 (converters) ,将数组转换为自定义的 ArrayWritable 子类。当读数组时,默认的转换器会将自定义的 ArrayWritable 子类转换为 Java 的 Object[] ,然后被 pickled 成 Python 的元组。如果想要获取包含基本数据类型 (primitive types) 的数组, Python 的 array.array 的话,用户需要为该数组指定自定义的转换器。

保存和加载序列文件(SequenceFiles)

类似于 text files, SequenceFiles 可以被保存和加载到指定的路径下。可以指定 key 和 value 的类型,但对标准的 Writables 类型则不需要指定。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]


保存和加载其他的 Hadoop 输入/输出 格式

PySpark同样支持写入和读出其他Hadoop输入输出格式,包括’新’和’旧’两种Hadoop MapReduce API。如果有必要,一个Hadoop配置可以以Python字典的形式传入。以下是一个例子,使用了Elasticsearch ESInputFormat:

PySpark 也可以读任何 Hadoop InputFormat 或者写任何 Hadoop OutputForma,包括 “new” 和 “old” 两个 Hadoop MapReduce APIs 。如果需要的话,可以将传递进来的一个 Hadoop 配置当成一个 Python 字典。这里有一个使用了 Elasticsearch ESInputFormat 的样例:

$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
"org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
{u'field1': True,
u'field2': u'Some Text',
u'field3': 12345})


Note: 如果这个 InputFormat 只是简单地依赖于 Hadoop 配置 和/ 或 输入路径,以及 key 和 value 的类型,它就可以很容易得根据上面的表格进行转换,那么这种方法应该可以很好地处理这些情况。

如果你有一个定制序列化 (serialized) 的二进制数据 (比如加载自 Cassandra / HBase 的数据),那么你首先要做的,是在 Scala / Java 端将数据转换为可以用 Pyrolite 的 pickler 处理的东西。Converter 特质 (trait) 提供了这一转换功能。简单地 extend 该特质,然后在 convert 方法中实现你自己的转换代码。记住要确保该类,已经访问你的 InputFormat 所需要的依赖,都需要被打包到你的 Spark 作业 (job)的 jar ,并且包含在 PySpark 的类路径中。

在 Python examples 和the Converter examples上给出了带自定义转换器的 Cassandra/HBase 的 InputFormat 和 OutputFormat 的使用样例。

4.3 RDD操作

RDDs 支持两类操作:转化操作 (transformations) ,可以从已有的数据集创建一个新的数据集;而动作 (actions),在数据集上运行计算之后,会向驱动程序返回一个值。例如 map 就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集来表示结果。另一方面,reduce 是一种动作,通过一些函数将所有的元素聚合 (aggregates)起来,并将最终结果返回给驱动程序(不过还有一个并行的 reduceByKey )能返回一个分布式数据集)。

Spark 中所有的转换都是 惰性的 (lazy),也就是说,它们并不会马上计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只要当发生一个要求返回结果驱动程序 ( driver program ) 的动作时,这些转换才会真正运行。这种设计让 Spark 更加有效率的运行。例如,我们对 map 操作创建的数据集进行 reduce 操作时,只会向驱动返回 reduce 操作的结果。而不是返回更大的 map 操作创建的数据集。

默认情况下,每一个转换后的 RDD 都会在你对它执行一个动作 (action) 时被重新计算。不过 ,你也可以使用 持久化 (persist ) 到内存中。在这种情况下,Spark 会在集群中保存相关元素,以便你下次查询这个 RDD 时,能更快速地访问。对于把 RDDs 持久化到磁盘上,或在集群中复制到更多个节点也是支持的。

I. 基本操作

为了演示RDD的基本操作,请看以下的简单程序:

Scala

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)


Java

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);


Python

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)


第一行定义了一个由外部文件产生的基本RDD。这个数据集不是从内存中载入的也不是由其他操作产生的;lines仅仅是一个指向文件的指针。第二行将lineLengths定义为map操作的结果。再强调一次,由于惰性求值的缘故,lineLengths并不会被立即计算得到。最后,我们运行了reduce操作,这是一个启动操作。从这个操作开始,Spark将计算过程划分成许多任务并在多机上运行,每台机器运行自己部分的map操作和reduce操作,最终将自己部分的运算结果返回给驱动程序

第一行通过一个外部文件定义了一个基本的 RDD。这个数据集未被加载到内存,也未在上面执行动作:lines 仅仅指向了这个文件。第二行定义了 lineLengths 作为 map 转换的结果。此外,由于惰性,不会立即计算出 lineLengths 。 最后我们运行 reduce ,这是一个动作 (action)。这个时候,Spark 才会将这个计算拆分为不同的 task ,并运行在独立的机器上,并且每台机器运行它自己的 map 部分和本地的 reduction,仅仅返回它的结果给驱动程序。

如果我们希望以后重复使用 lineLengths,只需在reduce前加入下面这行代码:

lineLengths.persist()


在 reduce 之前,这将导致 lineLengths 在第一次被计算之后,被保存在内存中。

II. 向Spark传递函数

Scala

Spark 的 API ,在很大程度上依赖于把程序驱动 (driver program) 中的函数传递到集群上运行,这有两种推荐的实现方式:

使用 匿名函数的语法 ,这可以用来代替简短的代码。

使用全局单例对象 (global singleton object) 的静态方法。比如,你可以定义函数对象 object MyFunctions ,然后传递该对象的方法 MyFunctions.func1,如下所示:

object MyFunctions {
def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)


Note: 由于可能传递的是一个类实例的方法的引用(而不是一个单例对象 singleton object),在传递方法的时候,应该同时传递包含该方法的对象。比如,考虑:

class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}


这里我们创建了一个实例 new MyClass,并且调用了实例的 doStuff 方法,该方法中的 map 处调用了这个 MyClass 实例的 func1 方法,所以需要将整个对象传递到集群中。类似于写成:rdd.map(x => this.func1(x))。

类似地,访问外部对象的字段时将引用整个对象:

class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}


等同于写成 rdd.map(x => this.field + x) 引用了整个 this 。为了避免这种问题,最简单的方式是把 field 拷贝到本地变量,而不是去外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}


Java

Spark 的 API ,在很大程度上依赖于把驱动程序 (driver program) 中的函数传递到集群上运行。在 Java 中,函数由那些实现了 org.apache.spark.api.java.function 包中的接口的类表示。有两种创建这样的函数的方式:

在你自己的类中实现 Function 接口,可以是匿名内部类,或者命名类,并且传递类的一个实例到 Spark

在 Java 8 中,使用 lambda expressions 来简明地定义函数的实现

为了保持简洁性,本指南中大量使用了 lambda 语法,这在长格式 (long-form) 中很容易使用所有相同的 APIs 。比如,我们可以把上面的代码写成:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});


或者,如果不方便编写内联函数的话,可以写成:

class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());


Note: Java 中的匿名内部类也可以访问封闭域 (the enclosing scope) 中的变量,只要这些变量标识为 final 即可。Spark 会像处理其他语言一样,将这些变量拷贝到每个工作 (worker) 节点上。

Python

Spark 的 API ,在很大程度上依赖于把驱动程序 (driver program) 中的函数传递到集群上运行。有三种推荐方法可以使用:

使用 Lanbda 表达式 来编写可以写成一个表达式的简单函数。(Lambdas 不支持没有返回值的多语句函数 multi-statement functions)或表达式)

Spark 调用的函数中 Local defs ,可以用来代替更长的代码

模块 (module) 中的顶级 (Top-level) 函数

例如想表达传递一个支持使用 lambda 表达式的更长的函数,可以考虑以下代码:

"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)

sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)


Note: 由于可能传递的是一个类实例的方法的引用 (而不是单例对象 (singleton object)),在传递方法的时候,应该同时传递包含该方法的对象。比如,考虑:

class MyClass(object):
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)


这里,如果我们创建了一个类实例 new MyClass ,并且调用了实例的 doStuff 方法,该方法中的 map 处调用了这个 MyClass 实例的 func 方法,所以需要将整个对象传递到集群中。

类似地,访问外部对象的字段时将引用整个对象:

class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + s)


为了避免这种问题,最简单的方式是把 field 拷贝到本地变量,而不是去外部访问它:

def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + s)


III. 理解 closures

当在集群中执行代码时,关于 Spark 的一件很困难的事就是知道变量的生命周期以及作用范围 (understanding the scope ) 。修饰那些在它们活动范围之外变量的 RDD 一些操作往往造成混乱。

在下面的例子中,我们将会看到使用
foreach()
使计数器不断增加的代码,但是相似的问题同样会出现在其他操作上。

例子

考虑以下的 本地RDD元素总和,这些 RDD 变现的完全不同,这取决于 执行 (execution) 是否发生在相同 JVM 中。一个很常见的例子就是当 Spark 运行在 local 模式下 (–master = local
) 与 Spark 应用程序部署运行在集群上 (例如,通过 spark-submit 提交到 YARN)。

Scala

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)


Java

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);


Python

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
rdd.foreach(lambda x: counter += x)

print("Counter value: " + counter)


Local 模式 vs 集群模式

主要的挑战在于,上述代码是未定义的。在单个 JVM 的 Local 模式中,上述代码将 RDD 中的值进行求和运算,并将其存储在计数器中。这是因为 driver node 上的 RDD 和 可变的计数器都处于相同的内存中。

然而,在集群模式中,所发生的情况更加复杂,并且可能没有计划中正常工作。在执行 jobs 时,Spark 将 RDD 操作的处理过程分解为各个子任务,每个 executor 执行一个子任务。在执行任务之前,Spark 计算 closureexecutor 在 RDD 执行它的计算(在 foreach() 这个例子中)中可见的那些变量和方法,称之为 closure。该 closure 被序列化并送到每个 executor 去。在 Local 模式中,因为只有一个 executors 所以所有共享相同的 clousre。然而在其他模式中并不会出现这种情况,executor 运行在各自的 worker 节点上,每个节点都有它们自己的 closure 副本。

这其中发生的是,当 counter 在 foreach 函数中被引用时,发送到各个 executor 上的 closure 中的变量已经是副本了,因此它在在 driver 节点上不再充当 counter 了。在 driver 节点内存中依旧存在一个 counter,但此时对于 executors 而言已经不可见了。executors 仅从已经序列化后的 closure 中看到拷贝副本。所以,最终 counter 的依旧为 0,这是因为在 counter 上的所有操作其实都是引用已经序列化 closure 中的值。

为了确保在这些场景中行为已经很好的被定义,我们应该使用一个 Accumulator。 当集群中的 execution 在 worker 节点们上进行分片 (split up),Spark 中的 Accumulator 具体被用于在更新变量中提供一个安全的机制。现在提及的将在 Accumulators 章节中详细谈到。

总的来说,closures —— 构造循环或局部定义的方法,不应用于改变一些全局状态。Spark 没有定义或保证改变从 closures 外部引用的对象的行为。一些这么做的代码也许能在 Local 模式下工作,但是那仅是偶然且那样的代码在分布式模式中运行并不没预想中正常工作。如果需要某些全局集合,那么用 Accumulator 来代替。

打印出 RDD 的 elements

另一个常用惯用语法是使用
rdd.foreach(println) or rdd.map(println)
尝试打印出 RDD 的 elements。在单个机器上面,这个惯用语法将产生预期的输出结果并打印出所有的 RDD 的 elements。然而在 cluster 模式中,被 executors 称作指向 stdout 的输出现在直接写入executor 的 stdout,不在是在 driver 上的那个了,所以在 driver 上的 stdout 不会显示这些了。为了打印出在 driver 上的所有 elements,你可以使用 colletc() 方法来先给 driver 节点带去 RDD,因此:rdd.collect().foreach(println)。这能导致 driver 运行时内存溢出,尽管因为 collect() 获取整个 RDD 到单个机器上。如果你金需要打印一些 RDD 上的一些 elements ,一个更安全可靠的方法就是使用 take() : rdd.take(100).foreach(println).

IV. 使用键值对

虽然在包含任意类型的对象的 RDDs 中,可以使用大部分的 Spark 操作,但也有一些特殊的操作只能再键值 (key-value) 对的 RDDs 上使用。最常见的一个就是分布式的 shuffle 操作,比如基于元素进行分组或聚合的操作。

Scala

在 Scala 中,包含 二元组Tuple2 对象(可以通过简单地 (a, b) 代码,来构建内置 (built-in) 于语言中的元组 (tuples))的 RDDs 支持这些操作。PairRDDFunctions 类支持键值 (key-value) 对的操作,如果你导入了隐式转换,该类型就能自动地对元组 (tuples) RDD 的元素进行转换。

比如,一下代码对键值对调用了reduceByKey操作,来统计每一文本行在文本文件中出现的次数:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)


Java

在 Java 中,可以使用 Scala 标准库中的 Scala.Tuple2 类来表示键值 (key-value) 对,你可以简单地调用
new Tuple2(a, b)
来创建一个元组 (tuple),然后使用
tuple._1() 和 tuple._2().
方法来访问元组的字段。

使用 JavaPairRDD 来表示键值 (key-value) 对 RDDs。你可以使用指定版本的 map 操作,从 JavaRDDs 构建 JavaPairRDDs , 比如 mapToPair 和 flatMapToPair。JavaPairRDD 支持标准的 RDD 函数,也支持特殊的键值 (key-value) 函数。

比如,一下代码对键值对调用了reduceByKey 操作,来统计每一文本行在文本文件中出现的次数:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b)


ScalaJava

我们还可以使用 counts.sortByKey(),比如,当我们想将这些键值对按照字母表顺序排序,然后调用counts.collect() 方法来将结果以对象列表的形式返回。

Note: 在键值 (key-value) 对操作中,如果使用自定义对象作为键,你必须确保该对象实现了自定义的 equlas() 和对应的 hashCode() 方法。更多详情请查看 Object.hashCode() documentation 大纲中列出的规定。

Python

在Python中,这类操作一般都会使用Python内建的元组类型,比如(1, 2)。它们会先简单地创建类似这样的元组,然后调用你想要的操作。

比如,一下代码对键值对调用了reduceByKey操作,来统计每一文本行在文本文件中出现的次数:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)


我们还可以使用 counts.sortByKey(),比如,当我们想将这些键值对按照字母表顺序排序,然后调用counts.collect() 方法来将结果以对象列表的形式返回。

V. 转化操作

下面的表格列出了Spark支持的常用转化操作。欲知细节,请查阅RDD API文档(Scala, Java, PythonR)和键值对RDD函数文档(Scala, Java)。

转化操作作用
map(func)返回一个新分布式数据集,由每一个输入元素经过 func 函数转换后组成
filter(func)返回一个数据集,由经过 func 函数计算后返回值为 true 的输入元素组成
flatMap(func)类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素 (因此 func 应该返回一个序列(Seq),而不是单一元素)
mapParitions(func)类似于 map,但独立地在 RDD 的每一个分区对应块 block 上运行每当类型为 T 的RDD 上运行时,func 的函数类型必须为
Iterator<T> => Iterator<U>
mapParitionsWithIndex(func)类似于 mapPartitions ,但 func 带有一个整数参数表示分区 (partition)的索引值。当在类型为 T 的 RDD 上运行时,func 的函数类型必须是
(Int, Iterator<T>) => Iterator<U>
sample(withReplacement, fraction, seed)根据 fraction 指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed 用于指定随机数生成器种子
union(otherDataset)返回一个新的数据集,新数据集由源数据和参数数据集的元素联合 (union) 而成
intersection(otherDataset)返回一个新的数据集,新数据集由源数据和参数数据集的元素交集 (intersection) 而成
distinct([numTasks])返回一个新的数据集,新数据集有源数据过滤掉多余的重复元素只保留一个而成
groupByKey([numTasks])在一个 (K, V) 对的数据集上调用,返回一个 (K, Iterable) 对的数据集。          注意:如果你想在每个 Key 上分组执行聚合(如总和或平均值)操作,使用 reduceByKey 或 aggregateByKey 会产生更好的性能。                        注意:默认情况下,输出的并行数据依赖于父 RDD(parent RDD)的分区数。你可以通过传递可选的第二个参数 numTasks 来设置不同的任务数
reduceByKey(func, [numTasks])在一个 (K, V)对的数据集上调用时,返回一个 (K, V) 对的数据集,使用指定的 reduce 函数 func 将相同 key 的值聚合到一起,该函数的类型必须是 (V, V) => V。类似 groupByKey,reduce 的任务个数是可以通过第二个可选参数来配置的
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])在一个 (K, V) 对的数据集上调用时,返回一个 (K, U) 对的数据集,对每个键的值使用给定的组合函数( combine functions ) 和一个中性的 “零” 值进行聚合。允许聚合后的值类型不同于输入的值类型,从而避免了不必要的内存分配。如同 groupByKey,可以通过设置第二个可选参数来配置 reduce 任务的个数
sortByKey([ascending], [numTasks])在一个 (K, V) 对的数据集上调用,其中 K 必须实现 Ordered ,返回一个按照 Key 进行排序的 (K, V) 对数据集,升序或降序由布尔参数 ascending 决定
join(otherDataset, [numTasks])在类型为 (K, V) 和 (K, W) 类型的数据集上调用时,返回一个相同 Key 对应的所有元素对在一起的 (K, (V, W)) 对的数据集。外联 (Outer joins) 操作由 leftOuterJoin, rightOuterJoin, 和 fullOuterJoin 来支持
cogroup(otherDataset, [numTasks])在类型为 (K,V) 和 (K, W) 的数据集上调用,返回一个
(K, (Iterable<V>, Iterable<W>))
元组的数据集。这个操作也可以被称之为 groupWith
cartesian(otherDataset)笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U) 对数据集(所有元素交互进行笛卡尔积)
pipe(command, [envVars])以管道的方式将 RDD 的各个分区传递到 shell 命令,比如一个 Perl 或 bash 脚本中。RDD 的元素会被写入进程的标准输入 (stdin),并且将作为字符串的 RDD ,在进程的标准输出 (stdout) 上输出一行行数据
coalesce(numPartitions)把 RDD 的分区数降低到指定的 numPartitons。过滤掉一个大数据集之后再执行操作会更加有效
repartition(numPartitions)随机地对 RDD 的数据重新洗牌 (Reshuffle),以便创建更多或更少的分区,对它们进行平衡。总是在网络上对所有数据进行 Shuffle以管道的方式将 RDD 的各个分区传递到 shell 命令,比如一个 Perl 或 bash 脚本中。RDD 的元素会被写入进程的标准输入 (stdin),并且将作为字符串的 RDD ,在进程的标准输出 (stdout) 上输出一行行数据
repartitionAndSortWithinPartitions(partitioner)根据指定的 partitioner 对 RDD 重新分区,并且在每个解雇分区中,记录会根据其键值进行排序。这比西安调用 repartiton ,然后在每个分区中调用 sorting 的效率高很多,因为它可以把 sorting 放入到 Shuffle 机制中执行

VI. 启动 (Actions)

下面的表格列出了Spark支持的部分常用启动操作。欲知细节,请查阅RDD API文档(Scala, Java, Python)和键值对RDD函数文档(Scala, Java)。

启动操作作用
reduce(func)通过函数 func(接受两个参数,返回一个参数),聚集数据集中的所有元素。该函数应该是可以交换和可结合的,以便它可以正确地并行计算
collect()在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用 filter 或者其他操作,并返回一个足够小的数据子集后再使用会比较有用
count()返回数据集的元素的个数
first()返回数据集的第一个元素(类似于 take(1))
take(n)返回一个由数据集的前 n 个元素组成的数组。注意,这个操作也许不能并行执行
takeSample(withReplacement, num, [seed])返回一个数组。由数据集中随机采样的 num 个元素组成,可以选择是否用随机数来替换不足的部分么可以指定可选参数 seed ,预先指定一个随机数生成器的种子
takeOrder(n, [ordering])返回一个由数据集的前 n 个元素,并使用自然顺序或定制顺序对这些元素进行排序
saveAsTextFile(path)将数据集的元素,以 text file (或 text file 的集合)的形式,保存到本地文件系统的指定目录, Spark 会对每个元素调用 toString 方法,然后转换为文件中的文本行
saveAsSequenceFile(path)将数据集的元素,以 Hadoop sequencefile 的格式,保存到各种文件系统的指定路径下,包括本地系统,HDFS或者任何其他 Hadoop 支持的文件系统,该方法只能用于 键值(key-value)对的 RDDs, 或者实现了 Hadoop 的 Writable 接口的情况下。在 Scala 中,也可以用于支持隐式转换为 Writable 的类型。(Spark 包括了基本类型的转换,例如 Int,Double,String,等)
saveAsObjectFile(path)以简单地 Java 序列化方式将数据集的元素写入指定的路径,对应的可以用 SparkContext.objectFile() 加载该文件
countByCount()只对 (K, V) 类型的 RDD 有效。返回一个 (K, Int)对的 hashmap ,其中 (K, Int)对表示每一个 Key 对应的元素个数
foreach(func)在数据集的每一个元素上,运行 func 函数。这通常用于副作用 (side effects) 例如更新一个累加器变量 (accumulator variable),或者和外部存储系统进行交互

VII. Shuffle 操作

在 Spark 中触发的某些熟知的事件——Shuffle。Shuffle 是 Spark 里关于数据重分发(re-distributing) 的机制,比便于在不同分区 (partitions) 进行差异化分组。典型地包括在 executors 和机器间拷贝数据,这使得 shuffle 成为一个复杂且成本高的操作。

1. 背景

为了理解在 shuffle 期间都发生了什么,我们可以考虑 reduceByKey 操作这个例子。reduceByKey 操作产生一个新的 RDD ,该 RDD 的一个 kye 所对应的所有 values 全部整合到一个包含那个 key 的元组中,并且对所有 values 都执行 reduce 函数后的结果与那个 key 相联系起来。这其中的挑战在于那个 key 对应的所有 values 并不是都在相同的分区上,或即使在相同的机器上,它们也必须要 co-located 地计算结果。

在 Spark 的一次具体的操作中,数据通常不会从分区 (partitions) 分布到必要的地方。在计算期间,每个分区上执行单个任务。因此,为了组织执行所有在单次 reduceByKey reduce 任务中的数据,Spark 需要执行一次 all-to-all 操作。它必须为所有的 key 从所有的分区中找到对应的所有 value,并跨分区将 values 整合计算出每个 key 最终结果,这个过程就称为 shuffle。

尽管最新 shuffled 数据的每个分中的元素集合将是确定性的,并且这是分区自身的排序,这些元素 (elements) 的顺序并非如此。如果想获得按照 shuffle 后已排序的数据,那么可以使用:

mapPartitions to sort each partition using, for example, .sorted

repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning

sortBy to make a globally ordered RDD

Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

2. 性能影响

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.

Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and ‘ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don’t need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.

Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.

4.4 RDD持久化

Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些 数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用。这个能力使后续的动作速度更快(通常快10倍以上)。对应迭代算法和快速的交互使用来说,缓存是一个关键的工具。

你能通过persist()或者cache()方法持久化一个rdd。首先,在action中计算得到rdd;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术-如果RDD的任何一个分区丢失,它 可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。

此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许我们持久化集合到磁盘上、将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到 Tachyon中。我们可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY。完整的存储级别介绍如下所示:

存储级别(Storage Level)含义
MEMORY_ONLY将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,一些分区将不会被缓存,从而在每次需要这些分区时都需重新计算它们。这是系统默认的存储级别
MEMORY_AND_DISK将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,将这些不适合存在内存中的分区存储在磁盘中,每次需要时读出它们
MEMORY_ONLY_SER将RDD作为序列化的Java对象存储(每个分区一个byte数组)。这种方式比非序列化方式更节省空间,特别是用到快速的序列化工具时,但是会更耗费cpu资源—密集的读操作
MEMORY_AND_DISK_SER和MEMORY_ONLY_SER类似,但不是在每次需要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中
DISK_ONLY仅仅将RDD分区存储到磁盘中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.和上面的存储级别类似,但是复制每个分区到集群的两个节点上面
OFF_HEAP (experimental)以序列化的格式存储RDD到Tachyon中。相对于 MEMORY_ONLY_SER, OFF_HEAP减少了垃圾回收的花费,允许更小的执行者共享内存池。这使其在拥有大量内存的环境下或者多并发应用程序的环境中具有更强的吸引力
以序列化的格式存储RDD到Tachyon中。相对于MEMORY_ONLY_SER,OFF_HEAP减少了垃圾回收的花费,允许更小的执行者共享内存池。这使其在拥有大量内存的环境下或者多并发应用程序的环境中具有更强的吸引力

注意:在Python中,储存的对象永远是通过Pickle库序列化过的,所以设不设置序列化级别不会产生影响。

Spark还会在shuffle操作(比如reduceByKey)中自动储存中间数据,即使用户没有调用persist。这是为了防止在shuffle过程中某个节点出错而导致的全盘重算。不过如果用户打算复用某些结果RDD,我们仍然建议用户对结果RDD手动调用persist,而不是依赖自动持久化机制。

I. 应该选择哪个存储级别?

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快

如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问

除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢

如果你希望更快的错误恢复,可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据

在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:

它运行多个执行者共享Tachyon中相同的内存池

它显著地减少垃圾回收的花费

如果单个的执行者崩溃,缓存的数据不会丢失

II. 删除数据

Spark 自动的监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果你想手动的删除 RDD,可以使用 RDD.unpersist() 方法

五. 共享变量

一般情况下,当一个传递给Spark操作 (例如map和reduce) 的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上 的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark 还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)

5.1 广播变量

广播变量允许程序员在每台机器上保持一个只读变量的缓存而不是将一个变量的拷贝传递给各个任务。它们可以被使用,比如,给每一个节点传递一份大输入数据集的拷贝是很低效的。Spark试图使用高效的广播算法来分布广播变量,以此来降低通信花销

可以通过SparkContext.broadcast(v)来从变量v创建一个广播变量。这个广播变量是v的一个包装,同时它的值可以功过调用value方法来获得。以下的代码展示了这一点:

Scala

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)


Java

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]


Python

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]


>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]


广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量 v,这样我们就不需要再次传递变量 v 到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象 v 不能在广播之后被修改。

5.2 累加器

顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现 counters 和 sums。Spark 原生支持数值类型的累加器,开发者可以自己添加支持的类型。 如果创建了一个具名的累加器,它可以在 spark 的 UI 中显示。这对于理解运行阶段(running stages)的过程有很重要的作用。(注意:这在python中还不被支持)

一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始变量v中创建。运行在集群上的任务可以通过add方法或者使用+=操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用value方法来读取累加器的值。 如下的代码,展示了如何利用累加器将一个数组里面的所有元素相加:

以下的代码展示了向一个累加器中累加数组元素的过程:

Scala

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10


虽然代码可以使用内置支持的 Int 类型的累加器,但是程序员也可以通过子类化 (subclassing) AccumulatorParam 来创建自己的类型。AccumulatorParam 接口有两个方法:zero,为你的类型提供了一个 “零值(zero value)”,以及 addInPlace 提供了两个值相加的方法。比如,假设我们有一个表示数学上向量的 Vector 类,我们可以这么写:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)


在 Scala 中,Spark 也支持更通用的 Accumulable 接口去累加数据,其结果类型和累加的元素不同(比如,构建一个包含所有元素的列表),并且 SparkContext.accumulableCollection 方法可以累加普通的 Scala 集合 (collection) 类型。

Java

Accumulator<Integer> accum = sc.accumulator(0);

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10


虽然代码可以使用内置支持的 Int 类型的累加器,但是程序员也可以通过子类化 (subclassing) AccumulatorParam 来创建自己的类型。AccumulatorParam 接口有两个方法:zero,为你的类型提供了一个 “零值(zero value)”,以及 addInPlace 提供了两个值相加的方法。比如,假设我们有一个表示数学上向量的 Vector 类,我们可以这么写:

class VectorAccumulatorParam implements AccumulatorParam<Vector> {
public Vector zero(Vector initialValue) {
return Vector.zeros(initialValue.size());
}
public Vector addInPlace(Vector v1, Vector v2) {
v1.addInPlace(v2); return v1;
}
}

// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());


在 Java 中,Spark 也支持更通用的 AccumulatorParam 接口去累加数据,其结果类型和累加的元素不同(比如,构建一个包含所有元素的列表)。

Python

>>> accum = sc.accumulator(0)
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
10


这段代码利用了累加器对int类型的内建支持,程序员可以通过继承 AccumulatorParam 类来创建自己想要的类型支持。AccumulatorParam 的接口提供了两个方法:zero’用于为你的数据类型提供零值;’addInPlace’用于计算两个值得和。比如,假设我们有一个Vector`类表示数学中的向量,我们可以这样写:

class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)

def addInPlace(self, v1, v2):
v1 += v2
return v1

'# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())


(Scala / Python / Java) 对于累加器的更新,只有在 actions 中执行时,Spark 才保证每个 task 对累加器的更新仅执行一次,也就是说说,重启的 tasks 不会再更新该累加器。在转换操作中,用户应该意识到,如果 tasks 或 job 的 stages 被重新执行的话,每个 task 中的累加器就可能不止一次被更新了。

累加器不会改变 Spark 的惰性求值模型 (lazy evaluation model)。如果累加器在对 RDD 的操作中被更新了,它们的值只会在启动操作中作为 RDD 计算过程中的一部分被更新。所以,在一个懒惰的转化操作中调用累加器的更新,并没法保证会被及时运行。下面的代码段展示了这一点:

Scala

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the `map` to be computed.


Java

Accumulator<Integer> accum = sc.accumulator(0);
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.


Python

accum = sc.accumulator(0)
data.map(lambda x => acc.add(x); f(x))
'# Here, acc is still 0 because no actions have cause the `map` to be computed.


六. 在集群上部署

这个 应用程序提交指南 描述了一个应用被提交到集群上的过程。简而言之,只要你把你的应用打成了 JAR 包(Java/Scala应用)或 .py 文件的集合或 .zip 压缩包 (Python应用),bin/spark-submit脚本会将应用提交到任意支持的集群管理器上

七. 单元测试

Spark对单元测试是友好的,可以与任何流行的单元测试框架相容。你只需要在测试中创建一个SparkContext,并如前文所述将 master 的 URL 设为 local,执行你的程序,最后调用SparkContext.stop() 来终止运行。请确保你在 finally 块或测试框架的 tearDown 方法中终止了 contexts ,因为 Spark 不支持两个 contexts 在一个程序中同时运行.

八. 从1.0之前版本的Spark迁移

Spark 1.0 冻结了 1.x 系列 的 Spark 核心 (Core) API ,现在的 API ,除了标识为 “实验性 (experimental)” 或 “开发者的 (developer ) API” 的。在将来版本中都会被支持。

Scala

对于 Scala 用户而言,唯一的改变在于组操作 (grouping operations),比如,groupByKey,cogroup 和 join ,其返回值已经从 (Key, Seq[Value]) 对修改为 (Key, Iterable[Value])。

Java

其中对 Java API 做了一些修改:

对于 org.apache.spark.api.java.function 中的类函数 (Function classes),在 1.0 版本中变成了接口,这意味着旧的代码中, extends Function 应该需要为 implement Function

增加了 map 转换 (transformations) 的新变体,如 mapToPair 和 mapToDouble ,用于创建指定数据类型的 RDDs

组操作 (Grouping operations) , 如 groupByKey, cogroup 和 join 的返回值也被修改了,从原先返回 (Key, List) 对改为 (Key, Iterable)

Python

对Python用户来说唯一的变化就是分组操作 (grouping operations),比如 groupByKey, cogroup, join, 它们的返回值都从(键,值列表)对变成了(键, 值迭代器)对

迁移指南也可以从 Spark Streaming, MLlibGraphX 获取。

九. 还有什么要做的

你可以在Spark的网站上看到更多的 Spark样例程序。另外,在examples目录下还有许多样例代码(Scala, Java, Python)。你可以通过将类名称传给 Spark 的 bin/run-example 脚本来运行 Java 和 Scala 语言样例,举例说明:

./bin/run-example SparkPi


对于 Python 例子,使用spark-submit脚本代替:

./bin/spark-submit examples/src/main/python/pi.py


对于 R 例子,使用spark-submit脚本代替:

./bin/spark-submit examples/src/main/r/dataframe.R


为了给你优化代码提供帮助,配置指南调优指南 提供了关于最佳实践的一些信息。确保你的数据储存在以高效的格式储存在内存中,这很重要。为了给你部署应用提供帮助,集群模式概览 描述了许多内容,包括分布式操作和支持的集群管理器

最后,完整的API文档在这里。Scala版本 、Java版本 、Python版本 和 R 版本
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: