Oracle Streams Configuration: Change Data Capture
Oracle Streams Configuration: Change Data Capture
Lewis Cunningham | Dec 17, 2006
I have been playing with Oracle Streams again lately. My goal is to capture changes in 10g and send them to a 9i database.
Below is the short list for setting up Change Data Capture using Oracle Streams. These steps are mostly from the docs with a few tweaks I have added. This entry only covers setting up the local capture and apply. I'll add the propagation to 9i later this week or next weekend.
First the set up: we will use the HR account's Employee table. We'll capture all changes to the Employee table and insert them into an audit table. I'm not necessarily saying this is the way you should audit your database but it makes a nice example.
I'll also add a monitoring piece to capture process. I want to be able to see exactly what is being captured when it is being captured.
You will need to have sysdba access to follow along with me. Your database must also be in archivelog mode. The changes are picked up from the redo log.
So, away we go! The first step is to create out streams administrator. I will follow the guidelines from the oracle docs exactly for this:
Connect as sysdba:
sqlplus / as sysdba
Create the streams tablespace (change the name and/or location to suit):
create tablespace streams_tbs datafile 'c:\temp\stream_tbs.dbf' size 25M reuse autoextend on maxsize unlimited;
Create our streams administrator:
create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
I haven't quite figured out why, but we need to grant our administrator DBA privs. I think this is a bad thing. There is probably a work around where I could do some direct grants instead but I haven't had time to track those down.
grant dba to strmadmin;
We also want to grant streams admin privs to the user.
BEGIN
DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'strmadmin', grant_privileges => true);
END;
/
The next steps we'll run as the HR user.
conn hr/hr
Grant all access to the employee table to the streams admin:
grant all on hr.employees to strmadmin;
We also need to create the employee_audit table. Note that I am adding three columns in this table that do not exist in the employee table.
CREATE TABLE employee_audit
( employee_id NUMBER(6),
first_name VARCHAR2(20),
last_name VARCHAR2(25),
email VARCHAR2(25),
phone_number VARCHAR2(20),
hire_date DATE,
job_id VARCHAR2(10),
salary NUMBER(8,2),
commission_pct NUMBER(2,2),
manager_id NUMBER(6),
department_id NUMBER(4),
upd_date DATE,
user_name VARCHAR2(30),
action VARCHAR2(30)
);
Grant all access to the audit table to the streams admin user:
grant all on hr.employee_audit to strmadmin;
We connect as the streams admin user:
conn strmadmin/strmadmin
We can create a logging table. You would NOT want to do this in a high-volume production system. I am doing this to illustrate user defined monitoring and show how you can get inside the capture process.
CREATE TABLE streams_monitor
(date_and_time TIMESTAMP(6) DEFAULT systimestamp, txt_msg CLOB );
Here we create the queue. Unlike AQ, where you have to create a separate table, this step creates the queue and the underlying ANYDATA table.
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'strmadmin.streams_queue_table', queue_name => 'strmadmin.streams_queue');
END;
/
This just defines that we want to capture DML and not DDL.
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'capture', streams_name => 'capture_emp', queue_name => 'strmadmin.streams_queue', include_dml => true, include_ddl => false, inclusion_rule => true);
END;
/
Tell the capture process that we want to know who made the change:
BEGIN
DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE( capture_name => 'capture_emp', attribute_name => 'username',
include => true);
END;
/
We also need to tell Oracle where to start our capture. Change the source_database_name to match your database.
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN( source_object_name => 'hr.employees', source_database_name => 'ORCL', instantiation_scn => iscn);
END;
/
And the fun part! This is where we define our capture procedure. I'm taking this right from the docs but I'm adding a couple steps.
CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
command VARCHAR2(30);
old_values SYS.LCR$_ROW_LIST;
BEGIN
-- Access the LCR
rc := in_any.GETOBJECT(lcr);
-- Get the object command type
command := lcr.GET_COMMAND_TYPE();
-- I am inserting the XML equivalent of the LCR into the monitoring table.
insert into streams_monitor (txt_msg) values (command || DBMS_STREAMS.CONVERT_LCR_TO_XML(in_any) );
-- Set the command_type in the row LCR to INSERT
lcr.SET_COMMAND_TYPE('INSERT');
-- Set the object_name in the row LCR to EMP_DEL
lcr.SET_OBJECT_NAME('EMPLOYEE_AUDIT');
-- Set the new values to the old values for update and delete
IF command IN ('DELETE', 'UPDATE') THEN
-- Get the old values in the row LCR
old_values := lcr.GET_VALUES('old');
-- Set the old values in the row LCR to the new values in the row LCR
lcr.SET_VALUES('new', old_values);
-- Set the old values in the row LCR to NULL
lcr.SET_VALUES('old', NULL);
END IF;
-- Add a SYSDATE for upd_date
lcr.ADD_COLUMN('new', 'UPD_DATE', ANYDATA.ConvertDate(SYSDATE));
-- Add a user column
lcr.ADD_COLUMN('new', 'user_name', lcr.GET_EXTRA_ATTRIBUTE('USERNAME') );
-- Add an action column
lcr.ADD_COLUMN('new', 'ACTION', ANYDATA.ConvertVarChar2(command));
-- Make the changes
lcr.EXECUTE(true);
commit;
END;
/
Create the DML handlers:
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER( object_name => 'hr.employees', object_type => 'TABLE', operation_name => 'INSERT', error_handler => false, user_procedure => 'strmadmin.emp_dml_handler', apply_database_link => NULL, apply_name => NULL);
END;
/
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER( object_name => 'hr.employees', object_type => 'TABLE', operation_name => 'UPDATE', error_handler => false, user_procedure => 'strmadmin.emp_dml_handler', apply_database_link => NULL, apply_name => NULL);
END;
/
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER( object_name => 'hr.employees', object_type => 'TABLE', operation_name => 'DELETE', error_handler => false, user_procedure => 'strmadmin.emp_dml_handler', apply_database_link => NULL, apply_name => NULL);
END;
/
Create the apply rule. This tells streams, yet again, that we in fact do want to capture changes. The second calls tells streams where to put the info. Change the source_database_name to match your database.
DECLARE
emp_rule_name_dml VARCHAR2(30);
emp_rule_name_ddl VARCHAR2(30);
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'apply', streams_name => 'apply_emp', queue_name => 'strmadmin.streams_queue', include_dml => true, include_ddl => false, source_database => 'ORCL', dml_rule_name => emp_rule_name_dml, ddl_rule_name => emp_rule_name_ddl);
DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION( rule_name => emp_rule_name_dml, destination_queue_name => 'strmadmin.streams_queue');
END;
/
NOTE: An error was noticed (by several readers) and a fix was posted on OTN so I wanted to get the fix in here:
Add this code, as STRMADMIN:
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'dequeue', streams_name => 'emp_deq', queue_name => 'strmadmin.streams_queue', include_dml => true, include_ddl => false, inclusion_rule => true);
END;
/
I haven't tested this myself but it does make sense based on the error people were receiving (ORA-26694). I did have dequeues in my original application so I must have missed that when making this example. Thanks OTN!
We don't want to stop applying changes when there is an error, so:
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER( apply_name => 'apply_emp', parameter => 'disable_on_error', value => 'n');
END;
/
Turn on the apply process:
BEGIN
DBMS_APPLY_ADM.START_APPLY( apply_name => 'apply_emp');
END;
/
Turn on the capture process:
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE( capture_name => 'capture_emp');
END;
/
Connect as HR and make some changes to Employees.
sqlplus hr/hr
INSERT INTO hr.employees VALUES(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110); COMMIT; UPDATE hr.employees SET salary=5999 WHERE employee_id=206; COMMIT; DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;
It takes a few seconds for the data to make it to the logs and then back into the system to be appled. Run this query until you see data (remembering that it is not instantaneous):
SELECT employee_id, first_name, last_name, upd_Date, action FROM hr.employee_audit ORDER BY employee_id;
Then you can log back into the streams admin account:
sqlplus strmadmin/strmadmin
View the XML LCR that we inserted during the capture process:
set long 9999 set pagesize 0
select * from streams_monitor;
That's it! It's really not that much work to capture and apply changes. Of course, it's a little bit more work to cross database instances, but it's not that much. Keep an eye out for a future entry where I do just that. One of the things that amazes me is how little code is required to accomplish this. The less code I have to write, the less code I have to maintain.
Thank care,
LewisC
Oracle Streams Configuration: Change Data Capture
Lewis Cunningham | Dec 17, 2006
I have been playing with Oracle Streams again lately. My goal is to capture changes in 10g and send them to a 9i database.
Below is the short list for setting up Change Data Capture using Oracle Streams. These steps are mostly from the docs with a few tweaks I have added. This entry only covers setting up the local capture and apply. I'll add the propagation to 9i later this week or next weekend.
First the set up: we will use the HR account's Employee table. We'll capture all changes to the Employee table and insert them into an audit table. I'm not necessarily saying this is the way you should audit your database but it makes a nice example.
I'll also add a monitoring piece to capture process. I want to be able to see exactly what is being captured when it is being captured.
You will need to have sysdba access to follow along with me. Your database must also be in archivelog mode. The changes are picked up from the redo log.
So, away we go! The first step is to create out streams administrator. I will follow the guidelines from the oracle docs exactly for this:
Connect as sysdba:
sqlplus / as sysdba
Create the streams tablespace (change the name and/or location to suit):
create tablespace streams_tbs datafile 'c:\temp\stream_tbs.dbf' size 25M reuse autoextend on maxsize unlimited;
Create our streams administrator:
create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
I haven't quite figured out why, but we need to grant our administrator DBA privs. I think this is a bad thing. There is probably a work around where I could do some direct grants instead but I haven't had time to track those down.
grant dba to strmadmin;
We also want to grant streams admin privs to the user.
BEGIN
DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'strmadmin', grant_privileges => true);
END;
/
The next steps we'll run as the HR user.
conn hr/hr
Grant all access to the employee table to the streams admin:
grant all on hr.employees to strmadmin;
We also need to create the employee_audit table. Note that I am adding three columns in this table that do not exist in the employee table.
CREATE TABLE employee_audit
( employee_id NUMBER(6),
first_name VARCHAR2(20),
last_name VARCHAR2(25),
email VARCHAR2(25),
phone_number VARCHAR2(20),
hire_date DATE,
job_id VARCHAR2(10),
salary NUMBER(8,2),
commission_pct NUMBER(2,2),
manager_id NUMBER(6),
department_id NUMBER(4),
upd_date DATE,
user_name VARCHAR2(30),
action VARCHAR2(30)
);
Grant all access to the audit table to the streams admin user:
grant all on hr.employee_audit to strmadmin;
We connect as the streams admin user:
conn strmadmin/strmadmin
We can create a logging table. You would NOT want to do this in a high-volume production system. I am doing this to illustrate user defined monitoring and show how you can get inside the capture process.
CREATE TABLE streams_monitor
(date_and_time TIMESTAMP(6) DEFAULT systimestamp, txt_msg CLOB );
Here we create the queue. Unlike AQ, where you have to create a separate table, this step creates the queue and the underlying ANYDATA table.
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'strmadmin.streams_queue_table', queue_name => 'strmadmin.streams_queue');
END;
/
This just defines that we want to capture DML and not DDL.
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'capture', streams_name => 'capture_emp', queue_name => 'strmadmin.streams_queue', include_dml => true, include_ddl => false, inclusion_rule => true);
END;
/
Tell the capture process that we want to know who made the change:
BEGIN
DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE( capture_name => 'capture_emp', attribute_name => 'username',
include => true);
END;
/
We also need to tell Oracle where to start our capture. Change the source_database_name to match your database.
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN( source_object_name => 'hr.employees', source_database_name => 'ORCL', instantiation_scn => iscn);
END;
/
And the fun part! This is where we define our capture procedure. I'm taking this right from the docs but I'm adding a couple steps.
CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
command VARCHAR2(30);
old_values SYS.LCR$_ROW_LIST;
BEGIN
-- Access the LCR
rc := in_any.GETOBJECT(lcr);
-- Get the object command type
command := lcr.GET_COMMAND_TYPE();
-- I am inserting the XML equivalent of the LCR into the monitoring table.
insert into streams_monitor (txt_msg) values (command || DBMS_STREAMS.CONVERT_LCR_TO_XML(in_any) );
-- Set the command_type in the row LCR to INSERT
lcr.SET_COMMAND_TYPE('INSERT');
-- Set the object_name in the row LCR to EMP_DEL
lcr.SET_OBJECT_NAME('EMPLOYEE_AUDIT');
-- Set the new values to the old values for update and delete
IF command IN ('DELETE', 'UPDATE') THEN
-- Get the old values in the row LCR
old_values := lcr.GET_VALUES('old');
-- Set the old values in the row LCR to the new values in the row LCR
lcr.SET_VALUES('new', old_values);
-- Set the old values in the row LCR to NULL
lcr.SET_VALUES('old', NULL);
END IF;
-- Add a SYSDATE for upd_date
lcr.ADD_COLUMN('new', 'UPD_DATE', ANYDATA.ConvertDate(SYSDATE));
-- Add a user column
lcr.ADD_COLUMN('new', 'user_name', lcr.GET_EXTRA_ATTRIBUTE('USERNAME') );
-- Add an action column
lcr.ADD_COLUMN('new', 'ACTION', ANYDATA.ConvertVarChar2(command));
-- Make the changes
lcr.EXECUTE(true);
commit;
END;
/
Create the DML handlers:
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER( object_name => 'hr.employees', object_type => 'TABLE', operation_name => 'INSERT', error_handler => false, user_procedure => 'strmadmin.emp_dml_handler', apply_database_link => NULL, apply_name => NULL);
END;
/
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER( object_name => 'hr.employees', object_type => 'TABLE', operation_name => 'UPDATE', error_handler => false, user_procedure => 'strmadmin.emp_dml_handler', apply_database_link => NULL, apply_name => NULL);
END;
/
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER( object_name => 'hr.employees', object_type => 'TABLE', operation_name => 'DELETE', error_handler => false, user_procedure => 'strmadmin.emp_dml_handler', apply_database_link => NULL, apply_name => NULL);
END;
/
Create the apply rule. This tells streams, yet again, that we in fact do want to capture changes. The second calls tells streams where to put the info. Change the source_database_name to match your database.
DECLARE
emp_rule_name_dml VARCHAR2(30);
emp_rule_name_ddl VARCHAR2(30);
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'apply', streams_name => 'apply_emp', queue_name => 'strmadmin.streams_queue', include_dml => true, include_ddl => false, source_database => 'ORCL', dml_rule_name => emp_rule_name_dml, ddl_rule_name => emp_rule_name_ddl);
DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION( rule_name => emp_rule_name_dml, destination_queue_name => 'strmadmin.streams_queue');
END;
/
NOTE: An error was noticed (by several readers) and a fix was posted on OTN so I wanted to get the fix in here:
Add this code, as STRMADMIN:
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'dequeue', streams_name => 'emp_deq', queue_name => 'strmadmin.streams_queue', include_dml => true, include_ddl => false, inclusion_rule => true);
END;
/
I haven't tested this myself but it does make sense based on the error people were receiving (ORA-26694). I did have dequeues in my original application so I must have missed that when making this example. Thanks OTN!
We don't want to stop applying changes when there is an error, so:
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER( apply_name => 'apply_emp', parameter => 'disable_on_error', value => 'n');
END;
/
Turn on the apply process:
BEGIN
DBMS_APPLY_ADM.START_APPLY( apply_name => 'apply_emp');
END;
/
Turn on the capture process:
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE( capture_name => 'capture_emp');
END;
/
Connect as HR and make some changes to Employees.
sqlplus hr/hr
INSERT INTO hr.employees VALUES(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110); COMMIT; UPDATE hr.employees SET salary=5999 WHERE employee_id=206; COMMIT; DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;
It takes a few seconds for the data to make it to the logs and then back into the system to be appled. Run this query until you see data (remembering that it is not instantaneous):
SELECT employee_id, first_name, last_name, upd_Date, action FROM hr.employee_audit ORDER BY employee_id;
Then you can log back into the streams admin account:
sqlplus strmadmin/strmadmin
View the XML LCR that we inserted during the capture process:
set long 9999 set pagesize 0
select * from streams_monitor;
That's it! It's really not that much work to capture and apply changes. Of course, it's a little bit more work to cross database instances, but it's not that much. Keep an eye out for a future entry where I do just that. One of the things that amazes me is how little code is required to accomplish this. The less code I have to write, the less code I have to maintain.
Thank care,
LewisC
Oracle Streams Configuration: Change Data Capture
Comments