Abstract
This article explains how to create an SSIS package with a Data Flow Task inside a ForEachLoop container using the C# programming language.
Requirements
- Microsoft Visual Studio 2008
- SQL Server 2008
- AdventureWorks2008R2 database (downloadable database file available here)
Article
If the above requirements are all met, we will begin by launching Microsoft Visual Studio 2008.
Create a new Integration Services Project, which is located under Business Intelligence Projects.
After you have named the new project, drag a Script Task onto the Control Flow pane of the new package.
Right-click the Script Task and click “Edit”.
In the Script Task Editor, change “ScriptLanguage” to “Microsoft Visual C# 2008”.
In Project Explorer, add the relevant references and ensure that you have declared the namespaces below:
using System;
using System.Data;
using System.Data.Sql;
using System.Data.SqlTypes;
using System.Data.SqlClient;
using Microsoft.SqlServer.Server;
using Microsoft.SqlServer.Dts.Tasks;
using Microsoft.SqlServer.Dts.Tasks.ExecuteSQLTask;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using RuntimeWrapper = Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.IO;
After the declarations, create an instance of the application and of the package:
Application SIFISO_app = new Application();
Package dyna_pkg = new Package();
Create a connection to the AdventureWorks2008R2 database.
ConnectionManager ConMgr = dyna_pkg.Connections.Add(“OLEDB”);
ConMgr.Name = “ConMgr_OLEDB”;
ConMgr.ConnectionString = string.Format(
“Provider=SQLOLEDB.1;Data Source={0};Initial Catalog={1};Integrated Security=SSPI;”, “\\SQLEXPRESS”, “AdventureWorks”);
Declare a package variable in the User namespace:
Variable Counter = dyna_pkg.Variables.Add(“Counter”,false,”User”,0);
Insert a Foreach Loop container:
ForEachLoop exec_foreachloop = (ForEachLoop)dyna_pkg.Executables.Add(“STOCK:FOREACHLOOP”);
exec_foreachloop.FailPackageOnFailure = true;
exec_foreachloop.FailParentOnFailure = true;
exec_foreachloop.Name = @”select SIFISO FOREACHLOOP Container”;
exec_foreachloop.Description = @”select SIFISO FOREACHLOOP Container”;
Configure a Foreach File enumerator and assign it to the ForEachLoop container:
ForEachEnumeratorInfo f_enum = SIFISO_app.ForEachEnumeratorInfos[“Foreach File Enumerator”];
ForEachEnumeratorHost f_enum_host = f_enum.CreateNew();
f_enum_host.CollectionEnumerator = false;
f_enum_host.Properties[“Directory”].SetValue(f_enum_host, @”C:\TEMP\”);
f_enum_host.Properties[“FileSpec”].SetValue(f_enum_host, @”selectSIFISO_Import_Test.txt”);
f_enum_host.Properties[“FileNameRetrieval”].SetValue(f_enum_host, 1);
f_enum_host.Properties[“Recurse”].SetValue(f_enum_host, “False”);
exec_foreachloop.ForEachEnumerator = f_enum_host;
Add the following Execute SQL Tasks, followed by a Data Flow (pipeline) task inside the ForEachLoop container:
Executable exec = exec_foreachloop.Executables.Add(“STOCK:SQLTask”);
TaskHost th = exec as TaskHost;
th.Properties[“Name”].SetValue(th, “Create View”);
th.Properties[“Description”].SetValue(th, “Drops and Create SQL View which based on Adventureworks database”);
th.Properties[“Connection”].SetValue(th, “ConMgr_OLEDB”);
th.Properties[“SqlStatementSource”].SetValue(th, “CREATE VIEW v_Person_Contact as select * from [AdventureWorks].[Person].[Contact]”);
Executable exec2 = exec_foreachloop.Executables.Add(“STOCK:SQLTask”);
TaskHost th2 = exec2 as TaskHost;
th2.Properties[“Name”].SetValue(th2, “select from view”);
th2.Properties[“Description”].SetValue(th2, “select from view”);
th2.Properties[“Connection”].SetValue(th2, “ConMgr_OLEDB”);
th2.Properties[“SqlStatementSource”].SetValue(th2, “SELECT * FROM v_Person_Contact”);
Executable exec3 = dyna_pkg.Executables.Add(“STOCK:SQLTask”);
TaskHost th3 = exec3 as TaskHost;
th3.Properties[“Name”].SetValue(th3, “delete View”);
th3.Properties[“Description”].SetValue(th3, “delete View”);
th3.Properties[“Connection”].SetValue(th3, “ConMgr_OLEDB”);
th3.Properties[“SqlStatementSource”].SetValue(th3, “DROP VIEW v_Person_Contact”);
Executable e = exec_foreachloop.Executables.Add(“STOCK:PipelineTask”);
TaskHost thMainPipe = e as TaskHost;
MainPipe dataFlowTask = thMainPipe.InnerObject as MainPipe;
thMainPipe.Name = “selectSIFISO data Flow”;
Add an OLE DB source component to the data flow task:
IDTSComponentMetaData100 component =
dataFlowTask.ComponentMetaDataCollection.New();
component.Name = “OLEDBSource”;
component.ComponentClassID = “DTSAdapter.OleDbSource.2”;
Get the design time instance of the component:
CManagedComponentWrapper instance = component.Instantiate();
Initialize the component:
instance.ProvideComponentProperties();
Specify the connection manager:
if (component.RuntimeConnectionCollection.Count > 0)
{
component.RuntimeConnectionCollection[0].ConnectionManager =
DtsConvert.GetExtendedInterface(dyna_pkg.Connections[0]);
component.RuntimeConnectionCollection[0].ConnectionManagerID =
dyna_pkg.Connections[0].ID;
}
Set the custom properties:
instance.SetComponentProperty(“AccessMode”, 2);
instance.SetComponentProperty(“SqlCommand”,
“SELECT * FROM [AdventureWorks].[Person].[Contact]”);
Reinitialize the metadata:
instance.AcquireConnections(null);
instance.ReinitializeMetaData();
instance.ReleaseConnections();
Create the destination component:
IDTSComponentMetaData100 destination =
dataFlowTask.ComponentMetaDataCollection.New();
destination.ComponentClassID = “DTSAdapter.OleDbDestination”;
destination.Name = “OLEDBDestination”;
Get the destination's design-time instance and initialize the component:
CManagedComponentWrapper destDesignTime = destination.Instantiate();
destDesignTime.ProvideComponentProperties();
Set destination connection:
destination.RuntimeConnectionCollection[0].ConnectionManagerID = ConMgr.ID;
destination.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.ToConnectionManager90(ConMgr);
Set destination table name:
destDesignTime.SetComponentProperty(“OpenRowset”, “[dbo].[Contact]”);
destDesignTime.SetComponentProperty(“AccessMode”, 3);
destDesignTime.SetComponentProperty(“FastLoadOptions”, “TABLOCK,CHECK_CONSTRAINTS”);
Connect the source to the destination:
dataFlowTask.PathCollection.New().AttachPathAndPropagateNotifications(component.OutputCollection[0],
destination.InputCollection[0]);
Get input and virtual input for the destination to select and map columns:
IDTSInput100 destinationInputerr = destination.InputCollection[0];
IDTSVirtualInput100 destinationVirtualInputerr = destinationInputerr.GetVirtualInput();
IDTSVirtualInputColumnCollection100 destinationVirtualInputColumnserr =
destinationVirtualInputerr.VirtualInputColumnCollection;
destDesignTime.AcquireConnections(null);
destDesignTime.ReinitializeMetaData();
destDesignTime.ReleaseConnections();
Get the destination’s default input and virtual input:
IDTSInput100 input = destination.InputCollection[0];
IDTSVirtualInput100 vInput = input.GetVirtualInput();
Join the tasks with precedence constraints so that each one runs only when its predecessor succeeds:
PrecedenceConstraint pcFileTasks3 =
exec_foreachloop.PrecedenceConstraints.Add((Executable)exec, (Executable)e);
pcFileTasks3.Value = DTSExecResult.Success;
PrecedenceConstraint pcFileTasks =
exec_foreachloop.PrecedenceConstraints.Add((Executable)e, (Executable)exec2);
pcFileTasks.Value = DTSExecResult.Success;
PrecedenceConstraint pcFileTasks2 =
dyna_pkg.PrecedenceConstraints.Add((Executable)exec_foreachloop, (Executable)exec3);
pcFileTasks2.Value = DTSExecResult.Success;
We then save the package to the file system.
Dts.TaskResult = (int)ScriptResults.Success;
SIFISO_app.SaveToXml(“C:\\TEMP\\pkg_create_DTS.dtsx”, dyna_pkg, null);
Conclusion
It’s that simple!
You can now execute your script task and the package will be created in the location you specified.
0 Comments