您的位置:首页 > 编程语言 > Python开发

Spark编程指南(python版)

2015-09-14 22:51 477 查看
主要是翻译官网的编程指南,自己调整了一下内容安排,同时为了偷懒大量参考了淘宝的翻译版嘿嘿。但他们的编程指南主要是写java、scala语言用的,要求掌握sbt(scala),maven(java),我选择python就是因为提交简单嘛。

楼主配置:1.0.0版Spark + 12.04Ubuntu + 1.8.0_05jdk + scala-2.11.1


shell交互式编程

cd到spark的顶层目录中

cd ~/spark-1.0.1

然后运行spark-shell即可。这里因为是python版,所以运行的是pyspark,这里常用的是下几种形式:

./bin/pyspark

./bin/pyspark --master local[4]

./bin/pyspark --master local[4] --py-files code.py

第一条语句是默认的设置运行pyspark

第二条配置了master参数,使用4个Worker线程本地化运行Spark(理想情况下,local[K]应该根据运行机器的CPU核数设定)

第三条增加的–py-files,是将指定的文件加到search path,以便于之后import之。

更详尽的信息可以运行./bin/pyspark –help进行查看 Master URLs

下面是一些常用的:

Master URL
含义

local 使用一个Worker线程本地化运行SPARK(完全不并行)

local[K] 使用K个Worker线程本地化运行Spark

local[*] 使用K个Worker线程本地化运行Spark(这里K自动设置未机器的CPU核数)

spark://HOST:PORT 连接到指定的Spark单机版集群(Spark standalone cluster)master。必须使用master所配置的接口,默认接口7077.

mesos://HOST:PORT 连接到指定的Mesos集群。host参数是Moses master的hostname。必须使用master所配置的接口,默认接口是5050.

yarn-client 以客户端模式连接到yarn集群,集群位置由环境变量HADOOP_CONF_DIR决定.

yarn-cluster 以集群模式连接到yarn集群,同样由HADOOP_CONF_DIR决定连接到哪儿

如果没有指定的msater URL, spark shell 的默认值是“local”。 下面是一些例子程序,均已鉴定有效,可以去shell试一试:

[html] view
plaincopy

>>> textFile = sc.textFile("README.md") #读取README,用textFile方法创建RDD数据集

>>> textFile.count() # Number of items in this RDD(textfile返回的每一行的记录,这里相当于返回文件行数)

127

>>> textFile.first() # First item in this RDD

u'# Apache Spark'

>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line) #filter方法创建一个新的RDD数据集(包含有'Spark'的行)

>>> linesWithSpark.count()

15


spark应用编程


初始化Spark

第一步当然是加入所需的Spark模块

[python] view
plaincopy

from pyspark import SparkContext, SparkConf

....

sc = SparkContext("local", "Page Rank")

Spark程序需要做的第二件事情,一定是创建一个SparkContext对象,它将告诉Spark如何访问一个集群。这个通常是通过构造器sc = SparkContext(master, appname)来实现的,例如:

这里要说明的是,在Spark shell中,一个特殊的解释器感知的SparkContext已经为你创建好了,变量名是sc。创建你自己的SparkContext是不会生效的。


RDD操作

Spark围绕的概念是弹性分布式数据集(RDD),这是一个有容错机制并可以被并行操作的元素集合。Spark程序的核心就是一系列对RDD的操作。目前有两种类型的RDD:并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行计算。 Hadoop数据集(Hadoop Datasets):在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统即可。
这两种类型的RDD都可以通过相同的方式进行操作。

本节主要讲RDD的创建,以及支持的两种操作:转换(transformation)从现有的数据集创建一个新的数据集;动作(actions)在数据集上运行计算后,返回一个值给驱动程序。


创建

两种类型的RDD由以下两种方式创建而来。

1、并行集合(Parallelized Collections)

并行集合是通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。

[python] view
plaincopy

data = [1, 2, 3, 4, 5]

distData = sc.parallelize(data)

一旦分布式数据集(distData)被创建好,它们将可以被并行操作。例如,我们可以调用distData.reduce(lambda a, b: a + b)来将数组的元素相加。我们会在后续的分布式数据集运算中进一步描述。

并行集合的一个重要参数是slices,表示数据集切分的份数。Spark将会在集群上为每一份数据起一个任务。典型地,你可以在集群的每个CPU上分布2-4个slices. 一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。然而,你也可以通过传递给parallelize的第二个参数来进行手动设置。(例如:sc.parallelize(data,
10)).

2、Hadoop数据集

Spark可以从存储在HDFS,或者Hadoop支持的其它文件系统(包括本地文件,Amazon S3, Hypertable, HBase等等)上的文件创建RDD数据集。Spark可以支持TextFile,SequenceFiles以及其它任何Hadoop输入格式。(Python接口目前还不支持SequenceFile,很快会支持吧)

Text file的RDDs可以通过SparkContext’s textFile的方式创建,该方法接受一个文件的URI地址(或者机器上的一个本地路径,或者一个hdfs://, sdn://,kfs://,其它URI). 下面是一个调用例子:

[python] view
plaincopy

distFile = sc.textFile("data.txt")

一旦创建完成,distFile可以被进行数据集操作。例如,我们可以通过使用如下的map和reduce操作:

[html] view
plaincopy

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b) #将所有数据行的长度相加。

Spark上面读文件的一些notes:

--如果使用本地文件系统的路径,那么这个文件在工作节点上也应该可以按照这个路径读到。即,要么把文件copy到所有工作节点或者使用网络共享之。

--Spark的文件输入方法,可以run在目录、压缩文件上,支持通配符。 For example, 可以用textFile(”/my/directory”), textFile(”/my/directory/*.txt”), and textFile(”/my/directory/*.gz”).

--textFile方法也可以通过输入一个可选的第二参数,来控制文件的分片数目。默认情况下,Spark为每一块文件创建一个分片(HDFS默认的块大小为64MB),但是你也可以通过传入一个更大的值,来指定一个更高的片值。注意,你不能指定一个比块数更小的片值(和Map数不能小于Block数一样,但是可以比它多)

--使用SparkContext.wholeTextFiles可以读一个包含各种小textfile的目录,并对里头每一个文件返回(filename,content);就像textFile读文件会返回每一行的记录一样


转换

转换(transformation):从现有的数据集创建一个新的数据集。例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。

Spark中的所有转换都是惰性的,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这个设计让Spark更加有效率的运行。例如,我们可以实现:通过map创建的一个新数据集,并在reduce中使用,最终只返回reduce的结果给driver,而不是整个大的新数据集。

下面是常用的转换:

转换 含义

map(func) 返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成

filter(func) 返回一个新数据集,由经过func函数计算后返回值为true的输入元素组成

flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)

mapPartitions(func) 类似于map,但独立地在RDD的每一个分块上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] ⇒ Iterator[U]

mapPartitionsWithSplit(func) 类似于mapPartitions, 但func带有一个整数参数表示分块的索引值。因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) ⇒ Iterator[U]

sample(withReplacement,fraction, seed) 根据fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子

union(otherDataset) 返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成

distinct([numTasks])) 返回一个包含源数据集中所有不重复元素的新数据集

groupByKey([numTasks]) 在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集.注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的numTasks参数来改变它

reduceByKey(func, [numTasks]) 在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。类似groupByKey,reduce任务个数是可以通过第二个可选参数来配置的

sortByKey([ascending], [numTasks]) 在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定

join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集

cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, Seq[V], Seq[W])元组的数据集。这个操作也可以称之为groupwith

cartesian(otherDataset) 笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U)对数据集(两两的元素对)


动作

转换是惰性的,直到动作开始才会执行。例如下面:

[python] view
plaincopy

lines = sc.textFile("data.txt")

lineLengths = lines.map(lambda s: len(s))

totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行定义了一个基础RDD,但并没有开始载入内存啊balabala的,仅仅将lines指向了这个file

第二行也仅仅定义了linelengths是作为map的结果,但也没有开始map

直到第三句话才开始动作起来,各个worker节点开始运行自己的map、reduce。

默认情况下,每一个转换过的RDD都会在你在它之上执行一个动作时被重新计算。不过,你也可以使用persist(或者cache)方法,持久化一个RDD在内存中。上面的例子中,每一次用到linelengths的时候,它的值是重新map一遍得到的。因此你可以用persist()或cache()方法来标记一个要被持久化的RDD,然后一旦首次被一个动作(Action)触发计算,它将会被保留在计算结点的内存中并重用。其中,Cache有容错机制,如果RDD的任一分区丢失了,通过使用原先创建它的转换操作,它将会被自动重算(不需要全部重算,只计算丢失的部分)。持久化之后,Spark将会在集群中保存相关元素,下次你查询这个RDD时,它将能更快速访问。在磁盘上持久化数据集,或在集群间复制数据集也是支持的,这些选项将在本文档的下一节进行描述。

常用的动作列表

动作 含义

reduce(func) 通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的被并行执行。

collect() 在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作并返回一个足够小的数据子集后再使用会比较有用。

count() 返回数据集的元素的个数。

first() 返回数据集的第一个元素(类似于take(1))

take(n) 返回一个由数据集的前n个元素组成的数组。注意,这个操作目前并非并行执行,而是由驱动程序计算所有的元素

takeSample(withReplacement,num, seed) 返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,Seed用于指定的随机数生成器种子

saveAsTextFile(path) 将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行

saveAsSequenceFile(path) 将数据集的元素,以Hadoop sequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者隐式的可以转换为Writable的RDD。(Spark包括了基本类型的转换,例如Int,Double,String,等等)

countByKey() 对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每一个key对应的元素个数

foreach(func) 在数据集的每一个元素上,运行函数func进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase


RDD持久化

Spark最重要的一个功能,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(Actions)变得更加迅速(通常快10倍)。缓存是用Spark构建迭代算法的关键。 使用以下两种方法可以标记要缓存的RDD:

[python] view
plaincopy

lineLengths.persist()

lineLengths.cache()

取消缓存则用

[python] view
plaincopy

lineLengths.unpersist()

此外,每一个RDD都可以用不同的保存级别进行保存,从而允许你持久化数据集在硬盘,或者在内存作为序列化的Java对象(节省空间),甚至于跨结点复制。这些等级选择,是通过将一个org.apache.spark.storage.StorageLevel对象传递给persist()(persist(self, storageLevel))方法进行确定。cache()方法是使用默认存储级别的快捷方法,也就是StorageLevel.MEMORY_ONLY(将反序列化的对象存入内存)。
完整的可选存储级别如下:

存储级别
意义

MEMORY_ONLY
将RDD作为反序列化的的对象存储JVM中。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算。这是是默认的级别

MEMORY_AND_DISK
将RDD作为反序列化的的对象存储在JVM中。如果RDD不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取

MEMORY_ONLY_SER
将RDD作为序列化的的对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化的空间利用率更高,尤其当使用fast serializer,但在读取时会比较占用CPU

MEMORY_AND_DISK_SER
与MEMORY_ONLY_SER相似,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算

DISK_ONLY
只将RDD分区存储在硬盘上

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
与上述的存储级别一样,但是将每一个分区都复制到两个集群结点上

存储级别的选择 Spark的不同存储级别,旨在满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:

--如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快。

--如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。

--尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。

--如果你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续的运行任务,而不需要等待丢失的分区被重新计算。

--如果你想要定义你自己的存储级别(比如复制因子为3而不是2),可以使用StorageLevel 单例对象的apply()方法。


run it

Spark构建起一个程序支持三种语言:Scala (with SBT), Java (with Maven), and Python.

这里python最为简洁,写好你的.py文件后,到Spark主文件夹中执行

[python] view
plaincopy

./bin/spark-submit --master local[4] SimpleApp.py

就可以了。


Examples

本节给出一些小例子(其实都可以去Example文件夹下找到):

排序(当文件存储大量数字(只能是数字)时,将内容从小到大排序输出):

sort.py

[python] view
plaincopy

import sys

from pyspark import SparkContext

if __name__ == "__main__":

if len(sys.argv) != 2:

print >> sys.stderr, "Usage: sort <file>"

exit(-1)

sc = SparkContext(appName="PythonSort")

lines = sc.textFile(sys.argv[1], 1)

sortedCount = lines.flatMap(lambda x: x.split(' ')) \

.map(lambda x: (int(x), 1)) \

.sortByKey(lambda x: x)

# This is just a demo on how to bring all the sorted data back to a single node.

# In reality, we wouldn't want to collect all the data to the driver node.

output = sortedCount.collect()

for (num, unitcount) in output:

print num

单词计数:

wordcount.py

[python] view
plaincopy

import sys

from operator import add

from pyspark import SparkContext

if __name__ == "__main__":

if len(sys.argv) != 2:

print >> sys.stderr, "Usage: wordcount <file>"

exit(-1)

sc = SparkContext(appName="PythonWordCount")

lines = sc.textFile(sys.argv[1], 1)

counts = lines.flatMap(lambda x: x.split(' ')) \

.map(lambda x: (x, 1)) \

.reduceByKey(add)

output = counts.collect()

for (word, count) in output:

print "%s: %i" % (word, count)

参考文献:

官网文档: Quik Start http://spark.apache.org/docs/latest/quick-start.html

SparkProgramingGuide http://spark.apache.org/docs/latest/programming-guide.html

淘宝技术部: Spark编程指南 http://rdc.taobao.org/?p=2024
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: