Spark DataFrame/Dataset reduceByKey usage
2017-09-27 16:58
case class Record(ts: Long, id: Int, value: Int)

With an RDD, we often use reduceByKey to keep the record with the latest timestamp per key:

```scala
def findLatest(records: RDD[Record])(implicit spark: SparkSession) =
  records.keyBy(_.id).reduceByKey {
    (x, y) => if (x.ts > y.ts) x else y
  }.values
```

With a Dataset/DataFrame, the same result can be obtained like this:

```scala
import org.apache.spark.sql.functions._

val newDF = df.groupBy('id)
  .agg(max(struct('ts, 'val)) as 'tmp)
  .select($"id", $"tmp.*")
```

Why does this work? For struct (or tuple) types, max orders by the first field, so max(struct('ts, 'val)) returns the row with the largest ts.

A more detailed example:

```scala
import org.apache.spark.sql.functions._

val data = Seq(
  ("michael", 1, "event 1"),
  ("michael", 2, "event 2"),
  ("reynold", 1, "event 3"),
  ("reynold", 3, "event 4")).toDF("user", "time", "event")

val newestEventPerUser = data
  .groupBy('user)
  .agg(max(struct('time, 'event)) as 'event)
  .select($"user", $"event.*") // Unnest the struct into top-level columns.
```

```
scala> newestEventPerUser.show()
+-------+----+-------+
|   user|time|  event|
+-------+----+-------+
|reynold|   3|event 4|
|michael|   2|event 2|
+-------+----+-------+
```

For a more complex case, groupByKey plus reduceGroups on a typed Dataset lets you keep arbitrary comparison logic:

```scala
case class AggregateResultModel(
    id: String,
    mtype: String,
    healthScore: Int,
    mortality: Float,
    reimbursement: Float)

// Assume that rawScores has been loaded beforehand from JSON/CSV files.
val groupedResultSet = rawScores.as[AggregateResultModel]
  .groupByKey(item => (item.id, item.mtype))
  .reduceGroups((x, y) => getMinHealthScore(x, y))
  .map(_._2)

// The binary function used in reduceGroups.
def getMinHealthScore(x: AggregateResultModel, y: AggregateResultModel): AggregateResultModel = {
  // Complex logic for deciding which row to keep:
  // lowest healthScore wins; ties broken by mortality, then reimbursement.
  if (x.healthScore > y.healthScore) y
  else if (x.healthScore < y.healthScore) x
  else if (x.mortality < y.mortality) y
  else if (x.mortality > y.mortality) x
  else if (x.reimbursement < y.reimbursement) x
  else y
}
```
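The struct ordering that the max(struct(...)) trick relies on mirrors Scala's own lexicographic tuple ordering, so the idea can be checked without a Spark cluster. Below is a minimal plain-Scala sketch (no Spark dependency; the sample records are made up for illustration) of the same "keep the latest record per id" reduction:

```scala
// Plain-Scala analogue of reduceByKey / max(struct('ts, 'val)):
// tuples compare lexicographically, so max picks the element
// with the largest first component -- here, the timestamp.
case class Record(ts: Long, id: Int, value: Int)

object LatestPerKey {
  // Hypothetical sample data, mimicking the Record rows in the article.
  val records = Seq(
    Record(1L, 1, 10),
    Record(3L, 1, 30), // latest for id 1
    Record(2L, 2, 20)  // latest (and only) record for id 2
  )

  // Group by id and keep the record with the maximum timestamp per group --
  // the same semantics as keyBy(_.id).reduceByKey(...) on an RDD.
  val latest: Map[Int, Record] =
    records.groupBy(_.id).map { case (id, rs) =>
      id -> rs.maxBy(_.ts)
    }

  // Tuple max demonstrates the "ordered by first element" rule directly.
  val maxPair: (Int, String) = Seq((1, "a"), (3, "c"), (2, "b")).max
}
```

Here `LatestPerKey.latest` maps each id to its newest record, and `maxPair` is `(3, "c")` because only the first tuple component decides the ordering unless there is a tie.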
ref:https://stackoverflow.com/questions/41236804/spark-dataframes-reducing-by-key