Lesson 45: Spark 2.0 in Action with Dataset: map, flatMap, mapPartitions, dropDuplicates, coalesce, repartition, etc.
2016-09-16 19:01
519 views
The complete example code is listed below, followed by an excerpt of the run output.
package com.dt.spark200

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

object DataSetsops {
  // age is java.lang.Long rather than Scala's primitive Long so that a record
  // whose age is missing in people.json decodes to null instead of failing,
  // which makes the null check in the map block below meaningful.
  case class Person(name: String, age: java.lang.Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DatasetOps")
      .master("local")
      .config("spark.sql.warehouse.dir",
        "file:///G:/IMFBigDataSpark2016/IMFScalaWorkspace_spark200/Spark200/spark-warehouse")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val personDF = spark.read.json("G:\\IMFBigDataSpark2016\\spark-2.0.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json")
    val personScoresDF = spark.read.json("G:\\IMFBigDataSpark2016\\spark-2.0.0-bin-hadoop2.6\\examples\\src\\main\\resources\\peopleScores.json")
    val personDS = personDF.as[Person]
    /* -- typed transformations exercised in earlier runs --
    // map: one output element per input Person; null ages are replaced by 0
    personDS.map { person =>
      (person.name, if (person.age == null) 0L else person.age + 100)
    }.show()

    // mapPartitions: consume a whole partition's iterator at a time,
    // buffering the transformed records before returning them
    personDS.mapPartitions { persons =>
      val result = ArrayBuffer[(String, Long)]()
      while (persons.hasNext) {
        val person = persons.next()
        result += ((person.name, person.age + 10000))
      }
      result.iterator
    }.show

    personDS.dropDuplicates("name").show  // deduplicate on the name column only
    personDS.distinct().show              // deduplicate on all columns
    */
    // Partition counts: before repartitioning, after repartition(4),
    // and after coalesce(2)
    println(personDS.rdd.partitions.size)
    val repartitionDs = personDS.repartition(4)
    println(repartitionDs.rdd.partitions.size)
    val coalesced = repartitionDs.coalesce(2)
    println(coalesced.rdd.partitions.size)
    coalesced.show
    // personDF.show()
    // personDF.collect().foreach(println)
    // println(personDF.count())
    // val personDS = personDF.as[Person]
    // personDS.show()
    // personDS.printSchema()
    // val dataframe = personDS.toDF()

    /* personDF.createOrReplaceTempView("persons")
    spark.sql("select * from persons where age > 20").show()
    spark.sql("select * from persons where age > 20").explain()
    */

    // personDF.join(personScoresDF, $"name" === $"n").show()
    /* personDF.filter("age > 20").join(personScoresDF, $"name" === $"n").show()
    personDF.filter("age > 20")
      .join(personScoresDF, $"name" === $"n")
      .groupBy(personDF("name"))
      .agg(avg(personScoresDF("score")), avg(personDF("age")))
      .explain()
    // .show()
    */
    // Busy-wait so the application (and its Web UI) stays up for inspection;
    // note that as written, spark.stop() below is never reached.
    while (true) {}
    spark.stop()
  }
}
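
The lesson title lists flatMap, but the listing above never exercises it. As a minimal sketch (my addition, assuming the same personDS as above): where map produces exactly one output element per input, flatMap lets each element produce zero or more, and flattens the results.

    // Hypothetical example: emit two tuples per person, so a Dataset of n
    // people becomes a Dataset of 2n (name, age) pairs.
    personDS.flatMap { person =>
      val age: Long = if (person.age == null) 0L else person.age
      Seq((person.name, age), (person.name, age + 100))
    }.show()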
Run output (excerpt):
16/09/16 18:55:03 INFO CodeGenerator: Code generated in 299.363907 ms
1
16/09/16 18:55:03 INFO FileSourceStrategy: Pruning directories with:
16/09/16 18:55:03 INFO FileSourceStrategy: Post-Scan Filters:
16/09/16 18:55:03 INFO FileSourceStrategy: Pruned Data Schema: struct<age: bigint, name: string>
16/09/16 18:55:03 INFO FileSourceStrategy: Pushed Filters:
16/09/16 18:55:03 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 194.6 KB, free 413.5 MB)
16/09/16 18:55:03 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 20.5 KB, free 413.5 MB)
16/09/16 18:55:03 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.3.5:64223 (size: 20.5 KB, free: 413.9 MB)
16/09/16 18:55:03 INFO SparkContext: Created broadcast 5 from rdd at DataSetsops.scala:43
16/09/16 18:55:03 INFO FileSourceStrategy: Planning scan with bin packing, max size: 4194387 bytes, open cost is considered as scanning 4194304 bytes.
4
16/09/16 18:55:03 INFO FileSourceStrategy: Pruning directories with:
16/09/16 18:55:03 INFO FileSourceStrategy: Post-Scan Filters:
16/09/16 18:55:03 INFO FileSourceStrategy: Pruned Data Schema: struct<age: bigint, name: string>
16/09/16 18:55:03 INFO FileSourceStrategy: Pushed Filters:
16/09/16 18:55:03 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 194.6 KB, free 413.3 MB)
16/09/16 18:55:03 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 20.5 KB, free 413.3 MB)
16/09/16 18:55:03 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 192.168.3.5:64223 (size: 20.5 KB, free: 413.9 MB)
16/09/16 18:55:03 INFO SparkContext: Created broadcast 6 from rdd at DataSetsops.scala:45
16/09/16 18:55:03 INFO FileSourceStrategy: Planning scan with bin packing, max size: 4194387 bytes, open cost is considered as scanning 4194304 bytes.
1
16/09/16 18:55:03 INFO FileSourceStrategy: Pruning directories with:
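
The bare numbers interleaved with the log lines are the three partition counts the program prints: 1 for the freshly loaded Dataset, 4 after repartition(4), and then 1 rather than the 2 one might expect from coalesce(2). A plausible reading (my interpretation, not something the log states): coalesce never increases the partition count, and the Catalyst optimizer can collapse the adjacent repartition(4)/coalesce(2) pair, leaving a non-shuffling coalesce over the original single-partition scan. A minimal sketch of the first property, assuming a local SparkSession named spark:

    // coalesce only merges existing partitions; it cannot split them,
    // so asking a 1-partition Dataset for 2 partitions is a no-op.
    val one = spark.range(10).coalesce(1)
    println(one.coalesce(2).rdd.partitions.size)    // still 1
    println(one.repartition(2).rdd.partitions.size) // 2: repartition shuffles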