📨 Oracle Advanced Queuing (AQ)
AQ Architecture
| Component |
Description |
Purpose |
| Queue |
Message container |
Async processing |
| Queue Table |
Storage table |
Persistent messages |
| Producer |
Enqueue application |
Send messages |
| Consumer |
Dequeue application |
Process messages |
| Subscriber |
Registered recipient |
Selective consumption |
Message Delivery Modes
| Mode |
Storage |
Use Case |
| Persistent |
Queue table |
Mission-critical |
| Buffered |
Memory (SGA) |
High throughput |
| Persistent Buffered |
Both |
Balance speed & reliability |
🔨 Creating Queues
Grant AQ Privileges
GRANT AQ_ADMINISTRATOR_ROLE TO app_user;
GRANT EXECUTE ON DBMS_AQ TO app_user;
GRANT EXECUTE ON DBMS_AQADM TO app_user;
Create Queue Table & Queue
CREATE TYPE order_message_t AS OBJECT (
order_id NUMBER,
customer_id NUMBER,
order_amount NUMBER,
order_date DATE,
status VARCHAR2(50)
);
/
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'order_queue_table',
queue_payload_type => 'order_message_t',
multiple_consumers => TRUE,
comment => 'Order processing'
);
END;
/
BEGIN
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'order_queue',
queue_table => 'order_queue_table'
);
END;
/
BEGIN
DBMS_AQADM.START_QUEUE(queue_name => 'order_queue');
END;
/
📤 Enqueue Messages
Producer Example
DECLARE
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW(16);
message order_message_t;
BEGIN
message := order_message_t(
order_id => 1001,
customer_id => 5001,
order_amount => 299.99,
order_date => SYSDATE,
status => 'PENDING'
);
message_properties.priority := 1;
message_properties.expiration := 3600;
DBMS_AQ.ENQUEUE(
queue_name => 'order_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
COMMIT;
END;
/
📥 Dequeue Messages
Consumer Example
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW(16);
message order_message_t;
BEGIN
dequeue_options.wait := 5;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
DBMS_AQ.DEQUEUE(
queue_name => 'order_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
DBMS_OUTPUT.PUT_LINE('Order: ' || message.order_id);
DBMS_OUTPUT.PUT_LINE('Amount: ' || message.order_amount);
COMMIT;
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE = -25228 THEN
DBMS_OUTPUT.PUT_LINE('No messages');
ELSE
RAISE;
END IF;
END;
/
💡 Tip: Use dequeue_options.wait := DBMS_AQ.NO_WAIT for non-blocking or
DBMS_AQ.FOREVER to wait indefinitely.
🔔 Subscribers & Propagation
Add Queue Subscribers
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('warehouse_app', NULL, NULL);
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'order_queue',
subscriber => subscriber
);
END;
/
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('priority_app', NULL, NULL);
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'order_queue',
subscriber => subscriber,
rule => 'tab.user_data.order_amount > 1000'
);
END;
/
Dequeue for Specific Subscriber
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW(16);
message order_message_t;
BEGIN
dequeue_options.consumer_name := 'warehouse_app';
dequeue_options.wait := 5;
DBMS_AQ.DEQUEUE(
queue_name => 'order_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
COMMIT;
END;
/
Queue Propagation
CREATE DATABASE LINK remote_db
CONNECT TO aq_user IDENTIFIED BY password
USING 'remote_tns';
BEGIN
DBMS_AQADM.SCHEDULE_PROPAGATION(
queue_name => 'order_queue',
destination => 'remote_db',
start_time => SYSDATE,
latency => 60
);
END;
/
SELECT SCHEMA, QUEUE_NAME, DESTINATION, LATENCY
FROM DBA_QUEUE_SCHEDULES;
📊 Monitoring Queues
Essential Queries
SELECT NAME, QUEUE_TABLE, ENQUEUE_ENABLED, DEQUEUE_ENABLED
FROM DBA_QUEUES;
SELECT QUEUE_TABLE, TYPE, RECIPIENTS
FROM DBA_QUEUE_TABLES;
SELECT MSG_ID, MSG_STATE, ENQUEUE_TIME
FROM order_queue_table;
SELECT QUEUE_SCHEMA, QUEUE_NAME, NUM_MSGS, SPILL_MSGS
FROM V$AQ;
SELECT QUEUE_NAME, CONSUMER_NAME, ADDRESS, RULE
FROM DBA_QUEUE_SUBSCRIBERS;
🎯 Queue Management
Queue Operations
BEGIN
DBMS_AQADM.STOP_QUEUE(queue_name => 'order_queue');
END;
/
BEGIN
DBMS_AQADM.START_QUEUE(queue_name => 'order_queue');
END;
/
DECLARE
purge_options DBMS_AQADM.AQ$_PURGE_OPTIONS_T;
BEGIN
purge_options.block := TRUE;
DBMS_AQADM.PURGE_QUEUE_TABLE(
queue_table => 'order_queue_table',
purge_condition => 'enqueue_time < SYSDATE - 7',
purge_options => purge_options
);
END;
/
BEGIN
DBMS_AQADM.STOP_QUEUE(queue_name => 'order_queue');
DBMS_AQADM.DROP_QUEUE(queue_name => 'order_queue');
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'order_queue_table',
force => TRUE
);
END;
/
🌊 Oracle Streams (Legacy)
Streams Components
| Component |
Description |
Purpose |
| Capture |
Reads redo logs |
Extract changes |
| Propagation |
Sends LCRs |
Transfer changes |
| Apply |
Applies changes |
Replicate data |
| LCR |
Logical Change Record |
Change event |
⚠️ Warning: Oracle Streams is deprecated since 19c. Use GoldenGate for replication or AQ for
messaging.
Streams Alternatives
| Use Case |
Replacement |
Benefits |
| Data Replication |
Oracle GoldenGate |
Real-time CDC |
| Messaging |
Advanced Queuing |
Transactional |
| Event Processing |
Kafka + Oracle CDC |
High throughput |
| DB Migration |
Data Pump + GoldenGate |
Zero downtime |
🚀 Best Practices
AQ Best Practices
- Keep payloads small (<1MB)< /li>
- Set retention policy to purge old messages
- Monitor queue depth (NUM_MSGS)
- Use exception queues for failed messages
- Use message priority for critical messages
- Commit after dequeue to acknowledge
- Use buffered queues for high throughput
- Use rule-based filtering for subscribers
- Test message expiration behavior
Common Use Cases
- Order Processing: Decouple entry from fulfillment
- Email Notifications: Queue emails for async delivery
- ETL Pipelines: Queue data change events
- Audit Logging: Queue audit records
- Microservices: Async communication
- Event-Driven: Publish/subscribe patterns
✓ Learning Checklist - Module 23
- Understand AQ architecture
- Create queue tables with payload types
- Enqueue messages with priority
- Dequeue messages (blocking/non-blocking)
- Add subscribers with filters
- Configure queue propagation
- Monitor queue depth and states
- Implement exception queues
- Understand Streams deprecation
- Apply AQ best practices