您的位置:首页 > 其它

Spark算子详解之reduceByKey_sample_take_takeSample_distinct_sortByKey_saveAsTextFile_intersection

2018-01-04 20:40 561 查看
 XML Code 
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

package com.lyzx.day16

import org.apache.spark.{SparkContext, SparkConf}

class T2 {

  /**

   * reduceByKey  = groupByKey + reduce

   *从功能上讲相当于先做GroupByKey然后再做reduce操作但是比groupByKey+reduce操作效率要高

   * 即对于键值对的RDD来说,应用于所有的key相同的值做操作

   *

   * 由于reduceByKey是先做groupByKey后做reduce

   * 所以它包含shuffle操作(groupByKey包含shuffle操作)

   *前面说过

   *   功能上:reduceByKey  = groupByKey + reduce

   *   效率上:reduceByKey  > groupByKey + reduce

   * 原因是reduceByKey在map端自带Combiner

   * 例如wordCount例子中map端

   *  [("java",(1,1,1,1)),("c",(1,1,1,1))]

   *  如果在map做Combiner就像[("java",(4)),("c",(4))]

   *  在reduce端fatch时效率会高

   */

  def f1(sc:SparkContext): Unit ={

    val arr = List(1,2,3,4,5,5,4,3,2,1)

    //rdd中的数据如下[(1,2,3,4,5,5,4,3,2,1)]

    val rdd = sc.parallelize(arr)

    //mapRdd中的数据如下[((1,1),(2,2),(3,3),...(5,5),....,(1,1))]

    val mapRdd = rdd.map(item=>(item,item*10))

    val reduceRdd = mapRdd.reduceByKey(_+_)

    reduceRdd.foreach(println)

  }

  /**

   * sample方法用于随机的采样即随机的从rdd中取出一定个数的数量

   * 参数1 Boolean  取样的方式 是不是取完后放回去再取

   * 参数2 Double   取多少

   * 参数3 Double   种子

   */

  def f2(sc:SparkContext): Unit ={

    val arr = List(1,2,3,4,5,6,7,8,9,10)

    val rdd = sc.parallelize(arr)

    val sampleRdd = rdd.sample(false,0.5)

    sampleRdd.foreach(println)

  }

  /**

   * take 取出前N个元素

   */

  def f3(sc:SparkContext): Unit ={

    val arr = (1 to 10)

    val rdd = sc.parallelize(arr)

    val list = rdd.take(4)

    list.foreach(println)

  }

  /**

   * takeSample 随机的取N个元素

   * 参数1 随机取元素时要不要放回去再取

   * 参数2 随机取几个

   */

  def f4(sc:SparkContext): Unit ={

    val arr = (1 to 10)

    val rdd = sc.parallelize(arr)

    val list = rdd.takeSample(true,2)

    list.foreach(println)

  }

  /**

   *distinct是去重功能

   * 有shuffle操作

   * 其内部实现是通过reduceByKey实现

   */

  def f5(sc:SparkContext): Unit ={

    val arr1 = List(1 to 10)

    val arr2 = List(1 to 10)

    val rdd = sc.parallelize(arr1.++(arr2))

    rdd.foreach(println)

    println("=================================")

    rdd.distinct().foreach(println)

  }

  /**
sortByKey

   *通过对键排序

   */

  def f6(sc:SparkContext): Unit ={

    val rdd = sc.parallelize((1 to 10))

    val mapRdd = rdd.map(item=>(item,item))

    val sortRdd = mapRdd.sortByKey(false)

    sortRdd.foreach(println)

  }

  /**

   * 把计算结果保存为一个文本文件

   * 传入的路径必须是不存在否则会抛异常,试想一下如果计算了很久的结果被覆盖这会有多酸爽啊~~~

   * @param sc

   */

  def f7(sc:SparkContext): Unit ={

    val rdd = sc.parallelize((1 to 10))

    val mapRdd = rdd.map(item=>(item,item))

    mapRdd.saveAsTextFile("./k")

  }

  /**

   * intersection 交点

   * 求出两个RDD的交集

   * @param sc

   */

  def f8(sc:SparkContext): Unit ={

    val rdd1 = sc.parallelize((1 to 10))

    val rdd2 = sc.parallelize((5 to 15))

    rdd1.intersection(rdd2).foreach(println)

  }

}

object T2{

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("myTest").setMaster("local")

    val sc = new SparkContext(conf)

    val t = new T2()
//    t.f1(sc)
//    t.f2(sc)
//    t.f3(sc)
//    t.f4(sc)
//    t.f5(sc)
//     t.f6(sc)
//    t.f7(sc)

    t.f8(sc)

    sc.stop()

  }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: