Adatis

Adatis BI Blogs

Using Lookup, Execute Pipeline and For Each Activity in Azure Data Factory V2

In my previous blog I looked how we can utilise pipeline parameters to variablise certain aspects of a dataset to avoid duplication of work. In this blog I will take that one step further and use parameters to feed into a For Each activity that can iterate over a data object and perform an action for each item. This blog assumes some prior knowledge of Azure Data Factory and it won’t hurt to read my previous blogPreviously I showed how to use a parameter file to copy a single table from Azure SQL DB down into a blob. Now lets use the For Each activity to fetch every table in the database from a single pipeline run. The benefit of doing this is that we don’t have to create any more linked services of data sets, we are only going to create one more pipeline that will contain the loop. The big picture here looks like below. The first activity to note is the lookup activity. This can go off and fetch a value from either SQL or JSON based sources and then incorporate that value into activities further down the chain. Here we are using SQL and you can see that we have supplied a SQL query that will fetch the schema and table names of our database. One “gotcha” is that even though you supply a SQL query, you still need to provide a dummy table name in the SQL dataset. It will use the query above at run time but won’t pass deployment without a table name. Also note that at this point, we do nothing with the returned value.Next, we have the Execute Pipeline activity which can accept input parameters and pass those down into the executed pipelines (or child pipelines as per the diagram). Within the type properties we can specify the parameters we want to pass in. The names here need to match whatever parameters we specify in the child pipeline but for the “value” we can make use of the new expression language to get a hold of the output of the previous lookup activity. We then reference the pipeline we want to execute, and that we need to wait for it to complete before continuing with our parent pipeline. Finally, we use the “dependsOn” attribute to ensure that our Execute Pipeline activity occurs AFTER our lookup has completed successfully. At this point we have told the child pipeline which tables to copy and then told it to start. Our child pipeline now just needs to iterate over that list and produce our output files. To do this it only needs one activity which is the For Each. The For Each really has two components which are the outer configurables (such as the items to iterate over) and then the inner activity to perform on each item. The outer section looks like this: Here we can configure the “isSequential” property which when set to “false” allows Data Factory to parallelise the inner activity, otherwise it will run each activity one after another. The other property is the “items” which is what the activity will iterate through. Because we fed the table list in to the “tableList” parameter from the Execute Pipeline activity we can specify that as our list of items. Now for the inner activity: Whilst this is a fairly chunky bit of JSON, those familiar with the copy activity in ADF V1 will probably feel pretty comfortable with this. The key difference is that we are again making use of expressions and parameters to make our template generic. You can see in the “output” attribute we are dynamically specifying the output blob name by using the schema and table name properties gleaned from our input data set. Also, in the source attribute we dynamically build our SQL query to select all the data from the table that we are currently on using the @item() property. This method of combing text and parameters is called string interpolation and allows us to easily mix static and dynamic content without the needed for additional functions or syntax. That’s it! By making use of only a few extra activities we can really easily do tasks that would have taken much longer in previous versions of ADF. You can find the full collection of JSON objects using this link: http://bit.ly/2zwZFLB. Watch this space for the next blog which will look at custom logging of data factory activities!

Pipeline Parameters in Azure Data Factory V2

The second release of Azure Data Factory (ADF) includes several new features that vastly improve the quality of the service. One of which is the ability to pass parameters down the pipeline into datasets. In this blog I will show how we can use parameters to manipulate a generic pipeline structure to copy a SQL table into a blob. Whilst this is a new feature for ADF, this blog assumes some prior knowledge of ADF (V1 or V2)   Creating the Data Factory V2 One other major difference with ADF V2 is that we no longer need to chain datasets and pipelines to create a working solution. Whilst the concept of dependency still exists, that is no longer needed to run a pipeline, we can now just run them ad hoc. This is important for this demo because we want to run the job once, check it succeeds and then move onto the next table instead of managing any data slices. You can create a new V2 data factory either from the portal or using this command in PowerShell: $df = Set-AzureRmDataFactoryV2 -ResourceGroupName <resource group> –Location <location> -Name <data factory name> If you are familiar with the PowerShell cmdlets for ADF V1 then you can make use of nearly all of them in V2 by appending “V2” to the end of the cmdlet name. ADF V2 Object Templates Now we have a Data Factory to work with we can start deploying objects. In order to make use of parameters we need to firstly specify how we will receive the value of each parameter. Currently there are two ways: 1.      Via a parameter file specified when you invoke the pipeline 2.      Using a lookup activity to obtain a value and pass that into a parameter This blog will focus on the simpler method using a parameter file. Later blogs will demonstrate the use of the lookup activity When we invoke our pipeline, I will show how to reference that file but for now we know we are working with a single parameter called “tableName”. Now we can move on to our pipeline definition which is where the parameter values are initially received. To do this we need to add an attribute called “parameters” to the definition file that will contain all the parameters that will be used within the pipeline. See below: The same concept needs to be carried through to the dataset that we want to feed the parameters in to. Within the dataset definition we need to have the parameter attribute specified in the same way as in the pipeline. Now that we have declared the parameters to the necessary objects I can show you how to pass data into a parameter. As mentioned before, the pipeline parameter will be populated by the parameter file however the dataset parameter will need to be populated from within the pipeline. Instead of simply referring to a dataset by name as in ADF V1 we now need the ability to supply more data and so the “inputs” and “outputs” section of our pipeline now looks like the below:Firstly, we declare the reference type, hence “DatasetReference”. We then give the reference name. This could be parameterised if neededFinally, for each parameter in our dataset (in this case there is only one called “tableName”) we supply the corresponding value from the pipelines parameter set. We can get the value of the pipeline parameter using the “@Pipeline.parameters.<parameter name>” syntax.At this point we have received the values from a file, passed them through the pipeline into a dataset and now it is time to use that value to manipulate the behaviour of the dataset. Because we are using the parameter to define our file name we can use its value as part of the “fileName” attribute, see below:Now we have the ability to input a table name and our pipeline will fetch that table from the database and copy it into a blob. Perhaps a diagram to help provide the big picture:Now we have a complete working pipeline that is totally generic, meaning we can change the parameters we feed in but should never have to change the JSON definition files. A pipeline such as this could have many uses but in the next blog I will show how we can use a ForEach loop (another ADF v2 feature) to copy every table from a data base still only using a single pipeline and some parameters. P.S. Use this link to see the entire json script used to create all the objects required for this blog.  http://bit.ly/2zwZFLB

Automating The Deployment of Azure Data Factory Custom Activities

Custom Activities in Azure Data Factory (ADF) are a great way to extend the capabilities of ADF by utilising C# functionality. Custom Activities are useful if you need to move data to/from a data store that ADF does not support, or to transform/process data in a way that isn't supported by Data Factory, as it can be used within an ADF pipeline.Deploying Custom Activities to ADF is a manual process, which requires many steps. Microsoft’s documentation lists them as:Compile the project. Click Build from the menu and click Build Solution.Launch Windows Explorer, and navigate to bin\debug or bin\release folder depending on the type of build.Create a zip file MyDotNetActivity.zip that contains all the binaries in the \bin\Debug folder. Include the MyDotNetActivity.pdb file so that you get additional details such as line number in the source code that caused the issue if there was a failure.Create a blob container named customactivitycontainer if it does not already existUpload MyDotNetActivity.zip as a blob to the customactivitycontainer in a general purpose Azure blob storage that is referred to by AzureStorageLinkedService.The number of steps means that it can take some time to deploy Custom Activities and, because it is a manual process, can contain errors such as missing files or uploading to the wrong storage account. To avoid that errors and delays caused by a manual deployment, we want to automate as much as possible. Thanks to PowerShell, it’s possible to automate the entire deployment steps. The script to do this is as follows:Login-AzureRmAccount# Parameters $SourceCodePath = "C:\PathToCustomActivitiesProject\"$ProjectFile ="CustomActivities.csproj"$Configuration = "Debug" #Azure parameters$StorageAccountName = "storageaccountname"$ResourceGroupName = "resourcegroupname"$ContainerName = "blobcontainername"# Local Variables$MsBuild = "C:\Program Files (x86)\MSBuild\14.0\Bin\MSBuild.exe";            $SlnFilePath = $SourceCodePath + $ProjectFile;                                         # Prepare the Args for the actual build            $BuildArgs = @{                 FilePath = $MsBuild                 ArgumentList = $SlnFilePath, "/t:rebuild", ("/p:Configuration=" + $Configuration), "/v:minimal"                 Wait = $true                 }          # Start the build            Start-Process @BuildArgs # initiate a sleep to avoid zipping up a half built projectSleep 5# create zip file $zipfilename = ($ProjectFile -replace ".csproj", "") + ".zip"$source = $SourceCodePath + "bin\" + $Configuration$destination = $SourceCodePath + $zipfilenameif(Test-path $destination) {Remove-item $destination}Add-Type -assembly "system.io.compression.filesystem"[io.compression.zipfile]::CreateFromDirectory($Source, $destination) #create storage account if not exists$storageAccount = Get-AzureRmStorageAccount -ErrorAction Stop | where-object {$_.StorageAccountName -eq $StorageAccountName}       if  ( !$storageAccount ) {     $StorageLocation = (Get-AzureRmResourceGroup -ResourceGroupName $ResourceGroupName).Location     $StorageType = "Standard_LRS"     New-AzureRmStorageAccount -ResourceGroupName $ResourceGroupName  -Name $StorageAccountName -Location $StorageLocation -Type $StorageType} #create container if not exists$ContainerObject = Get-AzureStorageContainer -ErrorAction Stop | where-object {$_.Name -eq $ContainerName}if (!$ContainerObject){$storagekey = Get-AzureRmStorageAccountKey -ResourceGroupName $ResourceGroupName -Name $StorageAccountName$context = New-AzureStorageContext -StorageAccountName $StorageAccountName -StorageAccountKey $storagekey.Key1 -Protocol HttpNew-AzureStorageContainer -Name $ContainerName -Permission Blob -Context $context} # upload to blob#set default contextSet-AzureRmCurrentStorageAccount -StorageAccountName $StorageAccountName -ResourceGroupName  $ResourceGroupNameGet-AzureRmStorageAccount -ResourceGroupName $ResourceGroupName -Name $StorageAccountName # Upload fileSet-AzureStorageBlobContent –Container $ContainerName -File $destination By removing the manual steps in building, zipping and deploying ADF Custom Activities, you remove the risk of something going wrong and you add the reassurance that you have a consistent method of deployment which will hopefully speed up your overall development and deployments.As always, if you have any questions or comments, do let me know.