您的位置:首页 > 其它

使用ODI处理没有主键的表

2011-01-06 12:48 225 查看
一、背景
在使用ODI进行数据抽取时,经常会遇到没有主键的表,比如Oracle EBS中的MTL_TRANSACTION_LOT_NUMBERS。还有一些表如HR模块的,虽然存在唯一索引,但是一般都是通过ID与EFFECTIVE_START_DATE、EFFECTIVE_END_DATE组成复合唯一索引,然而EFFECTIVE_START_DATE、EFFECTIVE_END_DATE实际上发生变化,所以无法当成主键来处理,因为ODI的日志功能与增量刷新方式都必须要求主键不发生变化。下面将介绍对于没有主键的表的处理方法,以及每种方法存在的问题,最终引出实现增量刷新可能的最佳方式。
二、处理方法
1、全刷新
以TRUNC/INSERT方式来处理没有主键的表,这种方式可以确保数据的准确性,对于数据量小的表,不存在任何问题,但是就像之前所说的那些表,都是大数据量的表,特别象MTL_TRANSACTION_LOT_NUMBERS,可能达到百万以上,使用这种方式就会存在性能问题。
2、利用时间戳来增量
对于MTL_TRANSACTION_LOT_NUMBERS这样的表,只会做INSERT,而不存在UPDATE和DELETE的方法,可以采用时间戳的方式来处理。通过在源数据表筛选目标数据表MAX(LAST_UPDATE_DATE)之后的数据,INSERT到目标表。在源表中根据目标字段的值来筛选的具体实现代码可以参考Oracle Data Integrator中的Java代码中的描述。
使用这种方式,可以每次只插入最新的数据,在绝大多数情况下,是没有问题的,但是还是存在错误的可能性,比如在执行数据抽取时,如果有库存事务接口正在运行,就有可能发生数据遗漏的问题(批量处理时,Commit可能会置后,导致LAST_UPDATE_DATE有偏差),即使一般数据抽取过程在凌晨执行,也不能百分之百肯定凌晨是不会执行库存事务接口。
 
对于源表存在仅插入和更新的情况,可以通过时间戳抽取源一定时间间隔的数据,在目标删除此时间间隔的数据后插入到目标表中。
而当源表存在删除记录的情况,使用时间戳方式就无法适应了,因为一般情况下,时间戳抽取的时间间隔不可能太久,而删除的数据是不受时间**的。
3、利用ROWID
通过对源表创建包含ROWID的视图,可以利用ROWID作为主键,此时就可以满足要求,通过CDC来实现增量刷新。
虽然ROWID在大多数情况下不会发生改变,但是ROWID在数据更新时可能会发生变化,并且记录删除也会导致ROWID的重用,所以使用ROWID做主键也存在风险。
4、利用外键
一般没有主键的表总会包含外键或者不会发生变化的字段,在这种方式下,可以利用此字段,在日志表中记录变化的外键,从目标删除所有此外键的数据,再将源中所有该外键的数据写入目标。
具体方式如下:
(1) 在ODI模型中,为没有主键的表,以外键(或不会发生变化的字段)来创建主键,此主键仅在ODI模型中创建
(2) 客户化JKM,修改日志视图,使得日志视图能包含所有变化的数据
ODI在记录日志时,如果发生INSERT或UPDATE时,会在日志表中增加一条JRN_FLAG为"I"的记录,而发生DELETE时,会在日志表增加一条JRN_FLAG为"D"的记录。
我们所要实现的就是让所有变化的记录都能在日志视图时取出"I"和"D"的数据,所以我们在JKM的Create JV$ view的实现代码变为:
create or replace view <%=odiRef.getJrnInfo("JRN_FULL_VIEW")%> /* This view is created in the work physical schema of the current Data server */
as
select 'I' <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_FLAG,
JRN.JRN_SUBSCRIBER <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_SUBSCRIBER,
JRN.JRN_DATE <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_DATE,
<%=odiRef.getColList("", "JRN.[COL_NAME]/t/t" + odiRef.getInfo("DEST_COL_ALIAS_WORD") + " [COL_NAME]", ",/n/t", "", "PK")%>,
<%=odiRef.getColList("", "TARG.[COL_NAME]/t/t" + odiRef.getInfo("DEST_COL_ALIAS_WORD") + " [COL_NAME]", ",/n/t", "", "NOT PK")%>
from (
select L.JRN_SUBSCRIBER <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_SUBSCRIBER,
<%=odiRef.getColList("", "L.[COL_NAME]/t" + odiRef.getInfo("DEST_COL_ALIAS_WORD") + " [COL_NAME]", ",/n/t/t/t", ",", "PK")%>
max(L.JRN_DATE) <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_DATE
from <%=odiRef.getJrnInfo("JRN_FULL_NAME")%> <%= odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> L
where L.JRN_CONSUMED = '1'
group by L.JRN_SUBSCRIBER,
<%=odiRef.getColList("", "L.[COL_NAME]", ",/n/t/t/t", "", "PK")%>
) <%= odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> JRN,
<%=odiRef.getJrnInfo("FULL_TABLE_NAME")%> <%= odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> TARG
where <%=odiRef.getColList("", "JRN.[COL_NAME]/t= TARG.[COL_NAME]", "/nand/t", "", "PK")%>
UNION ALL
select 'D' <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_FLAG,
JRN.JRN_SUBSCRIBER <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_SUBSCRIBER,
JRN.JRN_DATE <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_DATE,
<%=odiRef.getColList("", "JRN.[COL_NAME]/t/t" + odiRef.getInfo("DEST_COL_ALIAS_WORD") + " [COL_NAME]", ",/n/t", "", "PK")%>,
<%=odiRef.getColList("", "TARG.[COL_NAME]/t/t" + odiRef.getInfo("DEST_COL_ALIAS_WORD") + " [COL_NAME]", ",/n/t", "", "NOT PK")%>
from (
select L.JRN_SUBSCRIBER <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_SUBSCRIBER,
<%=odiRef.getColList("", "L.[COL_NAME]/t" + odiRef.getInfo("DEST_COL_ALIAS_WORD") + " [COL_NAME]", ",/n/t/t/t", ",", "PK")%>
max(L.JRN_DATE) <%= odiRef.getInfo("DEST_COL_ALIAS_WORD")%> JRN_DATE
from <%=odiRef.getJrnInfo("JRN_FULL_NAME")%> <%= odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> L
group by L.JRN_SUBSCRIBER,
<%=odiRef.getColList("", "L.[COL_NAME]", ",/n/t/t/t", "", "PK")%>
) <%= odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> JRN,
<%=odiRef.getJrnInfo("FULL_TABLE_NAME")%> <%= odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> TARG
where <%=odiRef.getColList("", "JRN.[COL_NAME]/t= TARG.[COL_NAME] (+)", "/nand/t", "", "PK")%>
注意:这里与原代码的差别是原来的一段查询变成UNION ALL了两段查询,同时对于“I”的数据,取消了外连接。
或者在启用日志之后,修改生成的Trigger,Trigger的名称为T$<TABLE_NAME>。
(3) 客户化IKM,删除以下步骤:
162 Recycle previous errors
202 Remove deleted rows from flow table(这一步一定要删除,否则会将插入的记录也一并删除)
212 Flow control(由于将外键设为主键,所以如果启用Check,则数据肯定都是错误的,导致无法执行正确,必须删除)
222 Flag rows for update
242 Flag useless rows
252 Update existing rows(在这种模式下,更新变得多余)
(4) 修改IKM中步骤152 Insert flow into I$ table
将代码修改如下:
insert /*+ APPEND */ into <%=odiRef.getTable("L","INT_NAME","W")%>
(
<%=odiRef.getColList("", "[COL_NAME]", ",/n/t", "", "(((INS OR UPD) AND NOT TRG) AND REW)")%>,
IND_UPDATE
)

select <%=odiRef.getUserExit("OPTIMIZER_HINT")%> <%=odiRef.getPop("DISTINCT_ROWS")%>
<%=odiRef.getColList("", "[EXPRESSION]/t[CX_COL_NAME]", ",/n/t", "", "(((INS OR UPD) AND NOT TRG) AND REW)")%>,
JRN_FLAG
from <%=odiRef.getFrom()%>
where (1=1)
<%=odiRef.getJoin()%>
<%=odiRef.getFilter()%>
<%=odiRef.getJrnFilter()%>
<%=odiRef.getGrpBy()%>
<%=odiRef.getHaving()%>
(5) 在创建Inte**ce时,将IKM改成客户化KM
特别说明:在为模型中的表启用日志时,特别注意操作步骤。因为日志使用的JKM是设置在模型上,而不是模型的表上,所以首先设置标准的JKM来启用有主键的表的日志。当操作完成之后,将模型的JKM替换为客户化的JKM,然后再一个一个表的进行日志启用,这个必须非常注意。
三、MTL_TRANSACTION_LOT_NUMBERS的特殊处理
对于MTL_TRANSACTION_LOT_NUMBERS表,只存在INSERT记录的可能性,所以还有另外一种处理,就是在MTL_MATERIAL_TRANSACTIONS表上启用CDC时,增加一个新的订阅为MTL_TRANSACTION_LOT_NUMBERS使用,通过捕获MTL_MATERIAL_TRANSACTIONS上TRANSACTION_ID的新增,来实现MTL_TRANSACTION_LOT_NUMBERS表上的增量更新。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐