
Spark RDD Transformation Operations

2017-11-23 22:55
1. Java Version

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class TransformationOperator {
//Return a new distributed dataset formed by passing each element of the source through a function func.
public static void map(){
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
//"local" runs with a single worker thread; "local[*]" uses as many threads as there are CPU cores
JavaSparkContext sc = new JavaSparkContext(conf);

List<String> list = Arrays.asList("kris", "kevin");
JavaRDD<String> listRDD = sc.parallelize(list);
//Function takes two type parameters: the input element type and the output element type
JavaRDD<String> mapRDD = listRDD.map(new Function<String, String>() {
@Override
public String call(String s) throws Exception {
return "hello " + s;
}
});
mapRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
sc.close();
}

//Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
public static  void  flatMap(){
SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<String> list = Arrays.asList("you,jump", "i,jump");
JavaRDD<String> listRDD = sc.parallelize(list);
//The two type parameters of FlatMapFunction<String, String> are the input type and the output type
JavaRDD<String> flatMapRDD = listRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(",")).iterator();
}
});
flatMapRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
sc.close();
}

//Return a new dataset formed by selecting those elements of the source on which func returns true.
public  static void filter(){
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
JavaRDD<Integer> listRDD = sc.parallelize(list);
//Function<Integer, Boolean>: the first type parameter is the input type, the second is the return type; elements for which the function returns true are kept
JavaRDD<Integer> filterRDD = listRDD.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) throws Exception {
return integer % 2 == 0;
}
});
filterRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
sc.close();
}
/*
mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD,
so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T
*/

public  static  void  mapPartitions(){
SparkConf conf = new SparkConf().setAppName("mapPartitions").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
JavaRDD<Integer> listRDD = sc.parallelize(list, 2);
JavaRDD<String> resultRDD = listRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, String>() {
@Override
public Iterator<String> call(Iterator<Integer> integerIterator) throws Exception {
ArrayList<String> listResult = new ArrayList<>();
while (integerIterator.hasNext()) {
String result = "hello" + integerIterator.next();
listResult.add(result);
}

return listResult.iterator();
}

});
resultRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
sc.close();

}
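/*
Illustrative sketch, not part of the original post: mapPartitions is usually preferred over map
when each element would otherwise repeat expensive setup work (opening a connection, building a
formatter, ...). The "connection" string below is a hypothetical placeholder created once per
partition instead of once per element.
*/
public static void mapPartitionsWithSetup(){
    SparkConf conf = new SparkConf().setAppName("mapPartitionsWithSetup").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
    JavaRDD<String> resultRDD = listRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, String>() {
        @Override
        public Iterator<String> call(Iterator<Integer> iterator) throws Exception {
            //expensive per-partition setup would go here; it runs once for each of the 2 partitions
            String connection = "connection-" + System.nanoTime();
            ArrayList<String> result = new ArrayList<>();
            while (iterator.hasNext()) {
                result.add(connection + " handled " + iterator.next());
            }
            return result.iterator();
        }
    });
    resultRDD.foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });
    sc.close();
}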

/*
mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition,
so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
*/
public static void mapPartitionsWithIndex(){
SparkConf conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
JavaRDD<Integer> listRDD = sc.parallelize(list, 2);
JavaRDD<String> resultRDD = listRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
@Override
public Iterator<String> call(Integer index, Iterator<Integer> integer) throws Exception {
ArrayList<String> list1 = new ArrayList<>();
while (integer.hasNext()) {
String element = integer.next() + "-->" + index;
list1.add(element);
}
return list1.iterator();
}
}, true);

resultRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
sc.close();
}

/*
sample(withReplacement, fraction, seed)
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
*/
public  static  void  sample(){
SparkConf conf = new SparkConf().setAppName("sample").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list = Arrays.asList(1, 2, 3, 1,5,8,90,1,3,5,6,3,8,1,4, 5, 6, 7, 8);
JavaRDD<Integer> listRDD = sc.parallelize(list);
//withReplacement controls whether sampling is done with or without replacement; fraction is the expected fraction of the data (with replacement, the expected number of times each element is chosen), not an exact count; seed is the random-number-generator seed
JavaRDD<Integer> sample = listRDD.sample(true, 2, 1);
System.out.println(sample.take(2));
sc.close();
}
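/*
Illustrative sketch, not part of the original post: sampling without replacement with a
fractional value. Roughly 30% of the elements are expected to come back; the exact count varies.
*/
public static void sampleWithoutReplacement(){
    SparkConf conf = new SparkConf().setAppName("sampleWithoutReplacement").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    //false = without replacement, 0.3 = expected fraction of the data, 7 = seed for reproducibility
    JavaRDD<Integer> sampled = listRDD.sample(false, 0.3, 7);
    System.out.println(sampled.collect());
    sc.close();
}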

/*
union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.
*/
public static void union(){
SparkConf conf = new SparkConf().setAppName("union").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//List<String> list1 = Arrays.asList("first", "second", "third", "forth", "fifth");
List<Integer> list1 = Arrays.asList(1, 2, 4, 5, 8, 7);
List<Integer> list2 = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> list1RDD = sc.parallelize(list1);
JavaRDD<Integer> list2RDD = sc.parallelize(list2);
JavaRDD<Integer> unionRDD = list1RDD.union(list2RDD);
unionRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
sc.close();
}

/*
intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.
*/
public  static  void  intersection(){
SparkConf conf = new SparkConf().setAppName("intersection").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list1 = Arrays.asList(1, 2, 4, 5, 8, 7);
List<Integer> list2 = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> list1RDD = sc.parallelize(list1);
JavaRDD<Integer> list2RDD = sc.parallelize(list2);
JavaRDD<Integer> intersectionRDD = list1RDD.intersection(list2RDD);
intersectionRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
sc.close();
}

/*
distinct([numTasks]))
Return a new dataset that contains the distinct elements of the source dataset.
*/
public  static  void  distinct(){
SparkConf conf = new SparkConf().setAppName("distinct").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list = Arrays.asList(1, 2, 4, 5, 8, 7,5,3,1,3,6,4);
JavaRDD<Integer> listRDD = sc.parallelize(list);
JavaRDD<Integer> distinctRDD = listRDD.distinct();
distinctRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
sc.close();
}

/*
groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
*/
public static  void groupByKey(){
SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<String, String>> list = Arrays.asList(
new Tuple2<String, String>("峨嵋", "周芷若"),
new Tuple2<String, String>("武当", "张无忌"),
new Tuple2<String, String>("峨嵋", "灭绝师太"),
new Tuple2<String, String>("武当", "宋清书")

);
List<Tuple2<Integer, String>> list1 = Arrays.asList(
new Tuple2<Integer, String>(1, "周芷若"),
new Tuple2<Integer, String>(2, "张无忌"),
new Tuple2<Integer, String>(1, "灭绝师太"),
new Tuple2<Integer, String>(2, "宋清书")

);
JavaPairRDD<Integer, String> listRDD1 = sc.parallelizePairs(list1);
JavaPairRDD<Integer, Iterable<String>> groupByKeyRDD = listRDD1.groupByKey();
groupByKeyRDD.foreach(new VoidFunction<Tuple2<Integer, Iterable<String>>>() {
@Override
public void call(Tuple2<Integer, Iterable<String>> integerIterableTuple2) throws Exception {
System.out.println(integerIterableTuple2._1+"----"+integerIterableTuple2._2);
}
});
JavaRDD<Tuple2<String, String>> listRDD = sc.parallelize(list);
//the second type parameter of the Function is the type of the field we group by
JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupByRDD = listRDD.groupBy(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> stringStringTuple2) throws Exception {
//the returned value is used as the grouping key
return stringStringTuple2._1;
}
});

groupByRDD.foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<String, String>>>>() {
@Override
public void call(Tuple2<String, Iterable<Tuple2<String, String>>> stringIterableTuple2) throws Exception {
System.out.println("sect: " + stringIterableTuple2._1 + "-----" + "members: " + stringIterableTuple2._2);
}
});
sc.close();
}

/*
reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func,
which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
*/
public  static  void reduceByKey() {
SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<String, Integer>> list = Arrays.asList(
new Tuple2<String, Integer>("you", 1),
new Tuple2<String, Integer>("jump", 1),
new Tuple2<String, Integer>("i", 1),
new Tuple2<String, Integer>("jump", 1)
);
JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
JavaPairRDD<String, Integer> reduceByKeyRDD = listRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
reduceByKeyRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple2) throws Exception {
System.out.println(tuple2._1+"---"+tuple2._2);
}
});
sc.close();
}
/*
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type,
while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
*/
public  static  void  aggregateByKey(){
SparkConf conf = new SparkConf().setAppName("aggregateByKey").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<String, Integer>> list = Arrays.asList(
new Tuple2<String, Integer>("you", 1),
new Tuple2<String, Integer>("jump", 1),
new Tuple2<String, Integer>("i", 1),
new Tuple2<String, Integer>("jump", 1)
);
JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
JavaPairRDD<String, Integer> aggregateByKeyRDD = listRDD.aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}, new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});

aggregateByKeyRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple2) throws Exception {
System.out.println(tuple2._1+"-----"+tuple2._2);
}
});
sc.close();
}
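/*
Illustrative sketch, not part of the original post: unlike reduceByKey, aggregateByKey lets the
aggregated type (String here) differ from the input value type (Integer). seqOp folds a value into
the per-partition accumulator, combOp merges accumulators from different partitions.
*/
public static void aggregateByKeyToString(){
    SparkConf conf = new SparkConf().setAppName("aggregateByKeyToString").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String, Integer>> list = Arrays.asList(
        new Tuple2<String, Integer>("you", 1),
        new Tuple2<String, Integer>("jump", 2),
        new Tuple2<String, Integer>("jump", 3)
    );
    JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
    JavaPairRDD<String, String> aggregated = listRDD.aggregateByKey("",
        new Function2<String, Integer, String>() {
            @Override
            public String call(String acc, Integer value) throws Exception {
                //seqOp: append an Integer value to the partial String for this key
                return acc + value + ";";
            }
        },
        new Function2<String, String, String>() {
            @Override
            public String call(String acc1, String acc2) throws Exception {
                //combOp: concatenate partial Strings produced in different partitions
                return acc1 + acc2;
            }
        });
    aggregated.foreach(new VoidFunction<Tuple2<String, String>>() {
        @Override
        public void call(Tuple2<String, String> tuple2) throws Exception {
            System.out.println(tuple2._1 + "-----" + tuple2._2);
        }
    });
    sc.close();
}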

/*
sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a
dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
*/
public static  void sortByKey(){
SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> list = Arrays.asList(
new Tuple2<Integer, String>(3, "周芷若"),
new Tuple2<Integer, String>(2, "张无忌"),
new Tuple2<Integer, String>(1, "灭绝师太"),
new Tuple2<Integer, String>(4, "宋清书")
);
JavaPairRDD<Integer, String> listRDD = sc.parallelizePairs(list);
JavaPairRDD<Integer, String> sortByKeyRDD = listRDD.sortByKey();
sortByKeyRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
@Override
public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
System.out.println(integerStringTuple2._1+"---"+integerStringTuple2._2);
}
});
sc.close();
}

/*
join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs
with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
*/
public static  void  join(){
SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> list1 = Arrays.asList(
new Tuple2<Integer, String>(1, "周芷若"),
new Tuple2<Integer, String>(2, "张无忌"),
new Tuple2<Integer, String>(3, "灭绝师太"),
new Tuple2<Integer, String>(4, "宋清书")
);
List<Tuple2<Integer, String>> list2 = Arrays.asList(
new Tuple2<Integer, String>(1, "峨嵋"),
new Tuple2<Integer, String>(2, "武当"),
new Tuple2<Integer, String>(3, "峨嵋"),
new Tuple2<Integer, String>(4, "武当")
);
JavaPairRDD<Integer, String> list1RDD = sc.parallelizePairs(list1);
JavaPairRDD<Integer, String> list2RDD = sc.parallelizePairs(list2);
JavaPairRDD<Integer, Tuple2<String, String>> joinRDD = list1RDD.join(list2RDD);
joinRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, String>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<String, String>> integerTuple2Tuple2) throws Exception {
System.out.println(integerTuple2Tuple2._1+"---"+integerTuple2Tuple2._2._1+"++"+integerTuple2Tuple2._2._2);
}
});
sc.close();
}
/*
cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
*/
public static void cogroup(){
SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> list1 = Arrays.asList(
new Tuple2<Integer, String>(1, "周芷若"),
new Tuple2<Integer, String>(2, "张无忌"),
new Tuple2<Integer, String>(3, "灭绝师太"),
new Tuple2<Integer, String>(4, "宋清书")
);
List<Tuple2<Integer, String>> list2 = Arrays.asList(
new Tuple2<Integer, String>(1, "峨嵋"),
new Tuple2<Integer, String>(2, "武当"),
new Tuple2<Integer, String>(3, "峨嵋"),
new Tuple2<Integer, String>(4, "武当")
);
JavaPairRDD<Integer, String> list1RDD = sc.parallelizePairs(list1);
JavaPairRDD<Integer, String> list2RDD = sc.parallelizePairs(list2);
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> cogroupRDD = list1RDD.cogroup(list2RDD);
cogroupRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<String>>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<String>>> integerTuple2Tuple2) throws Exception {
System.out.println(integerTuple2Tuple2._1+"---"+integerTuple2Tuple2._2._1+"++"+integerTuple2Tuple2._2._2);
}
});
sc.close();
}

/*
cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
*/
public  static  void  cartesian(){
SparkConf conf = new SparkConf().setAppName("cartesian").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<String> list1 = Arrays.asList("a", "b");
List<Integer> list2 = Arrays.asList(1, 2);
JavaRDD<String> list1RDD = sc.parallelize(list1);
JavaRDD<Integer> list2RDD = sc.parallelize(list2);
JavaPairRDD<String, Integer> cartesianRDD = list1RDD.cartesian(list2RDD);
cartesianRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple2) throws Exception {
System.out.println(tuple2._1+"--"+tuple2._2);
}
});
sc.close();
}

/*
coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions.
Useful for running operations more efficiently after filtering down a large dataset.
*/
public  static  void  coalesce(){
SparkConf conf = new SparkConf().setAppName("coalesce").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list = Arrays.asList(1, 12, 0, 4, 25, 16, 7, 2);
JavaRDD<Integer> listRDD = sc.parallelize(list,4);
JavaRDD<Integer> coalesceRDD = listRDD.coalesce(2, true);
coalesceRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
sc.close();
}

/*
repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
This always shuffles all data over the network.
*/
public  static  void  repartition(){
SparkConf conf = new SparkConf().setAppName("repartition").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list = Arrays.asList(1, 12, 0, 4, 25, 16, 7, 2);
JavaRDD<Integer> listRDD = sc.parallelize(list);
JavaRDD<Integer> repartitionRDD = listRDD.repartition(3);
repartitionRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});

sc.close();

}
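/*
Illustrative sketch, not part of the original post: coalesce(n, false) can shrink the number of
partitions without a shuffle, while repartition(n) always shuffles. getNumPartitions() is used
here only to print the resulting partition counts.
*/
public static void coalesceVsRepartition(){
    SparkConf conf = new SparkConf().setAppName("coalesceVsRepartition").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 12, 0, 4, 25, 16, 7, 2), 4);
    System.out.println("coalesce(2, false) partitions: " + listRDD.coalesce(2, false).getNumPartitions());
    System.out.println("repartition(8) partitions: " + listRDD.repartition(8).getNumPartitions());
    sc.close();
}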

/*
repartitionAndSortWithinPartitions(partitioner)
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.
This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
*/
public  static  void   repartitionAndSortWithinPartitions(){
SparkConf conf = new SparkConf().setAppName("repartitionAndSortWithinPartitions").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> list = Arrays.asList(
new Tuple2<Integer, String>(3, "周芷若"),
new Tuple2<Integer, String>(2, "张无忌"),
new Tuple2<Integer, String>(1, "灭绝师太"),
new Tuple2<Integer, String>(4, "宋清书")
);
JavaPairRDD<Integer, String> listRDD = sc.parallelizePairs(list);
JavaPairRDD<Integer, String> repartitionAndSortWithinPartitionsRDD = listRDD.repartitionAndSortWithinPartitions(new Partitioner() {
@Override
public int getPartition(Object key) {
return key.hashCode()%2;
}

@Override
public int numPartitions() {
//must match the range of getPartition above, which maps keys to 0 or 1
return 2;
}
});

repartitionAndSortWithinPartitionsRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
@Override
public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
System.out.println(integerStringTuple2._1+"---"+integerStringTuple2._2);
}
});

sc.close();
}
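/*
Illustrative sketch, not part of the original post: the built-in org.apache.spark.HashPartitioner
can replace the hand-written Partitioner above; the fully qualified name is used so that no extra
import is needed.
*/
public static void repartitionAndSortWithHashPartitioner(){
    SparkConf conf = new SparkConf().setAppName("repartitionAndSortWithHashPartitioner").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> list = Arrays.asList(
        new Tuple2<Integer, String>(3, "c"),
        new Tuple2<Integer, String>(1, "a"),
        new Tuple2<Integer, String>(4, "d"),
        new Tuple2<Integer, String>(2, "b")
    );
    JavaPairRDD<Integer, String> listRDD = sc.parallelizePairs(list);
    JavaPairRDD<Integer, String> resultRDD =
        listRDD.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(2));
    resultRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
        @Override
        public void call(Tuple2<Integer, String> tuple2) throws Exception {
            System.out.println(tuple2._1 + "---" + tuple2._2);
        }
    });
    sc.close();
}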

public  static  void  main(String[] args){
//map();
//flatMap();
//filter();
mapPartitions();
//mapPartitionsWithIndex();
// sample();
//union();
//intersection();
//distinct();
// groupByKey();
//reduceByKey();
//aggregateByKey();
// sortByKey();
// join();
// cogroup();
//cartesian();
// coalesce();
// repartition();
//repartitionAndSortWithinPartitions();

}
}
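
Since Spark 2.x with Java 8, the same transformations can be written far more compactly with lambda expressions. The following is a minimal sketch, not part of the original code, that chains flatMap, mapToPair and reduceByKey into a word count; the class name and sample data are illustrative.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;

public class TransformationLambdaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("lambdaWordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("you,jump", "i,jump"));
        //flatMap -> mapToPair -> reduceByKey expressed with lambdas instead of anonymous classes
        JavaPairRDD<String, Integer> counts = lines
                .flatMap(line -> Arrays.asList(line.split(",")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);
        counts.foreach(t -> System.out.println(t._1 + "---" + t._2));
        sc.close();
    }
}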


2. Scala Version

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object TransformationOperator {
private val conf = new SparkConf().setAppName("scalaTansformation").setMaster("local")
private val sc = new SparkContext(conf)

def map(): Unit ={
val array = Array("kris","kevin")
val arrayRDD = sc.parallelize(array)
val mapRDD = arrayRDD.map(f => "hello  "+f)
mapRDD.foreach(s=> println(s))
sc.stop()
}

def mapPartitions(): Unit ={
val array = Array("kris","kevin","wyf","fwdse","ouyangjing")
val arrayRDD = sc.parallelize(array)

val mapPartitionRDD = arrayRDD.mapPartitions(x => {
val arr = x.toArray
var array1 =new ArrayBuffer[String]()
for(i <- arr) {
array1+="hello   "+i
}
array1.toIterator

})
mapPartitionRDD.foreach(f => println(f))

}

def  flatMap(): Unit ={
val array = Array("you,jump","i,jump")
val arrayRDD = sc.parallelize(array)
val flatMapRDD = arrayRDD.flatMap(f => f.split(","))
flatMapRDD.foreach(f =>println(f) )
}

def filter(): Unit ={
val array = Array(1,2,3,4,5,6,7,8)
val arrayRDD = sc.parallelize(array)
val filterRDD = arrayRDD.filter(f => f%2==0)
filterRDD.foreach(f => println(f))
}

def union(): Unit ={
val array1 = Array(1,2,3,4,5)
val array2 = Array(2,3,4,5,6,7)
val list1RDD = sc.parallelize(array1)
val list2RDD = sc.parallelize(array2)
val unionRDD = list1RDD.union(list2RDD)
unionRDD.foreach(f => println(f))
}

def intersection(): Unit ={
val array1 = Array(1,2,3,4,5)
val array2 = Array(2,3,4,5,6,7)
val list1RDD = sc.parallelize(array1)
val list2RDD = sc.parallelize(array2)
val intersectionRDD = list1RDD.intersection(list2RDD)
intersectionRDD.foreach(f => println(f))
}

def distinct(): Unit ={
val array = Array(1,2,3,4,5,2,3,4,5,6,7)
val listRDD = sc.parallelize(array)
val distinctRDD = listRDD.distinct()
distinctRDD.foreach(println)
}

def join(): Unit ={
val array1 = Array((1,"zhangwuji"),(2,"zhouzhiruo"),(3,"zhaomin"));
val array2 = Array((1,"wudang"),(2,"emei"),(3,"caoting"))
val array1RDD = sc.parallelize(array1)
val array2RDD = sc.parallelize(array2)
val joinRDD = array1RDD.join(array2RDD)
joinRDD.foreach(tuple => println(tuple._1+"--"+tuple._2._1+"+"+tuple._2._2))

}
def cogroup(): Unit ={
val array1 = Array((1,"zhangwuji"),(2,"zhouzhiruo"),(3,"zhaomin"));
val array2 = Array((1,"wudang"),(2,"emei"),(3,"caoting"))
val array1RDD = sc.parallelize(array1)
val array2RDD = sc.parallelize(array2)
val cogroupRDD = array1RDD.cogroup(array2RDD)
cogroupRDD.foreach(tuple => println(tuple._1+"--"+tuple._2._1+"+"+tuple._2._2))
}

def cartesian(): Unit ={
val array1 = Array("a","b")
val array2 = Array(1,2)
val array1RDD = sc.parallelize(array1)
val array2RDD = sc.parallelize(array2)
val cartesianRDD = array1RDD.cartesian(array2RDD)
cartesianRDD.foreach(tuple => println(tuple._1,tuple._2))
}
def reduceByKey(): Unit ={
val array = Array(("you",1),("jump",1),("i",1),("jump",1))
val arrayRDD = sc.parallelize(array)
val reduceByKeyRDD = arrayRDD.reduceByKey((x,y) => x+y)
reduceByKeyRDD.foreach(tuple => println(tuple._1+"--"+tuple._2))
}

def sortByKey(): Unit ={
val array =Array((2,"second"),(5,"fifth"),(1,"first"),(4,"forth"),(3,"third"))
val arrayRDD = sc.parallelize(array)
val sortByKeyRDD = arrayRDD.sortByKey(true,1)
sortByKeyRDD.foreach(tuple => println(tuple._1+"--"+tuple._2))
}

def groupByKey(): Unit ={
val array = Array((1,"zhangwuji"),(2,"zhouzhirou"),(1,"zhaomin"),(3,"miejueshitai"))
val arrayRDD = sc.parallelize(array)
val groupByKeyRDD = arrayRDD.groupByKey()
groupByKeyRDD.foreach(tuple =>  println(tuple._1+"--"+tuple._2))
}

def coalesce(): Unit ={
val array = Array(1, 12, 0, 4, 25, 16, 7, 2);
val arrayRDD = sc.parallelize(array,4)
val coalesceRDD = arrayRDD.coalesce(2,true)
coalesceRDD.foreach(println)

}

def main(args: Array[String]): Unit = {
// map()
// flatMap()
// filter()
//  mapPartitions()
//union()
//intersection()
//  distinct()
join()
cogroup()
//  reduceByKey()
//  sortByKey()
// groupByKey()
//  cartesian()
//  coalesce()

}
}