spark使用
2016-05-11 13:49
323 查看
1. 概述
目标读者本文档面向Spark应用开发人员,并要求用户具备一定的Java和Scala的开发经验。
简介
Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言(Scala/Java/Python)的应用开发。 适用以下场景:
l 数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。
l 迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。
l 数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。
l 流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。
l 查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。
基本概念
l RDD
全名:弹性分布数据集(Resilient Distributed Dataset)
它是Spark的核心概念,指的是一个只读,可变分区的分布式数据集,该数据集的内容可以缓存在内存中,并在多次计算中间重用。
RDD的生成:
− 从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建 。
− 从父RDD转换得到新RDD。
− 从集合转换而来。
RDD的存储:
− 用户可以选择不同的存储级别(例如DISK_ONLY,MEMORY_AND_DISK)存储RDD以便重用。
− 当前RDD默认只存储于内存,当内存不足时,RDD也不会溢出到磁盘中。
l Dependency(RDD的依赖)
父RDD与子RDD之间的逻辑关系,可分为窄依赖和宽依赖两种。
图 RDD的依赖
− 窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,或者两个父RDD的分区对应于一个子RDD的分区。上图中,map/filter和union属于第一类前者,对输入进行协同划分(co-partitioned)的join属于第二类后者。
− 宽依赖:指子RDD的分区依赖于父RDD的所有分区,子RDD必须要通过shuffle类操作实现父RDD的数据全部重新分配,如上图中的groupByKey和未经协同划分的join。
窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join非上文的join算子,而是指同步多个并行任务的barrier): 把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个
fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线 (pipeline)优化。
l Transformation和Action(RDD的操作)
对RDD的操作包含Transformation(返回值还是一个RDD)和Action(返回值不是一个RDD)两种。RDD的操作流程如下图所示。其中Transformation操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformation操作时只会记录需要这样的操作,并不会去执行,需要等到有Action操作的时候才会真正启动计算过程进行计算。Action操作会返回结果或把RDD数据写到存储系统中。Action是触发Spark启动计算的动因。
图 RDD操作示例
然后,来看一个简单的例子,如下图所示。RDD看起来与Scala集合类型没有太大差别,但它们的数据和运行模型大相迥异。
a. textFile算子从HDFS读取日志文件,返回file(初始RDD)。
b. filter算子筛出带“ERROR”的行,赋给errors(新RDD)。filter算子为一个Transformation操作。
c. cache算子把它缓存下来以备未来使用。
d. count算子返回errors的行数。count算子为一个Action操作。
图1-3 Scala样例
Transformation操作可以分为如下几种类型:
− 视RDD的元素为简单元素。
输入输出一对一,且结果RDD的分区结构不变,主要是map。
输入输出一对多,且结果RDD的分区结构不变,如flatMap(map后由一个元素变为一个包含多个元素的序列,然后展平为一个个的元素)。
输入输出一对一,但结果RDD的分区结构发生了变化,如union(两个RDD合为一个,分区数变为两个RDD分区数之和)、coalesce(分区减少)。
从输入中选择部分元素的算子,如filter、distinct(去除重复元素)、subtract(本RDD有、它RDD无的元素留下来)和sample(采样)。
− 视RDD的元素为Key-Value对。
对单个RDD做一对一运算,如mapValues(保持源RDD的分区方式,这与map不同);
对单个RDD重排,如sort、partitionBy(实现一致性的分区划分,这个对数据本地性优化很重要);
对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;
对两个RDD基于key进行join和重组,如join、cogroup。
后三种操作都涉及重排,称为shuffle类操作。
Action操作可以分为如下几种:
− 生成标量,如count(返回RDD中元素的个数)、reduce、fold/aggregate(返回几个标量)、take(返回前几个元素)。
− 生成Scala集合类型,如collect(把RDD中的所有元素倒入 Scala集合类型)、lookup(查找对应key的所有值)。
− 写入存储,如与前文textFile对应的saveAsTextFile。
− 还有一个检查点算子checkpoint。当Lineage特别长时(这在图计算中时常发生),出错时重新执行整个序列要很长时间,可以主动调用 checkpoint把当前数据写入稳定存储,作为检查点。
l Shuffle
Shuffle是Spark框架中一个特定的phase,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。
下图清晰地描述了Shuffle算法的整个流程。
图 算法流程
l Spark Application的结构
Spark Application的结构可分为两部分:初始化SparkContext和主体程序。
− 初始化SparkContext:构建Spark Application的运行环境。
构建SparkContext对象,如:
new SparkContext(master, appName, [SparkHome], [jars])
参数介绍:
master:连接字符串,连接方式有local, yarn-cluster, yarn-client等
appName:构建的Application名称
SparkHome:集群中安装Spark的目录
jars:应用程序代码和依赖包
− 主体程序:处理数据
l Spark shell命令
Spark基本shell命令,支持提交Spark应用。命令为:
./bin/spark-submit \ --class <main-class> --master <master-url> \ ... # other options <application-jar> \ [application-arguments]
参数解释:
--class:Spark应用的类名
--master:Spark用于所连接的master,如yarn-client,yarn-cluster等
application-jar:Spark应用的jar包的路径
application-arguments:提交Spark应用的所需要的参数(可以为空)。
l Spark Web UI界面
用于监控正在运行的或者历史的Spark作业在Spark框架各个阶段的细节以及提供日志显示,帮助用户更细粒度地去开发、配置和调优作业。
相关文章推荐
- SpringMVC整合mybatis实例代码
- Android UI线程
- ViewPager中切换界面Fragment被销毁的问题
- 前端常用网址收集
- ActivityViewController 使用AirDrop分享
- mysql中指定数据排序
- OpenGL ES Shader相关API 总结【2】——执行绘制命令
- android:onTouch()和onTouchEvent()的区别?看完这篇文章就知道了
- android 慎用drawable中大图,造成内存溢出的解决方案
- 设计模式之三---装饰者设计模式
- 【Linux进阶】CentOS安装MySQL数据库
- OpenGL ES Shader相关API 总结【1】——传入绘制信息
- android视频播放器Media
- 列变位法解密-2016"百度之星" - 测试赛(热身,陈题)
- (转载)Kafka文件存储机制那些事
- 第五次c++作业
- js纯ajax
- STM32单片机实现中断后不继续向下执行而是返回到main函数
- iOS多线程编程之GCD
- Dynamics CRM2016 在实体命名时需要注意的事项