您的位置:首页 > 数据库 > SQL

如何用SqlBulkCopy完成未知列数和列名的数据表的采集

2013-10-16 23:45 501 查看
SqlBulkCopy 很简单也很好用, 但关键是当数据来源表非常灵活而多变时(事先不知道有多少列,也不知道每1列的列名), 如何来做这件事呢?

1. 写入表设置为101列, 第1列为批次,每写入一批数据作为一批;

2. 写入表中的除batch之外的列均为 nvarchar(max);

3. 写入数据时要将来源表中的表头存放到另一张表中, 便于以后的导出。

A. 测试数据准备

--1. 数据源表
IF OBJECT_ID('source_table') IS NOT NULL
	DROP TABLE source_table
GO
CREATE TABLE source_table
(
	school NVARCHAR(100),
	account VARCHAR(200),
	cnt_ct  INT,
	createTime DATETIME
)
GO
INSERT INTO source_table
SELECT '春兰小学','liuming',25,'2005-10-23' union
SELECT '春兰大学','leaf',25,'2005-10-20'
GO
--2. 目标表
IF OBJECT_ID('target_table') IS NOT NULL
	DROP TABLE target_table
GO
CREATE TABLE target_table
(
	batch BIGINT,
	C1 NVARCHAR(MAX),
	C2 NVARCHAR(MAX),
	C3 NVARCHAR(MAX),
	C4 NVARCHAR(MAX),
	C5 NVARCHAR(MAX),
	C6 NVARCHAR(MAX),
	C7 NVARCHAR(MAX),
	C8 NVARCHAR(MAX),
	C9 NVARCHAR(MAX),
	C10 NVARCHAR(MAX)
)
GO
--3. 表头表
IF OBJECT_ID('header_table') IS NOT NULL
	DROP TABLE header_table
GO
CREATE TABLE header_table
(
	id BIGINT IDENTITY(1,1),
	batch BIGINT,
	columnName NVARCHAR(MAX),
	columnIndex int
)
GO


2. DBHelper: 见: 点击打开链接

3. 测试控制台程序:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Collections;
using System.Data;
using Common;

namespace ConsoleApplication1
{
    class Program_6
    {
        static void Main(string[] args)
        {
            SqlConnection ConnectionNew = new SqlConnection("Data Source=leaf-home\\sqlserver2005;Initial Catalog=Test;Persist Security Info=True;User ID=??;Password=??");
            SqlConnection ConnectionOld = new SqlConnection("Data Source=leaf-home\\sqlserver2005;Initial Catalog=Test;Persist Security Info=True;User ID=??;Password=??");
            try
            {
                ConnectionNew.Open();
                ConnectionOld.Open();

                //1.在旧表中,用SqlDataAdapter读取出信息  
                string SQL = "select school 学校,account 账号,cnt_ct 短信量, createTime 创建日期 from source_table";

                DataTable dt = Common.DBHelper.GetDataTableBySql(SQL);      //数据源表
                DataTable dt_clone = dt.Clone();                            //复制表, 用于加多一列, 并将其它列改为String类型

                long batch = 3;
                DataColumn dcBatch = new DataColumn("batch");
                dcBatch.DataType = typeof(long);// Type.GetType("System.Int64");
                dt_clone.Columns.Add(dcBatch);
                dcBatch.SetOrdinal(0);  //设置为第1列 非常重要

                for (int i = 1; i < dt.Columns.Count; i++)
                {
                    dt_clone.Columns[i].DataType = Type.GetType("System.String");
                }

                foreach (DataRow dr in dt.Rows)
                {
                    DataRow dr_clone = dt_clone.NewRow();
                    foreach (DataColumn dc in dt.Columns)
                    {

                        dr_clone[dc.ColumnName] = dr[dc.ColumnName];
                    }
                    dr_clone[0] = batch;
                    dt_clone.Rows.Add(dr_clone);
                }
                //将表头存入表头表中, 方便以后导出时使用
                int dcIdx = 1;
                foreach (DataColumn dc in dt.Columns)
                {
                    SqlParameter[] spArr = new SqlParameter[]{
                        new SqlParameter("columnName", dc.ColumnName),
                        new SqlParameter("batch",Convert.ToInt64(batch)).SetDbType(DbType.Int64),
                        new SqlParameter("columnIndex",dcIdx++)};

                    string sql = @"
if not exists(select 1 from header_table where columnName=@columnName and batch=@batch)
begin
    insert into header_table (columnName,batch,columnIndex) select @columnName,@batch,@columnIndex
end
";
                    Common.DBHelper.ExecuteNonQuery(sql, spArr);
                }

                //2.初始化SqlBulkCopy对象,用新的连接作为参数。  
                SqlBulkCopy bulkCopy = new SqlBulkCopy(ConnectionNew);

                //3.写对应关系。如旧表的People列的数据,对应新表Human列,那么就写bulkCopy.ColumnMappings.Add("People","Human")  
                //如果两张表的结构一样,那么对应关系就不用写了。  
                //我是用哈希表存储对应关系的,哈希表作为参数到传入方法中,key的值用来存储旧表的字段名,VALUE的值用来存储新表的值  
                Hashtable ht = new Hashtable();
                ht.Add("batch", "batch");
                int j = 1;
                foreach (DataColumn dc in dt.Columns)
                {
                    ht.Add(dc.ColumnName, "C" + (j++).ToString());

                }

                foreach (string str in ht.Keys)
                {
                    bulkCopy.ColumnMappings.Add(str, ht[str].ToString());
                }

                //4.设置目标表名  
                bulkCopy.DestinationTableName = "target_table";

                //额外,可不写:设置一次性处理的行数。这个行数处理完后,会激发SqlRowsCopied()方法。默认为1  
                bulkCopy.NotifyAfter = 1;

                //额外,可不写:设置激发的SqlRowsCopied()方法,这里为bulkCopy_SqlRowsCopied  
                bulkCopy.SqlRowsCopied += new SqlRowsCopiedEventHandler(bulkCopy_SqlRowsCopied);

                //OK,开始传数据!  
                bulkCopy.WriteToServer(dt_clone);
                Console.Write("传输完毕!");
                Console.Read();
            }
            catch (Exception ex)
            {
                Console.Write(ex.Message);
                Console.Read();
            }
            finally
            {
                ConnectionNew.Close();
                ConnectionOld.Close();
            }
        }

        //激发的方法写在外头  
        private static void bulkCopy_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
        {
            //执行的内容。  
            //这里有2个元素值得拿来用  
            //e.RowsCopied,  //返回数值类型,表示当前已经复制的行数  
            //e.Abort,       //用于赋值true or false,用于停止赋值的操作  
            Console.WriteLine("当前已复制的行数:" + e.RowsCopied);
        }
    }
}




将上面的代码 batch 改为3(4个字段), 4 (3个字段), 执行两次后结果如下:

很明显, 在任意变化的情况下, 为我们实现了数据的转移。

当然, 实际的数据采集过程, 可能还会复杂一些:比如同一批数据会有多次传入。不过大体的思路可以定下来的了。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: