您的位置:首页 > 产品设计 > UI/UE

spark官方文档之——Spark programming guide spark编程指南

2015-06-25 09:53 543 查看
(相关代码为scala版本,其他java和python版自行查阅)

概述

每个spark应用由一个driver program组成,driver program运行用户main函数并在集群上执行多种并行操作。spark主要的抽象概念是弹性分布式数据集(RDD),它是分区在集群节点上的数据集合,可在其上做并行操作。RDDs可以从一个hadoop文件系统(或者其他任何hadoop支持的文件系统)上的文件或者driver program上存在的scala集合转换后的结果创建而来。用户也可以把RDD持久化persist到内存,让它在并行操作中被有效的重复使用。最后,RDDs会自动恢复失败的节点(容错)。
spark的第二个抽象概念是在并行操作中使用的shared variables共享变量。spark默认把函数作为并行的任务集在不同节点上运行,它传递每个函数中使用的变量的拷贝到每个任务。有时,一个变量需要在任务中共享,或者在任务和driver program间共享。spark支持两种类型的共享变量:广播变量(缓存到所有节点内存上使用);累加器accumulators(只是用来累加,例如counters和sums)。
文档展示了spark支持的每种语言的特性。很容易从spark的交互式shell开始,bin/spark-shell(scala shell)或者bin/pyspark(python)。

Linking with Spark

scala(java和python版自行查阅):
spark 1.4.0使用scala 2.10。为了能够用scala写应用,你需要使用一个兼容的scala版本(例如2.10.X)。
为了写spark应用,你需要在spark上添加Maven依赖。spark is available through Maven Central at:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.4.0

另外,如果你希望可以使用HDFS集群,你需要在hadoop-client上为你HDFS版本增加一个依赖。一些常用的HDFS版本标签在third
party distributions页列出。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,你需要在你的程序中导入一些spark类。如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在spark1.3.0之前,你需要导入import org.apache.spark.SparkContext._来使必要的隐式转换起作用)

初始化spark

scala(java和python版自行查阅):

spark程序第一件必须做的是创建一个SparkContext对象,它告诉spark怎样访问集群。首先需要创建一个包含你应用信息的SparkConf对象。
每个JVM中只有一个SparkContext对象是活跃的。你在创建一个新的之前必须stop()活跃的SparkContext。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName参数是你应用的名称,显示在集群UI上。master是一个spark,Mesos或YARN集群的URL,或者“local”字符串表示运行在本地模式。事实上,在一个集群上运行的时候,你并不想在程序中硬编码(写死?)master,而是用spark-submit启动并接收应用程序。但是,为了本地测试和单元测试,你可以用“local”在进程中运行spark。

使用shell

scala(python版自行查阅):
spark shell中,SparkContext已经为你创建好了,就是变量sc。创建你自己的SparkContext将不会工作。你可以用--master参数设置SparkContext连接哪个主机,并且你可以使用--jars参数添加JARs包(逗号分隔的列表)。你也可以使用--packages参数添加依赖(逗号分隔的列表)到你的shell会话(例如spark包)。任何用到的依赖(例如SonaType)都可以传递到--repositories参数。例如,四核运行bin/spark-shell:

$ ./bin/spark-shell --master local[4]//master参数,local表示本地模式,4表示四核

或者添加code.jar到路径中:

$ ./bin/spark-shell --master local[4] --jars code.jar//使用到的jar包

使用maven coordinates include一个依赖:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

运行spark-shell --help得到完整的操作列表。更多参考
spark-submit
script
(http://spark.apache.org/docs/latest/submitting-applications.html)

弹性分布式数据集(RDDs)

spark围绕着RDD的概念,它是一个可容错的数据集,在它之上可以进行并行操作。两种方法创建RDDs:parallelizing一个driver program中存在的集合,或者指向一个外部存储系统中的数据集(例如共享的文件系统,HDFS,HBase,或任何提供一个hadoop InputFormat的数据源)。

并行集合

scala(java和python版自行查阅):
并行集合由在driver program上已存在的集合上调用SparkContext的parallelize方法创建而来。集合的元素会被复制形成一个分布式数据集,在其上可以进行并行操作。例如,这里是怎样创建一个1到5数字的并行集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一经创建,可以在分布式数据集(distData)上进行并行操作。例如,我们可以调distData.reduce((a,b) => a+b)来把这个数组累加。我们随后会描述分布式数据集上的操作。
对于并行集合一个重要的参数就是此数据集被分区的数目。spark会在集群上为每个分区启动一个任务。典型的是集群中每个CPU 2-4个分区。spark一般会根据你的集群自动设置分区数。但是,你也可以用parallelize的第二个参数(例如 sc.parallelize(data,10))自己设置。注意:代码中有些地方使用term slices(分区的同义词)来保持向后兼容性。

外部数据集

scala(java和python版自行查阅):
spark可以从任何hadoop支持的存储源创建分布式数据集,包括你的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。spark支持文本文件,序列化文件和任何其他hadoop InputFormat。
使用SparkContext的textFile方法创建文本文件RDDs。此方法用到文件的URL(本地路径或hdfs;//,s3n://等URL),并读取这些文件为文件行的集合。这里是一个例子:

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08

一经创建,distFile上可进行数据集操作。例如,我们可以像下面那样使用map和reduce操作累加所有行的大小:distFile.map(s => s.length).reduce((a,b) => a + b)。
spark在读取文件时的一些注意点:
*如果在本地文件系统上使用一个路径,在worker节点上此文件必须有相同的路径。要么拷贝此文件到所有workers节点,要么使用网络共享文件系统。

*spark所有基于文件的输入方法,包括textFile,支持在文件夹,压缩文件,通配符上运行(文件参数可通配符表示,或为压缩文件,或为文件夹)。例如,textFile(“/my/directory”)//文件夹,textFile("/my/directory/*.txt")//通配符,textFile("/my/directory/*.gz")//压缩文件及通配符。

*textFile方法的第二个可选参数用来控制文件的分区数。默认spark为文件的每个块创建一个分区(HDFS默认块为64M),但你可以传递一个更大的值指定分区大小。注意分区数不能少于块的数目。

除了文本文件,spark的scala API也支持一些其他的数据形式:
*SparkContext.wholeTextFiles可读取文件下多个小的文本文件,每个文件以(filename,content)键值对形式返回。相比之下,textFile使文件中的每行返回一个记录。

*对于序列化文件,使用SparkContext的sequenceFile[K,V]方法,K V和文件里key value的类型相同。且应该是hadoop的Writable接口的子类,像IntWritable和Text一样。另外,spark允许为一些常见Writables指定native类型;例如,sequenceFile[Int,String]会自动读取IntWritables和Texts。

*对于其他hadoop的InputFormats,可以使用SparkContext.hadoopRDD方法,此方法需JobConf和输入格式类(key类和value类)。像hadoop job那样设置你的输入源。基于新的MapReduce API(org.apache.hadoop.mapreduce)的输入格式,可以使用SparkContext.newAPIHadoopRDD。

*RDD.saveAsObjectFile和SparkContext.objectFile支持以一个组成序列化java objects的简单格式保存RDD。尽管不是像Avro那样有效的序列化格式,但它提供一个简单的方法去保存任何RDD。

RDD操作

RDDs支持两种类型的操作:transformations(从一个存在的数据集创建一个新的);actions(数据集上运行一个计算后返回到driver program的一个值)。例如,map转换通过一个函数转换每个数据集元素,返回一个代表结果的新的RDD。另一方面,reduce是使用相同的函数聚合RDD的所有的元素的一个action,并且返回给driver program最终的结果(尽管也有一个并行的返回一个分布式数据集的方法reduceByKey)。
spark中的转换为惰性的,不会立刻计算他们的结果。相反,它会记住这些应用于一些基本数据集(例如一个文件)上的转换。这些转换只有在一个action需要返回结果给driver program的时候被计算。这个设计使spark更有效的运行—例如,通过map创建的一个数据集可以被reduce使用,并只返回reduce的结果给driver,而不是mapped更大的数据集。
默认的,每次运行一个action时,每个RDD转换会被重复计算。但是,也可以使用persist(或cache)方法持久化一个RDD到内存里,这样spark会保持元素在集群上,提供更快的访问在下次你查询它时。也支持在磁盘上持久化RDDs,或者跨多节点复制。

basics

scala:
如下简单的程序阐明RDD基础:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行从一个外部文件定义一个RDD。这个数据集没有加载到内存:lines只是一个指向文件的指针。第二行定义lineLengths作为一个map转换的结果。由于惰性转换机制,lineLengths不会被立刻计算。最后,运行reduce,它是一个action。在这个时间点(action触发)spark把计算分到很多任务中, 在不同机器上运行,每个机器运行它那一部分map和本地的reduction,只返回它的结果给driver program。
如果我们想随后再次使用lineLengths,我们可以加入语句:

lineLengths.persist()

在reduce之前,可使lineLengths一经计算出立刻存到内存中。

向spark传递函数

scala:
spark的API很大程度上依赖于在driver program里传递函数来运行在集群上。两种推荐的方法:
*Anonymous
function syntax 匿名函数语法(http://docs.scala-lang.org/tutorials/tour/anonymous-function-syntax.html),被用于短的片段代码。
*全局单例对象里的静态方法。例如,你可以定义object MyFuntions,然后传递MyFunctions.func1,如下:

object MyFunctions {
def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意传递一个引用给一个类实例的一个方法(和一个单例object对照)是可能的,它需要传递包含类和方法的对象。例如:

class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

这里,如果你new了一个MyClass,并调用它的doStuff,方法内的map引用MyClass类实例的func1方法,所以整个object需要被发送到集群上。类似于这样写rdd.map(x => this.func1(x))。
相同的方式,访问外部对象的字段将引用整个对象:

class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

和rdd.map(x => this.field + x)是等价的。为了避免这个问题,最简单的方法是拷贝字段到一个局部变量而不是外部访问它。

def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}

理解集群

理解变量及方法的作用域和生命周期当在集群执行代码时是spark一个难点。作用域外修改变量的RDD操作是导致困惑的来源。在下面的例子中我们会看到使用foreach()增加计数的代码,但是其他别的操作也可能会发生类似的问题。

例子

根据执行是否发生在相同的JVM,下面naive RDD element sum 表现行为完全不同。一个常见的例子是local模式(--master = local
)下运行spark(相对于集群上部署应用(例如通过spark-submit部署到YARN))时:

scala:

var counter = 0
var rdd = sc.parallelize(data)

// 错误: 不要这样做!!(不同运行模式下表现行为不同,local模式在一个JVM中,集群模式却不是)
rdd.foreach(x => counter += x)

println("Counter value: " + counter)


本地VS集群模式

上面代码的行为是不明确的。本地模式下有一个JVM,上面代码会在RDD内累加values并存储在变量counter中。这是因为RDD和变量counter都在driver节点相同的内存空间中。
但是,在集群模式下,行为就更复杂,上面的代码可能不会像预期的那样工作。为了执行工作,spark会把RDD操作的过程变成多个任务—每个在一个executor上执行。在执行之前,spark计算闭包。为了executor在RDD上执行计算,这些变量和方法必须是可见的,这些变量和方法就是闭包。闭包被序列化并发送到每个executor。在本地模式下,只有一个executors,所以全部都共享相同的闭包。但在其他模式下,情况就不是这样的了,executors运行在不同的worker节点上,每个节点有它们自己的闭包拷贝。
送到每个executor的闭包里的变量是备份,因此在foreach函数里引用counter时,就不再是driver节点上的counter了。在driver节点内存里仍然有一个counter,但是对于executors是不可见的!executors只能看到序列化闭包的拷贝。因此,counter最后的值仍然会是0,因为在counter上所有的操作都指向序列化的闭包。
为了确保场景中明确定义的行为,应该使用一个accumulator累加器。当执行被分散到集群worker节点上时,spark中的accumulators被用来提供一种安全更新变量的机制。本指南的accumulators部分更详细讨论了这些。
总之,闭包—像loops或本地定义方法的结构,不应该用来改变一些全局状态。spark不会定义或保证引用自闭包外的对象的变化行为。这样做的一些代码可能会在本地模式下起作用,但那是偶然的并且这样的代码分布式模式下并不会表现出预期的行为。如果一些全局的聚合需要,使用accumulator。

打印RDD的元素

另一个常见的操作是试图使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。在一个单独的机器上,会生成期望的输出并打印出RDD所有的元素。但是,在集群模式下,executors会调用输出到stdout,这样返回会输出到executors的stdout,而不是driver上的,所以driver上的stdout不会显示那些元素。为了在driver上打印所有的元素,可以使用collect()方法首先把RDD收集到driver节点上,接着rdd.collect().foreach(println)。即使这样会导致driver内存溢出,因为collect()获取整个RDD到一个单独的机器上;如果你只是需要打印RDD的少许元素,一个更安全的方法是使用take():
rdd.take(100).foreach(println)。

使用键值对

scala:
尽管大多数spark操作可工作在包含任何类型对象的RDDs上,仍然有一些特定的操作只适用于键值对形式的RDDs。最常见的是分布式“shuffle”操作,例如根据key分类或聚合元素。
在scala中,在包含Tuple2(http://www.scala-lang.org/api/2.10.4/index.html#scala.Tuple2)对象(语言内置元组,通过简单的写(a,b)创建)的RDDs上这些操作会自动使用。在类PairRDDFunctions(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions)中键值对操作是可用的。
例如,如下代码在键值对上使用reduceByKey操作去计算文件中每行文本出现的次数。

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

例如,我们可以使用counts.sortByKey()按照字母顺序排序键值对,并且可以最后counts.collect()送回driver program作为一个对象数组。
注意:当在键值对操作中使用自定义对象作为key时,你必须确保自定义equals()方法伴随着一个匹配的hashCode()方法。更多细节,Object.hashCode()
documentation(http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode())

转换

下面表格列出了spark支持的常用转换。详细参考RDD API(scala(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD),java(http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html),python(http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD),R(http://spark.apache.org/docs/latest/api/R/index.html))文档和pair
RDD(scala(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),java(http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))方法文档。

转换 含义
map(func) func转换每个元素形成的一个新的分布式数据集

filter(func) 过滤出func返回true的元素形成的新的数据集

flatMap(func) 类似于map,但是每个输入元素可被映射为0或多个输出元素(所以func返回一个序列而不是单个的元素)

mapPartitions(func) 类似于map,但是RDD的每个分区(块)分别运行,所以当在类型T的RDD上运行时,func必须为类型Iterator<T> => Iterator<U>

mapPartitionsWithIndex(func) 类似于mapPartitions,但可提供一个整型值给func代表分区的索引,所以当在类型T的RDD上运行时,func必须为类型 (Int,Iterator<T>) => Iterator<U>

sample(withReplacement,fraction,seed) 可替换也可不替换,使用一个给定的随机数生成器seed,来采样部分数据的样品。

union(otherDataset) 返回一个新的RDD,为源数据集和参数集合的并集

distinct([numTasks]) 返回一个新的RDD,为源数据集中不重复的元素集合

intersection(otherDataset) 返回一个新的RDD,为源数据集和参数数据集的交集

groupByKey([numTasks]) 当在(K,V)键值对数据集上调用时,返回(K,Iterable<V>)键值对数据集。注意:如果你为了在每个key上执行一个聚 合(例如sum或average)而分类,使用reduceByKey或aggregateByKey会更好。注意:默认的,并行输出的水平取决于父
RDD的分区数。你可以传递可选参数numTasks设置不同的tasks数。

reduceByKey(func,[numTasks]) 在(K,V)键值对数据集上调用,返回一个新的(K,V)键值对数据集,它使用给定的reduce函数func对每个key的values 进行聚合操作,必须是类型(V,V)=> V(例如源数据集为(key,(value,1)),reduce函数功能为加操作,则对源
数据集每个元素的1进行加操作).就像groupByKey,reduce任务的数目是可用第二个参数配置的。

aggregateByKey(zeroValue)(seqOp,combOp, 在(K,V)键值对数据集上调用,返回一个新的(K,U)键值对数据集,使用combine函数聚合每个key的values值和一个
[numTasks]) “zero”值。为了避免不必要的分配,允许聚合的value类型不同于输入value类型。就像groupByKey,reduce任务的数 目是可用第二个参数配置的。

sortByKey([ascending],[numTasks]) 在要实现K排序的(K,V)键值对数据集上调用,返回以keys降序或升序排序的(K,V)键值对数据集。ascending控制降 序或升序。

join(otherDataset,[numTasks]) 在(K,V)和(K,W)类型数据集上调用,返回(K,(V,W))(相同key的聚集在一起)类型数据集。letfOuterJoin, rightOuterJoin和fullOuterJoin支持outer joins。

cogroup(otherDataset,[numTasks]) 在(K,V)和(K,W)类型数据集上调用,返回(K,(Iterable<V>,Iterable<W>))元组数据集。这种操作也可以调用 groupWith。

cartesian(otherDataset) 当在T和U类型上调用时,返回(T,U)键值对(所有元素的键值对)数据集。

pipe(command,[envVars]) 通过shell命令连通RDD每个分区,例如,一个perl或bash脚本。RDD元素写入到stdin,输出到stdout的行作为一个字符 RDD返回

coalesce(numPartitions) 减少RDD分区到numPartitions。对于一个filter处理后的大的数据集进行操作时,是很有用的。

repartition(numPartitions) 随机的重新洗牌RDD中的数据,去创建更多或更少的分区。会通过网络洗牌所有数据。

repartitionAndSortWithPartitions(partitioner) 根据给定的partitioner重新分区RDD,在每个结果分区内,根据它们的keys排序记录。因为它能够把排序交给shuffle machinery,这比调用repartition,并且之后在每个分区内排序更有效。

Actions算子

下面列出常用的一些spark支持的actions。详细参考RDD API(scala(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD),java(http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html),python(http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD),R(http://spark.apache.org/docs/latest/api/R/index.html))文档和pair
RDD(scala(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),java(http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))方法文档。

action 含义

reduce(func) 使用func函数聚合数据集的元素(变两个参数变为一个)。为了保证正确的并行计算,函数应该 是commutative
and associative(这里不理解)

collect() 作为一个数组返回数据集所有的元素到driver program。通常在filter或其他返回数据一个充分小子集操作之 后,collect是很有用的。

count() 返回数据集元素的数目

first() 返回数据集的第一个元素(类似于take(1))

take(n) 返回数据集前n个元素数组

takeSample(withReplacement)

未完待续···········
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: