
Notes on RDD Persistence, Spark Broadcast, and Accumulators


RDD persistence: how to reliably keep the results of RDD computations (action operations); cache, persist, and checkpoint.
Spark broadcast: important when building algorithms, for reducing the amount of data sent over the network, improving memory efficiency, and speeding up the program.
Spark accumulators: global variables that only grow; you can only add to an accumulator's value. Executors cannot read an accumulator; only the Driver can.
I. RDD Persistence

(1) Action-level operations

scala> val numbers = sc.parallelize(1 to 100)
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> numbers.reduce(_+_)  //action
res0: Int = 5050

scala> val result = numbers.map(2*_)   //transformation
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

scala> val data = result.collect()  //action
data: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200)

scala> numbers.count   //action
res1: Long = 100

scala> val topN = numbers.take(5)  //action
topN: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val scores = Array(Tuple2(1, 100), Tuple2(1, 100),Tuple2(2, 100),Tuple2(3, 100))
scores: Array[(Int, Int)] = Array((1,100), (1,100), (2,100), (3,100))

scala> val content = sc.parallelize(scores)
content: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala> val data = content.countByKey  //action
data: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 1, 3 -> 1)
// saveAsTextFile() is also an action

Diagram of how collect works:

Every action-level operation triggers sc.runJob.

(2) Persisting with persist
Persist an RDD in the following situations:
1. A step in the computation is expensive.
2. The computation chain is long.
3. The RDD on which checkpoint is called should persist its data (see the sketch after this list).
How checkpoint works: it is lazy. Once a job is triggered and the computation finishes, the scheduling framework notices that the RDD carries a checkpoint mark and, based on that mark, submits another job. In other words, checkpoint triggers a new job, so the RDD being checkpointed should be persisted beforehand (persist before checkpointing) to avoid recomputing it.
4. Persist after a shuffle. A shuffle involves network transfer and is therefore risky, so persist the result to guard against data loss.
5. Persist before a shuffle. Here the framework itself persists the data to local disk by default, because without it a failed shuffle would force all parent RDDs to be recomputed.
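
A minimal sketch of scenario 3, assuming the SparkContext sc from the shell session and a made-up checkpoint directory /tmp/ckpt: persisting the RDD before marking it for checkpoint lets the extra checkpoint job read the cached data instead of recomputing the whole lineage.

// Hypothetical setup; sc is the SparkContext from the shell session below.
sc.setCheckpointDir("/tmp/ckpt")                  // directory name is an assumption

val counts = sc.textFile("/usr/data")             // path reused from the example below
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.cache()        // persist first, so the checkpoint job reuses the cached partitions
counts.checkpoint()   // lazy: only marks the RDD for checkpointing

counts.count()        // action: runs the normal job, then a second job writes the checkpoint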

scala> sc.textFile("/usr/data").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_+_).count   // took 1.517477 s
res2: Long = 260

scala> sc.textFile("D:/spark-1.6.2-bin-hadoop2.6/README.md").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_+_).cache.count  // took 0.666754 s; do not chain another operator directly after cache
res3: Long = 260

scala> val cached = sc.textFile("D:/spark-1.6.2-bin-hadoop2.6/README.md").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_+_).cache  // cache is lazy (transformation-like), not an action
cached: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:21

scala> cached.count  // took 0.461832 s
res4: Long = 260

scala> cached.count  // took 0.086841 s; far faster than res3, because this count reads the data already cached by the previous count, while the res3 command chained count directly after cache
res5: Long = 260

scala> cached.unpersist()
18/08/23 16:22:26 INFO rdd.ShuffledRDD: Removing RDD 19 from persistence list
18/08/23 16:22:26 INFO storage.BlockManager: Removing RDD 19
res7: cached.type = ShuffledRDD[19] at reduceByKey at <console>:27

persist is a lazy operation, while unpersist is eager and takes effect immediately.
cache is a special case of persist. persist can keep data in memory, on disk, or both, with one or more replicas, whereas cache keeps a single replica in memory only. persist takes a StorageLevel argument; choosing memory-only storage is exactly what cache does.
After the cache call above, the ShuffledRDD is held in memory.

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
/**
* Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
* new storage levels.
*/
object StorageLevel {
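// Constructor flags: StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)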
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
}
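
To make the cache/persist distinction concrete, here is a small sketch assuming a shell session like the one above; the path and variable names are only for illustration:

import org.apache.spark.storage.StorageLevel

val pairs = sc.textFile("/usr/data")              // hypothetical input
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

pairs.cache()                                     // same as persist(StorageLevel.MEMORY_ONLY)
pairs.unpersist()                                 // eager: drops the cached blocks immediately

pairs.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized blocks, spilled to disk if memory is tight
pairs.count()                                     // the first action materializes the cache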


IntelliJ IDEA shortcuts: Ctrl+F finds text in a file, Ctrl+Shift+N finds a file.
II. Spark Broadcast
Without broadcasting, every Task copies its own replica of the data at run time, which helps data consistency but consumes a large amount of memory; when the data is large, this easily causes out-of-memory (OOM) errors.
Spark uses broadcast to cut the amount of data shipped, reduce communication and network transfer, and keep the shared data in sync.
A broadcast variable is a read-only global variable placed in the memory of each Executor (process node).
A broadcast is sent by the Driver to every Executor allocated to the current Application as a memory-resident, read-only global variable. The threads in an Executor's thread pool all share it, which greatly reduces network transfer (otherwise the variable would be shipped once per Task), saves a great deal of memory, and indirectly improves CPU utilization.
How broadcast works:

scala> val number = 10
number: Int = 10

scala> val broadcastNumber = sc.broadcast(number)
18/08/23 19:28:27 INFO storage.MemoryStore: Block broadcast_14 stored as values in memory (estimated size 40.0 B, free 229.1 KB)
18/08/23 19:28:27 INFO storage.MemoryStore: Block broadcast_14_piece0 stored as bytes in memory (estimated size 97.0 B, free 229.2 KB)
18/08/23 19:28:27 INFO storage.BlockManagerInfo: Added broadcast_14_piece0 in memory on 192.168.230.132:35438 (size: 97.0 B, free: 146.2 MB)
18/08/23 19:28:27 INFO spark.SparkContext: Created broadcast 14 from broadcast at <console>:29
broadcastNumber: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(14)

scala> val data = sc.parallelize(1 to 10000)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:27

scala> val bn = data.map(_*broadcastNumber.value)
bn: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at map at <console>:33

scala> bn.collect
res8: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 250, 260, 270, 280, 290, 300, 310, 320, 330, 340, 350, 360, 370, 380, 390, 400, 410, 420, 430, 440, 450, 460, 470, 480, 490, 500, 510, 520, 530, 540, 550, 560, 570, 580, 590, 600, 610, 620, 630, 640, 650, 660, 670, 680, 690, 700, 710, 720, 730, 740, 750, 760, 770, 780, 790, 800, 810, 820, 830, 840, 850, 860, 870, 880, 890, 900, 910, 920, 930, 940, 950, 960, 970, 980, 990, 1000, 1010, 1020, 1030, 1040, 1050, 1060, 1070, 1080, 1090, 1100, 1110, 1120, 1130, 1140, 1150, 1160, 1170, 1180, 1190, 1200, 1210, 1220, 1230, 1240, 1250, 1260, 1270, 1280, 1290, 1300, 1310, 1320, 1330, 1340, 1350, 1360, 1370, 1380, 1390, 1400, 1410, 1420, 1430, 1440, 1450, 1460, 1470...
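
Beyond multiplying by a broadcast scalar, a common pattern is shipping a small lookup table once per Executor instead of once per Task. A minimal sketch in the same shell session, with a made-up idToName map:

// Small lookup table built on the Driver (hypothetical data).
val idToName = Map(1 -> "alice", 2 -> "bob", 3 -> "carol")
val bcNames = sc.broadcast(idToName)

val events = sc.parallelize(Seq((1, 100), (2, 250), (3, 80), (1, 40)))

// Each task reads bcNames.value from its Executor's memory;
// the map travels over the network once per Executor, not once per task.
val named = events.map { case (id, amount) =>
  (bcNames.value.getOrElse(id, "unknown"), amount)
}
named.collect()   // Array((alice,100), (bob,250), (carol,80), (alice,40))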

III. Accumulators
An accumulator is global. An Executor can only modify it, that is, only add to its value; it cannot read it. Only the Driver can read the accumulator. It provides a globally unique state object.
An accumulator is globally unique, and every operation on it only increases its value, never decreases it.

scala> val sum = sc.accumulator(0)
sum: org.apache.spark.Accumulator[Int] = 0

scala> val data = sc.parallelize(1 to 100)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:27

scala> data.foreach(item => sum += item)
scala> println(sum)
5050

scala> val data = sc.parallelize(1 to 5)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27
scala> data.foreach(item => sum += item)
scala> println(sum)   // keeps accumulating on top of the previous result; globally unique
5065

After an accumulator is created, Tasks update it during computation. Whichever Task performs the update, it applies to the single, globally unique accumulator; each computation records its contribution, and when the Driver accesses the accumulator it reads the combined state of all those contributions.
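
A typical use is counting side information, for example malformed records, while the main job computes something else. A small sketch using the same Spark 1.x sc.accumulator API as above; the data and the notion of a bad line are invented for illustration:

// Count empty lines as a side effect while splitting words.
val badLines = sc.accumulator(0)

val lines = sc.parallelize(Seq("spark rdd", "", "broadcast accumulator", ""))

val words = lines.flatMap { line =>
  if (line.isEmpty) badLines += 1   // Executors can only add to the accumulator
  line.split(" ")
}

words.count()            // an action must run before the accumulated value is complete
println(badLines.value)  // only the Driver can read it; prints 2 here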
