您的位置:首页 > 编程语言

Spark核心编程-分组取topN

2015-12-31 19:15 381 查看
案例需求

对每个班级内的学生成绩,取出前3名。(分组取topN)

输入测试数据(以“ ”以做分割符)

class1 90
class2 56
class1 87
class1 76
class2 88
class1 95
class1 74
class2 87
class2 67
class2 77
class1 98
class2 96


实现如下:

1 、scala的版本

package com.spark.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._

/**
 * @author Ganymede
 */
object GroupTop3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Top3").setMaster("local[1]")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("D:/scala-eclipse/workspace/spark-study-java/src/main/resources/score.txt", 1)

    val pairs = lines.map { x =>
      {
        val splited = x.split(" ")
        (splited(0), splited(1).toInt)
      }
    }

    val groupedPairs = pairs.groupByKey();

    val top3Score = groupedPairs.map(classScores => {
      val top3 = Array[Int](-1, -1, -1)

      val className = classScores._1

      val scores = classScores._2

      for (score <- scores) {
        breakable {
          for (i <- 0 until 3) {
            if (top3(i) == -1) {
              top3(i) = score;
              break;
            } else if (score > top3(i)) {
              var j = 2
              while (j > i) {
                top3(j) = top3(j - 1);
                j = j - 1
              }
              top3(i) = score;
              break;
            }
          }
        }
      }
      (className, top3);
    })

    top3Score.foreach(x => {
      println(x._1)
      val res = x._2
      for (i <- res) {
        println(i)
      }
      println("==========================")
    })

  }
}


输出:

class1
98
95
90
==========================
class2
96
88
87
==========================


在实现group by 后的排序算法,用到了break函数.

scala没有提供类似于java的break语句。但是可以使用boolean类型变量、return或者Breaks的break函数来替代使用。

2、用spark-sql来实现

创建一个表

create table scores(className string, score int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
加载数据

load data local inpath '/opt/software/tmp/scores.data'  overwrite into table scores;


查询按班级分组并返回倒序的top3

select className,score from (SELECT className,score, Row_Number() OVER (partition by className ORDER BY score desc ) rank FROM scores ) a where a.rank<=3;
实际就是用了 row_number() over (partition by ... order by ...)的函数。同样hive也是支持的

3、总结:实际生产中,大部分还是用SQL来分析与统计的,明显方便一条SQL搞定了;而代码实现更灵活,便于性能的优化。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: