Wednesday, February 25, 2026

Azure Data Factory - creating pipelines via Python

I hate creating SSIS. I also hate creating ADF via GUI. So here's a way to do it with python. 

There are a couple of prerequisites (below), specifically an ultra-configurable linked service and dataset.

from azure.mgmt.datafactory.models import (
    AzureKeyVaultLinkedService,
    AzureSqlTableDataset,
    CopyActivity,
    DatasetReference,
    DatasetResource,
    LinkedServiceReference,
    LinkedServiceResource,
    ParameterSpecification,
    PipelineResource,
    SqlServerStoredProcedureActivity,
    SqlSink,
    SqlSource,
    StoredProcedureParameter,
)
 
# Create a pipeline that uses the parameterized dataset.
# NOTE(review): the original paste had lost the ": " separators in several
# dict literals (e.g. {"schemaName""dbo"}), which Python parses as implicit
# string concatenation inside a *set* literal — restored to proper dicts.
print("\nšŸ”„ Creating pipeline with SQL operations...")

pipeline = client.pipelines.create_or_update(
    RESOURCE_GROUP,
    FACTORY_NAME,
    "pl_SqlOperations",
    PipelineResource(
        # Pipeline-level parameters, supplied at trigger time.
        parameters={
            "SourceTableName": ParameterSpecification(type="String"),
            "DestTableName": ParameterSpecification(type="String"),
            "StoredProcName": ParameterSpecification(type="String")
        },
        activities=[
            # Activity 1: Copy data from one table to another
            CopyActivity(
                name="CopyTableData",
                inputs=[
                    DatasetReference(
                        reference_name="ds_SqlTable_Parameterized",
                        parameters={
                            "schemaName": "dbo",
                            "tableName": "@pipeline().parameters.SourceTableName"
                        }
                    )
                ],
                outputs=[
                    DatasetReference(
                        reference_name="ds_SqlTable_Parameterized",
                        parameters={
                            "schemaName": "dbo",
                            "tableName": "@pipeline().parameters.DestTableName"
                        }
                    )
                ],
                source=SqlSource(
                    sql_reader_query="SELECT * FROM @{dataset().schemaName}.@{dataset().tableName}",
                    query_timeout="02:00:00"
                ),
                sink=SqlSink(
                    write_behavior="insert",
                    # Destination table is emptied before every load.
                    pre_copy_script="TRUNCATE TABLE @{dataset().schemaName}.@{dataset().tableName}"
                )
            ),

            # Activity 2: Execute stored procedure
            SqlServerStoredProcedureActivity(
                name="ExecuteStoredProcedure",
                linked_service_name=LinkedServiceReference(
                    reference_name="ls_AzureSqlDatabase"
                ),
                stored_procedure_name="@pipeline().parameters.StoredProcName",
                stored_procedure_parameters={
                    "param1": StoredProcedureParameter(
                        value="@pipeline().parameters.SourceTableName",
                        type="String"
                    ),
                    "param2": StoredProcedureParameter(
                        value="@utcnow()",
                        type="DateTime"
                    )
                },
                # Only run after the copy activity succeeds.
                depends_on=[
                    {
                        "activity": "CopyTableData",
                        "dependency_conditions": ["Succeeded"]
                    }
                ]
            )
        ]
    )
)

print(f"✅ Pipeline created: {pipeline.name}")
 
# Trigger an on-demand run of the pipeline, supplying concrete values for the
# three pipeline parameters. NOTE(review): the original paste had lost the
# ": " separators in the parameters dict (set of concatenated strings), which
# the service would reject — restored to a proper dict.
print("\nšŸš€ Triggering pipeline run...")
run = client.pipelines.create_run(
    RESOURCE_GROUP,
    FACTORY_NAME,
    "pl_SqlOperations",
    parameters={
        "SourceTableName": "SourceTable",
        "DestTableName": "DestinationTable",
        "StoredProcName": "usp_ProcessData"
    }
)

print(f"✅ Pipeline run started: {run.run_id}")



Create the Key Vault linked service:

 
    # Create Key Vault linked service
    result = client.linked_services.create_or_update(
        RESOURCE_GROUP,
        FACTORY_NAME,
        "ls_KV_yournamehere",
        LinkedServiceResource(
            properties=AzureKeyVaultLinkedService(
                base_url=f"https://{KEY_VAULT_NAME}.vault.azure.net/"
            )
        )
    )
  

Adding a parameterized linked service and dataset. I don't have automation for the linked service, so here is its JSON definition.


{
    "name": "ls_SQL_FullyParameterized",
    "type": "Microsoft.DataFactory/factories/linkedservices",
    "properties": {
        "parameters": {
            "serverName": {
                "type": "String"
            },
            "databaseName": {
                "type": "String"
            },
            "userName": {
                "type": "String"
            },
            "passwordSecret": {
                "type": "String",
                "defaultValue": "sqlmaint"
            }
        },
        "annotations": [],
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Integrated Security=False;Encrypt=True;Connection Timeout=30;Data Source=@{linkedService().serverName};Initial Catalog=@{linkedService().databaseName};User ID=@{linkedService().userName}",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "ls_keyvaulthere",
                    "type": "LinkedServiceReference"
                },
                "secretName": "@{linkedService().passwordSecret}"
            }
        },
        "connectVia": {
            "referenceName": "integrationruntimename",
            "type": "IntegrationRuntimeReference"
        }
    }
}

# Create a dataset whose schema/table are supplied per-use by the caller.
# NOTE(review): dynamic content in dataset typeProperties must be an ADF
# Expression object ({"value": ..., "type": "Expression"}); a bare
# "@dataset().x" string is stored as a literal table name and never evaluated.
print("\nšŸ”§ Creating parameterized SQL dataset...")
parameterized_dataset = client.datasets.create_or_update(
    RESOURCE_GROUP,
    FACTORY_NAME,
    "ds_SqlTable_Parameterized",
    DatasetResource(
        properties=AzureSqlTableDataset(
            linked_service_name=LinkedServiceReference(
                reference_name="ls_AzureSqlDatabase"
            ),
            # Dataset-level parameters filled in by each referencing activity.
            parameters={
                "schemaName": ParameterSpecification(type="String"),
                "tableName": ParameterSpecification(type="String")
            },
            schema={"value": "@dataset().schemaName", "type": "Expression"},
            table={"value": "@dataset().tableName", "type": "Expression"}
        )
    )
)
print(f"✅ Parameterized SQL dataset created: {parameterized_dataset.name}")