您的位置:首页 > 其它

基于Spark实时计算商品关注度

2017-05-07 10:00 453 查看


基于Spark实时计算商品关注度


一、实验介绍


1.1 内容简介

处于网络时代的我们,随着 O2O 的营销模式的流行,越来越多的人开始做起了电商。
与此同时也产生了许多网络数据,然而这些数据有什么用呢。比如说一个电商公司可以根据一个商品被用户点击了多少次,用户停留时间是多久,用户是否收藏了该商品。这些都是可以被记录下来的。通过这些数据我们就能分析出这段时间内哪些商品最受普遍人们的关注。同时也可以针对这些数据进行用户商品推荐。
后面我们将使用Scoket来模拟用户浏览商品产生实时数据,数据包括用户当前浏览的商品以及浏览商品的次数和停留时间和是否收藏该商品。使用Spark Streaming构建实时数据处理系统,来计算当前电商平台最受人们关注的商品是哪些。


1.2 实验知识点

Java Scoket编程的基本原理

Spark Streaming应用的基本实现

DStream的基本操作

updateStateByKey的使用


1.3 实验环境

Spark1.6.1

Xfce终端

eclipse


1.4 适合人群

该项目难度一般,属于中等难度,该项目适合有一定的Java编程基础以及一定得Spark知识,了解Streaming的工作机制。


二、实验原理

通过搜集用户浏览信息,通过网络传输,实时处理用户浏览数据。进行必要的处理,经过计算得到商品的关注度。现在网络上很多人都是使用得Scala语言进行Spark得实现。因为它书写简洁,快速。但是好多学校并没有开设这一个课程。好多都是学习得Java知识,但是网上有关Java方式得实现少之又少。这次的项目我就是使用Java得方式进行Spark
Streaming的实现。


2.1 实验流程图




2.2 实验拓展

当然这只是个简单的项目,想要拓展的话,可以再加上Hadoop集群,将数据保存在HDFS上,还可以通过用户数据进行建模处理,建模型存储在HDFS上,进行用户商品推荐这个项目可以横向扩展。加上更多的模块,比如说Mlib。ALS算法进行用户商品推荐。当然这里也可以使用kafka来进行消息的传输。但是由于一些原因,这里就没有使用kafka了,使用了Socket来模拟数据传输,感兴趣的同学可以尝试使用kafka来作为数据传输。又不清楚的地方可以留言进行咨询。


2.3 实验结果展示

项目结构图



用户数据模拟效果:数据格式(商品ID::浏览次数::停留时间::是否收藏::购买件数)



商品关关注度计算结果展示(这个值会随着数据的产生会不断的变化)




三、实验步骤

下述介绍为实验楼默认环境,如果您使用的是定制环境,请修改成您自己的环境介绍。


3.1 工程准备

由于在现实场景下,我们并没有真正的实时的电商平台数据,这些数据肯定也是人家公司保密的数据。那么也只能通过我们自己创建一个模拟器,实现用户浏览商品的数据。下面我们就来创建一Java Project,来实现相关功能。


3.2 Java工程创建

File -> New -> Java Project





到此我们就创建好了一个成功的名为StreamingProject得Java工程。
目录结构为:




3.3 代码实现


3.3.1 Spark-assembly工具包的引入

在工程中创建一个lib的文件夹,用于存放jar
右键项目 ->New -> Folder





现在就会看到当前工程下多了一个lib文件夹:



向lib中添加jar:
现在我们回到桌面上,到文件系统中的spark的安装目录下的lib下拷贝需要的jar
主文件夹 -> 文件系统 -> opt -> spark-1.6.1-bin-hadoop2.6 -> lib



找到该jar将其复制到我们在工程里创建lib文件夹下



然后在当前工程中引用该jar
右键该jar -> build path -> add to build path




3.3.2 模拟器实现 SimulatorSocket.java

实现比较简单,使用了简单的Java Socket知识,以及线程来控制它发送消息的频率。详细请看注释
package com.shiyanlou.simulator;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;
public class SimulatorSocket {
public static void main(String[] args) throws Exception {
//创建一个线程来启动模拟器
new Thread(new SimulatorSocketLog()).start();
}
}
class SimulatorSocketLog implements Runnable{
//假设一共有200个商品
private int GOODSID = 200;
//随机发送消息的条数
private int MSG_NUM = 30;
//假设用户浏览该商品的次数
private int BROWSE_NUM = 5;
//假设用户浏览商品停留的时间
private int STAY_TIME = 10;
//用来体现用户是否收藏,收藏为1,不收藏为0,差评为-1
int[] COLLECTION = new int[]{-1,0,1};
//用来模拟用户购买商品的件数,0比较多是为了增加没有买的概率,毕竟不买的还是很多的,很多用户都只是看看
private int[] BUY_NUM = new int[]{0,1,0,2,0,0,0,1,0};
public void run() {
// TODO Auto-generated method stub
Random r = new Random();

try {
/**
*创建一个服务器端,监听9999端口,客户端就是Streaming,通过看源码才知道,Streaming *socketTextStream 其实就是相当于一个客户端
*/
ServerSocket sScoket = new ServerSocket(9999);
System.out.println("成功开启数据模拟模块,去运行Streaming程序把!");
while(true){
//随机消息数
int msgNum = r.nextInt(MSG_NUM)+1;
//开始监听
Socket socket = sScoket.accept();
//创建输出流
OutputStream os = socket.getOutputStream();
//包装输出流
PrintWriter pw = new PrintWriter(os);
for (int i = 0; i < msgNum; i++) {
//消息格式:商品ID::浏览次数::停留时间::是否收藏::购买件数
StringBuffer sb = new StringBuffer();
sb.append("goodsID-"+(r.nextInt(GOODSID)+1));
sb.append("::");
sb.append(r.nextInt(BROWSE_NUM)+1);
sb.append("::");
sb.append(r.nextInt(STAY_TIME)+r.nextFloat());
sb.append("::");
sb.append(COLLECTION[r.nextInt(2)]);
sb.append("::");
sb.append(BUY_NUM[r.nextInt(9)]);
System.out.println(sb.toString());
//发送消息
pw.write(sb.toString()+"\n");
}
pw.flush();
pw.close();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
System.out.println("thread sleep failed");
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
System.out.println("port used");
}

}

}


3.3.3 流式计算,Streaming 程序:StreamingGoods.java

简单的实现了Streaming程序。通过接收socket数据,对数据进行拆分,计算。
我们先来了解一些关于Streaming的基础知识。
1.什么是Streaming
Spark Streaming它实现了对实时流数据的高吞吐量, 低容错率的流处理。数据可以有许多来源,如 Kafka, Flume, TCP 套接字,可以使用一些逻辑算法来处理分析这些数据,并将这些数据持久化到数据库。比如说Hbase、Spark
SQL、HDFS等等。如下图所示。图来自Spark官网。



其内部工作原理如下图所示,图片来自Spark官网。



2.关于DStream的操作
关于DStream的了解均来自于Spark官方网站。有什么不足,还请多多谅解。
DStream 是 Spark Streaming 提供的基本抽象。它代表了一个连续的数据 流,或者从源端接收到的,或通过将输入流中产生处理后的数据流。在内部,它是由 RDDS 的连续序列表示。在 DStream 中每 个 RDD 包含数据从某一个时间间隔,如下图。图片来自Spark官网。



说白了他其实还是RDD。和Spark RDD的操作是差不多的,在DStream上面的任何操作都会转化为底层的RDDS操作。如下图所示。图片来自官网。



有关DStream的算子,在这里就不一一罗列了。大家可以参考官网上的说明。链接地址:http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html
有关Transformations on Dstreams的说明。
现在我们先分开讲解StreamingGoods.java类的具体实现,完整代码在最后贴出
初始化StreamingContext
首先使用Java初始化Streaming程序,需要先定义一个JavaStreamingContext,它是所有的Java Streaming程序的切入点。
实现一个JavaStreamingContext需要先定义一个SparkConf。实现代码如下
SparkConf sparkConf = new SparkConf().setAppName("StreamingGoods").setMaster("local[2]");
//AppName 自然就是当前Job的名字,Master就是Spark的主机地址,这里采用的是本地模式
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,new Duration(5000));
// new Duration(5000) 窗口时间,单位毫秒

获取模拟数据
JavaReceiverInputDStream<String> jds = jsc.socketTextStream("127.0.0.1", 9999);

消息处理
商品关注度怎么计算呢,这个可能需要一个约定,就是说浏览次数和浏览时间以及购买力度和是否收藏该商品都有一个权重,可能不同的公司觉得不同的选项权重不一样,可能你觉得浏览时间更重要,另一人觉得浏览次数更重要,所以我们事先约定好这个计算公式。我们约定浏览次数的权重为0.8,浏览时间权重为0.6,是否收藏和购买力度都为1。
//mapToPair就是将rdd转换为键值对rdd,与map不同的是添加了一个key
JavaPairDStream<String, Double> splitMess = jds.mapToPair(new PairFunction<String,String,Double>(){
private static final long serialVersionUID = 1L;
public Tuple2<String, Double> call(String line) throws Exception {
// TODO Auto-generated method stub.
String[] lineSplit = line.toString().split("::");
Double followValue = Double.parseDouble(lineSplit[1])*0.8+Double.parseDouble(lineSplit[2])*0.6+Double.parseDouble(lineSplit[3])*1+Double.parseDouble(lineSplit[4])*1;
return new Tuple2<String, Double>(lineSplit[0], followValue);
}});

更新关注度值
由于是流式数据,数据每分每秒都在产生,那么计算的关注值也在变化,那么就需要更新这个状态值。使用updateStateByKey来进行操作。这也是这里相对比较难的知识点。
对初始化的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算,
在这里是通过updateStateByKey来以Batch Interval为单位来对历史状态进行更新,在这里需要使用checkPoint,用于保存父RDD的值。在Spark1.6.X之后也可以尝试使用mapWithState来进行更新值。
JavaPairDStream<String, Double> UpdateFollowValue = splitMess.updateStateByKey(new Function2<List<Double>,Optional<Double>,Optional<Double>>(){

public Optional<Double> call(List<Double> newValues,
Optional<Double> statValue) throws Exception {
// 对相同的key进行value统计,实现累加
Double updateValue = statValue.or(0.0);
for (Double values : newValues) {
updateValue += values;
}
return Optional.of(updateValue);
}},new HashPartitioner(jsc.sparkContext().defaultParallelism()));

输出关注度值
结果输出,并将里面的商品进行关注度排序,降序排序,只显示关注度最高的十个商品。实现思想,由于原RDD UpdateFollowValue的值
可以知道是的形式,我们使用sortByKey是不能这样进行排序的,因为它并不是按照关注度排序。我们需要将其转化为的形式,然后再按照sortByKey来进行排序,然后进行输出。
UpdateFollowValue.foreachRDD(new VoidFunction<JavaPairRDD<String,Double>>(){
private static final long serialVersionUID = 1L;
public void call(JavaPairRDD<String, Double> followValue) throws Exception {
// TODO Auto-generated method stub
JavaPairRDD<Double,String> followValueSort = followValue.mapToPair(new PairFunction<Tuple2<String,Double>,Double,String>(){

public Tuple2<Double, String> call(
Tuple2<String, Double> valueToKey) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<Double,String>(valueToKey._2,valueToKey._1);
}
}).sortByKey(false);
List<Tuple2<String,Double>> list = followValueSort.mapToPair(new PairFunction<Tuple2<Double,String>,String, Double>() {

public Tuple2<String, Double> call(
Tuple2<Double, String> arg0) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String,Double>(arg0._2,arg0._1);
}
}).take(10);
for (Tuple2<String,Double> tu : list) {
System.out.println("商品ID: "+tu._1+"  关注度: "+tu._2);
}
}});


附上最终StreamingGoods.java的代码

package com.shiyanlou.simulator;

import java.io.Serializable;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.common.base.Optional;

public class StreamingGoods implements Serializable{
private static final long serialVersionUID = 1L;
//定义一个文件夹,用于保存上一个RDD的数据。该文件夹会自动创建,不需要提前创建
private static String checkpointDir = "checkDir";
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("StreamingGoods").setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,new Duration(5000));
jsc.checkpoint(checkpointDir);
JavaReceiverInputDStream<String> jds = jsc.socketTextStream("127.0.0.1", 9999);JavaDStream<String> mess = jds.map(new Function<String,String>(){
private static final long serialVersionUID = 1L;
public String call(String arg0) throws Exception {
// TODO Auto-generated method stub
return arg0;
}});
mess.print();
JavaPairDStream<String, Double> splitMess = jds.mapToPair(new PairFunction<String,String,Double>(){
private static final long serialVersionUID = 1L;
public Tuple2<String, Double> call(String line) throws Exception {
// TODO Auto-generated method stub.
String[] lineSplit = line.toString().split("::");
Double followValue = Double.parseDouble(lineSplit[1])*0.8+Double.parseDouble(lineSplit[2])*0.6+Double.parseDouble(lineSplit[3])*1+Double.parseDouble(lineSplit[4])*1;
return new Tuple2<String, Double>(lineSplit[0], followValue);
}});
JavaPairDStream<String, Double> UpdateFollowValue = splitMess.updateStateByKey(new Function2<List<Double>,Optional<Double>,Optional<Double>>(){

public Optional<Double> call(List<Double> newValues,
Optional<Double> statValue) throws Exception {
// TODO Auto-generated method stub
Double updateValue = statValue.or(0.0);
for (Double values : newValues) {
updateValue += values;
}
return Optional.of(updateValue);
}},new HashPartitioner(jsc.sparkContext().defaultParallelism()));
UpdateFollowValue.foreachRDD(new VoidFunction<JavaPairRDD<String,Double>>(){ private static final long serialVersionUID = 1L; public void call(JavaPairRDD<String, Double> followValue) throws Exception { // TODO Auto-generated method stub JavaPairRDD<Double,String> followValueSort = followValue.mapToPair(new PairFunction<Tuple2<String,Double>,Double,String>(){ public Tuple2<Double, String> call( Tuple2<String, Double> valueToKey) throws Exception { // TODO Auto-generated method stub return new Tuple2<Double,String>(valueToKey._2,valueToKey._1); } }).sortByKey(false); List<Tuple2<String,Double>> list = followValueSort.mapToPair(new PairFunction<Tuple2<Double,String>,String, Double>() { public Tuple2<String, Double> call( Tuple2<Double, String> arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2<String,Double>(arg0._2,arg0._1); } }).take(10); for (Tuple2<String,Double> tu : list) { System.out.println("商品ID: "+tu._1+" 关注度: "+tu._2); } }});
jsc.start();
jsc.awaitTermination();
}
}


程序运行说明

1.先启动模拟器,模拟数据。相当于服务端。



成功运行结果:



2.现在去运行Streaming程序,模拟器检测到Streaming程序启动(时间片开始),开始发送模拟数据。然后Stram ing端接收数据并计算。



模拟器开始发送数据:



Streaming接收到的数据:



Streaming计算结果:





我们刷新工程,看是否创建好checkDir



从下图我们可以看到,已经自动创建了checkDir这个文件夹





顶0
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: