Spark1.4.1 快速入门
2015-09-06 16:29
507 查看
使用Spark Shell进行互动式分析
基础
更多RDD的actions
缓存
独立运行的应用程序
下一步该做什么
这个教程提供了一个使用Spark的快速介绍。我们首先会通过Spark的交互式脚本(使用Python或Scala)来介绍API,然后展示如何写使用Java,Scala和Python去写应用程序。更多完整的参考请参阅编程指南。
依照这个教程,首先从Spark的官网下载一个Spark的发布包。因为我们将不再使用HDFS(Hadoop分布式文件系统),你可以下载任意一个版本的Hadoop包。
Spark基本的抽象是一个称为Resilient Distributed Dataset(RDD,弹性分布式数据集)的分布式集合。RDDs可以被Hadoop的InputFormats(诸如HDFS的文件等)创建,或者是由其他的RDDs转换而来。让我们通过Spark的源目录里的README文本文件来创建一个新的RDD:
RDDs的actions可以返回一些值,并且transformations可以转换成新的RDDs并返回相应的指针。让我们开始使用几个actions:
现在让我们使用transformation。我们将会使用transformation的filter返回一个新的RDD,这个RDD包含了README文件中的项目的子集。
我们可以把transformations和actions连在一起:
首先将每行都映射到一个整数值上并创建一个新的RDD。在这个新的RDD上调用reduce找到单词数量最大的那一行对应的那个数。map和reduce的参数是Python的匿名函数(lambdas),但是我们也可以传递任何我们想要传递的上层的Python函数。例如,我们将会定义一个max函数使代码更容易被理解:
One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
MapReduce是Hadoop流行的一个通用的数据流模式。Spark可以很容易地实现MapReduce流:
这里我们结合了flatMap,map和reduceByKey transformations来计算文件中每个单词的数量,它返回的RDD是一组(string, int)的键值对,我们可以使用collect这个行为去统计单词的数量:
通过缓存100行的文本文件去研究Spark并且看起来很傻。真正让人感兴趣的部分是这些相同的函数可以被用在一个非常庞大的数据集中,甚至在数十个或上百个节点中交叉计算。你也可以通过bin/pyspark连接到一个集群来实现编程指南中描述的那些交互式操作。
现在我们将展示怎么使用Python API(PySpark)去写一个应用程序。
这个例子我们将会创建一个简单的Spark应用程序,SimpleApp.py:
这个程序只是计算文本文件中包含了‘a’和包含了‘b’的行数。你需要注意将 YOUR_SPARK_HOME 替换成你自己安装的Spark的路径。就像Scala和Java的例子,我们使用SparkContext去创建RDDs。我们可以传递Python的函数给Spark,Spark会自动地序列化任何它们引用的变量。由于应用程序会使用自定义的类或者第三方的库,我们也可以通过它的–py-files参数将它们打包到一个.zip文件(详细可参考spark-submit –help)中来添加代码的一些依赖到spark-submit。SimpleApp很简单,所以我们并不需要特别的代码依赖。
我们可以通过bin/spark-submit运行这个应用程序:
可以从Spark编程指南或者其他的组件开始深入了解API。
要在集群上的运行应用程序,可以前往部署概览
最后,Spark在examples目录包含了几个例子(Scala,Java,Python,R语言等)。你可以直接运行它们:
基础
更多RDD的actions
缓存
独立运行的应用程序
下一步该做什么
这个教程提供了一个使用Spark的快速介绍。我们首先会通过Spark的交互式脚本(使用Python或Scala)来介绍API,然后展示如何写使用Java,Scala和Python去写应用程序。更多完整的参考请参阅编程指南。
依照这个教程,首先从Spark的官网下载一个Spark的发布包。因为我们将不再使用HDFS(Hadoop分布式文件系统),你可以下载任意一个版本的Hadoop包。
使用Spark Shell进行互动式分析
基础
Spark的脚本提供了一个简单的方式去学习API,同时也提供了一个强有力的工具去交互分析数据。无论是Scala(在Java虚拟机上运行现有的Java库是一个很好的方式)还是Python它都可以使用。在Spark的目录下使用下述方式开始运行:./bin/pyspark
Spark基本的抽象是一个称为Resilient Distributed Dataset(RDD,弹性分布式数据集)的分布式集合。RDDs可以被Hadoop的InputFormats(诸如HDFS的文件等)创建,或者是由其他的RDDs转换而来。让我们通过Spark的源目录里的README文本文件来创建一个新的RDD:
>>> textFile = sc.textFile("README.md")
RDDs的actions可以返回一些值,并且transformations可以转换成新的RDDs并返回相应的指针。让我们开始使用几个actions:
>>> textFile.count() # Number of items in this RDD 126 >>> textFile.first() # First item in this RDD u'# Apache Spark'
现在让我们使用transformation。我们将会使用transformation的filter返回一个新的RDD,这个RDD包含了README文件中的项目的子集。
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
我们可以把transformations和actions连在一起:
>>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"? 15
更多RDD的actions
RDD的actions和transformations可以应用在更复杂的计算中。比如说我们想找到单词数量最多的一行:>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b) 15
首先将每行都映射到一个整数值上并创建一个新的RDD。在这个新的RDD上调用reduce找到单词数量最大的那一行对应的那个数。map和reduce的参数是Python的匿名函数(lambdas),但是我们也可以传递任何我们想要传递的上层的Python函数。例如,我们将会定义一个max函数使代码更容易被理解:
>>> def max(a, b): ... if a > b: ... return a ... else: ... return b ... >>> textFile.map(lambda line: len(line.split())).reduce(max) 15
One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
MapReduce是Hadoop流行的一个通用的数据流模式。Spark可以很容易地实现MapReduce流:
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
这里我们结合了flatMap,map和reduceByKey transformations来计算文件中每个单词的数量,它返回的RDD是一组(string, int)的键值对,我们可以使用collect这个行为去统计单词的数量:
>>> wordCounts.collect() [(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]
缓存
Spark同样支持把数据集拉取到一个集群的内存缓存中。这在数据数据反复被访问的时候非常有用,例如查询一个小的“热”数据集或者当运行一个像网页排名这样的交互算法。让我们用一个简单的例子实现把linesWithSpark数据集标记在缓存中:>>> linesWithSpark.cache() >>> linesWithSpark.count() 19 >>> linesWithSpark.count() 19
通过缓存100行的文本文件去研究Spark并且看起来很傻。真正让人感兴趣的部分是这些相同的函数可以被用在一个非常庞大的数据集中,甚至在数十个或上百个节点中交叉计算。你也可以通过bin/pyspark连接到一个集群来实现编程指南中描述的那些交互式操作。
独立运行的应用程序
如果说现在我们想使用Spark API写一个独立运行的应用程序。我们将通过使用Scala(用SBT),Java(用Maven)和python写一个简单的应用程序来进行学习。现在我们将展示怎么使用Python API(PySpark)去写一个应用程序。
这个例子我们将会创建一个简单的Spark应用程序,SimpleApp.py:
"""SimpleApp.py""" from pyspark import SparkContext logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system sc = SparkContext("local", "Simple App") logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
这个程序只是计算文本文件中包含了‘a’和包含了‘b’的行数。你需要注意将 YOUR_SPARK_HOME 替换成你自己安装的Spark的路径。就像Scala和Java的例子,我们使用SparkContext去创建RDDs。我们可以传递Python的函数给Spark,Spark会自动地序列化任何它们引用的变量。由于应用程序会使用自定义的类或者第三方的库,我们也可以通过它的–py-files参数将它们打包到一个.zip文件(详细可参考spark-submit –help)中来添加代码的一些依赖到spark-submit。SimpleApp很简单,所以我们并不需要特别的代码依赖。
我们可以通过bin/spark-submit运行这个应用程序:
# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23
下一步该做什么
祝贺你的第一个Spark应用程序成功运行!可以从Spark编程指南或者其他的组件开始深入了解API。
要在集群上的运行应用程序,可以前往部署概览
最后,Spark在examples目录包含了几个例子(Scala,Java,Python,R语言等)。你可以直接运行它们:
# For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R
相关文章推荐
- Python动态类型的学习---引用的理解
- Python3写爬虫(四)多线程实现数据爬取
- 垃圾邮件过滤器 python简单实现
- 下载并遍历 names.txt 文件,输出长度最长的回文人名。
- Spark RDD API详解(一) Map和Reduce
- install and upgrade scrapy
- Scrapy的架构介绍
- Centos6 编译安装Python
- 使用Python生成Excel格式的图片
- 让Python文件也可以当bat文件运行
- [Python]推算数独
- Python中zip()函数用法举例
- Python中map()函数浅析
- Python将excel导入到mysql中
- Spark随谈——开发指南(译)
- Python在CAM软件Genesis2000中的应用
- 使用Shiboken为C++和Qt库创建Python绑定
- FREEBASIC 编译可被python调用的dll函数示例