Spark Quick Start
2015-09-11 17:09
459 查看
Spark Shell交互分析
基础
Spark的shell提供了一种简单的方式去学习Spark API和功能强大的数据分析工具。既可以使用Scala也可以使用Python。在Spark目录下启动Spark Shell:Scala
./bin/spark-sehll
Python
./bin/pyspark
Spark的基础抽象范式是一个分布式数据集合,即弹性分布式数据集(Resilient Distributed Dataset, RDD)。RDDs可以从Hadoop的HDFS文件创建,也可以通过其他RDDs转换。例如,从README文件中创建一个新的RDD:
Scala
scala> val textFile = sc.textFile(“README.md”) textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
Python
>>> textFile = sc.textFile("README.md")
RDDs包含有action和transformation,action返回value,transformation返回指向新的RDDs的指针。例如,创建新的actions:
Scala
scala> textFile.count() // RDD中条目的个数(README.md文件的行数) res0: Long = 126 scala> textFile.first() Res1: String = # Apache Spark
Python
>>> textFile.count() # Number of items in this RDD 126 >>> textFile.first() # First item in this RDD u'# Apache Spark'
下测试transformation的例子,使用filter transformation返回一个子集新的RDD:
Scala
scala> val linesWithSpark = textFile.filter(line => line.contains(“Spark”)) linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
Python
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
我们也可以将action和transformation以函数式编程的样式连成串编写:
scala> textFile.filter(line => line.contains(“Spark”)).count() res3: Long = 15
RDD的更多操作
RDD的action和transformation可以用于更复杂的计算。例如,计算最长行的单词个数:Scala
scala> textFile.map(lilne => line.split(“ ”).size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
Python
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b) 15
首先通过map将每行映射为一个整数(每行单词的个数size),即创建了一个新的RDD。然后调用reduce查找个数最大的。map和reduce的参数是Scala函数语法(闭包),当然,可以使用任何Scala和Java的功能和库。例如,我们可以使用Math.max()函数使这个代码更容易理解:
Scala
scala> import java.lang.Math Import java.lang.Math scala> textFile.map(line => line.split(“ ”).size).reduce((a, b) => Math.max(ab)) res5: Int = 15
Python
>>> def max(a, b): ... if a > b: ... return a ... else: ... return b ... >>> textFile.map(lambda line: len(line.split())).reduce(max) 15
一个常见的数据流模式是Hadoop平台下的MapReduce,Spark可以轻松的实现MapReduce:
Scala
scala> val wordCounts = textFile.flatMap(line => line.split(“ ”)).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: spark.RDD[(String, Int)] = spark.ShuffledaggregatedRDD@71f027b8 scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Python
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b) >>> wordCounts.collect() [(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]
缓存Caching
Spark也支持将数据集推送到内存cluster-wide的缓存中。当数据集频繁重复的访问时候,这是非常有用的,比如,查询一个小量的数据集或者执行类似PageRank迭代算法。举一个简单的例子,缓存标记的linesWithSpark数据集。Scala
scala> linesWithSpark.cache() res7: spark.RDD[String] = spark.FilteredRDD@17e51082 scala> lineWithSpark.count() res8: Long = 19
Python
>>> linesWithSpark.cache() >>> linesWithSpark.count() 19 >>> linesWithSpark.count() 19
缓存对于很大的数据集也同样适用,即使运行在几十个、几百个节点上。
Self-Contained应用程序
假设我们希望用Spark API写一个Self-Contained应用程序,Scala会非常简单:
/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } }
Python的例子:
"""SimpleApp.py""" from pyspark import SparkContext logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system sc = SparkContext("local", "Simple App") logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
相关文章推荐
- 深入消息机制(三) 结合MessageQueue 的JNI 层
- 13.GPUImage初探以及人脸检测
- UICollectionView
- 小试uiautomatorviewer
- 假设做一个循环滚动UIScrollView
- iOS编程——最简单的实现UITableView下拉时顶部图片放大消失的效果
- iOS小技巧--用runtime 解决UIButton 重复点击问题
- [Jenkins] Jenkins 执行 Composite 模式的 SoapUI Project
- UINavigationController基本使用
- request和response的中文乱码问题
- Android UI系列-----Dialog对话框
- Could not resolve com.android.tools.build:gradle:0.5.+
- easy ui 给表单元素赋值input,combobox,numberbox
- UIday1403:storyBoard 创建控制器 传值 sizeClasses
- HDU 5136 Yue Fei's Battle
- UIImageView 播放一组图片动画
- 【iOS】UIButton 图标在上文字在下
- UE4 HLSL Cross Compiler
- [Android UI开发] Android开源框架
- UE4 Material Custom Expressions