您的位置:首页 > 产品设计 > UI/UE

Spark Quick Start

2015-09-11 17:09 459 查看

Spark Shell交互分析

基础

Spark的shell提供了一种简单的方式去学习Spark API和功能强大的数据分析工具。既可以使用Scala也可以使用Python。在Spark目录下启动Spark Shell:

Scala

./bin/spark-sehll


Python

./bin/pyspark


Spark的基础抽象范式是一个分布式数据集合,即弹性分布式数据集(Resilient Distributed Dataset, RDD)。RDDs可以从Hadoop的HDFS文件创建,也可以通过其他RDDs转换。例如,从README文件中创建一个新的RDD:

Scala

scala> val textFile = sc.textFile(“README.md”)
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3


Python

>>> textFile = sc.textFile("README.md")


RDDs包含有action和transformation,action返回value,transformation返回指向新的RDDs的指针。例如,创建新的actions:

Scala

scala> textFile.count() // RDD中条目的个数(README.md文件的行数)
res0: Long = 126
scala> textFile.first()
Res1: String = # Apache Spark


Python

>>> textFile.count() # Number of items in this RDD
126
>>> textFile.first() # First item in this RDD
u'# Apache Spark'


下测试transformation的例子,使用filter transformation返回一个子集新的RDD:

Scala

scala> val linesWithSpark = textFile.filter(line => line.contains(“Spark”))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09


Python

>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)


我们也可以将action和transformation以函数式编程的样式连成串编写:

scala> textFile.filter(line => line.contains(“Spark”)).count()
res3: Long = 15


RDD的更多操作

RDD的action和transformation可以用于更复杂的计算。例如,计算最长行的单词个数:

Scala

scala> textFile.map(lilne => line.split(“ ”).size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15


Python

>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
15


首先通过map将每行映射为一个整数(每行单词的个数size),即创建了一个新的RDD。然后调用reduce查找个数最大的。map和reduce的参数是Scala函数语法(闭包),当然,可以使用任何Scala和Java的功能和库。例如,我们可以使用Math.max()函数使这个代码更容易理解:

Scala

scala> import java.lang.Math
Import java.lang.Math

scala> textFile.map(line => line.split(“ ”).size).reduce((a, b) => Math.max(ab))
res5: Int = 15


Python

>>> def max(a, b):
...     if a > b:
...         return a
...     else:
...         return b
...

>>> textFile.map(lambda line: len(line.split())).reduce(max)
15


一个常见的数据流模式是Hadoop平台下的MapReduce,Spark可以轻松的实现MapReduce:

Scala

scala> val wordCounts = textFile.flatMap(line => line.split(“ ”)).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledaggregatedRDD@71f027b8

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)


Python

>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
>>> wordCounts.collect()
[(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]


缓存Caching

Spark也支持将数据集推送到内存cluster-wide的缓存中。当数据集频繁重复的访问时候,这是非常有用的,比如,查询一个小量的数据集或者执行类似PageRank迭代算法。举一个简单的例子,缓存标记的linesWithSpark数据集。

Scala

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> lineWithSpark.count()
res8: Long = 19


Python

>>> linesWithSpark.cache()
>>> linesWithSpark.count()
19
>>> linesWithSpark.count()
19


缓存对于很大的数据集也同样适用,即使运行在几十个、几百个节点上。

Self-Contained应用程序

假设我们希望用Spark API写一个Self-Contained应用程序,Scala会非常简单:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}


Python的例子:

"""SimpleApp.py"""
from pyspark import SparkContext

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: