您的位置:首页 > 其它

HAWQ取代传统数仓实践(六)——增加列

2017-05-23 17:18 549 查看
        业务的扩展或变化是不可避免的,尤其像互联网行业,需求变更已经成为常态,唯一不变的就是变化本身,其中最常碰到的扩展是给一个已经存在的表增加列。
        以销售订单为例,假设因为业务需要,在操作型源系统的客户表中增加了送货地址的四个字段,并在销售订单表中增加了销售数量字段。由于数据源表增加了字段,数据仓库中的表也要随之修改。本篇说明如何在客户维度表和销售订单事实表上添加列,并在新列上应用SCD2,以及对定时装载脚本所做的修改。图1显示了增加列后的数据仓库模式。



图1

一、修改数据库表结构

1. 修改源数据库表结构
        使用下面的SQL语句修改MySQL中的源数据库模式。
use source;

-- customer: add the four shipping-address columns. The first is placed
-- after customer_state and each following one after the column before it.
alter table customer
    add column shipping_address  varchar(30) after customer_state,
    add column shipping_zip_code int         after shipping_address,
    add column shipping_city     varchar(30) after shipping_zip_code,
    add column shipping_state    varchar(2)  after shipping_city;

-- sales_order: add the order quantity directly after order_amount.
alter table sales_order
    add column order_quantity int after order_amount;
        以上语句给客户表增加了四列,表示客户的送货地址。销售订单表在销售金额列后面增加了销售数量列。注意after关键字,这是MySQL对标准SQL的扩展,HAWQ目前还不支持这种扩展,只能把新增列加到已有列的后面。在关系理论中,列是没有顺序的。

2. 修改ext模式中的表结构

        HAWQ外部表目前不支持ALTER TABLE语句,报错如下:
dw=> alter table ext.customer add column shipping_address varchar(30);
ERROR:  "customer" is an external table
HINT:  Use ALTER EXTERNAL TABLE instead
dw=> alter external table ext.customer add column shipping_address varchar(30);
ERROR:  Cannot support alter external table statement yet
        因此要增加列只能重建HAWQ外部表。我在数据抽取时都是覆盖外部表,其中的数据只是临时性的,重建表不涉及数据问题,并不会造成很大影响。
-- Rebuild the ext-schema external tables with the new columns.
-- External tables cannot be altered here, so each one is dropped and recreated.

-- Resolve unqualified names in the ext schema
set search_path to ext;

-- Drop the customer external table ("if exists" keeps the script re-runnable)
drop external table if exists customer;

-- Create the customer external table.
-- Column order must match the order of the fields in the extracted files.
create external table customer
(
    customer_number         int,
    customer_name           varchar(30),
    customer_street_address varchar(30),
    customer_zip_code       int,
    customer_city           varchar(30),
    customer_state          varchar(2),
    shipping_address        varchar(30),
    shipping_zip_code       int,
    shipping_city           varchar(30),
    shipping_state          varchar(2)
)
location ('pxf://mycluster/data/ext/customer?profile=hdfstextsimple')
-- null='null' matches the sales_order table below. Sqoop's default text
-- representation of SQL NULL is the string null; without this option a NULL
-- zip code would fail the int conversion and a NULL address would be loaded
-- as the literal text 'null'.
format 'text' (delimiter=e',', null='null');

comment on table customer is '客户外部表';
comment on column customer.customer_number is '客户编号';
comment on column customer.customer_name is '客户姓名';
comment on column customer.customer_street_address is '客户地址';
comment on column customer.customer_zip_code is '客户邮编';
comment on column customer.customer_city is '客户所在城市';
comment on column customer.customer_state is '客户所在省份';
comment on column customer.shipping_address is '送货地址';
comment on column customer.shipping_zip_code is '送货邮编';
comment on column customer.shipping_city is '送货城市';
comment on column customer.shipping_state is '送货省份';

-- Drop the sales order external table
drop external table if exists sales_order;

-- Create the sales order external table
create external table sales_order
(
    order_number    int,
    customer_number int,
    product_code    int,
    order_date      timestamp,
    entry_date      timestamp,
    order_amount    decimal(10 , 2 ),
    order_quantity  int
)
location ('pxf://mycluster/data/ext/sales_order?profile=hdfstextsimple')
format 'text' (delimiter=e',', null='null');

comment on table sales_order is '销售订单外部表';
comment on column sales_order.order_number is '订单号';
comment on column sales_order.customer_number is '客户编号';
comment on column sales_order.product_code is '产品编码';
comment on column sales_order.order_date is '订单日期';
comment on column sales_order.entry_date is '登记日期';
comment on column sales_order.order_amount is '销售金额';
comment on column sales_order.order_quantity is '销售数量';
        需要注意的是ext表中列的顺序要和源数据库严格保持一致。因为客户表和产品表是全量覆盖抽取数据,所以如果源和目标顺序不一样,将产生错误的结果。

3. 修改rds模式中的表结构

        HAWQ允许使用ALTER TABLE语句为内部表增加列。与MySQL不同,HAWQ每条ALTER TABLE语句只能增加一列,因此增加四列需要执行四次ALTER TABLE语句。并且在增加列时需要指定新增列的缺省值,否则会报类似如下的错误:
ERROR:  ADD COLUMN with no default value in append-only tables is not yet supported.
        使用下面的SQL语句修改rds模式中的表结构。
-- rds.customer: add the four shipping-address columns, one statement per
-- column and each with an explicit default, commenting every column as soon
-- as it exists.
alter table rds.customer
    add column shipping_address varchar(30) default null;
comment on column rds.customer.shipping_address
    is '送货地址';

alter table rds.customer
    add column shipping_zip_code int default null;
comment on column rds.customer.shipping_zip_code
    is '送货邮编';

alter table rds.customer
    add column shipping_city varchar(30) default null;
comment on column rds.customer.shipping_city
    is '送货城市';

alter table rds.customer
    add column shipping_state varchar(2) default null;
comment on column rds.customer.shipping_state
    is '送货省份';

-- rds.sales_order: add the order quantity column.
alter table rds.sales_order
    add column order_quantity int default null;
comment on column rds.sales_order.order_quantity
    is '销售数量';

4. 修改tds模式中的表结构

        使用下面的SQL语句修改tds模式中的表结构。
-- tds.customer_dim: add the four shipping-address columns, one statement per
-- column and each with an explicit default, commenting every column as soon
-- as it exists.
alter table tds.customer_dim
    add column shipping_address varchar(30) default null;
comment on column tds.customer_dim.shipping_address
    is '送货地址';

alter table tds.customer_dim
    add column shipping_zip_code int default null;
comment on column tds.customer_dim.shipping_zip_code
    is '送货邮编';

alter table tds.customer_dim
    add column shipping_city varchar(30) default null;
comment on column tds.customer_dim.shipping_city
    is '送货城市';

alter table tds.customer_dim
    add column shipping_state varchar(2) default null;
comment on column tds.customer_dim.shipping_state
    is '送货省份';

-- tds.sales_order_fact: add the order quantity measure.
alter table tds.sales_order_fact
    add column order_quantity int default null;
comment on column tds.sales_order_fact.order_quantity
    is '销售数量';

二、重建相关视图

        HAWQ不允许修改视图的列数,错误信息如下:
ERROR:  cannot change number of columns in view
        因此需要使用下面的SQL语句重建客户维度表的当前视图和历史视图,增加四列。

1. 重建客户维度当前视图

-- Switch to the tds schema
set search_path=tds;

-- Drop the old definition. "if exists" lets the script be re-run, or run in
-- an environment where the view has not been created yet, without failing.
drop view if exists v_customer_dim_latest;

-- Current-version view of the customer dimension.
-- The inner query keeps, for each customer_number, the row with the highest
-- customer_sk (distinct on + order by customer_sk desc). The outer query then
-- drops customers whose latest row is flagged as deleted.
create or replace view v_customer_dim_latest as
select customer_sk,
       customer_number,
       customer_name,
       customer_street_address,
       customer_zip_code,
       customer_city,
       customer_state,
       shipping_address,
       shipping_zip_code,
       shipping_city,
       shipping_state,
       version,
       effective_date
  from (select distinct on (customer_number) customer_number,
               customer_sk,
               customer_name,
               customer_street_address,
               customer_zip_code,
               customer_city,
               customer_state,
               shipping_address,
               shipping_zip_code,
               shipping_city,
               shipping_state,
               isdelete,
               version,
               effective_date
          from customer_dim
         order by customer_number, customer_sk desc) as latest
 where isdelete is false;

2. 重建客户维度历史视图

-- Switch to the tds schema
set search_path=tds;

-- Drop the old definition. "if exists" lets the script be re-run, or run in
-- an environment where the view has not been created yet, without failing.
drop view if exists v_customer_dim_his;

-- History view: every customer_dim row plus a derived expiry_date.
-- expiry_date is the effective_date of the next row for the same
-- customer_number (ordered by effective_date); a row with no successor gets
-- the far-future date 2200-01-01.
create or replace view v_customer_dim_his as
select *,
       date(lead(effective_date,1,date '2200-01-01')
                over (partition by customer_number order by effective_date)) expiry_date
  from customer_dim;

三、修改定期装载函数fn_regular_load

        增加列后,对定期装载函数fn_regular_load也要做相应的修改,增加对新增数据列的处理。本例只需要对客户维度表和销售订单事实表的部分进行修改,修改后的函数如下。
-- fn_regular_load: regular load from the ext schema through rds into tds.
--
-- Steps, in order:
--   1. analyze the external tables;
--   2. reload rds.customer / rds.product in full and append ext.sales_order
--      to rds.sales_order;
--   3. set the CDC window in rds.cdc_time to [last_load, today);
--   4. insert new SCD2 versions into tds.customer_dim and tds.product_dim for
--      inserted, changed and deleted source rows;
--   5. load tds.order_dim and tds.sales_order_fact for orders whose entry_date
--      falls inside the CDC window;
--   6. analyze the tds tables and advance rds.cdc_time to today.
--
-- Takes no parameters and returns void. All tds objects are schema-qualified
-- so the result does not depend on the caller's search_path.
create or replace function fn_regular_load ()
returns void as
$$
declare
    -- v_pre_date (yesterday) is the effective date stamped on new dimension
    -- versions; v_cur_date (today) is the upper bound of the CDC window.
    v_cur_date  date := current_date;
    v_pre_date  date := current_date - 1;
    v_last_load date;
begin
    -- Analyze the external tables
    analyze ext.customer;
    analyze ext.product;
    analyze ext.sales_order;

    -- Load external data into the raw data store.
    -- customer and product are replaced in full; sales_order is appended.
    truncate table rds.customer;
    truncate table rds.product;

    insert into rds.customer select * from ext.customer;
    insert into rds.product select * from ext.product;
    insert into rds.sales_order select * from ext.sales_order;

    -- Analyze the rds tables
    analyze rds.customer;
    analyze rds.product;
    analyze rds.sales_order;

    -- Set the upper bound of the CDC window: keep last_load, set current_load to today
    select last_load into v_last_load from rds.cdc_time;
    truncate table rds.cdc_time;
    insert into rds.cdc_time select v_last_load, v_cur_date;

    -- Load the customer dimension.
    -- a = current dimension row, b = freshly loaded source row.
    -- flag: 'I' = only in source (insert), 'D' = only in dimension (delete),
    --       'U' = in both with at least one differing attribute (update).
    insert into tds.customer_dim
    (customer_number,
     customer_name,
     customer_street_address,
     customer_zip_code,
     customer_city,
     customer_state,
     shipping_address,
     shipping_zip_code,
     shipping_city,
     shipping_state,
     isdelete,
     version,
     effective_date)
    select case flag
               when 'D' then a_customer_number
               else b_customer_number
           end customer_number,
           case flag
               when 'D' then a_customer_name
               else b_customer_name
           end customer_name,
           case flag
               when 'D' then a_customer_street_address
               else b_customer_street_address
           end customer_street_address,
           case flag
               when 'D' then a_customer_zip_code
               else b_customer_zip_code
           end customer_zip_code,
           case flag
               when 'D' then a_customer_city
               else b_customer_city
           end customer_city,
           case flag
               when 'D' then a_customer_state
               else b_customer_state
           end customer_state,
           case flag
               when 'D' then a_shipping_address
               else b_shipping_address
           end shipping_address,
           case flag
               when 'D' then a_shipping_zip_code
               else b_shipping_zip_code
           end shipping_zip_code,
           case flag
               when 'D' then a_shipping_city
               else b_shipping_city
           end shipping_city,
           case flag
               when 'D' then a_shipping_state
               else b_shipping_state
           end shipping_state,
           case flag
               when 'D' then true
               else false
           end isdelete,
           -- deletes keep the current version number, inserts start at 1,
           -- updates increment it
           case flag
               when 'D' then a_version
               when 'I' then 1
               else a_version + 1
           end v,
           v_pre_date
      from (select a.customer_number a_customer_number,
                   a.customer_name a_customer_name,
                   a.customer_street_address a_customer_street_address,
                   a.customer_zip_code a_customer_zip_code,
                   a.customer_city a_customer_city,
                   a.customer_state a_customer_state,
                   a.shipping_address a_shipping_address,
                   a.shipping_zip_code a_shipping_zip_code,
                   a.shipping_city a_shipping_city,
                   a.shipping_state a_shipping_state,
                   a.version a_version,
                   b.customer_number b_customer_number,
                   b.customer_name b_customer_name,
                   b.customer_street_address b_customer_street_address,
                   b.customer_zip_code b_customer_zip_code,
                   b.customer_city b_customer_city,
                   b.customer_state b_customer_state,
                   b.shipping_address b_shipping_address,
                   b.shipping_zip_code b_shipping_zip_code,
                   b.shipping_city b_shipping_city,
                   b.shipping_state b_shipping_state,
                   case when a.customer_number is null then 'I'
                        when b.customer_number is null then 'D'
                        else 'U'
                   end flag
              from tds.v_customer_dim_latest a
              full join rds.customer b on a.customer_number = b.customer_number
             where a.customer_number is null -- insert
                or b.customer_number is null -- delete
                or (a.customer_number = b.customer_number
                    and not
                        -- coalesce maps NULL to a sentinel so that a change
                        -- between NULL and a value is detected; NULL and the
                        -- sentinel ('' or 0) are treated as equal.
                        (coalesce(a.customer_name,'') = coalesce(b.customer_name,'')
                         and coalesce(a.customer_street_address,'') = coalesce(b.customer_street_address,'')
                         and coalesce(a.customer_zip_code,0) = coalesce(b.customer_zip_code,0)
                         and coalesce(a.customer_city,'') = coalesce(b.customer_city,'')
                         and coalesce(a.customer_state,'') = coalesce(b.customer_state,'')
                         and coalesce(a.shipping_address,'') = coalesce(b.shipping_address,'')
                         and coalesce(a.shipping_zip_code,0) = coalesce(b.shipping_zip_code,0)
                         and coalesce(a.shipping_city,'') = coalesce(b.shipping_city,'')
                         and coalesce(a.shipping_state,'') = coalesce(b.shipping_state,'')
                        ))) t
     -- NOTE(review): the order by with an effectively unbounded limit looks
     -- intended to make rows reach the insert in a fixed order - confirm.
     order by coalesce(a_customer_number, 999999999999), b_customer_number limit 999999999999;

    -- Load the product dimension (same I/D/U scheme as the customer dimension)
    insert into tds.product_dim
    (product_code,
     product_name,
     product_category,
     isdelete,
     version,
     effective_date)
    select case flag
               when 'D' then a_product_code
               else b_product_code
           end product_code,
           case flag
               when 'D' then a_product_name
               else b_product_name
           end product_name,
           case flag
               when 'D' then a_product_category
               else b_product_category
           end product_category,
           case flag
               when 'D' then true
               else false
           end isdelete,
           case flag
               when 'D' then a_version
               when 'I' then 1
               else a_version + 1
           end v,
           v_pre_date
      from (select a.product_code a_product_code,
                   a.product_name a_product_name,
                   a.product_category a_product_category,
                   a.version a_version,
                   b.product_code b_product_code,
                   b.product_name b_product_name,
                   b.product_category b_product_category,
                   case when a.product_code is null then 'I'
                        when b.product_code is null then 'D'
                        else 'U'
                   end flag
              from tds.v_product_dim_latest a
              full join rds.product b on a.product_code = b.product_code
             where a.product_code is null -- insert
                or b.product_code is null -- delete
                or (a.product_code = b.product_code
                    and not
                        -- NULL-safe equality: when "=" yields NULL (at least
                        -- one side is NULL) the attributes count as equal only
                        -- if both sides are NULL. A plain "=" here would make
                        -- the whole predicate NULL and silently skip a change
                        -- between NULL and a value.
                        (coalesce(a.product_name = b.product_name,
                                  a.product_name is null and b.product_name is null)
                         and coalesce(a.product_category = b.product_category,
                                      a.product_category is null and b.product_category is null)
                        ))) t
     order by coalesce(a_product_code, 999999999999), b_product_code limit 999999999999;

    -- Load the order dimension: one version-1 row per order entered inside the CDC window
    insert into tds.order_dim (order_number, version, effective_date)
    select t.order_number, t.v, t.effective_date
      from (select order_number, 1 v, order_date effective_date
              from rds.sales_order, rds.cdc_time
             where entry_date >= last_load and entry_date < current_load) t;

    -- Load the sales order fact table.
    -- Each order is matched to the customer and product versions whose
    -- [effective_date, expiry_date) interval contains the order date.
    insert into tds.sales_order_fact
    select order_sk,
           customer_sk,
           product_sk,
           date_sk,
           year * 100 + month,
           order_amount,
           order_quantity
      from rds.sales_order a,
           tds.order_dim b,
           tds.v_customer_dim_his c,
           tds.v_product_dim_his d,
           tds.date_dim e,
           rds.cdc_time f
     where a.order_number = b.order_number
       and a.customer_number = c.customer_number
       and a.order_date >= c.effective_date
       and a.order_date < c.expiry_date
       and a.product_code = d.product_code
       and a.order_date >= d.effective_date
       and a.order_date < d.expiry_date
       and date(a.order_date) = e.date
       and a.entry_date >= f.last_load and a.entry_date < f.current_load;

    -- Analyze the tds tables
    analyze tds.customer_dim;
    analyze tds.product_dim;
    analyze tds.order_dim;
    analyze tds.sales_order_fact;

    -- Advance the timestamp table: last_load becomes today
    truncate table rds.cdc_time;
    insert into rds.cdc_time select v_cur_date, v_cur_date;

end;
$$
language plpgsql;
        同客户地址一样,新增的送货地址列也是用SCD2新增历史版本。与“HAWQ实践(四)——定期ETL(Sqoop、HAWQ)”建立的定期装载函数中相同部分比较,会发现在比较客户属性时使用了coalesce函数。
        在源系统库中,客户地址和送货地址列都是允许为空的,这样的设计是出于灵活性和容错性的考虑。我们以送货地址为例进行讨论。使用“a.shipping_address = b.shipping_address”条件判断送货地址是否更改,根据等号两边的值是否为空,会出现以下三种情况:
a.shipping_address和b.shipping_address都不为空。这种情况下如果两者相等则返回true,说明地址没有变化,否则返回false,说明地址改变了,逻辑正确。
a.shipping_address和b.shipping_address都为空。两者的比较会演变成null=null,根据HAWQ对“=”操作符的定义,会返回NULL。此时如果其它属性没变,则比较演变为NOT (NULL AND TRUE),否则演变为NOT (NULL AND FALSE),前者返回NULL,后者返回TRUE。这符合我们的逻辑。
a.shipping_address和b.shipping_address只有一个为空。就是说地址列从NULL变成非NULL,或者从非NULL变成NULL,这种情况明显应该新增一个版本,但根据“=”的定义,此时a.shipping_address=b.shipping_address返回值是NULL。如果其它属性没变,整个条件演变为NOT (NULL AND TRUE),结果为NULL,查询不会返回行,不符合我们的需求。
        基于以上分析,这里使用HAWQ的coalesce函数处理NULL值(类似于Oracle的NVL或SQL Server的ISNULL)将NULL值比较转化为标量值比较。空值的逻辑判断有其特殊性,为了避免不必要的麻烦,数据库设计时应该尽量将字段设计成非空,必要时用默认值代替NULL,并将此作为一个基本的设计原则。

四、测试

1. 在源库中增加测试数据

        执行下面的SQL脚本,在MySQL的源数据库中增加客户和销售订单测试数据。
use source;

-- Default the shipping address to the customer address.
-- Intentionally no where clause: every existing customer row is updated.
update customer
   set shipping_address = customer_street_address,
       shipping_zip_code = customer_zip_code,
       shipping_city = customer_city,
       shipping_state = customer_state ;

-- Add one new customer
insert into customer
(customer_name,
 customer_street_address,
 customer_zip_code,
 customer_city,
 customer_state,
 shipping_address,
 shipping_zip_code,
 shipping_city,
 shipping_state)
values
('online distributors',
 '2323 louise dr.',
 17055,
 'pittsburgh',
 'pa',
 '2323 louise dr.',
 17055,
 'pittsburgh',
 'pa') ;

-- Add 9 orders dated yesterday.
-- Each order gets a random timestamp in [yesterday 00:00, today 00:00),
-- a random amount in [1000, 10000) and a random quantity in [10, 100).
-- All inserts name their columns explicitly so that they keep working when
-- further columns are added to sales_order.
set @start_date := unix_timestamp(date_add(current_date, interval -1 day));
set @end_date := unix_timestamp(current_date);
drop table if exists temp_sales_order_data;
-- Empty staging table with the same columns as sales_order
create table temp_sales_order_data as select * from sales_order where 1=0;

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (117, 1, 1, @order_date, @order_date, @amount, @quantity);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (118, 2, 2, @order_date, @order_date, @amount, @quantity);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (119, 3, 3, @order_date, @order_date, @amount, @quantity);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (120, 4, 4, @order_date, @order_date, @amount, @quantity);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (121, 5, 1, @order_date, @order_date, @amount, @quantity);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (122, 6, 2, @order_date, @order_date, @amount, @quantity);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (123, 7, 3, @order_date, @order_date, @amount, @quantity);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (124, 8, 4, @order_date, @order_date, @amount, @quantity);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);
insert into temp_sales_order_data
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
values (125, 9, 1, @order_date, @order_date, @amount, @quantity);

-- Copy the staged rows into sales_order in order_date order.
-- order_number is passed as NULL instead of the staged 117-125 values;
-- presumably the column is auto-increment so MySQL assigns the real
-- numbers - confirm against the sales_order definition.
insert into sales_order
(order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity)
select null,customer_number,product_code,order_date,entry_date,order_amount,order_quantity
  from temp_sales_order_data
 order by order_date;

commit ;

2. 执行定期ETL脚本

# As the hdfs user, recursively set mode 777 on /data/ext in HDFS
su - hdfs -c 'hdfs dfs -chmod -R 777 /data/ext'
# Run the regular ETL driver script from the current user's home directory
~/regular_etl.sh
regular_etl.sh文件内容如下:
#!/bin/bash
# Regular ETL driver: clear the previous sales-order increment from HDFS,
# run the Sqoop extract, then run the HAWQ load function.

# Stop at the first failing step. Without this, a failed extract would still
# be followed by the load, which truncates rds.customer / rds.product and
# reloads them from whatever is currently under /data/ext.
set -e

# The external table directory only holds incremental sales-order data,
# so remove the previous batch. -f keeps an already-empty directory from
# being reported as an error.
su - hdfs -c 'hdfs dfs -rm -r -f /data/ext/sales_order/*'

# Run the regular extract script as the sqoop user
su - sqoop -c '~/regular_extract.sh'

# Run the regular load function as the gpadmin user.
# NOTE(review): the database password is hard-coded on this line; consider
# moving it to a ~/.pgpass file.
su - gpadmin -c 'export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp3 -c "set search_path=tds;select fn_regular_load ();"'

/home/sqoop/regular_extract.sh内容如下:
#!/bin/bash
# Regular extract: full import of customer and product, incremental import
# of sales_order, from the MySQL source database into /data/ext on HDFS.

# Stop at the first failing import so the script's exit status reflects any
# failure, not just that of the last command.
set -e

# NOTE(review): the MySQL password is passed on the command line; consider
# Sqoop's --password-file option instead.

# Full extract of the customer table (the target directory is replaced)
sqoop import \
    --connect jdbc:mysql://172.16.1.127:3306/source \
    --username dwtest --password 123456 \
    --table customer \
    --target-dir /data/ext/customer \
    --delete-target-dir \
    --compress

# Full extract of the product table (the target directory is replaced)
sqoop import \
    --connect jdbc:mysql://172.16.1.127:3306/source \
    --username dwtest --password 123456 \
    --table product \
    --target-dir /data/ext/product \
    --delete-target-dir \
    --compress

# Incremental extract of the sales order table via the saved Sqoop job
sqoop job --exec myjob_incremental_import

3. 查询数据,确认ETL过程正确执行

        查询客户维度当前视图,结果如图2所示。
-- Current customer dimension rows, ordered by customer number
select cur.*
  from v_customer_dim_latest as cur
 order by cur.customer_number;


图2
        查询客户维度历史视图,部分结果如图3所示。
-- All customer dimension versions, ordered by customer number then version
select his.*
  from v_customer_dim_his as his
 order by his.customer_number,
          his.version;


 图3
        可以看到,由于源库中为送货地址增加了缺省值,每条记录都新增了一个版本。老的过期记录的送货地址为空。9号客户是新加的,具有送货地址。
        查询订单维度表和事实表,结果如图4所示,新装载了9条订单记录。
-- Row count of the order dimension
select count(*)
  from order_dim;

-- Row count of the sales order fact table
select count(*)
  from sales_order_fact;


图4
        查询事实表数据,结果如图5所示。
-- Fact rows with a non-negative order quantity; rows whose quantity is NULL
-- do not satisfy the comparison and are therefore excluded.
select f.*
  from sales_order_fact as f
 where f.order_quantity >= 0
 order by f.order_sk;


 图5
        可以看到,只有9个订单有销售数量,老的销售数据数量字段为空。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: