您的位置:首页 > 移动开发

第45课 Spark 2.0实战之Dataset:map、flatMap、mapPartitions、dropDuplicate、coalesce、repartition等

2016-09-16 19:01 519 查看
第45课 Spark 2.0实战之Dataset:map、flatMap、mapPartitions、dropDuplicate、coalesce、repartition等

 

package com.dt.spark200

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

object DataSetsops {

  /** Schema for people.json: Spark reads JSON integers as bigint, hence `age: Long`. */
  case class Person(name: String, age: Long)

  /**
   * Demonstrates Spark 2.0 Dataset operations: typed map / mapPartitions,
   * dropDuplicates / distinct, and partition control via repartition / coalesce.
   *
   * @param args optional; args(0) overrides the directory containing
   *             people.json and peopleScores.json. Defaults to the original
   *             hard-coded path, so existing invocations behave unchanged.
   */
  def main(args: Array[String]): Unit = {

    // Generalized: let the data directory come from the command line instead of
    // being wired to one machine's disk layout. Default preserves old behavior.
    val dataDir =
      if (args.nonEmpty) args(0)
      else "G:\\IMFBigDataSpark2016\\spark-2.0.0-bin-hadoop2.6\\examples\\src\\main\\resources"

    val spark = SparkSession
      .builder()
      .appName("DatasetOps")
      .master("local")
      .config("spark.sql.warehouse.dir",
        "file:///G:/IMFBigDataSpark2016/IMFScalaWorkspace_spark200/Spark200/spark-warehouse")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val personDF = spark.read.json(s"$dataDir\\people.json")
    val personScoresDF = spark.read.json(s"$dataDir\\peopleScores.json")

    // Typed view over the untyped DataFrame; rows must match the Person schema.
    val personDS = personDF.as[Person]

    /* Typed transformations (disabled in the original run; kept for reference):

    personDS.map { person =>
      // NOTE(review): `age` is a primitive Long and can never be null, so the
      // original `if (person.age == null) 0 else ...` guard was always false.
      // If missing ages must be detected, model the field as Option[Long].
      (person.name, person.age + 100)
    }.show()

    personDS.mapPartitions { persons =>
      val result = ArrayBuffer[(String, Long)]()
      while (persons.hasNext) {
        val person = persons.next()
        result += ((person.name, person.age + 10000))
      }
      result.iterator
    }.show

    personDS.dropDuplicates("name").show
    personDS.distinct().show
    */

    // Partition management demo: master("local") yields 1 initial partition.
    println(personDS.rdd.partitions.size)
    val repartitionDs = personDS.repartition(4) // full shuffle, widens to 4
    println(repartitionDs.rdd.partitions.size)
    val coalesced = repartitionDs.coalesce(2) // narrow dependency, shrinks to 2 without a shuffle
    println(coalesced.rdd.partitions.size)
    coalesced.show

    /* Earlier experiments from the lesson (disabled in the original run):

    personDF.show()
    personDF.collect().foreach(println)
    println(personDF.count())
    personDS.show()
    personDS.printSchema()
    val dataframe = personDS.toDF()

    personDF.createOrReplaceTempView("persons")
    spark.sql("select * from persons where age > 20").show()
    spark.sql("select * from persons where age > 20").explain()

    personDF.join(personScoresDF, $"name" === $"n").show()
    personDF.filter("age > 20").join(personScoresDF, $"name" === $"n").show()

    personDF.filter("age > 20")
      .join(personScoresDF, $"name" === $"n")
      .groupBy(personDF("name"))
      .agg(avg(personScoresDF("score")), avg(personDF("age")))
      .explain()
      //.show()
    */

    // BUG FIX: the original `while (true) {}` busy-waited forever, pegging a
    // CPU core and making the spark.stop() below unreachable dead code.
    // Block on stdin instead: the Web UI (http://localhost:4040) stays
    // inspectable until the user presses Enter, then we shut down cleanly.
    scala.io.StdIn.readLine("Press ENTER to stop the SparkSession...")

    spark.stop()
  }
}

 

运行结果

 

16/09/16 18:55:03 INFO CodeGenerator: Code generated in 299.363907 ms

1

16/09/16 18:55:03 INFO FileSourceStrategy: Pruning directories with:

16/09/16 18:55:03 INFO FileSourceStrategy: Post-Scan Filters:

16/09/16 18:55:03 INFO FileSourceStrategy: Pruned Data Schema: struct<age: bigint, name: string>

16/09/16 18:55:03 INFO FileSourceStrategy: Pushed Filters:

16/09/16 18:55:03 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 194.6 KB, free 413.5 MB)

16/09/16 18:55:03 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 20.5 KB, free 413.5 MB)

16/09/16 18:55:03 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.3.5:64223 (size: 20.5 KB, free: 413.9 MB)

16/09/16 18:55:03 INFO SparkContext: Created broadcast 5 from rdd at DataSetsops.scala:43

16/09/16 18:55:03 INFO FileSourceStrategy: Planning scan with bin packing, max size: 4194387 bytes, open cost is considered as scanning 4194304 bytes.

4

16/09/16 18:55:03 INFO FileSourceStrategy: Pruning directories with:

16/09/16 18:55:03 INFO FileSourceStrategy: Post-Scan Filters:

16/09/16 18:55:03 INFO FileSourceStrategy: Pruned Data Schema: struct<age: bigint, name: string>

16/09/16 18:55:03 INFO FileSourceStrategy: Pushed Filters:

16/09/16 18:55:03 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 194.6 KB, free 413.3 MB)

16/09/16 18:55:03 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 20.5 KB, free 413.3 MB)

16/09/16 18:55:03 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 192.168.3.5:64223 (size: 20.5 KB, free: 413.9 MB)

16/09/16 18:55:03 INFO SparkContext: Created broadcast 6 from rdd at DataSetsops.scala:45

16/09/16 18:55:03 INFO FileSourceStrategy: Planning scan with bin packing, max size: 4194387 bytes, open cost is considered as scanning 4194304 bytes.

1

16/09/16 18:55:03 INFO FileSourceStrategy: Pruning directories with:

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐