您的位置:首页 > 数据库 > SQL

SQL Server 2005 中实现通用的异步触发器架构

2016-11-24 14:29 881 查看
在SQL Server 2005中,通过新增的Service Broker可以实现异步触发器的处理功能。本文提供一种使用Service Broker实现的通用异步触发器方法。
在本方法中,通过Service Broker构造异步触发器处理架构,对于要使用这种架构的表,只需要创建相应的触发器及处理触发器中数据的存储过程,并且在异步触发器架构中登记触发器和处理的存储过程即可。如果一个触发器中的数据要被多个表使用,只需要在dbo.tb_async_trigger_subscribtion中登记相应处理数据的存储过程即可,即一个表的数据变更可以被多个表订阅(使用)。
架构的步骤如下:

1. 数据库配置

需要配置数据库以允许使用Service Broker。本文以tempdb库为例,故配置均在tempdb上下文中进行。

[align=left]USE tempdb[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- 允许Service Broker[/align]
[align=left]ALTER DATABASE tempdb SET[/align]
[align=left]ENABLE_BROKER[/align]
GO

2. 构建异步触发器相关的对象

下面的T-SQL创建异步触发器处理架构相关的对象。

[align=left]-- =======================================[/align]
[align=left]-- 异步触发器对象[/align]
[align=left]-- 1. service broker 对象[/align]
[align=left]-- =======================================[/align]
[align=left]-- a. message type, 要求使用xml 传递数据[/align]
[align=left]CREATE MESSAGE TYPE MSGT_async_trigger[/align]
[align=left]VALIDATION = WELL_FORMED_XML[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- b. 只需要发送消息[/align]
[align=left]CREATE CONTRACT CNT_async_trigger([/align]
[align=left] MSGT_async_trigger SENT BY INITIATOR)[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- c. 存储消息的队列[/align]
[align=left]CREATE QUEUE dbo.Q_async_trigger[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- d. 用于消息处理的服务[/align]
[align=left]CREATE SERVICE SRV_async_trigger[/align]
[align=left] ON QUEUE dbo.Q_async_trigger([/align]
[align=left] CNT_async_trigger)[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left] [/align]
[align=left]-- =======================================[/align]
[align=left]-- 异步触发器对象[/align]
[align=left]-- 2. 异步触发器处理的对象[/align]
[align=left]-- =======================================[/align]
[align=left]-- a. 登记异步触发器的表[/align]
[align=left]CREATE TABLE dbo.tb_async_trigger([/align]
[align=left] ID int IDENTITY[/align]
[align=left] PRIMARY KEY,[/align]
[align=left] table_name sysname,[/align]
[align=left] trigger_name sysname[/align]
[align=left])[/align]
[align=left] [/align]
[align=left]-- b. 登记订阅异步触发器的存储过程[/align]
[align=left]CREATE TABLE dbo.tb_async_trigger_subscriber([/align]
[align=left] ID int IDENTITY[/align]
[align=left] PRIMARY KEY,[/align]
[align=left] procedure_name sysname[/align]
[align=left])[/align]
[align=left] [/align]
[align=left]-- c. 异步触发器和存储过程之间的订阅关系[/align]
[align=left]CREATE TABLE dbo.tb_async_trigger_subscribtion([/align]
[align=left] trigger_id int[/align]
[align=left] REFERENCES dbo.tb_async_trigger([/align]
[align=left] ID),[/align]
[align=left] procedure_id int[/align]
[align=left] REFERENCES dbo.tb_async_trigger_subscriber([/align]
[align=left] ID),[/align]
[align=left] PRIMARY KEY([/align]
[align=left] trigger_id, procedure_id)[/align]
[align=left])[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- d. 发送消息的存储过程[/align]
[align=left]CREATE PROC dbo.p_async_trigger_send[/align]
[align=left] @message xml[/align]
[align=left]AS[/align]
[align=left]SET NOCOUNT ON[/align]
[align=left]DECLARE[/align]
[align=left] @handle uniqueidentifier[/align]
[align=left]BEGIN DIALOG CONVERSATION @handle[/align]
[align=left] FROM SERVICE [SRV_async_trigger][/align]
[align=left] TO SERVICE N'SRV_async_trigger'[/align]
[align=left] ON CONTRACT CNT_async_trigger[/align]
[align=left] WITH[/align]
[align=left] ENCRYPTION = OFF;[/align]
[align=left]SEND[/align]
[align=left] ON CONVERSATION @handle[/align]
[align=left] MESSAGE TYPE MSGT_async_trigger([/align]
[align=left] @message);[/align]
[align=left]-- 消息发出即可, 不需要回复, 因此发出后即可结束会话[/align]
[align=left]END CONVERSATION @handle[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- e. 处理异步触发器发送的消息[/align]
[align=left]CREATE PROC dbo.p_async_trigger_process[/align]
[align=left]AS[/align]
[align=left]SET NOCOUNT ON[/align]
[align=left]DECLARE[/align]
[align=left] @handle uniqueidentifier,[/align]
[align=left] @message xml,[/align]
[align=left] @rows int[/align]
[align=left]SET @rows = 1[/align]
[align=left]WHILE @rows > 0[/align]
[align=left]BEGIN[/align]
[align=left] -- 处理已经收到的消息[/align]
[align=left] WAITFOR([/align]
[align=left] RECEIVE TOP(1)[/align]
[align=left] @handle = conversation_handle,[/align]
[align=left] @message = CASE[/align]
[align=left] WHEN message_type_name = N'MSGT_async_trigger'[/align]
[align=left] THEN CONVERT(xml, message_body)[/align]
[align=left] ELSE NULL[/align]
[align=left] END[/align]
[align=left] FROM dbo.Q_async_trigger[/align]
[align=left] ), TIMEOUT 10[/align]
[align=left] SET @rows = @@ROWCOUNT[/align]
[align=left] IF @rows > 0[/align]
[align=left] BEGIN[/align]
[align=left] -- 结束会话[/align]
[align=left] END CONVERSATION @handle;[/align]
[align=left] [/align]
[align=left] -- 处理消息[/align]
[align=left] -- a. 取发送者信息[/align]
[align=left] DECLARE[/align]
[align=left] @table_name sysname,[/align]
[align=left] @trigger_name sysname,[/align]
[align=left] @sql nvarchar(max)[/align]
[align=left] SELECT[/align]
[align=left] @table_name = @message.value('(/root/table_name)[1]', 'sysname'),[/align]
[align=left] @trigger_name = @message.value('(/root/trigger_name)[1]', 'sysname')[/align]
[align=left] [/align]
[align=left] -- b. 调用异步触发器订阅的存储过程[/align]
[align=left] ;WITH[/align]
[align=left] SUB AS([/align]
[align=left] SELECT[/align]
[align=left] TR.table_name,[/align]
[align=left] TR.trigger_name,[/align]
[align=left] SUB.procedure_name[/align]
[align=left] FROM dbo.tb_async_trigger TR,[/align]
[align=left] dbo.tb_async_trigger_subscriber SUB,[/align]
[align=left] dbo.tb_async_trigger_subscribtion TRSUB[/align]
[align=left] WHERE TRSUB.trigger_id = TR.ID[/align]
[align=left] AND TRSUB.procedure_id = SUB.ID[/align]
[align=left] )[/align]
[align=left] SELECT[/align]
[align=left] @sql = ([/align]
[align=left] SELECT[/align]
[align=left] N'[/align]
[align=left]EXEC ' + procedure_name + N'[/align]
[align=left] @message[/align]
[align=left]'[/align]
[align=left] FROM SUB[/align]
[align=left] WHERE table_name = @table_name[/align]
[align=left] AND trigger_name = @trigger_name[/align]
[align=left] FOR XML PATH(''), ROOT('r'), TYPE[/align]
[align=left] ).value('(/r)[1]', 'nvarchar(max)')[/align]
[align=left] EXEC sp_executesql @sql, N'@message xml', @message[/align]
[align=left] END[/align]
[align=left]END[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- f. 绑定处理的存储过程到队列[/align]
[align=left]ALTER QUEUE dbo.Q_async_trigger[/align]
[align=left] WITH ACTIVATION([/align]
[align=left] STATUS = ON,[/align]
[align=left] PROCEDURE_NAME = dbo.p_async_trigger_process,[/align]
[align=left] MAX_QUEUE_READERS = 10,[/align]
[align=left] EXECUTE AS OWNER)[/align]
GO

3. 使用示例

下面的T-SQL演示使用异步触发器构架。示例中创建了三个表:
Dbo.t1 这个是源表,此表的数据变化将用于其他表
Dbo.t2 这个表要求保持与dbo.t1同步
Dbo.tb_log 这个表记录dbo.t1中的数据变化情况
触发器 TR_async_trigger 用于将表Dbo.t1中的数据变化发送到异步触发器构架中。dbo.p_Sync_t1_t2和dbo.p_Record_log用于处理dbo.t1于中变化的数据。
在处理时,需要把相关的信息登记到异步触发器架构的表中。

[align=left]-- =======================================[/align]
[align=left]-- 3. 使用示例[/align]
[align=left]-- =======================================[/align]
[align=left]-- ===============================[/align]
[align=left]-- 测试对象[/align]
[align=left]-- a. 源表[/align]
[align=left]CREATE TABLE dbo.t1([/align]
[align=left] id int IDENTITY[/align]
[align=left] PRIMARY KEY,[/align]
[align=left] col int[/align]
[align=left])[/align]
[align=left]-- b. 同步的目的表[/align]
[align=left]CREATE TABLE dbo.t2([/align]
[align=left] id int IDENTITY[/align]
[align=left] PRIMARY KEY,[/align]
[align=left] col int[/align]
[align=left])[/align]
[align=left]-- c. 记录操作的日志表[/align]
[align=left]CREATE TABLE dbo.tb_log([/align]
[align=left] id int IDENTITY[/align]
[align=left] PRIMARY KEY,[/align]
[align=left] user_name sysname,[/align]
[align=left] operate_type varchar(10),[/align]
[align=left] inserted xml,[/align]
[align=left] deleted xml[/align]
[align=left])[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- a. 异步发送处理消息的触发器[/align]
[align=left]CREATE TRIGGER TR_async_trigger[/align]
[align=left]ON dbo.t1[/align]
[align=left]FOR INSERT, UPDATE, DELETE[/align]
[align=left]AS[/align]
[align=left]IF @@ROWCOUNT = 0[/align]
[align=left] RETURN[/align]
[align=left] [/align]
[align=left]SET NOCOUNT ON[/align]
[align=left] [/align]
[align=left]-- 将要发送的数据生成xml 数据[/align]
[align=left]DECLARE[/align]
[align=left] @message xml[/align]
[align=left]SELECT[/align]
[align=left] @message = ([/align]
[align=left] SELECT[/align]
[align=left] table_name = ([/align]
[align=left] SELECT TOP 1[/align]
[align=left] OBJECT_NAME(parent_object_id)[/align]
[align=left] FROM sys.objects[/align]
[align=left] WHERE object_id = @@PROCID),[/align]
[align=left] trigger_name = OBJECT_NAME(@@PROCID),[/align]
[align=left] user_name = SUSER_SNAME(),[/align]
[align=left] inserted = ([/align]
[align=left] SELECT * FROM inserted FOR XML AUTO, TYPE),[/align]
[align=left] deleted = ([/align]
[align=left] SELECT * FROM deleted FOR XML AUTO, TYPE)[/align]
[align=left] FOR XML PATH(''), ROOT('root'), TYPE[/align]
[align=left] )[/align]
[align=left]-- 发送消息[/align]
[align=left]EXEC dbo.p_async_trigger_send[/align]
[align=left] @message = @message[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- b. 处理异步触发器的存储过程[/align]
[align=left]-- b.1 同步到t2 的存储过程[/align]
[align=left]CREATE PROC dbo.p_Sync_t1_t2[/align]
[align=left] @message xml[/align]
[align=left]AS[/align]
[align=left]SET NOCOUNT ON[/align]
[align=left]DECLARE[/align]
[align=left] @inserted bit,[/align]
[align=left] @deleted bit[/align]
[align=left]SELECT[/align]
[align=left] @inserted = @message.exist('/root/inserted'),[/align]
[align=left] @deleted = @message.exist('/root/deleted')[/align]
[align=left]IF @inserted = 1[/align]
[align=left] IF @deleted = 1 -- 更新[/align]
[align=left] BEGIN[/align]
[align=left] ;WITH[/align]
[align=left] I AS([/align]
[align=left] SELECT[/align]
[align=left] id = T.c.value('@id[1]', 'int'),[/align]
[align=left] col = T.c.value('@col[1]', 'int')[/align]
[align=left] FROM @message.nodes('/root/inserted/inserted') T(c)[/align]
[align=left] ),[/align]
[align=left] D AS([/align]
[align=left] SELECT[/align]
[align=left] id = T.c.value('@id[1]', 'int'),[/align]
[align=left] col = T.c.value('@col[1]', 'int')[/align]
[align=left] FROM @message.nodes('/root/deleted/deleted') T(c)[/align]
[align=left] )[/align]
[align=left] UPDATE A SET[/align]
[align=left] col = I.col[/align]
[align=left] FROM dbo.t2 A, I, D[/align]
[align=left] WHERE A.ID = I.ID[/align]
[align=left] AND I.ID = D.ID[/align]
[align=left] END[/align]
[align=left] ELSE -- 插入[/align]
[align=left] BEGIN[/align]
[align=left] SET IDENTITY_INSERT dbo.t2 ON[/align]
[align=left] ;WITH[/align]
[align=left] I AS([/align]
[align=left] SELECT[/align]
[align=left] id = T.c.value('@id[1]', 'int'),[/align]
[align=left] col = T.c.value('@col[1]', 'int')[/align]
[align=left] FROM @message.nodes('/root/inserted/inserted') T(c)[/align]
[align=left] )[/align]
[align=left] INSERT dbo.t2([/align]
[align=left] id, col)[/align]
[align=left] SELECT[/align]
[align=left] id, col[/align]
[align=left] FROM I[/align]
[align=left] SET IDENTITY_INSERT dbo.t2 OFF[/align]
[align=left] END[/align]
[align=left]ELSE -- 删除[/align]
[align=left]BEGIN[/align]
[align=left] ;WITH[/align]
[align=left] D AS([/align]
[align=left] SELECT[/align]
[align=left] id = T.c.value('@id[1]', 'int'),[/align]
[align=left] col = T.c.value('@col[1]', 'int')[/align]
[align=left] FROM @message.nodes('/root/deleted/deleted') T(c)[/align]
[align=left] )[/align]
[align=left] DELETE A[/align]
[align=left] FROM dbo.t2 A, D[/align]
[align=left] WHERE A.ID = D.ID[/align]
[align=left]END[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- b.2 记录操作记录到dbo.tb_log 的存储过程[/align]
[align=left]CREATE PROC dbo.p_Record_log[/align]
[align=left] @message xml[/align]
[align=left]AS[/align]
[align=left]SET NOCOUNT ON[/align]
[align=left]DECLARE[/align]
[align=left] @inserted bit,[/align]
[align=left] @deleted bit[/align]
[align=left]SELECT[/align]
[align=left] @inserted = @message.exist('/root/inserted'),[/align]
[align=left] @deleted = @message.exist('/root/deleted')[/align]
[align=left]INSERT dbo.tb_log([/align]
[align=left] user_name,[/align]
[align=left] operate_type,[/align]
[align=left] inserted,[/align]
[align=left] deleted)[/align]
[align=left]SELECT[/align]
[align=left] @message.value('(/root/user_name)[1]', 'sysname'),[/align]
[align=left] operate_type = CASE[/align]
[align=left] WHEN @inserted = 1 AND @deleted = 1 THEN 'update'[/align]
[align=left] WHEN @inserted = 1 THEN 'insert'[/align]
[align=left] WHEN @deleted = 1 THEN 'delete'[/align]
[align=left] END,[/align]
[align=left] @message.query('/root/inserted'),[/align]
[align=left] @message.query('/root/deleted')[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left] [/align]
[align=left]-- ===============================[/align]
[align=left]-- 在异步触发器处理系统中登记对象[/align]
[align=left]INSERT dbo.tb_async_trigger([/align]
[align=left] table_name, trigger_name)[/align]
[align=left]VALUES([/align]
[align=left] N't1', N'TR_async_trigger')[/align]
[align=left] [/align]
[align=left]INSERT dbo.tb_async_trigger_subscriber([/align]
[align=left] procedure_name)[/align]
[align=left]SELECT N'dbo.p_Sync_t1_t2' UNION ALL[/align]
[align=left]SELECT N'dbo.p_Record_log'[/align]
[align=left] [/align]
[align=left]INSERT dbo.tb_async_trigger_subscribtion([/align]
[align=left] trigger_id, procedure_id)[/align]
[align=left]SELECT 1, 1 UNION ALL[/align]
[align=left]SELECT 1, 2[/align]
GO

4. 使用测试

下面的T-SQL修改表dbo.t1中的数据,并检查dbo.t2、dbo.tb_log中的数据,以确定异步触发器架构的工作是否成功。
执行完成后可以看到dbo.t2、dbo.tb_log中有相关的记录。

[align=left]-- ===============================[/align]
[align=left]-- 测试[/align]
[align=left]INSERT dbo.t1[/align]
[align=left]SELECT 1 UNION ALL[/align]
[align=left]SELECT 2[/align]
[align=left] [/align]
[align=left]UPDATE dbo.t1 SET[/align]
[align=left] col = 2[/align]
[align=left]WHERE id = 1[/align]
[align=left] [/align]
[align=left]DELETE dbo.t1[/align]
[align=left]WHERE id = 2[/align]
[align=left] [/align]
[align=left]-- 显示结果[/align]
[align=left]WAITFOR DELAY '00:00:05' -- 延迟5 分钟, 以便有时间处理消息(因为是异步的)[/align]
[align=left]SELECT * FROM dbo.t2[/align]
[align=left]SELECT * FROM dbo.tb_log[/align]
[align=left]GO[/align]

5. 使用测试

下面的T-SQL删除本文中建立的所有对象。

[align=left]-- =======================================[/align]
[align=left]-- 5. 删除相关的对象[/align]
[align=left]-- =======================================[/align]
[align=left]-- a. 删除service broker 对象[/align]
[align=left]DROP SERVICE SRV_async_trigger[/align]
[align=left]DROP QUEUE dbo.Q_async_trigger[/align]
[align=left]DROP CONTRACT CNT_async_trigger[/align]
[align=left]DROP MESSAGE TYPE MSGT_async_trigger[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- b. 删除异步触发器处理的相关对象[/align]
[align=left]DROP PROC dbo.p_async_trigger_process[/align]
[align=left]DROP PROC dbo.p_async_trigger_send[/align]
[align=left]DROP TABLE dbo.tb_async_trigger_subscribtion[/align]
[align=left]DROP TABLE dbo.tb_async_trigger_subscriber[/align]
[align=left]DROP TABLE dbo.tb_async_trigger[/align]
[align=left]GO[/align]
[align=left] [/align]
[align=left]-- c. 删除测试的对象[/align]
[align=left]DROP TABLE dbo.tb_log, dbo.t1, dbo.t2[/align]
[align=left]DROP PROC dbo.p_Sync_t1_t2, dbo.p_Record_log[/align]

顶0踩
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: