您的位置:首页 > 数据库 > Oracle

【转】数据仓库拉链算法在ORACLE中的实现

2016-03-17 16:50 603 查看
1、 拉链算法用到的相关表结构

建表脚本(源表、目的表、临时表).sql

[c-sharp]view plain copy print?

--生成目标表
create table EDW_T100_STATUS_H
(
ID VARCHAR2(8) not null,
STATUS VARCHAR2(8) not null,
START_DATE DATE not null,
END_DATE DATE not null
)
;
--生成源表
create table ODS_XT_ZT
(
ID VARCHAR2(8),
STATUS VARCHAR2(8),
ODS_DATA_DATE VARCHAR2(8)
)
;
--生成数据恢复机制用的临时表
create global temporary table TMP_T100_STATUS_H
(
ID VARCHAR2(8) not null,
STATUS VARCHAR2(8) not null,
START_DATE DATE not null,
END_DATE DATE not null
)
on commit delete rows;
--生成存放历史的临时表
create global temporary table TMP_T100_STATUS_H_PRE
(
ID VARCHAR2(8) not null,
STATUS VARCHAR2(8) not null,
START_DATE DATE not null,
END_DATE DATE not null
)
on commit delete rows;
--生成存放当前数据的临时表
create global temporary table TMP_T100_STATUS_H_CUR
(
ID VARCHAR2(8) not null,
STATUS VARCHAR2(8) not null,
START_DATE DATE not null,
END_DATE DATE not null
)
on commit delete rows;
--生成存放新增数据(包含:1、CUR中有而PRE中没有的数据;2、CUR中有PRE中也有,但属性值有更新的,也需要新增开链的数据)的临时表
create global temporary table TMP_T100_STATUS_H_INS
(
ID VARCHAR2(8) not null,
STATUS VARCHAR2(8) not null,
START_DATE DATE not null,
END_DATE DATE not null
)
on commit delete rows;
--生成存放需要关链数据(包含:1、PRE中有而CUR中没有;2、PRE中有CUR中也有,但属性值有更新,需要将老数据关链的数据)的临时表
create global temporary table TMP_T100_STATUS_H_UPD
(
ID VARCHAR2(8) not null,
STATUS VARCHAR2(8) not null,
START_DATE DATE not null,
END_DATE DATE not null
)
on commit delete rows;

2、 插入测试数据

INSERT到源表测试数据.sql

[c-sharp]view plain copy print?

TRUNCATE TABLE Ods_Xt_Zt;
INSERT INTO Ods_Xt_Zt(ID,Status,Ods_Data_Date)
SELECT '1','正常','20100101' FROM dual
UNION ALL
SELECT '2','正常','20100101' FROM dual
UNION ALL
SELECT '3','正常','20100101' FROM dual
;
INSERT INTO Ods_Xt_Zt(ID,Status,Ods_Data_Date)
SELECT '1','正常','20100102' FROM dual
UNION ALL
SELECT '2','呆滞','20100102' FROM dual
UNION ALL
SELECT '4','正常','20100102' FROM dual
;
COMMIT;
--select * from Ods_Xt_Zt;

3、 实现拉链算法的代码

拉链过程.sql

[c-sharp]view plain copy print?

DECLARE
P_ETLDATE VARCHAR2(8) := '20100102'; --日期参数
O_RUNSTATUS NUMBER; --执行结果
O_MSG VARCHAR2(100); --错误返回
--定义存储过程信息
V_PROC_NAME VARCHAR2(50) := 'P_T100_STATUS_H';
V_TABLE_NAME VARCHAR2(50) := 'EDW_T100_STATUS_H';
V_START_TIMESTAMP TIMESTAMP; --加载开始时间
V_END_TIMESTAMP TIMESTAMP; --加载结束时间
V_RECORD_NUMBER INTEGER; --记录数
--定义错误代码,错误状态
V_SQLERRM VARCHAR2(1000); --异常信息
V_ERR_SQL VARCHAR2(1000); --出错位置
BEGIN

--捕获过程开始时间
SELECT SYSDATE INTO V_START_TIMESTAMP FROM dual;

--数据恢复(删除start_date大于当天的数据;将end_date大于当天且不为无限大的数据将end_date更新为无限大)
--(更新操作之所以使用先删除再插入而不是直接UPDATE的方式是基于效率的考虑)
V_ERR_SQL:='DELETE FROM EDW_T100_STATUS_H';
DELETE FROM EDW_T100_STATUS_H
WHERE Start_Date >= TO_DATE(P_ETLDATE,'YYYY-MM-DD')
;
V_ERR_SQL:='INSERT INTO TMP_T100_STATUS_H';
INSERT INTO TMP_T100_STATUS_H
SELECT *
FROM EDW_T100_STATUS_H
WHERE End_Date >= TO_DATE(P_ETLDATE,'YYYY-MM-DD')
AND End_Date <> TO_DATE('30001231','YYYY-MM-DD')
;
V_ERR_SQL:='DELETE FROM EDW_T100_STATUS_H-2';
DELETE FROM EDW_T100_STATUS_H
WHERE End_Date >= TO_DATE(P_ETLDATE,'YYYY-MM-DD')
AND End_Date <> TO_DATE('30001231','YYYY-MM-DD')
;
V_ERR_SQL:='INSERT INTO EDW_T100_STATUS_H FROM TMP_T100_STATUS_H';
INSERT INTO EDW_T100_STATUS_H
(Id
,Status
,Start_Date
,End_Date
)
SELECT Id
,Status
,Start_Date
,TO_DATE('30001231','YYYY-MM-DD')
FROM TMP_T100_STATUS_H
;

--取目标表中的上日状态正常(未被封链,即end_date为最大日期)的数据
V_ERR_SQL:='INSERT INTO TMP_T100_STATUS_H_PRE';
INSERT INTO TMP_T100_STATUS_H_PRE
(Id
,Status
,Start_Date
,End_Date
)
SELECT Id
,Status
,Start_Date
,End_Date
FROM EDW_T100_STATUS_H
WHERE END_DATE = TO_DATE('30001231','YYYY-MM-DD')
;

--从源表中获取当日数据(将CUR中的start_date置为当天,end_date置为最大日期)
V_ERR_SQL:='INSERT INTO TMP_T100_STATUS_H_CUR';
INSERT INTO TMP_T100_STATUS_H_CUR
(Id
,Status
,Start_Date
,End_Date
)
SELECT ID
,Status
,TO_DATE(P_ETLDATE,'YYYY-MM-DD')
,TO_DATE('30001231','YYYY-MM-DD')
FROM ODS_XT_ZT
WHERE Ods_Data_Date = P_ETLDATE
;

--[cur-pre] = 增量数据{需要开链}(包含两部分:1、在CUR不在PRE中的纯新增数据;2、在CUR也在PRE中,但CUR中其属性值有更新。=>关联关系中,有主键,也有属性,才能全部包含这两部分)
V_ERR_SQL:='INSERT INTO TMP_T100_STATUS_H_INS';
INSERT INTO TMP_T100_STATUS_H_INS
(Id
,Status
,Start_Date
,End_Date
)
SELECT a1.Id
,a1.Status
,a1.Start_Date
,a1.End_Date
FROM TMP_T100_STATUS_H_CUR a1
WHERE NOT EXISTS (SELECT 1
FROM TMP_T100_STATUS_H_PRE a2
WHERE a1.Id = a2.Id
AND a1.Status=a2.Status
)
;

--[pre - cur] = 要封链的数据(包含两部分:1、在PRE中不在CUR中的纯封链数据;2、在PRE中也在CUR中,但CUR中属性值已经改变。=>关联关系中,有主键,也有属性,才能全部包含这两部分)
V_ERR_SQL:='INSERT INTO TMP_T100_STATUS_H_UPD';
INSERT INTO TMP_T100_STATUS_H_UPD
(Id
,Status
,Start_Date
,End_Date
)
SELECT a1.Id
,a1.Status
,a1.Start_Date
,a1.End_Date
FROM TMP_T100_STATUS_H_PRE a1
WHERE NOT EXISTS (SELECT 1
FROM TMP_T100_STATUS_H_CUR a3
WHERE a1.Id = a3.Id
AND a1.Status = a3.Status
)
;

--删除目标表中需要更新的数据(采用先删除再插入而不是直接UPDATE的方式,完全出于效率考虑){筛选条件用:主键+开始时间}
V_ERR_SQL:='DELETE FROM EDW_T100_STATUS_H';
DELETE FROM EDW_T100_STATUS_H
WHERE (Id,Start_Date)
IN (SELECT Id
,start_date
FROM TMP_T100_STATUS_H_UPD
)
;

--向目标表中插入更新数据(将结束日期值置成当天日期)
V_ERR_SQL:='INSERT INTO EDW_T100_STATUS_H FROM TMP_T100_STATUS_H_UPD';
INSERT INTO EDW_T100_STATUS_H
(Id
,Status
,Start_Date
,End_Date
)
SELECT Id
,Status
,Start_Date
,TO_DATE(P_ETLDATE,'YYYY-MM-DD')
FROM TMP_T100_STATUS_H_UPD
;

--向目标表中插入新增数据
V_ERR_SQL:='INSERT INTO EDW_T100_STATUS_H FROM TMP_T100_STATUS_H_INS';
INSERT INTO EDW_T100_STATUS_H
(Id
,Status
,Start_Date
,End_Date
)
SELECT Id
,Status
,TO_DATE(P_ETLDATE,'YYYY-MM-DD')
,TO_DATE('30001231','YYYY-MM-DD')
FROM TMP_T100_STATUS_H_INS
;

--正常处理
V_RECORD_NUMBER := SQL%ROWCOUNT;
SELECT SYSDATE INTO V_END_TIMESTAMP FROM dual;
INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,P_ETLDATE)
VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,V_RECORD_NUMBER,'成功',P_ETLDATE);
COMMIT;

--异常处理
EXCEPTION WHEN OTHERS THEN
BEGIN
ROLLBACK;
V_SQLERRM := SQLERRM;
INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,ERR_MSG,ERR_SQL,P_ETLDATE)
VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,0,'失败',V_SQLERRM,V_ERR_SQL,P_ETLDATE);
O_RUNSTATUS := 1;
O_MSG := 'PROGRAMMING ERROR HAPPENED!';
COMMIT;
END;
END;

4、 总结说明

(1)、拉链算法要求提供的数据源最好以全量方式提供;要深入理解[INS=CUR-PRE]和[UPD=PRE-CUR]所分别包含的两个方面;表UPD及更新特指目标表中需要将end_date从max_date置为p_etldate,与源表中的属性字段更新,目标表需要先关链再开链,有本质区别;相关关键注释,在代码中有详细说明。

(2)、比对,就是全字段比对,包含主键和属性;源数据如果是增量的情况,就没办法给已经删除的记录关链,因为删掉的记录不会再传过来,在源数据的表现上,跟没有变化的数据是一样的,这时的UDP表只包含源数据中有变化的那一部分需要封链的记录,源表中被删除的记录,将永不封链;如果源表有增量传的也有全量传的,那只能作为增量数据来考虑,还要作特殊处理,条件为in(主键)and_not_in(主键+属性)。

(3)、针对不同的源数据供数方式,拉链算法的不同主要体现在INSERT到UPD表的关联上,插入到INS上等,并没有不同

全量的情况:

[c-sharp]view plain copy print?

V_ERR_SQL:='INSERT INTO TMP_T100_STATUS_H_UPD';
INSERT INTO TMP_T100_STATUS_H_UPD
(Id
,Status
,Start_Date
,End_Date
)
SELECT a1.Id
,a1.Status
,a1.Start_Date
,a1.End_Date
FROM TMP_T100_STATUS_H_PRE a1
WHERE /*EXISTS (SELECT 1
FROM TMP_T100_STATUS_H_CUR a2
WHERE a1.Id = a2.Id
)
AND */NOT EXISTS (SELECT 1
FROM TMP_T100_STATUS_H_CUR a3
WHERE a1.Id = a3.Id
AND a1.Status = a3.Status
)
;

增量的情况:

[c-sharp]view plain copy print?

V_ERR_SQL:='INSERT INTO TMP_T100_STATUS_H_UPD';
INSERT INTO TMP_T100_STATUS_H_UPD
(Id
,Status
,Start_Date
,End_Date
)
SELECT a1.Id
,a1.Status
,a1.Start_Date
,a1.End_Date
FROM TMP_T100_STATUS_H_PRE a1
WHERE EXISTS (SELECT 1
FROM TMP_T100_STATUS_H_CUR a2
WHERE a1.Id = a2.Id
)
/*AND NOT EXISTS (SELECT 1
FROM TMP_T100_STATUS_H_CUR a3
WHERE a1.Id = a3.Id
AND a1.Status = a3.Status
)*/
;

有增量也有全量的情况(只能作为增量,且将全量的处理为增量):

[c-sharp]view plain copy print?

V_ERR_SQL:='INSERT INTO TMP_T100_STATUS_H_UPD';
INSERT INTO TMP_T100_STATUS_H_UPD
(Id
,Status
,Start_Date
,End_Date
)
SELECT a1.Id
,a1.Status
,a1.Start_Date
,a1.End_Date
FROM TMP_T100_STATUS_H_PRE a1
WHERE EXISTS (SELECT 1
FROM TMP_T100_STATUS_H_CUR a2
WHERE a1.Id = a2.Id
)
AND NOT EXISTS (SELECT 1
FROM TMP_T100_STATUS_H_CUR a3
WHERE a1.Id = a3.Id
AND a1.Status = a3.Status
)
;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: