SQL Server Change Data Capture
2016-04-22 23:15
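Start with the diagram to get a feel for the overall framework. CDC is relatively simple and needs no complicated environment setup, but working through the whole routine still takes some time and testing.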
1. Change Data Capture internals
1.1 components
1.1.1 underlying tables
All underlying tables associated with change data capture are created in the change data capture (cdc) schema of the CDC-enabled database.
Naming format for the cdc system tables and functions:
1.1.1.1 capture instance: <schema>_<table> of the source table by default (for example, dbo_region for dbo.region)
1.1.1.2 underlying table used to capture the DML operations:
<capture_instance>_CT (the capture instance name with _CT appended as a suffix)
Since the tables that belong to a capture instance are created under the change data capture schema, we need to prepend cdc. as the schema name of the change table, e.g. cdc.dbo_region_CT.
Structure of a change table (cdc.<capture_instance>_CT):
__$start_lsn: LSN (Log Sequence Number) of the commit of the transaction that made the change
__$end_lsn: not used; always NULL
__$seqval: orders the changes within a single transaction
__$operation: 1 = delete; 2 = insert; 3 = update (before); 4 = update (after)
__$update_mask: bit mask with one bit per captured column, marking the columns that changed
The remaining columns are identical to the captured columns of the source table.
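As an illustration of the column layout above, the change table can be read directly and __$operation decoded into a label. This is only a sketch: it assumes the dbo_region capture instance from the example in section 1.3, and reading through the cdc.fn_cdc_get_* functions is usually preferable to querying the change table itself.

```sql
-- Sketch: inspect the raw change table of the dbo_region capture instance
-- (the instance name is taken from the example in section 1.3).
SELECT
    __$start_lsn,
    sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS commit_time,
    __$seqval,
    CASE __$operation
        WHEN 1 THEN 'delete'
        WHEN 2 THEN 'insert'
        WHEN 3 THEN 'update (before)'
        WHEN 4 THEN 'update (after)'
    END AS operation_name,
    __$update_mask,
    regionId,
    regionName
FROM cdc.dbo_region_CT
ORDER BY __$start_lsn, __$seqval, __$operation;
```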
1.1.1.3 functions to read the captured data:
1.1.1.3.1 to read all changes: append the capture instance name to cdc.fn_cdc_get_all_changes_
e.g. cdc.fn_cdc_get_all_changes_dbo_region
1.1.1.3.2 to read net changes: append the capture instance name to cdc.fn_cdc_get_net_changes_
e.g. cdc.fn_cdc_get_net_changes_dbo_region
This raises the question of what is needed to support net changes: the source table must have a primary key or a unique index.
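To check whether an existing capture instance was created with net change support, the cdc.change_tables metadata can be queried. A minimal sketch, assuming CDC is already enabled in the current database:

```sql
-- Sketch: list capture instances and whether they support net changes.
-- index_name is the index used to identify rows uniquely.
SELECT
    capture_instance,
    OBJECT_SCHEMA_NAME(source_object_id) AS source_schema,
    OBJECT_NAME(source_object_id)        AS source_table,
    supports_net_changes,
    index_name
FROM cdc.change_tables;
```

If supports_net_changes is 0, the cdc.fn_cdc_get_net_changes_ function is not generated for that capture instance.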
1.1.2 capture instance: this is the main container; it is composed of the underlying change table and the system functions
1.1.3 read CDC data through the system functions
1.1.4 two SQL Server Agent jobs: one populates the change tables from the transaction log; the other runs the cleanup process. Both can be created, configured, started and stopped with the following procedures:
sys.sp_cdc_add_job & sys.sp_cdc_drop_job
sys.sp_cdc_change_job & sys.sp_cdc_help_jobs
sys.sp_cdc_start_job & sys.sp_cdc_stop_job
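The job procedures listed above might be used as follows. This is a sketch with illustrative values (a 10 second polling interval and a 2 day retention are assumptions, not recommendations), to be run in the CDC-enabled database with SQL Server Agent running:

```sql
-- Show the current settings of the capture and cleanup jobs.
EXEC sys.sp_cdc_help_jobs;
GO
-- Let the capture job poll the log every 10 seconds (illustrative value).
EXEC sys.sp_cdc_change_job
    @job_type        = N'capture',
    @pollinginterval = 10;
GO
-- Restart the capture job so that the changed setting is picked up.
EXEC sys.sp_cdc_stop_job  @job_type = N'capture';
GO
EXEC sys.sp_cdc_start_job @job_type = N'capture';
GO
-- Keep change data for 2 days; @retention is expressed in minutes.
EXEC sys.sp_cdc_change_job
    @job_type  = N'cleanup',
    @retention = 2880;
GO
```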
1.2 configuration
1.2.1 enable CDC for a database: sys.sp_cdc_enable_db
Enabling CDC requires membership in the sysadmin fixed server role.
Switch to the database that needs CDC and run the stored procedure:
USE lenistest3
GO
EXEC sys.sp_cdc_enable_db
GO
Constraint:
*Msg 41385, Level 16, State 1, Procedure sp_cdc_enable_db, Line 33
A database cannot be enabled for both Change Data Capture (CDC) and MEMORY_OPTIMIZED_DATA storage.*
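On an instance that enforces this restriction, a pre-check can avoid running into Msg 41385. The following is a sketch only; it assumes the lenistest3 database used above and relies on sys.filegroups reporting type 'FX' for a MEMORY_OPTIMIZED_DATA filegroup:

```sql
USE lenistest3;
GO
-- Sketch: only enable CDC when no MEMORY_OPTIMIZED_DATA filegroup exists.
IF EXISTS (SELECT 1 FROM sys.filegroups WHERE type = N'FX')
    PRINT 'MEMORY_OPTIMIZED_DATA filegroup found; sp_cdc_enable_db may fail with Msg 41385.';
ELSE
    EXEC sys.sp_cdc_enable_db;
GO
-- Confirm the result for the current database.
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = DB_NAME();
```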
1.2.2 pick the tables for CDC: sys.sp_cdc_enable_table
1.2.2.1 a capture instance is created for each table on which CDC is enabled
1.2.2.2 to get the metadata about CDC, query the system tables:
*cdc.change_tables, cdc.index_columns, cdc.captured_columns*
The system stored procedure can also be used to get this information:
sys.sp_cdc_help_change_data_capture
1.2.2.3 to enable CDC on a table with net change support (for the other parameters, refer to the syntax of sys.sp_cdc_enable_table): the table must have a primary key or a unique index. @role_name has no default value and must always be supplied; pass NULL if access to the change data should not be gated by a role.
USE lenistest3
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'region',
    @role_name = NULL,
    @supports_net_changes = 1
GO
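Some of the other parameters of sys.sp_cdc_enable_table can be spelled out explicitly. In the sketch below the capture instance name dbo_region_v2 is hypothetical, and the index name pk_id assumes the primary key created in the example of section 1.3:

```sql
USE lenistest3;
GO
-- Sketch: name the capture instance, the unique index and the captured columns.
EXEC sys.sp_cdc_enable_table
    @source_schema        = N'dbo',
    @source_name          = N'region',
    @role_name            = NULL,
    @capture_instance     = N'dbo_region_v2',        -- hypothetical name
    @supports_net_changes = 1,
    @index_name           = N'pk_id',                -- assumed: PK from section 1.3
    @captured_column_list = N'regionId, regionName'; -- must include the key column
GO
-- Review the resulting CDC configuration of the source table.
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = N'dbo',
    @source_name   = N'region';
GO
```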
1.2.3 sync data
1.2.3.0 how to read the captured change data
1.2.3.0.1
cdc.fn_cdc_get_all_changes_<capture_instance>(from_lsn, to_lsn, '<row_filter_option>')
Here <row_filter_option> takes one of two values:
all: one row per change; for an update, only the row with the values after the update is returned
all update old: two rows per update, one with the values before the change and one with the values after it
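The effect of the row filter option can be seen by reading the same LSN range with 'all update old' and checking __$update_mask for a specific column. A sketch, assuming the dbo_region capture instance and the regionName column from the example in section 1.3:

```sql
-- Sketch: return before and after images of updates and flag rows
-- in which regionName was changed.
DECLARE @from_lsn BINARY(10) = sys.fn_cdc_get_min_lsn('dbo_region');
DECLARE @to_lsn   BINARY(10) = sys.fn_cdc_get_max_lsn();
DECLARE @name_ordinal INT =
    sys.fn_cdc_get_column_ordinal('dbo_region', 'regionName');

SELECT
    __$start_lsn,
    __$seqval,
    __$operation,            -- 3 = before image, 4 = after image
    regionId,
    regionName,
    sys.fn_cdc_is_bit_set(@name_ordinal, __$update_mask) AS regionName_changed
FROM cdc.fn_cdc_get_all_changes_dbo_region(@from_lsn, @to_lsn, N'all update old')
ORDER BY __$start_lsn, __$seqval, __$operation;
```

With N'all' in place of N'all update old', the rows with __$operation = 3 are not returned.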
1.2.3.1 method used to sync data
1.2.3.2 monitor sync process
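One option for monitoring, if the heading refers to the capture process itself, is to use the CDC dynamic management views. The sketch below assumes the capture job has already run at least once in the current database:

```sql
-- Sketch: most recent log scan sessions of the capture job.
SELECT TOP (10)
    session_id, start_time, end_time, duration,
    scan_phase, error_count, tran_count, command_count, latency
FROM sys.dm_cdc_log_scan_sessions
ORDER BY start_time DESC;

-- Errors recorded by recent log scan sessions.
SELECT entry_time, error_number, error_severity, error_state, error_message
FROM sys.dm_cdc_errors
ORDER BY entry_time DESC;
```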
1.3 a single example for reference
1.3.1 enable CDC and create the cdc table
create table dbo.region (regionId int, regionName varchar(10))
go
-- this row is inserted before CDC is enabled, so it is not captured
insert into dbo.region(regionId,regionName) values(1,'China')
go
exec sys.sp_cdc_enable_db
go
select name,is_cdc_enabled from sys.databases where name = 'lenistest4'
go
-- net change support needs a primary key, so regionId must be non-nullable first
alter table dbo.region alter column regionId int not null
go
alter table dbo.region add constraint pk_id primary key (regionId)
go
exec sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'region',
    @supports_net_changes = 1,
    @role_name = null
go
select object_name(object_id) as change_table,
       object_name(source_object_id) as source_table,
       capture_instance
from cdc.change_tables
select * from cdc.dbo_region_CT
select * from dbo.region
1.3.2 get the changes of data
select *
from cdc.fn_cdc_get_all_changes_dbo_region (
    sys.fn_cdc_get_min_lsn('dbo_region'), sys.fn_cdc_get_max_lsn(), N'all')

select *
from cdc.fn_cdc_get_net_changes_dbo_region (
    sys.fn_cdc_get_min_lsn('dbo_region'), sys.fn_cdc_get_max_lsn(), N'all')
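Instead of the full available range, the LSN boundaries can also be derived from a time window with sys.fn_cdc_map_time_to_lsn. A sketch, assuming the dbo_region capture instance and an arbitrary window of the last hour:

```sql
-- Sketch: net changes of the last hour (the window is an arbitrary example).
DECLARE @begin_time DATETIME = DATEADD(HOUR, -1, GETDATE());
DECLARE @end_time   DATETIME = GETDATE();

DECLARE @from_lsn BINARY(10) =
    sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
DECLARE @to_lsn BINARY(10) =
    sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);

-- The functions raise an error for an invalid range, so check it first.
IF @from_lsn IS NOT NULL
   AND @to_lsn IS NOT NULL
   AND @from_lsn <= @to_lsn
   AND @from_lsn >= sys.fn_cdc_get_min_lsn('dbo_region')
    SELECT *
    FROM cdc.fn_cdc_get_net_changes_dbo_region(@from_lsn, @to_lsn, N'all');
ELSE
    PRINT 'No valid LSN range for the requested time window.';
```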
1.3.3 apply the captured change data
-- 1. driven table : log every transaction that transfers the changed data
-- 2. transfer data application
-- 3. Audit testing
-- 4. clean up the transferred changed data

-- 1. driven table : log every transaction that transfers the changed data
-- 1.1 configuration table : to log the tables that are enabled for cdc
-- 1.2 driven table : log transaction that transfers the data change captured
CREATE TABLE dbo.cdc_tables
(
    cdcid INT NOT NULL,
    cdctable VARCHAR(50),
    cdcprocesssp VARCHAR(200),
    cdcenabled BIT
)
go
CREATE TABLE dbo.cdc_driven
(
    transactionid BIGINT NOT NULL,
    cdcid INT NOT NULL,
    cdcstartdt DATETIME,
    cdcenddt DATETIME,
    cdccompleted BIT,
    cdcminlsn BINARY(10),
    cdcmaxlsn BINARY(10)
)
go
INSERT INTO dbo.cdc_tables (cdcid, cdctable, cdcprocesssp, cdcenabled)
VALUES (1, 'dbo.region', 'dbo.cdcprocessRegion', 1)
go
-- target table that receives the changes; the original post does not define it,
-- so this definition is an assumption that mirrors dbo.region
CREATE TABLE dbo.region_cdc
(
    regionid INT NOT NULL PRIMARY KEY,
    regionname VARCHAR(10)
)
go

-- 2. transfer application
-- apply the net changes of dbo.region to dbo.region_cdc row by row
CREATE PROCEDURE dbo.Cdcprocessregion (@cdcMinLsn BINARY(10), @cdcMaxLsn BINARY(10))
AS
BEGIN
    BEGIN try
        DECLARE @opr INT = 0, @regionId INT = 0, @regionName VARCHAR(200) = ''
        DECLARE cdc_cur CURSOR FOR
            SELECT __$operation AS opr, regionid, regionname
            FROM cdc.Fn_cdc_get_net_changes_dbo_region (@cdcMinLsn, @cdcMaxLsn, 'all')
        OPEN cdc_cur
        FETCH next FROM cdc_cur INTO @opr, @regionId, @regionName
        WHILE @@fetch_status = 0
        BEGIN
            IF @opr = 1 -- delete
                DELETE dbo.region_cdc WHERE regionid = @regionId
            IF @opr = 2 -- insert
                INSERT INTO dbo.region_cdc (regionid, regionname)
                VALUES (@regionId, @regionName)
            IF @opr = 4 -- update (values after the update)
                UPDATE dbo.region_cdc SET regionname = @regionName
                WHERE regionid = @regionId
            FETCH next FROM cdc_cur INTO @opr, @regionId, @regionName
        END
        CLOSE cdc_cur
        DEALLOCATE cdc_cur
    END try
    BEGIN catch
        -- release the cursor if the failure happened while it was open
        IF Cursor_status('global', 'cdc_cur') > -1
        BEGIN
            CLOSE cdc_cur
            DEALLOCATE cdc_cur
        END
        RAISERROR ('cdc process region error',16,1)
    END catch
END
go

-- run the processing procedure of every enabled table and log the run in dbo.cdc_driven
CREATE PROCEDURE dbo.Cdc_transfer
AS
BEGIN
    BEGIN try
        DECLARE my_cur CURSOR FOR
            SELECT cdcid, cdctable, cdcprocesssp
            FROM dbo.cdc_tables
            WHERE cdcenabled = 1
        DECLARE @cdcId INT, @cdcTable VARCHAR(50), @cdcProcessSP NVARCHAR(200)
        DECLARE @transactionId BIGINT, @cdcStartDT DATETIME, @cdcEndDT DATETIME,
                @cdcMinLsn BINARY(10), @cdcMaxLsn BINARY(10)
        -- printable forms of the LSNs, only used for debugging
        DECLARE @cdcMinLsn_p NVARCHAR(200), @cdcMaxLsn_p NVARCHAR(200)
        DECLARE @cdcCompleted BIT = 0
        DECLARE @sqlstat NVARCHAR(max);
        DECLARE @capture_instance VARCHAR(100);
        SELECT @cdcMaxLsn = sys.Fn_cdc_get_max_lsn()
        SELECT @cdcMaxLsn_p = CONVERT(NVARCHAR(80), @cdcMaxLsn, 1)
        OPEN my_cur
        FETCH next FROM my_cur INTO @cdcId, @cdcTable, @cdcProcessSP
        WHILE @@FETCH_STATUS = 0
        BEGIN
            SELECT @transactionId = Isnull(Max(transactionid) + 1, 1)
            FROM dbo.cdc_driven
            SET @cdcStartDT = Getutcdate();
            SELECT @capture_instance = capture_instance
            FROM cdc.change_tables
            WHERE source_object_id = Object_id(@cdcTable)
            SELECT @cdcMinLsn = sys.Fn_cdc_get_min_lsn(@capture_instance)
            SELECT @cdcMinLsn_p = CONVERT(NVARCHAR(80), @cdcMinLsn, 1)
            SET @sqlstat = N' exec ' + @cdcProcessSP
                         + N' @cdcMinLsn = @cdcMinLsn, @cdcMaxLsn = @cdcMaxLsn ; '
            --print @sqlstat
            EXEC Sp_executesql
                @stmt = @sqlstat,
                @params = N' @cdcMinLsn binary(10), @cdcMaxLsn binary(10) ',
                @cdcMinLsn = @cdcMinLsn,
                @cdcMaxLsn = @cdcMaxLsn
            SET @cdcCompleted = 1
            SET @cdcEndDT = Getutcdate();
            -- the original referenced an undeclared @transactionaId here
            INSERT INTO dbo.cdc_driven
                (transactionid, cdcid, cdcstartdt, cdcenddt, cdccompleted, cdcminlsn, cdcmaxlsn)
            VALUES
                (@transactionId, @cdcId, @cdcStartDT, @cdcEndDT, @cdcCompleted, @cdcMinLsn, @cdcMaxLsn)
            FETCH next FROM my_cur INTO @cdcId, @cdcTable, @cdcProcessSP
        END
        CLOSE my_cur
        DEALLOCATE my_cur
    END try
    BEGIN catch
        DECLARE @errormsg NVARCHAR(4000) = 'error from transfer : ' + Error_message();
        IF Cursor_status('global', 'my_cur') > -1
        BEGIN
            CLOSE my_cur
            DEALLOCATE my_cur
        END
        RAISERROR (@errormsg,16,1)
    END catch
END
go
EXEC dbo.Cdc_transfer
go

-- 3. Audit testing
-- rows that exist on only one side; 0 means source and target are in sync
SELECT Count(*) AS missedRows
FROM ((SELECT regionid, regionname FROM dbo.region
       EXCEPT
       SELECT regionid, regionname FROM dbo.region_cdc)
      UNION ALL
      (SELECT regionid, regionname FROM dbo.region_cdc
       EXCEPT
       SELECT regionid, regionname FROM dbo.region)) tmp
go

-- 4. clean up the transferred changed data
CREATE PROCEDURE dbo.Clearup_cdc
AS
BEGIN
    BEGIN try
        DECLARE my_cur CURSOR FOR
            SELECT cdcid, cdctable
            FROM dbo.cdc_tables
            WHERE cdcenabled = 1
        DECLARE @cdcId INT, @cdcTable VARCHAR(200)
        DECLARE @cdcInstance VARCHAR(200)
        DECLARE @cdcChangeTable VARCHAR(200)
        DECLARE @maxLsn BINARY(10)
        DECLARE @sqlstate NVARCHAR(max)
        OPEN my_cur
        FETCH next FROM my_cur INTO @cdcId, @cdcTable
        WHILE @@fetch_status = 0
        BEGIN
            -- highest LSN that has been transferred successfully for this table
            SELECT @maxLsn = Max(cdcmaxlsn)
            FROM dbo.cdc_driven
            WHERE cdcid = @cdcId AND cdccompleted = 1
            IF @maxLsn IS NOT NULL
            BEGIN
                SELECT @cdcChangeTable = 'cdc.' + Object_name(object_id)
                FROM cdc.change_tables
                WHERE source_object_id = Object_id(@cdcTable)
                SET @sqlstate = N' delete ' + @cdcChangeTable
                              + N' where __$start_lsn <= @maxLsn '
                -- the parameter of sp_executesql is @stmt (the original used @state)
                EXEC Sp_executesql
                    @stmt = @sqlstate,
                    @params = N'@maxLsn binary(10)',
                    @maxLsn = @maxLsn
            END
            FETCH next FROM my_cur INTO @cdcId, @cdcTable
        END
        IF Cursor_status('global', 'my_cur') > -1
        BEGIN
            CLOSE my_cur
            DEALLOCATE my_cur
        END
    END try
    BEGIN catch
        DECLARE @errormsg NVARCHAR(4000) = 'error from cleanup : ' + Error_message();
        IF Cursor_status('global', 'my_cur') > -1
        BEGIN
            CLOSE my_cur
            DEALLOCATE my_cur
        END
        RAISERROR (@errormsg,16,1)
    END catch
END
go
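dbo.Cdc_transfer always starts from the minimum LSN of the capture instance, so it depends on the cleanup step to avoid applying the same changes twice. An alternative is to continue from the last LSN recorded in dbo.cdc_driven. The following is only a sketch of that idea, not the author's method; it assumes cdcid = 1 for dbo.region and the dbo_region capture instance:

```sql
-- Sketch: compute an incremental LSN window from the last completed run.
DECLARE @last_max BINARY(10), @from_lsn BINARY(10), @to_lsn BINARY(10);
DECLARE @min_lsn BINARY(10) = sys.fn_cdc_get_min_lsn('dbo_region');

SELECT @last_max = MAX(cdcmaxlsn)
FROM dbo.cdc_driven
WHERE cdcid = 1 AND cdccompleted = 1;

-- First run: start at the minimum LSN; later runs: start just after the last one.
SET @from_lsn = CASE WHEN @last_max IS NULL
                     THEN @min_lsn
                     ELSE sys.fn_cdc_increment_lsn(@last_max)
                END;

-- If older change rows were already cleaned up, do not start below the minimum.
IF @from_lsn < @min_lsn
    SET @from_lsn = @min_lsn;

SET @to_lsn = sys.fn_cdc_get_max_lsn();

IF @from_lsn <= @to_lsn
    EXEC dbo.Cdcprocessregion @cdcMinLsn = @from_lsn, @cdcMaxLsn = @to_lsn;
ELSE
    PRINT 'No new changes to apply.';
```

With a window like this, the change tables can be left to the regular CDC cleanup job instead of being deleted from manually.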