This post shows how to configure Oracle GoldenGate 12.3 procedural replication to replicate Oracle Advanced Queuing (AQ).
Procedural replication is a new feature introduced in Oracle GoldenGate 12.3.

***********************
Prerequisites
***********************

1) Enable minimal supplemental logging on the source database, then confirm it:

select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING from V$DATABASE;
SUPPLEME SUP SUP FORCE_LOGGING
-------- --- --- ---------------------------------------
YES      NO  NO  YES
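
The query above only checks the current settings. If SUPPLEMENTAL_LOG_DATA_MIN or FORCE_LOGGING shows NO, the statements below are a minimal sketch of how they could be switched on. It assumes a SYSDBA connection to the source database; the ENABLE_GOLDENGATE_REPLICATION step is not part of the original walkthrough and is included as an assumption, since GoldenGate normally needs it on both source and target.

```sql
-- Sketch only: run as SYSDBA on the source database (assumption).

-- Enable minimal (database-level) supplemental logging.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- Force logging so that NOLOGGING operations are still written to redo.
ALTER DATABASE FORCE LOGGING;

-- Switch the log file so the new settings apply from a fresh redo log.
ALTER SYSTEM SWITCH LOGFILE;

-- Assumption: not shown in the post, but usually required for GoldenGate
-- on both the source and the target database.
ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION = TRUE SCOPE = BOTH;
```

Re-running the V$DATABASE query afterwards should confirm whether the settings took effect.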

2) Use Integrated Extract and Integrated Replicat from Oracle GoldenGate 12.3 or later, with Oracle Database 12.2 or later.

3) Enable procedure-level supplemental logging with ADD PROCEDURETRANDATA.

GGSCI (ora01.cloudintegrator.com as GGSUSER@SOURCEDB) 4> add proceduretrandata

2018-04-22 19:21:33  INFO    OGG-13005  PROCEDURETRANDATA supplemental logging has been enabled.

GGSCI (ora01.cloudintegrator.com as GGSUSER@SOURCEDB) 2> info proceduretrandata

2018-04-22 19:22:32  INFO    OGG-13007  Procedure level supplemental logging is enabled.

Check whether procedural replication is ON at the source using the code below.
Note: It will be OFF at the target (DB 3), so no procedure calls are replicated back.

SET SERVEROUTPUT ON
DECLARE
  on_or_off   NUMBER;
BEGIN
  -- Returns 1 when procedural replication is ON, 0 when it is OFF
  on_or_off := DBMS_GOLDENGATE_ADM.GG_PROCEDURE_REPLICATION_ON;
  IF on_or_off = 1 THEN
    DBMS_OUTPUT.PUT_LINE('Oracle GoldenGate procedural replication is ON.');
  ELSE
    DBMS_OUTPUT.PUT_LINE('Oracle GoldenGate procedural replication is OFF.');
  END IF;
END;
/

4) Configure the Extract parameter file

extract e6
setenv (ORACLE_HOME = /u01/app/ggate/app/oracle/product/12.2.0.1 )
setenv (ORACLE_SID = SOURCEDB )
setenv (TNS_ADMIN = /u01/app/ggate/app/oracle/product/12.2.0.1/network/admin )
USERID GGSUSER@SOURCEDB PASSWORD ggsuser

exttrail ./dirdat/e6
TRANLOGOPTIONS excludetag +
DDL INCLUDE MAPPED
TRANLOGOPTIONS INTEGRATEDPARAMS (ENABLE_PROCEDURAL_REPLICATION Y)

PROCEDURE INCLUDE FEATURE AQ, RULE
--=====================================
-- One-way Replication
--=====================================
TABLE LOCALUSER.*;

5) Configure the Replicat parameter file

replicat r6
setenv (ORACLE_HOME = /u01/app/ggate/oracle/product/12.2.0.1)
setenv (ORACLE_SID = TARGETDB)
setenv (TNS_ADMIN =  /u01/app/ggate/oracle/product/12.2.0.1/network/admin)

USERID GGSUSER@TARGETDB PASSWORD ggsuser
DDL INCLUDE ALL
--REPERROR(PROCEDURE , DISCARD)
--PROCEDURE EXCLUDE FEATURE AQ, RULE
MAP LOCALUSER.*,  TARGET LOCALUSER.*;

6) Now add the Integrated Extract on the source and the Integrated Replicat on the target.
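
The post does not list the commands for this step. The GGSCI sequence below is a sketch of what it typically involves, reusing the group names (e6, r6), trail (./dirdat/e6) and logins from the parameter files above. It assumes the Replicat reads the Extract trail directly with no data pump in between, and that BEGIN NOW is an acceptable starting point; both are assumptions, not details from the original setup.

```ggsci
-- Source side (assumption: run in GGSCI on the source installation)
DBLOGIN USERID GGSUSER@SOURCEDB PASSWORD ggsuser
-- Register the Extract with the database before adding it as integrated
REGISTER EXTRACT e6 DATABASE
ADD EXTRACT e6, INTEGRATED TRANLOG, BEGIN NOW
ADD EXTTRAIL ./dirdat/e6, EXTRACT e6
START EXTRACT e6

-- Target side (assumption: the trail ./dirdat/e6 is readable by this installation)
DBLOGIN USERID GGSUSER@TARGETDB PASSWORD ggsuser
ADD REPLICAT r6, INTEGRATED, EXTTRAIL ./dirdat/e6
START REPLICAT r6
```

INFO ALL can then be used to confirm that both processes are RUNNING before moving on to the AQ test.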

7) Create the queue table and queue and test the AQ replication.

create or replace type localaquser.message_type as object(
subject varchar2(30),
text varchar2(80))
/

******************
Create Queue table
******************
EXEC dbms_aqadm.create_queue_table(queue_table => 'LOCALAQUSER.TEST_TABLE', -
queue_payload_type => 'LOCALAQUSER.MESSAGE_TYPE', -
storage_clause => 'PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 TABLESPACE USERS', -
Sort_list => 'ENQ_TIME', -
Multiple_consumers => TRUE, -
Compatible => '10.0.0', -
replication_mode => DBMS_AQADM.REPLICATION_MODE);

*******************
Create Queue
*******************
EXEC  DBMS_AQADM.CREATE_QUEUE(  -
Queue_name          => 'LOCALAQUSER.TEST_QUEUE',  -
Queue_table         => 'LOCALAQUSER.TEST_TABLE',  -
Queue_type          =>  0,  -
Max_retries         =>  5,  -
Retry_delay         =>  0,  -
dependency_tracking =>  FALSE);

*******************
Start the queue
*******************
EXEC DBMS_AQADM.START_QUEUE('LOCALAQUSER.TEST_QUEUE');

*******************
Add subscribers
*******************
DECLARE
  v_agent sys.aq$_agent;
BEGIN
  v_agent := sys.aq$_agent('Agent1', NULL, NULL);
  DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'LOCALAQUSER.TEST_QUEUE',
                            subscriber => v_agent);
END;
/

DECLARE
  v_agent sys.aq$_agent;
BEGIN
  v_agent := sys.aq$_agent('Agent2', NULL, NULL);
  DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'LOCALAQUSER.TEST_QUEUE',
                            subscriber => v_agent);
END;
/

********************
Enqueue to the Queue
********************
DECLARE
v_enqueue_options      dbms_aq.enqueue_options_t;
v_message_properties   dbms_aq.message_properties_t;
v_message_handle       raw(16);
v_message              localaquser.message_type ;
BEGIN
FOR i in 1..10 LOOP
v_message   := localaquser.message_type(i,i);
dbms_aq.enqueue(
queue_name         => 'LOCALAQUSER.TEST_QUEUE',
enqueue_options    => v_enqueue_options,
message_properties => v_message_properties,
payload            => v_message,
msgid              => v_message_handle);
commit;
END LOOP;
END;
/

8) Check if the queue table and queue are created on the target and messages are enqueued.

GGSCI (ora01.cloudintegrator.com) 2> stats r6

Sending STATS request to REPLICAT R6 ...

Start of Statistics at 2018-04-26 00:57:40.

Integrated Replicat Statistics:

Total transactions                                19.00
Redirected                                         0.00
Replicated procedures                             15.00
DDL operations                                     4.00
Stored procedures                                  0.00
Datatype functionality                             0.00
Event actions                                      0.00
Direct transactions ratio                         21.05%

DDL replication statistics:

*** Total statistics since replicat started     ***
Operations                                         4.00
Mapped operations                                  4.00
Unmapped operations                                0.00
Other operations                                   0.00
Excluded operations                                0.00
Errors                                             0.00
Retried errors                                     0.00
Discarded errors                                   0.00
Ignored errors                                     0.00

Procedural replication statistics:

Statistics for Feature: AQ

*** Total statistics since 2018-04-26 00:48:37 ***
Total executions                                  15.00
Total discards                                     0.00
Total operations                                  15.00

*** Daily statistics since 2018-04-26 00:48:37 ***
Total executions                                  15.00
Total discards                                     0.00
Total operations                                  15.00

*** Hourly statistics since 2018-04-26 00:48:37 ***
Total executions                                  15.00
Total discards                                     0.00
Total operations                                  15.00

*** Latest statistics since 2018-04-26 00:48:37 ***
Total executions                                  15.00
Total discards                                     0.00
Total operations                                  15.00

End of Statistics.

SOURCE
==========
SQL> select name from v$database;

NAME
---------
SOURCEDB

SQL> select q_name,MSGID, ENQ_TIME, SENDER_NAME from LOCALAQUSER.TEST_TABLE order by enq_time;

Q_NAME                 MSGID                            ENQ_TIME                       SENDER_NAME
---------------------- -------------------------------- ------------------------------ ----------------------
TEST_QUEUE             6AB6C5FA9E112147E0535508DC0A3609 26-APR-18 12.51.44.538846 AM
TEST_QUEUE             6AB6C5FA9E122147E0535508DC0A3609 26-APR-18 12.51.44.592076 AM
TEST_QUEUE             6AB6C5FA9E132147E0535508DC0A3609 26-APR-18 12.51.44.594615 AM
TEST_QUEUE             6AB6C5FA9E142147E0535508DC0A3609 26-APR-18 12.51.44.597272 AM
TEST_QUEUE             6AB6C5FA9E152147E0535508DC0A3609 26-APR-18 12.51.44.600141 AM
TEST_QUEUE             6AB6C5FA9E162147E0535508DC0A3609 26-APR-18 12.51.44.602571 AM
TEST_QUEUE             6AB6C5FA9E172147E0535508DC0A3609 26-APR-18 12.51.44.606044 AM
TEST_QUEUE             6AB6C5FA9E182147E0535508DC0A3609 26-APR-18 12.51.44.609449 AM
TEST_QUEUE             6AB6C5FA9E192147E0535508DC0A3609 26-APR-18 12.51.44.612574 AM
TEST_QUEUE             6AB6C5FA9E1A2147E0535508DC0A3609 26-APR-18 12.51.44.615908 AM

TARGET
==========
SQL> select name from v$database;

NAME
---------
TARGETDB

SQL> select q_name,MSGID, ENQ_TIME, SENDER_NAME from LOCALAQUSER.TEST_TABLE order by enq_time;

Q_NAME                 MSGID                            ENQ_TIME                       SENDER_NAME
---------------------- -------------------------------- ------------------------------ ----------------------
TEST_QUEUE             6AB6C5FA9E112147E0535508DC0A3609 26-APR-18 12.51.44.538846 AM
TEST_QUEUE             6AB6C5FA9E122147E0535508DC0A3609 26-APR-18 12.51.44.592076 AM
TEST_QUEUE             6AB6C5FA9E132147E0535508DC0A3609 26-APR-18 12.51.44.594615 AM
TEST_QUEUE             6AB6C5FA9E142147E0535508DC0A3609 26-APR-18 12.51.44.597272 AM
TEST_QUEUE             6AB6C5FA9E152147E0535508DC0A3609 26-APR-18 12.51.44.600141 AM
TEST_QUEUE             6AB6C5FA9E162147E0535508DC0A3609 26-APR-18 12.51.44.602571 AM
TEST_QUEUE             6AB6C5FA9E172147E0535508DC0A3609 26-APR-18 12.51.44.606044 AM
TEST_QUEUE             6AB6C5FA9E182147E0535508DC0A3609 26-APR-18 12.51.44.609449 AM
TEST_QUEUE             6AB6C5FA9E192147E0535508DC0A3609 26-APR-18 12.51.44.612574 AM
TEST_QUEUE             6AB6C5FA9E1A2147E0535508DC0A3609 26-APR-18 12.51.44.615908 AM
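
The listings above compare the messages. To also confirm that the queue, queue table and subscribers themselves were created on the target, the data dictionary can be queried. The queries below are a sketch that assumes a target session with access to the DBA_* views and the object names used in step 7.

```sql
-- Sketch only: run on TARGETDB as a user who can read the DBA_* views (assumption).

-- Was the queue created, and is it enabled for enqueue/dequeue?
SELECT owner, name, queue_table, enqueue_enabled, dequeue_enabled
FROM   dba_queues
WHERE  owner = 'LOCALAQUSER'
AND    name  = 'TEST_QUEUE';

-- Was the queue table created with the expected payload type?
SELECT owner, queue_table, object_type, recipients
FROM   dba_queue_tables
WHERE  owner       = 'LOCALAQUSER'
AND    queue_table = 'TEST_TABLE';

-- Were the subscribers (Agent1, Agent2) replicated?
SELECT queue_name, consumer_name
FROM   dba_queue_subscribers
WHERE  owner      = 'LOCALAQUSER'
AND    queue_name = 'TEST_QUEUE';

-- How many messages are currently in the queue table?
SELECT COUNT(*) AS message_count
FROM   localaquser.test_table;
```

Running the same queries on the source gives a baseline to compare against.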

Now try dequeue
=================

On source
=============

SQL> set serveroutput on size 99999
SQL> DECLARE
  dequeue_options      dbms_aq.dequeue_options_t;
  v_message_properties dbms_aq.message_properties_t;
  v_message_handle     RAW(16);
  v_message            LOCALUSER.COLLEGE_TYPE;
  no_messages          EXCEPTION;
  -- ORA-25228: timeout or end-of-fetch during message dequeue
  PRAGMA EXCEPTION_INIT (no_messages, -25228);
BEGIN
  FOR agent_number IN 1..2 LOOP
    BEGIN
      dequeue_options.wait          := DBMS_AQ.NO_WAIT;
      dequeue_options.consumer_name := 'Agent' || agent_number;
      dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
      -- Keep dequeuing until DEQUEUE raises no_messages for this consumer
      LOOP
        dbms_aq.dequeue(
          queue_name         => 'LOCALUSER.COLLEGE_QUEUE',
          dequeue_options    => dequeue_options,
          message_properties => v_message_properties,
          payload            => v_message,
          msgid              => v_message_handle);
        DBMS_OUTPUT.PUT_LINE('Message: ' || v_message.subject || ' ... ' || v_message.text);
        dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE;
      END LOOP;
    EXCEPTION
      WHEN no_messages THEN
        DBMS_OUTPUT.PUT_LINE('No more messages for Agents ');
        COMMIT;
    END;
  END LOOP;
END;
/
Message: 1 ... 1
Message: 2 ... 2
No more messages for Agents
No more messages for Agents

PL/SQL procedure successfully completed.

On target
============
SQL> select * from LOCALUSER.COLLEGE_QUEUE_TABLE;

no rows selected