⚡ Module 23: Real-time Data & Messaging

Oracle Streams & Advanced Queuing

📨 Oracle Advanced Queuing (AQ)

AQ Architecture

Component Description Purpose
Queue Message container Async processing
Queue Table Storage table Persistent messages
Producer Enqueue application Send messages
Consumer Dequeue application Process messages
Subscriber Registered recipient Selective consumption

Message Delivery Modes

Mode Storage Use Case
Persistent Queue table Mission-critical
Buffered Memory (SGA) High throughput
Persistent Buffered Both Balance speed & reliability

🔨 Creating Queues

Grant AQ Privileges

-- Grant privileges
GRANT AQ_ADMINISTRATOR_ROLE TO app_user;
GRANT EXECUTE ON DBMS_AQ TO app_user;
GRANT EXECUTE ON DBMS_AQADM TO app_user;

Create Queue Table & Queue

-- Create payload type
CREATE TYPE order_message_t AS OBJECT (
  order_id       NUMBER,
  customer_id    NUMBER,
  order_amount   NUMBER,
  order_date     DATE,
  status         VARCHAR2(50)
);
/

-- Create queue table
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'order_queue_table',
    queue_payload_type => 'order_message_t',
    multiple_consumers => TRUE,
    comment            => 'Order processing'
  );
END;
/

-- Create queue
BEGIN
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'order_queue',
    queue_table => 'order_queue_table'
  );
END;
/

-- Start queue
BEGIN
  DBMS_AQADM.START_QUEUE(queue_name => 'order_queue');
END;
/

📤 Enqueue Messages

Producer Example

-- Enqueue a message
DECLARE
  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  message            order_message_t;
BEGIN
  message := order_message_t(
    order_id     => 1001,
    customer_id  => 5001,
    order_amount => 299.99,
    order_date   => SYSDATE,
    status       => 'PENDING'
  );

  -- Set priority (lower = higher priority)
  message_properties.priority := 1;
  message_properties.expiration := 3600; -- 1 hour

  DBMS_AQ.ENQUEUE(
    queue_name         => 'order_queue',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle
  );

  COMMIT;
END;
/

📥 Dequeue Messages

Consumer Example

-- Dequeue a message
DECLARE
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  message            order_message_t;
BEGIN
  dequeue_options.wait := 5; -- Wait 5 seconds
  dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;

  DBMS_AQ.DEQUEUE(
    queue_name         => 'order_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle
  );

  -- Process message
  DBMS_OUTPUT.PUT_LINE('Order: ' || message.order_id);
  DBMS_OUTPUT.PUT_LINE('Amount: ' || message.order_amount);

  COMMIT; -- Remove from queue
EXCEPTION
  WHEN OTHERS THEN
    IF SQLCODE = -25228 THEN
      DBMS_OUTPUT.PUT_LINE('No messages');
    ELSE
      RAISE;
    END IF;
END;
/
💡 Tip: Use dequeue_options.wait := DBMS_AQ.NO_WAIT for non-blocking or DBMS_AQ.FOREVER to wait indefinitely.

🔔 Subscribers & Propagation

Add Queue Subscribers

-- Add subscriber
DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber := SYS.AQ$_AGENT('warehouse_app', NULL, NULL);

  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'order_queue',
    subscriber => subscriber
  );
END;
/

-- Subscriber with rule-based filter
DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber := SYS.AQ$_AGENT('priority_app', NULL, NULL);

  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'order_queue',
    subscriber => subscriber,
    rule       => 'tab.user_data.order_amount > 1000'
  );
END;
/

Dequeue for Specific Subscriber

-- Dequeue as subscriber
DECLARE
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  message            order_message_t;
BEGIN
  dequeue_options.consumer_name := 'warehouse_app';
  dequeue_options.wait := 5;

  DBMS_AQ.DEQUEUE(
    queue_name         => 'order_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle
  );

  COMMIT;
END;
/

Queue Propagation

-- Create database link
CREATE DATABASE LINK remote_db
  CONNECT TO aq_user IDENTIFIED BY password
  USING 'remote_tns';

-- Schedule propagation
BEGIN
  DBMS_AQADM.SCHEDULE_PROPAGATION(
    queue_name      => 'order_queue',
    destination     => 'remote_db',
    start_time      => SYSDATE,
    latency         => 60  -- Every 60 seconds
  );
END;
/

-- View propagation schedules
SELECT SCHEMA, QUEUE_NAME, DESTINATION, LATENCY
FROM DBA_QUEUE_SCHEDULES;

📊 Monitoring Queues

Essential Queries

-- View all queues
SELECT NAME, QUEUE_TABLE, ENQUEUE_ENABLED, DEQUEUE_ENABLED
FROM DBA_QUEUES;

-- View queue tables
SELECT QUEUE_TABLE, TYPE, RECIPIENTS
FROM DBA_QUEUE_TABLES;

-- Messages in queue
SELECT MSG_ID, MSG_STATE, ENQUEUE_TIME
FROM order_queue_table;

-- Queue statistics
SELECT QUEUE_SCHEMA, QUEUE_NAME, NUM_MSGS, SPILL_MSGS
FROM V$AQ;

-- Subscribers
SELECT QUEUE_NAME, CONSUMER_NAME, ADDRESS, RULE
FROM DBA_QUEUE_SUBSCRIBERS;

🎯 Queue Management

Queue Operations

-- Stop queue
BEGIN
  DBMS_AQADM.STOP_QUEUE(queue_name => 'order_queue');
END;
/

-- Start queue
BEGIN
  DBMS_AQADM.START_QUEUE(queue_name => 'order_queue');
END;
/

-- Purge old messages
DECLARE
  purge_options DBMS_AQADM.AQ$_PURGE_OPTIONS_T;
BEGIN
  purge_options.block := TRUE;

  DBMS_AQADM.PURGE_QUEUE_TABLE(
    queue_table      => 'order_queue_table',
    purge_condition  => 'enqueue_time < SYSDATE - 7',
    purge_options    => purge_options
  );
END;
/

-- Drop queue
BEGIN
  DBMS_AQADM.STOP_QUEUE(queue_name => 'order_queue');
  DBMS_AQADM.DROP_QUEUE(queue_name => 'order_queue');
  DBMS_AQADM.DROP_QUEUE_TABLE(
    queue_table => 'order_queue_table', 
    force => TRUE
  );
END;
/

🌊 Oracle Streams (Legacy)

Streams Components

Component Description Purpose
Capture Reads redo logs Extract changes
Propagation Sends LCRs Transfer changes
Apply Applies changes Replicate data
LCR Logical Change Record Change event
⚠️ Warning: Oracle Streams is deprecated since 19c. Use GoldenGate for replication or AQ for messaging.

Streams Alternatives

Use Case Replacement Benefits
Data Replication Oracle GoldenGate Real-time CDC
Messaging Advanced Queuing Transactional
Event Processing Kafka + Oracle CDC High throughput
DB Migration Data Pump + GoldenGate Zero downtime

🚀 Best Practices

AQ Best Practices

Common Use Cases

✓ Learning Checklist - Module 23