您的位置:首页 > 编程语言 > Java开发

UserView--第一种方式set去重,基于Spark算子的java代码实现

2017-03-05 23:24 639 查看

UserView(UV)统计——第一种方式:Set 去重,基于 Spark 算子的 Java 代码实现



测试数据



java代码

package com.hzf.spark.study;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

/**
 * Counts the number of distinct users per page (UV) from a tab-separated log.
 *
 * <p>Lines whose action column equals the broadcast action value are kept, mapped to
 * {@code (pageId, userId)} pairs, grouped by page, and de-duplicated with a
 * {@link Set}. One line per page is printed to standard output.
 */
public class UVAnalysis {
    /** Column separator of a log line. */
    private static final String FIELD_SEPARATOR = "\t";
    /** Zero-based column holding the user id. */
    private static final int USER_ID_INDEX = 2;
    /** Zero-based column holding the page id. */
    private static final int PAGE_ID_INDEX = 3;
    /** Zero-based column holding the action name. */
    private static final int ACTION_INDEX = 5;

    /**
     * Runs the analysis in local mode over the input path {@code userLog1}, keeping only
     * lines whose action is {@code View}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("UV_ANA").setMaster("local")
                .set("spark.testing.memory", "2147480000");
        // try-with-resources guarantees the context is closed even if the job fails.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> logRDD = sc.textFile("userLog1");
            String str = "View";
            final Broadcast<String> broadcast = sc.broadcast(str);
            uvAnalyze(logRDD, broadcast);
        }
    }

    /**
     * Prints, for every page id, the number of distinct user ids that performed the
     * broadcast action on it.
     *
     * <p>Lines with too few columns to contain an action are skipped instead of
     * failing the job.
     *
     * @param logRDD    raw tab-separated log lines
     * @param broadcast the action name a line must carry to be counted
     */
    private static void uvAnalyze(JavaRDD<String> logRDD, final Broadcast<String> broadcast) {
        JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String v1) throws Exception {
                String[] fields = v1.split(FIELD_SEPARATOR);
                // Blank or truncated lines have no action column; drop them.
                if (fields.length <= ACTION_INDEX) {
                    return false;
                }
                String actionParam = broadcast.value();
                return actionParam.equals(fields[ACTION_INDEX]);
            }
        });

        JavaPairRDD<String, String> pairLogRDD = filteredLogRDD
                .mapToPair(new PairFunction<String, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(String val) throws Exception {
                        // Safe to index: the filter above only passes lines that
                        // have at least ACTION_INDEX + 1 columns.
                        String[] fields = val.split(FIELD_SEPARATOR);
                        return new Tuple2<String, String>(fields[PAGE_ID_INDEX], fields[USER_ID_INDEX]);
                    }
                });

        pairLogRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                String pageId = tuple._1;
                // A set keeps each user id once, so its size is the page's UV.
                Set<String> userSets = new HashSet<>();
                for (String userId : tuple._2) {
                    userSets.add(userId);
                }
                System.out.println("PAGEID:" + pageId + "\t UV_COUNT:" + userSets.size());
            }
        });
    }
}


  

result

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: