您的位置:首页 > 其它

Spark项目:大型电商日志分析(二)

2018-12-28 10:12 555 查看
版权声明:@GaoShan https://blog.csdn.net/weixin_42969976/article/details/85317731

Spark项目:大型电商日志分析

准备工作

1.程序员介入项目

        从项目经理(或是组长)处获得账户信息,以及仓库的url
        账户:marry/12345
        url:http://marry@janson01/r/web-log-analysis.git
        基于gitblit服务器上搭建好的项目轮廓创建工程
        按部就班进行后续的开发

2.数据的准备
  a)真实场景:

    ①将模拟(例如XX商城后台的日志文件)出的数据存入到文件中(user_visit_action.txt,user_info.txt,product_info)
    ②使用Flume导入到hdfs上
        a)开启Hadoop集群
            在hdfs上创建一个目录web-log-analysis

hdfs dfs -mkdir /web-log-analysis

        b)书写一个配置文件,定制agent的组件(source,channel,sink)

flume-hdfs-visit-action.properties
flume-hdfs-user-info.properties
flume-hdfs-product-info.properties

        c)正式进行数据采集

[root@JANSON01 flume]# bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-hdfs-visit-action.properties
[root@JANSON01 conf]# bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-hdfs-user-info.properties
[root@JANSON01 conf]# bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-hdfs-product-info.properties

        注意:
             ①一般一个agent中有三个组件;但是根据需求,可以配置多个
             ②若是配置多个,需要注意:
                 a1.sources = r1 r2 r3
                 a1.sinks = k1 k2 k3
                 a1.channels = c1 c2 c3
                 如:web-log-all.properties
                 [root@JANSON01 conf]# bin/flume-ng agent --conf conf --name a1 --conf-file conf/web-log-all.properties

    ③创建hive外部表与hdfs上文件的映射关系
     a)方式1:手动创建
            hive> source db-session-hive.hql;
            db-session-hive.hql

DROP database if exists web_log_analysis_hive;
create database if not exists web_log_analysis_hive;
USE web_log_analysis_hive;

CREATE external table IF NOT EXISTS user_visit_action(
`date` string,
user_id	bigint,
session_id	string,
page_id	bigint,
action_time string,
search_keyword string,
click_category_id bigint,
click_product_id bigint,
order_category_ids string,
order_product_ids string,
pay_category_ids string,
pay_product_ids string,
city_id bigint
) row format delimited
fields terminated by "|"
location  "hdfs://ns1/web-log-analysis/user_visit_action";

CREATE external table IF NOT EXISTS user_info(
user_id bigint,
username string,
name string,
age int,
professional string,
sex string,
city string
) row format delimited
fields terminated by "|"
location  "hdfs://ns1/web-log-analysis/user_info";

CREATE external table IF NOT EXISTS product_info(
product_id  bigint,
product_name string,
extend_info string
) row format delimited
fields terminated by "|"
location  "hdfs://ns1/web-log-analysis/product_info";

-- 测试
select * from product_info;

            hql脚本中最好是外部表,可以使用下述指令将内部表修改为外部表
            alter table product_info set tblProperties(“EXTERNAL”=“TRUE”);
            alter table user_info set tblProperties(“EXTERNAL”=“TRUE”);
            alter table user_visit_action set tblProperties(“EXTERNAL”=“TRUE”);

查看hive表的表结构:
DESC extended product_info ;

程序测试:
@Test
def testReadDataFromHiveTable(): Unit = {
//println("好想念Scala...")

//Spark高版本中,核心类:SparkSession
val spark:SparkSession = SparkSession.builder()
.enableHiveSupport().appName(classOf[CommonScalaTest].getSimpleName)
.master("local[*]").getOrCreate()

spark.sql("select * from web_log_analysis_hive.user_info").show(500)
}

注意点:
需要将hive,hdfs核心的配置文件复制到项目的resoruces资源根目录下,包括:
hive ~> hive-site.xml
hdfs ~> hdfs-site.xml, core-site.xml

     b)方式2:通过程序自动创建(真实项目中这种情况使用的较多)
               SparkSQL实例.sql(“create external table xx if not exists xxx”);需要每次drop一下表,从hive源数据库中删除映射关系
    ④针对hive表数据进行后续操作

  b)模拟出运行在内存中的且映射为虚拟表的数据(离线数据)

    ①直接将模拟出的数据装载到临时表中(Spark低版本,DataFrame,Spark高版本,DataSet)
    ②将临时表映射为内存中的虚拟表(DataSet实例,crateOrReplaceTempView(“user_visit_action”))
    ③针对hive表数据进行后续的操作

/**
* 模拟出运行在内存中的且映射为虚拟表的数据测试
*/
@Test
def testMemoryMockData() = {
//Spark高版本中,核心类:SparkSession,封装了SpardConf, SparkContext,SQLContext
val spark: SparkSession = SparkSession.builder()
.appName(classOf[CommonScalaTest].getSimpleName)
.master("local[*]").getOrCreate()

//调用下述的方法,三张虚拟表已经驻留在内存中了,后续的程序直接使用即可
MockData.mock(spark.sparkContext, spark.sqlContext)

//测试:访问user_visit_action虚拟表中的数据
spark.sql("select * from user_visit_action").show(2000000)
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐