BI建设实战–使用SSIS根据数据仓库数据有条件提取业务系统数据
2009-03-05 15:10
1041 查看
注:为统一风格,文中一些术语均使用了缩写:ETL,数据提取,或ODS系统;DW,数据仓库;ERP,业务系统。
最近做ETL,遇到一个需求,以前没实现过,先描述一下:
ERP中有一个表,存储了供应商费用结算单,数据是系统每天根据当天结算内容自动生成的。但这时还不是最终数据,在供应商前来结款走帐后,还要再更新一次数据,这时才完成了最终结算。至于这个最终时间那就没准了,最长的一笔未结算单据还是三年前的。这样,在将数据导入DW时,就有了这样的问题:没有结算的单据何时导入?
研究了一下实现思路, 初步有三种方法:
方法1、每天导入当日完成结算的单据。
这个方法看起来是最好的,但仔细一研究,发现技术上不可行。因为业务系统中只存储了单据生成日期,而没有最终结算日期,供应商前来结算,更新一下结算状态和金额就完事了,无法判断每天有哪些单据结算完成,也就没法导入了。
方法2、每天导入新的结算单据,然后查看DW中没有完成结算的单据,从ERP中找到对应的单据并更新DW。
这种方法看来是比较好的,不过技术上以前没做有做过。
方法3、每天删除数据仓库中的全部结算单据,然后从业务系统中整体重新导入。
这个最容易实现了,不过从系统性能方面来说,是最糟糕的一种法案。
经过考虑,决定采用第二种方法,于是开始研究使用何种方法实现。每日导入新增结算单据,这个实现起来很简单,用"数据流任务"组件即可。更新数据,则先想到了用数据流中的"OLE DB命令",通过这个组件,可以很容易地更新数据仓库数据。这时需要解决的就是如何从业务系统中提取这些需要更新的数据了。前后也考虑了多种方法,先列一下。
技术1、从DW中提取未结算单据,从ERP中提取所有单据,然后用"合并联接"组件进行联接,保留ERP中与DW中相对应的那部分单据,这些就是未结算单据,更新DW。
这里的问题和前面的方法1所涉及到的问题一样了,就是系统性能问题,每天全部读取一遍结算单据,对ERP来说也是很大的负担。
技术2、使用"脚本任务"编码进行未接单据的提取、查询和更新
用脚本组件进行硬编码,使用程序代码,先从DW中提取未结单据,然后执行遍历操作,得到ERP中对应的单据,并更新DW。
此方法性上比较好,但是数据转换的工作也要使用硬编码来做,实现起来较为复杂,而且也容易出错。因此最后未采用此方法。
技术3、从DW中提取需要更新的单据,然后据此从ERP中查询得到相应单据,后面的实现技术就和导入的操作差不多了。
这种方法无疑是最合适的,但实现的关键点就是如何根据DW中的单据提取ERP中的单据,下面说一下具体的实现过程。
一、使用"脚本任务"编码得到ERP查询语句,后面使用"数据流任务"处理即可
先想到这样一个思路:使用脚本从DW中提取需要更新的单据,然后遍历单据号,根据单据号生成一条查询语句,然后使用"数据流任务"用此语句直接从ERP中提取需要更新的单据,并执行更新操作。
1、定义一个变量,命名为"SequenceNumberListQuery",用于存储生成的查询语句
2、在控制流项中拖一个"脚本任务",然后使用程序实现数据提取与查询语句生成的功能。
设置"ReadWriteVariables"内容,选中变量"SequenceNumberListQuery"。然后编写脚本内容,代码如下:
脚本代码
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.OleDb;
using System.Data.SqlClient;
using System.Data.Odbc;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
IDTSConnectionManager100 cmDW;
SqlConnection connectionDW;
SqlDataReader drDW;
IDTSConnectionManager100 cmPSZX;
OleDbConnection connectionPSZX;
OleDbDataReader drPSZX;
public override void (object Transaction)
{
cmDW = this.Connections.DW;
cmPSZX = this.Connections.PSZX;
connectionDW = (SqlConnection)cmDW.AcquireConnection(null);
connectionPSZX = new OleDbConnection("Provider=msdaora;Data Source=xxxx;User Id=xxxxxxxx;Password=xxxxxxxx;");
}
public override void CreateNewOutputRows()
{
/*
Add rows by calling the AddRow method on the member variable named "<Output Name>Buffer".
For example, call MyOutputBuffer.AddRow() if your output was named "MyOutput".
*/
SqlCommand cmdDW = new SqlCommand("SELECT ExpenseTicketNumber FROM Guideline_SupplierExpenses WHERE (BalanceTicketNumber IS NULL) OR (BalanceDate IS NULL) OR IsBalance = 0", connectionDW);
drDW = cmdDW.ExecuteReader();
IList<string> expenseTicketNumberList = new List<string>();
if (drDW.HasRows)
{
while (drDW.Read())
{
expenseTicketNumberList.Add(drDW["ExpenseTicketNumber"].ToString());
}
drDW.Close();
}
cmDW.ReleaseConnection(connectionDW);
foreach (string expenseTicketNumber in expenseTicketNumberList)
{
OleDbCommand cmdPSZX = new OleDbCommand("SELECT * FROM fy WHERE fydjbh = '" + expenseTicketNumber + "'", connectionPSZX);
this.connectionPSZX.Open();
drPSZX = cmdPSZX.ExecuteReader();
if (drPSZX.HasRows)
{
while (drPSZX.Read())
{
MyOutputBuffer.AddRow();
MyOutputBuffer.SequenceNumber = (drPSZX["SequenceNumber"] is DBNull) ? string.Empty : drPSZX["SequenceNumber"].ToString();
MyOutputBuffer.ExpenseTicketNumber = (drPSZX["ExpenseTicketNumber"] is DBNull) ? string.Empty : drPSZX["ExpenseTicketNumber"].ToString();
MyOutputBuffer.BalanceTicketNumber = (drPSZX["BalanceTicketNumber"] is DBNull) ? string.Empty : drPSZX["BalanceTicketNumber"].ToString();
MyOutputBuffer.IsBalance = drPSZX["IsBalance"].ToString();
MyOutputBuffer.HappenDate = Convert.ToDateTime(drPSZX["HappenDate"]);
MyOutputBuffer.BalanceDate = (drPSZX["BalanceDate"] is DBNull) ? DateTime.MinValue : Convert.ToDateTime(drPSZX["BalanceDate"]);
MyOutputBuffer.Supplier = drPSZX["Supplier"].ToString();
MyOutputBuffer.OperativeMode = drPSZX["OperativeMode"].ToString();
MyOutputBuffer.SupplierExpensesType = drPSZX["SupplierExpensesType"].ToString();
MyOutputBuffer.ExpensesMoney = drPSZX["ExpensesMoney"].ToString();
MyOutputBuffer.InputOperator = drPSZX["InputOperator"].ToString();
MyOutputBuffer.InputTime = Convert.ToDateTime(drPSZX["InputTime"]);
MyOutputBuffer.Remark = (drPSZX["Remark"] is DBNull) ? string.Empty : drPSZX["Remark"].ToString();
}
drPSZX.Close();
}
this.connectionPSZX.Close();
}
}
}
这里简单说一下,AcquireConnections这个方法用于预处理数据库连接,CreateNewOutputRows这个方法用于输出数据行。运行时,脚本组件输出数据流,后面进行装换、更新处理即可。
最近做ETL,遇到一个需求,以前没实现过,先描述一下:
ERP中有一个表,存储了供应商费用结算单,数据是系统每天根据当天结算内容自动生成的。但这时还不是最终数据,在供应商前来结款走帐后,还要再更新一次数据,这时才完成了最终结算。至于这个最终时间那就没准了,最长的一笔未结算单据还是三年前的。这样,在将数据导入DW时,就有了这样的问题:没有结算的单据何时导入?
研究了一下实现思路, 初步有三种方法:
方法1、每天导入当日完成结算的单据。
这个方法看起来是最好的,但仔细一研究,发现技术上不可行。因为业务系统中只存储了单据生成日期,而没有最终结算日期,供应商前来结算,更新一下结算状态和金额就完事了,无法判断每天有哪些单据结算完成,也就没法导入了。
方法2、每天导入新的结算单据,然后查看DW中没有完成结算的单据,从ERP中找到对应的单据并更新DW。
这种方法看来是比较好的,不过技术上以前没做有做过。
方法3、每天删除数据仓库中的全部结算单据,然后从业务系统中整体重新导入。
这个最容易实现了,不过从系统性能方面来说,是最糟糕的一种法案。
经过考虑,决定采用第二种方法,于是开始研究使用何种方法实现。每日导入新增结算单据,这个实现起来很简单,用"数据流任务"组件即可。更新数据,则先想到了用数据流中的"OLE DB命令",通过这个组件,可以很容易地更新数据仓库数据。这时需要解决的就是如何从业务系统中提取这些需要更新的数据了。前后也考虑了多种方法,先列一下。
技术1、从DW中提取未结算单据,从ERP中提取所有单据,然后用"合并联接"组件进行联接,保留ERP中与DW中相对应的那部分单据,这些就是未结算单据,更新DW。
这里的问题和前面的方法1所涉及到的问题一样了,就是系统性能问题,每天全部读取一遍结算单据,对ERP来说也是很大的负担。
技术2、使用"脚本任务"编码进行未接单据的提取、查询和更新
用脚本组件进行硬编码,使用程序代码,先从DW中提取未结单据,然后执行遍历操作,得到ERP中对应的单据,并更新DW。
此方法性上比较好,但是数据转换的工作也要使用硬编码来做,实现起来较为复杂,而且也容易出错。因此最后未采用此方法。
技术3、从DW中提取需要更新的单据,然后据此从ERP中查询得到相应单据,后面的实现技术就和导入的操作差不多了。
这种方法无疑是最合适的,但实现的关键点就是如何根据DW中的单据提取ERP中的单据,下面说一下具体的实现过程。
一、使用"脚本任务"编码得到ERP查询语句,后面使用"数据流任务"处理即可
先想到这样一个思路:使用脚本从DW中提取需要更新的单据,然后遍历单据号,根据单据号生成一条查询语句,然后使用"数据流任务"用此语句直接从ERP中提取需要更新的单据,并执行更新操作。
1、定义一个变量,命名为"SequenceNumberListQuery",用于存储生成的查询语句
2、在控制流项中拖一个"脚本任务",然后使用程序实现数据提取与查询语句生成的功能。
设置"ReadWriteVariables"内容,选中变量"SequenceNumberListQuery"。然后编写脚本内容,代码如下:
脚本代码
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.OleDb;
using System.Data.SqlClient;
using System.Data.Odbc;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
IDTSConnectionManager100 cmDW;
SqlConnection connectionDW;
SqlDataReader drDW;
IDTSConnectionManager100 cmPSZX;
OleDbConnection connectionPSZX;
OleDbDataReader drPSZX;
public override void (object Transaction)
{
cmDW = this.Connections.DW;
cmPSZX = this.Connections.PSZX;
connectionDW = (SqlConnection)cmDW.AcquireConnection(null);
connectionPSZX = new OleDbConnection("Provider=msdaora;Data Source=xxxx;User Id=xxxxxxxx;Password=xxxxxxxx;");
}
public override void CreateNewOutputRows()
{
/*
Add rows by calling the AddRow method on the member variable named "<Output Name>Buffer".
For example, call MyOutputBuffer.AddRow() if your output was named "MyOutput".
*/
SqlCommand cmdDW = new SqlCommand("SELECT ExpenseTicketNumber FROM Guideline_SupplierExpenses WHERE (BalanceTicketNumber IS NULL) OR (BalanceDate IS NULL) OR IsBalance = 0", connectionDW);
drDW = cmdDW.ExecuteReader();
IList<string> expenseTicketNumberList = new List<string>();
if (drDW.HasRows)
{
while (drDW.Read())
{
expenseTicketNumberList.Add(drDW["ExpenseTicketNumber"].ToString());
}
drDW.Close();
}
cmDW.ReleaseConnection(connectionDW);
foreach (string expenseTicketNumber in expenseTicketNumberList)
{
OleDbCommand cmdPSZX = new OleDbCommand("SELECT * FROM fy WHERE fydjbh = '" + expenseTicketNumber + "'", connectionPSZX);
this.connectionPSZX.Open();
drPSZX = cmdPSZX.ExecuteReader();
if (drPSZX.HasRows)
{
while (drPSZX.Read())
{
MyOutputBuffer.AddRow();
MyOutputBuffer.SequenceNumber = (drPSZX["SequenceNumber"] is DBNull) ? string.Empty : drPSZX["SequenceNumber"].ToString();
MyOutputBuffer.ExpenseTicketNumber = (drPSZX["ExpenseTicketNumber"] is DBNull) ? string.Empty : drPSZX["ExpenseTicketNumber"].ToString();
MyOutputBuffer.BalanceTicketNumber = (drPSZX["BalanceTicketNumber"] is DBNull) ? string.Empty : drPSZX["BalanceTicketNumber"].ToString();
MyOutputBuffer.IsBalance = drPSZX["IsBalance"].ToString();
MyOutputBuffer.HappenDate = Convert.ToDateTime(drPSZX["HappenDate"]);
MyOutputBuffer.BalanceDate = (drPSZX["BalanceDate"] is DBNull) ? DateTime.MinValue : Convert.ToDateTime(drPSZX["BalanceDate"]);
MyOutputBuffer.Supplier = drPSZX["Supplier"].ToString();
MyOutputBuffer.OperativeMode = drPSZX["OperativeMode"].ToString();
MyOutputBuffer.SupplierExpensesType = drPSZX["SupplierExpensesType"].ToString();
MyOutputBuffer.ExpensesMoney = drPSZX["ExpensesMoney"].ToString();
MyOutputBuffer.InputOperator = drPSZX["InputOperator"].ToString();
MyOutputBuffer.InputTime = Convert.ToDateTime(drPSZX["InputTime"]);
MyOutputBuffer.Remark = (drPSZX["Remark"] is DBNull) ? string.Empty : drPSZX["Remark"].ToString();
}
drPSZX.Close();
}
this.connectionPSZX.Close();
}
}
}
这里简单说一下,AcquireConnections这个方法用于预处理数据库连接,CreateNewOutputRows这个方法用于输出数据行。运行时,脚本组件输出数据流,后面进行装换、更新处理即可。
相关文章推荐
- 中小型企业商业智能平台的开发和实现(数据仓库、BI系统、真实项目实战)
- 数据仓库,数据挖掘,OLAP,BI等系统技术深度建设
- 中小型企业商业智能平台的开发和实现(数据仓库、BI系统、真实项目实战)
- 中小型企业商业智能平台的开发和实现(数据仓库、BI系统、真实项目实战)
- 基于数据仓库的证券CRM系统建设
- 构建BI(商业智能)系统的核心——数据仓库引擎介绍
- 数据仓库--数据仓库系统的实现与使用(含OLAP重点讲解)
- 数据仓库建设-业务数据调研
- BI解决方案分享:地产BI数据分析系统的建设
- SQL Server BI Step by Step --- 使用SSIS进行简单的数据导入导出
- 微软MSBI零基础从数据仓库到商业智能实战(SSIS SSAS SSRS)
- 【案例实战】餐饮企业分店财务数据分析系统解决方案:业务需求
- SQL Server BI Step by Step 2--- 使用SSIS进行简单的数据导入导出
- 数据仓库的模型设计 A. 数据建模方法论 数据仓库模型设计遵循“自顶向下、逐步求精”的设计原则。 模型设计分为三个阶段: 1,概念模型 对业务的范围和使用,从高度上进行抽象概括,也就是划分主题域。 一
- 第三篇:数据仓库系统的实现与使用(含OLAP重点讲解)
- 数据仓库系统的实现与使用(含OLAP重点讲解)
- 通达OA 使用Ajax和工作流插件实现根据人力资源系统数据增加OA账号(图文详解)
- 实践:使用 Apache Hadoop 处理日志使用典型 Linux 系统上的 Hadoop 从日志中提取有用数据
- ASP.NET基础教程-Web 自定义控件的使用-根据属性值从数据库中提取数据并在页面上自动生成一个表格
- 业务系统和数据仓库