Matt How's Blog

Getting started with Azure Data Factory Mapping Data Flows


Azure Data Factory V2 is the go-to service for moving large amounts of data within the Azure platform and, up until relatively recently, was focussed predominantly on control flow rather than data flow. Those familiar with SQL Server Integration Services will be aware of the difference; however, to clarify, control flow is used to orchestrate activities within a data pipeline, whilst data flow is used to physically move and transform the data.

In May 2019, the Azure Data Factory team released into public preview the Azure Data Factory Mapping Data Flows (MDF) functionality, which effectively moves ADF from being an orchestration tool to a fully fledged ETL tool. This new feature allows developers to configure data transformation logic using a no-code, drag-and-drop approach and implements many of the transformation concepts that existed in SSIS. In fact, there is an excellent comparison between SSIS, SQL and ADF MDF written by Kamil Nowinski which highlights how the three technologies match up: https://sqlplayer.net/2018/12/azure-data-factory-v2-and-its-available-components-in-data-flows/. Points to note are that ADF does not have event handlers in the way SSIS does, and it also does not have an equivalent to the script component within Data Flows. It does have a Custom activity which allows you to write C# within the control flow, but currently you cannot write any custom code within a Mapping Data Flow.

Prior to the release of MDF, Data Factory could move data from one place to another with its Copy activity; however, it could not modify the file in any meaningful way, and external services such as SQL Data Warehouse or Data Lake Analytics (RIP) were needed to fill this gap. The Copy activity performed the data movement using the Azure Integration Runtime, which provided the compute power needed to complete the operation, so does that mean that Mapping Data Flows run on the same compute resource?

NO is the answer. In fact, your graphical data flow is converted into Scala and then compiled into a JAR library to be executed on a Databricks (Spark) cluster which is deployed and managed by Microsoft solely for running your data flow activities. This does mean that any pipelines that utilise MDF have a slight overhead to allow for the cluster to start up and configure; however, Microsoft are looking at ways to reduce this. There is also no need for an ETL developer looking to build MDFs to know anything about Databricks, Scala or Spark clusters – although it will certainly help!

So, now that it's in public preview, let's get using it! This blog will walk through the process of creating a basic cleaning data flow that will populate a SQL Database table with the values from a delimited text file.

To begin, we need some Data Factory objects. Anyone familiar with Data Factory will know that we need Linked Services and Datasets to connect to our data and define schemas, and Pipelines to give structure to the activities and create our control flow. In this example I have a small comma-separated file that has some data quality issues such as:

  • Leading and trailing white space
  • Weird column names
  • Non-standardised NULL values
  • Weak data types

The data flow we will create will address all of these issues before depositing the file into a SQL Database.

  1. First, we should create a linked service that connects to a blob store or data lake account where our dirty file is stored.
  2. We then need a dataset that sits on top of the Linked Service which allows us to locate and read the file using the specified parameters such as file path, column delimiter, row delimiter etc. Additionally, in this dataset we can import the schema of the file so that we have some column names available (a rough JSON sketch of such a dataset follows this list).
  3. Next, we can create a linked service that connects to our output SQL Database, and also a dataset that points to the correct table, again importing the schema.
  4. After creating these objects, we also need a pipeline to orchestrate the process, and this will ultimately call our fancy new data flow. You only need a single activity in this pipeline, which can be found under the "Move & Transform" heading and is called "Data Flow (Preview)".
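
For context, a dataset like the one described in step 2 might look something like the JSON below when viewed in the code editor. This is a minimal sketch only; the dataset, linked service, container and file names are placeholders, and the schema is left empty so that it can be imported from the file.

    {
      "name": "DirtyCustomerFile",
      "properties": {
        "linkedServiceName": {
          "referenceName": "BlobStorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "type": "DelimitedText",
        "typeProperties": {
          "location": {
            "type": "AzureBlobStorageLocation",
            "container": "raw",
            "fileName": "dirty-customers.csv"
          },
          "columnDelimiter": ",",
          "firstRowAsHeader": true
        },
        "schema": []
      }
    }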

At this point you should have something similar to the following with the exception of the data flow object under the Data Flows (Preview) tab:

[Screenshot: Data Factory resources showing the linked services, datasets and pipeline]

NOTE: I have not gone into much detail about creating these objects as they will be familiar to most ADF developers. For pointers on how to create the above follow this link: https://docs.microsoft.com/en-us/azure/data-factory/quickstart-create-data-factory-portal

Now we can begin creating our Data Flow. Firstly, we will add the source and the sink to our data flow; this gives us our bookends as a clear starting point, and we can then play about with the middle bit afterwards. Also, at this point, toggle the switch at the top of the Data Factory UI named "Data Flow Debug". This will start a cluster up for you and avoid you having to wait later on.

  1. Hover over the Data Flows (Preview) header and click on the ellipsis, choose "Add Data Flow" and this will take you into the Mapping Data Flow UI where you can begin creating your Data Flows. Remember to set a sensible name for your data flow in the general tab of the data flow UI.
  2. Click “Add Source” and we can begin configuring the source activity of our data flow.
    • Provide a clear name that identifies the source. This source is fed from a dataset so the names should closely match.
    • Choose the dataset that is linked to your blob account and uncheck "Allow schema drift". This is a useful option that allows constantly changing source files to be handled; however, we will cover that in a later blog.
    • On the “Source Options” tab you can choose to add actions that occur after completion such as deleting or moving source files.
    • On the “Projection” tab you can tailor your schema which is defined in the source dataset. My preference is to leave these all as strings to avoid any early typing errors as we will address these later in the flow.
    • Finally, on the Optimize, Inspect and Data Preview tabs, all defaults can remain the same.
  3. Now click on the tiny + icon in the bottom right corner of the source activity and choose “Sink” from the drop-down list. Here we can configure the output of our flow which in this case will be a SQL Database.
    • Specify the name, again relating it to your sink dataset, and choose the SQL Database dataset created earlier.
    • On the "Settings" tab you can choose which methods can be used by ADF when working with your table. These can be any combination of Insertion, Deletion, Upserting or Updating. You can also define actions to occur in the database before loading the data, such as recreating the entire table or truncating the existing table first.
    • Finally, on the "Mapping" tab you can map columns from source to sink. Be aware that any columns that are not strings in your database will not be able to be mapped until the data typing has occurred. A rough sketch of the script this UI builds up follows this list.
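
Behind the scenes, the UI generates a data flow script from these settings. Purely as an illustration of the general shape (the column and stream names are made up, and the property names are based on my memory of the preview, so treat this as indicative rather than exact), the source and sink above translate into something along these lines:

    source(output(
        CustomerName as string,
        Country as string,
        CustomerAge as string
      ),
      allowSchemaDrift: false,
      validateSchema: false) ~> DirtyCustomerSource
    DirtyCustomerSource sink(
      deletable: false,
      insertable: true,
      updateable: false,
      upsertable: false,
      format: 'table') ~> CustomerSqlSink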

[Screenshot: the source and sink connected in the data flow canvas]

Now we have a basic copy structure that does nothing clever yet but does connect our dirty source file up to our sink dataset. We can begin doing the actual transform.

  1. The first transformation we will do is to trim white space from the columns. Click on the + icon and choose "Derived Column". Within the "Derived Column Settings" tab you should add each of the columns in your source dataset and then enter the following expression for each one in the expressions editor: trim({column name}). This expression will remove any whitespace from the column's value, ensuring the database receives a clean value.
  2. Now we will standardise any NULL values and also transform any columns into their correct data types. To do this, click the + icon again and choose "Derived Column" once more. Similar to the above step, you can add an entry in the "Derived Column Settings" tab for each column, using the below expression: replace(replace({column name}, ' ',''),'Unknown',''). This expression will replace any empty values with NULL, and any values of 'Unknown' will also be replaced with NULL, so that we have some standardisation before loading into the database. Any NULL values already present will be untouched.
  3. In my dataset I need to change one column from its originating string data type into an int so that it can be dropped into the DB. Rather than making this change in place, it is best to create a new column so that you keep the original column alongside the new one with the correct type.
    • Whilst still in the expression editor, hover over any column name in the "OUTPUT SCHEMA" window that is visible on the left-hand side and choose the + icon. This will allow you to add a new column to your data flow, and you can use any of the conversion functions (toInteger, toLong, toString, toDate, toBoolean etc.) to coerce the value into its correct type. Some example expressions follow this list.
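
To make those three steps concrete, here are the kinds of expressions you would type into the expression editor, written against some made-up columns (CustomerName and CustomerAge); only the functions themselves come from the steps above. The first line is the trim step, the second is the NULL standardisation step, and the third creates the new, correctly typed column:

    trim(CustomerName)
    replace(replace(CustomerName, ' ', ''), 'Unknown', '')
    toInteger(CustomerAge)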

[Screenshot: adding a typed column in the expression editor]

At this point you should have four steps that resemble the below screenshot. Once your Data Flow Debug session is online you can debug the data flow and hopefully see the nice clean values pass through into the database. Throughout this process I recommend taking a peek at the Inspect and Data Preview tabs. The Inspect tab gives a bit more information about what steps are taking place on the data in that activity, and the Data Preview tab will show you how the data will look, although the Debug session needs to be active for this to work. Finally, the Optimize tab allows you to set the partitioning of the data using techniques such as Round Robin, Hash and Range distribution, although these are out of the scope of this blog.

Hopefully this brief tutorial has been helpful and allowed you to gain some early knowledge of Data Factory Mapping Data Flows, meaning that you can go on to create flows that are tailored to your needs and cover a wider variety of scenarios. Any questions, thoughts or feedback, please catch me on Twitter @MattTheHow.

Using ADF V2 Activities in Logic Apps

Logic Apps recently introduced the ability to connect to an Azure Data Factory V2 instance and perform a number of actions, including cancelling a pipeline, invoking a pipeline and interrogating the service for information about a particular pipeline run. This blog will focus on how to utilise the full potential of ADF V2 via a Logic App connector when calling a pipeline. In its current form there is no way to provide input parameters to ADF from Logic Apps, thereby hamstringing one of the best features of ADF V2! Later in this blog I will show how to get around that. Despite this, there are a number of reasons why a Logic App calling an ADF V2 pipeline with a simple activity is a great thing, and I have gone into some detail below.

Event Driven Execution

The Azure Data Factory V2 team are now starting to bring in event-driven triggers, but currently this is limited only to the creation or modification of blobs in a blob storage account. This is a good start but pales in comparison to the overwhelming number of events that can trigger a Logic App, notably things like Service Bus, Event Grid, HTTP calls etc etc… the list goes on! More info on Logic App connectors here. By utilising Logic Apps as a wrapper for your ADF V2 pipelines you can open up a huge number of opportunities to diversify what triggers a pipeline run.

Simplifying Loops, Conditionals and Failure Paths

In addition to event-driven triggers, the ADF team have also brought in an IF activity and a number of looping activities which are really useful in a lot of scenarios. However, this kind of logic can be simplified when it is built in Logic Apps. The visual editor in Logic Apps makes understanding the flow of a loop or an IF much simpler to those maybe not so familiar with the business logic that has been implemented. Additionally, failure paths can be handled much more efficiently and clearly, with many more options for logging or notifying of failure right out of the box – e.g. the Send Email activity.

Heavy lifting of Data

Logic Apps, for all their benefits, are definitely not heavy lifters of data. They really excel at lightweight messaging and orchestration, whereas Data Factory is great at moving big chunks of data when using the Copy activity. When you pair the two together you get something that resembles SSIS Control Flow (Logic Apps) and SSIS Data Flow (Data Factory). Now that they can be closely and easily integrated, orchestrating the logical flow and movement of data in the cloud becomes much simpler.

So now we know that pairing Logic Apps and Data Factory V2 is a great idea, let's look at how to do it. Understandably there are no triggers from Data Factory at this point, so you will need to trigger your Logic App in any one of the million ways that are available. Once you have your trigger sorted you can search for the Data Factory connector and choose the action you need. You will then need to connect to the Data Factory service as below by logging into your Azure tenant.

[Screenshot: creating the Azure Data Factory connection in the Logic App designer]

Once logged in you will need to supply a few details to locate the Data Factory instance and pipeline as below.

[Screenshot: locating the Data Factory instance and pipeline]

Now that you have completed this, you can trigger your Logic App and you will see that your Data Factory pipeline is invoked as normal. However, by using only this method we are depriving ourselves of the ability to derive parameter values outside of Data Factory and pass them in at execution time. Currently there is nowhere to supply the input data! This is where the blog gets a bit hacky and we will venture into the Code View of the Logic App. [Side note: you can usually get around a lot of tricky situations by manipulating the code in Logic Apps and not just relying on what the UI gives you.] Click the "Code View" button in the designer and then locate your Data Factory V2 activity definition in the code page. It will look something like this (I have whited out my subscription id).

[Screenshot: the Data Factory action definition in the Logic App code view]

In order to pass data in to this Data Factory pipeline, all we need to do is add a "body" attribute within the "inputs" object, as in the image below. We can then reference any of the variables called out in the Logic App, or any other value that might be available to us. In case you're wondering, these input values need to be configured as input parameters on the Data Factory pipeline you will be calling. More on how to do that here.

[Screenshot: the action definition with the added "body" attribute]
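
For those who cannot make out the screenshot, the modified action in code view ends up looking roughly like the sketch below. The action name, connection reference and parameter names (SourceTable and LoadDate) are placeholders for whatever your own definition contains, and the path has been shortened for brevity; the important part is the added "body" object inside "inputs".

    "Create_a_pipeline_run": {
      "type": "ApiConnection",
      "inputs": {
        "host": {
          "connection": {
            "name": "@parameters('$connections')['azuredatafactory']['connectionId']"
          }
        },
        "method": "post",
        "path": "/subscriptions/.../pipelines/@{encodeURIComponent('MyAdfPipeline')}/CreateRun",
        "body": {
          "SourceTable": "dbo.Customers",
          "LoadDate": "@{utcNow()}"
        }
      },
      "runAfter": {}
    }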

Once you have updated your code accordingly, save and run your Logic App and you should be able to see the pipeline invoked with your inputs in the Data Factory service, as below.

[Screenshot: the pipeline run in Data Factory showing the supplied input parameters]

In conclusion, the marriage of Logic Apps and Data Factory is a happy and harmonious one with many benefits to be exploited. Now armed with the key to utilising input parameters for pipelines called from Logic Apps, you can cater for a vast number of data integration scenarios that require complex but clear logic and heavy lifting of data. Any questions or comments, please leave them below or catch me on Twitter @MattTheHow.

Using Lookup, Execute Pipeline and For Each Activity in Azure Data Factory V2

In my previous blog I looked at how we can utilise pipeline parameters to variablise certain aspects of a dataset to avoid duplication of work. In this blog I will take that one step further and use parameters to feed into a For Each activity that can iterate over a data object and perform an action for each item. This blog assumes some prior knowledge of Azure Data Factory, and it won't hurt to read my previous blog first.

Previously I showed how to use a parameter file to copy a single table from Azure SQL DB down into a blob. Now let's use the For Each activity to fetch every table in the database from a single pipeline run. The benefit of doing this is that we don't have to create any more linked services or datasets; we are only going to create one more pipeline that will contain the loop. The big picture here looks like the below.

[Screenshot: the parent and child pipeline layout]

The first activity to note is the Lookup activity. This can go off and fetch a value from either SQL- or JSON-based sources and then incorporate that value into activities further down the chain.

[Screenshot: the Lookup activity definition]

Here we are using SQL and you can see that we have supplied a SQL query that will fetch the schema and table names of our database. One “gotcha” is that even though you supply a SQL query, you still need to provide a dummy table name in the SQL dataset. It will use the query above at run time but won’t pass deployment without a table name. Also note that at this point, we do nothing with the returned value.
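
As an illustrative sketch (the activity name, dataset name and query below are mine, not taken from the screenshot), the Lookup activity definition looks something like this, with firstRowOnly set to false so that the full table list is returned:

    {
      "name": "GetTableList",
      "type": "Lookup",
      "typeProperties": {
        "source": {
          "type": "SqlSource",
          "sqlReaderQuery": "SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'"
        },
        "dataset": {
          "referenceName": "AzureSqlDummyTableDataset",
          "type": "DatasetReference"
        },
        "firstRowOnly": false
      }
    }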

Next, we have the Execute Pipeline activity which can accept input parameters and pass those down into the executed pipelines (or child pipelines as per the diagram).

[Screenshot: the Execute Pipeline activity definition]

Within the type properties we can specify the parameters we want to pass in. The names here need to match whatever parameters we specify in the child pipeline, but for the "value" we can make use of the new expression language to get hold of the output of the previous Lookup activity. We then reference the pipeline we want to execute and state that we need to wait for it to complete before continuing with our parent pipeline. Finally, we use the "dependsOn" attribute to ensure that our Execute Pipeline activity occurs AFTER our lookup has completed successfully.
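
Again as a rough sketch (the activity and pipeline names are placeholders, and it assumes the Lookup sketch above), the Execute Pipeline activity might look like this, with the lookup output passed into the child pipeline's tableList parameter:

    {
      "name": "ExecuteChildCopyPipeline",
      "type": "ExecutePipeline",
      "dependsOn": [
        {
          "activity": "GetTableList",
          "dependencyConditions": [ "Succeeded" ]
        }
      ],
      "typeProperties": {
        "pipeline": {
          "referenceName": "ChildCopyPipeline",
          "type": "PipelineReference"
        },
        "parameters": {
          "tableList": {
            "value": "@activity('GetTableList').output.value",
            "type": "Expression"
          }
        },
        "waitOnCompletion": true
      }
    }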

At this point we have told the child pipeline which tables to copy and then told it to start. Our child pipeline now just needs to iterate over that list and produce our output files. To do this it only needs one activity, which is the For Each. The For Each really has two components: the outer configurables (such as the items to iterate over) and the inner activity to perform on each item. The outer section looks like this:

[Screenshot: the For Each activity outer configuration]

Here we can configure the "isSequential" property, which when set to "false" allows Data Factory to parallelise the inner activity; otherwise it will run each iteration one after another. The other property is "items", which is what the activity will iterate through. Because we fed the table list into the "tableList" parameter from the Execute Pipeline activity, we can specify that as our list of items.
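
A hedged sketch of that outer shell is below; the inner Copy activity (sketched further down) would sit inside the activities array, and the names are again placeholders:

    {
      "name": "IterateTableList",
      "type": "ForEach",
      "typeProperties": {
        "isSequential": false,
        "items": {
          "value": "@pipeline().parameters.tableList",
          "type": "Expression"
        },
        "activities": []
      }
    }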

Now for the inner activity:

[Screenshot: the inner Copy activity definition]

Whilst this is a fairly chunky bit of JSON, those familiar with the Copy activity in ADF V1 will probably feel pretty comfortable with it. The key difference is that we are again making use of expressions and parameters to make our template generic. You can see in the "output" attribute we are dynamically specifying the output blob name by using the schema and table name properties gleaned from our input data set. Also, in the source attribute we dynamically build our SQL query to select all the data from the table that we are currently on using the @item() property. This method of combining text and parameters is called string interpolation and allows us to easily mix static and dynamic content without the need for additional functions or syntax.
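
To round this off, here is a rough sketch of what that inner Copy activity could look like. The dataset names are placeholders, and exactly how the output blob name is parameterised depends on how your output dataset is defined, so treat it as indicative of the string interpolation rather than a copy-and-paste definition:

    {
      "name": "CopyTableToBlob",
      "type": "Copy",
      "inputs": [
        { "referenceName": "AzureSqlSourceDataset", "type": "DatasetReference" }
      ],
      "outputs": [
        {
          "referenceName": "BlobOutputDataset",
          "type": "DatasetReference",
          "parameters": {
            "fileName": "@{item().TABLE_SCHEMA}_@{item().TABLE_NAME}.csv"
          }
        }
      ],
      "typeProperties": {
        "source": {
          "type": "SqlSource",
          "sqlReaderQuery": "SELECT * FROM [@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]"
        },
        "sink": {
          "type": "BlobSink"
        }
      }
    }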

That’s it! By making use of only a few extra activities we can really easily do tasks that would have taken much longer in previous versions of ADF. You can find the full collection of JSON objects using this link: http://bit.ly/2zwZFLB. Watch this space for the next blog which will look at custom logging of data factory activities!