您的位置:首页 > 运维架构 > 网站架构

Spark项目实战:购物网站评价标签生成(非常详细的Spark算子操作)

2019-04-09 10:56 176 查看
版权声明:原创不易,转载请声明出处,谢谢! https://blog.csdn.net/qq_41955099/article/details/89116802

实战概览

  • 二、项目的开发环境
  • 三、项目代码编写
  • 四、运行结果
  • 五、总结
  • 一、项目简介

    1. 需求

    如今我们已经离不开没有电商的生活,在上面购物后会做出相应的评价,电商会为那些评价打上标签,比如这样:

    通过评价抽取标签有多种方法,有人工方法抽取,机器抽取等等,现标签已经由机器抽取生成,要对其聚合以标签出现的频次作为参考依据,作为该商家的评论标签。

    2. 内容

    分析原始的用户评价数据,分析并清洗数据,得到用户的评价标签,并选出每个商家评价最多的10评价标签价作为该商家的展示标签。本次会使用Spark进行数据的处理,体验Spark算子处理数据的便捷与魅力。

    二、项目的开发环境

    1. Spark版本:spark-2.2.0-bin-2.6.0-cdh5.7.0
    2. JDK版本1.8,Scala版本2.11
    3. 开发工具为IDEA2018

    三、项目代码编写

    1. 项目搭建

    (1). 构建Scala工程,添加Maven支持,创建包目录:

    (2). 添加完整依赖

    <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    </properties>
    
    <repositories>
    <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
    </repositories>
    
    <dependencies>
    <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    </dependency>
    
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    </dependency>
    
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.41</version>
    </dependency>
    
    </dependencies>
    <build>
    <pluginManagement>
    <plugins>
    <plugin>
    <artifactId>maven-clean-plugin</artifactId>
    <version>3.0.0</version>
    </plugin>
    <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
    <plugin>
    <artifactId>maven-resources-plugin</artifactId>
    <version>3.0.2</version>
    </plugin>
    <plugin>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.7.0</version>
    </plugin>
    <plugin>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.20.1</version>
    </plugin>
    <plugin>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.0.2</version>
    </plugin>
    <plugin>
    <artifactId>maven-install-plugin</artifactId>
    <version>2.5.2</version>
    </plugin>
    <plugin>
    <artifactId>maven-deploy-plugin</artifactId>
    <version>2.8.2</version>
    </plugin>
    </plugins>
    </pluginManagement>
    <plugins>
    <plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <executions>
    <execution>
    <goals>
    <goal>compile</goal>
    <goal>testCompile</goal>
    </goals>
    </execution>
    </executions>
    <configuration>
    <scalaVersion>${scala.version}</scalaVersion>
    <args>
    <arg>-target:jvm-1.5</arg>
    </args>
    </configuration>
    </plugin>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-eclipse-plugin</artifactId>
    <configuration>
    <downloadSources>true</downloadSources>
    <buildcommands>
    <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
    </buildcommands>
    <additionalProjectnatures>
    <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
    </additionalProjectnatures>
    <classpathContainers>
    <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
    <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
    </classpathContainers>
    </configuration>
    </plugin>
    </plugins>
    </build>
    2. 分析原始的数据

    本次所用于分析的数据比较多就不好直接放出来了,大家可以取自行下载:购物网站评价标签生成原始数据,截取其中一条数据:

    其中77287793 是商家编号,由制表符隔开的是一个{…},这是JSON数据,其中Key"extInfoList"为一个JSON数组,其中的一个组员"Value"包含我们想要得到的评价,是一个字符串数组。所以我们要做的首先对数据进行清洗,然后解析JSON数据,得到评价字符串,解析使用Java脚本,然后使用Spark算子对其进行处理。

    3. 编写JSON解析类

    (1). 在Java目录的util包下创建 ReviewTags 类,用于解析JSON数据:

    (2). ReviewTags 类完整代码如下:

    /**
    * 取出标签的评语,拼装成以","分割的字符串
    */
    public class ReviewTags {
    
    public static String extractTags(String jsonStr){
    //将字符串封装为JSON对象
    JSONObject object = JSON.parseObject(jsonStr);
    //判断JSON对象是否为空或者是否包含Key"extInfoList"
    if(object == null || !object.containsKey("extInfoList")){
    return "";
    }
    //extInfoList对应的Value是一个JSON数组
    //"extInfoList":[{"title":"contentTags","values":["干净卫生","服务热情"],"desc":"","defineType":0},{"title":"tagIds","values":["852","22"],"desc":"","defineType":0}]
    JSONArray array = object.getJSONArray("extInfoList");
    if(array == null) {
    return "";
    }
    
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < array.size(); i++) {
    //取出JSON数组每一个元素,为JSON对象
    JSONObject obj = array.getJSONObject(i);
    //验证JSON数组的元素是否为目标元素
    if (obj != null && obj.containsKey("title") && obj.getString("title").equals("contentTags") && obj.containsKey("values")) {
    //取出评价标签的内容,是一个字符串数组
    JSONArray arr = obj.getJSONArray("values");
    if(arr == null){
    continue;
    }
    boolean begin = true;
    //拼接评价,生成:"干净卫生,服务热情"
    for (int j = 0; j < arr.size(); j++) {
    //使得第一个字符串前无","
    if (begin) {
    begin = false;
    } else {
    sb.append(",");
    }
    sb.append(arr.getString(j));
    }
    }
    }
    //返回字符串
    return sb.toString();
    }
    }
    4. 编写数据处理类

    (1). 在Scala目录的app包下创建 Scala的Object类 TagGenerator ,用于处理数据:

    (2). TagGenerator 类完整代码如下(有详细注释):

    /**
    *数据处理类
    */
    object TagGenerator {
    
    def main(args: Array[String])= {
    
    //检验是否已经给定合法的参数
    if(args.length != 2){
    println("Yo should input <InputPath> <OutputPath>")
    System.exit(1)
    }
    
    //将args里面的参数取出来
    val inputPath = args(0)
    val outputPath = args(1)
    
    //创建parkConf
    val sparkConf = new SparkConf().
    setAppName("TagGenerator")
    
    //创建SparkContext
    val sc = new SparkContext(sparkConf)
    //从给定的HDFS路径中取出数据
    val poi_tags = sc.textFile(inputPath)
    //使用制表符分割原始数据,得到一个字符串数组,长度为2
    val poi_taglist = poi_tags.map(e=>e.split("\t")).
    //过滤掉非法数据,如果数组长度不为2则说明数据有缺失,要过滤
    filter(e=>e.length == 2).
    //e为数组,e(0)为商家编号,然后做一个映射,将评价字符串与与商家编号做映射
    //得到这样的元组:(77287793,"音响效果好,体验好,价格实惠")
    map(e => e(0) -> ReviewTags.extractTags(e(1))).
    //e._2.length是指元组第二个元素的长度,即"音响效果好,体验好,价格实惠",若小于0则过滤
    filter(e => e._2.length > 0).
    //将商家编号取出来,并将商家的评价字符串以","分割,得到评价数组,并以商家编号映射该数组
    //得到这样的元组:(77287793,("音响效果好","体验好","价格实惠"))
    map(e => e._1 -> e._2.split(",")).
    //将商家编号分别映射数组的每个元素,得到以下元组:
    //(77287793,"音响效果好"),(77287793,"体验好") ...
    flatMapValues(e => e).
    //将元组再次映射为新的元组,原来的元组为新元组的Key,Value为1:
    //((77287793,"音响效果好"),1),((77287793,"体验好"),1),...
    map(e => (e._1,e._2) -> 1).
    //对Key进行聚合,得到结果为:
    //((77287793,"音响效果好"),180),((77287793,"体验好"),95),...
    reduceByKey(_+_).
    //e._1._1代表取出元组的第一个组员的第一个组员,即为商家编号,来映射一个List
    //List的元素为元组,第一个组员为e._1._2,即为评价"音响效果好",e._2为评价数,为180,从而得到以下结果:
    //(77287793,[("音响效果好",180),("体验好",95),...])
    map(e => e._1._1 -> List((e._1._2,e._2))).
    //这一步非常之巧妙,对Key进行聚合,但是Key为列表List,所以采用:::,表示List的追加,从而把商家的所有评价都聚集在了一起
    //(77287793,[("音响效果好",180),("体验好",95),...])
    reduceByKey(_:::_).
    //这一步工作比较多,首先将商家编号取出,然后对List基于评价数(_._2)进行升序排序,
    //然后对排序结果反转(reverse),取前10条(take(10)),即评价人数最多的10条,然后将那10个元组取出
    //变为评价:评论数的字符串列表:List("音响效果好:180","体验好,95",...),最后对List使用mkString方法,生产
    //以逗号隔开的字符串,并以商家号映射:(77287793,"音响效果好:180,体验好,95,...")
    map(e => e._1 -> e._2.sortBy(_._2).reverse.take(10).map(a => a._1+":"+a._2.toString).mkString(","))
    //将结果的元组取出来,并将其转变为以制表符分割的字符串
    poi_taglist.map(e=>e._1+"\t"+e._2).
    //将统计结果存入给定的HDFS路径中以便分析和运用
    saveAsTextFile(outputPath)
    }
    }
    5. 将项目打包成jar提交到集群

    这里有个问题要注意,因为我们把项目打包提交到Spark集群上运行,但是Spark集群是没有JSON类的jar包的,所以我们需要打依赖jar包,即把项目所依赖的jar包也包含进去,这需要额外的Maven插件来完成,但是Spark集群本身包含Spark的jar包,如果再把Saprk的jar打包进去是没必要且会使得jar变得很大,所以我们要使用Maven提供的一个域标签隔离Spark,使得Spark的jar包只在本地有效,使其在打包的时候不被打进去。
    (1).

    <build>
    标签的的
    <plugins>
    下再添加以下插件:

    <plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <executions>
    <execution>
    <phase>package</phase>
    <goals>
    <goal>single</goal>
    </goals>
    </execution>
    </executions>
    <configuration>
    <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    </configuration>
    </plugin>

    (2). 修改pom.xml文件的Spark的依赖配置,使其不能将其jar包也打进我们的项目jar包中:

    (3). 在项目本地目录下的target文件下可以看到两个jar,选择依赖jar上传至集群:

    四、运行结果

    (1). 将下载好的原始数据上传至HDFS:

    hdfs dfs -put temptags.txt /data

    (2). 执行命令运行程序
    如果你部署了Spark集群,可以执行如下命令:
    spark-submit --master spark://hadoop00:7077 --jars temptag-1.0-jar-with-dependencies.jar --class com.temptag.app.TagGenerator hdfs://hadoop00:/data/temptags.txt hdfs://hadoop00:/result/out

    后面的两个路径分别是原始数据的路径,计算结果的路径,顺序不可以改变。
    如果你没有部署Spark集群,但是安装了Saprk,可以执行如下命令:
    spark-submit --master local[2] --jars temptag-1.0-jar-with-dependencies.jar --class com.temptag.app.TagGenerator hdfs://hadoop00:/data/temptags.txt hdfs://hadoop00:/result/out

    (3). 执行结果:

    五、总结

    本次的实战中,我们首先在HDFS上读取原始数据,使用Java脚本解析JSON数据,然后使用Spark算子对数据进行清洗,映射,聚合,排序等工作,最后将计算结果存储在HDFS中。从中我们发现通过使用Spark算子的组合,使得繁琐复杂的数据处理工作变得相当的简洁,这也是Spark的优势之一。我是人间,乐于结交共同学习的朋友,欢迎大家一起交流,共同进步!

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: 
    相关文章推荐