Spark快速开始
2016-11-13 12:14
337 查看
本文主要介绍快速使用Spark的方法。首先通过Spark的交互式shell介绍相关的API,然后介绍如何使用Java、Scala和Python编写应用程序。
开始介绍之前需要从Spark website下载一个发行包。虽然我们不使用HDFS,但是可以下载任何版本的Hadoop包。
基于Spark Shell的交互式分析
Spark's shell 除了是一个强大的交互式分析数据的工具,也提供了相关API的简单学习方式。Scala和Python都支持它。在Spark安装路径下运行./bin/pyspark
Spark主要概念是一个叫做弹性的分布式数据集合-RDDs。RDDS能够通过Hadoop InputFormats或者其它RDDs来创建。下面代码是通过在Spark安装路径下的README文件创建一个新的RDD:
>>> textFile = sc.textFile("README.md")
RDDs有actions和transformations两种操作,actions返回值,transformations返回一个指向新RDDs的指针,下面的代码是用来统计textFile的单词数和显示第一个元素:
>>> textFile.count()
126
>>> textFile.first()
u"#Apache Spark"
filter是transformation的一种,它可以返回一个新的由文件元素子集合组成的RDD
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
我们可以将以上的动作合并到一行代码中
>>> textFile.filter(lambda line: "Spark" in line).count()
15
更多的RDD操作
RDD actions和transformations能够用于更复杂的计算中,下面的代码用来找出最多单词的函数
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
15
第一个map创建一个元素为整数的RDD,然后该RDD调用reduce找出最大的数。
MapReduce是一个一般的数据流模式,就像流行的Hadoop那样,Spark能够独立的简单的完成MapReduce流
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
Caching
Spark支持将数据集合放入内存中缓存. 该功能有利于频繁的访问数据。
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
19
独立的应用程序
通过Spark的API我们能编写独立的应用程序,并且运行在Scala、Java和Python的环境中。
下面是Python的一个简单的应用程序:
"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
开始介绍之前需要从Spark website下载一个发行包。虽然我们不使用HDFS,但是可以下载任何版本的Hadoop包。
基于Spark Shell的交互式分析
Spark's shell 除了是一个强大的交互式分析数据的工具,也提供了相关API的简单学习方式。Scala和Python都支持它。在Spark安装路径下运行./bin/pyspark
Spark主要概念是一个叫做弹性的分布式数据集合-RDDs。RDDS能够通过Hadoop InputFormats或者其它RDDs来创建。下面代码是通过在Spark安装路径下的README文件创建一个新的RDD:
>>> textFile = sc.textFile("README.md")
RDDs有actions和transformations两种操作,actions返回值,transformations返回一个指向新RDDs的指针,下面的代码是用来统计textFile的单词数和显示第一个元素:
>>> textFile.count()
126
>>> textFile.first()
u"#Apache Spark"
filter是transformation的一种,它可以返回一个新的由文件元素子集合组成的RDD
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
我们可以将以上的动作合并到一行代码中
>>> textFile.filter(lambda line: "Spark" in line).count()
15
更多的RDD操作
RDD actions和transformations能够用于更复杂的计算中,下面的代码用来找出最多单词的函数
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
15
第一个map创建一个元素为整数的RDD,然后该RDD调用reduce找出最大的数。
MapReduce是一个一般的数据流模式,就像流行的Hadoop那样,Spark能够独立的简单的完成MapReduce流
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
Caching
Spark支持将数据集合放入内存中缓存. 该功能有利于频繁的访问数据。
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
19
独立的应用程序
通过Spark的API我们能编写独立的应用程序,并且运行在Scala、Java和Python的环境中。
下面是Python的一个简单的应用程序:
"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
相关文章推荐
- Spark1.1.1官网文档翻译2快速开始
- spark streaming programming guide 快速开始(二)
- 快速上手写spark代码系列03:开始写一个spark小脚本(1)
- spark官方文档之——Quick Star 快速开始
- 快速开始使用Spark和Alluxio
- spark2.2官方教程笔记-快速开始
- Spark2.1.0官方文档:快速开始-Quick Start
- spark2.0 翻译:Quick Start 快速开始
- Spark-python-快速开始
- Spark快速开始
- 快速开始Spark
- Spark2.2.1官方文档翻译 快速开始(Quick Start)
- 《Drools6.4 中文文档》第18章18.2 Quick Start(快速开始)
- Apache Spark 2.2.0 中文文档 - 快速入门 | ApacheCN
- spark快速入门
- spark SQL读取ORC文件从Driver启动到开始执行Task(或stage)间隔时间太长(计算Partition时间太长)且产出orc单个文件中stripe个数太多问题解决方案
- Hibernate使用之快速开始
- appframework 快速开始
- jqmobi快速开始(翻译)
- AngularJS入门和快速开始