您的位置:首页 > 其它

【Spark1.6官方翻译】Spark快速入门

2016-01-28 19:45 309 查看
英文标题:Quick Start

英文原址:http://spark.apache.org/docs/latest/quick-start.html

Spark Version:1.6.0

使用Spark Shell进行交互分析
基础使用

复杂的RDD操作

缓存Caching

Spark应用程序

深入Spark

1. 使用Spark Shell进行交互分析

1. 基础使用

Spark Shell提供了建议的学习API的方式,是一个进行交互式数据分析的强大工具,它提供了Scala和Python的交互环境。

scala

./bin/spark-shell


python

./bin/pyspark


Spark的主要抽象是一个叫弹性分布式数据集(Resilient Distributed Dataset,RDD)的数据集合,RDD可以由Hadoop的输入格式(例如HDFS文件)创建或者由其他RDD转换而来。下例RDD由一个文件格式创建而来:

scala> val textFile = sc.textFile("/data/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:27


RDD有动作-actions(返回值)和转换-transformations(返回指向新RDD的指针)两种操作:

动作举例

scala> val textFile.count() //此RDD中项-items的数量
res0: Long = 95
scala> textFile.first() //此RDD的第一个项
res1: String = # Apache Spark


转换举例

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:29


同时可以将动作和转换串起来执行:

scala> textFile.filter(line => line.contains("Spark")).count    //多少行包含单词‘Spark’
res2: Long = 17


2. 复杂的RDD操作

RDD的动作和转换可以用来进行更复杂的计算。

示例任务:找出README.md中包含单词数最多的行的单词数量

scala> textFile.map(line => line.split(" ").size).reduce((a,b) => if(a > b) a else b)
res3: Int = 14


首先将每一行映射(map)成一个整型值(创建了一个新的RDD),然后在此RDD上调用reduce找单词数量最大的。map和reduce的参数是Scala中函数,可以使用任意语言特征或Scala/Java库。例如,可以轻松的调用其他地方声明的函数,如下使用了Java中的Math.max()函数达到上述代码同样的效果:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a,b) => Math.max(a,b))
res4: Int = 14


一个比较通用的数据计算模型是MapReduce,Spark可以轻松的执行MapReduce,如下:


scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => a+b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:30


此处结合了flatMap,map,reduceByKey几个不同的转换计算文件中每一个单词的数量,得到一个(String,Int)对的RDD。接下来使用collection动作收集和呈现单词技术的结果:

scala> wordCounts.collect()
res5: Array[(String, Int)] = Array((package,1), (this,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala>,1), (DataFrames,,1), (provides,1), (refer,2)...


3. 缓存(Caching)

Spark支持将数据集拉到集群的内存缓存中,当数据集被频繁的访问(如热点数据集或云心个交互式的算法例如PageRank)时,这个特性特别重要。请看示例:

scala> linesWithSpark.cache()
res6: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:29

scala> linesWithSpark.count()
res7: Long = 17

scala> linesWithSpark.count()
res8: Long = 17


示例中,将linesWithSpark这个RDD进行缓存(虽然看上去将一个100行左右的文件进行缓存很可笑,但是此操作同样可以应用于非常大的数据集之上,即使数据集跨上百个节点同样可以),然后对它进行了2次count操作。从http://localhost:4040/jobs/ 的作业完成来看,缓存之后第一次计算仍然需要0.1s,但是第二次计算由于已经缓存成功,所以所需时长为16ms。



2. Spark应用程序

假设我们希望使用Spark API写一个自包含的应用程序,下面示例使用Scala(sbt编译)实现(Java(Maven编译)、Python请跳转至官方处查看)。

scala

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
def main(args: Array[String]) {
val logFile = "/data/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}


应用程序必须顶一个一个main方法,而不是直接继承scala.App,因为scala.App的子类可能会运行错误。示例程序只计算了在README.md中包含字母a和b的总行数,此外,与上述的Spark-Shell示例不同之处在于,在应用程序中需要自行初始化SparkContext,在Spark Shell中已经初始化为sc。

给SparkContext构造器传递一个SparkConf对象作为参数,这个SparkConf对象包含了应用程序的一些信息,此程序基于Spark API,程序的运行目录结构中将包含一个simple.sbt文件用户解释Spark的依赖:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"


接下来,需要根据当前的目录结构部署SimpleApp.scala和simple.sbt,以使sbt正常运行,然后可以将此程序打包成JAR包,使用spark-submit脚本进行提交运行。

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23


3. 深入Spark

API的深入学习,查看“Spark编程指南”

在集群上运行应用程序,查看“部署概述”

Spark示例(examples目录下,包括Scala、Java、Python、R),可运行如下:

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: