您的位置:首页 > 产品设计 > UI/UE

spark官方文档之——Quick Star 快速开始

2015-06-24 16:44 417 查看
本教程提供怎么快速使用spark的介绍。我们首先通过交互式shell(用python或者scala)介绍API,然后展示怎样用java,scala和python写应用。更详尽的参考官方文档programming
guide。

首先需要下载spark,因为我们不会使用HDFS,所以你可以下载任何版本的hadoop。

spark shell交互式分析

基础

spark的shell提供简单的方式去学习API,同样也是一个强有力的交互式分析数据的工具。shell语句可以使用scala(运行在java 虚拟机上,因此能够很好的使用存在的java库)或者python。在spark目录下如下运行来启动shell:

scala:./bin/spark-shell python:./bin/pyspark

spark主要的抽象概念是一个称为弹性分布式数据集(RDD)的分布式的数据集合。RDDs能够从hadoop的inputFormats(例如HDFS文件)或者从别的RDDs创建而来。下面让我们从spark文件夹下的README文本文件创建一个RDD:

scala> val textFile = sc.textFile("README.md")

RDDs可进行actions算子操作,返回值;也可进行转换(transformations)操作,返回新的RDDs的指针。让我们进行一些actions操作:

scala>textFile.count()//RDD中的条目数

res0:Long = 126

scala> textFile.first()//RDD的第一个条目

res1: String = # Apache Spark

现在让我们进行转换算子操作。我们会使用filter转换,它会返回新的RDD(文件README条目的一个子集)。

scala>val linesWithSpark = textFile.filter(line => line.contains("Spark"))

我们也可以把转换和actions连接起来使用:

scala> textFile.filter(line => line.contains("Spark")).count()//包含“Spark”的行数

res3:Long = 15

更多RDD操作

RDD的actions算子和转换算子可以用作更为复杂的计算。如下找出单词最多的行的单词数:

scala: scala>textFile.map(line =>line.split(" ").size).reduce((a,b) => if (a>b) a else b)

res4: Long = 15

python:>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if(a>b) else b)

15

第一步对每一行进行maps得到一个整值,创建了一个新的RDD。reduce在此RDD上找出行单词数最大的值。map和reduce的参数为scala函数常量(闭包),并且可使用语言的任何特性或者scala/java库。例如,我们可以很容易的调用别处声明的函数。我们将使用Math.max()函数使代码更容易理解:

scala> import java.lang.Math

improt java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a,b) => Math.max(a,b))

res5: Int = 15

hadoop中极为流行的常见数据流模式为MapReduce。spark能够很简单的实现MapReduce流:

scala> val workCounts = textFile.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey((a,b) => a+b)

这里,我们结合flatMap,map和reduceByKey转换操作来计算(String,Int)类型键值对RDD的每个单词的频数。//把每行的每个单词变为(word,1)形式,然后value进行加

我们可以在shell中使用collect action算子来收集单词频数:

scala> wordCounts.collect()

res6: Array[(String,Int)]= Array((means,1),(under,2),(this,3),(Because,1),(Python,2),....)//收集到driver,这里显示出来频数统计的结果

缓存

spark也支持抽取数据集到集群范围的内存缓存中。这在数据被重复访问时是很有用的,例如当查询一个很小的“hot”数据集或者运行一个像PageRank的迭代算法时。作为一个简单的例子,让我们把linesWithSpark数据集缓存起来(多次进行count操作,若不缓存则每次都会从最初的RDD进行split等转换然后count action,缓存之后只会对最后一个RDD上进行count操作):

scala:scala> linesWithSpak.cache()

scala> linesWithSpark.count()

res8: Long = 19

scala> linesWithSpark.count()

res9: Long = 19

Python:>>> linesWithSpark.cache()

>>> linesWithSpark.count()

19

>>> linesWithSpark.count()

19

对100行的文本文件使用spark进行探索和缓存是愚蠢的。很大数据集上使用这些函数,即使这些数据集跨越数十或数百的节点。你也可以通过bin/pyspark连接到一个集群上进行交互操作,就像官方文档programming guide描述的那样。

自带的应用例程

现在我们想使用spark API写一个自带的应用。我们会分别用scala(用SBT),java(用Maven)和python来进行一个简单的应用。

scala:

我们用scala创建一个简单的spark应用。命名为SimpleApp.scala:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}

注意应用需定义一个main()方法而不是继承scala.App。scala.App的子类可能不能正确运行。

这个程序只是统计spark的README文件中包含“a”和“b”的行的数目。注意你需要替换YOUR_SPARK_HOME为spark的安装目录。和之前spark shell(初始化自己的SparkContext,直接使用sc)的例子不同,这个程序作为程序的一部分初始化SparkContext。

我们传递给SparkContext一个包含应用配置信息的对象SparkConf。我们的应用依赖于spark的API,所以我们也要include一个sbt配置文件:simple.sbt,表明依赖于spark,它也会添加一个spark依赖的版本库:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"

为了sbt能够正确运行,我们需要SimpleApp.scala和simple.sbt正确放置到文件夹下。一旦放置完毕,我们就可以创建包含应用代码的JAR包,接着就可以使用spark-submit脚本运行我们的程序了。

# 文件要如下放置
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# 打包为jar包
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# 使用spark-submit运行应用
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

(java和python版的没翻译,自行查阅)

下一步

祝贺你运行了第一个spark应用

*更深入的了解API请查阅 Spark
programming guide,或者其他组件的“programming guides”。

*在集群上运行应用,查阅deployment overview

*最后,spark在examples(scala,java,python,R)文件夹下有一些例程。你可以如下运行它们:

# 对于scala,java, 使用run-example:
./bin/run-example SparkPi

# 对于python例程, 直接使用spark-submit:
./bin/spark-submit examples/src/main/python/pi.py

# 对于R例程, 直接使用spark-submit:
./bin/spark-submit examples/src/main/r/dataframe.R
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: