您的位置:首页 > 大数据

Spark学习2 【翻译】快速入门

2017-08-30 20:11 337 查看
本教程提供一个使用Spark的快速介绍。 首先,我们通过Spark交互shell(Python或Scala)介绍其API,然后演示如何使用Java、Scala和Python实现Spark应用。

跟随该指南,首先从Spark网站下载一个Spark发行包。因为我们不使用HDFS,所以你可以下载任何版本的Hadoop版本的包。

注意,在Spark2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。在Spark2.0之后,RDDs会被Dataset替代。Dataset是类似RDD的强类型,但是底层做了很多优化。RDD接口仍然会被支持,你可以通过RDD 编程指南获取到到完整的接口文档。然而,我们强烈建议你使用比RDD性能更好的Dataset。查看SQL编程指南获取更多关于Dataset的信息。

使用Spark Shell进行交互分析

基础

Spark Shell作为一个强大的交互式数据分析工具,提供了一种简单的方式来学习它的API。Spark Shell可以使用Scala(运行在Java VM上,适合运行已有的Java库)或Python来运行。在Spark目录中运行如下命令:

1
./bin/pyspark
Spark中最主要的抽象是弹性分部署集合Dataset。Dataset可以从Hadoop InputFormats创建,也可有由其它Dataset转换而来。由于Python的动态特性,在Python中Dataset不必是强类型的。因此,在Python中所有Dataset都是Dataset[Row],我们称之为DataFrame,它与Pandas和R语言中的数据框架概念是一致的。 让我们从Spark源目录中的README文件创建一个新的DataFrame。



1

>>> textFile = spark.read.text("README.md")


你可以通过调用接口直接获取DataFrame中的数据,或者转换DataFrame成一个新的DataFrame。详细信息请此查看API文档

1
>>> textFile.count()  # 该DataFrame的行数
2
126
3
4
>>> textFile.first()  # 该DataFrame的第一行
5
Row(value=u'# Apache Spark')
让我们将该DataFame转换为一个新的DataFrame。我们调用filter来过滤该文件中的部分行并转为一个新的DataFrame。



1

linesWithSpark = textFile.filter(textFile.value.contains("Spark"))


我们可以将所有的转换和动作串联在一起:

1
>>> textFile.filter(textFile.value.contains("Spark")).count()  # 多少行包含"Spark"
2
15

更多数Dataset操作Dataset actions和transformations可用于更复杂的计算。假设我们想找到单词数量最多的行:



1

>>> from pyspark.sql.functions import *


2

>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()


3

[Row(max(numWords)=15)]


首先将行映射为一个整数值,创建一个别名为numWords的新的DataFrame。 在该DataFrame上调用agg来找到单词的最大个数。select和agg的参数都是Column,我们可以使用df.colName来从一个DataFrame获取列。我们也可以导入pyspark.sql.functions,其提供了一系列简便的函数来实现一个旧的列创建一个新的列。

MapReduce是Hadoop中常用的数据流模式。Spark可也可以轻松实现:

1
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).as("word")).groupBy("word").count()
在这里,我们使用select中的explode函数来将行数据集转换为单词数据集, 然后结合使用groupBy和count来计算包含两列"word"和“count”的文件DataFrame中的每个单词的数目。我们可以使用collect来获取我们的shell中的单词数统计:



1

>>> wordCounts.collect()


2

[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]


缓存

Spark支持把数据集拉到集群的内存缓存中。当数据被重复访问时,这很有用。例如,当查询一个小的热点数据集或者当运行一个类似PageRank的交互式算法。作为一个简单的例子,我们把linesWithSpark数据集标记在缓存中:

1
>>> linesWithSpark.cache()
2
3
>>> linesWithSpark.count()
4
15
5
6
>>> linesWithSpark.count()
7
15
使用Spark来探索和缓存100行的文本数据,这看起来很傻。但是真正让人感兴趣的部分是,我们可以在非常大的数据集中使用同样的函数,甚至超过成百上千个节点。你同样可以使用bin/pyspark交互式连接到一个集群,具体详情查看RDD programming guide
独立的应用程序假设我们系统使用Spark API写一个独立的应用程序。我们 通过使用Scala(用sbt)、Java(用Maven)和Python来写一个简单的应用程序来学习下。现在我们将介绍如何使用Python API(PySpark)写一个应用程序。示例,我们将创建一个简单的Spark应用, SimpleApp.py。



1

"""SimpleApp.py"""


2

from pyspark.sql import SparkSession


3



4

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system


5

spark = SparkSession.builder().appName(appName).master(master).getOrCreate()


6

logData = spark.read.text(logFile).cache()


7



8

numAs = logData.filter(logData.value.contains('a')).count()


9

numBs = logData.filter(logData.value.contains('b')).count()


10



11

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))


12



13

spark.stop()


这个程序仅仅统计文本文件中包含'a'和'b'的行的数目。注意,你需要将YOUR_SPARK_HOME替换为Spark的安装路径。Scala和Java例子中,我们使用SparkSession创建数据集。如果应用依赖自定义或第三方库的引用,我们可以将代码依赖打包为一个.zip文件,通过--py-files参数添加到spark-submit中(具体查看spark-submit --help)。 SimpleApp足够简单,我们不需要指定任何的代码依赖。

译者注: 

    SimpleApp.py第5行中的SparkSession.builder()调用会失败,实际应该是SparkSession.builder.appName(appName).master(master).getOrCreate()。 builder不是SparkSession的一个方法,而是一个属性

我们可以使用bin/spark-submit脚本运行该程序:

1
# Use spark-submit to run your application
2
$ YOUR_SPARK_HOME/bin/spark-submit \
3
 --master local[4] \
4
 SimpleApp.py
5
...
6
Lines with a: 46, Lines with b: 23
下一步恭喜你成功运行你的第一个Spark应用程序!要深度了解API, 可以查看  RDD programming guide和 SQL programming guide
要在集群中运行应用程序,可以查看部署概要
最后,Spark在examples文件夹(Scala,Java, Python, R)中包含了一些示例。.你可以像下面这样运行:



1

# For Scala and Java, use run-example:


2

./bin/run-example SparkPi


3



4

# For Python examples, use spark-submit directly:


5

./bin/spark-submit examples/src/main/python/pi.py


6



7

# For R examples, use spark-submit directly:


8

./bin/spark-submit examples/src/main/r/dataframe.R
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 大数据