Change Data Capture et SSIS pour un load intelligent

Présentation

Ce projet de chargement intelligent des données repose sur l’utilisation du Change Data Capture de SQL Server 2008 / 2008 R2 et la mise en place d’un historique de données grâce aux dimensions à variation lente de type II (Slowly Changing Dimension (SCD)  Type II).

L’idée de départ…

Je possède une base de données source avec une table contenant les informations de mes clients.  Et je souhaite utiliser certaines de ces informations pour alimenter la dimension associée dans mon data warehouse.
Dans un premier temps il faudrait pouvoir charger toutes les données directement  dans la dimension.
Par la suite il faudrait charger uniquement les modifications.
Le package qui assurera ce rôle devra être « intelligent », il pourra automatiquement basculer entre un scénario de premier chargement (First Full Loading)  ou un chargement incrémenté prenant uniquement  en compte les mises à jours (Incremental Loading).
De plus ce package serait modulaire, on pourrait le réutiliser sur d’autres tables associées à d’autres dimensions: Il suffirait juste d’initialiser certains paramètres.
Et bien évidement cette méthode devrait être performante et simple de maintenance.

Techniquement parlant…

Il existe différents moyens de gérer l’alimentation des dimensions d’un data warehouse.
Les « upserts » consistent à mettre à jour les données et/ou insérer de nouvelles données.
Avec SSIS différentes méthodes existent pour traiter ce type de cas. On peut utiliser des « Lookups », des « Merges » et pour gérer l’historique les « Slowly Changing dimensions » (SCD) de type II.
Une tout autre approche est possible, en utilisant les fonctionalités liées directement à la base de données SQL Server: le Change Data Capture (CDC).
Le Change Data Capture est une fonctionnalité qui lorsqu’elle est activée permet de récupérer uniquement les données modifiées d’une table. Les modifications enregistrées  sont les insertions,  les mises à jour et les suppressions de données (INSERT/UPDATE/DELETE).
Le CDC s’active tout d’abord sur une base de données puis sur une table . L’activation du CDC va engendrer la création d’une table système qui stockera toutes les modificaitons. En consultant cette table il sera possible d’alimenter une dimension d’un data warehouse en prenant uniquement en compte les modifications des données.
Le diagramme ci-dessous explique brièvement les différentes étapes que devra assurer le package.

Explication rapide

Au lieu d’activer nous même le CDC sur les sources de données, le package s’en occupera automatiquement.
Lors de la première exécution du package, si le CDC n’est pas activé alors le package chargera l’ensemble des données dans la dimension. A la fin de ce chargement le package activera le CDC sur la la base de données mais aussi sur la table source.
A partir de ce moment bien précis toute les futures modifications de cette table seront tracées. Lors de  la prochaine exécution du package ce dernier s’occupera uniquement des modifications référencées par cette table CDC.
Toute « l’intelligence » de cette méthode repose sur la détection du CDC, de son activation automatique et du chargement de données incrémenté des données modifiées depuis la dernière exécution du package.
Mais aussi la configuration des paramètres qui va permettre la modularité de la solution.

Pré requis

L’agent SQL Server doit être démarré.

Puis exécuter ce script  dans votre data whareouse .

USE YOUR_DWH
GO

CREATE TABLE dbo.LastExecution
(
 LastExecution DATETIME NOT NULL
)

GO
INSERT INTO LastExecution VALUES ('2000-01-01')

Ce script va créer une table qui stockera uniquement la derniere date d’éxécution du package.

Ce projet est un exemple pour vous montrer ce dont on est capable de faire. Il se peut que le cas présenté ici ne colle pas directement  aux réalités que vous pouvez rencontrer.
Dans notre scénario voici la table source :

Très important le CDC ne peut être activé que sur une table possèdant une PK ou une colonne IDENTITY.
La dimension de destination dans le data warehouse :

Le but est de créer un historique dans le data warehouse, on utilise donc le principe des SCD de type II c’est pour cela que les champs StartDate, EndDate et RecordStatus sont importants.

Comprendre comment ça marche

Le projet est composé de 3 packages :
• Initialization
• First Full Loading
• Incremental Loading

Package Initialization

Le Package Initialization est le package de départ, il joue le rôle de package parent. Il transmet aux package enfants ( First Full Loading et Incremental Loading) les paramètres et les variables nécessaires pour le bon  exécution des chargements de données.

Ci-dessous la liste des variables :

• SOURCE_DB : Nom de la base de données source opérationnelle

• SOURCE_TAB : Nom de la table source opérationnelle

• SOURCE_SCHEMA : Nom de votre schéma de votre base de données opérationnelle

• FILEGROUP : Nom du fichier de groupe de la base de données opérationnelle

• CDC_ROLE : Le Nom du role qui exécutera le Change data Capture

• DESTINATION_DIM : Dimension de votre Data Warehouse

Ce composant vérifie si le CDC est activé sur la base de donnée source opérationnelle (SOURCE_DB) via la requête :

SELECT  is_cdc_enabled AS 'CDC_INFO'
FROM sys.databases
WHERE name = ?
Cette requête interroge une table système et vérifie si le Change Data Capture est activé sur la base de données source que l’on a défini dans la variable SOURCE_DB.

Le résultat est contenu dans la variable CDC_ENABLED que l’on mappe à CDC_INFO.

Lorsqu’on exécute ce package pour la première fois ce dernier  va remarquer que le CDC n’est pas activé sur la base de données source… le package First Full Loading va s’exécuter par la suite.

Si le CDC est activé sur la base de donnée source alors c’est le package Incremental Loading qui s’éxécutera.

Package First Full Loading

Le package First Full Loading va charger tous les données existantes de table source opérationnelle vers la dimension du data warehouse. A la fin de ce flux de contrôle il activera le CDC sur la base de données et sur la table source.
Durant son exécution ce package mettra à jour la valeur LastExecution de la table LastExecution.

Ce package contient un bon nombre de variables la plupart prennent la valeur des variables parents du package Initialization :  toutes les variables qui sont préfixées par « INIT_ »

Pour transférer les valeurs entre deux variables de type parent/enfant , il suffit de paramétrer la configuration du package en utilisant le type « Parent package variable ».

Flux de données  : FFL-Dim_Customers

OLE DB Source : SOURCE_DB_TAB

Tout  d’abord, il faut créer votre connexion vers votre base de données source. Par la suite il faut utiliser cette connexion sur dans le composant source OLE DB via le « Advancedd Editor »   dans l’onget « Connection Manager ».

Dans l’onglet « Component Properties », les propriétés  ValidateExternalMetada, AccessMode et SqlCommandVariable sont définies comme ci-dessous.
Ces propriétés sont nécessaires pour rendre la solution SSIS modulaire.
ValidateExternalMetada empéchera au composant de retourner une erreur suite aux vérifications des métadonnées. Si on laisse cette propriété a True le composant sera marqué comme erroné.
Mais si on exécute le package tout se déroule correctement.
La variable User ::GetData contient la requête SELECT qui va récupérer les champs que l’on désire insérer dans le data warehouse.
GetData est définit comme ceci :
La propriété EvaluateAsExpression doit être à True car la requête SELECT est basée sur une expression.
Etant capable de définir la table source  (variable SOURCE_TAB) depuis le package initialization on peut alors requêter dynamiquement n’importe quelles tables. La requête ajustera donc le FROM pour pointer sur la table désirée.

Récapitulatif.
Etape/Package/variable/Description
1 /Initialization/SOURCE_TAB/Définition de la table source
2/First_Full_Loading/INIT_SOURCE_TAB/Récupération du nom de la table source
3/First_Full_Loading/GetData/Utilisation de la variable INIT_SOURCE_TAB au niveau de la clause FROM

Le composant ne reconnaitra pas les champs d’entrées et de sorties il faut les créer à la main et spécifier correctement leur type.

Les champs d’entrées et de sorties sont disponibles dans l’onglet « Column Mapping » il vous suffit alors de les relier entre eux.

Colonne dérivée : Create new fields

La dimension du data warehouse posséde plus de champs que la la table source, cela est due aux colonnes liées à la gestion de l’horique avec les SCD de Type II.

StartDate fixe la date du status des enregistements, par défaut lors du premier chargement la date que j’ai choisi est celle de l’exécution du package.
RecordStatus représente le status de l’enregistrement, lors du premier chargement on force la valeur ‘OK’  qui signifie pour moi que les enregistrement sont valables.

OLEDB Destination : Destination_DWH_DIM

Comme pour le composant OLE DB Source, le composant OLDE DB Destination nécessite une configuration particulière.
1-Créer la connexion vers la dimension du data warehouse
2-Utiliser cette connexion dans le Connection Manager via l’Advanced Editor
Les propriétés du composant

La propriété AccessMode  est fixé à « OpenRowset Using Fastload From Variable »  la variable utilisé est « INIT_DESTINATION_DIM » qui contient le nom de la dimension de destination (Dim_Customers).

Retour au Control flow : Les SQL Task

Ce composant «Execute SQL Task » va mettre à jour la valeur de LastExecution de la table LastExecution. Cette table contiendra la date la plus récente du chargement de données. Cette date est importante pour la suite 🙂
 UPDATE LastExecution SET LastExecution=GETDATE()

Ces deux composants vont faire appels à deux procédures stockées pour activer le CDC sur la base de données en premier temps puis sur la table source
NOTE :Ne pas oublier d’uliser les bonnes connexions.
Active CDC on DB
 EXEC sys.sp_cdc_enable_db
Active CT on Table
L’activation du CDC sur une table nécessite des paramètres , on créé alors la requête via l’expression qui contituera la requete SQL.  Les variables utilisées dans cette requete sont présentes dans le package First_Full_Loading elles héritent les valeurs des variables parents du package Initialization.
"EXEC sys.sp_cdc_enable_table

 @source_schema = N'"+ @[User::INIT_SOURCE_SCHEMA] +"',

 @source_name   = N'"+ @[User::INIT_SOURCE_TAB] +"',
 @role_name     = N'"+ @[User::INIT_CDC_ROLE] +"',
 @filegroup_name = N'"+ @[User::INIT_FILEGROUP] +"',
 @supports_net_changes ="+ @[User::INIT_NET_CHANGES]

Package Incremental Loading

Ce package récupère la date du dernier chargement connue LAST_EXECUTION.
Puis il vérifie que le CDC est bien activé sur la table source. Si ce n’est pas le cas il lance le package First_Full_Loading.
Le flux de données présent va lire la table CDC associé à la table source et va récupérer les changements efffectués depuis la dernieère exécution du package.
Et pour finir on met à jour la valeur de last execution.

Execute SQL Task : SQL to LastExecution

Ce composant utilise une expression basée sur la propriété SqlStamentSource.
On fixe la variable @[User ::INIT_LAST_EXECUTION] qui fait appel à la variable parent LAST_EXECUTION du package Initialization. Cette variable (LAST_EXECUTION) est un string contenant la requête suivante :
 SELECT MAX(LastExecution) FROM LastExecution

Le résultat de la requête est stocké dans la variable de type Object GetLastExecution, ce type de variable ne peut être utilisé dans les expressions, il faudra donc penser à transférer la vlaeur dans une autre variable de type différent (UseLastExecution).

Execute SQL Task : SQL – Retrieve SOURCE_TAB CDC Infos

L’exécution de ce composant permet de vérifier si le CDC est activé au niveau de la table. Une reqûete SQL va interroger une table système pour récupérer un booléen.  La valeur est stocké dans la variable INIT_CDC_ENABLED que l’on renomme CDC_INFOS.

Composant Script

Les variables de type Object ne peuvent pas être utilisé directement dans les expressions.
C’est pour cela que l’on ré affecte la valeur dans une variable de type String afin d’utiliser le résultat dans les expressions.
De plus la variable GetLastExecution contient uniquement un seul enregistrement : la date du dernier chargement de données connue.
Pour transférer la valeur entre deux variable, il faut founir auc composant Script les variables que l’on souhaite manipuler.
Dans l’éditeur de code C# voici comment on affecte une valeur à une variable.
public void Main()
 {
 // TODO: Add your code here
Dts.Variables["UseLastExecution"].Value=Dts.Variables["GetLastExecution"].Value.ToString();
 Dts.TaskResult = (int)ScriptResults.Success;
 }

Flux de données: Manage Data Changes

Ce flux de données récupère les enregistrements liés aux changement de la table source. Chaque changements effectués sur la table source va créer une enregistrement dans une table système CDC associé à la table source.
Chaque enregistrement possède un numéro allant de 1 à 4 qui spécifie le type d’opérations effectuées (Insert/Update/Delete).
On traite alors les données selon leur type d’opération et on les charge dans la dimention du data warehoue de destination.

OLE DB Source  : Retrieve CDC Changes

La source de données OLE DB est une source dynamique… en effet lorsque l’on active le CDC sur une table source. Le CDC va créer une table système associé qui va contenir toutes les modifications.
Donc si l’on a une table « MyTable » appartenant au schéma dbo, lorsqu’on active le CDC on aura une table système « cdc.dbo_Mytable_CT » (CT pour capture Tracking)
La source OLE DB est dynamque grâce à sa configuration avancé :
Utiliser la bonne connexion à la table source.

La requête de ce composant est en fait un appel à la variable : User ::DynamicSQLStatement.
Cette variable est définis comme ceci :

L’expression utilisée est la suivante :
"SELECT __$start_lsn
 ,__$end_lsn
 ,__$seqval
 ,__$operation
 ,__$update_mask
 ,CustomerID
 ,CustomerName
 ,CustomerStatus,tran_end_time
FROM cdc.dbo_"+@[User::INIT_SOURCE_TAB]+"_CT INNER JOIN
cdc.lsn_time_mapping ON cdc.dbo_"+@[User::INIT_SOURCE_TAB]+"_CT.__$start_lsn = cdc.lsn_time_mapping.start_lsn
WHERE  lsn_time_mapping.tran_end_time > '" + @[User::UseLastExecution] +"'"

On utilise uen expression pricipalement afin d’avoir une clause FROM dynamique en fonction de table.
On batit le nom de table système CDC a partir du non de la source de donnée d’où l’ajout de  cdc.dbo_ en préfixe et _CT en suffixe.
La clause WHERE appelle la variable [User::UseLAstExecution] » pour récupérer uniquement les changments effectué sur la table source depuis le dernier chargement de données.

Par la suite il faut créer les champs d’entrées et de sorties :

Puis il faut les mapper :

Conditionnal Split : Split Insert Delete Update

La colonne __$operation de la source qui mappé à operation par la suite peut contenir 4 valeurs.
1 : signifie que que l’enregistrement dans la table CDC concerne une requête de supression (DELETE).
2 :signifie que l’enregistrement dans la table CDC  concerne une insertion (INSERT)
3 et 4 : signifient que l’enregistrement dans la table CDC concerce une mise à jour (UPDATE). Les opération de type 3 contiennent la valeur avant le changement et ceux de type 4 contienent les nouvelles valeurs.

Gestion des insertions

Gestion des supressions

Dans ce cas, lorsqu’un ordre DELETE est effectué sur la table source on ne va pas supprimer ce même enregistrement dans le data warehouse, on va juste changer le status de l’enregistrement pour le marquer comme indisponible ou non valable. RecordStatus= « KO »

Gestion des mises à jour

On utilise un composant SCD.

Limites de la solution

Uniquement pour les sources et les destinations de type SQL Server.
Uniquement avec les versions 2008.