您的位置:首页 > 其它

Spark随谈——开发指南(译)

2013-08-11 00:00 971 查看

本文翻译自官方博客,略有添加:https://github.com/mesos/spark/wiki/Spark-Programming-Guide,谢谢师允tx的校正。希望能够给希望尝试Spark的朋友,带来一些帮助。目前的版本是0.5.0

系列文章见: Spark随谈 http://www.linuxidc.com/Linux/2013-08/88592.htm

Spark开发指南 

从高的层面来看,其实每一个Spark的应用,都是一个Driver类,通过运行用户定义的main函数,在集群上执行各种并发操作和计算

Spark提供的最主要的抽象,是一个弹性分布式数据集(RDD),它是一种特殊集合,可以分布在集群的节点上,以函数式编程操作集合的方式,进行各种各样的并发操作。它可以由hdfs上的一个文件创建而来,或者是Driver程序中,从一个已经存在的集合转换而来。用户可以将数据集缓存在内存中,让它被有效的重用,进行并发操作。最后,分布式数据集可以自动的从结点失败中恢复,再次进行计算。

Spark的第二个抽象,是并行计算中使用的共享变量。默认来说,当Spark并发运行一个函数时,它是以多个的task,在不同的结点上运行,它传递每一个变量的一个拷贝,到每一个独立task使用到的函数中,因此这些变量并非共享的。然而有时候,我们需要在任务中能够被共享的变量,或者在任务与驱动程序之间共享。Spark支持两种类型的共享变量:

广播变量: 可以在内存的所有结点中被访问,用于缓存变量(只读)

累加器: 只能用来做加法的变量,例如计数和求和

本指南通过一些样例展示这些特征。读者最好是熟悉Scala,尤其是闭包的语法。请留意,Spark可以通过Spark-Shell的解释器进行交互式运行。你可能会需要它。

接入Spark

为了写一个Spark的应用,你需要将Spark和它的依赖,加入到CLASSPATH中。最简单的方法,就是运行sbt/sbt assembly来编译Spark和它的依赖,打到一个Jar里面core/target/scala_2.9.1/spark-core-assembly-0.0.0.jar,然后将它加入到你的CLASSPATH中。或者你可以选择将spark发布到maven的本地缓存中,使用sbt/sbt publish。它将在组织org.spark-project下成为一个spark-core.

另外,你会需要导入一些Spark的类和隐式转换, 将下面几行加入到你程序的顶部

import spark.SparkContext

import SparkContext._

初始化Spark

写Spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉Spark如何访问一个集群。这个通常是通过下面的构造器来实现的:

new SparkContext(master, jobName, [sparkHome], [jars])

Master参数是一个字符串,指定了连接的Mesos集群,或者用特殊的字符串“local”来指明用local模式运行。如下面的描述一般,JobName是你任务的名称,当在集群上运行的时候,将会在Mesos的Web UI监控界面显示。后面的两个参数,是用在将你的代码,部署到mesos集群上运行时使用的,后面会提到。

在Spark的解释器中,一个特殊的SparkContext变量已经为你创建,变量名字叫sc。创建你自己的SparkContext是不会生效的。你可以通过设置MASTER环境变量,来让master连接到需要的上下文。

MASTER=local; ./spark-shell

Master的命名

Master的名字可以是以下3个格式中的一种

Master Name

Meaning

local

本地化运行Spark,使用一个Worker线程(没有并行)

 

local[K]

本地化运行Spark,使用K个Worker线程(根据机器的CPU核数设定)

 

HOST:PORT

将Spark连接到指定的Mesos Master,在集群上运行。Host参数是Mesos Master的Hostname, 端口是master配置的端口,默认为5050.

注意:在早期的Mesos版本(spark的old-mesos分支),你必须使用master@HOST:PORT.

集群部署

如果你想你的任务运行在一个集群上,你需要指定2个可选参数:

  • SparkHome:Spark在集群机器上的安装路径(必须全部一致)
  • Jars:在本地机器上,包含了你任务的代码和依赖的Jars文件列表。 Spark会把它们部署到所有的集群结点上。 你需要使用自己的编译系统将你的作业,打包成一套jars文件。例如,如果你使用sbt,那么sbt-assembly插件是一个好方法,将你的代码和依赖,变成一个单一的jar文件。

如果有一些类库是公用的,需要在不同的作业间共享,你可能需要手工拷贝到mesos的结点上,在conf/spark-env中,通过设置SPARK_CLASSPATH环境变量指向它们。详细信息可以参考配置

分布式数据集

Spark围绕的核心概念,是弹性分布式数据集(RDD),一个有容错机制,可以被并行操作的集合。目前有两种类型的RDD: 并行集合(Parrallelized Collections),接收一个已经存在的Scala集合,在它上面运行各种并发计算; Hadoop数据集(Hadoop DataSets),在一个文件的每条记录上,运行各种函数。只要文件系统是Hdfs,或者hadoop支持的任意存储系统。这两种RDD都可以通过相同的方式进行操作。

并行集合

并行集合是通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合(只要是seq对象就可以)上创建而来。集合的对象将会被拷贝来创建一个分布式数据集,可以被并行操作。下面通过spark解释器的例子,展示如何从一个数组创建一个并发集合

scala> val data = Array(1, 2, 3, 4, 5)

data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)

distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

一旦被创建,分布数据集(distData)可以被并行操作。例如,我们可以调用distData.reduce(_ +_) 来将数组的元素相加。我们会在后续的分布式数据集做进一步描述。

创建并行集合的一个重要参数,是slices的数目,它指定了将数据集切分为几份。在集群模式中,Spark将会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。然而,你也可以手动的设置它,通过parallelize方法的第二个参数(例如:sc.parallelize(data, 10)).

接下来请看第2页精彩内容: http://www.linuxidc.com/Linux/2013-08/88595p2.htm

推荐阅读:

Spark 0.7.0 发布,开源集群计算环境 http://www.linuxidc.com/Linux/2013-03/81990.htm

Spark 并行计算模型 http://www.linuxidc.com/Linux/2012-12/76490.htm

Spark,一种快速数据分析替代方案 https://www.geek-share.com/detail/2587150437.html

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spark