In this blog post, we will show you how to use Oracle GoldenGate 12.3 procedural replication to replicate Advanced Queuing (AQ) operations.
Procedural replication is a new feature introduced in Oracle GoldenGate 12.3.
***********************
Prerequisites
***********************
1) Enable Minimal level supplemental logging
select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING from V$DATABASE;
SUPPLEME SUP SUP FORCE_LOGGING
-------- --- --- ---------------------------------------
YES NO NO YES
2) Integrated Extract and Integrated Replicat of Oracle Goldengate version 12.3 and above + Oracle Database 12.2 and above.
3) Need to add procedure trandata.
GGSCI (ora01.cloudintegrator.com as GGSUSER@SOURCEDB) 4> add proceduretrandata
2018-04-22 19:21:33 INFO OGG-13005 PROCEDURETRANDATA supplemental logging has been enabled.
GGSCI (ora01.cloudintegrator.com as GGSUSER@SOURCEDB) 2> info proceduretrandata
2018-04-22 19:22:32 INFO OGG-13007 Procedure level supplemental logging is enabled.
Check whether procedural replication is ON at the source using the code below.
Note: It will be OFF at the target ( DB 3 ), so no procedure calls will be replicated back.
SET SERVEROUTPUT ON
-- Report whether Oracle GoldenGate procedural replication is currently ON
-- for this database, based on DBMS_GOLDENGATE_ADM.GG_PROCEDURE_REPLICATION_ON.
DECLARE
    on_or_off NUMBER;
BEGIN
    on_or_off := DBMS_GOLDENGATE_ADM.GG_PROCEDURE_REPLICATION_ON;
    -- A value of 1 is treated as ON; anything else is reported as OFF.
    IF on_or_off = 1 THEN
        DBMS_OUTPUT.PUT_LINE('Oracle GoldenGate procedural replication is ON.');
    ELSE
        DBMS_OUTPUT.PUT_LINE('Oracle GoldenGate procedural replication is OFF.');
    END IF;
END;
/
4) Configure Extract parameterfile
-- Integrated Extract e6: captures DDL, DML and AQ/RULE procedure calls
-- from SOURCEDB and writes them to the ./dirdat/e6 trail.
extract e6
setenv (ORACLE_HOME = /u01/app/ggate/app/oracle/product/12.2.0.1 )
setenv (ORACLE_SID = SOURCEDB )
setenv (TNS_ADMIN = /u01/app/ggate/app/oracle/product/12.2.0.1/network/admin )
-- NOTE(review): clear-text password; consider a credential store alias (USERIDALIAS).
USERID GGSUSER@SOURCEDB PASSWORD ggsuser
exttrail ./dirdat/e6
TRANLOGOPTIONS excludetag +
DDL INCLUDE MAPPED
-- Turn on capture of procedure calls in the integrated capture process.
TRANLOGOPTIONS INTEGRATEDPARAMS (ENABLE_PROCEDURAL_REPLICATION Y)
PROCEDURE INCLUDE FEATURE AQ, RULE
--=====================================
-- One-way Replication
--=====================================
TABLE LOCALUSER.*;
5) Configure Replicat parameter file
-- Integrated Replicat r6: applies DDL, DML and replicated procedure calls
-- to TARGETDB.
replicat r6
setenv (ORACLE_HOME = /u01/app/ggate/oracle/product/12.2.0.1)
setenv (ORACLE_SID = TARGETDB)
setenv (TNS_ADMIN = /u01/app/ggate/oracle/product/12.2.0.1/network/admin)
-- NOTE(review): clear-text password; consider a credential store alias (USERIDALIAS).
USERID GGSUSER@TARGETDB PASSWORD ggsuser
DDL INCLUDE ALL
-- Optional settings, left disabled:
--REPERROR(PROCEDURE , DISCARD)
--PROCEDURE EXCLUDE FEATURE AQ, RULE
MAP LOCALUSER.*, TARGET LOCALUSER.*;
6) Now add the Integrated Extract on the source and Integrated Replicat on target.
7) Create the queue table and queue and test the AQ replication.
-- Payload object type for the test queue: a short subject and a message text.
CREATE OR REPLACE TYPE localaquser.message_type AS OBJECT (
    subject VARCHAR2(30),
    text    VARCHAR2(80)
)
/
******************
Create Queue table
******************
-- Create the multi-consumer queue table LOCALAQUSER.TEST_TABLE with
-- LOCALAQUSER.MESSAGE_TYPE as payload, ordered by enqueue time.
-- Written as an anonymous block so it does not depend on SQL*Plus
-- line-continuation characters.
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'LOCALAQUSER.TEST_TABLE',
        queue_payload_type => 'LOCALAQUSER.MESSAGE_TYPE',
        storage_clause     => 'PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 TABLESPACE USERS',
        sort_list          => 'ENQ_TIME',
        multiple_consumers => TRUE,
        compatible         => '10.0.0',
        replication_mode   => DBMS_AQADM.REPLICATION_MODE
    );
END;
/
*******************
Create Queue
*******************
-- Create queue LOCALAQUSER.TEST_QUEUE in queue table LOCALAQUSER.TEST_TABLE.
-- Written as an anonymous block so it does not depend on SQL*Plus
-- line-continuation characters.
BEGIN
    DBMS_AQADM.CREATE_QUEUE(
        queue_name          => 'LOCALAQUSER.TEST_QUEUE',
        queue_table         => 'LOCALAQUSER.TEST_TABLE',
        queue_type          => 0,  -- presumably DBMS_AQADM.NORMAL_QUEUE; confirm
        max_retries         => 5,
        retry_delay         => 0,
        dependency_tracking => FALSE
    );
END;
/
*******************
Start the queue
*******************
-- Start LOCALAQUSER.TEST_QUEUE; EXEC is required to run a procedure call from SQL*Plus.
EXEC DBMS_AQADM.START_QUEUE('LOCALAQUSER.TEST_QUEUE');
*************************
Add subscribers
**************************
-- Register subscriber Agent1 on LOCALAQUSER.TEST_QUEUE.
DECLARE
    v_agent sys.aq$_agent;
BEGIN
    v_agent := sys.aq$_agent('Agent1', NULL, NULL);
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'LOCALAQUSER.TEST_QUEUE',
        subscriber => v_agent
    );
END;
/
-- Register subscriber Agent2 on LOCALAQUSER.TEST_QUEUE.
DECLARE
    v_agent sys.aq$_agent;
BEGIN
    v_agent := sys.aq$_agent('Agent2', NULL, NULL);
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'LOCALAQUSER.TEST_QUEUE',
        subscriber => v_agent
    );
END;
/
********************
Enqueue to the Queue
********************
-- Enqueue ten test messages (subject and text both set to the loop index)
-- into LOCALAQUSER.TEST_QUEUE, committing after each message.
DECLARE
    v_enqueue_options    dbms_aq.enqueue_options_t;
    v_message_properties dbms_aq.message_properties_t;
    v_message_handle     RAW(16);
    v_message            localaquser.message_type;
BEGIN
    FOR i IN 1..10 LOOP
        -- Both attributes are VARCHAR2, so convert the index explicitly.
        v_message := localaquser.message_type(TO_CHAR(i), TO_CHAR(i));
        dbms_aq.enqueue(
            -- Schema-qualified so the call does not depend on the connected user.
            queue_name         => 'LOCALAQUSER.TEST_QUEUE',
            enqueue_options    => v_enqueue_options,
            message_properties => v_message_properties,
            payload            => v_message,
            msgid              => v_message_handle
        );
        COMMIT;
    END LOOP;
END;
/
8) Check if the queue table and queue are created on the target and messages are enqueued.
GGSCI (ora01.cloudintegrator.com) 2> stats r6
Sending STATS request to REPLICAT R6 …
Start of Statistics at 2018-04-26 00:57:40.
Integrated Replicat Statistics:
Total transactions 19.00
Redirected 0.00
Replicated procedures 15.00
DDL operations 4.00
Stored procedures 0.00
Datatype functionality 0.00
Event actions 0.00
Direct transactions ratio 21.05%
DDL replication statistics:
*** Total statistics since replicat started ***
Operations 4.00
Mapped operations 4.00
Unmapped operations 0.00
Other operations 0.00
Excluded operations 0.00
Errors 0.00
Retried errors 0.00
Discarded errors 0.00
Ignored errors 0.00
Procedural replication statistics:
Statistics for Feature: AQ
*** Total statistics since 2018-04-26 00:48:37 ***
Total executions 15.00
Total discards 0.00
Total operations 15.00
*** Daily statistics since 2018-04-26 00:48:37 ***
Total executions 15.00
Total discards 0.00
Total operations 15.00
*** Hourly statistics since 2018-04-26 00:48:37 ***
Total executions 15.00
Total discards 0.00
Total operations 15.00
*** Latest statistics since 2018-04-26 00:48:37 ***
Total executions 15.00
Total discards 0.00
Total operations 15.00
End of Statistics.
SOURCE
==========
SQL> select name from v$database;
NAME
---------
SOURCEDB
SQL> select q_name,MSGID, ENQ_TIME, SENDER_NAME from LOCALAQUSER.TEST_TABLE order by enq_time;
Q_NAME MSGID ENQ_TIME SENDER_NAME
------------------------------ -------------------------------- ------------------------------ ------------------------------
TEST_QUEUE 6AB6C5FA9E112147E0535508DC0A3609 26-APR-18 12.51.44.538846 AM
TEST_QUEUE 6AB6C5FA9E122147E0535508DC0A3609 26-APR-18 12.51.44.592076 AM
TEST_QUEUE 6AB6C5FA9E132147E0535508DC0A3609 26-APR-18 12.51.44.594615 AM
TEST_QUEUE 6AB6C5FA9E142147E0535508DC0A3609 26-APR-18 12.51.44.597272 AM
TEST_QUEUE 6AB6C5FA9E152147E0535508DC0A3609 26-APR-18 12.51.44.600141 AM
TEST_QUEUE 6AB6C5FA9E162147E0535508DC0A3609 26-APR-18 12.51.44.602571 AM
TEST_QUEUE 6AB6C5FA9E172147E0535508DC0A3609 26-APR-18 12.51.44.606044 AM
TEST_QUEUE 6AB6C5FA9E182147E0535508DC0A3609 26-APR-18 12.51.44.609449 AM
TEST_QUEUE 6AB6C5FA9E192147E0535508DC0A3609 26-APR-18 12.51.44.612574 AM
TEST_QUEUE 6AB6C5FA9E1A2147E0535508DC0A3609 26-APR-18 12.51.44.615908 AM
TARGET
==========
SQL> select name from v$database;
NAME
---------
TARGETDB
SQL> select q_name,MSGID, ENQ_TIME, SENDER_NAME from LOCALAQUSER.TEST_TABLE order by enq_time;
Q_NAME MSGID ENQ_TIME SENDER_NAME
------------------------------ -------------------------------- ------------------------------ ------------------------------
TEST_QUEUE 6AB6C5FA9E112147E0535508DC0A3609 26-APR-18 12.51.44.538846 AM
TEST_QUEUE 6AB6C5FA9E122147E0535508DC0A3609 26-APR-18 12.51.44.592076 AM
TEST_QUEUE 6AB6C5FA9E132147E0535508DC0A3609 26-APR-18 12.51.44.594615 AM
TEST_QUEUE 6AB6C5FA9E142147E0535508DC0A3609 26-APR-18 12.51.44.597272 AM
TEST_QUEUE 6AB6C5FA9E152147E0535508DC0A3609 26-APR-18 12.51.44.600141 AM
TEST_QUEUE 6AB6C5FA9E162147E0535508DC0A3609 26-APR-18 12.51.44.602571 AM
TEST_QUEUE 6AB6C5FA9E172147E0535508DC0A3609 26-APR-18 12.51.44.606044 AM
TEST_QUEUE 6AB6C5FA9E182147E0535508DC0A3609 26-APR-18 12.51.44.609449 AM
TEST_QUEUE 6AB6C5FA9E192147E0535508DC0A3609 26-APR-18 12.51.44.612574 AM
TEST_QUEUE 6AB6C5FA9E1A2147E0535508DC0A3609 26-APR-18 12.51.44.615908 AM
Now try dequeue
=================
On source
=============
SET SERVEROUTPUT ON SIZE 99999
-- Dequeue every available message for consumers Agent1 and Agent2 and print
-- each message's subject and text.
-- NOTE(review): the setup steps in this post create LOCALAQUSER.MESSAGE_TYPE and
-- LOCALAQUSER.TEST_QUEUE, but this block uses LOCALUSER.COLLEGE_TYPE and
-- LOCALUSER.COLLEGE_QUEUE; confirm which objects are intended.
DECLARE
    dequeue_options      dbms_aq.dequeue_options_t;
    v_message_properties dbms_aq.message_properties_t;
    v_message_handle     RAW(16);
    v_message            LOCALUSER.COLLEGE_TYPE;
    no_messages          EXCEPTION;
    -- Map error -25228 to no_messages; the handler below treats it as the
    -- queue being empty for the current consumer.
    PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
    FOR agent_number IN 1..2 LOOP
        BEGIN
            dequeue_options.wait          := DBMS_AQ.NO_WAIT;
            dequeue_options.consumer_name := 'Agent' || agent_number;
            dequeue_options.navigation    := dbms_aq.FIRST_MESSAGE;
            -- This loop has no exit condition; it ends only when the dequeue
            -- raises no_messages, so nothing is placed after END LOOP.
            LOOP
                dbms_aq.dequeue(
                    queue_name         => 'LOCALUSER.COLLEGE_QUEUE',
                    dequeue_options    => dequeue_options,
                    message_properties => v_message_properties,
                    payload            => v_message,
                    msgid              => v_message_handle
                );
                DBMS_OUTPUT.PUT_LINE('Message: ' || v_message.subject || ' … ' || v_message.text);
                dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
            END LOOP;
        EXCEPTION
            WHEN no_messages THEN
                DBMS_OUTPUT.PUT_LINE('No more messages for Agents ');
                -- Commit the dequeues performed for this consumer.
                COMMIT;
        END;
    END LOOP;
END;
/
Message: 1 … 1
Message: 2 … 2
No more messages for Agents
No more messages for Agents
PL/SQL procedure successfully completed.
On target
============
SQL> select * from LOCALUSER.COLLEGE_QUEUE_TABLE;
no rows selected
Recent Comments