
Matt How's Blog

Pipeline Parameters in Azure Data Factory V2

The second release of Azure Data Factory (ADF) includes several new features that vastly improve the quality of the service, one of which is the ability to pass parameters down the pipeline into datasets. In this blog I will show how we can use parameters to manipulate a generic pipeline structure so that it copies a SQL table into a blob. Whilst this is a new feature for ADF, this blog assumes some prior knowledge of ADF (V1 or V2).

 

Creating the Data Factory V2

One other major difference with ADF V2 is that we no longer need to chain datasets and pipelines to create a working solution. Whilst the concept of dependency still exists, it is no longer required in order to run a pipeline; we can now just run pipelines ad hoc. This is important for this demo because we want to run the job once, check it succeeds and then move on to the next table, instead of managing any data slices. You can create a new V2 data factory either from the portal or using this command in PowerShell:

$df = Set-AzureRmDataFactoryV2 -ResourceGroupName <resource group> -Location <location> -Name <data factory name>


If you are familiar with the PowerShell cmdlets for ADF V1 then you can make use of nearly all of them in V2 by appending “V2” to the end of the cmdlet name.

ADF V2 Object Templates

Now that we have a Data Factory to work with, we can start deploying objects. In order to make use of parameters we first need to specify how we will receive the value of each parameter. Currently there are two ways:

1. Via a parameter file specified when you invoke the pipeline

2. Using a lookup activity to obtain a value and pass that into a parameter

This blog will focus on the simpler method using a parameter file; later blogs will demonstrate the use of the lookup activity.

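A parameter file is just a small JSON document that maps parameter names to values. A minimal sketch for this demo, where the table name is purely an illustrative value, might look like this:

{
    "tableName": "SalesLT.Customer"
}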

I will show how to reference that file when we invoke the pipeline; for now, all we need to know is that we are working with a single parameter called "tableName".

Now we can move on to our pipeline definition, which is where the parameter values are initially received. To do this we need to add an attribute called "parameters" to the definition file; it will contain all the parameters that will be used within the pipeline. See below:

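As a sketch, the top of the pipeline definition might look like the following. The pipeline name is just an example and the copy activity has been left out of the "activities" array for brevity:

{
    "name": "CopySqlTableToBlob",
    "properties":
    {
        "parameters":
        {
            "tableName":
            {
                "type": "String"
            }
        },
        "activities": []
    }
}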

The same concept needs to be carried through to the dataset that we want to feed the parameters into. Within the dataset definition we need the "parameters" attribute specified in the same way as in the pipeline.
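
For example, a blob output dataset could declare the same parameter. The dataset and linked service names below are assumptions, and the typeProperties are covered further down:

{
    "name": "BlobOutput",
    "properties":
    {
        "type": "AzureBlob",
        "linkedServiceName":
        {
            "referenceName": "BlobStorageLinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters":
        {
            "tableName":
            {
                "type": "String"
            }
        }
    }
}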

Now that we have declared the parameters on the necessary objects, I can show you how to pass data into a parameter. As mentioned before, the pipeline parameter will be populated by the parameter file; the dataset parameter, however, will need to be populated from within the pipeline. Instead of simply referring to a dataset by name as in ADF V1, we now need the ability to supply more data, so the "inputs" and "outputs" section of our pipeline now looks like this:

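A sketch is below, assuming the source and sink are the hypothetical "SqlTableInput" and "BlobOutput" datasets and that both take the "tableName" parameter; the numbered points that follow explain the parts:

"inputs":
[
    {
        "referenceName": "SqlTableInput",
        "type": "DatasetReference",
        "parameters":
        {
            "tableName": "@pipeline().parameters.tableName"
        }
    }
],
"outputs":
[
    {
        "referenceName": "BlobOutput",
        "type": "DatasetReference",
        "parameters":
        {
            "tableName": "@pipeline().parameters.tableName"
        }
    }
]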

  1. Firstly, we declare the reference type, hence “DatasetReference”.
  2. We then give the reference name. This could be parameterised if needed.
  3. Finally, for each parameter in our dataset (in this case there is only one, called "tableName") we supply the corresponding value from the pipeline's parameter set. We can get the value of a pipeline parameter using the "@pipeline().parameters.<parameter name>" syntax.

At this point we have received the values from a file and passed them through the pipeline into a dataset; now it is time to use that value to manipulate the behaviour of the dataset. Because we are using the parameter to define our file name, we can use its value as part of the "fileName" attribute, as below:

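Within the blob dataset, parameters are referenced using the "@dataset()" syntax. A sketch of the typeProperties, where the folder path and file format are assumptions, might be:

"typeProperties":
{
    "folderPath": "output",
    "fileName": "@concat(dataset().tableName, '.txt')",
    "format":
    {
        "type": "TextFormat"
    }
}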

Now we have the ability to input a table name and our pipeline will fetch that table from the database and copy it into a blob. To give the big picture: the value flows from the parameter file, into the pipeline parameter, through the dataset reference into the dataset parameter, and finally into the blob file name.


Now we have a complete working pipeline that is totally generic, meaning we can change the parameters we feed in but should never have to change the JSON definition files. A pipeline such as this could have many uses, and in the next blog I will show how we can use a ForEach loop (another ADF V2 feature) to copy every table from a database while still only using a single pipeline and some parameters.

P.S. Use this link to see the entire JSON script used to create all the objects required for this blog: http://bit.ly/2zwZFLB

Comments (3)

  • Luis Gonzalez

    11/8/2017 6:24:09 PM

    Hey Matt - thanks for the post. I'm trying to genericize a dataset by passing it a table name parameter from a lookup (something you alluded to posting about in the future). I have a pipeline that starts with a lookup activity that references a dataset to which I pass a tableName of "ArchiveTables":

                {
                    "name": "LookupArchiveTables",
                    "description": "Reads all rows from the ArchiveTables table that have the Enabled column set to 1.",
                    "type": "Lookup",
                    "typeProperties":
                    {
                        "dataset":
                        {
                            "referenceName": "Dataset_AzureSqlTable_Generic",
                            "type": "DatasetReference",
                "parameters":
                {
                  "tableName":
                  {
                    "value": "ArchiveTables",
                    "type": "Expression"
                  }
                }
                        },
                        "source":
                        {
                          "type": "SqlSource",
                          "sqlReaderQuery": "SELECT TableName, DateTimeColumnName FROM ArchiveTables WHERE Enabled = 1"
                        },
                        "firstRowOnly": false
                    }
                },

    And then the dataset uses that to set up the "ArchiveTables" table name as the source table for the dataset.

    {
        "name": "Dataset_AzureSqlTable_Generic",
        "properties":
        {
            "type": "AzureSqlTable",
            "linkedServiceName":
            {
                "referenceName": "LinkedService_AzureSqlDatabase",
                "type": "LinkedServiceReference"
            },
            "typeProperties":
            {
                "tableName": "@dataset().parameters.tableName"
            },
        "parameters":
            {
                "tableName":
                {
                  "type": "String"
                }
            }
        }
    }


    But when it runs it errors saying that "parameters" is not valid:

    {
        "errorCode": "InvalidTemplate",
        "message": " 'The template language expression 'body('LookupArchiveTablesComposeRuntimeVariables')?.Dataset_AzureSQLTable_Genericfbdad42d28c04fa39f8f3c467647fde8.parameters.tableName' cannot be evaluated because property 'parameters' doesn't exist, available properties are 'tableName'.",
        "failureType": "UserError",
        "target": "LookupArchiveTables"
    }

    I must be overlooking something basic.

  • Luis Gonzalez

    11/9/2017 5:59:02 PM

    Along the lines of what you are doing here:

    I managed to create a "generic" dataset that I can pass a table parameter to. It actually works:

    {
        "name": "Dataset_AzureSqlTable_Generic",
        "properties":
        {
            "type": "AzureSqlTable",
            "typeProperties":
            {
                "tableName":
                {
                    "value": "@dataset().TableName",
                    "type": "Expression"
                }
            },
            "linkedServiceName":
            {
                "referenceName": "LinkedService_AzureSqlDatabase",
                "type": "LinkedServiceReference"
            },
        "parameters":
        {
          "TableName":
          {
            "type": "String"
          }
        }
        }
    }

    I use a Lookup activity in my master pipeline to get a list of tables I need to work on by calling that and passing an "ArchiveTables" parameter which is a table in the database that has the table list. That works great.

    The next activity in my master pipeline is an ExecutePipeLine activity, which references the values returned from the first lookup activity.

    {
        "name": "Pipeline_TriggerArchiveTables",
        "description": "Uses the LookupArchiveTables activity to get the tables to archive and passes that list down to the ArchiveTables pipeline.",
        "properties":
        {
            "activities":
            [
                {
                    "name": "LookupArchiveTables",
                    "description": "Reads all rows from the ArchiveTables table that have the Enabled column set to 1.",
                    "type": "Lookup",
                    "typeProperties":
                    {
                        "source":
                        {
                          "type": "SqlSource",
                          "sqlReaderQuery": "SELECT TableName, DateTimeColumnName FROM ArchiveTables WHERE Enabled = 1 ORDER BY TableName"
                        },
                        "dataset":
                        {
                            "referenceName": "Dataset_AzureSqlTable_Generic",
                            "type": "DatasetReference",
                "parameters":
                {
                  "TableName":
                  {
                    "value": "ArchiveTables",
                    "type": "Expression"
                  }
                }
                        },
                        "firstRowOnly": false
                    }
                },
                {
                    "name": "ExecuteArchiveTables",
                    "type": "ExecutePipeLine",
                    "typeProperties":
                    {
                        "pipeline":
                        {
                          "referenceName": "Pipeline_ArchiveTables",
                          "type": "PipelineReference"
                        },
                        "parameters":
                        {
                            "TableList":
                            {
                                "value": "@{activity('LookupArchiveTables').output.value}",
                                "type": "Expression"
                            }
                        },
                        "waitOnCompletion": true
                    },
                    "dependsOn":
                    [
                        {
                            "activity": "LookupArchiveTables",
                            "dependencyConditions": ["Succeeded"]
                        }
                    ]
                }
            ]
        }
    }

    That seems to all work as well. The pipeline "Pipeline_ArchiveTables" has a foreach activity that iterates through the table list. I followed the online examples from MS to the letter. I confirmed in the log that the table and column names were being properly passed into the TableList parameter so I know it's all good up to this pipeline. I kept it simple for troubleshooting by simply inserting the same lookup activity I know worked earlier to see if the foreach works, but it consistently fails with a 400 error. It complains about the inner activity, but I know that activity works. Just been staring at it wondering what the problem is. It seems rather cut and dry to me. It should do one iteration since there is only one table in the TableList parameter, and the lookup activity should be executed.

    {
        "errorCode": "400",
        "message": "Activity failed because an inner activity failed",
        "failureType": "UserError",
        "target": "IterateArchiveTables"
    }


    {    
        "name": "Pipeline_ArchiveTables",
        "properties":
        {
            "activities":
            [
                {
                    "name": "IterateArchiveTables",
                    "description": "Executes in parallel the ArchiveTable pipeline for each table passed to it from the TriggerArchiveTables pipeline.",
                    "type": "ForEach",
                    "typeProperties":
                    {
                        "isSequential": false,
                        "items":
                        {
                            "value": "@pipeline().parameters.TableList",
                            "type": "Expression"
                        },
                        "activities":
                        [
                            {
                                "name": "LookupArchiveTables",
                                "description": "Reads all rows from the ArchiveTables table that have the Enabled column set to 1.",
                                "type": "Lookup",
                                "typeProperties":
                                {
                                    "source":
                                    {
                                        "type": "SqlSource",
                                        "sqlReaderQuery": "SELECT TableName, DateTimeColumnName FROM ArchiveTables WHERE Enabled = 1 ORDER BY TableName"
                                    },
                                    "dataset":
                                    {
                                        "referenceName": "Dataset_AzureSqlTable_Generic",
                                        "type": "DatasetReference",
                                        "parameters":
                                        {
                                            "TableName":
                                            {
                                                "value": "ArchiveTables",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    "firstRowOnly": false
                                }
                            }
                        ]
                    }
                }
            ],
        "parameters":
        {
          "TableList":
          {
            "type": "Array"
          }
        }
        }
    }

    • matthow

      11/15/2017 9:36:57 AM

      Hi Luis, thanks for your comments. I'm going to have a proper look at your issue shortly, but off the top of my head it looks like you are using an Array data type for the TableList parameter and I think you should try using an Object data type.

