RDD Key-Value Transformations (1) – partitionBy, mapValues, flatMapValues
2016-09-14 15:34
partitionBy
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
This function repartitions the original RDD according to the given partitioner, producing a new ShuffledRDD.
scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at <console>:21
scala> rdd1.partitions.size
res20: Int = 2
// Inspect the elements in each partition of rdd1
scala> rdd1.mapPartitionsWithIndex{
     |   (partIdx, iter) => {
     |     var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()
     |     while (iter.hasNext) {
     |       var part_name = "part_" + partIdx
     |       var elem = iter.next()
     |       if (part_map.contains(part_name)) {
     |         var elems = part_map(part_name)
     |         elems ::= elem
     |         part_map(part_name) = elems
     |       } else {
     |         part_map(part_name) = List[(Int, String)](elem)
     |       }
     |     }
     |     part_map.iterator
     |   }
     | }.collect
res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))
// (2,B) and (1,A) are in part_0; (4,D) and (3,C) are in part_1
// Repartition with partitionBy
scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at <console>:23
scala> rdd2.partitions.size
res23: Int = 2
// Inspect the elements in each partition of rdd2
scala> rdd2.mapPartitionsWithIndex{
     |   (partIdx, iter) => {
     |     var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()
     |     while (iter.hasNext) {
     |       var part_name = "part_" + partIdx
     |       var elem = iter.next()
     |       if (part_map.contains(part_name)) {
     |         var elems = part_map(part_name)
     |         elems ::= elem
     |         part_map(part_name) = elems
     |       } else {
     |         part_map(part_name) = List[(Int, String)](elem)
     |       }
     |     }
     |     part_map.iterator
     |   }
     | }.collect
res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))
// (4,D) and (2,B) are in part_0; (3,C) and (1,A) are in part_1
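The placement above follows directly from how HashPartitioner assigns keys. A minimal plain-Scala sketch of the rule (an assumption, simplified from Spark's internal non-negative-mod helper; real Spark also routes null keys to partition 0):

```scala
// Sketch of HashPartitioner's assignment rule: the partition index is the
// key's hashCode modulo the partition count, shifted to be non-negative.
def hashPartition(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod
}

// For Int keys, hashCode is the value itself, so with 2 partitions the
// even keys land in part_0 and the odd keys in part_1:
Seq((1, "A"), (2, "B"), (3, "C"), (4, "D")).foreach { case (k, v) =>
  println(s"($k,$v) -> part_${hashPartition(k, 2)}")
}
```

This reproduces the layout in the transcript: (2,B) and (4,D) go to part_0, (1,A) and (3,C) to part_1.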
mapValues
def mapValues[U](f: (V) => U): RDD[(K, U)]
Like the basic map transformation, except that mapValues applies f only to the V of each [K,V] pair, leaving the keys unchanged.
scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[27] at makeRDD at <console>:21
scala> rdd1.mapValues(x => x + "_").collect
res26: Array[(Int, String)] = Array((1,A_), (2,B_), (3,C_), (4,D_))
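On plain pairs, the same effect can be written with an ordinary map over the value slot; the sketch below (an illustration on a Seq, so no SparkContext is needed) shows that equivalence. The practical difference on an RDD is that mapValues keeps the existing partitioner, which a plain map cannot guarantee:

```scala
// mapValues over key-value pairs is equivalent to mapping f over the
// value while carrying the key through unchanged.
def mapValuesLike[K, V, U](pairs: Seq[(K, V)])(f: V => U): Seq[(K, U)] =
  pairs.map { case (k, v) => (k, f(v)) }

val mapped = mapValuesLike(Seq((1, "A"), (2, "B"), (3, "C"), (4, "D")))(_ + "_")
println(mapped)  // List((1,A_), (2,B_), (3,C_), (4,D_))
```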
flatMapValues
def flatMapValues[U](f: (V) => TraversableOnce[U]): RDD[(K, U)]
Like the basic flatMap transformation, except that flatMapValues applies f only to the V of each [K,V] pair, pairing every element of the flattened result with the original key.
scala> rdd1.flatMapValues(x => x + "_").collect
res36: Array[(Int, Char)] = Array((1,A), (1,_), (2,B), (2,_), (3,C), (3,_), (4,D), (4,_))
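The result type is Array[(Int, Char)] because the returned String is itself treated as a sequence of characters and flattened, each character paired with the original key. A plain-Scala sketch of these semantics (an assumption: the signature is simplified to take Iterable rather than Spark's TraversableOnce):

```scala
// flatMapValues applies f to each value, then pairs every element of the
// returned collection with that value's original key.
def flatMapValuesLike[K, V, U](pairs: Seq[(K, V)])(f: V => Iterable[U]): Seq[(K, U)] =
  pairs.flatMap { case (k, v) => f(v).map(u => (k, u)) }

// A String is a sequence of Chars, so "A_" flattens to 'A' and '_':
val flat = flatMapValuesLike(Seq((1, "A"), (2, "B")))(v => (v + "_").toSeq)
println(flat)  // List((1,A), (1,_), (2,B), (2,_))
```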