您的位置:首页 > 编程语言 > Python开发

Spark1.4.1 快速入门

2015-09-06 16:29 507 查看
使用Spark Shell进行互动式分析

基础

更多RDD的actions

缓存

独立运行的应用程序

下一步该做什么

这个教程提供了一个使用Spark的快速介绍。我们首先会通过Spark的交互式脚本(使用Python或Scala)来介绍API,然后展示如何写使用Java,Scala和Python去写应用程序。更多完整的参考请参阅编程指南

依照这个教程,首先从Spark的官网下载一个Spark的发布包。因为我们将不再使用HDFS(Hadoop分布式文件系统),你可以下载任意一个版本的Hadoop包。

使用Spark Shell进行互动式分析

基础

Spark的脚本提供了一个简单的方式去学习API,同时也提供了一个强有力的工具去交互分析数据。无论是Scala(在Java虚拟机上运行现有的Java库是一个很好的方式)还是Python它都可以使用。在Spark的目录下使用下述方式开始运行:

./bin/pyspark


Spark基本的抽象是一个称为Resilient Distributed Dataset(RDD,弹性分布式数据集)的分布式集合。RDDs可以被Hadoop的InputFormats(诸如HDFS的文件等)创建,或者是由其他的RDDs转换而来。让我们通过Spark的源目录里的README文本文件来创建一个新的RDD:

>>> textFile = sc.textFile("README.md")


RDDs的actions可以返回一些值,并且transformations可以转换成新的RDDs并返回相应的指针。让我们开始使用几个actions:

>>> textFile.count() # Number of items in this RDD
126

>>> textFile.first() # First item in this RDD
u'# Apache Spark'


现在让我们使用transformation。我们将会使用transformation的filter返回一个新的RDD,这个RDD包含了README文件中的项目的子集。

>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)


我们可以把transformations和actions连在一起:

>>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"?
15


更多RDD的actions

RDD的actions和transformations可以应用在更复杂的计算中。比如说我们想找到单词数量最多的一行:

>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
15


首先将每行都映射到一个整数值上并创建一个新的RDD。在这个新的RDD上调用reduce找到单词数量最大的那一行对应的那个数。map和reduce的参数是Python的匿名函数(lambdas),但是我们也可以传递任何我们想要传递的上层的Python函数。例如,我们将会定义一个max函数使代码更容易被理解:

>>> def max(a, b):
...     if a > b:
...         return a
...     else:
...         return b
...

>>> textFile.map(lambda line: len(line.split())).reduce(max)
15


One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:

MapReduce是Hadoop流行的一个通用的数据流模式。Spark可以很容易地实现MapReduce流:

>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)


这里我们结合了flatMap,map和reduceByKey transformations来计算文件中每个单词的数量,它返回的RDD是一组(string, int)的键值对,我们可以使用collect这个行为去统计单词的数量:

>>> wordCounts.collect()
[(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]


缓存

Spark同样支持把数据集拉取到一个集群的内存缓存中。这在数据数据反复被访问的时候非常有用,例如查询一个小的“热”数据集或者当运行一个像网页排名这样的交互算法。让我们用一个简单的例子实现把linesWithSpark数据集标记在缓存中:

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
19

>>> linesWithSpark.count()
19


通过缓存100行的文本文件去研究Spark并且看起来很傻。真正让人感兴趣的部分是这些相同的函数可以被用在一个非常庞大的数据集中,甚至在数十个或上百个节点中交叉计算。你也可以通过bin/pyspark连接到一个集群来实现编程指南中描述的那些交互式操作。

独立运行的应用程序

如果说现在我们想使用Spark API写一个独立运行的应用程序。我们将通过使用Scala(用SBT),Java(用Maven)和python写一个简单的应用程序来进行学习。

现在我们将展示怎么使用Python API(PySpark)去写一个应用程序。

这个例子我们将会创建一个简单的Spark应用程序,SimpleApp.py:

"""SimpleApp.py"""
from pyspark import SparkContext

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print "Lines with a: %i, lines with b: %i" % (numAs, numBs)


这个程序只是计算文本文件中包含了‘a’和包含了‘b’的行数。你需要注意将 YOUR_SPARK_HOME 替换成你自己安装的Spark的路径。就像Scala和Java的例子,我们使用SparkContext去创建RDDs。我们可以传递Python的函数给Spark,Spark会自动地序列化任何它们引用的变量。由于应用程序会使用自定义的类或者第三方的库,我们也可以通过它的–py-files参数将它们打包到一个.zip文件(详细可参考spark-submit –help)中来添加代码的一些依赖到spark-submit。SimpleApp很简单,所以我们并不需要特别的代码依赖。

我们可以通过bin/spark-submit运行这个应用程序:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23


下一步该做什么

祝贺你的第一个Spark应用程序成功运行!

可以从Spark编程指南或者其他的组件开始深入了解API。

要在集群上的运行应用程序,可以前往部署概览

最后,Spark在examples目录包含了几个例子(Scala,Java,Python,R语言等)。你可以直接运行它们:

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark python