Matt How

Matt How's Blog

Using Lookup, Execute Pipeline and For Each Activity in Azure Data Factory V2

In my previous blog I looked at how we can use pipeline parameters to variablise certain aspects of a dataset and avoid duplication of work. In this blog I will take that one step further and use parameters to feed a For Each activity that can iterate over a data object and perform an action for each item. This blog assumes some prior knowledge of Azure Data Factory, and it won't hurt to read my previous blog first.

Previously I showed how to use a parameter file to copy a single table from an Azure SQL DB down into a blob. Now let's use the For Each activity to fetch every table in the database from a single pipeline run. The benefit of doing this is that we don't have to create any more linked services or datasets; we only need one more pipeline, which will contain the loop. The big picture looks like the diagram below.

[Image: overview of the parent and child pipelines]

The first activity to note is the Lookup activity. This can go off and fetch a value from either SQL or JSON-based sources and then incorporate that value into activities further down the chain.

[Image: Lookup activity configuration]

Here we are using SQL, and you can see that we have supplied a SQL query that will fetch the schema and table names of our database. One "gotcha" is that even though you supply a SQL query, you still need to provide a dummy table name in the SQL dataset. The query above will be used at run time, but the pipeline won't pass deployment without a table name. Also note that at this point we do nothing with the returned value.
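
To make that concrete, here is a minimal sketch of what such a Lookup activity looks like in JSON. The activity name, dataset name and exact query are illustrative rather than lifted from the screenshot above:

    {
        "name": "LookupTableList",
        "type": "Lookup",
        "description": "Fetches the schema and table names to copy. The referenced SQL dataset still needs its dummy table name.",
        "typeProperties": {
            "source": {
                "type": "SqlSource",
                "sqlReaderQuery": "SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'"
            },
            "dataset": {
                "referenceName": "AzureSqlDummyDataset",
                "type": "DatasetReference"
            },
            "firstRowOnly": false
        }
    }

Setting "firstRowOnly" to false means the Lookup returns the full result set as an array rather than just the first row, which is what we need for the loop later on.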

Next, we have the Execute Pipeline activity which can accept input parameters and pass those down into the executed pipelines (or child pipelines as per the diagram).

[Image: Execute Pipeline activity configuration]

Within the type properties we can specify the parameters we want to pass in. The names here need to match whatever parameters we specify in the child pipeline, but for the "value" we can make use of the new expression language to get hold of the output of the previous Lookup activity. We then reference the pipeline we want to execute and specify that we need to wait for it to complete before continuing with our parent pipeline. Finally, we use the "dependsOn" attribute to ensure that our Execute Pipeline activity runs only AFTER our Lookup has completed successfully.
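
As a rough sketch (again, the activity, pipeline and parameter names are placeholders rather than the exact JSON behind the screenshot), the Execute Pipeline activity would look something like this:

    {
        "name": "ExecuteChildPipeline",
        "type": "ExecutePipeline",
        "description": "Passes the Lookup output into the child pipeline and waits for it to finish.",
        "dependsOn": [
            {
                "activity": "LookupTableList",
                "dependencyConditions": [ "Succeeded" ]
            }
        ],
        "typeProperties": {
            "pipeline": {
                "referenceName": "ChildCopyPipeline",
                "type": "PipelineReference"
            },
            "parameters": {
                "tableList": {
                    "value": "@activity('LookupTableList').output.value",
                    "type": "Expression"
                }
            },
            "waitOnCompletion": true
        }
    }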

At this point we have told the child pipeline which tables to copy and then told it to start. Our child pipeline now just needs to iterate over that list and produce our output files. To do this it only needs one activity: the For Each. The For Each really has two components: the outer configuration (such as the items to iterate over) and the inner activity to perform on each item. The outer section looks like this:

[Image: For Each activity outer configuration]

Here we can configure the "isSequential" property, which when set to "false" allows Data Factory to parallelise the inner activity; otherwise it will run each iteration one after another. The other property is "items", which is what the activity will iterate through. Because we fed the table list into the "tableList" parameter from the Execute Pipeline activity, we can specify that as our list of items.
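
A minimal sketch of the child pipeline's shell is below, assuming the parameter is called "tableList" (an Object type would work just as well as Array) and that the other names are illustrative. The inner Copy activity from the next section slots into the empty "activities" array:

    {
        "name": "ChildCopyPipeline",
        "properties": {
            "parameters": {
                "tableList": { "type": "Array" }
            },
            "activities": [
                {
                    "name": "IterateTables",
                    "type": "ForEach",
                    "typeProperties": {
                        "isSequential": false,
                        "items": {
                            "value": "@pipeline().parameters.tableList",
                            "type": "Expression"
                        },
                        "activities": []
                    }
                }
            ]
        }
    }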

Now for the inner activity:

[Image: inner Copy activity JSON]

Whilst this is a fairly chunky bit of JSON, those familiar with the Copy activity in ADF V1 will probably feel pretty comfortable with it. The key difference is that we are again making use of expressions and parameters to make our template generic. You can see in the "output" attribute that we dynamically specify the output blob name using the schema and table name properties gleaned from our input data set. Also, in the source attribute we dynamically build the SQL query that selects all the data from the table we are currently on, using the @item() property. This method of combining text and parameters is called string interpolation and allows us to easily mix static and dynamic content without the need for additional functions or syntax.
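
A hedged reconstruction of that inner Copy activity is below. It assumes the output blob dataset exposes a fileName parameter and that the Lookup query returned TABLE_SCHEMA and TABLE_NAME columns; all of the names are illustrative:

    {
        "name": "CopyTableToBlob",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "AzureSqlSourceDataset",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "BlobOutputDataset",
                "type": "DatasetReference",
                "parameters": {
                    "fileName": "@{item().TABLE_SCHEMA}_@{item().TABLE_NAME}.txt"
                }
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SqlSource",
                "sqlReaderQuery": "SELECT * FROM [@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]"
            },
            "sink": {
                "type": "BlobSink"
            }
        }
    }

Everything inside @{ } is evaluated as an expression and everything outside is treated as literal text, which is all there is to the string interpolation syntax.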

That’s it! By making use of only a few extra activities we can really easily do tasks that would have taken much longer in previous versions of ADF. You can find the full collection of JSON objects using this link: http://bit.ly/2zwZFLB. Watch this space for the next blog which will look at custom logging of data factory activities!

Comments (10) -

  • Marius

    10/25/2017 8:58:37 PM | Reply

    Awesome post.  It's very hard to find information and practical examples.  The Microsoft documentation is fairly light at the moment.  Please do more!

  • Marius

    10/25/2017 9:16:09 PM | Reply

    Great post. Is the lookup activity the only way to get data from a source and feed that back into other activities in the pipeline?
    I'm executing a stored procedure on Azure SQL DW using the stored proc activity. The SP populates a dimension table and then outputs the number of records that were inserted/updated. Is there a way to get that info back into the pipeline and pass it on to the next stored proc activity that updates a metadata table?

    Thanks in advance.

    • matthow

      10/30/2017 8:35:41 AM | Reply

      Hi Marius,
      Currently, Lookup is the only way to fetch a value during execution that can be fed into other activities within that same pipeline, but you can use a stored proc as your SQL statement for the Lookup. So long as your stored proc always returns a value it will work fine - if no value is returned, ADF V2 will complain about an invalid stored proc. Bear in mind, if you want to carry any values between pipelines you would need to write them out to a JSON file in blob and then look them up from there. However, as another way round your problem, you can query the Data Factory activity APIs and this can give you some of the detail you require. The best way to do this is to write an Azure Function which you can call from the new Web Activity in ADF V2. Currently I top and tail each pipeline with the Web Activity, which calls a logging function so I know when pipelines start and end, but also how many rows were copied. Hope that helps :)

      • Mahindar

        1/25/2018 2:16:41 PM | Reply

        Hi matthow, great post. I recently started learning ADF and your post gave me a very good start.
        I have a small doubt.
        How are you querying the APIs (as you mentioned "query the Data Factory Activity APIs")? I'm able to post data (like @pipeline(), @activity()) to an Azure Function and see it in the log, but how do I check the complete API? Are there any other template functions which show the complete ADF API? What are you posting to the Azure Function from the Web Activity?
        Thanks

  • DaveOD

    11/30/2017 5:24:34 PM | Reply

    Excellent post Matt, very succinct and informative. Greatly appreciated, thanks.

  • raman

    12/1/2017 7:10:46 AM | Reply

    Thanks for the practical implementation examples!!

  • Stephen Davies

    12/14/2017 3:07:11 PM | Reply

    This is a great blog post.

  • Magnus

    2/16/2018 10:53:47 AM | Reply

    Hi, excellent post, thanks for sharing!

    We plan to move our current SSIS framework to ADF, which is quite straightforward for the things you would expect to be tricky. However, we needed this post to get the params working (thanks again).

    As I see it, the Lookup pretty much gives you the ability to mimic the old SSISConfiguration table. Preferably, you would only do this once and have all pipelines etc. in the batch reuse the runtime config. If I read your post correctly, we should write the initial Lookup result to a JSON file for reuse. Is this correct?

    Secondly, logging. I am frankly quite new to this. I would like to achieve two types of logging:
    1. Start/End batch (date and time of run, which jobs were run in this batch)
    2. Start/End job (date and time of run, number of rows processed (insert, delete, update), which batch)

    If we process incoming files, I would also like to log which file was processed by which job at what time, and the result of course. Preferably, I would find these logs in a single location for later analysis.

    I saw that you mentioned a web job for Start/End logging - how does this work? Any ideas on how we should achieve the desired logging, or whether it is unnecessary due to available built-in Azure services?

    Thanks!

    Regards Magnus

  • Vijay Sharma

    5/15/2018 5:17:58 PM | Reply

    This is my Master Pipeline JSON:

    {
        "name": "IncrementalDataLoad_v2",
        "properties": {
            "description": "Full or Incremental load to an ODS table.",
            "activities": [
                {
                    "name": "LookupTableList",
                    "type": "Lookup",
                    "policy": {
                        "timeout": "7.00:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false
                    },
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'"
                        },
                        "dataset": {
                            "referenceName": "SQL ODS Employees",
                            "type": "DatasetReference"
                        },
                        "firstRowOnly": false
                    }
                },
                {
                    "name": "TriggerCopyData",
                    "type": "ExecutePipeline",
                    "dependsOn": [
                        {
                            "activity": "LookupTableList",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "typeProperties": {
                        "pipeline": {
                            "referenceName": "MergeData",
                            "type": "PipelineReference"
                        },
                        "waitOnCompletion": true
                    }
                }
            ],
            "parameters": {
                "tableList": {
                    "type": "Object",
                    "defaultValue": "@activity('LookupTableList').output.value"
                }
            }
        }
    }

  • Franco

    6/29/2018 2:31:11 PM | Reply

    Excellent Post, Thank you!

    I am trying to do the same exercise for Linked Services (the datasets are the same for all of them). I am currently getting data from an HTTP connection and pulling it into my SQL DW; my goal is to make my URL dynamic (based on a list of IDs that will be read from a table in my SQL DW).

    Although I managed to parametrise my Linked Service and then set up the parameter (only one value) before executing my pipeline, when I put this in a ForEach and use a Lookup to read all these values I get the error below, which makes me think that (1) the Execute Pipeline is not receiving the values from my Lookup object, (2) the parameters for this dynamic URL are not correctly set up, or (3) this is not possible to do in Data Factory.

    Could you kindly let me know your thoughts about this please? Many Thanks in advance!

    {
        "errorCode": "2200",
        "message": "ErrorCode=UserErrorInvalidValueInPayload,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Failed to convert the value in 'url' property to 'System.String' type. Please make sure the payload structure and value are correct.,Source=Microsoft.DataTransfer.DataContracts,''Type=System.InvalidCastException,Message=Object must implement IConvertible.,Source=mscorlib,'",
        "failureType": "UserError",
        "target": ""
    }
