Friday, March 9, 2012

Programmatically Created SSIS Package, CSV file to OLDDB (SQLSever 2005)

Hi everyone,

I wanted to thank everyone for posting a ton of valuable information in these forums. I also want to thank all the moderators that have been replying with really insightful help!

I am trying to programmatically create an SSIS package to take .CSV data and put it into a SQL Server 2005. I am assuming that this is pretty common scenario.

I have used many of the examples in this forum as well as heavily borrowing from this example http://www.codeproject.com/csharp/Digging_SSIS_object_model.asp written by Moim Hossain.

I can get my package to create and execute properly but no data is being written to the SQL Server table. This has puzzled me for the last 2 days!

I know the issue isnt with the server itself because I tested it by graphically creating a test SSIS package and it transfers the .CSV data to the table perfectly.

Would anyone know why this would happen? The Execution results are returning success but no data is written to the table!

Could anyone please provide insight as to what my issue may be?

Thanks in advance!

Code Snippet

using System;
using System.IO;
using System.Data.SqlClient;
using System.Collections.Generic;
using System.Text;
using Microsoft.SqlServer.Dts.Runtime;
using PipeLineWrapper = Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using RuntimeWrapper = Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace SumCodeApp
{
class SumCodeApp
{
// Variables.
private Package package;
private ConnectionManager flatFileConnectionManager;
private ConnectionManager destinationDatabaseConnectionManager;
private Executable dataFlowTask;
private List<String> srcColumns;

int file_count;
SqlConnection connection;

String folder_path;
String username;
String password;
String DB_server;
String catalog;

// Default Constructor.
public SumCodeApp()
{
}

// Constructor taking in user info.
public SumCodeApp(String folder_path, String username, String password,
String DB_server, String catalog)
{
this.folder_path = folder_path;
this.username = username;
this.password = password;
this.DB_server = DB_server;
this.catalog = catalog;
}

private void CreatePackage()
{
package = new Package();
package.CreationDate = DateTime.Now;
package.ProtectionLevel = DTSProtectionLevel.DontSaveSensitive;
package.Name = "SumCode Package";
package.Description = "Upload the SumCode files to the database";
package.DelayValidation = true;
package.PackageType = DTSPackageType.DTSDesigner90;
}

private void CreateFlatFileConnection()
{
String flatFileName = ".\01105.csv";
String flatFileMoniker = "FLATFILE";
flatFileConnectionManager = package.Connections.Add(flatFileMoniker);
flatFileConnectionManager.Name = "SSIS Connection Manager for Files";
flatFileConnectionManager.Description = String.Concat("SSIS Connection Manager");
flatFileConnectionManager.ConnectionString = flatFileName;

// Set some common properties of the connection manager object.
//flatFileConnectionManager.Properties["ColumnNamesInFirstRow"].SetValue(flatFileConnectionManager, false);
flatFileConnectionManager.Properties["Format"].SetValue(flatFileConnectionManager, "Delimited");
flatFileConnectionManager.Properties["TextQualifier"].SetValue(flatFileConnectionManager, "\"");
flatFileConnectionManager.Properties["RowDelimiter"].SetValue(flatFileConnectionManager, "\r\n");
flatFileConnectionManager.Properties["DataRowsToSkip"].SetValue(flatFileConnectionManager, 0);

// Create the source columns into the connection manager.
CreateSourceColumns();
}

private void CreateSourceColumns()
{
// Get the actual connection manager instance
RuntimeWrapper.IDTSConnectionManagerFlatFile90 flatFileConnection = flatFileConnectionManager.InnerObject as RuntimeWrapper.IDTSConnectionManagerFlatFile90;

RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 column;
RuntimeWrapper.IDTSName90 name;

// Fill the source column collection.
srcColumns = new List<String>();
srcColumns.Add("CreateDate");
srcColumns.Add("CorpID");
srcColumns.Add("SumCodeID");
srcColumns.Add("Priority");
srcColumns.Add("SumCodeAbv");
srcColumns.Add("SumCodeDesc");
srcColumns.Add("SumCodeGroupID");

foreach (String colName in srcColumns)
{
column = flatFileConnection.Columns.Add();
if (srcColumns.IndexOf(colName) == (srcColumns.Count - 1))
//column.ColumnDelimiter = "\r\n";
column.ColumnDelimiter = "{CR}{LF}";
else
//column.ColumnDelimiter = ",";
column.ColumnDelimiter = "Comma {,}";

name = (RuntimeWrapper.IDTSName90)column;
name.Name = colName;

column.TextQualified = true;
column.ColumnType = "Delimited";
column.DataType = Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR;
column.ColumnWidth = 0;
column.MaximumWidth = 255;
column.DataPrecision = 0;
column.DataScale = 0;

}

}

private void CreateDestinationDatabaseConnection()
{
destinationDatabaseConnectionManager = package.Connections.Add("OLEDB");
destinationDatabaseConnectionManager.Name = "Destination Connection - SumCodeCorpGroup";
destinationDatabaseConnectionManager.Description = "Connection to the temporary table SumCodCorpGroup";
destinationDatabaseConnectionManager.ConnectionString = "Data Source=DIVWL-356KCB1;Initial Catalog=SumCode;Provider=SQLOLEDB;Persist Security Info=True;User ID=sum;Password=code";
}

public class Column
{
private String name;
private Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType dataType;
private int length;
private int precision;
private int scale;
private int codePage = 0;

public String Name
{
get { return name; }
set { name = value; }
}

public Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType DataType
{
get { return dataType; }
set { dataType = value; }
}

public int Length
{
get { return length; }
set { length = value; }
}

public int Precision
{
get { return precision; }
set { precision = value; }
}

public int Scale
{
get { return scale; }
set { scale = value; }
}

public int CodePage
{
get { return codePage; }
set { codePage = value; }
}
}

private Column GetTargetColumnInfo(string sourceColumnName)
{
Column cl = new Column();
if (sourceColumnName.Contains("CreateDate"))
{
cl.Name = "CreateDate";
cl.DataType = Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR;
cl.Precision = 0;
cl.Scale = 0;
cl.Length = 255;
cl.CodePage = 1252;
}
else if (
sourceColumnName.Contains("CorpID"))
{
cl.Name = "CorpID";
cl.DataType = Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR;
cl.Precision = 0;
cl.Scale = 0;
cl.Length = 255;
cl.CodePage = 1252;
}
else if (sourceColumnName.Contains("SumCodeID"))
{
cl.Name = "SumCodeID";
cl.DataType = Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR;
cl.Precision = 0;
cl.Scale = 0;
cl.Length = 255;
cl.CodePage = 1252;
}
else if (sourceColumnName.Contains("Priority"))
{
cl.Name = "Priority";
cl.DataType = Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR;
cl.Precision = 0;
cl.Scale = 0;
cl.Length = 255;
cl.CodePage = 1252;
}
else if (sourceColumnName.Contains("SumCodeAbv"))
{
cl.Name = "SumCodeAbv";
cl.DataType = Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR;
cl.Precision = 0;
cl.Scale = 0;
cl.Length = 255;
cl.CodePage = 1252;
}
else if (sourceColumnName.Contains("SumCodeDesc"))
{
cl.Name = "SumCodeDesc";
cl.DataType = Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR;
cl.Precision = 0;
cl.Scale = 0;
cl.Length = 255;
cl.CodePage = 1252;
}
else if (sourceColumnName.Contains("SumCodeGroupID"))
{
cl.Name = "SumCodeGroupID";
cl.DataType = Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR;
cl.Precision = 0;
cl.Scale = 0;
cl.Length = 255;
cl.CodePage = 1252;
}
return cl;
}

private void CreateDataFlowTask()
{
String dataFlowTaskMoniker = "DTS.Pipeline";
dataFlowTask = package.Executables.Add(dataFlowTaskMoniker);

}

public void ImportFile(String directory_path)
{
// Create the package.
CreatePackage();

// Create Flat File Source Connection.
CreateFlatFileConnection();

// Create Database Destination Connection.
CreateDestinationDatabaseConnection();

// Create DataFlowTask.
CreateDataFlowTask();

// Create the DataFlowTask
PipeLineWrapper.IDTSComponentMetaData90 sourceComponent = ((dataFlowTask as TaskHost).InnerObject as PipeLineWrapper.MainPipe).ComponentMetaDataCollection.New();
sourceComponent.Name = "Source File Component";
sourceComponent.ComponentClassID = "DTSAdapter.FlatFileSource";

PipeLineWrapper.CManagedComponentWrapper managedFlatFileInstance = sourceComponent.Instantiate();
managedFlatFileInstance.ProvideComponentProperties();
sourceComponent.RuntimeConnectionCollection[0].ConnectionManagerID = flatFileConnectionManager.ID;
sourceComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.ToConnectionManager90(flatFileConnectionManager);

managedFlatFileInstance.AcquireConnections(null);
managedFlatFileInstance.ReinitializeMetaData();

Dictionary<String, int> outputColumnLineageIDs = new Dictionary<String, int>();
PipeLineWrapper.IDTSExternalMetadataColumn90 exOutColumn = null;

foreach (PipeLineWrapper.IDTSOutputColumn90 outColumn in sourceComponent.OutputCollection[0].OutputColumnCollection)
{
exOutColumn = sourceComponent.OutputCollection[0].ExternalMetadataColumnCollection[outColumn.Name];
managedFlatFileInstance.MapOutputColumn(sourceComponent.OutputCollection[0].ID, outColumn.ID, exOutColumn.ID, true);
outputColumnLineageIDs.Add(outColumn.Name, outColumn.ID);
}
managedFlatFileInstance.ReleaseConnections();

String a = sourceComponent.RuntimeConnectionCollection[0].Name.ToString();
String b = sourceComponent.OutputCollection[0].Name;
String c = sourceComponent.OutputCollection[0].Description;
String d = sourceComponent.OutputCollection[0].OutputColumnCollection.Count.ToString();

// Create DataFlowTask Destination Component.
PipeLineWrapper.IDTSComponentMetaData90 destinationComponent = ((dataFlowTask as TaskHost).InnerObject as PipeLineWrapper.MainPipe).ComponentMetaDataCollection.New();
destinationComponent.Name = "OLEDB SQL Connection";
destinationComponent.ComponentClassID = "DTSAdapter.OLEDBDestination";

PipeLineWrapper.CManagedComponentWrapper managedOleInstance = destinationComponent.Instantiate();
managedOleInstance.ProvideComponentProperties();

// Create a path and attach the output of the source to the input of the destination.
PipeLineWrapper.IDTSPath90 path = ((dataFlowTask as TaskHost).InnerObject as PipeLineWrapper.MainPipe).PathCollection.New();
path.AttachPathAndPropagateNotifications(sourceComponent.OutputCollection[0], destinationComponent.InputCollection[0]);

destinationComponent.RuntimeConnectionCollection[0].ConnectionManagerID = destinationDatabaseConnectionManager.ID;
destinationComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.ToConnectionManager90(destinationDatabaseConnectionManager);

managedOleInstance.SetComponentProperty("AccessMode", 0);
managedOleInstance.SetComponentProperty("OpenRowset", "[SumCode].[dbo].[SumCodeCorpGroup]");
managedOleInstance.SetComponentProperty("AlwaysUseDefaultCodePage", false);
managedOleInstance.SetComponentProperty("DefaultCodePage", 1252);
managedOleInstance.SetComponentProperty("FastLoadKeepIdentity", false); // Fast load
managedOleInstance.SetComponentProperty("FastLoadKeepNulls", false);
managedOleInstance.SetComponentProperty("FastLoadMaxInsertCommitSize", 0);
managedOleInstance.SetComponentProperty("FastLoadOptions","TABLOCK,CHECK_CONSTRAINTS");

managedOleInstance.AcquireConnections(null);
managedOleInstance.ReinitializeMetaData();

PipeLineWrapper.IDTSInput90 input = destinationComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 vInput = input.GetVirtualInput();

foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn in vInput.VirtualInputColumnCollection)
{
//if (outputColumnLineageIDs.ContainsKey(vColumn.LineageID.ToString()))
//{
managedOleInstance.SetUsageType(input.ID, vInput, vColumn.LineageID, Microsoft.SqlServer.Dts.Pipeline.Wrapper.DTSUsageType.UT_READONLY);
//}
}

List<String> tmp = new List<String>();
foreach(PipeLineWrapper.IDTSInputColumn90 inc in destinationComponent.InputCollection[0].InputColumnCollection)
{
tmp.Add(inc.Name);
}

PipeLineWrapper.IDTSExternalMetadataColumn90 exColumn;
foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in destinationComponent.InputCollection[0].InputColumnCollection)
{
exColumn = destinationComponent.InputCollection[0].ExternalMetadataColumnCollection[inColumn.Name];
Column mappedColumn = GetTargetColumnInfo(exColumn.Name);
String destName = mappedColumn.Name;

exColumn.Name = destName;

managedOleInstance.MapInputColumn(destinationComponent.InputCollection[0].ID, inColumn.ID, exColumn.ID);
}

managedOleInstance.ReleaseConnections();

DTSExecResult result = package.Execute();
a = "0";
}

}
}


A good first step in troubleshooting these types of problems: Save the file out to a DTSX (using the Application.SaveToXml method, then open and run it in BIDS. That might give you a better idea what's going on.

|||

I include a save to file by default, but have it for Debug confiurations only -

Code Snippet

#if DEBUG

// Save package to disk, DEBUG only

new Application().SaveToXml(@."C:\Temp\" + package.Name, package, null);

#endif

You would also be better off using the overloaded Execute method that enables you to pass in IDTSEvents amongst other things. This way you can capture error events in your application, otherwise you have no idea of what has one wrong, which when deployed will be an issue I guess. The save to file trick is easier when developing the code as you get the power of the designer and debugger, rather than just events you capture.

No comments:

Post a Comment