Oracle Advanced Queue (DBMS_AQ/DBMS_AQADM)
2015-11-11 16:18
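Preface
Oracle provides inter-process (or rather inter-session) communication through AQ (Oracle Streams Advanced Queuing). DBMS_PIPE can apparently do inter-session communication as well; I plan to cover that in a separate post, so I will not go into it here. AQ is essentially a message queue (why it is called an "advanced" queue, I do not know :)). The typical use case for a message queue is the producer-consumer pattern (or publisher/subscriber).

AQ is built on a real "queue table". Because the data in a table is persistent and shared, and can be accessed by several sessions at once, passing messages between sessions follows naturally. A queue table alone is not enough, though; a queue is needed as well. Through the queue, the publisher puts messages into the queue table, and the subscriber reads them back out. The key question is how the subscriber knows when to fetch a message. It could query the queue table continuously, or poll at a fixed interval, but neither is attractive; it is better to have something notify the subscriber. AQ supports a callback mechanism so that subscribers receive the data in the queue table promptly.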
AQ Interfaces
Oracle Advanced Queue exposes two main PL/SQL packages, DBMS_AQ and DBMS_AQADM. It is worth knowing what each of them provides before using them.

The main related type definitions are:
- ENQUEUE_OPTIONS_T
- DEQUEUE_OPTIONS_T
- MESSAGE_PROPERTIES_T
- AQ$_REG_INFO
- AQ$_DESCRIPTOR

DBMS_AQ
This package mainly defines the enqueue/dequeue procedures, as listed below (note that this is far from a complete list of interfaces):
- ENQUEUE procedure
- ENQUEUE_ARRAY function
- DEQUEUE procedure
- DEQUEUE_ARRAY function
- REGISTER procedure (register for message notifications)
- UNREGISTER procedure (unregister a subscription, which turns off notification)
- LISTEN procedures (listen to one or more queues on behalf of a list of agents)
- POST procedures (post to an anonymous subscription, which allows all clients registered for the subscription to get notifications)

Probably the most useful feature of AQ is the callback procedure, which supports asynchronous processing. One restriction is that a user-defined callback procedure must match a prescribed signature.

If the message type is RAW, the signature is:

    procedure plsqlcallback(
        context  IN RAW,
        reginfo  IN SYS.AQ$_REG_INFO,
        descr    IN SYS.AQ$_DESCRIPTOR,
        payload  IN RAW,
        payloadl IN NUMBER);

If the message type is a user-defined object type, the signature is:

    procedure plsqlcallback(
        context  IN RAW,
        reginfo  IN SYS.AQ$_REG_INFO,
        descr    IN SYS.AQ$_DESCRIPTOR,
        payload  IN VARCHAR2,
        payloadl IN NUMBER);

DBMS_AQADM
As the name suggests, this package provides the interfaces for administering AQ. The main ones are:
- CREATE_QUEUE_TABLE procedure
- CREATE_QUEUE procedure
- DROP_QUEUE procedure
- DROP_QUEUE_TABLE procedure
- PURGE_QUEUE_TABLE procedure
- START_QUEUE procedure
- STOP_QUEUE procedure
- ADD_SUBSCRIBER procedure
- REMOVE_SUBSCRIBER procedure
AQ in Action
1. Create a message type (a.k.a. payload type)

    SQL> create type test_msg_type as
         object (message varchar2(4000));
         /

    Type created.
2. Create a queue table based on the payload type just created.

    SQL> begin
           dbms_aqadm.create_queue_table
             ( queue_table        => 'test_queue_table',
               queue_payload_type => 'test_msg_type');
         end;
         /

    PL/SQL procedure successfully completed.
3. Create a queue and start the queue

    SQL> begin
           dbms_aqadm.create_queue
             ( queue_name  => 'test_queue',
               queue_table => 'test_queue_table');

           dbms_aqadm.start_queue
             ( queue_name => 'test_queue');
         end;
         /

    PL/SQL procedure successfully completed.

Now let's see what Oracle has created behind the scenes so far.

    SQL> select object_name, object_type from user_objects;

    OBJECT_NAME                              OBJECT_TYPE
    ---------------------------------------- --------------------
    TEST_QUEUE_TABLE                         TABLE
    TEST_MSG_TYPE                            TYPE
    SYS_C0054669                             INDEX
    SYS_LOB0000262448C00030$$                LOB
    AQ$_TEST_QUEUE_TABLE_T                   INDEX
    AQ$_TEST_QUEUE_TABLE_I                   INDEX
    AQ$_TEST_QUEUE_TABLE_E                   QUEUE
    AQ$_TEST_QUEUE_TABLE_F                   VIEW
    AQ$TEST_QUEUE_TABLE                      VIEW
    TEST_QUEUE                               QUEUE

    10 rows selected.

Note that a second queue, AQ$_TEST_QUEUE_TABLE_E, was created for us. As the suffix "E" implies, this is the exception queue: AQ moves a message there when it cannot be delivered from our user queue, for example because it expired or exceeded its retry limit.

Now let's see what is inside our queue table. Obviously, nothing!

    SQL> select * from test_queue_table;

    no rows selected

4. Enqueue messages

    SQL> declare
           v_enqueue_options    dbms_aq.enqueue_options_t;
           v_message_properties dbms_aq.message_properties_t;
           v_message_handle     raw(16);
           v_payload            test_msg_type;
         begin
           v_payload := test_msg_type('Hello There');
           dbms_aq.enqueue
             ( queue_name         => 'test_queue',
               enqueue_options    => v_enqueue_options,
               message_properties => v_message_properties,
               payload            => v_payload,
               msgid              => v_message_handle);
           commit;
         end;
         /

    PL/SQL procedure successfully completed.

Note that the enqueue is essentially a transaction (an insert into the queue table), so we need to commit it before other sessions can see the message in the queue table.

Now let's see what's inside the queue table.

    SQL> select count(*) from test_queue_table;

      COUNT(*)
    ----------
             1

    SQL> select count(*) from aq$test_queue_table;

      COUNT(*)
    ----------
             1

    SQL> select user_data from aq$test_queue_table;

    USER_DATA(MESSAGE)
    ----------------------------------------
    TEST_MSG_TYPE('Hello There')
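
Besides the payload, the aq$ view also exposes per-message metadata, which is handy for checking what state a message is in before dequeuing it. The query below is a minimal sketch, assuming the standard column names QUEUE, MSG_STATE and ENQ_TIME of the generated AQ$<queue_table> view; it is read-only and does not change the queue.

```sql
-- Read-only look at message metadata in the generated AQ$ view.
-- Column names are assumed from the standard AQ$<queue_table> layout.
select queue,
       msg_state,   -- e.g. READY for a message waiting to be dequeued
       enq_time     -- when the message was enqueued
from   aq$test_queue_table;
```
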
5. Browsing messages
DBMS_AQ.DEQUEUE can either dequeue a message (remove it from the queue table), which is the default behavior, or browse a message (leave it in the queue). To browse, set the dequeue mode to DBMS_AQ.BROWSE, as in the example below.

    SQL> declare
           v_dequeue_options    dbms_aq.dequeue_options_t;
           v_message_properties dbms_aq.message_properties_t;
           v_message_handle     raw(16);
           v_payload            test_msg_type;
         begin

           v_dequeue_options.dequeue_mode := dbms_aq.browse;

           dbms_aq.dequeue(
             queue_name         => 'test_queue',
             dequeue_options    => v_dequeue_options,
             message_properties => v_message_properties,
             payload            => v_payload,
             msgid              => v_message_handle);

           dbms_output.put_line('Browsed message: ' || v_payload.message);

         end;
         /

    PL/SQL procedure successfully completed.

    SQL> set serveroutput on
    SQL> /
    Browsed message: Hello There

    PL/SQL procedure successfully completed.

We can verify that the message is still in the queue (table) by querying the view aq$test_queue_table.

    SQL> select user_data from aq$test_queue_table;

    USER_DATA(MESSAGE)
    ----------------------------------------
    TEST_MSG_TYPE('Hello There')
6. Dequeue messages
Now let's do the real dequeue. This does not have to happen in the same session, since the enqueue was committed and AQ is table-based. Dequeue is also a transaction: if we are happy with the message, we must commit the dequeue as well.

    SQL> declare
           v_dequeue_options    dbms_aq.dequeue_options_t;
           v_message_properties dbms_aq.message_properties_t;
           v_message_handle     raw(16);
           v_payload            test_msg_type;
         begin
           dbms_aq.dequeue(
             queue_name         => 'test_queue',
             dequeue_options    => v_dequeue_options,
             message_properties => v_message_properties,
             payload            => v_payload,
             msgid              => v_message_handle);
           dbms_output.put_line('Dequeue message: ' || v_payload.message);
           commit;
         end;
         /
    Dequeue message: Hello There

    PL/SQL procedure successfully completed.

We can confirm the message is gone by querying the queue table.

    SQL> select count(*) from test_queue_table;

      COUNT(*)
    ----------
             0
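
With the queue now empty, running the same dequeue block again would block, because the default wait setting is to wait indefinitely for a message. The block below is a minimal sketch of a non-blocking dequeue: it sets the wait option to DBMS_AQ.NO_WAIT and traps ORA-25228 (timeout or end-of-fetch during message dequeue). The exception name e_no_messages is purely illustrative.

```sql
set serveroutput on

declare
    v_dequeue_options    dbms_aq.dequeue_options_t;
    v_message_properties dbms_aq.message_properties_t;
    v_message_handle     raw(16);
    v_payload            test_msg_type;
    -- Illustrative name, bound to ORA-25228 (nothing to dequeue in time).
    e_no_messages        exception;
    pragma exception_init(e_no_messages, -25228);
begin
    -- Return immediately instead of waiting for a message to arrive.
    v_dequeue_options.wait := dbms_aq.no_wait;

    dbms_aq.dequeue(
        queue_name         => 'test_queue',
        dequeue_options    => v_dequeue_options,
        message_properties => v_message_properties,
        payload            => v_payload,
        msgid              => v_message_handle);

    dbms_output.put_line('Dequeue message: ' || v_payload.message);
    commit;
exception
    when e_no_messages then
        -- The queue was empty; nothing was dequeued, so nothing to commit.
        dbms_output.put_line('No message available in test_queue');
end;
/
```

A numeric value (in seconds) can be assigned to the wait option instead of NO_WAIT when a bounded wait is preferable to returning at once.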
7. Clean Up
Before moving on to notification, let's clean up first.

    SQL> begin
           dbms_aqadm.stop_queue('test_queue');
           dbms_aqadm.drop_queue('test_queue');
           dbms_aqadm.drop_queue_table('test_queue_table');
         end;
         /

    PL/SQL procedure successfully completed.

    SQL> select object_name, object_type from user_objects;

    OBJECT_NAME                              OBJECT_TYPE
    ---------------------------------------- --------------------
    TEST_MSG_TYPE                            TYPE
8. Notification
The examples above show how to dequeue messages manually, which is rarely convenient in practice. Usually we want a mechanism that notifies the consumer when a message arrives, rather than having the consumer dequeue on its own initiative.

First, create a queue table that supports multiple consumers.

    SQL> begin
           dbms_aqadm.create_queue_table
             ( queue_table        => 'test_queue_table',
               queue_payload_type => 'test_msg_type',
               multiple_consumers => true);
         end;
         /

    PL/SQL procedure successfully completed.

Then create the queue and start it as usual.

    SQL> begin
           dbms_aqadm.create_queue
             ( queue_name  => 'test_queue',
               queue_table => 'test_queue_table');

           dbms_aqadm.start_queue
             ( queue_name => 'test_queue');
         end;
         /

    PL/SQL procedure successfully completed.

To demonstrate the asynchronous nature of notification via callback, we are going to store the dequeued message in an ordinary application table.

    SQL> create table test_message_table
         ( message varchar2(4000));

    Table created.

Now comes the key point: we create a PL/SQL callback procedure. When a notification arrives, this procedure dequeues the message and saves it in the table test_message_table. Remember that the callback procedure must follow the prescribed signature, as follows.

    SQL> create or replace procedure test_queue_callback_procedure
           ( context  raw,
             reginfo  sys.aq$_reg_info,
             descr    sys.aq$_descriptor,
             payload  raw,
             payloadl number)
         as
           v_dequeue_options    dbms_aq.dequeue_options_t;
           v_message_properties dbms_aq.message_properties_t;
           v_message_handle     raw(16);
           v_payload            test_msg_type;
         begin
           v_dequeue_options.msgid := descr.msg_id;
           v_dequeue_options.consumer_name := descr.consumer_name;
           dbms_aq.dequeue
             ( queue_name         => descr.queue_name,
               dequeue_options    => v_dequeue_options,
               message_properties => v_message_properties,
               payload            => v_payload,
               msgid              => v_message_handle);
           insert into test_message_table(message)
           values('Message [' || v_payload.message || ']' ||
                  ' dequeued at [' || to_char(systimestamp, 'yyyy-mm-dd hh24:mi:ss.FF3') || ']');
           commit;
         end;
         /

    Procedure created.

We need to add a named subscriber to the queue and register the action that the subscriber will take on notification.

    SQL> begin
           dbms_aqadm.add_subscriber
             ( queue_name => 'test_queue',
               subscriber => sys.aq$_agent
                               ('test_queue_subscriber',
                                null,
                                null)
             );
           dbms_aq.register
             (
               sys.aq$_reg_info_list
                 ( sys.aq$_reg_info
                     ( 'test_queue:test_queue_subscriber',
                       dbms_aq.namespace_aq,
                       'plsql://test_queue_callback_procedure',
                       HEXTORAW('FF')
                     )
                 ),
               1
             );
         end;
         /

    PL/SQL procedure successfully completed.

The registration name has the form queue:subscriber, so the subscriber part must match the name passed to add_subscriber. Refer to the AQ$_REG_INFO type for the detailed definition.

Now let's see what happens when we enqueue a message.

    SQL> declare
           v_enqueue_options    dbms_aq.enqueue_options_t;
           v_message_properties dbms_aq.message_properties_t;
           v_message_handle     raw(16);
           v_payload            test_msg_type;
         begin
           v_payload := test_msg_type(
                          to_char(systimestamp,
                                  'yyyy-mm-dd hh24:mi:ss.ff3'));
           dbms_aq.enqueue
             (
               queue_name         => 'test_queue',
               enqueue_options    => v_enqueue_options,
               message_properties => v_message_properties,
               payload            => v_payload,
               msgid              => v_message_handle);
           commit;
         end;
         /

    PL/SQL procedure successfully completed.

To see whether the message was dequeued automatically, let's check the table test_message_table.

    SQL> select * from test_message_table;

    MESSAGE
    --------------------------------------------------------------------------------
    Message [2010-11-06 16:07:47.537] dequeued at [2010-11-06 16:07:51.599]

    SQL> select count(*) from test_queue_table;

      COUNT(*)
    ----------
             0

Clean up: remove the subscriber as follows.

    SQL> begin
           dbms_aqadm.remove_subscriber
             ( queue_name => 'test_queue',
               subscriber => sys.aq$_agent
                               ('test_queue_subscriber', null, null)
             );
         end;
         /

    PL/SQL procedure successfully completed.
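
Removing the subscriber leaves the notification registration and the demo objects in place. The script below is one possible way to finish the cleanup, offered as a sketch rather than as part of the original walkthrough: it calls DBMS_AQ.UNREGISTER with the same registration details that were passed to REGISTER, then drops the queue, the queue table and the supporting objects in the same order as step 7. Whether UNREGISTER should run before or after REMOVE_SUBSCRIBER is not addressed by the source; treat the ordering here as an assumption.

```sql
begin
    -- Turn off the callback notification; the arguments mirror the
    -- earlier dbms_aq.register call (registration list, then its count).
    dbms_aq.unregister
      ( sys.aq$_reg_info_list
          ( sys.aq$_reg_info
              ( 'test_queue:test_queue_subscriber',
                dbms_aq.namespace_aq,
                'plsql://test_queue_callback_procedure',
                hextoraw('FF')
              )
          ),
        1
      );

    -- Same teardown sequence as in step 7.
    dbms_aqadm.stop_queue('test_queue');
    dbms_aqadm.drop_queue('test_queue');
    dbms_aqadm.drop_queue_table('test_queue_table');
end;
/

-- Supporting objects created for the notification demo.
drop procedure test_queue_callback_procedure;
drop table test_message_table;
drop type test_msg_type;
```

The type is dropped last because both the callback procedure and the queue table depend on it.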
Acknowledgements
The examples in this article come from Adrian Billington's introduction to advanced queuing.