Nachdem ich im ersten Teil auf die Konfiguration des Oracle CDC Service und des Oracle CDC Designer eingegangen bin, möchte ich in diesem Teil auf die Konfiguration der SSIS Komponenten innerhalb des SSIS Designers eingehen.
Die Demoumgebung
Kurz vorweg zur Erklärung der Demo Umgebung. In meinem bisherigen Demos und Webcasts die ich unter anderem zusammen mit Allan Mitchell für Attunity gemacht hatte, kam häufig die Northwind Datenbank zum Einsatz. So hat sich jeder Microsoft Datenbankler – und auch ich – zumindest ein bisschen heimisch gefühlt.
Mit der Oracle XE Version, die ich nun in meiner VM verwende, habe ich mir zum ersten mal auch die mitgelieferte Sample Application näher angeschaut und verwende diese jetzt in den Demos. Zwar sind einem die Daten jetzt nicht mehr so bekannt (Tschüß ALFKI…), das Datenbankschema bzw. die von mir daraus verwendeten Tabellen mit Customer, Order und Order_Items sollten aber soweit selbsterklärend sein. Der Vorteil der sich für mich aus dem Einsatz der Sample Application ergibt, ist die mitgelieferte Web Anwendung aus dem Oracle Application Express Paket. Somit steht mir direkt eine kleine Web-Anwendung zur Verfügung in der ich Kunden und Aufträge erfassen und verändern kann. In den meisten Fällen ist dies wesentlich schneller nachzuvollziehen, als SQL Code der mittels SQLPlus oder Toad irgendwelche INSERT Statements ausführt.
In diesem Beitrag möchte ich nun die Kundendaten und Orders mit den dazugehörigen Positionen per Change Data Capture aus der Oracle Datenbank laden und in einer temporären Datenbank auf meinem SQL Server speichern. Auf die entsprechenden CDC Einstellungen für Oracle, wie also der Oracle CDC Service sowie die Oracle CDC Instanz zu konfigurieren sind, bin ich bereits in meinem vorherigen Artikel Change Data Capture Service for Oracle–Teil 1 näher eingegangen.
Um die Daten per CDC zu Laden, werden im wesentlichen zwei SSIS Pakete benötigt. Ein Paket für den Full-Load, mit dem die Daten initial vollständig geladen werden, und ein Paket für den Incremental-Load, mit dem später nur noch die Änderungen übernommen werden.
Full Load
Als erstes wird das Paket Full-Load erstelt, das die folgende Aufgaben übernehmen soll:
- Mark Initial Load Start mit dem CDC Control Task
- Transferieren aller Daten vom Quell- in das Ziel-System
- Mark Initial Load End mit dem CDC Control Task
Der CDC Control Task, mit jeweils Mark Initial Load und Mark Initial End, teilt dem CDC Service mit, dass ein Full Load stattgefunden hat, und speichert in der unter Table to use for storing state angegebenen Tabelle, die entsprechende Position im Log mit einigen Zusatzinformationen, so dass beim nächsten Incremental-Load auch wirklich nur die neuen bzw. veränderten Datensätze geladen werden. Ein entsprechender CDC State könnte wie folgt aussehen:
ILUPDATE/CS/0x000000074AA000000000/CE/0x000000074F4E01000001/IR/
0x000000074AA000000000/0x000000074AAE00000000/TS/2013-01-08T16:58:03.5408281/
Als SQL Server CDC database wird die in Teil 1 erstellte Datenbank OracleDemo auf meinem SQL Server angegeben. Seitens SSIS wird für das eigentliche CDC keine Verbindung zu der Oracle Datenbank aufgebaut (für den Full Load muss jedoch eine Verbindung zur Quell-Datenbank aufgebaut werden.). Für den CDC Control Task der später das Ende des Full-Load Prozesses definiert, ist es wichtig, dass die entsprechenden Einstellungen, mit Ausnahme der CDC control operation, identisch vorgenommen werden.
Im zweiten Teil des Paketes, Transferieren aller Daten vom Quell- in das Ziel-System, werden die Daten regulär per SSIS aus der Oracle Datenbank in mein Zielsystem geladen. Für diesen Ladeprozess setze ich mit dem Microsoft Connector Version 2.0 for Oracle by Attunityauf eine weitere von Microsoft bei Attunity lizensierte Komponente. Die entsprechende Komponente wird bereits seit der Version 2008 von Microsoft angeboten und verbessert die Performance beim Zugriff auf eine Oracle Datenbank erheblich.
Der dargestellte Datenfluss lädt die Customer Daten vollständig mit Hilfe des Oracle Source Connector aus der Oracle Datenbank und speichert diese 1:1 in der SQL Server Datenbank.
Um sicherzustellen, dass bei einem Full-Load die Daten in meiner Zieltabelle nicht dupliziert werden, wird dem Control Flow noch ein Execute SQL Task hinzugefügt, mit dem die Ziel-Tabelle vor dem Full-Load gelöscht wird. Da in meinem Beispiel 3 Tabellen mit CDC überwacht und geladen werden, werden die entsprechenden Tasks auch noch für die Tabelle Orders und Order_Items hinzugefügt, so dass das Beispiel-Paket dann wie folgt aussieht:
Incremental Load – Laden des Delta
Zum Laden der jeweils geänderten Daten in den Quelltabellen wird ein neues SSIS Paket erstellt. Wie bereits beim Full-Load kommt auch hier der CDC Control Task zum Einsatz, der in diesem Fall jedoch mit den CDC control operation Get processing range und Mark processed range gesetzt wird.
Innerhalb des Datenflusses kommen nun die beiden weiteren CDC Komponenten, die CDC Source und der CDC Splitter zum Einsatz. Über die CDC Source wird zum einen definiert woher die Daten geladen werden, hier findet also wieder der Zugriff auf die zuvor definierte CDC Instanz statt, zum anderen wird definiert, welche Daten geladen werden. Da ich in diesem Fall nur die Änderungen, also das sogenannte Delta laden möchte, wird der CDC processing mode auf Net gesetzt.
Der CDC Splitter teilt die geladenen Daten in die jeweils durchgeführten Operationnen Insert, Update und Delete auf. Wie im Paket zu sehen, werden die Delete Datensätze direkt über einen OLE DB Command in der Ziel-Datenbank gelöscht. Hierfür wird der Befehl
DELETE FROM DEST_Customer WHERE [CUSTOMER_ID] = ?
verwendet und die durch das ? (Fragezeichen) repräsentierte Variable über das Mapping an die entsprechende Spalte im Datenfluss gebunden.
Im weitere Teil des Paketes werden die Insert und Update Datensätze mit einer Union Komponente zusammengeführt und in einer temporären Tabelle gespeichert. Diese Datensätze werden danach im Control Flow über ein Execute SQL Task mit einem Merge Statement in die Ziel-Tabelle übernommen, dies erhöht die Performance speziell für Updates erheblich. Das entsprechende Merge Statement sieht wie folgt aus:
1: MERGE INTO DEST_Customer
2: USING
3: (
4: SELECT [CUSTOMER_ID]
5: ,[CUST_FIRST_NAME]
6: ,[CUST_LAST_NAME]
7: ,[CUST_STREET_ADDRESS1]
8: ,[CUST_STREET_ADDRESS2]
9: ,[CUST_CITY]
10: ,[CUST_STATE]
11: ,[CUST_POSTAL_CODE]
12: ,[PHONE_NUMBER1]
13: ,[PHONE_NUMBER2]
14: ,[CREDIT_LIMIT]
15: ,[CUST_EMAIL]
16: FROM TEMP_Customer_Destination
17: ) MergeData ON DEST_Customer.CUSTOMER_ID = MergeData.CUSTOMER_ID
18: WHEN MATCHED THEN
19: UPDATE SET
20: DEST_Customer.[CUSTOMER_ID] = MergeData.Customer_ID
21: ,DEST_Customer.[CUST_FIRST_NAME] = MergeData.Cust_First_Name
22: ,DEST_Customer.[CUST_LAST_NAME] = MergeData.Cust_Last_Name
23: ,DEST_Customer.[CUST_STREET_ADDRESS1] = MergeData.CUST_STREET_ADDRESS1
24: ,DEST_Customer.[CUST_STREET_ADDRESS2] = MergeData.CUST_STREET_ADDRESS2
25: ,DEST_Customer.[CUST_CITY] = MergeData.CUST_CITY
26: ,DEST_Customer.[CUST_STATE] = MergeData.CUST_STATE
27: ,DEST_Customer.[CUST_POSTAL_CODE] = MergeData.CUST_POSTAL_CODE
28: ,DEST_Customer.[PHONE_NUMBER1] = MergeData.PHONE_NUMBER1
29: ,DEST_Customer.[PHONE_NUMBER2] = MergeData.PHONE_NUMBER2
30: ,DEST_Customer.[CREDIT_LIMIT] = MergeData.CREDIT_LIMIT
31: ,DEST_Customer.[CUST_EMAIL] = MergeData.CUST_EMAIL
32: WHEN NOT MATCHED THEN INSERT ([CUSTOMER_ID]
33: ,[CUST_FIRST_NAME]
34: ,[CUST_LAST_NAME]
35: ,[CUST_STREET_ADDRESS1]
36: ,[CUST_STREET_ADDRESS2]
37: ,[CUST_CITY]
38: ,[CUST_STATE]
39: ,[CUST_POSTAL_CODE]
40: ,[PHONE_NUMBER1]
41: ,[PHONE_NUMBER2]
42: ,[CREDIT_LIMIT]
43: ,[CUST_EMAIL])
44: VALUES (MergeData.[CUSTOMER_ID]
45: ,MergeData.[CUST_FIRST_NAME]
46: ,MergeData.[CUST_LAST_NAME]
47: ,MergeData.[CUST_STREET_ADDRESS1]
48: ,MergeData.[CUST_STREET_ADDRESS2]
49: ,MergeData.[CUST_CITY]
50: ,MergeData.[CUST_STATE]
51: ,MergeData.[CUST_POSTAL_CODE]
52: ,MergeData.[PHONE_NUMBER1]
53: ,MergeData.[PHONE_NUMBER2]
54: ,MergeData.[CREDIT_LIMIT]
55: ,MergeData.[CUST_EMAIL]);
Da auch hier mit dem Paket wieder 3 Tabellen übernommen werden, müssen die entsprechenden Tasks wieder für die beiden weiteren Tabellen Orders und Order_Items angelegt werden. Es ist noch darauf zu achten, dass innerhalb der jeweiligen CDC Control Tasks wie auch in der CDC Source jeweils eigene Variablen zum Speichern des CDC State pro Tabelle verwendet werden. Ebenso sollte der CDC State name auch pro zu ladender Tabelle eindeutig sein. Das vollständige Paket sieht nun wie folgt aus:
Der zusätzliche Execute SQL Task vor den Sequenz Containern löscht vor der Verarbeitung die jeweiligen temporären Tabellen.
Das Paket zum Incremental Load kann im Übrigen 1:1 auch für SQL Server CDC verwendet werden.