In the previous articles, we described how Change Tracking works, and the steps you need to follow to enable it and configure it properly. Now, it’s time to see how this feature can be used in a common scenario which is typical in BI solutions but easily extensible to any other context that requires an incremental data load.
A use case: an incremental ETL process
Change Tracking can be very useful in Business Intelligence solutions, where data is frequently loaded from transactional databases to analytical databases by ETL processes, in order to make it available to management. Management can then consult the data and decide which strategies to adopt.
These processes often involve a large number of records, coming from different tables, and can require quite complex intermediate transformations.
For these reasons, these ETL processes can take quite a long time to execute unless you are using a supporting feature like Change Tracking.
With Change Tracking, you do not have to import all the data, but only those rows inserted, modified or deleted since the last run of the ETL process.
Since the loading time is significantly shorter, you can, if you choose, execute the ETL processes multiple times a day, thus constantly providing the users with the latest updated data.
How to implement an incremental ETL process with SQL Server Integration Services
Only a few steps are required to implement an incremental ETL process with SQL Server Integration Services (SSIS), and most of them are repeated for each tracked table from which you want to extract the data.
The package should include, at a minimum, the following tasks:
- Get the current version
- Get the most recent version that was extracted in the previous execution of the ETL process
- Get the minimum valid version for the tracked table
- Compare the minimum valid version extracted at step 3 with the current/most recent version extracted at step 2
- Identify those changes having a version which is higher than the one obtained at step 2, and load them into a supporting table in the destination database
- Compare the data that has been loaded into the supporting table with that present in the destination table, and update the destination table
- Save the new current version, replacing the one obtained at step 2 with the one obtained at step 1
In order to complete the SSIS package execution successfully, it is important to ensure that:
- the service account for SQL Server Integration Services has read permissions for the tracked database and read/write permissions for the destination database;
- the process is executed after enabling the SNAPSHOT isolation level on the tracked database, to give a consistent view of the data and thus protect it from changes or cleanup tasks that may occur during the extraction and reading of the data. This prevents the version from changing between one task and the next, for example after you have saved it into variables. If this did happen, the data extract would be incorrect or incomplete (because it would extract either more or fewer changes than expected).
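As a sketch of that second point, snapshot isolation can be enabled once on the tracked database, and the extraction queries then run inside a single snapshot transaction (database and table names follow this article's example; adapt them to your environment):

```sql
-- Enabling snapshot isolation on the tracked database (run once)
ALTER DATABASE CTExampleDB SET ALLOW_SNAPSHOT_ISOLATION ON;

-- Inside the ETL process: read the versions and the changes within a single
-- snapshot transaction, so the version cannot move between the reads
SET TRANSACTION ISOLATION LEVEL SNAPSHOT;
BEGIN TRANSACTION;
    SELECT CHANGE_TRACKING_CURRENT_VERSION();
    -- ... extraction of the changed rows goes here ...
COMMIT TRANSACTION;
```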
The destination database
Before looking in detail at the implementation of each of the seven steps, we need to implement a target database, DestinationDB, which contains the dbo.Employees table to be synchronized with the table of the same name which is in the CTExampleDB database.
The script to create the new database is as follows:
-- Creation of the DestinationDB database and setting of the Primary (.mdf)
-- and Log (.ldf) files
CREATE DATABASE DestinationDB
ON PRIMARY
(
    NAME = 'DestinationDB_mdf'
    , FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL10_50.MSSQLSERVER\MSSQL\DATA\DestinationDB_Sys.mdf'
    , SIZE = 10MB
    , MAXSIZE = 50MB
    , FILEGROWTH = 10MB
)
LOG ON
(
    NAME = 'DestinationDB_log'
    , FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL10_50.MSSQLSERVER\MSSQL\DATA\DestinationDB_Log.ldf'
    , SIZE = 10MB
    , MAXSIZE = 500MB
    , FILEGROWTH = 100MB
);

-- Adding the USERTABLES filegroup
ALTER DATABASE DestinationDB ADD FILEGROUP USERTABLES;

-- Adding the file for the example tables
ALTER DATABASE DestinationDB
ADD FILE
(
    NAME = 'DestinationDB_userTables'
    , FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL10_50.MSSQLSERVER\MSSQL\DATA\DestinationDB_userTables.ndf'
    , SIZE = 10MB
    , MAXSIZE = 100MB
    , FILEGROWTH = 10MB
)
TO FILEGROUP USERTABLES;
This, instead, is the script to create the table in the DestinationDB database:
-- Creation of the dbo.Employees table
CREATE TABLE dbo.Employees
(
    EmployeeID int NOT NULL
    , LastName nvarchar(20) NOT NULL
    , FirstName nvarchar(10) NOT NULL
    , CONSTRAINT PK_Employees PRIMARY KEY (EmployeeID) ON USERTABLES
) ON USERTABLES;
We then add two schemas to the database, tmp and ctk. The first (tmp) will hold the supporting table that will contain information on the changed rows to be loaded into the dbo.Employees table. The second (ctk) will hold the objects through which the information about the various Change Tracking versions to extract is saved:
-- Creating the tmp schema
CREATE SCHEMA [tmp] AUTHORIZATION [dbo];
GO

-- Creating the ctk schema
CREATE SCHEMA [ctk] AUTHORIZATION [dbo];
GO
1) Get the current version
This operation can be performed by an Execute SQL Task that runs the CHANGE_TRACKING_CURRENT_VERSION() function and saves the return value in a variable (to be reused at step 7).
The aim of this step is twofold:
- to get the current version from the tracked database, in order to know the “point” to which the changes will be imported. This value is then “transferred” from the variable to a table (to be created at step 2) in the destination database; hence, it will be extracted again and used as a parameter of the CHANGETABLE(CHANGES …) function (step 5) for the next load (in order to thus exclude the rows already imported in the previous load);
- to ensure that Change Tracking is active; to accomplish this, you can insert a Script Task after the Execute SQL task that, if the function returns NULL, terminates the process by returning an error message indicating that Change Tracking is not enabled.
The function must be included in a stored procedure that you must create in the CTExampleDB database (since the CT functions must always be created in the tracked database), specifically in the ctk schema, in order to meet the established conventions of our use case.
The scripts to create the schema and the stored procedures are the following:
-- Script 1: creating the ctk schema
CREATE SCHEMA [ctk] AUTHORIZATION [dbo];
GO

-- Script 2: creating the stored procedure to extract the current version
CREATE PROCEDURE [ctk].[stp_get_change_tracking_current_version]
AS
DECLARE @current_version BIGINT = (SELECT CHANGE_TRACKING_CURRENT_VERSION());
SELECT current_version = ISNULL(@current_version, -1);
The returned current version is saved in the currentVersion variable (of Int64 type) created in the SSIS package (Figure 18).
2) Get the latest version which was extracted in the previous execution of the ETL process
This activity is carried out by an Execute SQL Task; specifically, you must provide a SELECT statement that obtains, from the table referred to in step 1, the latest version imported during the previous load, and stores it in a variable that will be used in the current process as a parameter of the CHANGETABLE(CHANGES…) function.
When you run the incremental process for the first time, the function will receive the value 0 as a parameter, in order to extract the information about the changes from the beginning. Of course, the value cannot be obtained from a previous execution, since one does not exist, so you must enter it manually when you create the table.
As a consequence, the script to create the table in the DestinationDB database must be the following:
-- Creating the change_tracking_last_sync_version table to store
-- the latest extracted version after each load
CREATE TABLE [ctk].[change_tracking_last_sync_version](
    [description] [varchar](50) NOT NULL,
    [last_sync_version] [bigint] NOT NULL,
    [extraction_date] [datetime] NULL
) ON [PRIMARY];

-- Entering the value 0, which is necessary to execute the first load
-- successfully; initially, NULL is inserted into the extraction_date field;
-- after each execution, the date and time of storing the last extracted
-- version will be written to it
INSERT INTO [ctk].[change_tracking_last_sync_version]
VALUES ('Last Extracted Version', 0, NULL);
After each run of the package, the value of the last_sync_version column will be overwritten with the current version value extracted at step 1 (this UPDATE operation will be described at step 7).
The SELECT statement can be inserted into the stored procedure as follows:
-- Creating the stored procedure that returns the last extracted version
CREATE PROCEDURE [ctk].[stp_get_change_tracking_last_sync_version]
AS
DECLARE @last_sync_version BIGINT =
    (SELECT last_sync_version FROM ctk.change_tracking_last_sync_version);
SELECT last_sync_version = @last_sync_version;
The returned version is saved in the lastSyncVersion variable (of Int64 type), which has been created in the SSIS package (Figure 18).
3) Get the minimum valid version for the tracked table
To perform this step, we use an Execute SQL Task that calls a stored procedure executing the CHANGE_TRACKING_MIN_VALID_VERSION() function. This extracts the minimum available version and saves it in a variable (minValidVersion in Figure 18).
The script to create the stored procedure in the CTExampleDB database is as follows:
-- Creating the stored procedure that returns the minimum valid version
CREATE PROCEDURE [ctk].[stp_get_change_tracking_min_valid_version]
AS
DECLARE @min_valid_version BIGINT =
    (SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('dbo.Employees')));
SELECT min_valid_version = ISNULL(@min_valid_version, -1);
If CT is disabled, the stored procedure will return -1.
4) Compare the minimum valid version obtained at step 3 with the current/most recent version extracted at step 2
For this activity, you can use a Script Task that retrieves and compares the two variables in which we stored the version values – lastSyncVersion (step 2) and minValidVersion (step 3): if the minimum valid version is greater than the last version extracted during the previous load, the package execution terminates with an error message that suggests resynchronizing Change Tracking. If the value of the minValidVersion variable is -1, the comparison is not performed and the Script Task returns an error message indicating that CT is disabled for the table.
One of the most frequent causes of loss of synchronization is setting the retention period incorrectly, specifically to a time less than the elapsed time between each execution of the job.
For example, suppose you have read, in the last run, the changes up to version 10, but before the next load the auto cleanup deletes the information up to version 15: the next load will then extract the minimum valid version with value equal to 15, which is greater than the last change version extracted in the last load (equal to 10). This means that you cannot retrieve the changes between version 11 and 14, and you will need to use synchronization to align the extractions.
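The comparison performed by the Script Task is logically equivalent to a check like the following (a sketch in T-SQL rather than the .NET code of the task; the variable names and values mirror this example):

```sql
DECLARE @last_sync_version BIGINT = 10;  -- value read at step 2
DECLARE @min_valid_version BIGINT = 15;  -- value read at step 3

IF @min_valid_version = -1
    RAISERROR('Change Tracking is not enabled for the table.', 16, 1);
ELSE IF @min_valid_version > @last_sync_version
    RAISERROR('Synchronization lost: resynchronize before loading.', 16, 1);
-- otherwise the incremental extraction can proceed
```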
5) Identify the changed rows having versions higher than the one extracted at step 2, and load them into a supporting table in the destination database
In this step, the information about the changed data is transferred from the tracked database to the destination database. You therefore need a Data Flow Task.
In the OLE DB Source you could insert a stored procedure (to be created in the CTExampleDB database) containing the CHANGETABLE(CHANGES…) function that receives the value of the lastSyncVersion (step 2) variable as a parameter.
The code required to implement the stored procedure is as follows:
-- Creating the stored procedure that extracts the keys of the changed rows
CREATE PROCEDURE [ctk].[stp_get_employees_changes](@last_sync_version BIGINT)
AS
DECLARE @query NVARCHAR(MAX) =
    N'SELECT C.EmployeeID
        , C.SYS_CHANGE_OPERATION
      FROM CHANGETABLE(CHANGES dbo.Employees, '
    + CAST(@last_sync_version AS NVARCHAR(20)) + N') AS C';
EXEC sp_executesql @query;
There are many ways to execute it; in this use case, I have chosen to insert the EXEC command into the stp_get_employees_changes variable (Figure 18), which the OLE DB Source points to in order to extract the data.
The CHANGETABLE(CHANGES…) function has been designed precisely to be involved in incremental processes where, by getting the latest version extracted in the previous load, it can proceed with extracting the modified data, starting from the next version.
This is the reason why, in the examples that we have seen in the section of this article dedicated to it, the function returned the changes from the version following the one specified as the parameter.
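For instance, assuming the last load stopped at version 7, a call such as the following returns only the changes recorded from version 8 onwards (the version value is illustrative):

```sql
-- Returns the changes with SYS_CHANGE_VERSION greater than 7
SELECT C.EmployeeID, C.SYS_CHANGE_OPERATION, C.SYS_CHANGE_VERSION
FROM CHANGETABLE(CHANGES dbo.Employees, 7) AS C;
```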
To enter the data into the supporting table of the DestinationDB database, I used an OLE DB Destination.
The main purpose of this table is to hold the primary keys of the modified rows, in order for us to know, through a query, which rows will be updated in the destination table.
Alternatively, you may decide to load into it – in addition to the primary key – all the columns to be inserted into the destination table; this choice does, however, influence the implementation of the next step.
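If you preferred that second option, the extraction query could, as a sketch, join CHANGETABLE(CHANGES ...) to the source table so that the supporting table receives the full rows (hypothetical variant, not the one used in this package; a LEFT JOIN is needed because deleted rows no longer exist in the source):

```sql
-- Hypothetical variant of the extraction query: full columns of the changed rows
DECLARE @last_sync_version BIGINT = 0;  -- value read at step 2
SELECT C.EmployeeID
    , C.SYS_CHANGE_OPERATION
    , E.LastName
    , E.FirstName
FROM CHANGETABLE(CHANGES dbo.Employees, @last_sync_version) AS C
LEFT JOIN dbo.Employees AS E
    ON E.EmployeeID = C.EmployeeID;
```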
In this use case, I have decided to use the first solution, and have therefore created the supporting table with the following structure:
-- Creating the supporting table to save the primary keys
-- of the changed rows and the operation type performed on them
CREATE TABLE [tmp].[EmployeesChanges](
    [EmployeeID] [int] NOT NULL,
    [SysChangeOperation] [nchar](1) NOT NULL
) ON [PRIMARY];
In Figure 16 you can see how the Data Flow Task has been implemented in the SSIS package.
The supporting table must be emptied at each load, in order to prevent the keys of the rows changed earlier remaining in it.
The Data Flow Task therefore has to be preceded by an Execute SQL Task that runs the TRUNCATE TABLE command on the tmp.EmployeesChanges table.
6) Compare the data loaded into the supporting table with that present in the destination table and update the latter
You can also use an Execute SQL Task and a Data Flow Task for this step.
The Execute SQL Task must contain a DELETE in order to remove, from the destination table, only the rows to be updated (in other words, the records whose primary keys are present in the supporting table).
The script is the following:
-- Deleting the rows to be updated
DELETE FROM dbo.Employees
WHERE EmployeeID IN (SELECT EmployeeID FROM tmp.EmployeesChanges);
The Data Flow Task, instead, is as follows:
- in the OLE DB Source there is a view (created in the DestinationDB database) that extracts, from the tracked dbo.Employees table, the rows whose primary key is present in the supporting table (so, only the changed records). The code to create the view is the following:
-- Creating the view that extracts the changed rows
CREATE VIEW [ctk].[vw_load_employees]
AS
SELECT EmployeeID
    , LastName
    , FirstName
FROM CTExampleDB.dbo.Employees
WHERE EmployeeID IN (SELECT EmployeeID FROM tmp.EmployeesChanges);
If you decided, at step 5, to import into the tmp.EmployeesChanges table all the columns for the changed rows, the view will not query the tracked database but it will directly query the supporting table, because all the data will be available in it.
- in the OLE DB Destination, the specified destination is the dbo.Employees table (Figure 17).
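As an aside, if – as mentioned above – the supporting table held all the columns of the changed rows, the DELETE-plus-reload pair could be replaced by a single set-based statement. The following MERGE is only a sketch of that alternative, assuming tmp.EmployeesChanges also held LastName and FirstName; it is not the approach used in this package:

```sql
-- Hypothetical alternative: apply deletes, updates and inserts in one pass,
-- driven by the operation type saved in the supporting table
MERGE dbo.Employees AS TGT
USING tmp.EmployeesChanges AS SRC
    ON TGT.EmployeeID = SRC.EmployeeID
WHEN MATCHED AND SRC.SysChangeOperation = 'D'
    THEN DELETE
WHEN MATCHED
    THEN UPDATE SET TGT.LastName = SRC.LastName, TGT.FirstName = SRC.FirstName
WHEN NOT MATCHED AND SRC.SysChangeOperation <> 'D'
    THEN INSERT (EmployeeID, LastName, FirstName)
         VALUES (SRC.EmployeeID, SRC.LastName, SRC.FirstName);
```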
7) Save the new current version, replacing the one extracted at step 2 with the one extracted at step 1
In this last step, I used an Execute SQL Task whose purpose is to run a stored procedure (created in the DestinationDB database) that overwrites the old current version (used in the CHANGETABLE(CHANGES…) function at step 5) with the one extracted at step 1 (which will be used by the same function in the next load).
The code of the stored procedure is as follows:
-- Creating the stored procedure that updates the current version, including
-- the date and time of the UPDATE operation
CREATE PROCEDURE [ctk].[stp_update_last_sync_version](@current_version BIGINT)
AS
UPDATE ctk.change_tracking_last_sync_version
SET last_sync_version = @current_version
    , extraction_date = GETDATE();
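In the Execute SQL Task, the currentVersion variable is mapped to the procedure's parameter; an equivalent direct call from T-SQL would be (the version value is illustrative):

```sql
-- Overwrites the stored last_sync_version with the new current version
EXEC ctk.stp_update_last_sync_version @current_version = 7;
```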
Example 1: the first load
In this first example, the SSIS package imports into the dbo.Employees table of the DestinationDB database all the changes made since CT was activated on the CTExampleDB database, using the scripts seen in the previous sections. The content of the tracked table dbo.Employees at this time is shown in Figure 19.
Let us see, step by step, what happens:
1) The Execute SQL Task obtains the value 7, corresponding to the current version, and saves it into the currentVersion variable;
2) The Execute SQL Task obtains, from the ctk.change_tracking_last_sync_version table, the value of the latest loaded version and saves it into the lastSyncVersion variable; since this is the first load, the extracted value is 0, inserted manually during the creation of the table;
3) The Execute SQL Task extracts the value 0 for the minimum valid version, since all the change information is available; the value is saved into the minValidVersion variable;
4) The Script Task compares the contents of the lastSyncVersion and minValidVersion variables: since the latter is not greater than the former (both are equal to 0), the process can continue with the extraction of the modified rows;
5) The Data Flow Task loads, into the supporting table, the primary keys of the changed rows, starting from the version that follows the one contained in the lastSyncVersion variable; in other words, from version 1 (Figure 20).
6) The Execute SQL Task deletes from the destination table the rows whose primary keys are present in the tmp.EmployeesChanges table; then, in the Data Flow Task, the rows having the same primary key as the ones just deleted are extracted from the tracked table and loaded into the destination table (Figure 21), thus aligning the latter with the source table.
7) The Execute SQL Task updates the ctk.change_tracking_last_sync_version table by performing an UPDATE of the latest extracted version (overwriting the value 0 with the one contained in the currentVersion variable, i.e. 7).
Example 2: a load subsequent to the first
The second example shows the behavior of the incremental ETL process after performing three new DML operations on the tracked table.
As you can see in the script below, two new rows have been inserted into the dbo.Employees table, and two records already existing at the time of the first load have been, respectively, deleted and updated:
-- Inserting two rows
INSERT INTO dbo.Employees
VALUES (7, 'Duerr', 'Bernard')
    , (8, 'Schultz', 'Barbara');

-- Deleting the row having EmployeeID = 4
DELETE FROM dbo.Employees
WHERE EmployeeID = 4;

-- Updating the row with EmployeeID = 1
UPDATE dbo.Employees
SET LastName = 'Dias'
WHERE EmployeeID = 1;
As a result of these operations, the tracked table is now populated by seven rows (Figure 22). When the incremental load is run, this is what happens:
1) The Execute SQL Task obtains the value 10, corresponding to the current version, and saves it in the currentVersion variable;
2) The Execute SQL Task obtains, from the ctk.change_tracking_last_sync_version table, the value of the latest loaded version (7) and saves it in the lastSyncVersion variable;
3) The Execute SQL Task obtains the value 0 for the minimum valid version: because the retention period has been properly set, the changes available at the first load are still available for this one, too. The value is saved in the minValidVersion variable;
4) The Script Task compares the contents of the lastSyncVersion and minValidVersion variables: given that the second is not greater than the first (their respective values are 7 and 0), the process can continue with the extraction of the rows;
5) The Data Flow Task loads into the supporting table the primary keys of the changed rows having a version later than the one contained in the lastSyncVersion variable, that is, from version 8 onwards. In fact, observing Figure 23, you can see that the stored procedure extracts only the primary keys corresponding to the four rows involved in the operations shown in the above script.
6) The Execute SQL Task removes from the destination table the rows whose primary keys are present in the tmp.EmployeesChanges table; then, in the Data Flow Task, the rows with the same primary key as the ones just deleted are extracted from the tracked table and loaded into the destination table; the source and destination tables are now aligned (Figure 24);
7) The Execute SQL Task updates the ctk.change_tracking_last_sync_version table by performing an UPDATE of the latest extracted version (replacing the value 7 with the one contained in the currentVersion variable, that is, 10).
In this article, I have presented Change Tracking and all its features; I showed how to enable and disable it, and I dedicated ample space to the concept of version, which is fundamental to understanding the functionality of the five functions that allow you to obtain information on the status of the changes for each tracked table.
The final part describes a use case that explains how to perform an incremental ETL process. I give practical examples to help you to better understand how to use the functions and combine them.
With respect to future improvements, at present there are no changes planned; all the characteristics described in this article will also, therefore, be valid for the next version of SQL Server.
Furthermore, the problem with the CHANGE_TRACKING_CURRENT_VERSION() function under snapshot isolation has been resolved in SQL Server 2012.