您的位置:首页 > 数据库

SQL Server Change Data Capture

2016-04-22 23:15 435 查看
先看图,了解下框架,知道是怎么回事儿。相对来说, CDC 比较简单,没那么复杂的环境要配置。但是理清楚整个套路也是需要花点时间做做测试的。



1. Change Data Capture internals

1.1 components

1.1.1 underlying tables

All underlying tables associated with the change data capture are created in the change data capture (cdc) schema of the cdc enabled database.

Naming format for the cdc system tables and functions:

1.1.1.1 capture instance: by default named <schema>_<table> after the source table (e.g. dbo_region)

1.1.1.2 underlying tables used to capture the DML operations:

<capture_instance>_CT (the capture instance name with _CT appended as a suffix, e.g. dbo_region_CT)

Since the tables related to a capture instance are created under the change data capture schema, we need to prepend [cdc.] as the schema name when referencing the underlying change table.

Structure of the underlying change table cdc.<capture_instance>_CT:

__$start_lsn:
LSN concept: Log Sequence Number
__$end_lsn:
__$seqval:
__$operation:
1 – delete ; 2 – insert ;
3 – update(before) ; 4 – update(after)
__$update_mask:


Remaining columns are identical to the captured table

1.1.1.3 function to read the captured data:

1.1.1.3.1 to read all changes: prefix the capture instance name with fn_cdc_get_all_changes_

e.g. cdc.fn_cdc_get_all_changes_dbo_region

1.1.1.3.2 to read net changes: prefix the capture instance name with fn_cdc_get_net_changes_

e.g. cdc.fn_cdc_get_net_changes_dbo_region

This raises the question: what is needed to support net changes? The source table must have a primary key or a unique index.

1.1.2 capture instance: this one should be the main container, it is composed of underlying tables and system functions

1.1.3 read CDC data by system functions

1.1.4 two sql server agent jobs, one for populating data from log into change table; one for cleanup process. Their creation and starting can both be configured .

Sys.sp_cdc_add_job & sys.sp_cdc_drop_job
sys.sp_cdc_change_job & sys.sp_cdc_help_jobs
sys.sp_cdc_start_job & sys.sp_cdc_stop_job


1.2 configuration

1.2.1 enable CDC for database: sys.sp_cdc_enable_db

Requires sysadmin server role to enable cdc;

Choose the database that needs cdc enabled, apply the stored procedure:

-- Enable Change Data Capture at the database level.
-- The schema is written in lowercase (sys) so the call also resolves under a
-- case-sensitive collation; the trailing GO closes the batch.
USE lenistest3;
GO
EXEC sys.sp_cdc_enable_db;
GO


Constraint:

*Msg 41385, Level 16, State 1, Procedure sp_cdc_enable_db, Line 33

A database cannot be enabled for both Change Data Capture (CDC) and MEMORY_OPTIMIZED_DATA storage.*

1.2.2 pick up the tables for CDC :sys.sp_cdc_enable_table

1.2.2.1 a capture instance will be created for the table that is enabled CDC

1.2.2.2 to get the metadata information about CDC, query the cdc system tables:

*cdc.change_tables,
cdc.index_columns,
cdc.captured_columns*


Also the system stored procedure can be used to get the information:

Sys.sp_cdc_help_change_data_capture


1.2.2.3 to enable table cdc with net change support: for other detailed parameters , refer to syntax of sys.sp_cdc_enable_table. For net change, there must be a primary key or unique index on the table.

-- Enable CDC on dbo.region with net-change support.
-- Net-change support needs a primary key or unique index on the source table.
-- @role_name has no default and must be supplied; NULL means no gating role
-- is used to restrict access to the change data.
USE lenistest3;
GO
EXEC sys.sp_cdc_enable_table
    @source_schema        = N'dbo',
    @source_name          = N'region',
    @role_name            = NULL,
    @supports_net_changes = 1;
GO


1.2.3 sync data

1.2.3.0 how to prepare the data changed captured

1.2.3.0.1

cdc.fn_cdc_get_all_changes_<capture_instance>(from_lsn, to_lsn, '<row_filter_option>')
Here <row_filter_option> has two values:
all: one row for each change; for an update operation only the row holding the values after the change is returned
all update old: two rows for each update operation, one with the values before the change and one with the values after the change.


1.2.3.1 method used to sync data

1.2.3.2 monitor sync process

1.3 a single example for reference

1.3.1 enable and create cdc table

-- Demo setup: create the source table and seed a single row.
CREATE TABLE dbo.region
(
    regionId   INT,
    regionName VARCHAR(10)
);
GO

INSERT INTO dbo.region (regionId, regionName)
VALUES (1, 'China');
GO

-- Switch CDC on for the current database.
EXEC sys.sp_cdc_enable_db;
GO

-- Check the database-level CDC flag.
SELECT name,
       is_cdc_enabled
FROM   sys.databases
WHERE  name = 'lenistest4';

-- Give the table a primary key so it can be enabled with net-change support;
-- the key column has to be made NOT NULL first.
ALTER TABLE dbo.region
    ALTER COLUMN regionId INT NOT NULL;

ALTER TABLE dbo.region
    ADD CONSTRAINT pk_id PRIMARY KEY (regionId);

-- Enable CDC on the table (net changes on, no gating role).
EXEC sys.sp_cdc_enable_table
    @source_schema        = N'dbo',
    @source_name          = N'region',
    @supports_net_changes = 1,
    @role_name            = NULL;

-- Show which change table and capture instance belong to which source table.
SELECT OBJECT_NAME(object_id)        AS change_table,
       OBJECT_NAME(source_object_id) AS source_table,
       capture_instance
FROM   cdc.change_tables;

-- Inspect the change table and the source table side by side.
SELECT *
FROM   cdc.dbo_region_CT;

SELECT *
FROM   dbo.region;


1.3.2 get the changes of data

-- Every captured change for capture instance dbo_region, from its minimum
-- LSN up to the current maximum LSN.
SELECT *
FROM   cdc.fn_cdc_get_all_changes_dbo_region(
           sys.fn_cdc_get_min_lsn('dbo_region'),
           sys.fn_cdc_get_max_lsn(),
           N'all');

-- Net changes over the same LSN range.
SELECT *
FROM   cdc.fn_cdc_get_net_changes_dbo_region(
           sys.fn_cdc_get_min_lsn('dbo_region'),
           sys.fn_cdc_get_max_lsn(),
           N'all');


1.3.3 deal with apply of change data captured

-- Outline of this section:
--   1. driven table  : log every transaction that transfers the changed data
--   2. transfer data application
--   3. audit testing
--   4. clean up the transferred changed data

-- 1. driven table : log every transaction that transfers the changed data
--    1.1 configuration table : lists the tables that are enabled for cdc
--    1.2 driven table        : logs each transaction that transfers captured changes

-- 1.1 Configuration table read by dbo.Cdc_transfer and dbo.Clearup_cdc.
CREATE TABLE dbo.cdc_tables
  (
     cdcid        INT NOT NULL,  -- identifier stored in dbo.cdc_driven.cdcid
     cdctable     VARCHAR(50),   -- source table name, resolved with OBJECT_ID()
     cdcprocesssp VARCHAR(200),  -- procedure executed to apply the changes
     cdcenabled   BIT            -- only rows with 1 are processed
  )

go

-- 1.2 Driven table: dbo.Cdc_transfer writes one row per processed table.
CREATE TABLE dbo.cdc_driven
  (
     transactionid BIGINT NOT NULL,  -- MAX(transactionid) + 1, assigned by dbo.Cdc_transfer
     cdcid         INT NOT NULL,     -- matches dbo.cdc_tables.cdcid
     cdcstartdt    DATETIME,         -- UTC start time of the transfer
     cdcenddt      DATETIME,         -- UTC end time of the transfer
     cdccompleted  BIT,              -- 1 when the transfer finished
     cdcminlsn     BINARY(10),       -- lower LSN bound that was transferred
     cdcmaxlsn     BINARY(10)        -- upper LSN bound that was transferred
  )

go

-- Register dbo.region as an enabled table whose changes are applied by
-- the procedure dbo.cdcprocessRegion.
INSERT INTO dbo.cdc_tables (cdcid, cdctable, cdcprocesssp, cdcenabled)
VALUES (1, 'dbo.region', 'dbo.cdcprocessRegion', 1);

-- CREATE PROCEDURE must be the first statement of its batch, so close the
-- batch that holds the preceding INSERT.
go

-- 2. transfer application
--
-- dbo.Cdcprocessregion
--   Applies the net changes captured for dbo.region in the LSN range
--   [@cdcMinLsn, @cdcMaxLsn] to the copy table dbo.region_cdc.
--
--   @cdcMinLsn : lower LSN bound passed to cdc.fn_cdc_get_net_changes_dbo_region
--   @cdcMaxLsn : upper LSN bound passed to cdc.fn_cdc_get_net_changes_dbo_region
--
--   __$operation values handled: 1 = delete, 2 = insert, 4 = update (after image).
--   On failure the cursor is released and an error of severity 16 is raised
--   whose text starts with 'cdc process region error' followed by the
--   original error message.
--   NOTE(review): dbo.region_cdc is not created anywhere in this script and
--   must already exist with columns regionid / regionname.
CREATE PROCEDURE dbo.Cdcprocessregion (@cdcMinLsn BINARY(10),
                                       @cdcMaxLsn BINARY(10))
AS
  BEGIN
      DECLARE @opr        INT = 0,
              @regionId   INT = 0,
              @regionName VARCHAR(200) = ''

      BEGIN try
          -- LOCAL keeps the cursor private to this procedure call, so it cannot
          -- collide with a same-named cursor left open on the connection.
          DECLARE cdc_cur CURSOR LOCAL FAST_FORWARD FOR
            SELECT __$operation AS opr,
                   regionid,
                   regionname
            FROM   cdc.fn_cdc_get_net_changes_dbo_region (@cdcMinLsn, @cdcMaxLsn, 'all')

          OPEN cdc_cur

          FETCH next FROM cdc_cur INTO @opr, @regionId, @regionName

          WHILE @@fetch_status = 0
            BEGIN
                IF @opr = 1
                  DELETE FROM dbo.region_cdc
                  WHERE  regionid = @regionId
                ELSE IF @opr = 2
                  INSERT INTO dbo.region_cdc
                              (regionid,
                               regionname)
                  VALUES      (@regionId,
                               @regionName)
                ELSE IF @opr = 4
                  UPDATE dbo.region_cdc
                  SET    regionname = @regionName
                  WHERE  regionid = @regionId

                FETCH next FROM cdc_cur INTO @opr, @regionId, @regionName
            END

          CLOSE cdc_cur

          DEALLOCATE cdc_cur
      END try

      BEGIN catch
          -- Keep the underlying error text instead of hiding it.
          DECLARE @errormsg NVARCHAR(2048) = N'cdc process region error : '
            + Error_message();

          -- CURSOR_STATUS: >= 0 means open, -1 means closed but still allocated.
          IF Cursor_status('local', 'cdc_cur') >= 0
            CLOSE cdc_cur

          IF Cursor_status('local', 'cdc_cur') >= -1
            DEALLOCATE cdc_cur

          -- '%s' prevents a '%' inside the message from being read as a format
          -- specifier.
          RAISERROR (N'%s',16,1,@errormsg)
      END catch
  END

go

-- dbo.Cdc_transfer
--   For every enabled row in dbo.cdc_tables, determines the LSN range that
--   still has to be transferred, executes the table's processing procedure
--   (cdcprocesssp) with that range and logs the run in dbo.cdc_driven.
--
--   LSN range:
--     upper bound = sys.fn_cdc_get_max_lsn(), read once at the start;
--     lower bound = the greater of the capture instance's minimum LSN and
--                   the LSN following the last completed cdcmaxlsn logged
--                   for the table, so a second run does not replay changes
--                   that were already transferred.
--   A table whose lower bound is above the upper bound (nothing new, or no
--   changes captured yet) is skipped and no log row is written.
--
--   On failure the cursor is released and an error of severity 16 is raised
--   whose text starts with 'error from transfer : '.
CREATE PROCEDURE dbo.Cdc_transfer
AS
  BEGIN
      DECLARE @cdcId        INT,
              @cdcTable     VARCHAR(50),
              @cdcProcessSP NVARCHAR(200)
      DECLARE @transactionId BIGINT,
              @cdcStartDT    DATETIME,
              @cdcEndDT      DATETIME,
              @cdcMinLsn     BINARY(10),
              @cdcMaxLsn     BINARY(10),
              @cdcLastLsn    BINARY(10)
      DECLARE @capture_instance SYSNAME

      BEGIN try
          SELECT @cdcMaxLsn = sys.fn_cdc_get_max_lsn()

          -- LOCAL: dbo.Clearup_cdc uses the same cursor name, so keep this one
          -- private to the procedure call.
          DECLARE my_cur CURSOR LOCAL FAST_FORWARD FOR
            SELECT cdcid,
                   cdctable,
                   cdcprocesssp
            FROM   dbo.cdc_tables
            WHERE  cdcenabled = 1

          OPEN my_cur

          FETCH next FROM my_cur INTO @cdcId, @cdcTable, @cdcProcessSP

          WHILE @@FETCH_STATUS = 0
            BEGIN
                SET @cdcStartDT = Getutcdate()

                -- Reset first: SELECT @var = col leaves @var untouched when no
                -- row matches, which would reuse the previous table's instance.
                SET @capture_instance = NULL

                SELECT @capture_instance = capture_instance
                FROM   cdc.change_tables
                WHERE  source_object_id = Object_id(@cdcTable)

                IF @capture_instance IS NULL
                  RAISERROR (N'no capture instance found for table %s',16,1,@cdcTable)

                SELECT @cdcMinLsn = sys.fn_cdc_get_min_lsn(@capture_instance)

                -- Resume right after the last range that was transferred
                -- successfully for this table.
                SELECT @cdcLastLsn = Max(cdcmaxlsn)
                FROM   dbo.cdc_driven
                WHERE  cdcid = @cdcId
                       AND cdccompleted = 1

                IF @cdcLastLsn IS NOT NULL
                   AND sys.fn_cdc_increment_lsn(@cdcLastLsn) > @cdcMinLsn
                  SET @cdcMinLsn = sys.fn_cdc_increment_lsn(@cdcLastLsn)

                -- The cdc change functions fail when from_lsn > to_lsn or when
                -- a bound is NULL, so only call the processor for a valid range.
                IF @cdcMinLsn <= @cdcMaxLsn
                  BEGIN
                      -- Execute the configured procedure by name; no SQL text
                      -- is built by string concatenation.
                      EXEC @cdcProcessSP
                        @cdcMinLsn = @cdcMinLsn,
                        @cdcMaxLsn = @cdcMaxLsn

                      SET @cdcEndDT = Getutcdate()

                      SELECT @transactionId = COALESCE(Max(transactionid) + 1, 1)
                      FROM   dbo.cdc_driven

                      -- A failure in the processor jumps to CATCH, so reaching
                      -- this point means the transfer completed (cdccompleted = 1).
                      INSERT INTO dbo.cdc_driven
                                  (transactionid,
                                   cdcid,
                                   cdcstartdt,
                                   cdcenddt,
                                   cdccompleted,
                                   cdcminlsn,
                                   cdcmaxlsn)
                      VALUES      (@transactionId,
                                   @cdcId,
                                   @cdcStartDT,
                                   @cdcEndDT,
                                   1,
                                   @cdcMinLsn,
                                   @cdcMaxLsn)
                  END

                FETCH next FROM my_cur INTO @cdcId, @cdcTable, @cdcProcessSP
            END

          CLOSE my_cur

          DEALLOCATE my_cur
      END try

      BEGIN catch
          DECLARE @errormsg NVARCHAR(2048) = N'error from transfer : '
            + Error_message()

          -- CURSOR_STATUS: >= 0 means open, -1 means closed but still allocated.
          IF Cursor_status('local', 'my_cur') >= 0
            CLOSE my_cur

          IF Cursor_status('local', 'my_cur') >= -1
            DEALLOCATE my_cur

          -- '%s' prevents a '%' inside the message from being read as a format
          -- specifier.
          RAISERROR (N'%s',16,1,@errormsg)
      END catch
  END

go

-- Run one transfer pass.
EXEC dbo.Cdc_transfer;

-- 3. Audit testing
-- Counts the rows that differ between the source table and its copy, in
-- either direction; 0 means both tables hold the same (regionid, regionname) rows.
WITH only_in_source AS
(
    SELECT regionid, regionname FROM dbo.region
    EXCEPT
    SELECT regionid, regionname FROM dbo.region_cdc
),
only_in_copy AS
(
    SELECT regionid, regionname FROM dbo.region_cdc
    EXCEPT
    SELECT regionid, regionname FROM dbo.region
)
SELECT COUNT(*) AS missedRows
FROM   (SELECT regionid, regionname FROM only_in_source
        UNION ALL
        SELECT regionid, regionname FROM only_in_copy) AS tmp;

-- CREATE PROCEDURE must be the first statement of its batch, so close the
-- batch that holds the preceding audit query.
go

-- 4. clean up the transferred changed data
--
-- dbo.Clearup_cdc
--   For every enabled row in dbo.cdc_tables, deletes from the table's cdc
--   change table all rows whose __$start_lsn is at or below the highest
--   cdcmaxlsn logged as completed in dbo.cdc_driven. Tables with no
--   completed transfer, or with no change table, are skipped.
--
--   On failure the cursor is released and an error of severity 16 is raised
--   whose text starts with 'error from clearup_cdc : '.
CREATE PROCEDURE dbo.Clearup_cdc
AS
  BEGIN
      DECLARE @cdcId    INT,
              @cdcTable VARCHAR(200)
      DECLARE @cdcChangeTable NVARCHAR(300)
      DECLARE @maxLsn BINARY(10)
      DECLARE @sqlstate NVARCHAR(max)

      BEGIN try
          -- LOCAL: dbo.Cdc_transfer uses the same cursor name, so keep this one
          -- private to the procedure call.
          DECLARE my_cur CURSOR LOCAL FAST_FORWARD FOR
            SELECT cdcid,
                   cdctable
            FROM   dbo.cdc_tables
            WHERE  cdcenabled = 1

          OPEN my_cur

          FETCH next FROM my_cur INTO @cdcId, @cdcTable

          WHILE @@fetch_status = 0
            BEGIN
                SELECT @maxLsn = Max(cdcmaxlsn)
                FROM   dbo.cdc_driven
                WHERE  cdcid = @cdcId
                       AND cdccompleted = 1

                -- Reset first: SELECT @var = col leaves @var untouched when no
                -- row matches, which would delete from the previous table's
                -- change table.
                SET @cdcChangeTable = NULL

                -- QUOTENAME guards the identifier that is concatenated into
                -- the dynamic statement.
                SELECT @cdcChangeTable = N'cdc.' + Quotename(Object_name(object_id))
                FROM   cdc.change_tables
                WHERE  source_object_id = Object_id (@cdcTable)

                -- Compare binary with binary; the original compared against the
                -- integer 0 through an implicit conversion.
                IF @maxLsn IS NOT NULL
                   AND @maxLsn <> 0x00000000000000000000
                   AND @cdcChangeTable IS NOT NULL
                  BEGIN
                      SET @sqlstate = N' delete from ' + @cdcChangeTable
                                      + N' where __$start_lsn <= @maxLsn '

                      -- The statement parameter of sp_executesql is @stmt.
                      EXEC sys.sp_executesql
                        @stmt = @sqlstate,
                        @params = N'@maxLsn binary(10)',
                        @maxLsn = @maxLsn
                  END

                FETCH next FROM my_cur INTO @cdcId, @cdcTable
            END

          CLOSE my_cur

          DEALLOCATE my_cur
      END try

      BEGIN catch
          DECLARE @errormsg NVARCHAR(2048) = N'error from clearup_cdc : '
            + Error_message()

          -- CURSOR_STATUS: >= 0 means open, -1 means closed but still allocated.
          IF Cursor_status('local', 'my_cur') >= 0
            CLOSE my_cur

          IF Cursor_status('local', 'my_cur') >= -1
            DEALLOCATE my_cur

          -- '%s' prevents a '%' inside the message from being read as a format
          -- specifier.
          RAISERROR (N'%s',16,1,@errormsg)
      END catch
  END

go
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: