您的位置:首页 > 编程语言 > Python开发

Spark RDD 的Transformation与Action的常用功能总结(Python版本)

2016-04-01 16:26 736 查看
本文主要演示如何通过Python对Spark的RDD进行编程,只列出了一些常用的RDD操作接口,完整的功能,请参考官方文档

演示环境说明

RDD的详细介绍请参考:http://blog.csdn.net/eric_sunah/article/details/49705145
操作系统:Ubuntu 12.04

部署环境:1.6单机版
演示环境:pyspark
测试语言:Python
Transformation
map
概述:map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
样例

>>> templist=[1,2,3,4,5,6]
>>> rdd=sc.parallelize(templist)
>>> result=rdd.map(lambda x:x*3)
>>> result.collect()

[3, 6, 9, 12, 15, 18]

filter
概述:filter是通过指定的函数对已有的RDD做过滤操作,只有符合条件的元素才会被放到新的RDD中
样例

>>> templist=[1,2,3,4,5,6]
>>> rdd=sc.parallelize(templist)

>>> result=rdd.filter(lambda x:x%2==0)

>>> result.collect()

[2, 4, 6]

flatMap
概览:类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
样例

>>> templist=[1,2,3,4,5,6]
>>> rdd=sc.parallelize(templist)
>>> result=rdd.flatMap(lambda x:x)
>>> result.collect()
[0, 0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5]

mapPartitions
概述:mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。

func作为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数func,func的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。

样例

>>> templist=[1,2,3,4,5,6]
>>> rdd=sc.parallelize(templist)
>>> def func(chain):
...     for item in chain:
...             yield item*2
... 
>>> result=rdd.mapPartitions(func);
>>> result.collect()

[2, 4, 6, 8, 10, 12]

mapPartitionsWithIndex
概述:和mapPattitions类似只是它能把分区的index传递给用户指定的输入函数
样例

>>> templist=[1,2,3,4,5,6]
>>> rdd=sc.parallelize(templist)
>>> def func(par_index,chain):
...     for item in chain:
...             yield item*par_index
...             print "##partition index:%d  item:%d" %(par_index,item)
...     print "###partition index:%d" %(par_index)
... 
>>> result=rdd.mapPartitionsWithIndex(func)
>>> result.collect()

###partition index:4
##partition index:1  item:1
###partition index:1
###partition index:0
##partition index:5  item:4
###partition index:5
##partition index:2  item:2
###partition index:2
##partition index:7  item:6
###partition index:7
##partition index:6  item:5
###partition index:6
##partition index:3  item:3
###partition index:3
[1, 4, 9, 20, 30, 42]

mapValues
概述:mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
样例

>>> originalMap={1:"first",2:"second",3:"thrid"}

>>> keyRdd=sc.parallelize(originalMap)

>>> mapRdd=keyRdd.map(lambda x:(x,originalMap[x]))

>>> newMapRdd=mapRdd.mapValues(lambda x:x.upper())

>>> newMapRdd.collect()

[(1, 'FIRST'), (2, 'SECOND'), (3, 'THRID')]

mapWith
概述:mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:
def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;
第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

样例:Python API没有提供该接口

sample
概述:随机返回RDD中的样本数据,方法定义为
sample
(withReplacement, fraction, seed=None)

withReplacement:表示一个元素是否可以出现多次
fraction:随机的不重复样本占整个RDD的比例,值得范围为[0,1]

seed:随机种子
样例

In [36]: rdd = sc.parallelize(range(100), 4)
In [37]: rdd.sample(False,0.1,37).collect()

Out[37]: [9, 10, 18, 22, 52, 53, 64, 66, 85, 91, 96]

union
概述:将两个RDD进行结合,返回并集
样例

>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd2 = sc.parallelize([4, 5, 6, 7])

[1, 1, 2, 3, 4, 5, 6, 7]

intersection
概述:返回两个RDD的交集,如果交集包含重复元素,那么也只显示一个。该操作内部会执行shuffer操作
样例

>>> rdd1 = sc.parallelize([1, 10, 2,2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2,2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect();

[1, 2, 3]

distinct
概述:对一个RDD进行去重操作
样例

>>> rdd1 = sc.parallelize([1, 10, 2,2, 3, 4, 5])

>>> rdd1.distinct().collect()

[1, 10, 2, 3, 4, 5]

groupByKey
概述:将RDD中的元素按照key进行分组,如果是为了对每个key进行汇聚操作,使用reduceByKey和aggregateByKey 效率会更高一点
样例

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]

reduceByKey(func, numPartitions=None, partitionFunc=<function
portable_hash at 0x7f1ac7340578>)
概述:首先对RDD进行分组操作并在本地进行合并,合并后的结果再调用func进行reduce处理,
样例

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("b",3)])

>>> rdd.reduceByKey(lambda e1,e2:e1+e2).collect()

[('a', 3), ('b', 4)]

aggregate(zeroValue, seqOp, combOp)
概述:aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

下面例子中 红色Tuple表示zeroValue,绿色Tuple表示seqOp产生的结果,橙色Tuple表示comOP产生的结果

样例

>>> def seqOp(x,y):
...     print "seqOp %s %s" %(str(x),str(y))
...     return x[0] + y, x[1] + 1
... 
>>> def comOp(x,y):
...     print "comOp %s %s" %(str(x),str(y))
...     return x[0] + y[0], x[1] + y[1]
... 
>>> sc.parallelize([1, 2, 3, 4]).aggregate((1, 1), seqOp, comOp)

seqOp (1, 1) 1

seqOp (1, 1) 2

seqOp (1, 1) 3

seqOp (1, 1) 4

comOp (1, 1) (1, 1)
comOp (2, 2) (2, 2)
comOp (4, 4) (1, 1)
comOp (5, 5) (3, 2)
comOp (8, 7) (1, 1)
comOp (9, 8) (4, 2)
comOp (13, 10) (1, 1)
comOp (14, 11) (5, 2)
(19, 13)

sortByKey(ascending=True, numPartitions=None, keyfunc=<function
<lambda> at 0x7f1ac7345de8>)
概述:对RDD安装Key排序,前提是RDD的元素类型是(K,V)型的

keyfunc只是在比较的时候做对应的操作,而不是改变原有RDD里面的值
样例

>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey().collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

>>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5),('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]
>>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()

[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]

join
概述:按照Key合并两个(K,V)类型的RDD,合并后的数据形式为k, (v1, v2),该操作是跨整个集群的操作
样例

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> x.join(y).collect()

[('a', (1, 2)), ('a', (1, 3))]

cogroup
概述:对两个包含(K,V)类型列表的RDD进行操作,返回的结果是按照key进行组织的tuple列表
样例

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([1], [2])), ('b', ([4], []))]

groupWith(other, *others)
概述:和cogroup类似,只是支持同事对多个RDD进行操作
样例

>>> w = sc.parallelize([("a", 5), ("b", 6)])
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> z = sc.parallelize([("b", 42)])
>>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

pipe
概述:通过调用一个外部程序生成RDD,例如tr 'A-Z' 'a-z'命令主要用来将输入装换成小写,下面的例子用来演示如果通过该命令将RDD的元素都转换成小写
样例

>>> sc.parallelize(['sun', 'BDE', 'ddddsacF', 'asdfasdf']).pipe("tr 'A-Z' 'a-z'").collect()

[u'sun', u'bde', u'ddddsacf', u'asdfasdf']

coalesce(numPartitions, shuffle=False)
概述:对RDD的数据按照指定的分区数重新分区。新分配的分区数必须小于原始分区数
样例

>>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
[[1], [2, 3], [4, 5]]
>>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]

>>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(4).glom().collect()
[[1], [2, 3], [4, 5]]

repartition(numPartitions)
概述:返回一个重新分区过的RDD,分区的数量可以增加也可以减少,内部会使用shuffle来重新分配数据。

在partition数量减少的情况下,建议使用coalesce(可以避免执行shuffle),
样例

>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
>>> len(rdd.repartition(2).glom().collect())
2
>>> len(rdd.repartition(10).glom().collect())
10

cartesian
概述:生成两个RDD的笛卡尔集
样例

>>> rdd = sc.parallelize([1, 2])

>>> rdd2 = sc.parallelize([2, 3])

>>> rdd.cartesian(rdd2).collect()

[(1, 2), (1, 3), (2, 2), (2, 3)]

Action
reduce
概述:通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
样例

对RDD做sum操作

>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
从RDD中选出最大值
>>> sc.parallelize([11, 2, 8, 9, 5]).reduce(lambda x,y:max(x,y))
11

collect
概述:在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM
样例

>>> sc.parallelize([11, 2, 8, 9, 5]).filter(lambda x:x%2==0).collect()

[2, 8]

count
概述:返回数据集的元素个数
样例

>>> sc.parallelize([11, 2, 8, 9, 5]).count()
5

take
概述:返回一个数组,由数据集的前n个元素组成。该函数会首先在一个分区上进行扫描,用第一个分区的扫描结果去评估其他的分区情况
样例

>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
>>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
[91, 92, 93]

first
概述:返回数据集的第一个元素(类似于take(1)
样例

>>> sc.parallelize([2, 3, 4]).first()
2

takeSample(withReplacement, num, seed=None)
概述: 返回固定数量的样本
样例

>>> sc.parallelize(range(100),3).takeSample(False,10);
[44, 34, 27, 54, 30, 21, 58, 85, 45, 32]

takeOrdered(num, key=None)
概述:按照指定的顺序返回一定数量的样本
样例

>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
[10, 9, 7, 6, 5, 4]

saveAsTextFile(path, compressionCodecClass=None)
概述:将结果保存为文本文件
样例

>>> tempFile3 = NamedTemporaryFile(delete=True)
>>> tempFile3.close()
>>> codec = "org.apache.hadoop.io.compress.GzipCodec"
>>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
>>> from fileinput import input, hook_compressed
>>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
>>> b''.join(result).decode('utf-8')
u'bar\nfoo\n'

saveAsSequenceFile
概述:将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)

countByKey
概述:返回每个key在map类型的RDD中出现的次数,返回的结果是一个map
样例

>>>sc.parallelize([("a", 1), ("b", 1), ("a", 2)]).countByKey().items()

[('a', 2), ('b', 1)]

stat
概述:返回数字列表RDD的统计信息,例如最大、最小值,平均值等信息
样例

>>> result=sc.parallelize(range(10)).sample(False,0.5,37);
>>> result.collect()

[1, 2, 3, 7, 9]
>>> result.stats()

(count: 5, mean: 4.4, stdev: 3.07245829915, max: 9.0, min: 1.0)

foreach
概述:在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互
样例

>>> def f(x): print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

参考:Python的Spark RDD接口:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息