Matt How's Blog

Getting started with Azure Data Factory Mapping Data Flows


Azure Data Factory V2 is the go-to service for moving large amounts of data within the Azure platform and, up until relatively recently, was focussed predominantly on control flow rather than data flow. Those familiar with SQL Server Integration Services will be aware of the difference; however, to clarify, control flow is used to orchestrate activities within a data pipeline, whilst data flow is used to physically move and transform the data.

In May 2019, the Azure Data Factory team released the Azure Data Factory Mapping Data Flows (MDF) functionality into public preview, which effectively moves ADF from being an orchestration tool to a fully fledged ETL tool. This new feature allows developers to configure data transformation logic using a no-code, drag-and-drop approach and implements many of the transformation concepts that existed in SSIS. In fact, there is an excellent comparison between SSIS, SQL and ADF MDF written by Kamil Nowinski which highlights how the three technologies match up: https://sqlplayer.net/2018/12/azure-data-factory-v2-and-its-available-components-in-data-flows/. Points to note are that ADF does not have event handlers in the way SSIS does, and also does not have an equivalent to the Script Component within Data Flows. It does have a Custom activity which allows you to write C# within the control flow, but currently you cannot write any custom code within a Mapping Data Flow.

Prior to the release of MDF, Data Factory could move data from one place to another with its Copy activity; however, it could not modify the file in any meaningful way and required external services such as SQL Data Warehouse or Data Lake Analytics (RIP) to fill this gap. The Copy activity performs the data movement using the Azure Integration Runtime, which provides the compute power needed to complete the operation, so does that mean that Mapping Data Flows run on the same compute resource?

NO is the answer. In fact, your graphical data flow is converted into Scala and then compiled into a JAR library to be executed on a Databricks (Spark) cluster which is deployed and managed by Microsoft solely for running your data flow activities. This does mean that any pipelines that utilise MDF have a slight overhead to allow for the cluster to start up and configure; however, Microsoft are looking at ways to reduce this. There is also no need for an ETL developer looking to build MDFs to know anything about Databricks, Scala or Spark clusters, although it will certainly help!

So, it's in public preview; let's get using it! This blog will walk through the process of creating a basic cleaning data flow that will populate a SQL Database table with the values from a delimited text file.

To begin, we need some Data Factory objects. Anyone familiar with Data Factory will know that we need Linked Services and Datasets to connect to our data and define schemas, and Pipelines to give structure to the activities and create our control flow. In this example I have a small comma-separated file that has some data quality issues such as:

  • Leading and trailing white space
  • Weird column names
  • Non-standardised NULL values
  • Weak data types

The data flow we will create will address all of these issues before depositing the file into a SQL Database.

  1. First, we should create a linked service that connects to a blob store or data lake account where our dirty file is stored.
  2. We then need a dataset that sits on top of the Linked Service, which allows us to locate and read the file using specified parameters such as file path, column delimiter, row delimiter etc. (a sketch of such a dataset definition is included after this list). Additionally, in this dataset we can import the schema of the file so that we have some column names available.
  3. Next, we can create a linked service that connects to our output SQL Database and also a dataset that points to the correct table, again importing the schema.
  4. After creating these objects we also need a pipeline to orchestrate the process, and this will ultimately call our fancy new data flow. You only need a single activity in this pipeline, which is found under the “Move & Transform” heading and is called “Data Flow (Preview)”.
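
As a rough illustration of step 2, below is a minimal sketch of what a delimited text dataset definition might look like in the Data Factory JSON view. The names (DS_DirtyCustomers, LS_BlobStorage, the "landing" container and the file name) are hypothetical, and the exact properties can differ slightly between Data Factory versions:

    {
      "name": "DS_DirtyCustomers",
      "properties": {
        "linkedServiceName": {
          "referenceName": "LS_BlobStorage",
          "type": "LinkedServiceReference"
        },
        "type": "DelimitedText",
        "typeProperties": {
          "location": {
            "type": "AzureBlobStorageLocation",
            "container": "landing",
            "fileName": "dirty-customers.csv"
          },
          "columnDelimiter": ",",
          "firstRowAsHeader": true
        },
        "schema": []
      }
    }

The "schema" array is populated when you import the schema in the dataset editor, which is what gives the data flow its column names to work with.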

At this point you should have each of these objects visible in the Data Factory authoring UI, with the exception of the data flow object under the Data Flows (Preview) tab, which we will create next.


NOTE: I have not gone into much detail about creating these objects as they will be familiar to most ADF developers. For pointers on how to create the above follow this link: https://docs.microsoft.com/en-us/azure/data-factory/quickstart-create-data-factory-portal

Now we can begin creating our Data Flow. Firstly, we will add the source and the sink to our data flow; this gives us our bookends as a clear starting point, and we can then play about with the middle bit afterwards. Also, at this point, toggle the switch at the top of the Data Factory UI named “Data Flow Debug”; this will start a cluster up for you and avoid you having to wait later on.

  1. Hover over the Data Flows (Preview) header, click on the ellipsis and choose “Add Data Flow”. This will take you into the Mapping Data Flow UI where you can begin creating your Data Flows. Remember to set a sensible name for your data flow in the general tab of the data flow UI.
  2. Click “Add Source” and we can begin configuring the source activity of our data flow.
    • Provide a clear name that identifies the source. This source is fed from a dataset so the names should closely match.
    • Choose the dataset that is linked to your blob account and uncheck “Allow schema drift”. This is a useful option that allows constantly changing source files to be handled; however, we will cover that in a later blog.
    • On the “Source Options” tab you can choose to add actions that occur after completion such as deleting or moving source files.
    • On the “Projection” tab you can tailor the schema that is defined in the source dataset. My preference is to leave all of the columns as strings to avoid any early typing errors; we will address data types later in the flow.
    • Finally, on the Optimize, Inspect and Data Preview tabs, all defaults can remain the same.
  3. Now click on the tiny + icon in the bottom right corner of the source activity and choose “Sink” from the drop-down list. Here we can configure the output of our flow which in this case will be a SQL Database.
    • Specify the name, again relating it to your sink dataset, and choose the SQL Database dataset created earlier.
    • On the “Settings” tab you can choose which methods can be used by ADF when working with your table. These can be any combination of Insertion, Deletion, Upserting or Updating. Also, you can define actions to occur in the database before loading the data, such as recreating the entire table or truncating the existing table.
    • Finally, on the “Mapping” tab you can map columns from source to sink. Be aware that any columns that are not strings in your database cannot be mapped until the data typing has occurred.


Now we have a basic copy structure that does nothing clever yet but does connect our dirty source file up to our sink dataset. We can begin doing the actual transform.

  1. The first transformation will trim white space from our columns. Click on the + icon and choose “Derived Column”. Within the “Derived Column Settings” tab you should add each of the columns in your source dataset and then enter the following expression for each one in the expression editor: trim({column name}). This expression will remove any whitespace from the column’s value, ensuring the database receives a clean value.
  2. Now we will standardise any NULL values and also transform any columns into their correct data types. To do this, click the + icon and choose “Derived Column” again. Similar to the step above, you can add an entry in the “Derived Column Settings” tab for each column, using the following expression: replace(replace({column name}, ' ',''),'Unknown',''). This expression replaces any empty values with NULL, and any ‘Unknown’ values are also replaced with NULL, so that we have some standardisation before loading into the database. Any NULL values already present are left untouched.
  3. In my dataset I need to change one column from its original string data type into an int so that it can land in the database with the correct type. Rather than making this change in place, it is best to create a new column so that you keep the original column alongside the new column with the correct type.
    • Whilst still in the expression editor, hover over any column name in the “OUTPUT SCHEMA” window that is visible on the left hand side and choose the + icon. This will allow you to add a new column to your data flow, and you can use any of the conversion functions (toInteger, toLong, toString, toDate, toBoolean etc.) to coerce the value into its correct type.


At this point you should have four steps in your data flow: the source, the two derived column transformations and the sink. Once your Data Flow Debug session is online you can debug the data flow and hopefully see the nice clean values pass through into the database. Throughout this process I recommend taking a peek at the Inspect and Data Preview tabs. The Inspect tab gives a bit more information about what steps are taking place on the data in that activity, and the Data Preview tab will show you how the data will look, although the Debug session needs to be active for this to work. Finally, the Optimize tab allows you to set the partitioning of the data using techniques such as Round Robin, Hash and Range distribution, although these are out of scope for this blog.

Hopefully this brief tutorial has been helpful and has given you some early knowledge of Data Factory Mapping Data Flows, meaning that you can go on to create flows that are tailored to your needs and cover a wider variety of scenarios. Any questions, thoughts or feedback, please catch me on Twitter @MattTheHow.

Using ADF V2 Activities in Logic Apps

Logic Apps recently introduced the ability to connect to an Azure Data Factory V2 instance and perform a number of actions, including cancelling a pipeline, invoking a pipeline and interrogating the service for information about a particular pipeline run. This blog will focus on how to utilise the full potential of ADF V2 via a Logic App connector when calling a pipeline. In its current form there is no way to provide input parameters to ADF from Logic Apps, thereby hamstringing one of the best features of ADF V2! Later in this blog I will show how to get around that. Despite this, there are a number of reasons why a Logic App calling an ADF V2 pipeline with a simple activity is a great thing, and I have gone into some detail below.

Event Driven Execution

The Azure Data Factory V2 team are now starting to bring in event driven triggers, but currently this is limited to the creation or modification of blobs in a blob storage account. This is a good start but pales in comparison to the overwhelming number of events that can trigger a Logic App, notably things like Service Bus, Event Grid, HTTP calls etc… the list goes on! More info on Logic App connectors can be found in the Microsoft documentation. By utilising Logic Apps as a wrapper for your ADF V2 pipelines you can open up a huge number of opportunities to diversify what triggers a pipeline run.

Simplifying Loops, Conditionals and Failure Paths

In addition to event driven triggers, the ADF team have also brought in an If Condition activity and a number of looping activities, which are really useful in a lot of scenarios. However, this kind of logic can be simplified when it's built in Logic Apps. The visual editor in Logic Apps makes the flow of a loop or an If much easier to understand for those who are not so familiar with the business logic that has been implemented. Additionally, failure paths can be handled much more efficiently and clearly, with many more options for logging or notifying of failures right out of the box, e.g. a Send Email action.

Heavy lifting of Data

Logic Apps, for all their benefits, are definitely not heavy lifters of data. They really excel at lightweight messaging and orchestration, whereas Data Factory is great at moving big chunks of data using the Copy activity. When you pair the two together you get something that resembles SSIS Control Flow (Logic Apps) and SSIS Data Flow (Data Factory). Now that they can be closely and easily integrated, orchestrating the logical flow and movement of data in the cloud becomes much simpler.

So, now we know that pairing Logic Apps and Data Factory V2 is a great idea, let's look at how to do it. Understandably there are no triggers from Data Factory at this point, so you will need to trigger your Logic App in any one of the million ways that are available. Once you have your trigger sorted you can search for the Data Factory connector and choose the action you need. You will then need to connect to the Data Factory service by logging into your Azure tenant.


Once logged in you will need to supply a few details to locate the Data Factory instance and pipeline, such as the subscription, resource group, Data Factory name and pipeline name.


Now you have completed this you can trigger your Logic App and you will see that your Data Factory pipeline is invoked as normal. However, by using only this method we are depriving ourselves of the ability to derive parameter values outside of Data Factory and pass them in at execution time. Currently there is nowhere to supply the input data! This is where the blog gets a bit hacky and we will venture into the Code View of the Logic App. [Side note: you can usually get around a lot of tricky situations by manipulating the code in Logic Apps rather than just relying on what the UI gives you.] Click the “Code View” button in the designer and then locate your Data Factory V2 action definition in the code page; it will look something like the sketch shown below, with your real subscription id and resource names rather than placeholders.


In order to pass data in to this Data Factory pipeline, all we need to do is add a “body” attribute within the “inputs” object. We can then reference any of the variables declared in the Logic App, or any other value that might be available to us. In case you’re wondering, these input values need to be configured as input parameters on the Data Factory pipeline you will be calling; more on how to do that can be found in the Data Factory documentation, and a minimal example is included further down.
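
To make this concrete, here is a rough, hand-written sketch of what the action definition might look like in Code View once the “body” section has been added. The angle-bracket placeholders, the parameter names (SourceFileName and LoadDate) and the trigger expression are all hypothetical, and the exact path and API version generated in your Logic App may differ:

    "Create_a_pipeline_run": {
      "runAfter": {},
      "type": "ApiConnection",
      "inputs": {
        "host": {
          "connection": {
            "name": "@parameters('$connections')['azuredatafactory']['connectionId']"
          }
        },
        "method": "post",
        "path": "/subscriptions/@{encodeURIComponent('<subscription-id>')}/resourcegroups/@{encodeURIComponent('<resource-group>')}/providers/Microsoft.DataFactory/factories/@{encodeURIComponent('<data-factory-name>')}/pipelines/@{encodeURIComponent('<pipeline-name>')}/CreateRun",
        "queries": {
          "x-ms-api-version": "2017-09-01-preview"
        },
        "body": {
          "SourceFileName": "@triggerBody()?['fileName']",
          "LoadDate": "@{utcNow()}"
        }
      }
    }

Everything apart from the “body” object is generated for you by the designer; the hack is simply to add “body” inside “inputs” and fill it with name/value pairs, where each name matches a parameter defined on the target pipeline.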

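For the values in “body” to be accepted, the pipeline being called must declare parameters with matching names. As a hypothetical, minimal sketch of such a pipeline definition (the name is made up and the activities are omitted):

    {
      "name": "PL_Load_From_LogicApp",
      "properties": {
        "parameters": {
          "SourceFileName": { "type": "String" },
          "LoadDate": { "type": "String" }
        },
        "activities": []
      }
    }

Inside the pipeline, those values can then be referenced with expressions such as @pipeline().parameters.SourceFileName.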

Once you have updated your code accordingly, save and run your Logic App and you should be able to see the pipeline invoked with your inputs in the Data Factory service.


In conclusion, the marriage of Logic Apps and Data Factory is a happy and harmonious one with many benefits to be exploited. Now armed with the key to utilising input parameters for pipelines called from Logic Apps, you can cater for a vast number of data integration scenarios that require complex but clear logic and heavy lifting of data. Any questions or comments, please leave them below or catch me on Twitter @MattTheHow.

Optimising Lookups with Caching

The Lookup transform must be one of the most used transformations in SQL Server Integration Services (SSIS), if not the most used. Pretty much every ETL routine will make use of the component at some point, especially when loading fact tables. Given the nature of fact and dimension tables, these lookups can be tasked with loading large amounts of data into memory to perform high speed lookups. There are occasions when a few optimisations on these lookups can greatly improve the efficiency of your ETL.


Choosing the Best Cache Option

One of the most important options featured on the Lookup Transform is the Cache Mode. By understanding the differences between these choices, you can be sure to pick the option that best suits your ETL needs.



Using the Full Cache Option (Default)

The Full Cache option tells SSIS to load all of your reference data into memory and then perform its lookups against that data. This has the benefit of only hitting the database once to fetch the data, but can cause a delay on package start-up as all the lookup data has to be loaded prior to executing the package. You can optimise this by using a SQL command to fetch only the columns you need but, as you can imagine, a dimension with more than 1 million rows will take some time to cache, and this is before you have even done any lookups!

Be aware that if you cannot fit your entire reference set into memory your package will FAIL!! SSIS cannot spool data out to disk if Full Cache is chosen and there is no graceful way of handling this error either. Luckily there is a great blog on how to calculate the cache size here.
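As a very rough, back-of-envelope illustration (and not a substitute for a proper calculation): caching a 4-byte integer surrogate key plus a 50-character Unicode business key (roughly 100 bytes) for 1,000,000 dimension rows works out at around 100 MB of memory before any row overhead is added, so wide reference tables add up quickly.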

Use this option when:

  • You want to avoid excessive traffic to your database or it is remote
  • You have a small reference table that can be cached quickly
  • You can fit your whole lookup dataset into memory


Using the Partial Cache Option

The Partial Cache is a good halfway house. This option tells SSIS to fetch the data at runtime, meaning a quicker start-up time but also a lot more queries to the database. Any value not currently in the cache will be queried for in the database and then stored in the cache, making it available for future lookups. The efficiency here comes from not having to charge the cache on package start-up, and also from having any previously fetched items stored in memory. One very specific benefit the partial cache has is that if your lookup table is being written to during the data flow, a full cache wouldn’t see those rows as it only queries the database once prior to execution. The partial cache, however, works perfectly here as it will see all the rows in the table at runtime.

Additional performance can be gained by making use of the Miss Cache. This option (found on the advanced page) allows a portion of the cache to be reserved for rows that don’t have a match in the database. SSIS will then know not to look for these rows again, thereby saving any unnecessary queries.

Use this option when:

  • You have a small reference set not worth charging a full cache for
  • You want to optimise your cache for more frequently accessed rows.
  • Your reference table is being updated within your data flow.


Using the No Cache Option

Finally, No Cache literally means that no data will be cached. This means that every single row will be looked up against the database, even if that row has already been looked up before.

Use this option when:

  • You only need to process a small number of rows
  • You have severe memory limitations
  • For some reason, the partial cache wasn’t a better option


The Cascading Lookup Pattern

A nice way of optimising your lookups is to combine a full and a partial cache, creating what is known as a Cascading Lookup. The idea here is to use the full cache to store a subset of your reference data that is frequently looked up, and a partial cache for the rest. This works because the full cache shouldn’t take too long to charge, as it’s only a subset of the data, but will still have the benefit of being in memory. The partial cache will be used for the larger, less frequently looked-up data, as it has no charging time but can still make use of the cache to avoid querying the database twice for the same lookup value. An added benefit is that if the reference table is written to during the data flow, those rows will be caught by the partial cache lookup.

In this example, we have a large customer dimension, but we know that only 500 of those customers make a lot of purchases, so we can use a cascading lookup to optimise this data flow.


  1. Firstly, choose the Full Cache option, then use a select statement to get only the data that is most frequently looked up against; in this case, the 500 most loyal customers. Remember to redirect non-matching rows to the no match output!
  2. Now choose the Partial Cache option and then use a select statement to get the rest of the dataset – excluding the loyal customers.
  3. Finally, union the rows back together so that you have a single result set.


The Cache Connection Manager

Another option that is available is to use the Cache Connection Manager. This is akin to having a persisted full cache that can be referenced several times throughout your package. Better yet, you can convert a package cache connection manager to a project cache connection manager so that it can be referenced by other packages in the project! This offers multiple benefits including:

  • Charging your lookup caches in parallel to speed up package start-up
  • Caches can be loaded at any given time (although must be before the lookups are needed) so if you need to load the dimension during your ETL, you can.
  • Caches will only need to be charged once for your whole ETL project, meaning you can avoid similar lookups querying the same data twice.

Implementing a Cache Connection Manager is seriously easy!

  • To start you need to create a data flow task and then drag a data source onto the design surface.
  • Below that, add a Cache Transform and connect the two up. Once you have configured your data source (remembering to select only the columns you absolutely need!), open the editor for the cache transform. Where prompted for a cache connection manager, select New.
  • On the General page, you can add a name and description.
  • On the Columns page, you will need to do some configuration. For the column or columns you want to look up on, you should add an Index Position. The column with the lowest index number that is > 0 will be looked up first, followed by any other columns with an Index Position > 0. Any columns with a 0 index are columns that you want to pull into the data flow; in this case that is the CustomerKey. You can also set data types here.


  • Once that’s complete you can map the columns in the cache transform.
  • Finally, you will now see a new connection manager in your Connection Managers window. Right click on the new cache connection manager and click Convert to Project.

An implementation of a cache connection manager might have a parent package that first runs a set of “Populate XXXXXX Cache” tasks and then the fact table load packages. Within each “Populate XXXXXX Cache” task is a data flow that extracts source data and puts it into a cache transform. The fact table load packages that follow can then reference all of the dimensional data using the cache connection managers.


In conclusion, there is a lot of optimisation that can be done with regard to lookups and their caching options. Not all of these techniques need to be implemented all of the time; it is more of a case-by-case decision, but knowing that the options exist is half the battle.