Présentation
Ce projet de chargement intelligent des données repose sur l’utilisation du Change Data Capture de SQL Server 2008 / 2008 R2 et la mise en place d’un historique de données grâce aux dimensions à variation lente de type II (Slowly Changing Dimension (SCD) Type II).
L’idée de départ…
Je possède une base de données source avec une table contenant les informations de mes clients. Et je souhaite utiliser certaines de ces informations pour alimenter la dimension associée dans mon data warehouse.
Dans un premier temps il faudrait pouvoir charger toutes les données directement dans la dimension.
Par la suite il faudrait charger uniquement les modifications.
Le package qui assurera ce rôle devra être « intelligent », il pourra automatiquement basculer entre un scénario de premier chargement (First Full Loading) ou un chargement incrémenté prenant uniquement en compte les mises à jours (Incremental Loading).
De plus ce package serait modulaire, on pourrait le réutiliser sur d’autres tables associées à d’autres dimensions: Il suffirait juste d’initialiser certains paramètres.
Et bien évidement cette méthode devrait être performante et simple de maintenance.
Techniquement parlant…
Les « upserts » consistent à mettre à jour les données et/ou insérer de nouvelles données.
Avec SSIS différentes méthodes existent pour traiter ce type de cas. On peut utiliser des « Lookups », des « Merges » et pour gérer l’historique les « Slowly Changing dimensions » (SCD) de type II.
Une tout autre approche est possible, en utilisant les fonctionalités liées directement à la base de données SQL Server: le Change Data Capture (CDC).
Le Change Data Capture est une fonctionnalité qui lorsqu’elle est activée permet de récupérer uniquement les données modifiées d’une table. Les modifications enregistrées sont les insertions, les mises à jour et les suppressions de données (INSERT/UPDATE/DELETE).
Le CDC s’active tout d’abord sur une base de données puis sur une table . L’activation du CDC va engendrer la création d’une table système qui stockera toutes les modificaitons. En consultant cette table il sera possible d’alimenter une dimension d’un data warehouse en prenant uniquement en compte les modifications des données.
Explication rapide
Lors de la première exécution du package, si le CDC n’est pas activé alors le package chargera l’ensemble des données dans la dimension. A la fin de ce chargement le package activera le CDC sur la la base de données mais aussi sur la table source.
Toute « l’intelligence » de cette méthode repose sur la détection du CDC, de son activation automatique et du chargement de données incrémenté des données modifiées depuis la dernière exécution du package.
Pré requis
Puis exécuter ce script dans votre data whareouse .
USE YOUR_DWH GO CREATE TABLE dbo.LastExecution ( LastExecution DATETIME NOT NULL ) GO INSERT INTO LastExecution VALUES ('2000-01-01')
Ce script va créer une table qui stockera uniquement la derniere date d’éxécution du package.
Ce projet est un exemple pour vous montrer ce dont on est capable de faire. Il se peut que le cas présenté ici ne colle pas directement aux réalités que vous pouvez rencontrer.
Dans notre scénario voici la table source :
Très important le CDC ne peut être activé que sur une table possèdant une PK ou une colonne IDENTITY.
La dimension de destination dans le data warehouse :
Le but est de créer un historique dans le data warehouse, on utilise donc le principe des SCD de type II c’est pour cela que les champs StartDate, EndDate et RecordStatus sont importants.
Comprendre comment ça marche
• Initialization
• First Full Loading
• Incremental Loading
Package Initialization
• SOURCE_DB : Nom de la base de données source opérationnelle
• SOURCE_TAB : Nom de la table source opérationnelle
• SOURCE_SCHEMA : Nom de votre schéma de votre base de données opérationnelle
• FILEGROUP : Nom du fichier de groupe de la base de données opérationnelle
• CDC_ROLE : Le Nom du role qui exécutera le Change data Capture
• DESTINATION_DIM : Dimension de votre Data Warehouse
Ce composant vérifie si le CDC est activé sur la base de donnée source opérationnelle (SOURCE_DB) via la requête :
SELECT is_cdc_enabled AS 'CDC_INFO' FROM sys.databases WHERE name = ?
Package First Full Loading
Durant son exécution ce package mettra à jour la valeur LastExecution de la table LastExecution.
Ce package contient un bon nombre de variables la plupart prennent la valeur des variables parents du package Initialization : toutes les variables qui sont préfixées par « INIT_ »
Flux de données : FFL-Dim_Customers
OLE DB Source : SOURCE_DB_TAB
ValidateExternalMetada empéchera au composant de retourner une erreur suite aux vérifications des métadonnées. Si on laisse cette propriété a True le composant sera marqué comme erroné.
GetData est définit comme ceci :
Etant capable de définir la table source (variable SOURCE_TAB) depuis le package initialization on peut alors requêter dynamiquement n’importe quelles tables. La requête ajustera donc le FROM pour pointer sur la table désirée.
Récapitulatif.
Etape/Package/variable/Description
1 /Initialization/SOURCE_TAB/Définition de la table source
2/First_Full_Loading/INIT_SOURCE_TAB/Récupération du nom de la table source
3/First_Full_Loading/GetData/Utilisation de la variable INIT_SOURCE_TAB au niveau de la clause FROM
Le composant ne reconnaitra pas les champs d’entrées et de sorties il faut les créer à la main et spécifier correctement leur type.
Colonne dérivée : Create new fields
RecordStatus représente le status de l’enregistrement, lors du premier chargement on force la valeur ‘OK’ qui signifie pour moi que les enregistrement sont valables.
OLEDB Destination : Destination_DWH_DIM
1-Créer la connexion vers la dimension du data warehouse
2-Utiliser cette connexion dans le Connection Manager via l’Advanced Editor
Retour au Control flow : Les SQL Task
UPDATE LastExecution SET LastExecution=GETDATE()
EXEC sys.sp_cdc_enable_db
"EXEC sys.sp_cdc_enable_table @source_schema = N'"+ @[User::INIT_SOURCE_SCHEMA] +"', @source_name = N'"+ @[User::INIT_SOURCE_TAB] +"', @role_name = N'"+ @[User::INIT_CDC_ROLE] +"', @filegroup_name = N'"+ @[User::INIT_FILEGROUP] +"', @supports_net_changes ="+ @[User::INIT_NET_CHANGES]
Package Incremental Loading
Puis il vérifie que le CDC est bien activé sur la table source. Si ce n’est pas le cas il lance le package First_Full_Loading.
Le flux de données présent va lire la table CDC associé à la table source et va récupérer les changements efffectués depuis la dernieère exécution du package.
Et pour finir on met à jour la valeur de last execution.
Execute SQL Task : SQL to LastExecution
SELECT MAX(LastExecution) FROM LastExecution
Execute SQL Task : SQL – Retrieve SOURCE_TAB CDC Infos
Composant Script
C’est pour cela que l’on ré affecte la valeur dans une variable de type String afin d’utiliser le résultat dans les expressions.
De plus la variable GetLastExecution contient uniquement un seul enregistrement : la date du dernier chargement de données connue.
Pour transférer la valeur entre deux variable, il faut founir auc composant Script les variables que l’on souhaite manipuler.
public void Main() { // TODO: Add your code here Dts.Variables["UseLastExecution"].Value=Dts.Variables["GetLastExecution"].Value.ToString(); Dts.TaskResult = (int)ScriptResults.Success; }
Flux de données: Manage Data Changes
Chaque enregistrement possède un numéro allant de 1 à 4 qui spécifie le type d’opérations effectuées (Insert/Update/Delete).
On traite alors les données selon leur type d’opération et on les charge dans la dimention du data warehoue de destination.
OLE DB Source : Retrieve CDC Changes
Donc si l’on a une table « MyTable » appartenant au schéma dbo, lorsqu’on active le CDC on aura une table système « cdc.dbo_Mytable_CT » (CT pour capture Tracking)
Cette variable est définis comme ceci :
"SELECT __$start_lsn ,__$end_lsn ,__$seqval ,__$operation ,__$update_mask ,CustomerID ,CustomerName ,CustomerStatus,tran_end_time FROM cdc.dbo_"+@[User::INIT_SOURCE_TAB]+"_CT INNER JOIN cdc.lsn_time_mapping ON cdc.dbo_"+@[User::INIT_SOURCE_TAB]+"_CT.__$start_lsn = cdc.lsn_time_mapping.start_lsn WHERE lsn_time_mapping.tran_end_time > '" + @[User::UseLastExecution] +"'"
On utilise uen expression pricipalement afin d’avoir une clause FROM dynamique en fonction de table.
On batit le nom de table système CDC a partir du non de la source de donnée d’où l’ajout de cdc.dbo_ en préfixe et _CT en suffixe.
La clause WHERE appelle la variable [User::UseLAstExecution] » pour récupérer uniquement les changments effectué sur la table source depuis le dernier chargement de données.
Conditionnal Split : Split Insert Delete Update
1 : signifie que que l’enregistrement dans la table CDC concerne une requête de supression (DELETE).
2 :signifie que l’enregistrement dans la table CDC concerne une insertion (INSERT)
3 et 4 : signifient que l’enregistrement dans la table CDC concerce une mise à jour (UPDATE). Les opération de type 3 contiennent la valeur avant le changement et ceux de type 4 contienent les nouvelles valeurs.