ETL Assistant – Management Web UI Gallery

When we started putting ETL Assistant together it became immediately clear that we needed a UI.  Something to help us get the job done without getting in the way.  We threw together a quick ASP.NET MVC 3 web app that mostly fits the bill.

Below you’ll find some UI samples from the web management console. This should give a better sense of where we’ve gone and how the whole application functions. The UI is just there to facilitate the configuration “stuff” we toss over to the SSIS package at runtime.

Couple of quick points here:

  1. This was my first MVC app. I was still thinking in webforms and it shows. A lot.
  2. Yes, we went with the Metro / Win8-style UI.
  3. I took these screenshots on a Macbook Pro with retina display and I rushed through posting them. Please ignore the visual artifacts from my over-compressing things, etc.
  4. This has evolved over time. Hindsight is 20/20 and if we were to start over from scratch this would probably look very different.

 


 

Home & Global UI Elements


Home page
“Quick start,” high-level stats, and recent items. We provide easy access to setting up new data sources and groups as well as adding tables to groups. This is a simple, wizard-like 3-step process. We also show some basic stats on the number of objects in the system. Yes – we really do use this for over 30,000 tables across 30+ separate source systems.

Menu Example
Categorized mini-mega-menu with recent items and common functions. Just a simple way for us to provide easier access to key functions while saving on screen real-estate. The items on the right provide quick access to recently used data sources, groups, or tables (specific to each menu) as well as faster access to “create” functionality.

Search Bar
Inline global search with categorized results. The search system isn’t particularly smart, but it gets the job done and saves you some pain – otherwise you’d need to “page” through lists of items.


Data Source Display & Editing


Creating – Basic settings
These are the basic connection settings, source/target schema settings, destination/staging filegroup settings, etc. required for ETL. The connection tester is here to help as well.

Selecting a Data Source Type
Data source type selection when creating / editing a data source. The data source selection defaults global behaviors so you don’t have to specify them for each and every data source (EX: “How do I query the source system catalog?”). We use OLEDB, so as long as you have the right driver and can configure the required settings it should pretty much work. Should…

Creating – Advanced settings
More fun settings. You can override the global data source defaults here as well as set some other options to throttle performance or enable super-high-risk features like table definition synchronization (MSSQL ONLY!). That’s right… For the ultra bleeding edge, highly volatile systems we let you auto-synchronize your target table definition against source table structure using SMO.

Testing a connection string – Error
Determine your connectivity problems early on. We bubble up the underlying connection exception message if there’s an issue.

Testing a connection string – Success
Green = good to go!

Detail – Basic settings
Review all that stuff you set up earlier.

Related Groups
Quick access to any “related” groups – groups which have tables that are bound to this data source. Just a simple helper to prevent forcing you to “hunt” for data. This, as with most tabs, is ajax-ified, so it only queries the system when this tab receives focus.

Scheduled Tables
Quick access to any tables (specific to this data source) which are known to be scheduled. This is arrived at by looking at the related groups and then querying the MSSQL job setup.

Retrieve & Refresh Table Catalog


Refresh table catalog
The UI allows for inline querying of the source system to detect any new/updated/removed tables.

Refresh table catalog – Result
Once the system polls the source database you get a simple set of counts of new/updated/removed tables. Clicking on these panels / boxes shows you a quick list of objects.

Table Display & Editing


Basic Settings
Basic settings in display mode. Just like how the data source can override settings from the data source type defaults, the table can override settings set in the data source. Need to target a different schema or use a different staging database? No problem – just change things. This is particularly useful if your table requires a different full or incremental load query.

Basic / Advanced Settings
Basic and advanced settings in display mode. We provide even more bells & whistles here. You can set your table to stage even on full load (helpful if you don’t want to wipe out your data during loads), specify behavior to override Nick Smith’s dynamic merge (aka “Honey Badger”), enable table structure synch via SMO (dangerous x1000), or have MSSQL message you if “critical” columns disappear or change in the data source. “Oh, they changed the data type on employee_id? Let me know!”

Generate SQL
Generate table DDL. Another helper to simplify your life. This is embedded in a few places.

Groups Editing & Table Management


Group Detail
Poor, neglected UI that displays simple group information (name, etc.) as well as the link to any scheduled instances of this group.

Scheduled Tables
Display the list of tables scheduled for this group. You can search for tables via partial name match (scoped to a data source) and then add them as “full” or “incremental” loads; the load type can also be adjusted here. Scheduled tables may be removed or disabled / enabled from within this UI as well.

DDL – 1/3
We provide a “bulk” DDL generator to generate DDL for all objects within the selected group. The DDL process uses a custom class to reflect on source / destination objects and then uses “Smart Column” configuration to override specific types as required.

DDL – 2/3
Partway through generating DDL for all objects within the selected group.

DDL – 3/3
Done generating DDL for all objects within the selected group.

DDL – Final SQL
Result of “generate SQL” (DDL). This is just a subset of the output.

Table DDL Builder – “Smart Columns”
Override target data types by source system, data type, and column name (using regular expressions). Our philosophy is the data architect knows best. We let you specify replacement data types in several different ways.

SQL Server Jobs, Job History


Job Steps
List job steps for a system job. For the lazy (me), this helps give me a quick way to see what’s going on with ETL Assistant-related jobs without jumping to Management Studio.

Job Schedule
Provide information on the job schedule for a specified job. Pulling apart those bitmasks is super fun for the whole family.
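
For the curious, a minimal sketch of the kind of decoding involved, assuming a weekly schedule where msdb.dbo.sysschedules stores the run days as a bitmask in freq_interval (an illustration, not the exact query the UI uses):

-- decode the day-of-week bitmask for weekly (freq_type = 8) SQL Agent schedules
SELECT s.name,
       s.freq_interval,
       CASE WHEN s.freq_interval & 1  <> 0 THEN 'Sun ' ELSE '' END +
       CASE WHEN s.freq_interval & 2  <> 0 THEN 'Mon ' ELSE '' END +
       CASE WHEN s.freq_interval & 4  <> 0 THEN 'Tue ' ELSE '' END +
       CASE WHEN s.freq_interval & 8  <> 0 THEN 'Wed ' ELSE '' END +
       CASE WHEN s.freq_interval & 16 <> 0 THEN 'Thu ' ELSE '' END +
       CASE WHEN s.freq_interval & 32 <> 0 THEN 'Fri ' ELSE '' END +
       CASE WHEN s.freq_interval & 64 <> 0 THEN 'Sat'  ELSE '' END AS run_days
FROM msdb.dbo.sysschedules s
WHERE s.freq_type = 8;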

Job History
Display summary job history for instances of this job. How are we looking?

Job History Detail
Display job history for a specific job. Expanded items use ajax (jquery + json) to pull in details as required. We only take what we need to survive.

Query Variables (QVars)


Example
Example of QVars in use for an incremental loading query. These are extremely handy for writing template queries where you need to substitute values at ETL runtime: “get me the latest timestamp for this table,” then replace that variable in the SQL statement when we query the data source. There’s a lot more to this than we’re going to go into in a screenshot.
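
To make that concrete, an incremental load template might look something like the sketch below. The placeholder syntax and names here are purely illustrative; this isn’t necessarily ETL Assistant’s actual QVar syntax.

-- template query stored with the table configuration
SELECT *
FROM   source_schema.some_table
WHERE  last_update_dts > {max_target_update_dts}

-- what is actually sent to the source system at runtime,
-- after the QVar has been resolved against the destination table
SELECT *
FROM   source_schema.some_table
WHERE  last_update_dts > '2012-01-01 03:15:00'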

Drag / Drop
QVars support drag-and-drop from the QVar panel into specific destination panels. Hooray for jquery awesomeness!

Example – Data Source QVar
QVar panel displayed when editing a datasource or table. The panel shows system-wide QVars along with any “scoped” QVars appropriate / available to the specific datasource or table.

Data Source QVar List
List of datasource QVars. QVars may be defined or overridden per table as well if required.

Using the MERGE Statement in SSIS Via Stored Procedure

(This is a modified version of the article I wrote for SQLServer.com, published on 1/23/2012, “Using the MERGE Statement in SSIS Via A Stored Procedure”.)

Background

In our Enterprise Data Warehouse (EDW) within the Northwestern University Biomedical Informatics Center (NUBIC), exact copies of many source systems exist. From that source system data we create several data marts to organize clinical data in meaningful ways to assist with reporting. Our current process to populate such data marts is to do the following.

  1. Stage the data in a staging table
  2. Delete rows from the destination/production table that exist in the staging table
  3. Insert the remaining data from the staging table into the production table

Many times in our SSIS ETL packages we will overlap slightly with the last load date/time to make sure all new data is collected. This practice causes numerous delete transactions on the production table on a daily basis before the new rows are inserted from staging. For the most part, these deletes are unnecessary since many of the deleted rows are re-inserted with the same data (because of the deliberate overlap mentioned earlier). In the long-term these unnecessary deletes can cause issues with fragmentation.
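
To make the current pattern concrete, here is a rough T-SQL sketch of steps 2 and 3 (step 1, staging, is handled by the SSIS data flow; the table names are the example visit tables introduced later in this article, not our actual production code):

-- 2) delete rows from production that also exist in staging (the deliberate overlap)
DELETE tgt
FROM   edw.adventure_hospital_dm.visits tgt
WHERE  EXISTS (SELECT 1
               FROM   edw.staging.stg_visits src
               WHERE  src.financial_nbr = tgt.financial_nbr);

-- 3) insert everything from staging into production
INSERT INTO edw.adventure_hospital_dm.visits
       (name, financial_nbr, medical_number, registration_date, meta_orignl_load_dts, meta_update_dts)
SELECT name, financial_nbr, medical_number, registration_date, meta_orignl_load_dts, meta_update_dts
FROM   edw.staging.stg_visits;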

Problem

To avoid these excessive deletes we investigated the option of a MERGE statement in place of the 3-step process listed previously.  This functionality initially looked very promising since it would allow us to stage whatever data we needed and then that data could simply be merged into the production data.  The MERGE would update rows that already existed in production and would insert the rows that did not exist.  This would eliminate the unnecessary deletes that occur in our current process.

Our evaluation process uncovered some issues with the MERGE statement and SSIS.

  1. There is currently no simple MERGE task available in SSIS 2008. The easiest way to do this is to create an SSIS Execute SQL task and manually type out the MERGE statement.
  2. SSIS Execute SQL Task MERGE statement maintenance. If we were to use the Execute SQL Task to execute a MERGE statement, the statements would be rather large. Many of our data mart tables are very wide and are prone to having new columns added to track new clinical information. Manually entering the MERGE statement in an Execute SQL Task creates maintenance issues if a data mart were ever to be modified. If changes to the data mart occur, the MERGE statement would need to be manually updated as well to reflect such changes.
  3. SSIS Execute SQL Task character limit. Manual entry of the merge statement into the query editor window of the SSIS Execute SQL task is subject to a 32,767 character limit. Again, many of our data mart tables are very wide and contain several columns where this could potentially be an issue.
  4. SSIS Execute SQL Task manual entry errors. Even if the 32,767 character limit is not met, the MERGE statement itself can still be very long. To type all of the statement manually creates several opportunities for typographical errors, including omission of specific columns.

If we were going to use the MERGE statement via SSIS a solution needed to be created that would be easy to use but also would resolve the issues we uncovered in our investigation.

Proposed Solution

Our solution was to create a stored procedure that could dynamically generate the MERGE statement outside of the SSIS Execute SQL Task and execute it.  This article will walk through step-by-step how we built our stored procedure.  An example MERGE statement is provided below.  As we review each step we will see how the stored procedure would generate this example MERGE statement.

To begin, let’s first review the MERGE statement and its construction (a very useful SQLServerCentral technical article that provides a thorough overview of the MERGE statement is listed in the Resources section below). A MERGE statement is composed of the following parts.

  • A source table  (this would be our staging table)
  • A target table (this would be our production table)
  • A predicate (how to join the source and target tables)
  • An Update command to be executed when the source and target rows match on the predicate
  • An Insert command to be executed when the source row does not match a target row
  • A Delete command to be executed when the target rows do not exist in the source

For our purposes there was no need for the delete command because we want to keep the rows on the target table that do not exist in the source table. Here is the example MERGE statement that we will attempt to generate and execute via this stored procedure.  Within this example we have a staging/source table called edw.staging.stg_visits and a production/target table called edw.adventure_hospital_dm.visits.

MERGE INTO
--1)  Target table
edw.adventure_hospital_dm.visits tgt
USING
--2)  Source table
edw.staging.stg_visits src
ON
--3)  Predicate
src.financial_nbr = tgt.financial_nbr
--4)  Update the Target table when matched
WHEN matched THEN
UPDATE
SET tgt.name                 = src.name,
    tgt.financial_nbr        = src.financial_nbr,
    tgt.medical_number       = src.medical_number,
    tgt.registration_date    = src.registration_date,
    tgt.meta_orignl_load_dts = src.meta_orignl_load_dts,
    tgt.meta_update_dts      = src.meta_update_dts
--5)  Insert into the Target table when not matched
WHEN NOT matched THEN
INSERT (
    name,
    financial_nbr,
    medical_number,
    registration_date,
    meta_orignl_load_dts,
    meta_update_dts
    )
VALUES (
    src.name,
    src.financial_nbr,
    src.medical_number,
    src.registration_date,
    src.meta_orignl_load_dts,
    src.meta_update_dts);

Before the stored procedure was built we determined that a useful MERGE stored procedure would need to satisfy the following requirements.

  1. Accept parameters to enter the Source database/schema/table and the Target database/schema/table.
  2. Automatically determine the predicate between the Source and Target tables
  3. Automatically determine whether a Source column data type matches a Target column data type and store the matching columns in a temp table
  4. Be smart enough to NOT update primary key column(s) on the Target table
  5. Generate a dynamic SQL MERGE statement based on the matched columns stored in the temp table
  6. Execute the dynamic SQL MERGE statement

Step 1: Accept parameters to enter the Source database, schema, and table along with the Target database, schema, and table.

Our goal for this solution was for it to be easy to use.  We wanted to be able to execute a stored procedure and simply tell it what two tables to merge.  To do this we chose to create a stored procedure with 8 parameters.  The first 3 parameters (database, schema, and table name) are used to pass in the Source table.  The second 3 parameters (database, schema, and table name) are used to pass in the Target table.  Even though the stored procedure will automatically generate the matching predicate (which is discussed in Step #2), we created the 7th parameter as an option to manually pass in a comma-separated list of predicate items to match on, for cases where the tables being merged either do not have a primary key or the user would like to match on something other than the primary key.  The 8th parameter is an optional debug flag; when it is set to 1 the procedure returns the generated MERGE statement text instead of executing it.

Here is the section of code that was used to create the stored procedure along with its arguments and necessary variables.

CREATE PROCEDURE [adventure_hospital].[generate_merge]
                @SrcDB          SYSNAME,         --Name of the Source database
                @SrcSchema      SYSNAME,         --Name of the Source schema
                @SrcTable       SYSNAME,         --Name of the Source table
                @TgtDB          SYSNAME,         --Name of the Target database
                @TgtSchema      SYSNAME,         --Name of the Target schema
                @TgtTable       SYSNAME,         --Name of the Target table
                @predicate      SYSNAME  = NULL, --(optional)Override to automatic predicate generation.  A comma-separated list of predicate match items
                @debug          SMALLINT = NULL  --(optional)Pass in 1 to kick out just the MERGE statement text without executing it
AS
BEGIN
	DECLARE @merge_sql      NVARCHAR(MAX);  --overall dynamic sql statement for the merge
	DECLARE @columns_sql    NVARCHAR(MAX);  --the dynamic sql to generate the list of columns used in the update, insert, and insert-values portion of the merge dynamic sql
	DECLARE @pred_sql       NVARCHAR(MAX);	--the dynamic sql to generate the predicate/matching-statement of the merge dynamic sql (populates @pred)
	DECLARE @pk_sql         NVARCHAR(MAX);  --the dynamic sql to populate the @pk table variable that holds the primary keys of the target table
	DECLARE @updt           NVARCHAR(MAX);  --contains the comma-separated columns used in the UPDATE portion of the merge dynamic sql (populated by @columns_sql)
	DECLARE @INSERT         NVARCHAR(MAX);  --contains the comma-separated columns used in the INSERT portion of the merge dynamic sql (populated by @insert_sql)
	DECLARE @vals           NVARCHAR(MAX);  --contains the comma-separated columns used in the VALUES portion of the merge dynamic sql (populated by @vals_sql)
	DECLARE @pred           NVARCHAR(MAX);  --contains the predicate/matching-statement of the merge dynamic sql (populated by @pred_sql)
	DECLARE @pred_param     NVARCHAR(MAX) = @predicate;  --populated by @predicate.  used in the dynamic generation of the predicate statement of the merge
	DECLARE @pred_item      NVARCHAR(MAX);  --used as a placeholder for each individual item contained within the explicitly passed-in predicate
	DECLARE @done_ind       SMALLINT = 0;   --used in the dynamic generation of the predicate statement of the merge
	DECLARE @dsql_param     NVARCHAR(500);  --contains the necessary parameters for the dynamic sql execution
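
To give a sense of how it is called, here is a hypothetical invocation using the example tables from this article (the @predicate override and @debug flag are both optional and are normally omitted):

EXEC adventure_hospital.generate_merge
     @SrcDB     = 'edw',
     @SrcSchema = 'staging',
     @SrcTable  = 'stg_visits',
     @TgtDB     = 'edw',
     @TgtSchema = 'adventure_hospital_dm',
     @TgtTable  = 'visits',
     @predicate = 'financial_nbr',  -- optional: override the automatic primary-key predicate
     @debug     = 1;                -- optional: return the generated MERGE text instead of executing it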

Step 2: Automatically determine the predicate used between the Source and Target tables
Before the predicate is automatically generated there must be a check to see whether or not a predicate was passed in as a parameter. If the predicate parameter is populated then the comma-separated list that was passed in is broken down into its individual items and the predicate matching statement is generated and assigned to @pred.

If the predicate was not passed in as a parameter, a dynamic SQL statement (@pred_sql) is generated to query the INFORMATION_SCHEMA constraint views (TABLE_CONSTRAINTS and CONSTRAINT_COLUMN_USAGE) using the Target database, schema, and table parameters. The INFORMATION_SCHEMA views are automatically available in SQL Server 2000 and above. Since the output of this dynamic SQL (@pred_sql) needs to be collected and assigned to a variable (@pred), a parameter (@dsql_param) needs to be passed in so that the output can be returned and assigned appropriately. Once the dynamic SQL (@pred_sql) and the dynamic SQL parameter (@dsql_param) have been created, they can be executed. The output can then be assigned to @pred. We now have our predicate statement.

/****************************************************************************************
* This generates the matching statement (aka Predicate) statement of the Merge.        *
* If a predicate is explicitly passed in, use that to generate the matching statement. *
* Else execute the @pred_sql statement to decide what to match on and generate the     *
* matching statement automatically.                                                    *
****************************************************************************************/
 
IF @pred_param IS NOT NULL
  -- If a comma-separated list of predicate match items were passed in via @predicate
  BEGIN
  -- These next two SET statements do basic clean-up on the comma-separated list of predicate items (@pred_param)
  -- if the user passed in a predicate that begins with a comma, strip it out
  SET @pred_param = CASE WHEN SUBSTRING(ltrim(@pred_param),1,1) = ',' THEN SUBSTRING(@pred_param,(charindex(',',@pred_param)+1),LEN(@pred_param)) ELSE @pred_param END
  --if the user passed in a predicate that ends with a comma, strip it out
  SET @pred_param = CASE WHEN SUBSTRING(rtrim(@pred_param),LEN(@pred_param),1) = ',' THEN SUBSTRING(@pred_param,1,LEN(@pred_param)-1) ELSE @pred_param END
  -- End clean-up of (@pred_param)
  -- loop through the comma-separated predicate that was passed in via the parameter and construct the predicate statement
  WHILE (@done_ind = 0)
    BEGIN
    SET @pred_item = CASE WHEN charindex(',',@pred_param) > 0 THEN SUBSTRING(@pred_param,1,(charindex(',',@pred_param)-1)) ELSE @pred_param END
    SET @pred_param = SUBSTRING(@pred_param,(charindex(',',@pred_param)+1),LEN(@pred_param))
    SET @pred = CASE WHEN @pred IS NULL THEN (COALESCE(@pred,'') + 'src.[' + @pred_item + '] = ' + 'tgt.[' + @pred_item + ']') ELSE (COALESCE(@pred,'') + ' and ' + 'src.[' + @pred_item + '] = ' + 'tgt.[' + @pred_item + ']') END
    SET @done_ind = CASE WHEN @pred_param = @pred_item THEN 1 ELSE 0 END
    END
  END
ELSE
  -- If an explicit list of predicate match items was NOT passed in then automatically construct the predicate
  -- match statement based on the primary keys of the Source and Target tables
  BEGIN
  SET @pred_sql = ' SELECT @predsqlout = COALESCE(@predsqlout+'' and '','''')+' +
                  '(''''+''src.''+column_name+'' = tgt.''+ccu.column_name)' +
                  ' FROM ' +
                  @TgtDB + '.INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc_tgt' +
                  ' INNER JOIN ' + @TgtDB +'.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu' +
                  ' ON tc_tgt.CONSTRAINT_NAME = ccu.Constraint_name' +
                  ' AND tc_tgt.table_schema = ccu.table_schema' +
                  ' AND tc_tgt.table_name = ccu.table_name' +
                  ' WHERE' +
                  ' tc_tgt.CONSTRAINT_TYPE = ''Primary Key''' +
                  ' and tc_tgt.table_catalog = ''' + @TgtDB + '''' +
                  ' and tc_tgt.table_name = ''' + @TgtTable + '''' +
                  ' and tc_tgt.table_schema = ''' + @TgtSchema + ''''
  SET @dsql_param = '	@predsqlout nvarchar(max) OUTPUT'
 
  EXEC sp_executesql
  @pred_sql,
  @dsql_param,
  @predsqlout = @pred OUTPUT;
END

The benefit of generating the predicate this way is that this automatically handles multiple primary key constraints on the Target table. All primary key constraints are evaluated and added into the predicate statement. The example output from the @pred variable is listed here.

src.financial_nbr = tgt.financial_nbr
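
If the target table had a composite primary key, the generated predicate would simply chain the key columns together with “and”. For example, with a hypothetical second key column named visit_seq, it would look like:

src.financial_nbr = tgt.financial_nbr and src.visit_seq = tgt.visit_seq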

Step 3: Automatically determine if a Source column data type matches the corresponding Target column data type and store the matching columns in a temp table
To do this we first need to create the custom table (@columns) that will be used to hold all the columns that exist between the Source and the Target tables. These columns must have the same name and same data type. By ‘same data type’ I mean both that the data type is the same and that the precision of the Target table column is at the same level or greater than the precision of the Source table column. For example, the medical_number column on the Source table may be varchar(10). As long as the corresponding medical_number column on the Target/Production table is at least a varchar(10) or greater the logic will consider that to be a match and store that column on the temp table.

--Create the temporary table to collect all the columns shared
--between both the Source and Target tables.
 
DECLARE @COLUMNS TABLE (
 table_catalog            VARCHAR(100) NULL,
 table_schema             VARCHAR(100) NULL,
 TABLE_NAME               VARCHAR(100) NULL,
 column_name              VARCHAR(100) NULL,
 data_type                VARCHAR(100) NULL,
 character_maximum_length INT NULL,
 numeric_precision        INT NULL,
 src_column_path          VARCHAR(100) NULL,
 tgt_column_path          VARCHAR(100) NULL
)

When the @columns temp table has been created a dynamic SQL statement is generated (@columns_sql) and executed to populate the temp table. The @columns_sql dynamic SQL statement takes the values passed in via parameters and uses them to query the INFORMATION_SCHEMA.COLUMNS table to find matching columns between the Source and Target tables. Once the @columns_sql dynamic SQL has been constructed it is executed and the values returned are automatically inserted into the @columns temp table.

/************************************************************************************************
* Generate the dynamic sql (@columns_sql) statement that will                                  *
* populate the @columns temp table with the columns that will be used in the merge dynamic sql *
* The @columns table will contain columns that exist in both the source and target             *
* tables that have the same data types.                                                        *
************************************************************************************************/    
 
SET @columns_sql =
'SELECT
tgt.table_catalog,
tgt.table_schema,
tgt.table_name,
tgt.column_name,
tgt.data_type,
tgt.character_maximum_length,
tgt.numeric_precision,
(src.table_catalog+''.''+src.table_schema+''.''+src.table_name+''.''+src.column_name) AS src_column_path,
(tgt.table_catalog+''.''+tgt.table_schema+''.''+tgt.table_name+''.''+tgt.column_name) AS tgt_column_path
FROM
     ' + @TgtDB + '.information_schema.columns tgt
     INNER JOIN ' + @SrcDB + '.information_schema.columns src
       ON tgt.column_name = src.column_name
       AND tgt.data_type = src.data_type
       AND (tgt.character_maximum_length IS NULL OR tgt.character_maximum_length >= src.character_maximum_length)
       AND (tgt.numeric_precision IS NULL OR tgt.numeric_precision >= src.numeric_precision)
     WHERE tgt.table_catalog     = ''' + @TgtDB + '''
     AND tgt.table_schema        = ''' + @TgtSchema + '''
     AND tgt.table_name          = ''' + @TgtTable + '''
     AND src.table_catalog       = ''' + @SrcDB + '''
     AND src.table_schema        = ''' + @SrcSchema + '''
     AND src.table_name          = ''' + @SrcTable + '''
     ORDER BY tgt.ordinal_position'
 
     --execute the @columns_sql dynamic sql and populate @columns table with the data
     INSERT INTO @COLUMNS
     EXEC sp_executesql @columns_sql

Now that the @columns temp table has been populated we have a list of all the columns that will be referenced in the overall MERGE statement.

Step 4:  Has logic to NOT update the primary key column(s) on the target table

While investigating the efficiency gains of using the MERGE statement we found that we were not getting the gains we were expecting.  We already knew that we were updating all columns within the MERGE statement when the predicate match was satisfied.  The problem is that when you update a row’s primary key (even if you are updating the primary key to its original value) the update is essentially handled as an insert, and you end up losing a lot of efficiency.  Thanks to Paul White for his blog post regarding “The Impact of Non-Updating Updates”.

To prevent the primary key from being updated within the MERGE update we created a temp table to record the primary key column(s) from the target table.  This temp table is later referenced to filter out the primary key column(s) when the update portion of the MERGE statement (@updt) is generated in the next step.

/**************************************************************************************
* Create the temporary table to collect all the primary key columns                  *
* These primary key columns will be filtered out of the update portion of the merge  *
* We do not want to update any portion of clustered index for performance            *
**************************************************************************************/
 
DECLARE @pk TABLE (
  column_name              VARCHAR(100) NULL
);
 
SET @pk_sql = 'SELECT ' +
              'ccu.column_name ' +
              'FROM ' +
              @TgtDB + '.INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc_tgt ' +
              'INNER JOIN ' + @TgtDB +'.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu ' +
              'ON tc_tgt.CONSTRAINT_NAME = ccu.Constraint_name ' +
              'AND tc_tgt.table_schema = ccu.table_schema ' +
              'AND tc_tgt.table_name = ccu.table_name ' +
              'WHERE ' +
              'tc_tgt.CONSTRAINT_TYPE = ''Primary Key'' ' +
              'and tc_tgt.table_catalog = ''' + @TgtDB + ''' ' +
              'and tc_tgt.table_name = ''' + @TgtTable + ''' ' +
              'and tc_tgt.table_schema = ''' + @TgtSchema + ''' ' 
 
INSERT INTO @pk
EXEC sp_executesql @pk_sql

Step 5:  Generate a dynamic SQL MERGE statement based on the matched columns stored in the temp table

For the overall MERGE statement there are 3 sets of comma-separated columns that need to be generated from the data collected in the @columns temp table: the update columns, the insert columns, and the insert-values columns. You can see all three sets in the example MERGE statement shown earlier.

For each one of the 3 sets of columns a query needs to be executed against the @columns temp table. We needed a way to query the columns on the @columns temp table and loop through them, creating a comma-separated list. Since such a query will return more than one row, trying to assign its output to a variable results in the error “Subquery returned more than 1 value.”

To get around this we used SQL Server’s FOR XML functionality. For anyone not familiar with FOR XML: simply put, FOR XML allows you to run a query and format all of the output as XML, assigning that output to a variable of type XML. We essentially ran the same query but used FOR XML PATH(''), which allowed us to loop through the data results by generating an “XML” version of the data with absolutely no XML formatting. Since FOR XML automatically returns a data type of XML, we then cast the results to convert them from XML to NVARCHAR.

What you are left with is a string that contains all the comma-separated columns that can be used for the MERGE statement. Obviously, this is not the intended use of FOR XML, but this suited our needs nicely.
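
As a standalone illustration of the trick (separate from the stored procedure itself), this minimal sketch builds a comma-separated list from a small table variable:

--build a comma-separated list with FOR XML PATH('') and strip the leading comma
DECLARE @demo TABLE (column_name VARCHAR(100));
INSERT INTO @demo VALUES ('name'), ('financial_nbr'), ('medical_number');
 
DECLARE @list NVARCHAR(MAX);
SET @list = CAST((SELECT ',' + column_name
                  FROM @demo
                  FOR XML PATH(''))
                 AS NVARCHAR(MAX));
 
SELECT SUBSTRING(@list, 2, LEN(@list)) AS column_list;  --returns: name,financial_nbr,medical_number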

Here is the section of the stored procedure used to generate the comma-separated columns used for the update portion of the overall MERGE statement.  You can see below that the @pk temp table we created in the previous step is now referenced to filter out primary key column(s) from the update.

--1) List of columns used for Update Statement
--Populate @updt with the list of columns that will be used to construct the Update Statement portion of the Merge
 
SET @updt = CAST((SELECT ',tgt.[' + column_name + '] = src.[' + column_name + ']'
            FROM @COLUMNS c
            WHERE c.column_name != 'meta_orignl_load_dts'                               --we do not want the original time the row was created to be overwritten
            AND NOT EXISTS (SELECT 'x' FROM @pk p WHERE p.column_name = c.column_name)  --we do not want the primary key columns updated for performance
            FOR XML PATH(''))
            AS NVARCHAR(MAX)
            )
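
For our example, the @updt variable ends up containing something like the list below (financial_nbr and meta_orignl_load_dts are intentionally absent; the reasons are covered in the special note that follows):

,tgt.name = src.name
,tgt.medical_number = src.medical_number
,tgt.registration_date = src.registration_date
,tgt.meta_update_dts = src.meta_update_dts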

**Special Note**   For the columns used in the update portion of the MERGE statement, we filtered out the meta_orignl_load_dts column. The EDW uses this column as metadata to record when a row was originally written out to the data mart. Upon an update, we do not want this column to be overwritten with the meta_orignl_load_dts from the staging/source table; we still want to preserve the original load date and time of that row. The date and time the row was updated is represented in the meta column meta_update_dts.

Next, the comma-separated list of columns used for the insert portion of the overall MERGE statement needs to be generated. Here is the section of the stored procedure used to generate the comma-separated columns used for the insert portion of the overall MERGE statement.

--2) List of columns used for Insert Statement
--Populate @insert with the list of columns that will be used to construct the Insert Statement portion of the Merge
 
SET @INSERT = CAST((SELECT ',' + '[' + column_name + ']'
              FROM @COLUMNS
              FOR XML PATH(''))
              AS NVARCHAR(MAX)
              )

Here is the output of the @insert variable for our example.

,name
,financial_nbr
,medical_number
,registration_date
,meta_orignl_load_dts
,meta_update_dts

Here is the section of the stored procedure used to create the comma-separated list of columns used for the insert-values portion of the overall MERGE statement.

--3) List of columns used for Insert-Values Statement
--Populate @vals with the list of columns that will be used to construct the Insert-Values Statement portion of the Merge
 
SET @vals = CAST((SELECT ',src.' + '[' + column_name + ']'
            FROM @COLUMNS
            FOR XML PATH(''))
            AS NVARCHAR(MAX)
            )

Here is the output of the @vals variable for our example.

,src.name
,src.financial_nbr
,src.medical_number
,src.registration_date
,src.meta_orignl_load_dts
,src.meta_update_dts

Step 6: Execute the dynamic SQL MERGE statement
Now that we have the Source database/schema/table, the Target database/schema/table, the predicate matching statement, and the update/insert/values comma-separated lists of columns, we have everything we need to generate the entire dynamic SQL MERGE statement. Once the MERGE statement is generated it then gets executed automatically.

Here is the section of the stored procedure used to pull everything together into the overall MERGE statement (@merge_sql) and then execute it.

/*************************************************************************************
*  Generate the final Merge statement using the following...                        *
*    -The parameters (@TgtDB, @TgtSchema, @TgtTable, @SrcDB, @SrcSchema, @SrcTable) *
*    -The predicate matching statement (@pred)                                      *
*    -The update column list (@updt)                                                *
*    -The insert column list (@insert)                                              *
*    -The insert-value column list (@vals)                                          *
*    -Filter out Primary Key from the update (updating primary key essentially      *
*     turns the update into an insert and you lose all efficiency benefits)         *
*************************************************************************************/
 
SET @merge_sql = (' MERGE into ' + @TgtDB + '.' + @TgtSchema + '.' + @TgtTable + ' tgt ' +
                  ' using ' + @SrcDB + '.' + @SrcSchema + '.' + @SrcTable + ' src ' +
                  ' on ' + @pred +
                  ' when matched then update ' +
                  ' set ' + SUBSTRING(@updt, 2, LEN(@updt)) +
                  ' when not matched then insert (' + SUBSTRING(@INSERT, 2, LEN(@INSERT)) + ')' +
                  ' values ( ' + SUBSTRING(@vals, 2, LEN(@vals)) + ');'
                 );
 
--Either execute the final Merge statement to merge the staging table into production
--Or kick out the actual merge statement text if debug is turned on (@debug=1)
IF @debug = 1
  BEGIN
  -- If debug is turned on simply select the text of merge statement and return that
  SELECT @merge_sql;
  END
ELSE
  BEGIN
  -- If debug is not turned on then execute the merge statement
  EXEC sp_executesql @merge_sql;
END

The SUBSTRING() function is used on @updt, @insert, and @vals to remove the preceding comma from each set of columns. This could have been done earlier in the stored procedure but we decided to take care of it here.
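
As a quick illustration of that clean-up:

--strips the leading comma from a generated column list
SELECT SUBSTRING(',name,financial_nbr,medical_number', 2, LEN(',name,financial_nbr,medical_number'));
--returns: name,financial_nbr,medical_number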

Here is the output of the @merge_sql variable from our example.

MERGE INTO edw.adventure_hospital_dm.visits tgt
USING edw.staging.stg_visits src
ON src.financial_nbr = tgt.financial_nbr
WHEN matched THEN UPDATE
SET tgt.name = src.name,
tgt.medical_number = src.medical_number,
tgt.registration_date = src.registration_date,
tgt.meta_update_dts = src.meta_update_dts
WHEN NOT matched THEN INSERT (
name,
financial_nbr,
medical_number,
registration_date,
meta_orignl_load_dts,
meta_update_dts
)
VALUES (
src.name,
src.financial_nbr,
src.medical_number,
src.registration_date,
src.meta_orignl_load_dts,
src.meta_update_dts);

Conclusion

We are currently working towards utilizing this stored procedure within our SSIS packages for populating data marts. Using our current process, a simplified version of one of our SSIS packages stages the data with a data flow task and then runs the delete and insert steps as separate tasks.

By using the stored procedure to merge the data from staging into the production table we can replace those last 2 tasks with a single Execute SQL task that simply executes the stored procedure.

Here is the SQL code that is executed by the Execute SQL task.
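
Assuming the example staging and production tables used throughout this article, that call boils down to something like:

EXEC adventure_hospital.generate_merge
     @SrcDB     = 'edw',
     @SrcSchema = 'staging',
     @SrcTable  = 'stg_visits',
     @TgtDB     = 'edw',
     @TgtSchema = 'adventure_hospital_dm',
     @TgtTable  = 'visits';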

Although this stored procedure was a bit tedious to build, it is worth it. We now have a stored procedure to run from SSIS that suits our data warehousing needs for merging staging data into production.

There are many benefits to merging data via a stored procedure.

  • Easily executed through an SSIS SQL Task
  • Easily reusable
  • Automatically builds and executes the merge statement so there is no need for manual creation or maintenance of the merge SQL statement.
  • Reduces fragmentation since there are fewer deletes performed on the production table
  • Since rows are updated instead of deleted and reinserted, the ETL package runs faster
  • Overcomes the SSIS Execute SQL Task 32,767 character limit
  • Preserves original row load date and time (since the row is updated instead of deleted and re-inserted)
Here is a link to the entire generate_merge stored procedure SQL code: generate_merge.

In Practice
The largest table the EDW imports from the primary inpatient Electronic Health Record (EHR) source system stores clinical results and contains about 2.4 billion rows. Currently we merge between 600,000 and 900,000 rows of data from that source system table into the corresponding table within our EDW. With our old 2-step process of deleting rows from the production table that exist in the staging table and then inserting new rows, the whole process took around 2 hours to complete each night. Once we implemented the merge stored procedure into the ETL package that time was reduced to about 6 minutes! The bulk of the processing time in the old process was due to the deletes that were performed on a table containing over 2 billion rows.

Limitations
This stored procedure is meant for data warehousing type functionality.  It assumes you are using a Staging-To-Production type of model where the staging table and production table are almost identically built with the same column names and very similar data types. Also, this stored procedure does not utilize the delete functionality of the MERGE statement.

Resources
Dhaneenja, T. (2008). Understanding the MERGE DML Statement in SQL Server 2008. SQLServerCentral.com. Retrieved from http://www.sqlservercentral.com/articles/SQL+Server+2008/64365/

ETL Assistant – Getting Error Row Description and Column Information Dynamically

This article is going to attempt to provide one solution to the question of row error management in SSIS.  It’s one option, specially constructed for dynamic column mapping scenarios, but could probably be exploited for static situations as well.

Management of Bad Rows in SSIS

For ETL, SSIS does a fine job at letting you manage the basics of copying one column of data in some source table to another column of data in a destination table.  Assuming all goes well, you wind up extracting/transforming/loading that data.

If things don’t go well, however…

Exception handling is a central part of any development task and usually ends up representing a significant chunk of your time and code. You wind up covering any number of “what ifs” like:

  • What if I failed to connect to a system?
  • What if I expected data and didn’t get any?
  • What if my expected data type overflowed?
  • What if something totally unanticipated happened?

ETL Assistant Error Logger - Basic SSIS DFT Without Error Handling
If you’ve used SSIS for ETL you’re accustomed to the idea of data flow paths inside of a transformation.  You connect a source component to a destination component via either a green line (“good output”) or a red line (“bad / error output”).  This is great stuff.  Say you query some rows from a source database table and want to send the rows to a destination database table – you simply wire up the green line from the source to the destination and map the columns.  Done.  Walk away.

But what about the implied red line for bad rows?  What if you actually have an issue with the transformation?  Two immediate reasons come to mind:

  • The data was truncated in some way (cast my Oracle number(20,0) to a SQL int)
  • Some other unanticipated error occurred (for the sake of explanation, let’s say a primary key violation on insert)

ETL Assistant - SSIS DFT Error Row Redirection

Usually what you’d do with a static transformation is simply use row redirection to handle the exception.  A common solution is to log your error information to a shared error log table for later review.  By attaching the appropriate error output to your destination you “channel” the row information to that destination so you have a hope of figuring out what happened and what you can do about it.

SSIS usually works really well for these situations, with the exception of two nagging challenges you’ll see come up a lot in discussion forums:

  • “My row failed – how do I get the error description?”
  • “My row failed – how do I tell which row failed?”

Error description is fairly straightforward and I’m not going to get into it too much – there’s a great step-by-step example at http://consultingblogs.emc.com/jamiethomson/archive/2005/08/08/1969.aspx which is very instructive.

Getting an error row identifier, though, is a bit more complex because of the way SSIS works.

Error Columns and Lineage IDs

I’m going to preface this next section by noting that I don’t have a super clear picture on the internals of how SSIS column flow works, but I get a sense of it.  Please feel free to comment / email me and I’ll update anything that needs correcting.

 

Let’s say you have a row with an integer column “employee_id” which is the primary key on a table.  What you see is a single presentation of that column “employee_id” – it’s labeled that way throughout your data transformation flow, so to you it’s “the same” throughout the flow.  What SSIS sees internally, however, is something completely different.  If you dig a bit you’ll find you have a unique representation of this column at each point throughout the flow of your SSIS package.  That single “column” (“employee_id”) has to be treated uniquely at each input, output, and error output for each step.  Beyond needing to understand how to treat flow direction (ex: input column vs output column), the column itself may change data types, names, or even value as it flows through your package.  SSIS needs to keep track of that “column” at each point throughout the flow and treat it as though it’s unique.  So how does it do that?  LineageID.

There’s a great article on SQL Server Central (http://www.sqlservercentral.com/articles/Integration+Services+(SSIS)/65730/ ) that touches on some of this.  The article describes LineageID as:

It’s an integer and it’s unique throughout the data flow. When buffers are reused, the Lineage ID doesn’t change – it’s the same column at the input and output. When buffers are copied, a new column is created – which gets a new (unique) Lineage ID.

That means that as the column “employee_id” flows through the DFT, it gets a unique Lineage ID - for each input and output copy of itself.  And, typically, you have…

  • An input column
  • An output column for “good” data
  • An output column for errors

Taking the “employee_id” example from the “OLE DB Source” step in our DFT we’d have:

  • Input (ID = 33)
  • Source Error Output (Lineage ID = 35)
  • Good Output (Lineage ID = 34)

ETL Assistant - SSIS DFT Lineage ID Flow

Great!  No problem.  As long as we know the LineageIDs related to our steps we can backtrack to determine the mapping to “column name” and voila – we know which column failed.  We can simply look up the column by LineageID using “FindColumnByLineageID” in a script task (http://msdn.microsoft.com/en-us/library/microsoft.sqlserver.dts.pipeline.wrapper.idtsbuffermanager100.findcolumnbylineageid.aspx).  Magic.

Not so fast.  One small, but critical catch.  Metadata about a task step is only available within the scope of that task step. Meaning – once we get past “OLE DB Source” I can see “Lineage ID,” but I can’t easily track back to determine the mapping of Lineage ID to column name.  So – if you want to write out error row information (specifically “column name”) in a second DFT (to your error log, for example) there’s no way to look up that name – because the metadata about LineageID is no longer in scope – it’s only available to the prior step.  Incredibly frustrating.

Getting Error Column Information With Static DFTs

For static packages this can be addressed a few ways. The general strategy is to map the Lineage IDs / IDs to column information at design time and then use that mapping at runtime to look up what you need.

Couple of quick links you may find handy.

Again – for static packages, these can mostly if not completely solve the issue and leave you in a far better position to determine which rows failed.  I’m not going to go into these since you can read up online.

So What About a Dynamic DFT?

Note that the links I provided above address design time gathering / mapping of column information.   What do you do about a runtime situation?  We started digging into the CozyRoc dynamic DFT about a year ago.  Basic dynamic mappings worked great.  You can easily remap columns at runtime and, assuming all goes well, you’re done.  But if things don’t go well – what then?

We need to catch and log those bad rows.  But – we can’t map columns / Lineage ID information at design time because that negates the entire point of using a dynamic DFT – you won’t know any of the required information. It’s just not there.  Now that issue with the resolution of metadata from prior steps comes into play.  We can’t generate column information at design time and we can’t inspect metadata from ancestor steps within a DFT.  They’re out of scope.

I’ll admit that when I first looked at this I was stumped.  And incredibly frustrated.  There was this great opportunity to really let SSIS rock using CozyRoc’s dynamic DFT, but the inability to handle bad rows in a data warehousing solution is a showstopper (keep in mind the issue here is an SSIS design constraint, not a CozyRoc fault).  Following the examples for handling static mappings online (thank you very much, above-linked article authors), we had the notion that we should be able to pull some of the DFT information out at runtime and approach the problem somewhat similarly.

  • Upon startup, obtain a list of all columns, their IDs, and their Lineage IDs
  • Store that list in a collection
  • Use the IDs / Lineage IDs from the errors to look up the corresponding record in our collection
  • Profit

I rang up CozyRoc and discussed the situation with their engineers.  They immediately understood my intentions and mailed me back a quick sample of some code that exploited a fantastic capability of their dynamic DFT – the ability to add script to the DFT itself. (Thanks, CozyRoc!)  Not code via a script task within the DFT, but on the DFT directly.

CozyRoc DFT+ (http://www.cozyroc.com/ssis/data-flow-task) notes that you can apply script on the DFT by accessing…

  • Advanced tab – specifies advanced task options.
  • Script page – specifies data flow task script, which is used for Setup tab customization.

Aha.  And the magic snippet they supplied me…

public void OnColumnAdded(IDTSComponentMetaData100 component, bool isInput, string colName)
{
    // do stuff
}

Great!  They provided event hooks for the dynamic column mapping!  So now I can detect when a column is added to the DFT flow, add it to my reference collection of column information, and then access that collection within the DFT to derive column information critical to error logging.

This will let me take “Lineage ID” 12345 at any point throughout the flow and figure out that it was column “employee_name_concat” or whatever and log that.  We’re in business.

Something to note here.  Handling row truncation behavior is trickier when you’re doing this dynamically.  You can no longer manually address the need to “redirect on truncation” on a column-by-column basis, so you just extend the magic DFT+ column binding event to do it for you.

 

if (!isInput)
{
      IDTSOutputColumn100 column = component.OutputCollection[0].OutputColumnCollection[colName];
      column.TruncationRowDisposition = DTSRowDisposition.RD_RedirectRow;
      column.ErrorRowDisposition = DTSRowDisposition.RD_RedirectRow;
}

Done. Setting row disposition behavior accomplished.

From there we wrote up the nastier parts of the whole exercise – the entire collection lookup mechanism to derive column information.  We did that as a script task within the body of the DFT.

ETL Assistant - SSIS DFT Error Script Task

The script task pulls rows out of the buffer and evaluates row position to resolve the Lineage ID / ID and determine…

  • Source column name (EX: “first_name”)
  • Source primary key name (EX: “employee_id”)
  • Source primary key value (EX: “12345”)
  • Error description (using ComponentMetaData.GetErrorDescription)
  • Error data (so we can quickly eyeball the offending column)

You’ll note I said “primary key name” – we felt it was “good enough” for the moment to avoid dealing with compound keys.  That’s definitely a shortcoming, but for the time being we felt that was acceptable since it matched our existing static ETL error handling process.  It’s definitely something that needs to be addressed, though.  We also cheat by explicitly passing in the primary key as an element of the process (we derive it at an earlier step) – again, in consulting speak, an “opportunity for improvement.”

Putting it All Together

Now that we’ve touched on the ideas, let’s see it work.  Rather than walk you through the entire step-by-step process of building a package I’m going to suggest you grab the Dynamic DFT Error Handler Sample Project.  I’ll quickly touch on the major points on how the sample works.

The download includes some SQL scripts to set up…

  • [etl_proxy_table, etl_proxy_table_stg, etl_proxy_table_src] We use some fake placeholder “proxy” tables so you can set up data bindings in the DFT+.  CozyRoc also suggests you use THUNK_COLUMNs to do this, but I’ve found using these placeholder tables to be very helpful.  The reason we use these is that the magic OnColumnAdded method only fires when a column is actually added to the DFT. If you statically map any of the columns the entire error handling approach will fail because we won’t have those “static” columns added to our column collection.  Huge thank-you to CozyRoc for clueing me in on that.
  • [etl_errors] our error logging table. YMMV, but remember if you change this you also need to adjust the scripts in the DFT.
  • [demo_source_table, demo_dest_table] our source and destination tables.  We’re big Simpsons fans over here, so I’ve provided appropriate sample data.

The overall package has a few steps:

  • ["Set Table Information"] - A cheater Script Task to mimic pulling table configuration information.  In a production scenario you’d likely want to provide configuration elements from either a config file or, better yet, a configuration table.
  • ["SQL Get First table_keycol name"] - An Execute SQL task which we’ll use to pull out primary key information from our destination table.  This just uses INFORMATION_SCHEMA to look up your target table and pull back the first column for the primary key.  If you use unique constraints or something else, just tweak the SQL or overwrite the destination variable.
  • ["Truncate Destination Table"] - A second Execute SQL task to truncate our destination table (for a full load)
  • ["Data Flow Task Plus"] - A CozyRoc DFT+ task for our dynamic loading.  The brains of the operation.

ETL Assistant - Dynamic Error Handling - Overall Package Step Flow

We also have variables.  In our production deployment we have lots and lots of variables.

The major points here are:

  • table_colmap is a System.Object that is our collection of column names, IDs, and Lineage IDs for all columns in our DFT.  I scoped this to our DFT+ task because it’s specific to that task, but you could get away with scoping it to the package.
  • Everything else.  We’re more or less mimicking the variables we used in previous articles.

ETL Assistant - Dynamic Error - Variables

Let’s move on to the DFT.  Open up the DFT+.  You’re going to see two main paths:

  • We had an issue obtaining the source data.  (right side) Yes.  This does happen.  Case in point – you have a date of “-4444 AD” in Oracle.  The OLEDB driver we use for Oracle really doesn’t like that.  Or even a 44 digit numeric.
  • We had an issue writing to the destination table. (left side)

In both paths we simply channel the error rows to our error handler script task to process the buffer and do its magic.  I cheat by seeding the flow with additional error columns we overwrite within the task.  Mainly because I’m too lazy to magically add columns to the buffer myself from within the script task.

ETL Assistant - Dynamic Error Flow DFT+

Let’s give it a whirl and see what happens.

I’ve intentionally created opportunities for problems.

column_name     Source DATA_TYPE    Source MAX_LEN    Destination DATA_TYPE    Destination MAX_LEN
employee_id     int                 NULL              int                      NULL
employee_guid   uniqueidentifier    NULL              uniqueidentifier         NULL
email_addr      varchar             20                varchar                  15
first_nm        varchar             20                varchar                  10
last_nm         varchar             20                varchar                  10
awesomeness     bigint              NULL              int                      NULL
create_dts      datetime            NULL              datetime                 NULL
modified_dts    datetime            NULL              datetime                 NULL

The destination columns will have conversion issues with

  • email_addr length
  • first_nm length
  • last_nm length
  • awesomeness (rating) size

employee_id   email_addr             first_nm      last_nm       awesomeness
1             jjones@test.org        Jimbo         Jones         25
2             captain@test.org       Horatio       McCallister   100000
3             homer@test.org         Homer         Simpson       25000
4             marge@test.org         Marjorie      Simpson       250000000000
5             cruiser@test.org       Waylon        Smithers      100
6             bart@test.org          Bartholomew   Simpson       25
7             lisasimpson@test.org   Lisa          Simpson       25

If we run the package and review our error log we’ll see failures related to the highlighted columns.  (Note that I’ve removed some elements of the exception log here solely for formatting)

 

error_id  record_id  record_id_dsc  column_nm    error_id     error_dsc                                                                                   error_data
8         2          employee_id    email_addr   -1071607689  The data value cannot be converted for reasons other than sign mismatch or data overflow.  captain@test.org
9         4          employee_id    awesomeness  -1071607686  Conversion failed because the data value overflowed the type used by the provider.         250000000000
10        5          employee_id    email_addr   -1071607689  The data value cannot be converted for reasons other than sign mismatch or data overflow.  cruiser@test.org
11        6          employee_id    first_nm     -1071607689  The data value cannot be converted for reasons other than sign mismatch or data overflow.  Bartholomew
12        7          employee_id    email_addr   -1071607689  The data value cannot be converted for reasons other than sign mismatch or data overflow.  lisasimpson@test.org

“You’re failing, Seymour! What is it about you and failure?”

There you go – row exceptions being logged for various issues with data from the dynamic DFT.

How Denali Should Fix This

We’re eagerly anticipating Denali for several reasons, but one fantastic piece of news is that SSIS in Denali should let us bypass most if not all of the issues with LineageID.  As Jorg Klein notes in one of his blog posts (http://sqlblog.com/blogs/jorg_klein/archive/2011/07/22/ssis-denali-ctp3-what-s-new.aspx):

SSIS always mapped columns from source to transformations or destinations with the help of lineage ids. Every column had a unique metadata ID that was known by all components in the data flow. If something changed in the source this would break the lineage ids and raised error messages like: The external metadata column collection is out of synchronization with the data source columns.
To fix this error you would re-map all broken lineage ids with the “Restore Invalid Column References Editor”.
In Denali lineage-ids are no longer used. Mappings are done on column names, which is great because you can now use auto map on column names and even copy/paste pieces of another data flow and connect them by mapping the corresponding column names.

Fan.  Tastic.  Couldn’t come soon enough.  Granted, you’ll have to upgrade to Denali to make use of this, but there are so many other compelling reasons to migrate (http://www.brentozar.com/sql/sql-server-denali-2011-2012/) that this is just icing on the cake.

 

Appendix – Code

This code is provided in the download, but for quick access / reference I’m also including it here.

DFT+ Column Collection Script

using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
 
/*
 
 Add references to ...
     CozyRoc.SSISPlus.2008
     Microsoft.SqlServer.DTSPipelineWrap
     Microsoft.SQLServer.DTSRuntimeWrap 
 
 */
 
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using CozyRoc.SqlServer.SSIS.Attributes;
using CozyRoc.SqlServer.SSIS;
 
using System.Collections;
using System.Collections.Generic;
 
namespace ST_44af5cee356540e294c47d0aa17d41ed.csproj
{
    [System.AddIn.AddIn("ScriptMain", Version = "1.0", Publisher = "", Description = "")]
    [DataFlowColumnAdded("OnColumnAdded")]//CozyRoc annotation
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
 
        #region VSTA generated code
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
 
        public void OnColumnAdded(IDTSComponentMetaData100 component, bool isInput, string colName)
        {
 
            try
            {
                if (!isInput)
                {
                    IDTSOutputColumn100 column = component.OutputCollection[0].OutputColumnCollection[colName];
                    column.TruncationRowDisposition = DTSRowDisposition.RD_RedirectRow;
                    column.ErrorRowDisposition = DTSRowDisposition.RD_RedirectRow;
                }
                if (isInput)
                {
 
                    IDTSInputColumn100 column = component.InputCollection[0].InputColumnCollection[colName];
                    Dictionary<int, string> colmap = new Dictionary<int, string>();//key: column ID, value: column name
                    Variables variables = null;
 
                    try
                    {
                        Dts.VariableDispenser.LockOneForWrite("User::table_colmap", ref variables);
                        if (variables["User::table_colmap"].Value.GetType() == colmap.GetType())
                        {
                            //reuse the collection already stored in the package variable
                            colmap = (Dictionary<int, string>)variables["User::table_colmap"].Value;
                        }
                        colmap.Add(column.ID, column.Name);
                        variables["User::table_colmap"].Value = colmap;//put the column collection back into the variable
                    }
                    catch (Exception exi)
                    {
                        //ignore - we don't want a single column lookup issue to fail the whole task
                    }
                    finally
                    {
                        variables.Unlock();
                    }
                }
            }
            catch
            {
            }
 
        }
 
        public void Main()
        {
 
            Dts.TaskResult = (int)ScriptResults.Success;
        }
    }
}

Error Row Handler Script

/* Microsoft SQL Server Integration Services Script Component
*  This is CozyRoc Script Component Plus Extended Script
*  Write scripts using Microsoft Visual C# 2008.
*  ScriptMain is the entry point class of the script.*/
 
using System;
using System.Text;
 
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
 
using System.Collections;
using System.Collections.Generic; //for our dictionaries / lists
 
/*
 * HOW IT WORKS:
 * ======================================================================
 * Thanks to CozyRoc's great sample code (thanks, CozyRoc! :), we're able to rip through the
 * set of columns and find the error info critical for our logging / fixing.  We get the basic
 * column info on PreExecute() and store the column names, column lineage IDs, and column relative
 * position ("index") in two separate dictionaries for later.  We use LineageID as the key for those
 * and the later on during Input_ProcessInputRow to look up those names and IDs so we can pull back
 * data from the buffer and then also UPDATE the buffer to overwrite our custom error info columns
 *
 * Dictionary 1: Set of column names ("colnames") key: LineageID, value: column.Name
 * Dictionary 2: Set of column relative positions ("colids") key: LineageID, value colIndex
 *
 * PreExecute - set up objects for later.  IDs for columns, dictionaries, variables, etc.
 * Input_ProcessInputRow - the "real work" of adjusting / setting the values in the columns
 *
 * SETUP - READ THIS OR IT WON'T WORK
 * ======================================================================
 * REQUIRED INPUT COLUMNS
 * -------
 * We anticipate the following input columns being present (sent to the script task as inputs)
 *
 * Standard "Error Output" columns from tasks
 * ------
 * ErrorColumn      MSFT - The Lineage ID for the error column
 * ErrorCode        MSFT - The SSIS error code
 *
 * Additional error columns specific to our purposes.  You can reuse these or update the column names
 * ------
 * error_id         CUSTOM - Same as the SSIS error code, but we need them for our table
 * column_nm        CUSTOM - The name of the column where the error occurred
 * record_id_dsc    CUSTOM - the column name for the "primary key" column (EX: employee_id)
 * record_id        CUSTOM - the value/ID for the "primary key" column so you can look up the row later
 *                              EX:"12345" in column "employee_id"
 *
 * error_id         CUSTOM - the SSIS error (same as ErrorCode, but for my purposes we left it here)
 * error_dsc        CUSTOM - the human-readable description of the SSIS error EX: "The data was truncated."
 *
 * REQUIRED VARIABLES
 * -------
 * NOTE: You MUST set these up as a read-only variables within your script task.
 *
 * Package Variable: @colmap (dictionary) - the collection of column names and IDs for our dynamic columns
 *                                        - this is set in the outer DFT+ OnColumnAdded()
 *                                        - we use this to pull out the full list of columns since we can't get ahold
 *                                        - of the prior step's column IDs/LineageIDs when we're in this script task
 *
 * Package Variable: @table_keycol (string) - the name of the column that represents your primary key
 *                                              EX: "employee_id"
 *
 * This is a cheap hack, but for my situation I'm OK with that.  We don't necessarily know what a "key"
 * column is at this point - primary key, I mean here.  So to get around that we set that value in a variable
 * within the overall package.  We then use that variable to say "oh, that's the key column" later and retrieve
 * the column name and the column value so we can write out our primary key reference info.  You'll see the
 * obvious limitation - we don't support compound primary keys.  But neither does my logging table, so...
 * 
 
*/
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
 
    private int[] m_idx;
 
    private string key_col_name;        //"name" of primary key column.  EX: "employee_id".
    private int key_col_id;             //Relative column index / position of our "primary key" column
    //single primary key column.  Does not handle compound primary keys.  Retrieve this from a package variable since we want to handle this
    //dynamically and can't automatically determine it from within the package at runtime
 
    private Dictionary<int, string> colnames;          //key: LineageID, value: column name - for later use within row processing
    private Dictionary<string, int> colpositions;      //key: column name, value: relative column index/position
    private Dictionary<int, int> colidsbyposition;     //key: relative column index/position, value: LineageID
    private Dictionary<int, int> colids;               //key: LineageID, value: relative column index/position
    private Dictionary<int, string> colmap;            //key: column ID, value: column name - populated by the DFT+ OnColumnAdded() script
 
    //Internal column tracking numbers.
    //You could probably avoid using these as separate variables, but...
    // 1. I'm not that clever
    // 2. I really, really wanted to explicitly watch them as they moved around
    private int i_error_code_id;
    private int i_error_column_id;
    private int i_error_id;
    private int i_column_nm;
    private int i_record_id;
    private int i_record_id_dsc;
    private int i_error_dsc;
    private int i_error_data;
 
    StringBuilder _sbColIDs = new StringBuilder();
    StringBuilder _sbErrorCols = new StringBuilder();
 
    private bool isSourceErrorOutput = false;// = true;
    private string _OLEDBSourceType = "";
 
    public override void PreExecute()
    {
        base.PreExecute();
 
        colnames = new Dictionary<int, string>();
        colpositions = new Dictionary<string, int>();
        colids = new Dictionary<int, int>();
        colidsbyposition = new Dictionary<int, int>();
 
        colmap = new Dictionary<int, string>();
 
        try
        {
            if (Variables.tablecolmap.GetType() == colmap.GetType())
            {
                colmap = (Dictionary<int, string>)Variables.tablecolmap;
            }
        }
        catch (Exception exi)
        {
        }
 
        IDTSInput100 input = base.ComponentMetaData.InputCollection[0];
        IDTSVirtualInput100 virtInput = input.GetVirtualInput();
        int colsCount = virtInput.VirtualInputColumnCollection.Count;
        m_idx = new int[colsCount];
        for (int colIndex = 0; colIndex < colsCount; colIndex++)
        {
            IDTSVirtualInputColumn100 column = virtInput.VirtualInputColumnCollection[colIndex];
 
            //================================================================
            //pull out the error codes and column IDs
            if (string.Compare(column.Name, "ErrorColumn", true) == 0)
            {
                i_error_column_id = colIndex;
            }
            if (string.Compare(column.Name, "ErrorCode", true) == 0)
            {
                i_error_code_id = colIndex;
            }
            if (string.Compare(column.Name, "error_id", true) == 0)
            {
                i_error_id = colIndex;
            }
            if (string.Compare(column.Name, "column_nm", true) == 0)
            {
                i_column_nm = colIndex;
            }
            if (string.Compare(column.Name, "record_id", true) == 0)
            {
                i_record_id = colIndex;
            }
            if (string.Compare(column.Name, "record_id_dsc", true) == 0)
            {
                i_record_id_dsc = colIndex;
            }
            if (string.Compare(column.Name, "error_dsc", true) == 0)
            {
                i_error_dsc = colIndex;
            }
            if (string.Compare(column.Name, "error_data", true) == 0)
            {
                i_error_data = colIndex;
            }
 
            //add our column names to our list for later use
            colnames.Add(column.LineageID, column.Name); //column.LineageID used to look up index of error column name in row
            colids.Add(column.LineageID, colIndex); //column.LineageID used to look up index of error column index position in row
            colidsbyposition.Add(colIndex, column.LineageID);
            try
            {
                colpositions.Add(column.Name, colIndex);
            }
            catch { }
 
            try
            {
                //is this column the "key" column we're using to identify the key values for the row? EX: primary key
                //NOTE: we're only doing this for a single member of a compound primary key
                if (string.Compare(column.Name, Variables.tablekeycol, true) == 0)//true = ignore case during comparison
                {
                    key_col_id = colIndex;
                    key_col_name = column.Name;
                }
            }
            catch { }
            //================================================================
 
            m_idx[colIndex] = base.HostComponent.BufferManager.FindColumnByLineageID(
                input.Buffer,
                column.LineageID);
        }
    }
 
    public override void PostExecute()
    {
        base.PostExecute();
    }
 
    public override void Input_ProcessInputRow(InputBuffer Row)
    {
        int colsCount = m_idx.Length;
        int cColLineageKey;
 
        if (colsCount > 0)
        {
            try
            {
                //stuff the error code into the error_id column
                Row.Buffer[m_idx[i_error_id]] = Row.Buffer[m_idx[i_error_code_id]];
            }
            catch (Exception ex)
            {
            }
 
            try
            {
                //get the value for the "primary key" column
                Row.Buffer[m_idx[i_record_id]] = Row.Buffer[m_idx[key_col_id]];
            }
            catch (Exception ex)
            {
            }
 
            try
            {
                //get the name of the "primary key" column
                Row.Buffer[m_idx[i_record_id_dsc]] = key_col_name;
            }
            catch (Exception ex)
            {
            }
 
            try
            {
                //get the error description
                Row.Buffer[m_idx[i_error_dsc]] = (ComponentMetaData.GetErrorDescription(int.Parse(Row.Buffer[m_idx[i_error_code_id]].ToString())));
            }
            catch (Exception ex)
            {
            }
 
            try
            {
                //get the name and value of the column that failed.
                if (i_error_column_id != null && i_error_column_id > 0 && i_error_column_id < colsCount)
                {
                    string columnName = null;
                    int currentposition = -1;
                    //ErrorColumn holds the lineage ID of the column that failed
                    cColLineageKey = int.Parse(Row.Buffer[m_idx[i_error_column_id]].ToString());
                    if (cColLineageKey != null && cColLineageKey > 0)
                        {
                            if (colmap.TryGetValue(cColLineageKey, out columnName))
                            {
                                //use the lineage_id to pull the column name
                                //columnName should be set
                                if (cColLineageKey != null && cColLineageKey > 0 && columnName != null && columnName.Length > 0)
                                {
                                    if (colpositions.TryGetValue(columnName, out currentposition))
                                    {
                                        //use the lineage_id to pull the column name
                                        //current position should be set
                                    }
                                }
                                if (cColLineageKey != null && cColLineageKey > 0 && currentposition >= 0)//&& currentposition != null)
                                {
                                    if (colidsbyposition.TryGetValue(currentposition, out cColLineageKey))
                                    {
                                        //use the lineage_id to pull the column name
                                        //current position should be set
                                    }
                                }
                            }
                            else
                            {
                                cColLineageKey = cColLineageKey + 1;
                            }
                        }
                        else
                        {
                            //probably a "source error output"
                            cColLineageKey = cColLineageKey + 1;
                            //MAJOR MAJOR MAJOR HACK
                            //apparently, we do NOT persist the ORIGINAL LINEAGEID from source to output, so we need to... adjust... the number.
                            // this is EXCEPTIONALLY RISKY, but since MS "adjusts" the output rows for errors to have be "different" from the "it works!" destination
                            // we don't have much of a choice.  In reviewing them #'s it appears they consistently increment for errors, so we need to increment the
                            // index here to find the right value.  Horrible stuff.  Likely to break.  Enjoy.
                        }
                    }
 
                    //Retrieve from the column names dictionary and place column name in error info
                    if (i_column_nm != null && i_column_nm > 0 && i_column_nm < colsCount)
                    {
                        string value = null;
                        if (cColLineageKey != null && cColLineageKey > 0)
                        {
                            if (colnames.TryGetValue(cColLineageKey, out value))
                            {
                                //use the lineage_id to pull the column name
                                Row.Buffer[m_idx[i_column_nm]] = value;
                            }
                        }
                    }
                    //get the missing column value for the key found at the identified "error column"
                    //had issues where the column blew up because of data type conversion issues, so try/catch is here to help handle this
                    try
                    {
                        if (i_error_data != null && i_error_data > 0 && i_error_data < colsCount)
                        {
                            int colvalue = 0;
                            if (cColLineageKey != null && cColLineageKey > 0)
                            {
                                if (colids.TryGetValue(cColLineageKey, out colvalue))
                                {
                                    //use the lineage_id to pull the column name
                                    //NOTE: "bad" data MAY be totally thrown out here, which is why we're using the try/catch
                                    //if the custom CozyRoc row processor dies due to formatting errors then this will throw an exception
                                    //we're just going to ignore that and roll on by
                                    //probably worth revisiting at a later date to see if we can get at the bad data anyway
                                    Row.Buffer[m_idx[i_error_data]] = Row.Buffer[m_idx[colvalue]].ToString();
 
                                }
                            }
                        }
                    }
                    catch (Exception vEx)
                    {
                    }
 
                }
                else
                {
                }
            }
            catch (Exception ex)
            {
            }
        }
 
    }
 
}

ETL Assistant – Using CozyRoc’s Parallel Loop Task

For the TLDR crowd – I’m supplying downloads of the packages so you can just open them and play.

The “Cozy Roc Parallel Loop Demo Files” download contains:

  • setup_sql.txt
  • SequentialLoop.dtsx
  • ParallelLoop.dtsx

To use the demo files you’ll need to have at least the evaluation copy of the CozyRoc components installed (you can get 32bit and x64 versions from the CozyRoc site http://www.cozyroc.com/products).

Loops in SSIS

SSIS provides a very handy loop task – you supply a collection (of type object) and iterate through that object, executing steps or processes for each item in the collection.

Microsoft’s description (http://msdn.microsoft.com/en-us/library/ms139956.aspx) of the task:

“The For Loop container defines a repeating control flow in a package. The loop implementation is similar to the For looping structure in programming languages. In each repeat of the loop, the For Loop container evaluates an expression and repeats its workflow until the expression evaluates to False.”

Great – we can now loop through a set of data.

  • For a given group of something (a collection)
  • Iterate through the collection (a variable / instance)
  • For each instance, execute a process

In the case of ETL Assistant we use this to do the following:

  • We have a concept of a scheduling “group” – a set of source::destination table mappings. Let’s say I want to manage a set of HR-related items (department list, employee list, address information, etc.) as a group (easier than managing individual tables).  I can put them into a “group” (collection).  Let’s call this “HR tables.”  I can do the same with a set of patient information (patient / person list, encounter / visit information, and possibly some other patient demographic information).  Let’s call this “Patient tables.”
  • I can, for each group, pull back a list of tables (instances)
  • For each table I can execute a dynamic ETL on them to pump data from a source to a destination (EX: Oracle::employee -> SQL Server::employee)

The loop task does a great job managing simple collections and executing an operation per item.

The problem?  I’m now executing all of this serially.

SSIS Standard ForEach Loop

This means if I have a fairly beefy server I’m still potentially sitting idle while I do a simple set of ETL operations.  You have several ways to address this, but one I’ve found attractive is to convert from a serial ForEach loop to a parallel ForEach loop using the Parallel Loop Task from CozyRoc.  This will let us do n-parallel executions of a given operation.  If you have a 64 core host, for example, and the diagram above represented tables you wanted to load from a remote source, you could execute A, B, and C loads in parallel.

CozyRoc Parallel Loop Idea

Let’s get back to that example using the HR tables (department list, employee list, address information).  I can create a “group” (“HR”), then place these three tables into the HR group.  When I run a process to pull over the HR group I reference the group, pull back the three table references, and place them into a collection.  I then iterate through the collection and, for each of the n items in it, execute a task.

CozyRoc Parallel Loop Task


 

The Sequential ForEach Loop in SSIS

Let’s do some quick setup steps to prep this test scenario

--create a test schema
CREATE SCHEMA cozyroc AUTHORIZATION dbo
GO 
 
--this is our "group" table
--EX: HR
CREATE TABLE cozyroc.etl_groups (
	group_id INT IDENTITY(1,1) NOT NULL,
	group_nm VARCHAR(100) NOT NULL,
	group_dsc VARCHAR(255)
CONSTRAINT [PK_cozyroc_etl_groups_group_id] PRIMARY KEY CLUSTERED
(
	[group_id] ASC
)WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, IGNORE_DUP_KEY = OFF,
ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]
) ON [PRIMARY]
 
--this is our "table" table
--EX: employees, addresses, etc.
CREATE TABLE cozyroc.etl_tables (
	table_id INT IDENTITY(1,1) NOT NULL,
	table_nm VARCHAR(100) NOT NULL,
	table_dsc VARCHAR(255)
CONSTRAINT [PK_cozyroc_etl_tables_table_id] PRIMARY KEY CLUSTERED
(
	[table_id] ASC
)WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, IGNORE_DUP_KEY = OFF,
ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]
) ON [PRIMARY]
 
--this is our associative table to store
--  the mapping from group::table
--  I'm using this in the example because in later
--  posts we'll allow the table to be "grouped"
--  multiple times
CREATE TABLE cozyroc.etl_group_tables (
	group_table_id INT IDENTITY(1,1),
	group_id INT NOT NULL,
	table_id INT NOT NULL
CONSTRAINT [PK_cozyroc_etl_group_tables_group_table_id] PRIMARY KEY CLUSTERED
(
	[group_table_id] ASC
)WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, IGNORE_DUP_KEY = OFF,
ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]
) ON [PRIMARY]
 
--insert a sample group
INSERT INTO cozyroc.etl_groups (group_nm, group_dsc)
VALUES ('HR', 'HR Group')
 
--insert some sample tables
INSERT INTO cozyroc.etl_tables (table_nm, table_dsc)
VALUES ('Employees', 'Employees Table')
INSERT INTO cozyroc.etl_tables (table_nm, table_dsc)
VALUES ('Departments', 'Departments Table')
INSERT INTO cozyroc.etl_tables (table_nm, table_dsc)
VALUES ('Addresses', 'Addresses Table')
 
--blindly cross join everything
INSERT INTO cozyroc.etl_group_tables (group_id, table_id)
SELECT
	g.group_id, t.table_id
FROM
	cozyroc.etl_groups g,
	cozyroc.etl_tables t
 
--now let's also create a logging table
-- this is a placeholder for more complex
-- operations
CREATE TABLE cozyroc.parallel_test (
	log_id INT IDENTITY(1,1) NOT NULL,
	group_table_id INT NOT NULL,
	group_id INT NOT NULL,
	table_id INT NOT NULL,
	execution_dts datetime2(7) DEFAULT GETDATE()
CONSTRAINT [PK_cozyroc_parallel_test_log_id] PRIMARY KEY CLUSTERED
(
	[log_id] ASC
)WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, IGNORE_DUP_KEY = OFF,
ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]
) ON [PRIMARY]

 

This will have created several tables:

  • cozyroc.etl_groups (“HR”)
  • cozyroc.etl_tables (“employees,” “addresses,” etc.)
  • cozyroc.etl_group_tables (mapping “employees” to the “HR” group, for example)
  • cozyroc.parallel_test (our fake table we’re using to test the parallel loop)

You don’t need all of this for the CozyRoc Parallel Loop Task to work, but I’m trying to just introduce some examples we’re going to use in later posts related to dynamic ETL.
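
If you want to sanity-check the setup before touching SSIS, a quick query against the mapping table shows the group-to-table pairs the loop will iterate over (this uses only the objects created above):

SELECT
	g.group_nm, t.table_nm, gt.group_table_id
FROM
	cozyroc.etl_groups g
	JOIN cozyroc.etl_group_tables gt ON gt.group_id = g.group_id
	JOIN cozyroc.etl_tables t ON t.table_id = gt.table_id
ORDER BY
	g.group_nm, t.table_nm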

Now let’s create a package

  1. Launch BIDS and create a new SSIS project
  2. Create an OLEDB connection to the server and database where you created the tables (in my case, that’s a database called “cozyroc” on localhost)
  3. For convenience, make sure you have the “variables” panel open (SSIS -> Variables)
  4. Drag a few things onto the workspace:
    1. an Execute SQL Task
    2. a ForEach Loop Container task
    3. drag another Execute SQL Task into the ForEach Loop Container (make sure it’s placed inside of the container)
  5. Create a package-level variable called “mylistoftables” – make it of type “Object”
  6. Click on the ForEach Loop Container so it’s highlighted – now create a variable called “iter” and make it of type “int32.”  Clicking on the ForEach Loop and then creating the variable will scope “iter” to the loop – make sure the scope of the variable is correct.
  7. Now let’s create another variable, also scoped to the ForEach Loop.  Let’s call it “sql_insert” and make it of type String.  We’re going to set this up to hold a sample insert statement so we can watch the loop in action.
    1. For the “sql_insert” variable, set EvaluateAsExpression to “True”
    2. Open the Expression and enter the following:
      "insert into cozyroc.parallel_test (group_table_id, group_id, table_id)
      select gt.group_table_id, gt.group_id, gt.table_id
      from cozyroc.etl_group_tables gt
      where gt.group_table_id = " + (DT_WSTR, 100) @[User::iter]
  8. Connect the first SQL Task to the ForEach Loop

Your package should now look something like…

SSIS ForEach Loop Package Setup

Now let’s set up the SQL Task

  1. Double-click the SQL Task to open the configuration
  2. On the “General” panel,
    1. Set the “Result Set” property to “Full Result Set”
    2. Set the Connection property to point to your database (ex: “localhost”)
    3. Set the SQLSourceType to “Direct Input”
    4. Set the SQLStatement to “select * from cozyroc.etl_group_tables”
    5. ssis_loop_sql_collection_statement
  3. Now go to the “Result Set” panel
    1. Set “Variable Name” to point to your collection “User::mylistoftables” – this is where we’re going to store the results of the query
    2. ssis_loop_sql_collection_result

What did we just do here?  We told the SQL Task to execute a query (“get me everything from the cozyroc.etl_group_tables table”) and then store the results in our “mylistoftables” object.  Pretty straightforward.

Let’s proceed to setting up the loop

  1. Double-click on your ForEach Loop Container to open the properties panels
  2. On the “Collection” panel
    1. Set the “Enumerator” to ForEach ADO Enumerator
    2. Set the ADO object source variable to your collection “User::mylistoftables”
  3. Now on the “Variable Mappings” panel
    1. Set the variable to “User::iter” and the Index to “0” (first column of the result set)

In this step we told the ForEach Loop Container to loop through the rows of the first table in the “mylistoftables” result set and, on each pass, set the “User::iter” variable to the value of the first column.  Keep in mind – and this is critical for later – you’re using the User::iter variable scoped to the ForEach Loop Container.

Alright – we’re almost done setting up the basic loop.  Now we just have to wire up the task that the loop executes.  Configure the SQL Task inside your loop to execute the “sql_insert” statement:

  1. Double-click the second SQL Task to open the configuration
  2. On the “General” panel,
    1. Set the Connection property to point to your database (ex: “localhost”)
    2. Set the SQLSourceType to “Variable”
    3. Set the SQLStatement to your “User::sql_insert” variable

Run the package and, if there are no errors, pop over to SQL Server Management Studio for a minute.

Run a query to quickly look at the results of the package:

SELECT * FROM cozyroc.parallel_test

You should see something like…

log_id	group_table_id	group_id	table_id	execution_dts
1	1	1	1	2011-11-04 13:15:00.9500000
2	2	1	2	2011-11-04 13:15:00.9800000
3	3	1	3	2011-11-04 13:15:01.0100000

Note the dates and times. See how there are slight differences between them? The timestamps follow a clear pattern – later group_table_ids have later execution times. That’s the result of the loop running sequentially.
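
If you’d rather put a number on that spread than eyeball the timestamps, something like this works against the same logging table:

SELECT
	MIN(execution_dts) AS first_execution_dts,
	MAX(execution_dts) AS last_execution_dts,
	DATEDIFF(MILLISECOND, MIN(execution_dts), MAX(execution_dts)) AS spread_in_ms
FROM
	cozyroc.parallel_test

Hang onto that number – we’ll look at the same table again after converting to the parallel loop.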

Converting to the CozyRoc Parallel Loop Task

Let’s upgrade this to a Parallel Loop Task.  Hang on – things are about to get weird.

First things first.  Let’s quickly throw on some more components and variables as well as tweak some other bits.

  1. Drag a “Parallel Loop Task” onto the canvas
  2. Delete the link from your first SQL Task to the ForEach Loop.  Where we’re going we don’t need that flow.
  3. Now connect that first SQL Task to the Parallel Loop Task
  4. Double-click on the Parallel Loop Task to open up the configuration panel
    1. Click on the Package Connection property and set the connection to “<New Connection>.”  When the dialog box opens, make sure “Connection to Current Package” is checked and hit “OK.”  We just told the Parallel Loop Task to talk to this package when executing.  Right – this basically just became a “meta package” with execution steps.  Think of it as its own self-referencing parent-child package.
    2. Now – still inside the configuration panel of the Parallel Loop Task – click on the ForEachLoop configuration item – a new popup should appear.  Click on the name of your ForEach Loop within this package.
  5. Your final Parallel Loop Task configuration should look something like this

Now we’re cooking.  Only a few simple changes left.

  1. Disable the main ForEach Loop.  We no longer manage it – the Parallel Loop Task does.  It enables/disables this as it fires each instance of this package.  If we left the loop enabled things would get very messy – you’d have sequential instances of the loop firing within each parallel instance of this package.  Loops in loops – very loopy.
  2. Create a new package-level variable called “Iter” of type “int32.”   That’s right – we now have both a package-scoped and a loop-scoped variable.

That’s it.  That’s really all you have to do to take a sequential loop and turn it into a parallel loop. Your final package should look something like this:

Give it a quick test run and then go back and re-run that query to look at the dates and times.

SELECT * FROM cozyroc.parallel_test

Your execution dates and times should now be much, much closer to each other if not completely identical:

log_id	group_table_id	group_id	table_id	execution_dts
4	2	1	2	2011-11-04 13:36:06.3230000
5	3	1	3	2011-11-04 13:36:06.3300000
6	1	1	1	2011-11-04 13:36:06.3300000

See that?  Granted, this is a fairly pointless test case, but you get the idea.  By default the Parallel Loop Task iteration setting is set to “-1” (as many cores as you have).  You may want to play with this (or better yet expose it as a runtime configuration property) depending on your situation.

Next up we’re going to step into how the CozyRoc DFT+ component can make your life easier by side-stepping SSIS’s age-old static design-time column mapping problem. Combined with the Parallel Loop Task, that’s when things really start to get interesting.

Further reading:

Introducing ETL Assistant – A Dynamic, Templated Approach to SSIS

TLDR
This is the first post in a series where I’m going to explain how to do a few things:

  1. Use the CozyRoc Parallel Loop Task to execute ForEach Loops in parallel
  2. Create a package that will handle connections to and push data between arbitrary OLEDB systems
  3. Dynamically map columns in SSIS at runtime
  4. Dynamically gather error data (column name, column value, error description, etc.) at runtime in SSIS (no, really – it works)
  5. Create a templated package system with inline variables that can be replaced at runtime (don’t get too excited – it’s simple string replacement :) )
  6. All sorts of other nutty and, I hope, useful abuses of SSIS
  7. Create an ASP.NET MVC 3 app on top of everything to help you manage the insanity

UPDATE: While I’ve slacked off in posting detailed entries we have posted a “screenshot gallery” of the web management UI. That may help explain where we’ve gone with this. Enjoy!

This post is mainly prefacing why you’d try to do any or all of this.  Later posts will get into more technical stuff.

And now – the long version…

This is my first blog entry.  Not my first this month or first for the EDW, but first ever.  I’m happy that I finally have both something to blog about as well as the opportunity to blog – Northwestern allows and encourages the sharing of information that at most organizations would be considered proprietary.

Good.  Now that we’re past that, let’s get onto the more interesting stuff.  I joined the team here late in 2010 and was immediately impressed by four things:

  1. The team itself – they’re smart, hard-working, and humble.  Can’t beat that power combo.   We also have a large, extended family of indirect team members and organizational supporters who have the same drive, enthusiasm, and dedication to our mission.
  2. The sheer volume of data we deal with.  In terms of both breadth and depth it can be a bit intimidating.  And it’s alive.  It’s an organism.  It’s ever growing, ever changing.  The data just can’t sit still.  You might get something working only to find that the data structures or data within the structures has changed in small, subtle ways that create challenges for everyone.
  3. It’s in our DNA to constantly evolve and improve – and one vital part of that constant improvement is our desire to reduce manual tasks that could be safely and effectively automated.  I’ve been at shops where the entire business model was based around drudgery and inefficiency because they either didn’t get it (sad, but true) or charged clients by the hour.  Either of those is the kiss of death to keeping clients happy, your costs competitive, and your team a happy, hard-working, creative bunch.  Smart, creative people don’t like doing the same task over and over.  They like to solve the problem once and move on.  Smart, creative people are also the types who work hard to improve things because they see a problem (ahem – “opportunity…”) and need to solve it.   In our line of work, when you genuinely care about how you’re helping improve medicine, that desire to improve and that unwillingness to let something be “good enough” is critical.
  4. The tools that have been built in-house to help reduce manual labor are pretty awe-inspiring.  You can read about them on our blog, but one in particular really caught my attention early on – PackageWriter.
PackageWriter was built by EDW-alum Eric Just – a great example of the creative, hard-working, “good enough is not enough” breed we have on our team.  Eric cooked up PackageWriter as a way to help reduce manual labor with the construction of SQL Server Integration Services (SSIS) packages that we need to churn out when we need to bring new data sources into the EDW.   Now – let’s set expectations real quick.  When we bring on a “small” data source it could be something on the order of ~15GB in size and comprised of 150+ tables and 170 million rows of data.  If you’ve used SSIS you know that to bring in that “small” system for a full load means you need to set up a data flow task (DFT) process per table and then combine those DFTs into a package you can schedule.
Let’s do some basic math on that.
  • Assuming you have 150 tables, each needs a separate DFT, so that’s 150 DFTs
  • Let’s say you can set up a single DFT by hand in about 10 minutes
  • Let’s also assume you want to keep things semi-sane, so you have a maximum of 10 DFTs per SSIS package
  • We’re now at… 15 packages with 150 DFTs at 10 minutes per DFT.  That’s 25 hours.  In work days – assuming you get about 7 real hours per day to spend on this – that’s over 3.5 days to set up that import.

Ouch.

But – the work’s not over.  You still need to actually build the destination tables in your data warehouse, so you also need to build table DDL scripts.  You may be moving data from Oracle, PostgreSQL, MySQL, or any number of systems that require you to rewrite those DDL scripts in SQL Server-friendly form.  And, if you care about the quality of your data, you’ll really want to build in error row logging (not included in the 10 minutes per DFT above).  Importing that system doesn’t sound like it’s all that much fun or valuable compared to the “real work” you’ve also been tasked with completing (you know – actually using the data).

Enter PackageWriter.

PackageWriter is a beast of burden sent to do your bidding and lighten your workload by automating everything end to end.  You feed it a source database connection string, a few configuration options like destination schema, filegroup, etc. and then enter the list of source tables you want to import.  From there, it’s magic.  It pops out the DDL scripts (using data type conversion defaults you’ll want to review) and emits all of the packages for you, including the error row logging.

Total time for the equivalent effort?

  • You need to type in the database connection string, filegroup default, schema default, and source table names.  I’m lazy, so I listed the table names from the source system table catalog.
  • PackageWriter generates the destination DDL. You then use that DDL to create your tables in SQL Server.
  • Next up, PackageWriter emits the SSIS packages for you.  All of them.  And it crafts SSIS’s arcane error row mapping.
  • Total time for the same 150 tables?  About 30 minutes.

Wow.  That’s just astounding.  And it’s a huge relief.  Now you can get back to your real job.  Bless you, PackageWriter.

I began using PackageWriter to assist with the import of data related to one of the systems we replicate into our EDW.  It’s a larger system – something on the order of about 4,000 tables and TB upon TB of disk.  We currently only replicate in ~150 or so tables of that 4,000 table catalog, but we constantly receive requests to pull in more.  I usually field these requests by firing up PackageWriter and creating a full truncate/reload process for the new tables.  This worked out well until I got a request for a somewhat larger table (~200GB) that you wouldn’t want to reimport nightly.  Happily, poor Eric Just, the author of PackageWriter, sat next to me at the time (I’ve mentioned he’s now an EDW alum – I’m not sure if my proximity to him is related to his now being an “alum”).  I turned to Eric, who was ever so polite and didn’t seem to mind my constant, unending stream of questions.  “How do we do incremental loads with PackageWriter?”  To which he replied “You don’t.  Incremental loads need to be written by hand since the queries are table-specific and depend on how changes are managed.  Is there a change date on some column you can use to compare the source and destination table timestamps?” Ah.  I see.  It made perfect sense.  There was no good way to write a generic query for “what changed?” across a series of tables.  And our data volume is just too large for many of the other alternatives you might choose.  My favorite magic bullet would get me part way there, but I needed to go back to writing packages by hand again.

I was new to SSIS when I joined the EDW.  I had worked with Data Transformation Services (DTS – the previous incarnation of SSIS) a few times before, but hadn’t needed to do anything that complex or critical (I’ve traditionally been more of a web-type-guy or a PM).  Luckily, the concepts in SSIS are fairly straight-forward and Microsoft has done a pretty darn good job making SSIS far, far more flexible than DTS.  Using PackageWriter as a starting point I began looking at our tables for this one, huge datasource and we lucked out – all of our tables have a common column called “updt_dt_tm” – the date/time the row was updated.  I could easily come up with a statement to handle that and stage the data for eventual reloading.

Our process is intentionally simple:

  1. Get the max(updt_dt_tm) from the local replica table
  2. Using that max(updt_dt_tm), get the source rows whose update timestamp is more recent (where updt_dt_tm > [our max updt_dt_tm])
  3. Stage the data in a holding table
  4. Delete the rows from our local replica where there are matches in staging
  5. Insert the new rows from staging into our local replica
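
Here’s a rough T-SQL sketch of steps 3 through 5 for a single table.  The schema and table names (“stage.employee”, “dbo.employee”) are placeholders for illustration – the real statements are generated per table at runtime.

--step 3: the data flow stages rows where updt_dt_tm > (SELECT MAX(updt_dt_tm) FROM dbo.employee)
--        into a staging table, stage.employee (names assumed for this sketch)
 
--step 4: delete local rows that have newer versions sitting in staging
DELETE e
FROM dbo.employee e
	JOIN stage.employee s ON s.employee_id = e.employee_id
 
--step 5: move the staged rows into the local replica
INSERT INTO dbo.employee
SELECT * FROM stage.employee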

I thought to myself – well – if this is the case, then the SQL is nearly 100% identical across all of our tables.

select * from employee where updt_dt_tm > 'January 1, 2011'
select * from phone where updt_dt_tm > 'January 1, 2011'
select * from address where updt_dt_tm > 'January 1, 2011'

(Look pretty similar, don’t they?)

The only real difference is the reference to table names and the final column mappings in SSIS.  When transferring data from table A to table B SSIS must map columnA to columnB – it then resolves data conversion issues, etc.

Hrm.  If I could dynamically map those columns then I could build one, simple incremental package that could be used for all tables – all that’s really changing is the SQL statement, source connection / table information, and destination table / connection information.
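
In other words, something like the sketch below.  The {{…}} placeholder syntax is invented purely for illustration (ETL Assistant’s real inline-variable syntax isn’t covered here) – the point is that one template plus a handful of runtime values covers every table.

--hypothetical incremental query template; the {{...}} placeholders are not real syntax
SELECT *
FROM {{source_table}}
WHERE updt_dt_tm > {{destination_max_updt_dt_tm}}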

ETL Assistant - Dynamic Table Column Mapping


One small problem – SSIS can’t dynamically map columns at runtime (not without using the API to dynamically construct the package in memory).  Major, but understandable short-coming of the overall toolkit.  Handling dynamic mapping, as I now know, introduces all sorts of new and unusual problems that make you stay up all night wondering why you got into IT.  I started scouring the Internet to see if anyone else had tried to do this (why re-invent the wheel?).  I kept seeing posts from this shop called CozyRoc, noting they could, in fact, do this and more.  At this point I turned to Eric Just again and asked him if he’d ever heard of this “CozyRoc.”  “Oh, yeah – totally.  Great stuff.  We use their zip task.”  Oh, I see.  I began tearing into the CozyRoc control flow and data flow components and found exactly what I was looking for.

  • “Data Flow Task Plus” – “allows setup of dynamic data flows. The dynamic setup options can be controlled with parameters from outside. This feature lessens the need to manually open and modify the data flow design, when new source and destination columns have to be accommodated.”
  • “Parallel Loop Task” – “is SSIS Control Flow task, which can execute multiple iterations of the standard Foreach Loop Container concurrently. In tests, a CPU intensive sequential process when executed in parallel on 4-core machine was executed 3 times faster compared to the sequential.”

Whoah.  Hold up.  We can now dynamically map columns and run n-iterations of this task in parallel?  Sign me up.  With that I could unleash our server on poor, unsuspecting source systems in no time flat.  If we could do that, then we could replace hundreds of packages with just two packages.

ETL Assistant - Parallel Package Invoker


And on that same day, in a matter of a few clicks, we delivered the first iteration of “ETL Assistant.”  ETL Assistant has been rebuilt many times since then, but at its core it does a few things:

  • Connects to a source OLEDB system.  Just supply a connection string (I’ve tried it with MSSQL, Oracle, and PostgreSQL thus far)
  • Pulls in the table catalog for that system (based on parameters you specify)
  • Checks for the existence of those tables in your destination system
  • Lets you configure default “global” definitions of query templates for full and incremental jobs – you can then override those templates on a table-by-table basis
  • Lets you create inline variables that can be used to gather small snippets of data in both the source and destination systems at runtime.  They can then be incorporated into your source queries.  Useful for those “find me the latest date in the destination – then get me more recent data from the source system” scenarios.  This really helps unleash more dynamic aspects of the system.
  • Abusing the inline variables, we can also set batch windows on the queries so you can pull the data in successive n-row batches if you need to (useful when you’re pulling in those 2+TB source tables and want to do so in 10mm row increments)
  • You can then set up “groups” of tables that you can schedule in SQL Server
  • Custom logging – we need it because the existing standard SSIS logging would be useless here
  • Error row logging to a table (infinitely useful)

Really, it’s what you would do by hand, so why not let the computer do it for you?  That’s what they’re for.

As a bit of a tease, let me give you a quick example of how this is paying off for us.  I was asked to import a new source Oracle system with 155 tables.

Here’s how we did it

  1. Set up source system connection information (5 minutes – I kept messing up the connection string)
  2. Apply existing query template for “full select from SQL Server” (the web UI has some helper templates built into it since I’m lazy) (1 minute)
  3. Pull in table catalog (2 minutes)
  4. Use TableWriter (part of PackageWriter and now also incorporated into ETL Assistant) to give us helper DDL scripts for the destination tables.  This is an asynchronous ajax call to TableWriter, so it’s pretty quick.  (1 minute)
  5. Create tables in destination SQL Server (2 minutes)
  6. Create a group (for scheduling in SQL Server) (1 minute)
  7. Put tables in group (2 minutes)
  8. Open up SQL Server and create a new job pointing to our parallel ETL Assistant package – supplying the table group ID as a package parameter (5 minutes)
  9. Run the package.  (20 minutes)

Total time end to end? ~40 minutes to bring in 155 tables containing 170mm rows and consuming 18.5 GB of disk.  Not bad.

But – the cool part?  We get that incremental update ability.  And, since we’re using a dynamic column map inside the package, we can go back and adjust column names and data types in the destination table without needing to change and redeploy the package.  And – if we need to change table replication behavior we can simply adjust either the overall “global” query template for that data source or override it for a specific table.  Or – let’s say you need to globally go back and re-load data from a specific date range – just override the global template and run the package.  The point is – we’re now open to a wee bit more flexibility than we were previously accustomed to.

In the next few posts I’ll be delving into how to get things done.  I’ll try to give practical examples and insights so you can get more done with less – because in IT, wanting to get rid of the monotonous churn isn’t being lazy, it’s being efficient.