Tuesday, April 28, 2015

[ETL] A dynamic CSV ripper that's forwards/backwards compatible, in T-SQL

Had this problem coming, so wanted to get ahead of it:
Dev releases code, version 2.1, which has a CSV with 4 columns.
[2 days pass]
Dev releases code, version 2.1.1, which has a CSV with 5 columns.
[2 days pass]
Dev releases code, version 2.1.2, which has a CSV with 6 columns.

Since we relational DBAs have this fun thing called "schema", importing this gets hard.  SSIS will choke because it's different each time.  BULK INSERT will choke as the file type has changed.  Inserting via OPENROWSET and the text connector sucks because 64-bit servers require you to install the Excel pack. Modifying the import each time blows for obvious reasons.

So, time to use the DBAs secret weapon, Dynamic SQL.  (Which, yes, lots of potential problems, holes, etc.  See Erland's seminal paper "The Curse and Blessings of Dynamic SQL").

So what did I build?  Practically, it imports any delimited file with a header row, compares that to the end table, and inserts just those fields.  Need that field in your end table? Add it to the target table, and the next run will pick it up. 

Warning: it's SLOW. Not sure which part, but there's plenty of blame to go around on my part. Dynamic Shreds, Dynamic Pivots, Pivots, etc. You could replace the Pivot with a CLR Dynamic Pivot floating around out there (which is probably the majority of it), replace the comma parsing with Adam Machanic's, and that may get you to a happy spot.  For me, though, this is ideal. 

My next version of this process will probably be in Powershell, in case someone hasn't already done it.  That could be slick and really fast- import-csv, get the column list as a variable, match up to another variable with the columns from the final table, and use the resultant as a hash table for a select.  Next time, next time.

Dependency: right now, Jeff Moden's 8k splitter.

Order of operations for this code:
  • get server list (naturally, this needs the file from multiple servers) 
  • For each server...
  • import first row (header has the field names) into a table
  • split CSV into a list of columns.  
  • import file into wide table
  • split CSV to a keypair-style table, using Jeff Moden's awesome DelimitedSplit8K splitter (may need to change this to use Adam Machanic's CLR)
  • update the table so that blank fields are now NULL (since blank fields become '', which converts to 0 if bigint, but fails converting to decimal.)
  • build a pivot to go from the keypair into your target table
    • compare the columns (sys.columns or information_schema.columns) between the header and the target table
    • match the data and header to make a good insert statement
  • use the pivot to insert into your target table
SET QUOTED_IDENTIFIER ON
SET ANSI_NULLS ON
GO
/*
2015/04/22 - mdb - 2.00 - changed to pull raw CSV, shred via function, pivot into a virtual table, then insert into
actual table based on existing fields. Practically: forwards/backwards compatability.
Add new fields to your target table, and as they show up in the new file they'll be added.
Practically, they'll have to rename the file when they do this, so that we know the
names of the new fields. But that's just a parameter change in the job.
2016/03/25 - mdb - 2.01 - found a bug where inserts may fail when the source data uses scientific notation. Added statement
that UPDATEs the staging table where the value is "like '%E-0%'". Converts to float(53), then dec(28,15).
Commented out by default because you might have E-0 in a text field, and THAT would break it.
requires Jeff Moden's 8k CSV parser, or something like it.
*/
--BULK INSERT requires dynamic SQL
DECLARE @filename VARCHAR(500), @sql NVARCHAR(4000), @base_filename VARCHAR(500), @servername VARCHAR(255)
DECLARE @beginning_of_filename VARCHAR(30) = 'filenamestartswith'
DECLARE @Columns VARCHAR (MAX), @Columns_Short VARCHAR (MAX), @Columns_Insert VARCHAR(MAX)
DECLARE @ErrorMessage NVARCHAR(4000);
DECLARE @ErrorSeverity INT;
DECLARE @ErrorState INT;
DECLARE @serverlist_to_do TABLE (id INT IDENTITY, servername sysname)
INSERT INTO @serverlist_to_do
--uses OPENROWSET to pull a serverlist from my repository; can use a linked server or just hit locally.
SELECT server FROM OPENROWSET('SQLNCLI', 'Server=myrepo;Trusted_Connection=yes',
'select server from serverlist')
-------------------
--FOR EACH server--
-------------------
declare @min INT, @max INT
SELECT @min = MIN(id), @max = MAX(id) FROM @serverlist_to_do
while @min <= @max
BEGIN
--wipe out variables that are added each time!
SELECT @sql = '', @ErrorMessage = NULL, @ErrorSeverity = NULL, @ErrorState = NULL
,@Columns = NULL, @Columns_Short = NULL, @Columns_Insert = NULL
SELECT @servername = servername FROM @serverlist_to_do WHERE id = @min
SELECT @filename = '\\' + @servername + '\c$\temp\' + @beginning_of_filename + '_'
--note that the date format is screwy; using a few minutes ago so that it auto-rolls.
+ FORMAT( GETDATE()-.003, 'dd.MM.yyyy', 'en-US' ) + '.csv'
SELECT @base_filename = RIGHT(@filename, CHARINDEX('\',REVERSE(@filename))-1)
-----------------
--Import Header--
-----------------
PRINT CONVERT(VARCHAR(30),GETDATE(),120) + ' BULK INSERT of header begins for ' + @filename
TRUNCATE TABLE myimport_header_stage
SET @sql =
'BULK INSERT myimport_header_stage
FROM ''' + @filename + '''
WITH
(
LASTROW = 1,
FIELDTERMINATOR = ''\0'' --the \0 is "null terminator"; needed to make sure it doesnt try and parse
)'
BEGIN TRY
EXEC sp_executesql @sql
END TRY
BEGIN CATCH
PRINT 'File either locked, Does Not Exist, or format has changed; see error message for more details'
SELECT @ErrorMessage = ERROR_MESSAGE(),
@ErrorSeverity = ERROR_SEVERITY(),
@ErrorState = ERROR_STATE();
IF @@TRANCOUNT > 0
ROLLBACK TRANSACTION;
PRINT 'Error Severity: ' + CONVERT(VARCHAR(30), @ErrorSeverity)
PRINT 'Error State: ' + CONVERT(VARCHAR(30), @ErrorState)
PRINT 'Error Severity: ' + @ErrorMessage
--commented this out so we can run it without files.
--RAISERROR (@ErrorMessage, -- Message text.
-- @ErrorSeverity, -- Severity.
-- @ErrorState -- State.
-- );
END CATCH
---------------
--Import Data--
---------------
PRINT CONVERT(VARCHAR(30),GETDATE(),120) + ' BULK INSERT of data begins for ' + @filename
--inserts into view in order to add the ID column so the PIVOT works. insertview is just a select *, minus the ID
TRUNCATE TABLE myimport_wide_stage
SET @sql =
'BULK INSERT myimport_wide_stage_insertview
FROM ''' + @filename + '''
WITH
(
FIRSTROW = 2,
FIELDTERMINATOR = ''\0'' --the \0 is "null terminator"; needed to make sure it doesnt try and parse
)'
BEGIN TRY
EXEC sp_executesql @sql
END TRY
BEGIN CATCH
PRINT 'File either locked, Does Not Exist, or format has changed; see error message for more details'
SELECT @ErrorMessage = ERROR_MESSAGE(),
@ErrorSeverity = ERROR_SEVERITY(),
@ErrorState = ERROR_STATE();
IF @@TRANCOUNT > 0
ROLLBACK TRANSACTION;
PRINT 'Error Severity: ' + CONVERT(VARCHAR(30), @ErrorSeverity)
PRINT 'Error State: ' + CONVERT(VARCHAR(30), @ErrorState)
PRINT 'Error Severity: ' + @ErrorMessage
--commented this out so we can run it without files.
--RAISERROR (@ErrorMessage, -- Message text.
-- @ErrorSeverity, -- Severity.
-- @ErrorState -- State.
-- );
END CATCH
PRINT CONVERT(VARCHAR(30),GETDATE(),120) + ' BULK INSERT ends for ' + @filename
--===== Split the CSV column for the whole table using CROSS APPLY
--save into a staging table to make the pivot back easier.
TRUNCATE TABLE myimport_stage_split
INSERT INTO myimport_stage_split
(id, ItemNumber, Item)
SELECT stage.id, split.ItemNumber, split.item
FROM myimport_wide_stage stage
CROSS APPLY DelimitedSplit8K(resultant,',') split
--this is needed because blank values ('') don't convert to null; they come back with error converting varchar to decimal
UPDATE myimport_stage_split SET item = NULL WHERE item = ''
--2.01 optional fix for scientific notation.
--UPDATE myimport_stage_split
--SET item = CONVERT(DECIMAL(28,15),CONVERT(FLOAT(53),item))
--WHERE item LIKE '%E-0%'
----------------------
--Building the PIVOT--
----------------------
SELECT @Columns=COALESCE(@Columns + ',','') + QUOTENAME(ItemNumber) + ' as ' + QUOTENAME(item)
, @Columns_Short = COALESCE(@Columns_Short + ',','') + QUOTENAME(ItemNumber)
, @Columns_Insert = COALESCE(@Columns_Insert + ',','') + QUOTENAME(item)
FROM
(
SELECT DISTINCT split.ItemNumber, headers.item
From myimport_stage_split split
INNER JOIN
(
SELECT split.ItemNumber, split.item
FROM myimport_header_stage headers
CROSS APPLY DelimitedSplit8K(resultant,',') split
)
headers
ON headers.ItemNumber = split.ItemNumber
INNER JOIN INFORMATION_SCHEMA.COLUMNS
ON TABLE_NAME = 'mytargettable'
AND columns.COLUMN_NAME = headers.item
) AS B
ORDER BY B.ItemNumber
--We need the CTE so that we can calculate the import_datetime more easily.
SET @SQL='
;with CTE_Import as
(
SELECT ''' + @servername + ''' AS server_name, ' + @Columns
+ ', ''' + @base_filename + ''' as import_filename'
+ ' FROM
(
SELECT id, ItemNumber, item FROM
myimport_stage_split ) AS source
PIVOT
(max(item) FOR source.ItemNumber IN (' + @Columns_Short + ')
)AS pvt
)
insert into mytargettable (server_name, ' + @Columns_Insert + ', import_filename, import_datetime)
select server_name, ' + @Columns_Insert + ', import_filename,
CONVERT(DATETIME,SUBSTRING(RIGHT(''' + @base_filename + ''',14), 7,4) + SUBSTRING(RIGHT(''' + @base_filename + ''',14), 4,2) + SUBSTRING(RIGHT(''' + @base_filename + ''',14), 1,2)
+ CONVERT(datetime,minuteOfStats),120) as import_datetime
from CTE_Import
WHERE NOT EXISTS
(SELECT 1 FROM mytargettable main
WHERE main.server_name = ''' + @servername + '''
AND main.the_filename = ''' + @base_filename + '''
and CTE_Import.matchingfielda = main.matchingfielda
AND CTE_Import.matchingfieldb= main.matchingfieldb)
'
--make sure to handle nulls properly on the above matches!
PRINT @sql
--make sure the filename has a date in the valid format, otherwise the insert will blow up
IF (ISDATE(CONVERT(DATETIME,SUBSTRING(RIGHT(@base_filename,14), 7,4) + SUBSTRING(RIGHT(@base_filename,14), 4,2) + SUBSTRING(RIGHT(@base_filename,14), 1,2)
,120) ) )= 1
BEGIN
EXEC (@sql)
END
PRINT CONVERT(VARCHAR(30),GETDATE(),120) + ' insert into _main ends for ' + @filename
SET @min = @min+1
END
GO
view raw dynamic_csv_etl hosted with ❤ by GitHub

No comments: