您的位置:首页 > 运维架构

基于hadoop生态圈的数据仓库实践 —— 进阶技术(十五)

2016-08-09 18:21 651 查看
十五、维度合并
        随着数据仓库中维度的增加,我们会发现有些通用的数据存在于多个维度中。例如,客户维度的客户地址相关信息、送货地址相关信息和工厂维度里都有邮编、城市和州。本节说明如何把三个维度里的邮编相关信息合并到一个新的维度。

1. 修改数据仓库模式
        为了合并维度,需要改变数据仓库模式。下图显示了修改后的模式。新增了一个zip_code_dim表,sales_order_fact和production_fact表的结构也做了相应的修改。注意图中只显示了与邮编维度相关的表。



        zip_code_dim表与两个事实表相关联。这些关系替换了这两个事实表与客户维度、工厂维度的关系。sales_order_fact表需要两个关系,一个关联到客户地址邮编,另一个关联到送货地址邮编。与production_fact表只有一个关系,所以在这个事实表里只增加了工厂地址邮编代理键。

        下面的脚本用于修改数据仓库模式,所做的修改如下。
创建邮编维度表zip_code_dim。
初始装载邮编相关数据
基于zip_code_dim表创建customer_zip_code_dim和shipping_zip_code_dim视图。
在sales_order_fact表上增加customer_zip_code_sk和shipping_zip_code_sk列。
基于已有的客户邮编和送货邮编初始装载两个邮编代理键
在customer_dim表上删除客户邮编、送货邮编以及它们对应的城市和州列。
在pa_customer_dim上删除客户的城市、州和邮编列。
基于zip_code_dim表创建factory_zip_code_dim视图。
给production_fact表增加factory_zip_code_sk列。
从现有的工厂邮编装载factory_zip_code_sk值。
在factory_dim表上删除工厂邮编及其对应的城市和州列。
-- Work inside the dw (data warehouse) database.
USE dw;

-- Zip code dimension shared by the customer, shipping and factory addresses.
-- One surrogate key per zip code with its city and state, plus the
-- version / effective_date / expiry_date bookkeeping columns.
-- Stored as bucketed ORC with transactional=true, the same layout used by the
-- other dimension and fact tables created in this script.
CREATE TABLE zip_code_dim (
    zip_code_sk     INT,
    zip_code        INT,
    city            VARCHAR(30),
    state           VARCHAR(2),
    version         INT,
    effective_date  DATE,
    expiry_date     DATE
)
CLUSTERED BY (zip_code_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Initial load of the zip code dimension.
-- All seven rows go in through one multi-row INSERT, so the transactional
-- table receives a single write instead of seven separate ones.
-- Every row is version 1 and is valid from 1900-01-01 until 2200-01-01.
INSERT INTO zip_code_dim VALUES
    (1, 17050, 'pittsburgh',     'PA', 1, '1900-01-01', '2200-01-01'),
    (2, 17051, 'mc veytown',     'PA', 1, '1900-01-01', '2200-01-01'),
    (3, 17052, 'mapleton depot', 'PA', 1, '1900-01-01', '2200-01-01'),
    (4, 17053, 'marysville',     'PA', 1, '1900-01-01', '2200-01-01'),
    (5, 17054, 'mattawana',      'PA', 1, '1900-01-01', '2200-01-01'),
    (6, 17055, 'mechanicsburg',  'PA', 1, '1900-01-01', '2200-01-01'),
    (7, 44102, 'cleveland',      'OH', 1, '1900-01-01', '2200-01-01');

-- Role-playing views over zip_code_dim.
-- Each view exposes the same rows under role-specific column names so that a
-- fact table can reference the zip code dimension once per role.

-- Zip code dimension in the customer address role.
CREATE VIEW customer_zip_code_dim AS
SELECT
    zip_code_sk AS customer_zip_code_sk,
    zip_code    AS customer_zip_code,
    city        AS customer_city,
    state       AS customer_state,
    version,
    effective_date,
    expiry_date
FROM zip_code_dim;

-- Zip code dimension in the shipping address role.
CREATE VIEW shipping_zip_code_dim AS
SELECT
    zip_code_sk AS shipping_zip_code_sk,
    zip_code    AS shipping_zip_code,
    city        AS shipping_city,
    state       AS shipping_state,
    version,
    effective_date,
    expiry_date
FROM zip_code_dim;

-- Add the two zip code surrogate keys to the sales order fact table.
-- The table is rebuilt: the current table is renamed, a new table with the two
-- extra columns is created, the existing rows are copied across with NULL in
-- the new columns, and the renamed copy is dropped.
ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old;

CREATE TABLE sales_order_fact (
    order_number              INT            COMMENT 'order number',
    customer_sk               INT            COMMENT 'customer surrogate key',
    customer_zip_code_sk      INT            COMMENT 'customer zip code sk',
    shipping_zip_code_sk      INT            COMMENT 'shipping zip code sk',
    product_sk                INT            COMMENT 'product surrogate key',
    sales_order_attribute_sk  INT            COMMENT 'sales order attribute surrogate key',
    order_date_sk             INT            COMMENT 'order date surrogate key',
    entry_date_sk             INT            COMMENT 'entry date surrogate key',
    allocate_date_sk          INT            COMMENT 'allocate date surrogate key',
    allocate_quantity         INT            COMMENT 'allocate quantity',
    packing_date_sk           INT            COMMENT 'packing date surrogate key',
    packing_quantity          INT            COMMENT 'packing quantity',
    ship_date_sk              INT            COMMENT 'ship date surrogate key',
    ship_quantity             INT            COMMENT 'ship quantity',
    receive_date_sk           INT            COMMENT 'receive date surrogate key',
    receive_quantity          INT            COMMENT 'receive quantity',
    request_delivery_date_sk  INT            COMMENT 'request delivery date surrogate key',
    order_amount              DECIMAL(10,2)  COMMENT 'order amount',
    order_quantity            INT            COMMENT 'order quantity'
)
CLUSTERED BY (order_number) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Copy the existing rows. The select list is positional and must follow the
-- column order of the new table, with NULL for the two new zip code keys.
INSERT INTO sales_order_fact
SELECT
    old_fact.order_number,
    old_fact.customer_sk,
    CAST(NULL AS INT),
    CAST(NULL AS INT),
    old_fact.product_sk,
    old_fact.sales_order_attribute_sk,
    old_fact.order_date_sk,
    old_fact.entry_date_sk,
    old_fact.allocate_date_sk,
    old_fact.allocate_quantity,
    old_fact.packing_date_sk,
    old_fact.packing_quantity,
    old_fact.ship_date_sk,
    old_fact.ship_quantity,
    old_fact.receive_date_sk,
    old_fact.receive_quantity,
    old_fact.request_delivery_date_sk,
    old_fact.order_amount,
    old_fact.order_quantity
FROM sales_order_fact_old old_fact;

DROP TABLE sales_order_fact_old;

-- Initial load of the two zip code surrogate keys on sales_order_fact.
-- For every fact row, find the customer row by customer_sk and translate its
-- customer and shipping zip codes into surrogate keys through the two
-- role-playing views. LEFT JOINs keep fact rows that have no matching customer
-- or zip code, and those rows simply get NULL keys.
-- The lookup goes straight from the fact row to customer_dim instead of through
-- additional self-joins of sales_order_fact on order_number, so a repeated
-- order_number cannot multiply rows and the fact table is read only once.
-- This step must run while customer_dim still carries customer_zip_code and
-- shipping_zip_code, that is, before customer_dim is rebuilt without them.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    f.order_number,
    f.customer_sk,
    cz.customer_zip_code_sk,
    sz.shipping_zip_code_sk,
    f.product_sk,
    f.sales_order_attribute_sk,
    f.order_date_sk,
    f.entry_date_sk,
    f.allocate_date_sk,
    f.allocate_quantity,
    f.packing_date_sk,
    f.packing_quantity,
    f.ship_date_sk,
    f.ship_quantity,
    f.receive_date_sk,
    f.receive_quantity,
    f.request_delivery_date_sk,
    f.order_amount,
    f.order_quantity
FROM sales_order_fact f
LEFT JOIN customer_dim c
    ON f.customer_sk = c.customer_sk
LEFT JOIN customer_zip_code_dim cz
    ON c.customer_zip_code = cz.customer_zip_code
LEFT JOIN shipping_zip_code_dim sz
    ON c.shipping_zip_code = sz.shipping_zip_code;

-- Swap the staged rows into the fact table: remove every order that was staged,
-- then insert the staged version that carries the zip code keys.
DELETE FROM sales_order_fact
WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT
    order_number,
    customer_sk,
    customer_zip_code_sk,
    shipping_zip_code_sk,
    product_sk,
    sales_order_attribute_sk,
    order_date_sk,
    entry_date_sk,
    allocate_date_sk,
    allocate_quantity,
    packing_date_sk,
    packing_quantity,
    ship_date_sk,
    ship_quantity,
    receive_date_sk,
    receive_quantity,
    request_delivery_date_sk,
    order_amount,
    order_quantity
FROM tmp;

-- Remove the customer and shipping zip code, city and state columns from
-- customer_dim by rebuilding the table with only the remaining columns.
ALTER TABLE customer_dim RENAME TO customer_dim_old;

CREATE TABLE customer_dim (
    customer_sk              INT          COMMENT 'surrogate key',
    customer_number          INT          COMMENT 'number',
    customer_name            VARCHAR(50)  COMMENT 'name',
    customer_street_address  VARCHAR(50)  COMMENT 'address',
    shipping_address         VARCHAR(50)  COMMENT 'shipping_address',
    version                  INT          COMMENT 'version',
    effective_date           DATE         COMMENT 'effective date',
    expiry_date              DATE         COMMENT 'expiry date'
)
CLUSTERED BY (customer_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Carry over the columns that survive the change.
INSERT INTO customer_dim
SELECT
    old_dim.customer_sk,
    old_dim.customer_number,
    old_dim.customer_name,
    old_dim.customer_street_address,
    old_dim.shipping_address,
    old_dim.version,
    old_dim.effective_date,
    old_dim.expiry_date
FROM customer_dim_old old_dim;

DROP TABLE customer_dim_old;

-- Rebuild pa_customer_dim with the same reduced column set as customer_dim,
-- which drops its city, state and zip code columns.
ALTER TABLE pa_customer_dim RENAME TO pa_customer_dim_old;

CREATE TABLE pa_customer_dim (
    customer_sk              INT,
    customer_number          INT,
    customer_name            VARCHAR(50),
    customer_street_address  VARCHAR(50),
    shipping_address         VARCHAR(50),
    version                  INT,
    effective_date           DATE,
    expiry_date              DATE
)
CLUSTERED BY (customer_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Carry over the columns that survive the change.
INSERT INTO pa_customer_dim
SELECT
    old_dim.customer_sk,
    old_dim.customer_number,
    old_dim.customer_name,
    old_dim.customer_street_address,
    old_dim.shipping_address,
    old_dim.version,
    old_dim.effective_date,
    old_dim.expiry_date
FROM pa_customer_dim_old old_dim;

DROP TABLE pa_customer_dim_old;

-- Role-playing view: zip_code_dim in the factory address role.
CREATE VIEW factory_zip_code_dim AS
SELECT
    zip_code_sk AS factory_zip_code_sk,
    zip_code    AS factory_zip_code,
    city        AS factory_city,
    state       AS factory_state,
    version,
    effective_date,
    expiry_date
FROM zip_code_dim;

-- Add factory_zip_code_sk to production_fact by rebuilding the table.
-- Unlike the other tables in this script, the new table is created without a
-- bucketing, storage format or transactional clause, as in the original script.
ALTER TABLE production_fact RENAME TO production_fact_old;

CREATE TABLE production_fact (
    product_sk           INT,
    production_date_sk   INT,
    factory_sk           INT,
    factory_zip_code_sk  INT,
    production_quantity  INT
);

-- Initial load of the factory zip code surrogate key.
-- The key is resolved from the zip code still stored on factory_dim, so this
-- must run before factory_dim is rebuilt without that column.
-- LEFT JOINs are used so that a production row whose factory or zip code has no
-- match is kept with a NULL key rather than being silently lost when
-- production_fact_old is dropped below.
INSERT INTO production_fact
SELECT
    p.product_sk,
    p.production_date_sk,
    p.factory_sk,
    z.factory_zip_code_sk,
    p.production_quantity
FROM production_fact_old p
LEFT JOIN factory_dim f
    ON p.factory_sk = f.factory_sk
LEFT JOIN factory_zip_code_dim z
    ON f.factory_zip_code = z.factory_zip_code;

DROP TABLE production_fact_old;

-- Remove the factory zip code and its city and state columns from factory_dim
-- by rebuilding the table. factory_code itself is kept.
ALTER TABLE factory_dim RENAME TO factory_dim_old;

CREATE TABLE factory_dim (
    factory_sk              INT,
    factory_code            INT,
    factory_name            VARCHAR(30),
    factory_street_address  VARCHAR(50),
    version                 INT,
    effective_date          DATE,
    expiry_date             DATE
)
CLUSTERED BY (factory_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Carry over the columns that survive the change.
INSERT INTO factory_dim
SELECT
    old_dim.factory_sk,
    old_dim.factory_code,
    old_dim.factory_name,
    old_dim.factory_street_address,
    old_dim.version,
    old_dim.effective_date,
    old_dim.expiry_date
FROM factory_dim_old old_dim;

DROP TABLE factory_dim_old;
        执行完修改数据仓库模式的脚本后,可以查询customer_zip_code_dim、shipping_zip_code_dim、factory_zip_code_dim维度视图和sales_order_fact、production_fact事实表,确认邮编已经被成功分离。

2. 修改定期装载脚本
        定期装载有三个地方的修改:
删除客户维度装载里所有邮编信息相关的列,因为客户维度里不再有客户邮编和送货邮编相关信息。
在事实表中引用客户邮编视图和送货邮编视图中的代理键。
修改pa_customer_dim装载,因为需要从销售订单事实表的customer_zip_code_sk获取客户邮编。
        修改后的regular_etl.sql脚本如下所示。
-- Set up the environment and the time window.
-- NOTE(review): set_time.sql is not shown here. It presumably defines the
-- pre_date and max_date hive variables used below. Confirm.
!run /root/set_time.sql

-- Load the customer dimension.
-- SCD2, step 1: expire the current version (expiry_date equal to max_date) of
-- every customer that has disappeared from rds.customer or whose street address
-- or shipping address differs from the source. The null-safe operator <=> is
-- used so that a change to or from NULL also counts as a difference.
UPDATE customer_dim
SET expiry_date = ${hivevar:pre_date}
WHERE customer_dim.customer_sk IN (
    SELECT cur.customer_sk
    FROM (
        SELECT
            customer_sk,
            customer_number,
            customer_street_address,
            shipping_address
        FROM customer_dim
        WHERE expiry_date = ${hivevar:max_date}
    ) cur
    LEFT JOIN rds.customer src
        ON cur.customer_number = src.customer_number
    WHERE src.customer_number IS NULL
       OR NOT (cur.customer_street_address <=> src.customer_street_address)
       OR NOT (cur.shipping_address <=> src.shipping_address)
);

-- SCD2, step 2: insert the new version for customers whose street address or
-- shipping address changed.
-- A customer qualifies when it has a version that was expired at pre_date, the
-- source row differs from that version in one of the two address columns, and
-- no version with expiry_date equal to max_date exists yet. The last condition
-- stops a second run from inserting the same version again.
-- New surrogate keys continue from the current maximum customer_sk.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY chg.customer_number) + mx.sk_max,
    chg.customer_number,
    chg.customer_name,
    chg.customer_street_address,
    chg.shipping_address,
    chg.version,
    chg.effective_date,
    chg.expiry_date
FROM (
    SELECT
        src.customer_number         AS customer_number,
        src.customer_name           AS customer_name,
        src.customer_street_address AS customer_street_address,
        src.shipping_address        AS shipping_address,
        exp_row.version + 1         AS version,
        ${hivevar:pre_date}         AS effective_date,
        ${hivevar:max_date}         AS expiry_date
    FROM customer_dim exp_row
    INNER JOIN rds.customer src
        ON exp_row.customer_number = src.customer_number
       AND exp_row.expiry_date = ${hivevar:pre_date}
    LEFT JOIN customer_dim act
        ON exp_row.customer_number = act.customer_number
       AND act.expiry_date = ${hivevar:max_date}
    WHERE (
              NOT (exp_row.customer_street_address <=> src.customer_street_address)
           OR NOT (exp_row.shipping_address <=> src.shipping_address)
          )
      AND act.customer_sk IS NULL
) chg
CROSS JOIN (
    SELECT COALESCE(MAX(customer_sk), 0) AS sk_max
    FROM customer_dim
) mx;

-- SCD1 on customer_name.
-- The SET clause of a Hive UPDATE does not support subqueries, so the rows to
-- change are staged in tmp and the update is done as DELETE followed by INSERT.
-- SCD1 keeps no history, so every version of a customer whose name changed is
-- rewritten, not only the current version.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    dim.customer_sk,
    dim.customer_number,
    src.customer_name,
    dim.customer_street_address,
    dim.shipping_address,
    dim.version,
    dim.effective_date,
    dim.expiry_date
FROM customer_dim dim
INNER JOIN rds.customer src
    ON dim.customer_number = src.customer_number
WHERE NOT (dim.customer_name <=> src.customer_name);

DELETE FROM customer_dim
WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);

INSERT INTO customer_dim
SELECT * FROM tmp;

-- Insert customers that exist in rds.customer but have no row at all in
-- customer_dim. They start at version 1, effective from pre_date and open ended
-- until max_date. Surrogate keys continue from the current maximum customer_sk.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY fresh.customer_number) + mx.sk_max,
    fresh.customer_number,
    fresh.customer_name,
    fresh.customer_street_address,
    fresh.shipping_address,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT src.*
    FROM rds.customer src
    LEFT JOIN customer_dim dim
        ON src.customer_number = dim.customer_number
    WHERE dim.customer_sk IS NULL
) fresh
CROSS JOIN (
    SELECT COALESCE(MAX(customer_sk), 0) AS sk_max
    FROM customer_dim
) mx;

-- Load the product dimension.
-- SCD2, step 1: expire the current version (expiry_date equal to max_date) of
-- every product that has disappeared from rds.product or whose product_name or
-- product_category differs from the source.
-- The comparison uses the null-safe operator <=>, as the customer load does.
-- With a plain <> a change from or to NULL evaluates to NULL and the changed
-- product would never be expired.
UPDATE product_dim
SET expiry_date = ${hivevar:pre_date}
WHERE product_dim.product_sk IN (
    SELECT cur.product_sk
    FROM (
        SELECT
            product_sk,
            product_code,
            product_name,
            product_category
        FROM product_dim
        WHERE expiry_date = ${hivevar:max_date}
    ) cur
    LEFT JOIN rds.product src
        ON cur.product_code = src.product_code
    WHERE src.product_code IS NULL
       OR NOT (cur.product_name <=> src.product_name)
       OR NOT (cur.product_category <=> src.product_category)
);

-- SCD2, step 2: insert the new version for products whose name or category
-- changed. The change test is the same null-safe test as in step 1, so every
-- product expired because of a change also receives its new version.
-- A product qualifies when it has a version expired at pre_date that differs
-- from the source row and no version with expiry_date equal to max_date exists.
-- New surrogate keys continue from the current maximum product_sk.
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY chg.product_code) + mx.sk_max,
    chg.product_code,
    chg.product_name,
    chg.product_category,
    chg.version,
    chg.effective_date,
    chg.expiry_date
FROM (
    SELECT
        src.product_code     AS product_code,
        src.product_name     AS product_name,
        src.product_category AS product_category,
        exp_row.version + 1  AS version,
        ${hivevar:pre_date}  AS effective_date,
        ${hivevar:max_date}  AS expiry_date
    FROM product_dim exp_row
    INNER JOIN rds.product src
        ON exp_row.product_code = src.product_code
       AND exp_row.expiry_date = ${hivevar:pre_date}
    LEFT JOIN product_dim act
        ON exp_row.product_code = act.product_code
       AND act.expiry_date = ${hivevar:max_date}
    WHERE (
              NOT (exp_row.product_name <=> src.product_name)
           OR NOT (exp_row.product_category <=> src.product_category)
          )
      AND act.product_sk IS NULL
) chg
CROSS JOIN (
    SELECT COALESCE(MAX(product_sk), 0) AS sk_max
    FROM product_dim
) mx;

-- Insert products that exist in rds.product but have no row at all in
-- product_dim. They start at version 1, effective from pre_date and open ended
-- until max_date. Surrogate keys continue from the current maximum product_sk.
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY fresh.product_code) + mx.sk_max,
    fresh.product_code,
    fresh.product_name,
    fresh.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT src.*
    FROM rds.product src
    LEFT JOIN product_dim dim
        ON src.product_code = dim.product_code
    WHERE dim.product_sk IS NULL
) fresh
CROSS JOIN (
    SELECT COALESCE(MAX(product_sk), 0) AS sk_max
    FROM product_dim
) mx;

-- Reload product_count_fact from scratch.
-- Each product version is matched to the date_dim row of its effective_date.
-- Per product_code only the first row in date_sk order is kept, which gives one
-- row per product code.
INSERT OVERWRITE TABLE product_count_fact
SELECT
    product_sk,
    date_sk
FROM (
    SELECT
        p.product_sk   AS product_sk,
        p.product_code AS product_code,
        d.date_sk      AS date_sk,
        ROW_NUMBER() OVER (PARTITION BY p.product_code ORDER BY d.date_sk) AS rn
    FROM product_dim p
    INNER JOIN date_dim d
        ON p.effective_date = d.date
) ranked
WHERE rn = 1;

-- Load the sales order fact table with the orders entered in the load window
-- that have status N.
-- The customer and product keys come from the dimension version that was valid
-- at status_date. The two zip code keys are looked up through the role-playing
-- views using the zip codes currently stored in rds.customer.
-- Every validity test is half open: effective_date is inclusive and expiry_date
-- is exclusive. An expired version ends on the same date its successor starts,
-- so an inclusive upper bound would match two versions on that date and
-- duplicate the fact row. The zip code views use the same rule as the customer
-- and product dimensions.
-- The eight NULL values are the allocate, packing, ship and receive date keys
-- and quantities, which later steps of the load fill in.
-- Range conditions stay in the WHERE clause because older Hive versions accept
-- only equality conditions in a join ON clause.
INSERT INTO sales_order_fact
SELECT
    a.order_number,
    c.customer_sk,
    i.customer_zip_code_sk,
    j.shipping_zip_code_sk,
    d.product_sk,
    g.sales_order_attribute_sk,
    e.order_date_sk,
    h.entry_date_sk,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    f.request_delivery_date_sk,
    a.order_amount,
    a.quantity
FROM
    rds.sales_order a,
    customer_dim c,
    product_dim d,
    order_date_dim e,
    request_delivery_date_dim f,
    sales_order_attribute_dim g,
    entry_date_dim h,
    customer_zip_code_dim i,
    shipping_zip_code_dim j,
    rds.customer k,
    rds.cdc_time l
WHERE
    a.order_status = 'N'
    AND a.customer_number = c.customer_number
    AND a.status_date >= c.effective_date
    AND a.status_date < c.expiry_date
    AND a.customer_number = k.customer_number
    AND k.customer_zip_code = i.customer_zip_code
    AND a.status_date >= i.effective_date
    AND a.status_date < i.expiry_date
    AND k.shipping_zip_code = j.shipping_zip_code
    AND a.status_date >= j.effective_date
    AND a.status_date < j.expiry_date
    AND a.product_code = d.product_code
    AND a.status_date >= d.effective_date
    AND a.status_date < d.expiry_date
    AND to_date(a.status_date) = e.order_date
    AND to_date(a.entry_date) = h.entry_date
    AND to_date(a.request_delivery_date) = f.request_delivery_date
    AND a.verification_ind = g.verification_ind
    AND a.credit_check_flag = g.credit_check_flag
    AND a.new_customer_ind = g.new_customer_ind
    AND a.web_order_flag = g.web_order_flag
    AND a.entry_date >= l.last_load
    AND a.entry_date < l.current_load;

-- Reload the PA customer dimension from scratch.
-- customer_dim no longer stores the state, so a customer counts as a PA
-- customer when at least one of its sales orders carries a customer zip code
-- key whose state is PA. DISTINCT collapses the one row per order that the
-- join produces into one row per customer version.
TRUNCATE TABLE pa_customer_dim;

INSERT INTO pa_customer_dim
SELECT DISTINCT cust.*
FROM customer_dim cust
INNER JOIN sales_order_fact fact
    ON cust.customer_sk = fact.customer_sk
INNER JOIN customer_zip_code_dim zip
    ON fact.customer_zip_code_sk = zip.customer_zip_code_sk
WHERE zip.customer_state = 'PA';

-- Process the four follow-up order states: allocate (A), packing (P),
-- ship (S) and receive (R).
-- Each state is handled the same way. The fact rows of the orders that reached
-- the state inside the load window are staged in tmp with the date key and
-- quantity of that state filled in and every other column copied unchanged.
-- The staged rows then replace the existing fact rows through DELETE followed
-- by INSERT. The staged column order must match sales_order_fact because the
-- final INSERT is positional.

-- State A: fill allocate_date_sk and allocate_quantity.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    f.order_number             AS order_number,
    f.customer_sk              AS customer_sk,
    f.customer_zip_code_sk     AS customer_zip_code_sk,
    f.shipping_zip_code_sk     AS shipping_zip_code_sk,
    f.product_sk               AS product_sk,
    f.sales_order_attribute_sk AS sales_order_attribute_sk,
    f.order_date_sk            AS order_date_sk,
    f.entry_date_sk            AS entry_date_sk,
    d.allocate_date_sk         AS allocate_date_sk,
    s.quantity                 AS allocate_quantity,
    f.packing_date_sk          AS packing_date_sk,
    f.packing_quantity         AS packing_quantity,
    f.ship_date_sk             AS ship_date_sk,
    f.ship_quantity            AS ship_quantity,
    f.receive_date_sk          AS receive_date_sk,
    f.receive_quantity         AS receive_quantity,
    f.request_delivery_date_sk AS request_delivery_date_sk,
    f.order_amount             AS order_amount,
    f.order_quantity           AS order_quantity
FROM sales_order_fact f
INNER JOIN rds.sales_order s
    ON f.order_number = s.order_number
INNER JOIN allocate_date_dim d
    ON to_date(s.status_date) = d.allocate_date
CROSS JOIN rds.cdc_time w
WHERE s.order_status = 'A'
  AND s.entry_date >= w.last_load
  AND s.entry_date < w.current_load;

DELETE FROM sales_order_fact
WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;

-- State P: fill packing_date_sk and packing_quantity.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    f.order_number             AS order_number,
    f.customer_sk              AS customer_sk,
    f.customer_zip_code_sk     AS customer_zip_code_sk,
    f.shipping_zip_code_sk     AS shipping_zip_code_sk,
    f.product_sk               AS product_sk,
    f.sales_order_attribute_sk AS sales_order_attribute_sk,
    f.order_date_sk            AS order_date_sk,
    f.entry_date_sk            AS entry_date_sk,
    f.allocate_date_sk         AS allocate_date_sk,
    f.allocate_quantity        AS allocate_quantity,
    d.packing_date_sk          AS packing_date_sk,
    s.quantity                 AS packing_quantity,
    f.ship_date_sk             AS ship_date_sk,
    f.ship_quantity            AS ship_quantity,
    f.receive_date_sk          AS receive_date_sk,
    f.receive_quantity         AS receive_quantity,
    f.request_delivery_date_sk AS request_delivery_date_sk,
    f.order_amount             AS order_amount,
    f.order_quantity           AS order_quantity
FROM sales_order_fact f
INNER JOIN rds.sales_order s
    ON f.order_number = s.order_number
INNER JOIN packing_date_dim d
    ON to_date(s.status_date) = d.packing_date
CROSS JOIN rds.cdc_time w
WHERE s.order_status = 'P'
  AND s.entry_date >= w.last_load
  AND s.entry_date < w.current_load;

DELETE FROM sales_order_fact
WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;

-- State S: fill ship_date_sk and ship_quantity.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    f.order_number             AS order_number,
    f.customer_sk              AS customer_sk,
    f.customer_zip_code_sk     AS customer_zip_code_sk,
    f.shipping_zip_code_sk     AS shipping_zip_code_sk,
    f.product_sk               AS product_sk,
    f.sales_order_attribute_sk AS sales_order_attribute_sk,
    f.order_date_sk            AS order_date_sk,
    f.entry_date_sk            AS entry_date_sk,
    f.allocate_date_sk         AS allocate_date_sk,
    f.allocate_quantity        AS allocate_quantity,
    f.packing_date_sk          AS packing_date_sk,
    f.packing_quantity         AS packing_quantity,
    d.ship_date_sk             AS ship_date_sk,
    s.quantity                 AS ship_quantity,
    f.receive_date_sk          AS receive_date_sk,
    f.receive_quantity         AS receive_quantity,
    f.request_delivery_date_sk AS request_delivery_date_sk,
    f.order_amount             AS order_amount,
    f.order_quantity           AS order_quantity
FROM sales_order_fact f
INNER JOIN rds.sales_order s
    ON f.order_number = s.order_number
INNER JOIN ship_date_dim d
    ON to_date(s.status_date) = d.ship_date
CROSS JOIN rds.cdc_time w
WHERE s.order_status = 'S'
  AND s.entry_date >= w.last_load
  AND s.entry_date < w.current_load;

DELETE FROM sales_order_fact
WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;

-- State R: fill receive_date_sk and receive_quantity.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    f.order_number             AS order_number,
    f.customer_sk              AS customer_sk,
    f.customer_zip_code_sk     AS customer_zip_code_sk,
    f.shipping_zip_code_sk     AS shipping_zip_code_sk,
    f.product_sk               AS product_sk,
    f.sales_order_attribute_sk AS sales_order_attribute_sk,
    f.order_date_sk            AS order_date_sk,
    f.entry_date_sk            AS entry_date_sk,
    f.allocate_date_sk         AS allocate_date_sk,
    f.allocate_quantity        AS allocate_quantity,
    f.packing_date_sk          AS packing_date_sk,
    f.packing_quantity         AS packing_quantity,
    f.ship_date_sk             AS ship_date_sk,
    f.ship_quantity            AS ship_quantity,
    d.receive_date_sk          AS receive_date_sk,
    s.quantity                 AS receive_quantity,
    f.request_delivery_date_sk AS request_delivery_date_sk,
    f.order_amount             AS order_amount,
    f.order_quantity           AS order_quantity
FROM sales_order_fact f
INNER JOIN rds.sales_order s
    ON f.order_number = s.order_number
INNER JOIN receive_date_dim d
    ON to_date(s.status_date) = d.receive_date
CROSS JOIN rds.cdc_time w
WHERE s.order_status = 'R'
  AND s.entry_date >= w.last_load
  AND s.entry_date < w.current_load;

DELETE FROM sales_order_fact
WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp);

INSERT INTO sales_order_fact
SELECT * FROM tmp;

-- Advance the load window: rewrite rds.cdc_time so that both of its columns
-- hold the current_load value, which makes last_load catch up with it.
-- NOTE(review): assumes the column order is last_load, current_load. Confirm.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    current_load,
    current_load
FROM rds.cdc_time;

3. 测试修改后的定期装载
        执行修改后的定期装载脚本前,需要做一些准备工作。首先对源数据的客户信息做以下两处修改:
客户编号4的客户和送货邮编从17050改为17055
新增一个编号15的客户
        使用下面的语句进行修改:
-- Source system (MySQL) test data.
-- Customer 4 moves: street address, zip code and city change for both the
-- customer address and the shipping address.
UPDATE source.customer
SET
    customer_street_address = '9999 Louise Dr.',
    customer_zip_code       = 17055,
    customer_city           = 'Pittsburgh',
    shipping_address        = '9999 Louise Dr.',
    shipping_zip_code       = 17055,
    shipping_city           = 'Pittsburgh'
WHERE customer_number = 4;

-- Customer 15 is added as a new customer.
INSERT INTO source.customer
VALUES (
    15,
    'Super Stores',
    '1000 Woodland St.',
    17055,
    'Pittsburgh',
    'PA',
    '1000 Woodland St.',
    17055,
    'Pittsburgh',
    'PA'
);

COMMIT;
        现在在装载新的客户数据前查询最后的客户和送货邮编。后面可以用改变后的信息和此查询的输出作对比。查询语句如下。
-- Baseline before the load: list the customer and shipping zip codes currently
-- attached to each sales order, for comparison with the result after the load.
USE dw;

SELECT
    fact.order_date_sk     AS odsk,
    cust.customer_number   AS cn,
    cz.customer_zip_code   AS czc,
    sz.shipping_zip_code   AS szc
FROM sales_order_fact fact
INNER JOIN customer_zip_code_dim cz
    ON cz.customer_zip_code_sk = fact.customer_zip_code_sk
INNER JOIN shipping_zip_code_dim sz
    ON sz.shipping_zip_code_sk = fact.shipping_zip_code_sk
INNER JOIN customer_dim cust
    ON cust.customer_sk = fact.customer_sk;
        然后使用下面的语句新增两条销售订单。
-- Source system (MySQL) test data: two new sales orders dated 2016-08-08.
-- The amount is a random whole number from 1000 to 9999 and the quantity a
-- random whole number from 10 to 99.
-- NOTE(review): the leading NULL is presumably an auto-increment id and the
-- fourth value the product code. Confirm against the source table definition.

-- Order 144 for customer 4, timestamped at a random moment in the first half
-- of the day.
SET @order_date = FROM_UNIXTIME(
    UNIX_TIMESTAMP('2016-08-08 00:00:01')
    + RAND() * (UNIX_TIMESTAMP('2016-08-08 12:00:00') - UNIX_TIMESTAMP('2016-08-08 00:00:01'))
);
SET @amount = FLOOR(1000 + RAND() * 9000);
SET @quantity = FLOOR(10 + RAND() * 90);

INSERT INTO source.sales_order
VALUES (
    NULL, 144, 4, 3,
    'Y', 'Y', 'Y', 'N',
    @order_date, 'N', '2016-08-10',
    @order_date, @amount, @quantity
);

-- Order 145 for customer 15, timestamped at a random moment in the second half
-- of the day.
SET @order_date = FROM_UNIXTIME(
    UNIX_TIMESTAMP('2016-08-08 12:00:00')
    + RAND() * (UNIX_TIMESTAMP('2016-08-09 00:00:00') - UNIX_TIMESTAMP('2016-08-08 12:00:00'))
);
SET @amount = FLOOR(1000 + RAND() * 9000);
SET @quantity = FLOOR(10 + RAND() * 90);

INSERT INTO source.sales_order
VALUES (
    NULL, 145, 15, 4,
    'Y', 'N', 'Y', 'N',
    @order_date, 'N', '2016-08-10',
    @order_date, @amount, @quantity
);

COMMIT;
        使用下面的SQL命令修改时间窗口。
-- Reset the load window: both columns of rds.cdc_time are set to 2016-08-08.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    '2016-08-08',
    '2016-08-08'
FROM rds.cdc_time;
        执行下面的命令定期装载。
# Run the regular load. The wrapper script is not shown here and presumably
# executes regular_etl.sql (to be confirmed).
./regular_etl.sh
        查询customer_dim表,确认两个改变的客户,即编号4和15的客户,已经正确装载。
-- Show every version of customers 4 and 15 in the customer dimension.
SELECT
    customer_sk             AS csk,
    customer_number         AS cnum,
    customer_name           AS cnam,
    customer_street_address AS csd,
    shipping_address        AS sd,
    version,
    effective_date,
    expiry_date
FROM dw.customer_dim
WHERE customer_number IN (4, 15);
        查询结果如下图所示。



        查询sales_order_fact表里的两条新销售订单,确认邮编已经正确装载。
-- Show orders 144 and 145 with their keys resolved back to business values:
-- customer number, both zip codes, product code and the order and entry dates.
USE dw;

SELECT
    fact.order_number      AS onum,
    cust.customer_number   AS cnum,
    cz.customer_zip_code   AS czc,
    sz.shipping_zip_code   AS szc,
    prod.product_code      AS pc,
    od.order_date          AS od,
    ed.entry_date          AS ed,
    fact.order_amount,
    fact.order_quantity
FROM sales_order_fact fact
INNER JOIN customer_dim cust
    ON fact.customer_sk = cust.customer_sk
INNER JOIN product_dim prod
    ON fact.product_sk = prod.product_sk
INNER JOIN customer_zip_code_dim cz
    ON fact.customer_zip_code_sk = cz.customer_zip_code_sk
INNER JOIN shipping_zip_code_dim sz
    ON fact.shipping_zip_code_sk = sz.shipping_zip_code_sk
INNER JOIN order_date_dim od
    ON fact.order_date_sk = od.order_date_sk
INNER JOIN entry_date_dim ed
    ON fact.entry_date_sk = ed.entry_date_sk
WHERE fact.order_number IN (144, 145);
        查询结果如下图所示。



        查询pa_customer_dim表,确认PA客户正确装载。
-- List the full content of the PA customer dimension.
SELECT
    customer_sk             AS csk,
    customer_number         AS cnum,
    customer_name           AS cnam,
    customer_street_address AS csa,
    shipping_address        AS sad,
    version,
    effective_date,
    expiry_date
FROM dw.pa_customer_dim;
        查询结果如下图所示。



4. 修改产品定期装载
        类似于对定期数据仓库装载的修改,需要删除工厂维度导入里所有与邮编相关的列,并在产品事实表导入时使用工厂邮编代理键。修改后的regular_etl_daily_production.sql脚本如下所示。
-- Set up the environment and the time window.
-- NOTE(review): set_time.sql is not shown here. It presumably defines the
-- pre_date and max_date hive variables used by this script. Confirm.
!run /root/set_time.sql

-- Factory data rarely changes and its history is not needed, so SCD1 is used.
-- Rows whose name or street address differs from rds.factory_master (null-safe
-- comparison) are staged in tmp with the source values and then replace the
-- existing rows through DELETE followed by INSERT.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    dim.factory_sk,
    dim.factory_code,
    src.factory_name,
    src.factory_street_address,
    dim.version,
    dim.effective_date,
    dim.expiry_date
FROM factory_dim dim
INNER JOIN rds.factory_master src
    ON dim.factory_code = src.factory_code
WHERE NOT (dim.factory_name <=> src.factory_name)
   OR NOT (dim.factory_street_address <=> src.factory_street_address);

DELETE FROM factory_dim
WHERE factory_dim.factory_sk IN (SELECT factory_sk FROM tmp);

INSERT INTO factory_dim
SELECT * FROM tmp;

-- Insert factories that exist in rds.factory_master but have no row in
-- factory_dim. They start at version 1, effective from pre_date and open ended
-- until max_date. Surrogate keys continue from the current maximum factory_sk.
INSERT INTO factory_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY fresh.factory_code) + mx.sk_max,
    fresh.factory_code,
    fresh.factory_name,
    fresh.factory_street_address,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT src.*
    FROM rds.factory_master src
    LEFT JOIN factory_dim dim
        ON src.factory_code = dim.factory_code
    WHERE dim.factory_sk IS NULL
) fresh
CROSS JOIN (
    SELECT COALESCE(MAX(factory_sk), 0) AS sk_max
    FROM factory_dim
) mx;

-- Load the daily production fact table with the production rows of pre_date.
-- The product key comes from the product version valid on the production date
-- and the factory zip code key is looked up through factory_zip_code_dim using
-- the zip code stored in rds.factory_master.
-- Both validity tests are half open: effective_date is inclusive and
-- expiry_date is exclusive. The product load expires the old version and starts
-- the new version on the same pre_date, so an inclusive upper bound would match
-- both versions on a day a product changes and write the fact row twice.
-- Range conditions stay in the WHERE clause because older Hive versions accept
-- only equality conditions in a join ON clause.
INSERT INTO production_fact
SELECT
    b.product_sk,
    c.date_sk,
    d.factory_sk,
    e.factory_zip_code_sk,
    a.production_quantity
FROM
    rds.daily_production a,
    product_dim b,
    date_dim c,
    factory_dim d,
    factory_zip_code_dim e,
    rds.factory_master f
WHERE
    a.production_date = ${hivevar:pre_date}
    AND a.product_code = b.product_code
    AND a.production_date >= b.effective_date
    AND a.production_date < b.expiry_date
    AND a.factory_code = f.factory_code
    AND f.factory_zip_code = e.factory_zip_code
    AND a.production_date >= e.effective_date
    AND a.production_date < e.expiry_date
    AND a.production_date = c.date
    AND a.factory_code = d.factory_code;
5. 测试修改后的产品定期装载
        添加一个新的工厂信息。
-- Source system (MySQL) test data: register factory 5.
INSERT INTO source.factory_master
VALUES (
    5,
    'Fifth Factory',
    '90909 McNicholds Blvd.',
    17055,
    'Pittsburgh',
    'PA'
);

COMMIT;
        向daily_production表里添加三个日常产品记录。
-- Source system (MySQL) test data: three production rows for 2016-08-08.
-- NOTE(review): the value order is presumably product code, production date,
-- factory code, production quantity. Confirm against the table definition.
INSERT INTO source.daily_production
VALUES
    (1, '2016-08-08', 3, 400),
    (3, '2016-08-08', 4, 200),
    (5, '2016-08-08', 5, 100);

COMMIT;
        修改时间窗口。
-- Reset the load window: both columns of rds.cdc_time are set to 2016-08-08.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    '2016-08-08',
    '2016-08-08'
FROM rds.cdc_time;
        执行产品定期装载。
# Run the daily production load. The wrapper script is not shown here and
# presumably executes regular_etl_daily_production.sql (to be confirmed).
./regular_etl_daily_production.sh
        查询factory_dim,确认导入是正确的。
-- List the full content of the factory dimension.
SELECT
    fd.factory_sk,
    fd.factory_code,
    fd.factory_name,
    fd.factory_street_address,
    fd.version,
    fd.effective_date,
    fd.expiry_date
FROM dw.factory_dim fd;
        查询结果如下图所示。



        查询production_fact表确认三个新的日常产品被正确装载。
-- Show the production facts with their keys resolved back to business values:
-- product code, production date, factory code and factory zip code.
USE dw;

SELECT
    prod.product_code        AS pc,
    b.date,
    fac.factory_code         AS fc,
    fz.factory_zip_code      AS fzc,
    fact.production_quantity AS qty
FROM production_fact fact
INNER JOIN product_dim prod
    ON fact.product_sk = prod.product_sk
INNER JOIN date_dim b
    ON fact.production_date_sk = b.date_sk
INNER JOIN factory_dim fac
    ON fact.factory_sk = fac.factory_sk
INNER JOIN factory_zip_code_dim fz
    ON fact.factory_zip_code_sk = fz.factory_zip_code_sk;
        查询结果如下图所示。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: