您的位置:首页 > 其它

BI建设实战–使用SSIS根据数据仓库数据有条件提取业务系统数据

2009-03-05 15:10 1041 查看
注:为统一风格,文中一些术语均使用了缩写:ETL,数据提取,或ODS系统;DW,数据仓库;ERP,业务系统。

最近做ETL,遇到一个需求,以前没实现过,先描述一下:

ERP中有一个表,存储了供应商费用结算单,数据是系统每天根据当天结算内容自动生成的。但这时还不是最终数据,在供应商前来结款走帐后,还要再更新一次数据,这时才完成了最终结算。至于这个最终时间那就没准了,最长的一笔未结算单据还是三年前的。这样,在将数据导入DW时,就有了这样的问题:没有结算的单据何时导入?

研究了一下实现思路, 初步有三种方法:

方法1、每天导入当日完成结算的单据。

这个方法看起来是最好的,但仔细一研究,发现技术上不可行。因为业务系统中只存储了单据生成日期,而没有最终结算日期,供应商前来结算,更新一下结算状态和金额就完事了,无法判断每天有哪些单据结算完成,也就没法导入了。

方法2、每天导入新的结算单据,然后查看DW中没有完成结算的单据,从ERP中找到对应的单据并更新DW。

这种方法看来是比较好的,不过技术上以前没做有做过。

方法3、每天删除数据仓库中的全部结算单据,然后从业务系统中整体重新导入。

这个最容易实现了,不过从系统性能方面来说,是最糟糕的一种法案。

经过考虑,决定采用第二种方法,于是开始研究使用何种方法实现。每日导入新增结算单据,这个实现起来很简单,用"数据流任务"组件即可。更新数据,则先想到了用数据流中的"OLE DB命令",通过这个组件,可以很容易地更新数据仓库数据。这时需要解决的就是如何从业务系统中提取这些需要更新的数据了。前后也考虑了多种方法,先列一下。

技术1、从DW中提取未结算单据,从ERP中提取所有单据,然后用"合并联接"组件进行联接,保留ERP中与DW中相对应的那部分单据,这些就是未结算单据,更新DW。

这里的问题和前面的方法1所涉及到的问题一样了,就是系统性能问题,每天全部读取一遍结算单据,对ERP来说也是很大的负担。

技术2、使用"脚本任务"编码进行未接单据的提取、查询和更新

用脚本组件进行硬编码,使用程序代码,先从DW中提取未结单据,然后执行遍历操作,得到ERP中对应的单据,并更新DW。

此方法性上比较好,但是数据转换的工作也要使用硬编码来做,实现起来较为复杂,而且也容易出错。因此最后未采用此方法。

技术3、从DW中提取需要更新的单据,然后据此从ERP中查询得到相应单据,后面的实现技术就和导入的操作差不多了。

这种方法无疑是最合适的,但实现的关键点就是如何根据DW中的单据提取ERP中的单据,下面说一下具体的实现过程。

一、使用"脚本任务"编码得到ERP查询语句,后面使用"数据流任务"处理即可

先想到这样一个思路:使用脚本从DW中提取需要更新的单据,然后遍历单据号,根据单据号生成一条查询语句,然后使用"数据流任务"用此语句直接从ERP中提取需要更新的单据,并执行更新操作。

1、定义一个变量,命名为"SequenceNumberListQuery",用于存储生成的查询语句

2、在控制流项中拖一个"脚本任务",然后使用程序实现数据提取与查询语句生成的功能。

设置"ReadWriteVariables"内容,选中变量"SequenceNumberListQuery"。然后编写脚本内容,代码如下:

脚本代码
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.OleDb;
using System.Data.SqlClient;
using System.Data.Odbc;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
IDTSConnectionManager100 cmDW;
SqlConnection connectionDW;
SqlDataReader drDW;
IDTSConnectionManager100 cmPSZX;
OleDbConnection connectionPSZX;
OleDbDataReader drPSZX;

public override void (object Transaction)
{
cmDW = this.Connections.DW;
cmPSZX = this.Connections.PSZX;

connectionDW = (SqlConnection)cmDW.AcquireConnection(null);
connectionPSZX = new OleDbConnection("Provider=msdaora;Data Source=xxxx;User Id=xxxxxxxx;Password=xxxxxxxx;");
}

public override void CreateNewOutputRows()
{
/*
Add rows by calling the AddRow method on the member variable named "<Output Name>Buffer".
For example, call MyOutputBuffer.AddRow() if your output was named "MyOutput".
*/

SqlCommand cmdDW = new SqlCommand("SELECT ExpenseTicketNumber FROM Guideline_SupplierExpenses WHERE (BalanceTicketNumber IS NULL) OR (BalanceDate IS NULL) OR IsBalance = 0", connectionDW);
drDW = cmdDW.ExecuteReader();

IList<string> expenseTicketNumberList = new List<string>();

if (drDW.HasRows)
{
while (drDW.Read())
{
expenseTicketNumberList.Add(drDW["ExpenseTicketNumber"].ToString());
}
drDW.Close();
}

cmDW.ReleaseConnection(connectionDW);

foreach (string expenseTicketNumber in expenseTicketNumberList)
{

OleDbCommand cmdPSZX = new OleDbCommand("SELECT * FROM fy WHERE fydjbh = '" + expenseTicketNumber + "'", connectionPSZX);
this.connectionPSZX.Open();
drPSZX = cmdPSZX.ExecuteReader();

if (drPSZX.HasRows)
{
while (drPSZX.Read())
{
MyOutputBuffer.AddRow();
MyOutputBuffer.SequenceNumber = (drPSZX["SequenceNumber"] is DBNull) ? string.Empty : drPSZX["SequenceNumber"].ToString();
MyOutputBuffer.ExpenseTicketNumber = (drPSZX["ExpenseTicketNumber"] is DBNull) ? string.Empty : drPSZX["ExpenseTicketNumber"].ToString();
MyOutputBuffer.BalanceTicketNumber = (drPSZX["BalanceTicketNumber"] is DBNull) ? string.Empty : drPSZX["BalanceTicketNumber"].ToString();
MyOutputBuffer.IsBalance = drPSZX["IsBalance"].ToString();
MyOutputBuffer.HappenDate = Convert.ToDateTime(drPSZX["HappenDate"]);
MyOutputBuffer.BalanceDate = (drPSZX["BalanceDate"] is DBNull) ? DateTime.MinValue : Convert.ToDateTime(drPSZX["BalanceDate"]);
MyOutputBuffer.Supplier = drPSZX["Supplier"].ToString();
MyOutputBuffer.OperativeMode = drPSZX["OperativeMode"].ToString();
MyOutputBuffer.SupplierExpensesType = drPSZX["SupplierExpensesType"].ToString();
MyOutputBuffer.ExpensesMoney = drPSZX["ExpensesMoney"].ToString();
MyOutputBuffer.InputOperator = drPSZX["InputOperator"].ToString();
MyOutputBuffer.InputTime = Convert.ToDateTime(drPSZX["InputTime"]);
MyOutputBuffer.Remark = (drPSZX["Remark"] is DBNull) ? string.Empty : drPSZX["Remark"].ToString();
}
drPSZX.Close();
}
this.connectionPSZX.Close();
}
}
}

这里简单说一下,AcquireConnections这个方法用于预处理数据库连接,CreateNewOutputRows这个方法用于输出数据行。运行时,脚本组件输出数据流,后面进行装换、更新处理即可。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐