您的位置:首页 > 运维架构

HAWQ取代传统数仓实践(四)——定期ETL(Sqoop、HAWQ)

2017-05-12 16:03 801 查看

一、变化数据捕获(CDC)

        初始装载只在数据仓库开始使用前执行一次,然而,必须要周期性地执行装载源数据过程。与初始装载不同,定期装载一般都是增量的,并且需要捕获并且记录数据的变化历史。

1. 识别数据源与装载类型

        定期装载首先要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据。然后要决定适合装载的抽取模式和维度历史装载类型。表1总了本示例的这些信息。
[align=center]
数据源EXT模式RDS模式TDS模式抽取模式维度历史装载类型
customer
customer
customer
customer_dim
整体、拉取
所有属性均为SCD2
product
product
product
product_dim
整体、拉取
所有属性均为SCD2
sales_order
sales_order
sales_order
order_dim
CDC(每天)、拉取
唯一订单号
sales_order_fact
CDC(每天)、拉取
N/A
N/A
N/A
N/A
date_dim
N/A
预装载
[/align]表1

2. 处理渐变维(SCD)

        上一篇已经提到,HAWQ只有INSERT,没有UPDATE、DELETE操作,因此所有维度属性都使用SDC2记录全部历史变化。在捕获数据变化时,需要使用维度表的当前版本数据与从业务数据库最新抽取来的数据做比较。实现方式是在维度表上建立一个当前维度版本的视图,用于比较数据变化。这种设计既可以保留所有数据变化的历史,又屏蔽了查询当前版本的复杂性。
        事实表需要引用维度表的代理键,而且不一定是引用当前版本的代理键。比如有些迟到的事实,就必须找到事实发生时的维度版本。因此一个维度的所有版本区间应该构成一个连续且互斥时间范围,每个事实数据都能对应维度的唯一版本。实现方式是在维度表上建立一个维度历史版本的视图,在这个视图中增加版本过期日期导出列。任何一个版本的有效期是一个“左闭右开”的区间,也就是说该版本包含生效日期,但不包含过期日期,而是到过期日期的前一天为止。

3. 设置数据处理时间窗口

        对于事实表,我们采用基于时间戳的CDC增量装载模式,时间粒度为天。因此需要两个时间点,分别是本次装载的起始时间点和终止时间点,这两个时间点定义了本次处理的时间窗口,即装载这个时间区间内的数据。还要说明一点,这个区间是左包含的,就是处理的数据包括起始时间点的,但不包括终止时间点的。这样设计的原因是,我们既要处理完整的数据,不能有遗漏,又不能重复装载数据,这就要求时间处理窗口既要连续,又不能存在重叠的部分。

二、创建维度表当前版本视图

-- 切换到tds模式
set search_path=tds;

-- 建立客户维度当前视图
create or replace view v_customer_dim_latest as
select customer_sk,
customer_number,
customer_name,
customer_street_address,
customer_zip_code,
customer_city,
customer_state,
version,
effective_date
from (select distinct on (customer_number) customer_number,
customer_sk,
customer_name,
customer_street_address,
customer_zip_code,
customer_city,
customer_state,
isdelete,
version,
effective_date
from customer_dim
order by customer_number, customer_sk desc) as latest
where isdelete is false;

-- 建立产品维度当前视图
create or replace view v_product_dim_latest as
select product_sk,
product_code,
product_name,
product_category,
version,
effective_date
from (select distinct on (product_code) product_code,
product_sk,
product_name,
product_category,
isdelete,
version,
effective_date
from product_dim
order by product_code, product_sk desc) as latest
where isdelete is false;
        说明:
如前所述,创建维度表的当前视图。这里只为客户和产品维度创建视图,而订单维度不需要当前版本视图,因为假设业务上订单数据只能增加,不能修改,所以没有版本变化。
使用HAWQ的DISTINCT ON语法去重。DISTINCT ON ( expression [, …] )把记录根据[, …]的值进行分组,分组之后仅返回每一组的第一行。需要注意的是,如果不指定ORDER BY子句,返回的第一条的不确定的。如果使用了ORDER BY 子句,那么[, …]里面的值必须靠近ORDER BY子句的最左边。本例中我们按业务主键(customer_number、product_code)分组,每组按代理键(customer_sk、product_sk)倒排序,每组第一行即为维度的当前版本。

三、创建维度表历史视图

-- 切换到tds模式
set search_path=tds;

-- 建立客户维度历史视图,增加版本过期日期导出列
create or replace view v_customer_dim_his as
select *, date(lead(effective_date,1,date '2200-01-01') over (partition by customer_number order by effective_date)) expiry_date
from customer_dim;

-- 建立产品维度历史视图,增加版本过期日期导出列
create or replace view v_product_dim_his as
select *, date(lead(effective_date,1,date '2200-01-01') over (partition by product_code order by effective_date)) expiry_date
from product_dim;
        说明:
维度历史视图增加了版本的过期日期列。
使用LEAD窗口函数实现。以业务主键(customer_number、product_code)分区,每个分区内按生效日期排序。LEAD函数在一个分区内取到当前生效日期的下一个日期,该日期即为对应版本的过期日期。如果是当前版本,下一日期为空,则返回一个很大的时间值,大到足以满足数据仓库整个生命周期的需要,本示例设置的是2200年1月1日。

四、建立时间戳表

create table rds.cdc_time
(last_load date,
current_load date);

insert into rds.cdc_time select current_date - 1, current_date - 1;
        说明:
本示例中order_dim维度表和sales_order_fact事实表使用基于时间戳的CDC装载模式。为此在rds模式中建立一个名为cdc_time的时间戳表,这个表里有last_load和current_load两个字段。之所以需要两个字段,是因为抽取到的数据可能会多于本次需要处理的数据。比如,两点执行ETL过程,则零点到两点这两个小时的数据不会在本次处理。为了确定这个截至时间点,需要给时间戳设定一个上限条件,即这里的current_load字段值。
本示例的时间粒度为每天,所以时间戳只要保留日期部分即可,因此数据类型选为date。这两个字段的初始值是“初始加载”执行日期的前一天。当开始装载时,current_load设置为当前日期。

五、用Sqoop用户建立定期抽取脚本

        用sqoop操作系统用户建立初始数据抽取脚本文件~/regular_extract.sh,内容如下:
#!/bin/bash

# 全量抽取客户表
sqoop import --connect jdbc:mysql://172.16.1.127:3306/source --username dwtest --password 123456 --table customer --targe
t-dir /data/ext/customer --delete-target-dir --compress

# 全量抽取产品表
sqoop import --connect jdbc:mysql://172.16.1.127:3306/source --username dwtest --password 123456 --table product --target
-dir /data/ext/product --delete-target-dir --compress

# 增量抽取销售订单表
sqoop job --exec myjob_incremental_import
        这个文件与上一篇介绍的初始抽取的shell脚本基本相同,只是去掉了创建Sqoop作业的命令。每次装载后,都会将已经导入的最大订单号赋予增量抽取作业的last-value。
        将文件修改为可执行模式:
chmod 755 ~/regular_extract.sh

六、建立定期装载HAWQ函数

create or replace function fn_regular_load ()
returns void as
$$
declare
-- 设置scd的生效时间
v_cur_date date := current_date;
v_pre_date date := current_date - 1;
v_last_load date;
begin
-- 分析外部表
analyze ext.customer;
analyze ext.product;
analyze ext.sales_order;

-- 将外部表数据装载到原始数据表
truncate table rds.customer;
truncate table rds.product;

insert into rds.customer select * from ext.customer;
insert into rds.product select * from ext.product;
insert into rds.sales_order select * from ext.sales_order;

-- 分析rds模式的表
analyze rds.customer;
analyze rds.product;
analyze rds.sales_order;

-- 设置cdc的上限时间
select last_load into v_last_load from rds.cdc_time;
truncate table rds.cdc_time;
insert into rds.cdc_time select v_last_load, v_cur_date;

-- 装载客户维度
insert into tds.customer_dim
(customer_number,
customer_name,
customer_street_address,
customer_zip_code,
customer_city,
customer_state,
isdelete,
version,
effective_date)
select case flag
when 'D' then a_customer_number
else b_customer_number
end customer_number,
case flag
when 'D' then a_customer_name
else b_customer_name
end customer_name,
case flag
when 'D' then a_customer_street_address
else b_customer_street_address
end customer_street_address,
case flag
when 'D' then a_customer_zip_code
else b_customer_zip_code
end customer_zip_code,
case flag
when 'D' then a_customer_city
else b_customer_city
end customer_city,
case flag
when 'D' then a_customer_state
else b_customer_state
end customer_state,
case flag
when 'D' then true
else false
end isdelete,
case flag
when 'D' then a_version
when 'I' then 1
else a_version + 1
end v,
v_pre_date
from (select a.customer_number a_customer_number,
a.customer_name a_customer_name,
a.customer_street_address a_customer_street_address,
a.customer_zip_code a_customer_zip_code,
a.customer_city a_customer_city,
a.customer_state a_customer_state,
a.version a_version,
b.customer_number b_customer_number,
b.customer_name b_customer_name,
b.customer_street_address b_customer_street_address,
b.customer_zip_code b_customer_zip_code,
b.customer_city b_customer_city,
b.customer_state b_customer_state,
case when a.customer_number is null then 'I'
when b.customer_number is null then 'D'
else 'U'
end flag
from v_customer_dim_latest a
full join rds.customer b on a.customer_number = b.customer_number
where a.customer_number is null -- 新增
or b.customer_number is null -- 删除
or (a.customer_number = b.customer_number
and not
(a.customer_name = b.customer_name
and a.customer_street_address = b.customer_street_address
and a.customer_zip_code = b.customer_zip_code
and a.customer_city = b.customer_city
and a.customer_state = b.customer_state))) t
order by coalesce(a_customer_number, 999999999999), b_customer_number limit 999999999999;

-- 装载产品维度
insert into tds.product_dim
(product_code,
product_name,
product_category,
isdelete,
version,
effective_date)
select case flag
when 'D' then a_product_code
else b_product_code
end product_code,
case flag
when 'D' then a_product_name
else b_product_name
end product_name,
case flag
when 'D' then a_product_category
else b_product_category
end product_category,
case flag
when 'D' then true
else false
end isdelete,
case flag
when 'D' then a_version
when 'I' then 1
else a_version + 1
end v,
v_pre_date
from (select a.product_code a_product_code,
a.product_name a_product_name,
a.product_category a_product_category,
a.version a_version,
b.product_code b_product_code,
b.product_name b_product_name,
b.product_category b_product_category,
case when a.product_code is null then 'I'
when b.product_code is null then 'D'
else 'U'
end flag
from v_product_dim_latest a
full join rds.product b on a.product_code = b.product_code
where a.product_code is null -- 新增
or b.product_code is null -- 删除
or (a.product_code = b.product_code
and not
(a.product_name = b.product_name
and a.product_category = b.product_category))) t
order by coalesce(a_product_code, 999999999999), b_product_code limit 999999999999;

-- 装载order维度
insert into order_dim (order_number, version, effective_date)
select t.order_number, t.v, t.effective_date
from (select order_number, 1 v, order_date effective_date
from rds.sales_order, rds.cdc_time
where entry_date >= last_load and entry_date < current_load) t;

-- 装载销售订单事实表
insert into sales_order_fact
select order_sk,
customer_sk,
product_sk,
date_sk,
year * 100 + month,
order_amount
from rds.sales_order a,
order_dim b,
v_customer_dim_his c,
v_product_dim_his d,
date_dim e,
rds.cdc_time f
where a.order_number = b.order_number
and a.customer_number = c.customer_number
and a.order_date >= c.effective_date
and a.order_date < c.expiry_date
and a.product_code = d.product_code
and a.order_date >= d.effective_date
and a.order_date < d.expiry_date
and date(a.order_date) = e.date
and a.entry_date >= f.last_load and a.entry_date < f.current_load;

-- 分析tds模式的表
analyze customer_dim;
analyze product_dim;
analyze order_dim;
analyze sales_order_fact;

-- 更新时间戳表的last_load字段
truncate table rds.cdc_time;
insert into rds.cdc_time select v_cur_date, v_cur_date;

end;
$$
language plpgsql;
        说明:
该函数分成两大部分,一是装载RDS模式的表,而是处理TDS的表。
同初始装载一样,RDS模式表的数据来自从EXT模式的外部表,rds.customer和rds.product全量装载,rds.sales_order增量装载。
脚本中设置三个变量,v_last_load和v_cur_date分别赋予起始日期、终止日期,并且将时间戳表rds.cdc_time的last_load和current_load字段分别设置为起始日期和终止日期。v_pre_date表示版本过期日期。
维度表数据可能是新增、修改或删除。这里用FULL JOIN连接原始数据表与维度当前版本视图,统一处理这三种情况。外查询中使用CASE语句判断属于哪种情况,分别取得不同的字段值。
为了保证数据插入维度表时,代理键与业务主键保持相同的顺序,必须使用“order by coalesce(a_product_code, 999999999999), b_product_code limit 999999999999;”类似的语句。
订单维度增量装载,没有历史版本问题。
装载事实表时连接维度历史视图,引用事实数据所对应的维度代理键。该代理键可以通过维度版本的生效日期、过期日期区间唯一确定。
装载数据后,执行查询前,分析表以提高查询性能。
数据装载完成后,更新数据处理时间窗口。

七、用root用户建立定期ETL脚本

        用root操作系统用户建立初始ETL脚本文件~/regular_etl.sh,内容如下:
#!/bin/bash

# 外部表只保存销售订单增量数据
su - hdfs -c 'hdfs dfs -rm -r /data/ext/sales_order/*'

# 使用sqoop用户执行定期抽取脚本
su - sqoop -c '~/regular_extract.sh'

# 使用gpadmin用户执行定期装载函数
su - gpadmin -c 'export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp3 -c "set search_path=tds;select fn_regular_load ();"'
        该文件的作用与初始ETL的shell脚本基本相同,为定期ETL提供统一的执行入口。
        将文件修改为可执行模式:
chmod 755 ~/regular_etl.sh

八、测试定期ETL过程

1. 准备测试数据

        在MySQL数据库中执行下面的SQL脚本准备源数据库中的客户、产品和销售订单测试数据。
use source;

/***
客户数据的改变如下:
客户6的街道号改为7777 ritter rd。(原来是7070 ritter rd)
客户7的姓名改为distinguished agencies。(原来是distinguished partners)
新增第八个客户。
***/
update customer set customer_street_address = '7777 ritter rd.' where customer_number = 6 ;
update customer set customer_name = 'distinguished agencies' where customer_number = 7 ;
insert into customer
(customer_name, customer_street_address, customer_zip_code, customer_city, customer_state)
values
('subsidiaries', '10000 wetline blvd.', 17055, 'pittsburgh', 'pa') ;

/***
产品数据的改变如下:
产品3的名称改为flat panel。(原来是lcd panel)
新增第四个产品。
***/
update product set product_name = 'flat panel' where product_code = 3 ;
insert into product
(product_name, product_category)
values
('keyboard', 'peripheral') ;

/***
新增订单日期为2017年5月4日的16条订单。
***/
set @start_date := unix_timestamp('2017-05-04');
set @end_date := unix_timestamp('2017-05-05');
drop table if exists temp_sales_order_data;
create table temp_sales_order_data as select * from sales_order where 1=0;

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (101, 1, 1, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (102, 2, 2, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (103, 3, 3, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (104, 4, 4, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (105, 5, 2, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (106, 6, 2, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (107, 7, 3, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (108, 8, 4, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (109, 1, 1, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (110, 2, 2, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (111, 3, 3, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (112, 4, 4, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (113, 5, 1, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (114, 6, 2, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (115, 7, 3, @order_date, @order_date, @amount);

set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (116, 8, 4, @order_date, @order_date, @amount);

insert into sales_order
select null,customer_number,product_code,order_date,entry_date,order_amount from temp_sales_order_data order by order_date;

commit ;

2. 执行定期ETL脚本

        用root用户执行定期ETL脚本。
~/regular_etl.sh

3. 查询数据,确认ETL过程正确执行

        查询客户维度当前视图,结果如图1所示。
select customer_sk,
customer_number,
customer_name,
customer_street_address,
version,
effective_date
from v_customer_dim_latest
order by customer_number;


图1
        查询客户维度历史视图,结果如图2所示。
select customer_sk c_sk,
customer_number c_num,
customer_name c_name,
customer_street_address c_address,
version,
effective_date,
expiry_date,
isdelete
from v_customer_dim_his
order by customer_number, version;


图2
        查询产品维度当前视图,结果如图3所示。
select product_sk,
product_code,
product_name,
version,
effective_date
from v_product_dim_latest
order by product_code;


图3
        查询客户维度历史视图,结果如图4所示。
select product_sk,
product_code,
product_name,
version,
effective_date,
expiry_date
from v_product_dim_his
order by product_code, version;


图4
查询订单维度表和事实表,结果如图5所示,新装载了16条订单记录。
select count(*) from order_dim;
select count(*) from sales_order_fact;


图5
        查询事实表数据,结果如图6所示。
select * from sales_order_fact
where order_sk > 100
order by order_sk;


图6
        可以看到,customer_sk没有6,7,而是8、9,10为新增;product_sk用4代替3,5为新增。
        查询时间窗口表,结果如图7所示。
select * from rds.cdc_time;


图7
        可以看到时间窗口已经更新。

九、动态分区滚动

        rds.sales_order和tds.sales_order_fact都是按月做的范围分区,需要进一步设计滚动分区维护策略。通过维护一个数据滚动窗口,删除老分区,添加新分区,将老分区的数据迁移到数据仓库以外的次级存储,以节省系统开销。下面的HAWQ函数按照转储最老分区数据、删除最老分区数据、建立新分区的步骤动态滚动分区。
-- 创建动态滚动分区的函数
create or replace function tds.fn_rolling_partition(p_year_month_start date) returns int
as $body$
declare
v_min_partitiontablename name;
v_year_month_end date := p_year_month_start + interval '1 month';
v_year_month_start_int int := extract(year from p_year_month_start) * 100 + extract(month from p_year_month_start);
v_year_month_end_int int := extract(year from v_year_month_end) * 100 + extract(month from v_year_month_end);
sqlstring varchar(1000);
begin

-- 处理rds.sales_order

-- 转储最早一个月的数据,
select partitiontablename
into v_min_partitiontablename
from pg_partitions
where tablename='sales_order' and partitionrank = 1;

sqlstring = 'copy (select * from ' || v_min_partitiontablename || ') to ''/home/gpadmin/sales_order_' || cast(v_year_month_start_int as varchar) || '.txt'' with delimiter ''|'';';
execute sqlstring;
-- raise notice '%', sqlstring;

-- 删除最早月份对应的分区
sqlstring := 'alter table sales_order drop partition for (rank(1));';
execute sqlstring;

-- 增加下一个月份的新分区
sqlstring := 'alter table sales_order add partition start (date '''|| p_year_month_start ||''') inclusive end (date '''||v_year_month_end  ||''') exclusive;';
execute sqlstring;
-- raise notice '%', sqlstring;

-- 处理tds.sales_order_fact

-- 转储最早一个月的数据,
select partitiontablename
into v_min_partitiontablename
from pg_partitions
where tablename='sales_order_fact' and partitionrank = 1;

sqlstring = 'copy (select * from ' || v_min_partitiontablename || ') to ''/home/gpadmin/sales_order_fact_' || cast(v_year_month_start_int as varchar) || '.txt'' with delimiter ''|'';';
execute sqlstring;
-- raise notice '%', sqlstring;

-- 删除最早月份对应的分区
sqlstring := 'alter table sales_order_fact drop partition for (rank(1));';
execute sqlstring;

-- 增加下一个月份的新分区
sqlstring := 'alter table sales_order_fact add partition start ('||cast(v_year_month_start_int as varchar)||') inclusive end ('||cast(v_year_month_end_int as varchar)||') exclusive;';
execute sqlstring;
-- raise notice '%', sqlstring;

-- 正常返回1
return 1;

-- 异常返回0
exception when others then
raise exception '%: %', sqlstate, sqlerrm;
return 0;
end
$body$ language plpgsql;
        将执行该函数的psql命令行放到cron中自动执行。下面的例子表示每月1号2点执行分区滚动操作。假设数据仓库中只保留最近一年的销售数据。
0 2 1 * * psql -d dw -c "set search_path=rds,tds; select fn_rolling_partition(date(date_trunc('month',current_date) + interval '1 month'));" > rolling_partition.log 2>&1

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: