您的位置:首页 > 编程语言 > ASP

Java lambda 简化JavaSpark Core代码

2019-06-04 19:04 886 查看

直接上个写得很烂的小代码片段,慢慢体会。。。

SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("xxx")
.config("spark.mongodb.input.uri", "mongodb://xx.xx.xx.xx/xx.xx")
.getOrCreate();
spark.sparkContext().setLogLevel("WARN");
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaMongoRDD<Document> mongoRDD = MongoSpark.load(sc);
//将数据压平
JavaRDD<Document> dataRDD = mongoRDD.flatMap(document -> {
List<Document> doc = (List<Document>) document.get("data");
return doc.iterator();
});
//过滤_id 为空的数据
JavaRDD<Tuple2> tupleRDD = dataRDD.filter(document -> document.get("_id") != null && ((List<String>) document.get("car")).size() > 0)
.map(document -> {
String str = ((List<String>) document.get("car")).get(0);
return new Tuple2(document.get("_id"), str);
});
//根据_id分组 聚合
JavaPairRDD<Object, Set<String>> tupleArrRDD = tupleRDD.groupBy(tuple2 -> tuple2._1).mapValues(arr -> {
Set<String> set = new HashSet();
for (Tuple2 tuple : arr) {
set.add((String) tuple._2);
}
return set;
});
//分区内部进行 jdbc操作数据库
List nullResult = tupleArrRDD.mapPartitions(tuple2Iterator -> {......});
.....
spark.srop
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: