Spark项目实战:购物网站评价标签生成(非常详细的Spark算子操作)
实战概览
一、项目简介
1. 需求
如今我们已经离不开没有电商的生活,在上面购物后会做出相应的评价,电商会为那些评价打上标签,比如这样:
通过评价抽取标签有多种方法,有人工方法抽取,机器抽取等等,现标签已经由机器抽取生成,要对其聚合以标签出现的频次作为参考依据,作为该商家的评论标签。
2. 内容
分析原始的用户评价数据,分析并清洗数据,得到用户的评价标签,并选出每个商家评价最多的10评价标签价作为该商家的展示标签。本次会使用Spark进行数据的处理,体验Spark算子处理数据的便捷与魅力。
二、项目的开发环境
1. Spark版本:spark-2.2.0-bin-2.6.0-cdh5.7.0
2. JDK版本1.8,Scala版本2.11
3. 开发工具为IDEA2018
三、项目代码编写
1. 项目搭建
(1). 构建Scala工程,添加Maven支持,创建包目录:
(2). 添加完整依赖
<properties> <scala.version>2.11.8</scala.version> <spark.version>2.2.0</spark.version> </properties> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.41</version> </dependency> </dependencies> <build> <pluginManagement> <plugins> <plugin> <artifactId>maven-clean-plugin</artifactId> <version>3.0.0</version> </plugin> <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --> <plugin> <artifactId>maven-resources-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> </plugin> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.20.1</version> </plugin> <plugin> <artifactId>maven-jar-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-install-plugin</artifactId> <version>2.5.2</version> </plugin> <plugin> <artifactId>maven-deploy-plugin</artifactId> <version>2.8.2</version> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> </plugins> </build>
2. 分析原始的数据
本次所用于分析的数据比较多就不好直接放出来了,大家可以取自行下载:购物网站评价标签生成原始数据,截取其中一条数据:
其中77287793 是商家编号,由制表符隔开的是一个{…},这是JSON数据,其中Key"extInfoList"为一个JSON数组,其中的一个组员"Value"包含我们想要得到的评价,是一个字符串数组。所以我们要做的首先对数据进行清洗,然后解析JSON数据,得到评价字符串,解析使用Java脚本,然后使用Spark算子对其进行处理。
3. 编写JSON解析类
(1). 在Java目录的util包下创建 ReviewTags 类,用于解析JSON数据:
(2). ReviewTags 类完整代码如下:
/** * 取出标签的评语,拼装成以","分割的字符串 */ public class ReviewTags { public static String extractTags(String jsonStr){ //将字符串封装为JSON对象 JSONObject object = JSON.parseObject(jsonStr); //判断JSON对象是否为空或者是否包含Key"extInfoList" if(object == null || !object.containsKey("extInfoList")){ return ""; } //extInfoList对应的Value是一个JSON数组 //"extInfoList":[{"title":"contentTags","values":["干净卫生","服务热情"],"desc":"","defineType":0},{"title":"tagIds","values":["852","22"],"desc":"","defineType":0}] JSONArray array = object.getJSONArray("extInfoList"); if(array == null) { return ""; } StringBuilder sb = new StringBuilder(); for (int i = 0; i < array.size(); i++) { //取出JSON数组每一个元素,为JSON对象 JSONObject obj = array.getJSONObject(i); //验证JSON数组的元素是否为目标元素 if (obj != null && obj.containsKey("title") && obj.getString("title").equals("contentTags") && obj.containsKey("values")) { //取出评价标签的内容,是一个字符串数组 JSONArray arr = obj.getJSONArray("values"); if(arr == null){ continue; } boolean begin = true; //拼接评价,生成:"干净卫生,服务热情" for (int j = 0; j < arr.size(); j++) { //使得第一个字符串前无"," if (begin) { begin = false; } else { sb.append(","); } sb.append(arr.getString(j)); } } } //返回字符串 return sb.toString(); } }
4. 编写数据处理类
(1). 在Scala目录的app包下创建 Scala的Object类 TagGenerator ,用于处理数据:
(2). TagGenerator 类完整代码如下(有详细注释):
/** *数据处理类 */ object TagGenerator { def main(args: Array[String])= { //检验是否已经给定合法的参数 if(args.length != 2){ println("Yo should input <InputPath> <OutputPath>") System.exit(1) } //将args里面的参数取出来 val inputPath = args(0) val outputPath = args(1) //创建parkConf val sparkConf = new SparkConf(). setAppName("TagGenerator") //创建SparkContext val sc = new SparkContext(sparkConf) //从给定的HDFS路径中取出数据 val poi_tags = sc.textFile(inputPath) //使用制表符分割原始数据,得到一个字符串数组,长度为2 val poi_taglist = poi_tags.map(e=>e.split("\t")). //过滤掉非法数据,如果数组长度不为2则说明数据有缺失,要过滤 filter(e=>e.length == 2). //e为数组,e(0)为商家编号,然后做一个映射,将评价字符串与与商家编号做映射 //得到这样的元组:(77287793,"音响效果好,体验好,价格实惠") map(e => e(0) -> ReviewTags.extractTags(e(1))). //e._2.length是指元组第二个元素的长度,即"音响效果好,体验好,价格实惠",若小于0则过滤 filter(e => e._2.length > 0). //将商家编号取出来,并将商家的评价字符串以","分割,得到评价数组,并以商家编号映射该数组 //得到这样的元组:(77287793,("音响效果好","体验好","价格实惠")) map(e => e._1 -> e._2.split(",")). //将商家编号分别映射数组的每个元素,得到以下元组: //(77287793,"音响效果好"),(77287793,"体验好") ... flatMapValues(e => e). //将元组再次映射为新的元组,原来的元组为新元组的Key,Value为1: //((77287793,"音响效果好"),1),((77287793,"体验好"),1),... map(e => (e._1,e._2) -> 1). //对Key进行聚合,得到结果为: //((77287793,"音响效果好"),180),((77287793,"体验好"),95),... reduceByKey(_+_). //e._1._1代表取出元组的第一个组员的第一个组员,即为商家编号,来映射一个List //List的元素为元组,第一个组员为e._1._2,即为评价"音响效果好",e._2为评价数,为180,从而得到以下结果: //(77287793,[("音响效果好",180),("体验好",95),...]) map(e => e._1._1 -> List((e._1._2,e._2))). //这一步非常之巧妙,对Key进行聚合,但是Key为列表List,所以采用:::,表示List的追加,从而把商家的所有评价都聚集在了一起 //(77287793,[("音响效果好",180),("体验好",95),...]) reduceByKey(_:::_). //这一步工作比较多,首先将商家编号取出,然后对List基于评价数(_._2)进行升序排序, //然后对排序结果反转(reverse),取前10条(take(10)),即评价人数最多的10条,然后将那10个元组取出 //变为评价:评论数的字符串列表:List("音响效果好:180","体验好,95",...),最后对List使用mkString方法,生产 //以逗号隔开的字符串,并以商家号映射:(77287793,"音响效果好:180,体验好,95,...") map(e => e._1 -> e._2.sortBy(_._2).reverse.take(10).map(a => a._1+":"+a._2.toString).mkString(",")) //将结果的元组取出来,并将其转变为以制表符分割的字符串 poi_taglist.map(e=>e._1+"\t"+e._2). //将统计结果存入给定的HDFS路径中以便分析和运用 saveAsTextFile(outputPath) } }
5. 将项目打包成jar提交到集群
这里有个问题要注意,因为我们把项目打包提交到Spark集群上运行,但是Spark集群是没有JSON类的jar包的,所以我们需要打依赖jar包,即把项目所依赖的jar包也包含进去,这需要额外的Maven插件来完成,但是Spark集群本身包含Spark的jar包,如果再把Saprk的jar打包进去是没必要且会使得jar变得很大,所以我们要使用Maven提供的一个域标签隔离Spark,使得Spark的jar包只在本地有效,使其在打包的时候不被打进去。
(1). 在
<build>标签的的
<plugins>下再添加以下插件:
<plugin> <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin>
(2). 修改pom.xml文件的Spark的依赖配置,使其不能将其jar包也打进我们的项目jar包中:
(3). 在项目本地目录下的target文件下可以看到两个jar,选择依赖jar上传至集群:
四、运行结果
(1). 将下载好的原始数据上传至HDFS:
hdfs dfs -put temptags.txt /data
(2). 执行命令运行程序
如果你部署了Spark集群,可以执行如下命令:
spark-submit --master spark://hadoop00:7077 --jars temptag-1.0-jar-with-dependencies.jar --class com.temptag.app.TagGenerator hdfs://hadoop00:/data/temptags.txt hdfs://hadoop00:/result/out
后面的两个路径分别是原始数据的路径,计算结果的路径,顺序不可以改变。
如果你没有部署Spark集群,但是安装了Saprk,可以执行如下命令:
spark-submit --master local[2] --jars temptag-1.0-jar-with-dependencies.jar --class com.temptag.app.TagGenerator hdfs://hadoop00:/data/temptags.txt hdfs://hadoop00:/result/out
(3). 执行结果:
五、总结
本次的实战中,我们首先在HDFS上读取原始数据,使用Java脚本解析JSON数据,然后使用Spark算子对数据进行清洗,映射,聚合,排序等工作,最后将计算结果存储在HDFS中。从中我们发现通过使用Spark算子的组合,使得繁琐复杂的数据处理工作变得相当的简洁,这也是Spark的优势之一。我是人间,乐于结交共同学习的朋友,欢迎大家一起交流,共同进步!
- 第76课:Spark SQL基于网站Log的综合案例实战之Hive数据导入、Spark SQL对数据操作每天晚上20:00YY频道现场授课频道68917580
- Spark Streaming 项目实战(1)——日志生成脚本
- Android项目实战(四十):在线生成按钮Shape的网站
- 第53课实战操作Kafka+Flume成功! Spark大型项目广告点击项目技术骨架实现之Spark+Kafka+Flume实战
- 大数据-基于Spark的机器学习-智能客户系统项目实战
- 我们的一个已投产项目的高可用数据库实战 - mongo 副本集的搭建详细过程
- 分享一个很牛X的域名转发(URL转发)服务网站。的确牛,有详细的操作说明,配图
- SEO网站优化之ASP.NET生成Google网站地图的详细代码
- 米扑科技的开源项目:sitemap-php 自动生成网站地图
- Bootstrap企业网站实战项目4
- Spark转化算子和操作算子
- 【SSH网上商城项目实战18】过滤器实现购物登录功能的判断
- Android绘图机制(三)——自定义View的三种实现方式以及实战项目操作
- javaee实战项目--农贸产品开发(详细讲解及代码实现)
- [置顶] spring boot项目实战-集合操作
- Spark算子:RDD行动Action操作(1)–first、count、reduce、collect
- 第99讲:使用sparkStreaming实战对论坛网站动态行为的多维度分析下
- Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
- 第十三周 项目六--体验文件操作(3-实战)
- 实战ASP.NET访问共享文件夹(含详细操作步骤)