UserView--第一种方式set去重,基于Spark算子的java代码实现
2017-03-05 23:24
639 查看
UserView--第一种方式set去重,基于Spark算子的java代码实现
测试数据
java代码
package com.hzf.spark.study; import java.util.HashSet; import java.util.Iterator; import java.util.Set; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; public class UVAnalysis { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("UV_ANA").setMaster("local") .set("spark.testing.memory", "2147480000"); @SuppressWarnings("resource") JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> logRDD = sc.textFile("userLog1"); String str = "View"; final Broadcast<String> broadcast = sc.broadcast(str); uvAnalyze(logRDD, broadcast); } private static void uvAnalyze(JavaRDD<String> logRDD, final Broadcast<String> broadcast) { JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(String v1) throws Exception { String actionParam = broadcast.value(); String action = v1.split("\t")[5]; return actionParam.equals(action); } }); JavaPairRDD<String, String> pairLogRDD = filteredLogRDD .mapToPair(new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String val) throws Exception { String pageId = val.split("\t")[3]; String userId = val.split("\t")[2]; return new Tuple2<String, String>(pageId, userId); } }); pairLogRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<String>> tuple) throws Exception { String pageId = tuple._1; Iterator<String> iterator = tuple._2.iterator(); Set<String> userSets = new HashSet<>(); while (iterator.hasNext()) { String userId = iterator.next(); userSets.add(userId); } System.out.println("PAGEID:" + pageId + "\t UV_COUNT:" + userSets.size()); } }); } }
result
相关文章推荐
- UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现
- 基于java的nio消息实现方式优缺点分析及示例代码说明
- 好友推荐—基于关系的java和spark代码实现
- 基于内容的图像检索(颜色,直方图相交法,)java实现代码
- 基于Java实现的Base64加密、解密原理代码
- Java实现基于内容的数字图像处理代码
- java分布式,基于开源框架实现消息方式
- java基于TCP的socket编程简单实现[代码实践过]
- [分布式java]基于JavaAPI实现消息方式的系统间通信:UDP/IP+NIO
- listView扩展2——java代码方式实现animation动画输出
- iOS 基于APNS消息推送原理与实现(包括JAVA后台代码)
- [分布式java]基于JavaAPI实现消息方式的系统间通信:TCP/IP+BIO
- 基于数论变换的大整数乘法Java代码实现
- java基于swing实现的五子棋游戏代码
- Java:基于Map实现的频率统计代码
- 基于OPENCV的CANNY边缘检测算子详细代码实现
- JAVA线程的应用实例(运用2种中断线程方式,基于实现进度条为例)
- JAVA线程的应用实例(运用2种中断线程方式,基于实现进度条为例)
- Java实现的基于socket通信的实例代码
- 实现了基于TCP的Java Socket编程实例代码