
The difference between map and flatMap

2015-07-16 16:59
Today, while writing a program, I got an error when extracting data with map, while flatMap worked fine. Here is an analysis of why:

After reading the file with sc.textFile, first split each line with map:

scala> val mrdd = line.map(_.split(" "))
mrdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:23

Inspect map's return value — each line of the file produces one array object:
scala> mrdd.collect
15/07/16 16:52:30 INFO FileInputFormat: Total input paths to process : 1
15/07/16 16:52:30 INFO SparkContext: Starting job: collect at <console>:26
15/07/16 16:52:30 INFO DAGScheduler: Got job 0 (collect at <console>:26) with 2 output partitions (allowLocal=false)
15/07/16 16:52:30 INFO DAGScheduler: Final stage: ResultStage 0(collect at <console>:26)
15/07/16 16:52:30 INFO DAGScheduler: Parents of final stage: List()
15/07/16 16:52:30 INFO DAGScheduler: Missing parents: List()
15/07/16 16:52:30 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at map at <console>:23), which has no missing parents
15/07/16 16:52:30 INFO MemoryStore: ensureFreeSpace(3304) called with curMem=97607, maxMem=278302556
15/07/16 16:52:30 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.2 KB, free 265.3 MB)
15/07/16 16:52:30 INFO MemoryStore: ensureFreeSpace(1873) called with curMem=100911, maxMem=278302556
15/07/16 16:52:30 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1873.0 B, free 265.3 MB)
15/07/16 16:52:30 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.21.17.201:56535 (size: 1873.0 B, free: 265.4 MB)
15/07/16 16:52:30 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874
15/07/16 16:52:30 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[4] at map at <console>:23)
15/07/16 16:52:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/07/16 16:52:30 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.21.17.201, PROCESS_LOCAL, 1407 bytes)
15/07/16 16:52:30 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.21.17.201, PROCESS_LOCAL, 1407 bytes)
15/07/16 16:52:31 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.21.17.201:15802 (size: 1873.0 B, free: 265.0 MB)
15/07/16 16:52:31 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.21.17.201:62664 (size: 1873.0 B, free: 265.0 MB)
15/07/16 16:52:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.21.17.201:15802 (size: 16.8 KB, free: 265.0 MB)
15/07/16 16:52:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.21.17.201:62664 (size: 16.8 KB, free: 265.0 MB)
15/07/16 16:52:32 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2142 ms on 10.21.17.201 (1/2)
15/07/16 16:52:32 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 2130 ms on 10.21.17.201 (2/2)
15/07/16 16:52:32 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/07/16 16:52:32 INFO DAGScheduler: ResultStage 0 (collect at <console>:26) finished in 2.152 s
15/07/16 16:52:32 INFO DAGScheduler: Job 0 finished: collect at <console>:26, took 2.315735 s
res2: Array[Array[String]] = Array(Array(CHAPTER, I), Array(THERE, was, no, possibility, of, taking, a, walk, that, day., We, had, been, wandering,, indeed,, in, the, leafless, shrubbery, an, hour, in, the, morning;, but, since, dinner, (Mrs., Reed,, when, there, was, no, company,, dined, early), the, cold, winter, wind, had, brought, with, it, clouds, so, sombre,, and, a, rain, so, penetratin

Next, the RDD returned by flatMap:
scala> val frdd = line.flatMap(_.split(" "))
frdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at flatMap at <console>:23

Inspect flatMap's return value — all lines of the file together produce just one flattened array object:
scala> frdd.collect
15/07/16 16:54:00 INFO SparkContext: Starting job: collect at <console>:26
15/07/16 16:54:00 INFO DAGScheduler: Got job 1 (collect at <console>:26) with 2 output partitions (allowLocal=false)
15/07/16 16:54:00 INFO DAGScheduler: Final stage: ResultStage 1(collect at <console>:26)
15/07/16 16:54:00 INFO DAGScheduler: Parents of final stage: List()
15/07/16 16:54:00 INFO DAGScheduler: Missing parents: List()
15/07/16 16:54:00 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at flatMap at <console>:23), which has no missing parents
15/07/16 16:54:00 INFO MemoryStore: ensureFreeSpace(3360) called with curMem=102784, maxMem=278302556
15/07/16 16:54:00 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.3 KB, free 265.3 MB)
15/07/16 16:54:00 INFO MemoryStore: ensureFreeSpace(1888) called with curMem=106144, maxMem=278302556
15/07/16 16:54:00 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1888.0 B, free 265.3 MB)
15/07/16 16:54:00 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.21.17.201:56535 (size: 1888.0 B, free: 265.4 MB)
15/07/16 16:54:00 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:874
15/07/16 16:54:00 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at flatMap at <console>:23)
15/07/16 16:54:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/07/16 16:54:00 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 10.21.17.201, PROCESS_LOCAL, 1407 bytes)
15/07/16 16:54:00 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 10.21.17.201, PROCESS_LOCAL, 1407 bytes)
15/07/16 16:54:00 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.21.17.201:62664 (size: 1888.0 B, free: 265.0 MB)
15/07/16 16:54:00 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.21.17.201:15802 (size: 1888.0 B, free: 265.0 MB)
15/07/16 16:54:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.21.17.201:56535 in memory (size: 1873.0 B, free: 265.4 MB)
15/07/16 16:54:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.21.17.201:62664 in memory (size: 1873.0 B, free: 265.0 MB)
15/07/16 16:54:01 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 318 ms on 10.21.17.201 (1/2)
15/07/16 16:54:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.21.17.201:15802 in memory (size: 1873.0 B, free: 265.0 MB)
15/07/16 16:54:01 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 369 ms on 10.21.17.201 (2/2)
15/07/16 16:54:01 INFO DAGScheduler: ResultStage 1 (collect at <console>:26) finished in 0.370 s
15/07/16 16:54:01 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/07/16 16:54:01 INFO DAGScheduler: Job 1 finished: collect at <console>:26, took 0.399414 s
res3: Array[String] = Array(CHAPTER, I, THERE, was, no, possibility, of, taking, a, walk, that, day., We, had, been, wandering,, indeed,, in, the, leafless, shrubbery, an, hour, in, the, morning;, but, since, dinner, (Mrs., Reed,, when, there, was, no, company,, dined, early), the, cold, winter, wind, had, brought, with, it, clouds, so, sombre,, and, a, rain, so, penetrating,, that, further, outdoor, exercise, was, now, out, of, the, question., I, was, glad, of, it:, I, never, liked, long, walks,, especially, on, chilly, afternoons:, dreadful, to, me, was, the, coming, home, in, the, raw, twilight,, with, nipped, fingers, and, toes,, and, a, heart, sad
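The same distinction can be reproduced with plain Scala collections outside Spark, which makes it easy to experiment with. A minimal sketch in the same REPL style (the two-element input list is an illustrative example, not from the original session):

scala> List("a b", "c d").map(_.split(" "))
res0: List[Array[String]] = List(Array(a, b), Array(c, d))

scala> List("a b", "c d").flatMap(_.split(" "))
res1: List[String] = List(a, b, c, d)

As with the RDDs above, map keeps one array per input string, while flatMap concatenates the arrays into a single flat list of words.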

Conclusion:

- In Spark, the map function applies the given operation to each input element and returns exactly one object per input;

- flatMap, on the other hand, is a combination of two operations — literally "map first, then flatten":

   Step 1: like map, apply the given operation to each input element, returning one object per input

   Step 2: then merge all of the resulting objects into a single flat collection
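This also suggests why map raised an error in the original program: word-level operations downstream typically expect an RDD[String]. The sketch below is a hypothetical word count, not from the original session; it type-checks on frdd (RDD[String]) but would fail to compile on mrdd, whose elements are Array[String] rather than String:

scala> val counts = frdd.map(word => (word, 1)).reduceByKey(_ + _)

Swapping mrdd in for frdd here would make word an Array[String], so the pairing and reduction no longer mean "per word" — which is the kind of mismatch that forces the switch from map to flatMap.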