Dev releases code, version 2.1, which has a CSV with 4 columns.
[2 days pass]
Dev releases code, version 2.1.1, which has a CSV with 5 columns.
[2 days pass]
Dev releases code, version 2.1.2, which has a CSV with 6 columns.
Since we relational DBAs have this fun thing called a "schema", importing this gets hard. SSIS will choke because the layout is different each time. BULK INSERT will choke because the file format has changed. Inserting via OPENROWSET and the text connector sucks because 64-bit servers require you to install the Excel driver pack. Modifying the import by hand each time blows for obvious reasons.
So, time to use the DBA's secret weapon: dynamic SQL. (Which, yes, comes with lots of potential problems, injection holes, etc. See Erland Sommarskog's seminal paper "The Curse and Blessings of Dynamic SQL".)
So what did I build? Practically, it imports any delimited file with a header row, compares the header's field names to the target table's columns, and inserts just the fields that match. Need a new field in your target table? Add the column to the target table, and the next run will pick it up.
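The key trick is just a column intersection: take the field names from the file's header row and keep only the ones that actually exist in the target table, via INFORMATION_SCHEMA.COLUMNS. A minimal sketch of the comparison (the target table name is the placeholder used in the script below, and myimport_header_split is a made-up stand-in for the header names already split one-per-row):

SELECT headers.item AS matched_column
FROM myimport_header_split headers --hypothetical; the real script splits the header inline
INNER JOIN INFORMATION_SCHEMA.COLUMNS c
    ON c.TABLE_NAME = 'mytargettable'
    AND c.COLUMN_NAME = headers.item

Anything in the file but not in the table is ignored; anything in the table but not in the file just lands as NULL (or the column's default).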
Warning: it's SLOW. I'm not sure which part is the bottleneck, but there's plenty of blame to go around on my part: dynamic shreds, dynamic pivots, and so on. You could replace the PIVOT with one of the CLR dynamic pivots floating around out there (the pivot is probably the majority of the runtime), replace the comma parsing with Adam Machanic's CLR splitter, and that may get you to a happy spot. For me, though, this does the job.
My next version of this process will probably be in PowerShell, assuming someone hasn't already done it. That could be slick and really fast: Import-Csv the file, get the column list into a variable, match it against another variable holding the columns from the final table, and use the result as a hash table for a SELECT. Next time, next time.
Dependency: right now, Jeff Moden's DelimitedSplit8K splitter.
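If you haven't met it, DelimitedSplit8K takes a VARCHAR(8000) and a delimiter and returns one row per element along with an ItemNumber ordinal; that ordinal is what the pivot below hangs its column numbers on. A quick illustration of its output shape:

SELECT ItemNumber, Item
FROM dbo.DelimitedSplit8K('colA,colB,colC', ',')
--returns (1, colA), (2, colB), (3, colC)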
Order of operations for this code:
- get the server list (naturally, this needs the file from multiple servers)
- for each server...
- import the first row (the header, which holds the field names) into a staging table
- split that header into a list of columns
- import the rest of the file into a wide staging table
- split the CSV data into a keypair-style table, using Jeff Moden's awesome DelimitedSplit8K splitter (may need to change this to use Adam Machanic's CLR)
- update the table so that blank fields are now NULL (blank fields come through as '', which converts to 0 for BIGINT but fails outright converting to DECIMAL)
- build a pivot to go from the keypair into your target table (see the sketch after this list)
- compare the columns (sys.columns or INFORMATION_SCHEMA.COLUMNS) between the header and the target table
- match the data and header to build a good INSERT statement
- use the pivot to insert into your target table
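To make the shred-and-pivot step concrete before you hit the full script, here's a tiny self-contained version with the two columns hard-coded (the real script builds those column lists dynamically from the header; the names here are made up for illustration):

--miniature of the keypair-to-columns move
DECLARE @stage TABLE (id INT, ItemNumber INT, Item VARCHAR(50))
INSERT INTO @stage VALUES (1,1,'serverA'), (1,2,'42'), (2,1,'serverB'), (2,2,NULL)

--the blank-to-NULL step matters here: CONVERT(BIGINT,'') quietly returns 0,
--but CONVERT(DECIMAL(10,2),'') throws "Error converting data type varchar to numeric"
SELECT [1] AS server_name, CONVERT(DECIMAL(10,2),[2]) AS some_measure
FROM (SELECT id, ItemNumber, Item FROM @stage) AS source
PIVOT (MAX(Item) FOR ItemNumber IN ([1],[2])) AS pvt

The full script is below.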
SET QUOTED_IDENTIFIER ON
SET ANSI_NULLS ON
GO
/*
2015/04/22 - mdb - 2.00 - changed to pull raw CSV, shred via function, pivot into a virtual table, then insert into
             actual table based on existing fields. Practically: forwards/backwards compatibility.
             Add new fields to your target table, and as they show up in the new file they'll be added.
             Practically, they'll have to rename the file when they do this, so that we know the
             names of the new fields. But that's just a parameter change in the job.
2016/03/25 - mdb - 2.01 - found a bug where inserts may fail when the source data uses scientific notation. Added statement
             that UPDATEs the staging table where the value is "like '%E-0%'". Converts to float(53), then dec(28,15).
             Commented out by default because you might have E-0 in a text field, and THAT would break it.

requires Jeff Moden's 8k CSV parser, or something like it.
*/
--BULK INSERT requires dynamic SQL
DECLARE @filename VARCHAR(500), @sql NVARCHAR(4000), @base_filename VARCHAR(500), @servername VARCHAR(255)
DECLARE @beginning_of_filename VARCHAR(30) = 'filenamestartswith'
DECLARE @Columns VARCHAR(MAX), @Columns_Short VARCHAR(MAX), @Columns_Insert VARCHAR(MAX)
DECLARE @ErrorMessage NVARCHAR(4000);
DECLARE @ErrorSeverity INT;
DECLARE @ErrorState INT;
DECLARE @serverlist_to_do TABLE (id INT IDENTITY, servername sysname)

INSERT INTO @serverlist_to_do
--uses OPENROWSET to pull a serverlist from my repository; can use a linked server or just hit locally.
SELECT server FROM OPENROWSET('SQLNCLI', 'Server=myrepo;Trusted_Connection=yes',
    'select server from serverlist')
-------------------
--FOR EACH server--
-------------------
DECLARE @min INT, @max INT
SELECT @min = MIN(id), @max = MAX(id) FROM @serverlist_to_do

WHILE @min <= @max
BEGIN
    --wipe out variables that are added to each time!
    SELECT @sql = '', @ErrorMessage = NULL, @ErrorSeverity = NULL, @ErrorState = NULL
        ,@Columns = NULL, @Columns_Short = NULL, @Columns_Insert = NULL
    SELECT @servername = servername FROM @serverlist_to_do WHERE id = @min
    SELECT @filename = '\\' + @servername + '\c$\temp\' + @beginning_of_filename + '_'
        --note that the date format is screwy; using a few minutes ago so that it auto-rolls.
        + FORMAT( GETDATE()-.003, 'dd.MM.yyyy', 'en-US' ) + '.csv'
    SELECT @base_filename = RIGHT(@filename, CHARINDEX('\',REVERSE(@filename))-1)
    -----------------
    --Import Header--
    -----------------
    PRINT CONVERT(VARCHAR(30),GETDATE(),120) + ' BULK INSERT of header begins for ' + @filename
    TRUNCATE TABLE myimport_header_stage
    SET @sql =
    'BULK INSERT myimport_header_stage
    FROM ''' + @filename + '''
    WITH
    (
        LASTROW = 1,
        FIELDTERMINATOR = ''\0'' --the \0 is "null terminator"; needed to make sure it doesnt try and parse
    )'
    BEGIN TRY
        EXEC sp_executesql @sql
    END TRY
    BEGIN CATCH
        PRINT 'File is either locked, does not exist, or the format has changed; see error message for more details'
        SELECT @ErrorMessage = ERROR_MESSAGE(),
               @ErrorSeverity = ERROR_SEVERITY(),
               @ErrorState = ERROR_STATE();
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;
        PRINT 'Error Severity: ' + CONVERT(VARCHAR(30), @ErrorSeverity)
        PRINT 'Error State: ' + CONVERT(VARCHAR(30), @ErrorState)
        PRINT 'Error Message: ' + @ErrorMessage
        --commented this out so we can run it without files.
        --RAISERROR (@ErrorMessage, -- Message text.
        --           @ErrorSeverity, -- Severity.
        --           @ErrorState -- State.
        --           );
    END CATCH
    ---------------
    --Import Data--
    ---------------
    PRINT CONVERT(VARCHAR(30),GETDATE(),120) + ' BULK INSERT of data begins for ' + @filename
    --inserts into a view in order to add the ID column so the PIVOT works. insertview is just a SELECT *, minus the ID
    TRUNCATE TABLE myimport_wide_stage
    SET @sql =
    'BULK INSERT myimport_wide_stage_insertview
    FROM ''' + @filename + '''
    WITH
    (
        FIRSTROW = 2,
        FIELDTERMINATOR = ''\0'' --the \0 is "null terminator"; needed to make sure it doesnt try and parse
    )'
    BEGIN TRY
        EXEC sp_executesql @sql
    END TRY
    BEGIN CATCH
        PRINT 'File is either locked, does not exist, or the format has changed; see error message for more details'
        SELECT @ErrorMessage = ERROR_MESSAGE(),
               @ErrorSeverity = ERROR_SEVERITY(),
               @ErrorState = ERROR_STATE();
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;
        PRINT 'Error Severity: ' + CONVERT(VARCHAR(30), @ErrorSeverity)
        PRINT 'Error State: ' + CONVERT(VARCHAR(30), @ErrorState)
        PRINT 'Error Message: ' + @ErrorMessage
        --commented this out so we can run it without files.
        --RAISERROR (@ErrorMessage, -- Message text.
        --           @ErrorSeverity, -- Severity.
        --           @ErrorState -- State.
        --           );
    END CATCH
    PRINT CONVERT(VARCHAR(30),GETDATE(),120) + ' BULK INSERT ends for ' + @filename

    --===== Split the CSV column for the whole table using CROSS APPLY
    --save into a staging table to make the pivot back easier.
    TRUNCATE TABLE myimport_stage_split
    INSERT INTO myimport_stage_split
            (id, ItemNumber, Item)
    SELECT stage.id, split.ItemNumber, split.Item
    FROM myimport_wide_stage stage
    CROSS APPLY DelimitedSplit8K(resultant,',') split

    --this is needed because blank fields ('') aren't NULL; left alone they fail with "error converting varchar to decimal"
    UPDATE myimport_stage_split SET item = NULL WHERE item = ''

    --2.01 optional fix for scientific notation.
    --UPDATE myimport_stage_split
    --SET item = CONVERT(DECIMAL(28,15),CONVERT(FLOAT(53),item))
    --WHERE item LIKE '%E-0%'
    ----------------------
    --Building the PIVOT--
    ----------------------
    SELECT @Columns = COALESCE(@Columns + ',','') + QUOTENAME(ItemNumber) + ' as ' + QUOTENAME(item)
         , @Columns_Short = COALESCE(@Columns_Short + ',','') + QUOTENAME(ItemNumber)
         , @Columns_Insert = COALESCE(@Columns_Insert + ',','') + QUOTENAME(item)
    FROM
    (
        --only fields whose header name also exists in the target table survive these joins
        SELECT DISTINCT split.ItemNumber, headers.item
        FROM myimport_stage_split split
        INNER JOIN
        (
            SELECT split.ItemNumber, split.Item
            FROM myimport_header_stage headers
            CROSS APPLY DelimitedSplit8K(resultant,',') split
        ) headers
            ON headers.ItemNumber = split.ItemNumber
        INNER JOIN INFORMATION_SCHEMA.COLUMNS
            ON TABLE_NAME = 'mytargettable'
            AND columns.COLUMN_NAME = headers.item
    ) AS B
    ORDER BY B.ItemNumber
    --We need the CTE so that we can calculate the import_datetime more easily.
    SET @sql = '
    ;with CTE_Import as
    (
        SELECT ''' + @servername + ''' AS server_name, ' + @Columns
        + ', ''' + @base_filename + ''' as import_filename'
        + ' FROM
        (
            SELECT id, ItemNumber, item FROM myimport_stage_split
        ) AS source
        PIVOT
        ( max(item) FOR source.ItemNumber IN (' + @Columns_Short + ')
        ) AS pvt
    )
    insert into mytargettable (server_name, ' + @Columns_Insert + ', import_filename, import_datetime)
    select server_name, ' + @Columns_Insert + ', import_filename,
    --the filename holds dd.MM.yyyy; rearrange it to yyyyMMdd, then add the time-of-day field from the file
    CONVERT(DATETIME,SUBSTRING(RIGHT(''' + @base_filename + ''',14), 7,4) + SUBSTRING(RIGHT(''' + @base_filename + ''',14), 4,2) + SUBSTRING(RIGHT(''' + @base_filename + ''',14), 1,2)
        + CONVERT(datetime,minuteOfStats),120) as import_datetime
    from CTE_Import
    WHERE NOT EXISTS
        (SELECT 1 FROM mytargettable main
         WHERE main.server_name = ''' + @servername + '''
         AND main.import_filename = ''' + @base_filename + '''
         AND CTE_Import.matchingfielda = main.matchingfielda
         AND CTE_Import.matchingfieldb = main.matchingfieldb)
    '
    --make sure to handle NULLs properly on the above matches!
    PRINT @sql

    --make sure the filename has a date in the valid format, otherwise the insert will blow up
    --(test the string itself; wrapping it in a CONVERT to DATETIME would throw before ISDATE could return 0)
    IF ISDATE(SUBSTRING(RIGHT(@base_filename,14), 7,4) + SUBSTRING(RIGHT(@base_filename,14), 4,2) + SUBSTRING(RIGHT(@base_filename,14), 1,2)) = 1
    BEGIN
        EXEC (@sql)
    END
    PRINT CONVERT(VARCHAR(30),GETDATE(),120) + ' insert into mytargettable ends for ' + @filename

    SET @min = @min + 1
END
GO