Adatis BI Blogs

Azure Data Factory Custom Activity Development–Part 5: Using Cross Application Domains in ADF

This is the fifth and final post in a series on Azure Data Factory Custom Activity Development.

Introduction

There are times when the assemblies offered by the ADF hosted platform are not what we want, and we need to consider pushing our code into a separate application domain in order to make use of different DLLs. Here's an article on how I did this on a recent project.

The Development Scenario

If you've been following the previous posts, you'll know that as part of an ADF project I created a custom activity for Data Validation that constructs a validation query, runs a Hive job, examines the result of the job (a query output) and then takes some appropriate action (warnings, alerts etc.). Because the activity encapsulates a set of conditional actions (alerting based on validation results), running the Hive query and the subsequent logic within a custom activity was deemed a better solution than using a pipeline with the native Hive script runner activity followed by some attempt to execute alerting conditionally via pipeline activities.

The Techie Bits

In order to run the Hive job, you need to use the Microsoft.Azure.Management.HDInsight.Job assembly, which has a dependency on the Microsoft.WindowsAzure.Storage assembly. The version of the Storage assembly required, however, is not supported on the ADF batch platform: ADF v1 does not support anything beyond v4.3, while the HDInsight.Job assembly requires v6.0, so quite a way along the line in versioning terms. Hmmm. In fairness to Microsoft, this restriction is stated in the following article:

“Important: Data Factory service launcher requires the 4.3 version of WindowsAzure.Storage. If you add a reference to a later version of Azure Storage assembly in your custom activity project, you see an error when the activity executes. To resolve the error, see Appdomain isolation section.”

If, like me, you have managed to avoid reading this rather large caveat, you will only find out about it when you use classes from the assembly that differ from those in v4.3. You can still reference the assembly and use the classes, but when you call something with a different interface, such as a method whose parameters differ between versions, you discover that your code will not run with that version of the assembly, and you are told that your method signature is not available in WindowsAzure.Storage v4.3. You won't find this out, however, until you actually run your code on the ADF platform and get an error. Oh dear. Oh deary deary dear. Not good. Not good at all.

Don't Panic…

Well, as stated above, you have to roll up your DotNet sleeves and crack on with some cross-AppDomain stuff, involving serializing the classes you need and all sorts of other rather tricky bits of development. Why? In order to run your v6.0 Microsoft.WindowsAzure.Storage assembly, you need a separate application domain with your own required version of the Storage assembly loaded, thereby sidestepping the ADF platform restriction. Not exactly your average day at the Custom Activity development workbench, but we all love a challenge, right?

Now, you're probably aware that serialization has been well implemented in DotNet for all sorts of scenarios, from remoting to caching and other common use cases that require moving objects in and out of memory spaces. So all we need to do is serialize the objects required across our domains, pass them into a method for the execution of the activity, and we're cooking. Piece of cake.

Ah. The objects passed into the Execute method, namely Activity, LinkedService and Dataset, are NOT serializable! Surely that would have been easy for the Microsoft chaps to implement? There are loads of classes in DotNet with native serialization built in, right? With this running in a separate hosted process on ADF, surely it would have made sense to support ISerializable? Well, to me it makes perfect sense, but this appears to have been something of an afterthought. Okay, bellyaching over. What to do?

Passing Context into Your AppDomain

The entry point for calling our activity is the Execute method of the activity itself, as defined below:

```csharp
public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
```

The AppDomain isolation solution referenced in the 'Appdomain isolation' link above suggests creating a 'context' class that holds the items of information from the Activity, IEnumerable<Dataset> and IEnumerable<LinkedService> input parameters of the CustomActivity.Execute method (we are ignoring the IActivityLogger for now, as it doesn't contain any state that needs to be passed across app domains as context). This context class can then be used to pass the required state to the other application domain.

However, do we really want to pass every bit of context we need across to our activity on an ad hoc basis? That would become something of a coding pain: every Activity extended property value, connection string, authentication parameter or other piece of state would have to be coded in each time, in a bespoke context object with various fields to hold it, for each activity that we write. That adds a lot to any ongoing custom activity development effort, with lots of context-specific code, just because we want custom activities that need a more current version of the offending Storage assembly.
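As a reminder of the underlying rule: an object can cross an AppDomain boundary either by value, which requires its type to be marked [Serializable], or by reference, which requires it to derive from MarshalByRefObject. The minimal sketch below checks a type against those two rules. CrossDomainCheck and the three sample types are hypothetical names used only for illustration; they are not part of the ADF SDK.

```csharp
using System;

// Hypothetical helper: reports whether instances of a type could be handed
// across an AppDomain boundary, either by value or by reference.
public static class CrossDomainCheck
{
    // Marshal-by-value requires the [Serializable] marker on the type.
    public static bool CanCrossByValue(Type type)
    {
        return type.IsSerializable;
    }

    // Marshal-by-reference requires deriving from MarshalByRefObject,
    // in which case the other domain receives a proxy, not a copy.
    public static bool CanCrossByReference(Type type)
    {
        return typeof(MarshalByRefObject).IsAssignableFrom(type);
    }

    public static bool CanCross(Type type)
    {
        return CanCrossByValue(type) || CanCrossByReference(type);
    }
}

// Crosses by value: a context object holding only strings.
[Serializable]
public class ByValueContext
{
    public string ActivityJson { get; set; }
}

// Crosses by reference: calls are forwarded through a proxy.
public class ByReferenceWorker : MarshalByRefObject
{
}

// Neither marker, which is the situation the post describes for the
// ADF Activity, Dataset and LinkedService classes.
public class PlainModel
{
    public string Name { get; set; }
}
```

Pointing CanCross at the ADF model types from your own SDK version is a quick way to confirm the limitation before committing to the cross-domain approach.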
If only they had set the [Serializable] attribute on each of the Activity, Dataset and LinkedService ADF classes, we could push the objects into strings, pass them over to the other app domain and deserialize them, safe in the knowledge that we had caught all the available context state.

Custom Serialization to the Rescue

Okay, so just because this serialization isn't available doesn't mean we can't bake our own. ADF makes heavy use of JSON via the Newtonsoft Json.NET library, and that library is big on serialization; after all, that's what JSON was built for. However, if we want to serialize our Activity, LinkedService and Dataset objects, and thereby all the custom activity state passed into our Execute method, we need to understand that these three class types contain many abstract classes, interfaces and other non-'concrete' (in OOP parlance) types, used by various Activity, Dataset and LinkedService members. For Json.NET to serialize the Activity, LinkedServices and Datasets into strings, and then back into their required classes with all constituent contained objects intact, we need to tell the deserialization process how to materialise these abstract classes and interfaces. That means telling our deserializer just what they 'really' are, as you can't ask it to create an interface or an abstract class, as we covered in Class Hierarchies, SOLID Code and Json.NET Serialization Part 1.

In short, we need to create custom converters to handle serialization into JSON strings and back again for these three classes and all the problematic classes they reference (such as the abstract classes StorageFormat and Compression). We need to make sure that all dependent classes are covered, with custom converters for any abstract classes and interfaces.
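Here is a minimal sketch of the technique, using Json.NET as the post does but with hypothetical stand-in types (CompressionSettings, GZipSettings, BZip2Settings, DatasetStub) rather than the real ADF classes. CompressionSettings is abstract, so Json.NET cannot instantiate it by itself; the converter reads the Type discriminator and materialises the matching concrete subclass.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical stand-ins for an abstract ADF member such as Compression.
public abstract class CompressionSettings
{
    public string Type { get; set; }
}

public class GZipSettings : CompressionSettings
{
    public string Level { get; set; }
}

public class BZip2Settings : CompressionSettings
{
}

// Hypothetical parent object holding the abstract member.
public class DatasetStub
{
    public string Name { get; set; }
    public CompressionSettings Compression { get; set; }
}

// Materialises the abstract CompressionSettings as a concrete subclass,
// chosen by the "Type" discriminator in the JSON.
public class CompressionSettingsConverter : JsonConverter
{
    // Only the abstract type itself; subclasses use default handling,
    // so the ToObject calls below do not re-enter this converter.
    public override bool CanConvert(Type objectType)
    {
        return objectType == typeof(CompressionSettings);
    }

    public override object ReadJson(JsonReader reader, Type objectType,
        object existingValue, JsonSerializer serializer)
    {
        if (reader.TokenType != JsonToken.StartObject)
            return null;

        JObject jObject = JObject.Load(reader);
        string typeName = (string)jObject["Type"];

        switch (typeName?.ToUpperInvariant())
        {
            case "GZIP":
                return jObject.ToObject<GZipSettings>(serializer);
            case "BZIP2":
                return jObject.ToObject<BZip2Settings>(serializer);
            default:
                return null;
        }
    }

    // Serialization falls back to Json.NET's default behaviour.
    public override bool CanWrite => false;

    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        throw new NotSupportedException("CanWrite is false, so this is never called.");
    }
}
```

Passing the converter to JsonConvert.DeserializeObject is enough for it to be applied to the nested Compression member. Without it, the same call throws a JsonSerializationException, because Json.NET will not instantiate an abstract class. CanWrite returns false so that serialization uses Json.NET's default behaviour, which writes the properties of the runtime type, including the Type discriminator.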
If we can do this, then we are assured of having all the information ordinarily available in a non-cross-app-domain custom activity also available in the cross-app-domain activity, with no need to implement piecemeal context-handling classes for the various activities we create. We can simply use this cross-app-domain custom activity as a new base class from which to derive our activities, include the converters within the assembly, and serialize and deserialize our objects whenever we need to pass them across application domains, such as when we need to reference assembly versions different to those provided natively by the ADF batch service. Okay, that's quite a preamble, but hopefully it sets the stage for what we need to do.

Custom Converters (Encore Une Fois)

You may recall that at the start of this series we looked at how to use custom converters with Json.NET to deserialize JSON documents into concrete classes rather than abstract classes or interfaces. After trawling through the object model for Activities, Datasets and LinkedServices, it turns out that we need custom converters for the following objects:

- Microsoft.Azure.Management.DataFactories.Models.StorageFormat (used by the Dataset)
- Microsoft.Azure.Management.DataFactories.Models.PartitionValue (used by the Dataset)
- Microsoft.Azure.Management.DataFactories.Models.Compression (used by the Dataset)
- System.Collections.Generic.IDictionary (used by the Custom Activity ExtendedProperties member)

and our entry point objects used within the Execute method:

- Microsoft.Azure.Management.DataFactories.Models.Activity
- Microsoft.Azure.Management.DataFactories.Models.Dataset
- Microsoft.Azure.Management.DataFactories.Models.LinkedService

All other classes involved in serializing and deserializing Activities, Datasets and LinkedServices can be auto-converted by Json.NET without custom converters.

Some of these custom converter classes are more involved than others, as we'll see shortly. However, once we've done this, we can reuse it in every scenario requiring cross-app-domain functionality.

ReadJson Method

All our custom converters have the same basic ReadJson method:

```csharp
/// <summary>
/// Reads the JSON representation of the object.
/// </summary>
/// <param name="reader">The <see cref="JsonReader"/> to read from.</param>
/// <param name="objectType">Type of the object.</param>
/// <param name="existingValue">The existing value of object being read.</param>
/// <param name="serializer">The calling serializer.</param>
/// <returns>The object value.</returns>
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
    if (reader.TokenType == JsonToken.StartObject)
    {
        // Load JObject from stream
        JObject jObject = JObject.Load(reader);
        // Create target object based on JObject
        var target = Create(objectType, jObject);
        return target;
    }
    else
        return null;
}
```

This is the same as we saw in our previous post on the subject.

Create Methods

The Create method, however, varies greatly depending on the object to be deserialized.
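Because ReadJson is identical in every converter and only Create changes, one option is to hoist the shared part into a generic base class. This is a sketch under that assumption, not the converters used in this project: JsonCreationConverter<T> is a hypothetical name (it is not part of Json.NET or the ADF SDK), and the stub types are stand-ins that mirror the single-subclass PartitionValue case described below.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical base class: owns the shared ReadJson logic and leaves
// only the type-specific Create method to each derived converter.
public abstract class JsonCreationConverter<T> : JsonConverter
{
    // Builds the concrete object for the JSON object that was read.
    protected abstract T Create(Type objectType, JObject jObject);

    // Match only the base type T, never its concrete subclasses.
    public override bool CanConvert(Type objectType)
    {
        return objectType == typeof(T);
    }

    public override object ReadJson(JsonReader reader, Type objectType,
        object existingValue, JsonSerializer serializer)
    {
        if (reader.TokenType != JsonToken.StartObject)
            return null;

        JObject jObject = JObject.Load(reader);
        return Create(objectType, jObject);
    }

    public override bool CanWrite => false;

    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        throw new NotSupportedException("CanWrite is false, so this is never called.");
    }
}

// Stand-ins mirroring the PartitionValue / DateTimePartitionValue pair.
public abstract class PartitionValueStub
{
}

public class DateTimePartitionValueStub : PartitionValueStub
{
    public string Date { get; set; }
    public string Format { get; set; }
}

public class PartitionValueStubConverter : JsonCreationConverter<PartitionValueStub>
{
    // Only one concrete subclass exists, so no discriminator is needed.
    protected override PartitionValueStub Create(Type objectType, JObject jObject)
    {
        return jObject.ToObject<DateTimePartitionValueStub>();
    }
}
```

Each further converter then only needs to supply Create; one with several subclasses would switch on a discriminator inside Create, as the StorageFormat example does. Restricting CanConvert to typeof(T) matters: if it also matched the concrete subclasses, a ToObject call that is given the calling serializer could hand control straight back to the same converter.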
For the StorageFormatConverter it is pretty simple, with a basic case statement to determine the subclass of storage format based on the Type property:

```csharp
/// <summary>
/// Creates the object by explicitly parsing Json tokens and constructing property objects that do not have
/// default constructors, which would otherwise block the creation of the parent object when attempting
/// auto-deserialization.
/// </summary>
/// <param name="objectType">Type of the object.</param>
/// <param name="jObject">The jObject.</param>
/// <returns>The deserialized StorageFormat subclass, or null.</returns>
public StorageFormat Create(Type objectType, JObject jObject)
{
    string typeName = (string)jObject.SelectToken("$..Type");
    switch (typeName.ToUpper())
    {
        case "AVROFORMAT":
            return jObject.SelectToken("$..TypeProperties").ToObject<AvroFormat>();
        case "JSONFORMAT":
            return jObject.SelectToken("$..TypeProperties").ToObject<JsonFormat>();
        case "ORCFORMAT":
            return jObject.SelectToken("$..TypeProperties").ToObject<OrcFormat>();
        case "PARQUETFORMAT":
            return jObject.SelectToken("$..TypeProperties").ToObject<ParquetFormat>();
        case "TEXTFORMAT":
            return jObject.SelectToken("$..TypeProperties").ToObject<TextFormat>();
        default:
            return null;
    }
}
```

For the CompressionConverter it is a similar story, again with a case statement based on the Type property, to create the various subclasses of Compression (BZip2Compression, GZipCompression etc.). I'll leave that one for you to figure out.

For the PartitionValueConverter there is only one class that derives from PartitionValue, so it doesn't get any simpler really:

```csharp
/// <summary>
/// Creates the object by explicitly parsing Json tokens and constructing property objects that do not have
/// default constructors, which would otherwise block the creation of the parent object when attempting
/// auto-deserialization.
/// </summary>
/// <param name="objectType">Type of the object.</param>
/// <param name="jObject">The jObject.</param>
/// <returns>The deserialized DateTimePartitionValue.</returns>
public PartitionValue Create(Type objectType, JObject jObject)
{
    return jObject.ToObject<DateTimePartitionValue>();
}
```

The same goes for the DictionaryConverter, where our return type is simply a Dictionary<string, string>.

Things get a bit more interesting for our ActivityConverter:

```csharp
/// <summary>
/// Creates the object by explicitly parsing Json tokens and constructing property objects that do not have
/// default constructors, which would otherwise block the creation of the parent object when attempting
/// auto-deserialization.
/// </summary>
/// <param name="objectType">Type of the object.</param>
/// <param name="jObject">The jObject.</param>
/// <returns>The reconstructed Activity.</returns>
public Activity Create(Type objectType, JObject jObject)
{
    string name = (string)jObject["Name"];
    string description = (string)jObject.SelectToken("$..Description");
    string linkedServiceName = (string)jObject.SelectToken("$..LinkedServiceName");
    string typeName = (string)jObject.SelectToken("$..Type");
    string assemblyName = (string)jObject.SelectToken("$..TypeProperties.AssemblyName");
    string entryPoint = (string)jObject.SelectToken("$..TypeProperties.EntryPoint");
    string packageFile = (string)jObject.SelectToken("$..TypeProperties.PackageFile");
    string packageLinkedService = (string)jObject.SelectToken("$..TypeProperties.PackageLinkedService");
    Dictionary<string, string> extendedProperties = jObject.SelectToken("$..TypeProperties.ExtendedProperties").ToObject<Dictionary<string, string>>();

    DotNetActivity typeProperties = new DotNetActivity(assemblyName, entryPoint, packageFile, packageLinkedService);
    typeProperties.ExtendedProperties = extendedProperties;

    Activity activity = new Activity(typeProperties);
    activity.Name = name;
    activity.Description = description;
    activity.LinkedServiceName = linkedServiceName;
    activity.Inputs = jObject.SelectToken("$..Inputs").ToObject<IList<ActivityInput>>();
    activity.Outputs = jObject.SelectToken("$..Outputs").ToObject<IList<ActivityOutput>>();
    activity.Policy = jObject.SelectToken("$..Policy").ToObject<ActivityPolicy>();
    activity.Scheduler = jObject.SelectToken("$..Scheduler").ToObject<Scheduler>();
    return activity;
}
```

Quite a few properties to capture there; it's just a matter of making sure we have them all covered. For the DatasetConverter there is the added complexity of the various Dataset types available, so we have a Create method that calls a GetDatasetType method:

```csharp
/// <summary>
/// Creates the object by explicitly parsing Json tokens and constructing property objects that do not have
/// default constructors, which would otherwise block the creation of the parent object when attempting
/// auto-deserialization.
/// </summary>
/// <param name="objectType">Type of the object.</param>
/// <param name="jObject">The jObject.</param>
/// <returns>The reconstructed Dataset.</returns>
public Dataset Create(Type objectType, JObject jObject)
{
    string name = (string)jObject["Name"];
    string description = (string)jObject.SelectToken("$..Description");
    string linkedServiceName = (string)jObject.SelectToken("$..LinkedServiceName");
    string typeName = (string)jObject.SelectToken("$..Type");
    IDictionary<string, JToken> serviceExtraProperties = jObject.SelectToken("$..TypeProperties").ToObject<IDictionary<string, JToken>>();
    DatasetTypeProperties typeProperties = GetDatasetType(jObject);
    Availability availability = jObject.SelectToken("$..Availability").ToObject<Availability>();
    Policy policy = jObject.SelectToken("$..Policy").ToObject<Policy>();
    IList<DataElement> structure = jObject.SelectToken("$..Structure").ToObject<IList<DataElement>>();

    DatasetProperties properties = new DatasetProperties(typeProperties, availability, linkedServiceName);
    properties.Description = description;
    properties.External = (bool?)jObject.SelectToken("$..External");
    properties.Policy = policy;
    properties.Structure = structure;
    return new Dataset(name, properties);
}
```

GetDatasetType follows the pattern of using a case statement on the Type property from the JSON, passing a serializer that carries the converters for the nested abstract members:

```csharp
public DatasetTypeProperties GetDatasetType(JObject jObject)
{
    JsonSerializer serializer = new JsonSerializer();
    serializer.Converters.Add(new PartitionValueConverter());
    serializer.Converters.Add(new CompressionConverter());
    serializer.Converters.Add(new StorageFormatConverter());

    string typeName = (string)jObject.SelectToken("$..Type");
    switch (typeName.ToUpper())
    {
        case "AMAZONS3":
            return jObject.SelectToken("$..TypeProperties").ToObject<AmazonS3Dataset>(serializer);
        case "AZUREBLOB":
            return jObject.SelectToken("$..TypeProperties").ToObject<AzureBlobDataset>(serializer);
        case "AZUREDATALAKESTORE":
            return jObject.SelectToken("$..TypeProperties").ToObject<AzureDataLakeStoreDataset>(serializer);
        case "AZURESEARCHINDEX":
            return jObject.SelectToken("$..TypeProperties").ToObject<AzureSearchIndexDataset>(serializer);
        case "AZURESQLDWTABLE":
            return jObject.SelectToken("$..TypeProperties").ToObject<AzureSqlDataWarehouseTableDataset>(serializer);
        case "AZURESQLTABLE":
            return jObject.SelectToken("$..TypeProperties").ToObject<AzureSqlTableDataset>(serializer);
        case "AZURETABLE":
            return jObject.SelectToken("$..TypeProperties").ToObject<AzureTableDataset>(serializer);
        case "CUSTOMDATASET":
            return jObject.SelectToken("$..TypeProperties").ToObject<CustomDataset>(serializer);
        case "DOCUMENTDBCOLLECTION":
            return jObject.SelectToken("$..TypeProperties").ToObject<DocumentDbCollectionDataset>(serializer);
        case "FILESHARE":
            return jObject.SelectToken("$..TypeProperties").ToObject<FileShareDataset>(serializer);
        case "HTTP":
            return jObject.SelectToken("$..TypeProperties").ToObject<HttpDataset>(serializer);
        case "MONGODBCOLLECTION":
            return jObject.SelectToken("$..TypeProperties").ToObject<MongoDbCollectionDataset>(serializer);
        case "CASSANDRATABLE":
            return jObject.SelectToken("$..TypeProperties").ToObject<OnPremisesCassandraTableDataset>(serializer);
        case "ORACLETABLE":
            return jObject.SelectToken("$..TypeProperties").ToObject<OracleTableDataset>(serializer);
        case "RELATIONALTABLE":
            return jObject.SelectToken("$..TypeProperties").ToObject<RelationalTableDataset>(serializer);
        case "SQLSERVERTABLE":
            return jObject.SelectToken("$..TypeProperties").ToObject<SqlServerTableDataset>(serializer);
        case "WEBTABLE":
            return jObject.SelectToken("$..TypeProperties").ToObject<WebTableDataset>(serializer);
        default:
            return null;
    }
}
```

Once we have the ability to serialize and deserialize our Activity, IEnumerable<Dataset> and IEnumerable<LinkedService> objects, we need to wrap them all up in a containing context class, which we will use within our own Execute method implemented in our base class.

There is a similar story for the LinkedServiceConverter class, where we need to consider the LinkedService type (there are quite a lot of them). Again, I'm going to leave that one for you to figure out, based on the pattern we have for the DatasetConverter.

The ActivityContext Class

This contains the serialized JSON for the objects passed in via our Execute method parameters. Note that the class is marked [Serializable], as it will be travelling between application domains.

```csharp
using System;
using System.Collections.Generic;

/// <summary>
/// Container for Custom Activity serialized contextual information for passing
/// between application domains.
/// </summary>
/// <remarks>All context is stored as JSON strings.</remarks>
[Serializable]
public class ActivityContext
{
    /// <summary>
    /// Gets or sets the activity object in JSON form.
    /// </summary>
    /// <value>The activity json.</value>
    public string ActivityJson { get; set; }

    /// <summary>
    /// Gets or sets the linked services in JSON form.
    /// </summary>
    /// <value>The linked services json.</value>
    public List<string> LinkedServicesJson { get; set; }

    /// <summary>
    /// Gets or sets the datasets in JSON form.
    /// </summary>
    /// <value>The datasets json.</value>
    public List<string> DatasetsJson { get; set; }
}
```

The CrossAppDomainDotNetActivity Class

Okay, so now to actually implement all this cross-app-domain stuff.
As mentioned, we'll create a base class for our activities to derive from, which encapsulates our serialization functionality. This is based on the example given in the Microsoft article linked above, with the addition of a conditional compilation #if DEBUG block so that we can use it with our ADFLocalEnvironment debugging harness (see this previous post for details).

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;

public abstract class CrossAppDomainDotNetActivity<TExecutionContext> : MarshalByRefObject, IActivityLogger, ICrossAppDomainDotNetActivity<TExecutionContext>, IDotNetActivity
    where TExecutionContext : class
{
    #region Private Fields

    private IActivityLogger logger;

    #endregion Private Fields

    #region Public Methods

    IDictionary<string, string> IDotNetActivity.Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
    {
        TExecutionContext context = this.PreExecute(linkedServices, datasets, activity, logger);

        Type myType = this.GetType();
        var assemblyLocation = new FileInfo(myType.Assembly.Location);
        var appDomainSetup = new AppDomainSetup
        {
            ApplicationBase = assemblyLocation.DirectoryName,
            ConfigurationFile = assemblyLocation.Name + ".config"
        };
        AppDomain appDomain = AppDomain.CreateDomain(myType.ToString(), null, appDomainSetup);
        this.logger = logger;
        logger.Write("Assembly Location FullName: {0} Directory: {1}", assemblyLocation.FullName, assemblyLocation.Directory);

        // When running through the debugger with ADFLocalEnvironment, the result of
        // appDomain.CreateInstanceAndUnwrap cannot be cast, so execute in-process instead.
#if DEBUG
        return Execute(context, logger);
#else
        var proxy = (ICrossAppDomainDotNetActivity<TExecutionContext>)appDomain.CreateInstanceAndUnwrap(myType.Assembly.FullName, myType.FullName);
        return proxy.Execute(context, (IActivityLogger)this);
#endif
    }

    public abstract IDictionary<string, string> Execute(TExecutionContext context, IActivityLogger logger);

    public override object InitializeLifetimeService()
    {
        // Ensure that the client-activated object lives as long as the hosting app domain.
        return null;
    }

    void IActivityLogger.Write(string format, params object[] args)
    {
        this.logger.Write(format, args);
    }

    #endregion Public Methods

    #region Protected Methods

    protected virtual TExecutionContext PreExecute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
    {
        return null;
    }

    #endregion Protected Methods
}
```

You'll notice that this derives from the MarshalByRefObject class, which is a requirement for marshalling by reference across application domain boundaries. We get the context object, which stores all our serialized state for the Activity, LinkedServices and Datasets, in the first statement of the explicit IDotNetActivity.Execute implementation, by calling the PreExecute method. Note, however, that this base class has no real implementation of PreExecute (it simply returns null); we implement it in the derived class, which I'm coming on to. In the #else branch we create a proxy object via appDomain.CreateInstanceAndUnwrap, which is an instance of our derived custom activity class created in the new app domain (remember this is an abstract class, so when this method is called, myType refers to the concrete derived class that calls it). We then call the Execute method of this proxy, passing in our context container object together with our own instance cast to IActivityLogger (the base abstract class implements the IActivityLogger interface, with the required Write method forwarding to the logger supplied by ADF).
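Two details of the proxy creation are worth making concrete. Inside the abstract base class, this.GetType() returns the concrete derived type, and CreateInstanceAndUnwrap(assemblyName, typeName) constructs that type through its public parameterless constructor, so a derived activity must keep one. The sketch below shows the same mechanics within a single AppDomain, using Activator.CreateInstance instead (swapped in because creating AppDomains is only supported on .NET Framework). ActivityShell and HiveValidationShell are hypothetical names, not the classes from this project.

```csharp
using System;

// Hypothetical abstract base, standing in for CrossAppDomainDotNetActivity.
public abstract class ActivityShell
{
    // As in the base class above: GetType() yields the concrete derived type.
    public Type ConcreteType()
    {
        return this.GetType();
    }

    // Same-domain analogue of appDomain.CreateInstanceAndUnwrap(
    //     myType.Assembly.FullName, myType.FullName):
    // a fresh instance of the derived type, built through its
    // public parameterless constructor.
    public ActivityShell CreateFreshInstance()
    {
        return (ActivityShell)Activator.CreateInstance(this.GetType());
    }

    public abstract string Execute(string contextJson);
}

// Hypothetical derived activity. Its implicit public parameterless
// constructor is what Activator (and CreateInstanceAndUnwrap) relies on.
public class HiveValidationShell : ActivityShell
{
    public override string Execute(string contextJson)
    {
        return "validated: " + contextJson;
    }
}
```

In the real base class the same constraint applies across the domain boundary: a derived activity that only declares constructors with parameters would fail at the CreateInstanceAndUnwrap call.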
Again, note that we haven't implemented the Execute method here either, as this will be implemented in our derived custom activity class with the specifics of the activity required. So there's quite a lot going on here, but it should all make sense when you see the derived custom activity class implementation. First, however, just for completeness, here's the ICrossAppDomainDotNetActivity interface, which we use to ensure our implementing derived classes contain the required members:

```csharp
interface ICrossAppDomainDotNetActivity<TExecutionContext>
{
    IDictionary<string, string> Execute(TExecutionContext context, IActivityLogger logger);
}
```

CustomActivity Class Derived from CrossAppDomainDotNetActivity

Okay, so now that we have all our supporting code, it's time for the star of the show: the actual custom activity that does something.

PreExecute Method

The PreExecute method takes the same input parameters as the non-cross-app-domain custom activity Execute method, serializes them into JSON strings and stores them within an ActivityContext object, which it returns. As this is generic for all our activities, we can put it in our ActivityBase class, from which all our cross-application-domain custom activities derive.

```csharp
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;

public abstract class ActivityBase : CrossAppDomainDotNetActivity<ActivityContext>
{
    #region Private Fields

    private Configuration configuration;

    #endregion Private Fields

    protected override ActivityContext PreExecute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
    {
        // Process ADF artifacts up front as these objects are not serializable across app domain boundaries.
        List<string> datasetsJson = new List<string>();
        List<string> linkedServicesJson = new List<string>();
        datasets.ToList<Dataset>().ForEach(ds => { datasetsJson.Add(JsonConvert.SerializeObject(ds)); });
        linkedServices.ToList<LinkedService>().ForEach(ls => { linkedServicesJson.Add(JsonConvert.SerializeObject(ls)); });

        return new ActivityContext
        {
            ActivityJson = JsonConvert.SerializeObject(activity),
            DatasetsJson = datasetsJson,
            LinkedServicesJson = linkedServicesJson
        };
    }
}
```

Execute Method

This is implemented in our actual custom activity class, such as the HiveDataValidation class mentioned in previous posts. The Execute method deserializes the contents of the ActivityContext into the required Activity, Datasets and LinkedServices, from which point we are free to continue with our code as if all this crossing of domain boundaries was nothing more than a slightly disturbing dream…

```csharp
public override IDictionary<string, string> Execute(ActivityContext context, IActivityLogger logger)
{
    List<LinkedService> linkedServices = new List<LinkedService>();
    context.LinkedServicesJson.ForEach(lsJson =>
    {
        LinkedService ls = JsonConvert.DeserializeObject<LinkedService>(lsJson, new LinkedServiceConverter());
        linkedServices.Add(ls);
    });

    Activity activity = JsonConvert.DeserializeObject<Activity>(context.ActivityJson, new ActivityConverter());

    List<Dataset> datasets = new List<Dataset>();
    context.DatasetsJson.ForEach(dsJson =>
    {
        Dataset ds = JsonConvert.DeserializeObject<Dataset>(dsJson, new DatasetConverter());
        datasets.Add(ds);
    });

    // ... activity-specific logic (for example, running the Hive job) continues here ...
    return new Dictionary<string, string>();
}
```

That's All Folks

And on that happy note it's time to wrap up this series on Custom Activity development with Azure Data Factory. I hope my suggestions have helped make your development easier and more productive. Feel free to contact me via email or on Twitter @NigelMeakins. Thanks for reading.

Azure Data Factory Custom Activity Development–Part 4: Testing ADF Pipelines

This is the fourth post in a series on Azure Data Factory Custom Activity Development.

Pipeline Execution

Executing Pipeline Activities basically requires resetting the status of prior Data Slices so that they can re-execute. We can do this in the “Manage and Monitor” Data Factory dashboard provided by Azure, and also relatively simply using PowerShell. What is missing is the ability to do this from the comfort of your own Visual Studio environment.

When writing Unit and Integration Tests it is most beneficial to be able to do this within the IDE, using frameworks such as VS Test or NUnit to assist with our test logic and execution. This follows proven practice with other types of development and allows easy automation of testing for CI/CD activities. So, having made the case, how can we easily go about running our Pipeline Data Slices? Well, the ADF API isn't that intimidating, and although there are some caveats to be aware of, it is relatively easy to create a class or two to assist with executing ADF Activities from DotNet test code. We are, of course, working on the assumption that we are testing in a Data Factory environment that is purposed for testing, and can therefore update the Pipeline accordingly.

PipelineExecutor

The first thing we need is a class that will execute our Pipeline Activity Data Slices. It needs access to various properties of the Pipeline itself, as well as various methods and properties to conduct the desired Data Slice operations. Hence our class, with the relatively fitting name of PipelineExecutor. The PipelineGetResponse class within the Microsoft.Azure.Management.DataFactories.Models namespace allows us to request a Pipeline object's details from our Data Factory.
We’ll create a private field for this and retrieve it when constructing our PipelineExecutor class:

```csharp
public class PipelineExecutor
{
    #region Private Fields

    private string dataFactoryName;
    private DataFactoryManagementClient dfClient;
    private DateTime maxDateTime = DateTime.MaxValue;
    private DateTime minDateTime = DateTime.MinValue;
    private PipelineGetResponse pipelineGR;
    private string pipelineName;
    private string resourceGroupName;

    #endregion Private Fields

    #region Public Constructors

    /// <summary>
    /// Initializes a new instance of the <see cref="PipelineExecutor"/> class.
    /// </summary>
    /// <param name="adfApplicationId">The ADF application identifier.</param>
    /// <param name="adfApplicationSecret">The ADF application secret.</param>
    /// <param name="domain">The domain.</param>
    /// <param name="loginWindowsPrefix">The login windows prefix.</param>
    /// <param name="managementWindowsDnsPrefix">The management windows DNS prefix.</param>
    /// <param name="subscriptionId">The subscription identifier.</param>
    /// <param name="resourceGroupName">Name of the resource group.</param>
    /// <param name="dataFactoryName">Name of the data factory.</param>
    /// <param name="pipelineName">Name of the pipeline.</param>
    public PipelineExecutor(string adfApplicationId, string adfApplicationSecret, string domain,
        string loginWindowsPrefix, string managementWindowsDnsPrefix, string subscriptionId,
        string resourceGroupName, string dataFactoryName, string pipelineName)
    {
        this.resourceGroupName = resourceGroupName;
        this.dataFactoryName = dataFactoryName;
        this.pipelineName = pipelineName;

        ClientCredentialProvider ccp = new ClientCredentialProvider(adfApplicationId, adfApplicationSecret);
        dfClient = new DataFactoryManagementClient(ccp.GetTokenCloudCredentials(domain, loginWindowsPrefix,
            managementWindowsDnsPrefix, subscriptionId));

        LoadPipeline();
    }

    #endregion Public Constructors

    /// <summary>
    /// Loads the pipeline using a get request.
    /// </summary>
    public void LoadPipeline()
    {
        this.pipelineGR = dfClient.Pipelines.Get(resourceGroupName, dataFactoryName, pipelineName);

        if (this.pipelineGR == null)
            throw new Exception(string.Format("Pipeline {0} not found in Data Factory {1} within Resource Group {2}",
                pipelineName, dataFactoryName, resourceGroupName));
    }

    // ... other members omitted
}
```

The PipelineGetResponse.Pipeline property is the actual ADF Pipeline object, so we can apply the Decorator pattern, exposing it and any other relevant properties through our own class whenever required. For example, we create a simple read/write property, End, for getting and setting the Pipeline end time:

```csharp
/// <summary>
/// Gets or sets the pipeline end.
/// </summary>
/// <value>
/// The end.
/// </value>
public DateTime? End
{
    get { return this.pipelineGR.Pipeline.Properties.End; }
    set
    {
        if (this.pipelineGR.Pipeline.Properties.End != value)
        {
            this.pipelineGR.Pipeline.Properties.End = value;
            IsDirty = true;
        }
    }
}
```

Notice that we set an IsDirty property when updating the value. This can then be used to determine whether we need to save any changes to the Pipeline back to the Data Factory when we are actually ready to run our Data Slice(s).

We have properties for the major Pipeline attributes, such as a List of Activities and Data Sets, whether the Pipeline is Paused, and various others. These are used within a number of helper methods that make running our Activity Data Slices easier.

As previously mentioned, the way to execute an Activity Data Slice is to set its status accordingly. We also need to check that the Pipeline is not currently paused and, if it is, resume it. For these purposes there are a number of methods that do similar things with Data Slices, such as setting the status of the last Data Slice prior to a specified datetime for all the Data Sets within the Pipeline, as below:

```csharp
/// <summary>
/// Sets the state of the pipeline activities' last data slice prior to the date specified.
/// Updates the pipeline start and end if data slices fall outside of this range.
/// </summary>
/// <param name="priorTo">The date prior to which the last data slice will be updated.</param>
/// <param name="sliceState">State of the slice.</param>
/// <param name="updateType">Type of the update.</param>
public void SetPipelineActivitiesPriorDataSliceState(DateTime priorTo, string sliceState = "Waiting",
    string updateType = "UpstreamInPipeline")
{
    Dictionary<string, DataSlice> datasetDataSlices = GetPipelineActivitiesLastDataSlice(priorTo);

    // The earliest start and latest end of the data slices to be reset.
    DateTime earliestStart = datasetDataSlices.OrderBy(ds => ds.Value.Start).First().Value.Start;
    DateTime latestEnd = datasetDataSlices.OrderByDescending(ds => ds.Value.End).First().Value.End;

    // If the pipeline start and end values do not cover this timespan, the pipeline schedule
    // needs to be expanded to cover it, otherwise the data slice updates will fail.
    if (this.Start > earliestStart)
    {
        this.Start = earliestStart;
    }

    if (this.End < latestEnd)
    {
        this.End = latestEnd;
    }

    SavePipeline();

    foreach (string datasetName in datasetDataSlices.Keys)
    {
        DataSlice ds = datasetDataSlices[datasetName];
        SetDataSetDataSliceState(datasetName, ds.Start, ds.End, sliceState, updateType);
    }
}

/// <summary>
/// Saves the pipeline.
/// </summary>
public void SavePipeline()
{
    // Update the pipeline with the amended properties, but only if something has changed.
    if (IsDirty)
    {
        dfClient.Pipelines.CreateOrUpdate(resourceGroupName, dataFactoryName,
            new PipelineCreateOrUpdateParameters() { Pipeline = this.pipelineGR.Pipeline });

        // Reset the flag so that later calls do not re-save an unchanged pipeline.
        IsDirty = false;
    }
}
```

Pipeline Data Slice Range Issues…

The first gotcha you may encounter when resetting Activity Data Slices is possibly exceeding the Pipeline Start and/or End times. If you try to run a Pipeline in this state you will get an exception. Hence the need to check the earliest and latest Data Slice range over all our activities and amend the Start and End properties of our Pipeline (wrapped up inside our PipelineExecutor Start and End properties).
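To see the dirty flag and the range widening in isolation, here is a minimal, SDK-free sketch. PipelineSchedule and SliceWindow are hypothetical stand-ins for PipelineExecutor and the Data Factory DataSlice type, not part of the ADF SDK. It uses Min/Max rather than sorting to find the earliest start and latest end and, as an assumption that differs from the lifted nullable comparisons above, treats a missing Start or End as needing to be set.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a data slice; only its start and end times matter here.
public sealed class SliceWindow
{
    public SliceWindow(DateTime start, DateTime end)
    {
        if (end <= start) throw new ArgumentException("A slice must end after it starts.");
        Start = start;
        End = end;
    }

    public DateTime Start { get; }
    public DateTime End { get; }
}

// Hypothetical stand-in for the pipeline schedule wrapped by PipelineExecutor.
public sealed class PipelineSchedule
{
    private DateTime? start;
    private DateTime? end;

    public PipelineSchedule(DateTime? start, DateTime? end)
    {
        this.start = start;
        this.end = end;
    }

    // True once any property has actually changed, i.e. the pipeline needs saving.
    public bool IsDirty { get; private set; }

    public DateTime? Start
    {
        get { return start; }
        set { if (start != value) { start = value; IsDirty = true; } }
    }

    public DateTime? End
    {
        get { return end; }
        set { if (end != value) { end = value; IsDirty = true; } }
    }

    // Widens the schedule, only where necessary, so that it covers every slice to be reset.
    public void ExpandToCover(IReadOnlyCollection<SliceWindow> slices)
    {
        if (slices.Count == 0) return;

        DateTime earliestStart = slices.Min(s => s.Start);
        DateTime latestEnd = slices.Max(s => s.End);

        // Assumption: a missing (null) start or end is also set from the slices.
        if (!Start.HasValue || Start.Value > earliestStart) Start = earliestStart;
        if (!End.HasValue || End.Value < latestEnd) End = latestEnd;
    }

    // Called after a successful save so that later saves are skipped until something changes.
    public void MarkClean()
    {
        IsDirty = false;
    }
}
```

For example, a schedule running from 10 to 20 January that must cover slices of 5 to 6 January and 19 to 25 January is widened to 5 to 25 January and flagged as dirty, whereas a schedule that already covers every slice is left untouched and stays clean, so no save is needed.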
Nothing too taxing here, and we’re back in the game.

For completeness I’ll include the SetDataSetDataSliceState method called above, so you can see what is required to actually set the Data Slice statuses using the Data Factory API via the Microsoft.Azure.Management.DataFactories.DataFactoryManagementClient class.

```csharp
/// <summary>
/// Sets the state of all data set data slices that fall within the date range specified.
/// </summary>
/// <param name="datasetName">Name of the dataset.</param>
/// <param name="sliceStart">The slice start.</param>
/// <param name="sliceEnd">The slice end.</param>
/// <param name="sliceState">State of the slice.</param>
/// <param name="updateType">Type of the update.</param>
public void SetDataSetDataSliceState(string datasetName, DateTime sliceStart, DateTime sliceEnd,
    string sliceState = "Waiting", string updateType = "UpstreamInPipeline")
{
    DataSliceState sliceStatusResult;
    if (!Enum.TryParse<DataSliceState>(sliceState, out sliceStatusResult))
        throw new ArgumentException(string.Format("The value {0} for sliceState is invalid. Valid values are {1}.",
            sliceState, string.Join(", ", Enum.GetNames(typeof(DataSliceState)))));

    DataSliceUpdateType updateTypeResult;
    if (!Enum.TryParse<DataSliceUpdateType>(updateType, out updateTypeResult))
        throw new ArgumentException(string.Format("The value {0} for updateType is invalid. Valid values are {1}.",
            updateType, string.Join(", ", Enum.GetNames(typeof(DataSliceUpdateType)))));

    DataSliceSetStatusParameters dsssParams = new DataSliceSetStatusParameters()
    {
        DataSliceRangeStartTime = sliceStart.ConvertToISO8601DateTimeString(),
        DataSliceRangeEndTime = sliceEnd.ConvertToISO8601DateTimeString(),
        SliceState = sliceState,
        UpdateType = updateType
    };

    dfClient.DataSlices.SetStatus(resourceGroupName, dataFactoryName, datasetName, dsssParams);
}
```

You’ll see there are a couple of annoyances we need to code for, such as having to parse the sliceState and updateType parameters against some Enums of valid values that we’ve had to create to ensure only permitted values are used, and having to call ConvertToISO8601DateTimeString() on our slice start and end times. No biggie though.

Now that we have a class that lets us run our Pipeline Slices in a flexible manner (I’ve left out other methods for brevity), we can move on to using it within the various test methods of the Test Project classes we will be using.

A simple Base Testing Class

In the case of the project in question, we will be writing a lot of similar tests that simply check row counts on Hive destination objects once a Data Factory Pipeline has executed.
To make writing these easier, and remembering the DRY principle mentioned back in Part 2 of this series, we can encapsulate this functionality in a base class method and derive our row counting Test classes from it.

```csharp
public class TestBase
{
    #region Protected Fields

    protected string adfApplicationId = Properties.Settings.Default.ADFApplicationId;
    protected string adlsAccountName = Properties.Settings.Default.ADLSAccountName;
    protected string adlsRootDirPath = Properties.Settings.Default.ADLSRootDirPath;
    protected string domain = Properties.Settings.Default.Domain;
    protected string loginWindowsPrefix = Properties.Settings.Default.LoginWindowsDnsPrefix;
    protected string managementWindowsDnsPrefix = Properties.Settings.Default.ManagementWindowsDnsPrefix;
    protected string clusterName = Properties.Settings.Default.HDIClusterName;
    protected string clusterUserName = Properties.Settings.Default.HDIClusterUserName;
    protected string outputSubDir = Properties.Settings.Default.HDIClusterJobOutputSubDir;
    protected string dataFactory = Properties.Settings.Default.DataFactory;
    protected string resourceGroup = Properties.Settings.Default.ResourceGroup;
    protected string subscriptionId = Properties.Settings.Default.SubscriptionId;
    protected string tenantId = Properties.Settings.Default.TenantId;

    // adfApplicationSecret and clusterPassword are also used below; their declarations are not shown here.

    #endregion Protected Fields

    #region Public Methods

    // Not a test in its own right: derived test classes call this from their [TestMethod]s.
    public void ExecuteRowCountTest(string pipelineName, long expected, string databaseName, string objectName)
    {
        string rowCountStatement = "select count(*) as CountAll from {0}.{1};";

        // Basic check for SQL injection.
        if (databaseName.Contains(";") || objectName.Contains(";"))
            throw new ArgumentException(string.Format(
                "The parameters submitted contain potentially malicious values. databaseName: {0}, objectName: {1}. This may be an attempt at SQL injection.",
                databaseName, objectName));

        PipelineExecutor plr = new PipelineExecutor(adfApplicationId, adfApplicationSecret, domain, loginWindowsPrefix,
            managementWindowsDnsPrefix, subscriptionId, resourceGroup, dataFactory, pipelineName);
        plr.SetPipelineActivitiesPriorDataSliceState(DateTime.Now);
        plr.AwaitPipelineCompletion().Wait();

        string commandText = string.Format(rowCountStatement, databaseName, objectName);
        IStorageAccess storageAccess = new HiveDataLakeStoreStorageAccess(tenantId, adfApplicationId,
            adfApplicationSecret, adlsAccountName, adlsRootDirPath);
        HiveDataLakeStoreJobExecutor executor = new HiveDataLakeStoreJobExecutor(clusterName, clusterUserName,
            clusterPassword, outputSubDir, storageAccess);
        long actual = Convert.ToInt64(executor.ExecuteScalar(commandText));

        Assert.AreEqual(expected, actual);
    }

    #endregion Public Methods
}
```

Our test project class methods for destination row counts can then be simplified to something like the following:

```csharp
[TestClass]
public class MyPipelineName : TestBase
{
    #region Private Fields

    private string pipelineName = "MyPipelineName";

    #endregion Private Fields

    #region Public Methods

    // TODO: centralise the storage of these values.
    [TestMethod]
    public void DestinationRowCountTest()
    {
        string databaseName = "UniversalStaging";
        string objectName = "extBrandPlanning";
        long expected = 10;

        base.ExecuteRowCountTest(pipelineName, expected, databaseName, objectName);
    }

    #endregion Public Methods
}
```

Going Further

For tests involving dependent objects we can use mocking frameworks such as NSubstitute, JustMock or Moq to create these and define expected behaviours for the mocked objects, keeping the writing and asserting of our test conditions in line with proven Test Driven Development (TDD) practices. I won’t go into this here, as there are plenty of well-grounded resources out there on these subjects.
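The semicolon check in ExecuteRowCountTest only catches one form of injection. A stricter option is to accept only plain identifiers before formatting them into the HiveQL statement. HiveIdentifier below is a hypothetical helper, not part of the original TestBase, and its pattern is an assumption: it deliberately rejects some names Hive itself would accept (for example backtick-quoted names), so adjust it to your own naming conventions.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper, not part of the original TestBase class.
public static class HiveIdentifier
{
    // A letter or underscore followed by letters, digits or underscores.
    // \z (rather than $) stops a trailing newline from slipping through.
    private static readonly Regex SafeIdentifier = new Regex(@"^[A-Za-z_][A-Za-z0-9_]*\z");

    public static bool IsSafe(string name)
    {
        return !string.IsNullOrEmpty(name) && SafeIdentifier.IsMatch(name);
    }

    // Builds the same row count statement used by ExecuteRowCountTest, validating both parts first.
    public static string BuildRowCountStatement(string databaseName, string objectName)
    {
        if (!IsSafe(databaseName) || !IsSafe(objectName))
        {
            throw new ArgumentException(string.Format(
                "Invalid identifier. databaseName: {0}, objectName: {1}", databaseName, objectName));
        }

        return string.Format("select count(*) as CountAll from {0}.{1};", databaseName, objectName);
    }
}
```

Inside ExecuteRowCountTest, the semicolon check and the string.Format call could then be replaced by a single call to HiveIdentifier.BuildRowCountStatement(databaseName, objectName), so that anything other than a plain identifier fails fast with an ArgumentException.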
Up Next…

In the final instalment of the series we remove some of the referenced library limitations inherent in the ADF execution environment. Y’all come back soon now for Part 5: Using Cross Application Domains in ADF…

Azure Data Factory Custom Activity Development–Part 3: Debugging Custom Activities in Visual Studio

This is the third post in a series on Azure Data Factory Custom Activity Development.

Are We There Yet?

Azure Data Factory is, as we all know, essentially an execution platform in the cloud for orchestrating data processing. This means, however, that when it comes to working locally in your development environment, there isn’t much you can do in the way of actually running your code. That means no debugging. Developing Custom Activities without an environment in which to debug them is pretty painful. You package up your Activity, copy it to Azure Blob Storage, reschedule your Pipeline Activity data slice and wait with fingers crossed to see whether your Activity is doing what you were convinced it should have done the last time you went through this process. And then nope, still something amiss. Pick through the log messages to try to decipher where it went belly up and why. Add some more output statements somewhere near where you think the issue is and fire it off into the ADF stratosphere for yet another test flight. As I’m sure you know, this can very quickly become not only frustrating, if your bug is not an easy one to find, but also very, very time consuming.

What if, instead of this self-flagellating loop of despair, you could simulate all that Azure Data Factory runtime stuff right here on your dev machine, where it really matters? Well, it turns out I wasn’t just pointing out the pain points because I want us to revisit all those flashbacks of treacle-wading development experiences. There is hope, and there are two readily available solutions to this debugging slogathon.

Debugging, Yay!

ADFCustomActivityRunner

The first approach to gaining that most welcome ability to step through your misbehaving Custom Activity code is to use a base class for your activity, deriving your Custom Activity from it. The base class also includes a number of helper methods, such as GetExtendedProperty() and GetDataSet(), that make inspecting the Activity easier.
You can find this project, the ADFCustomActivityRunner, on GitHub here.

I have to say I’m not a fan of adding code to your deployment purely for the purpose of debugging, and I didn’t have much joy getting this to work, at least in the short time I spent with it, so I looked for another solution. I realise I’m not doing justice to the hard work put into the project here, so please do let me know how you get on with it if you choose to use it.

ADFLocalEnvironment

I did find a preferable (at least in my view) alternative for our Data Factory debugging requirement. This is the ADFLocalEnvironment project on GitHub, available here. It also creates an execution environment for our Activity, but does not require a base class from which we need to derive our Custom Activities. For me this is a cleaner solution, with no inherent dependency within our deployed code on another code base. The project also has code to export to ARM Templates, which looks very useful, although surplus to my pressing desire to be able to F11 through the Custom Activity code.

Having downloaded the source code and built the assembly, there are essentially two things we need to do to set up our debugging environment:

1: Create ADF Pipeline Project

In order to run our Custom Activity within the provided local environment, we need to wrap it in an Azure Data Factory Pipeline. Unless you have some complex Input Dataset requirements as a precursor to your debugging activities, this Pipeline can be as simple as a couple of dummy datasets to serve as Input and Output, with the Custom Activity in question sitting in the pipeline all on its lonesome. Nothing else is needed, as they won’t actually be used.
This is purely a harness for including our Custom Activity.

2: Create an Executable Project

To be able to execute our code and step into it from within Visual Studio, we need to create a small harness project from which we can call this local environment to run our Custom Activity Pipeline. This can be a console app, a VS/NUnit Test project or whatever you want to use. All you need is to be able to call the ADFLocalEnvironment code, which will handle running your Pipeline Activity. So we add our harness project, add the reference to ADFLocalEnvironment, and then write the code to execute the Pipeline:

```csharp
// Relative path to the ADF project containing the skeleton pipeline.
string adfProjectPath = @"..\..\..\CustomActivities.DataFactory\CustomActivities.DataFactory.dfproj";

ADFLocalEnvironment env = new ADFLocalEnvironment(adfProjectPath, "MyConfig");
env.ExecuteActivity("PL_HiveDataValidationTest", "HiveDataValidation", DateTime.Now.AddSeconds(2), DateTime.Now.AddDays(1));
```

So we set the relative path to the project containing our skeleton pipeline, create a new Local Environment pointing to it (including an optional ADF config file, should you want one) and then simulate the execution of the required Custom Activity within the skeleton pipeline we created earlier. We can then set breakpoints within our Custom Activity, allowing runtime inspection, and all of a sudden there are small fluffy bunnies playing on hillocks and a beautiful sunrise, who knows, maybe even a tax rebate waiting on the doormat, and the world is indeed a wonderful place once more.

Tweak Required

If you are using the above ADFLocalEnvironment within a VS Test project, there is a small change you will need to make to the source code. I have raised this as an issue on GitHub here, so hopefully it will make it into the code base soon. I’ll detail it briefly below so that you can make the change yourself.
In the ADFLocalEnvironment.cs file, at line 329, you will need to change how the debugger build path is determined, due to the slightly different value returned from AppDomain.CurrentDomain.BaseDirectory when running with VS Test. Here’s the change:

```csharp
debuggerBuildPath = string.Join("\\", AppDomain.CurrentDomain.BaseDirectory.GetTokens('\\', 0,
    AppDomain.CurrentDomain.BaseDirectory.EndsWith("\\") ? 3 : 2, true));
```

That should be it, and you’re all good to go bug hunting.

Big Shout

I would like to thank the contributors to the two GitHub projects above for all their hard work in making ADF Custom Activities a place where we no longer fear to tread. If I were American I would probably say something like “You guys are awesome”, but I’m not, and as you don’t actually fill me with awe it would be wrong for me to do so, but thanks all the same for some amazing development that has made my work a whole lot easier. Whoop whoop.

Up Next…

The next post in the series will look at that most important part of the development cycle: Part 4: Testing ADF Pipelines.

Azure Data Factory Custom Activity Development–Part 2: Encapsulating Common Functionality

This is the second post in a series on Azure Data Factory Custom Activity Development.

The Theory

In accordance with the DRY principle, or “Don’t Repeat Yourself”, we should avoid writing the same piece of code twice. The reasons behind this are pretty obvious from a maintenance, testing and general productivity perspective, so I won’t hammer the point home. In C# we tend to apply this idea using class inheritance.

A Simple Custom Activity Class Hierarchy

For reference within our discussion, we have a class hierarchy defined as below.

You’ll notice that our ActivityBase itself derives from a parent class, which is the subject of another post. We could simply have implemented IDotNetActivity directly for the purposes of this exercise.

Basic Custom Activity Functionality Requirements

When developing Custom Activities within Azure Data Factory (ADF) there are a lot of common tasks we find ourselves needing to do. We can start by putting the most common requirements in our ActivityBase class.

ActivityBase

One set of functionality we will be using time and again is validating our Activity’s Extended Properties. This lets us avoid bugs that only get discovered when our code attempts to reference Extended Properties that we’ve neglected to add. To encapsulate this, we create an ActivityBase class from which we will inherit additional functionality, and in it we create a method to validate our set of Extended Properties.

```csharp
/// <summary>
/// Confirms that the activity defines all of the required extended properties.
/// </summary>
/// <param name="activity">The activity.</param>
/// <param name="requiredProperties">Names of the required extended properties.</param>
/// <param name="logger">The activity logger.</param>
public void ValidateExtendedProperties(Activity activity, List<string> requiredProperties, IActivityLogger logger)
{
    logger.Write("ValidateExtendedProperties");

    IDictionary<string, string> extendedProperties = ((DotNetActivity)activity.TypeProperties).ExtendedProperties;
    List<string> missingProps = new List<string>();

    // Collect every required property that is missing from the activity.
    requiredProperties.ForEach(kvp =>
    {
        if (!extendedProperties.ContainsKey(kvp))
            missingProps.Add(kvp);
    });

    if (missingProps.Count > 0)
        throw new Exception(string.Format("The following required extended properties were not found on the activity {0}: {1}",
            activity.Name, string.Join(", ", missingProps)));
}
```

It takes as input a list of strings containing the names of the properties we require. The derived Activity class can then define its own list of these properties, which we use to confirm that the Activity JSON defined within our pipeline does indeed contain the required Extended Properties. For example:

```csharp
public class HiveDataValidation : KeyVaultActivity
{
    #region Private Fields

    // Static strings are required in order to add them to the requiredProps list.
    private static string ADLSACCOUNTNAMEPROPERTYNAME = Properties.Settings.Default.ADLSAccountNamePropertyName;
    private static string ADLSROOTDIRPATHPROPERTYNAME = Properties.Settings.Default.ADLSRootDirPathPropertyName;
    private static string DATAVALIDATIONRULESETPROPERTYNAME = Properties.Settings.Default.DataValidationRuleSetIdPropertyName;
    private static string DOCUMENTDBAUTHKEYPROPERTYNAME = Properties.Settings.Default.DocumentDbAuthKeyPropertyName;
    private static string DOCUMENTDBCOLLECTIONPROPERTYNAME = Properties.Settings.Default.DocumentDbCollectionPropertyName;
    private static string DOCUMENTDBPROPERTYNAME = Properties.Settings.Default.DocumentDbNamePropertyName;
    private static string DOCUMENTDBURIPROPERTYNAME = Properties.Settings.Default.DocumentDbUriPropertyName;
    private static string HDICLUSTERJOBOUTPUTSUBDIRPROPERTYNAME = Properties.Settings.Default.HDIClusterJobOutputSubDirPropertyName;
    private static string HDICLUSTERNAMEPROPERTYNAME = Properties.Settings.Default.HDIClusterNamePropertyName;
    private static string HDICLUSTERPASSWORDPROPERTYNAME = Properties.Settings.Default.HDIClusterPasswordPropertyName;
    private static string HDICLUSTERUSERNAMEPROPERTYNAME = Properties.Settings.Default.HDIClusterUserNamePropertyName;

    // Extended Properties required for the activity.
    private List<string> requiredProps = new List<string>()
    {
        ADLSACCOUNTNAMEPROPERTYNAME,
        ADLSROOTDIRPATHPROPERTYNAME,
        DOCUMENTDBURIPROPERTYNAME,
        DOCUMENTDBCOLLECTIONPROPERTYNAME,
        DOCUMENTDBAUTHKEYPROPERTYNAME,
        DATAVALIDATIONRULESETPROPERTYNAME,
        DOCUMENTDBPROPERTYNAME,
        HDICLUSTERNAMEPROPERTYNAME,
        HDICLUSTERUSERNAMEPROPERTYNAME,
        HDICLUSTERPASSWORDPROPERTYNAME,
        HDICLUSTERJOBOUTPUTSUBDIRPROPERTYNAME
    };

    #endregion Private Fields

    // ... other members omitted
}
```

Within the Execute method of the Custom Activity we can then call the above method:

```csharp
public override IDictionary<string, string> Execute(ActivityContext context, IActivityLogger logger)
{
    // How 'activity' is obtained from the context is not shown in this excerpt.
    ValidateExtendedProperties(activity, requiredProps, logger);
    IDictionary<string, string> extendedProperties = UpdateExtendedPropertySecrets(activity);

    // ... remainder of the activity omitted
}
```

We can add to the ActivityBase class as required in order to extend our common functionality.

Activity Authentication

To avoid writing a lot of boilerplate code for authenticating our activities against various services within Azure, we can derive them from a class that contains this logic for us, which we’ll call AuthenticateActivity.

AuthenticateActivity

The base class calls a simple library that does the actual authentication for us. We often need the credentials for the executing ADF Application in order to carry out some privileged action that requires an object of type Microsoft.Rest.ServiceClientCredentials (or one derived from it). A simple method within our AuthenticateActivity makes this available in a snip.

```csharp
/// <summary>
/// Gets the service client credentials.
/// </summary>
/// <param name="activity">The activity.</param>
/// <returns>The credentials of the executing ADF application.</returns>
public ServiceClientCredentials GetServiceClientCredentials(Activity activity)
{
    ClientCredentialProvider ccp = new ClientCredentialProvider(adfApplicationId, adfApplicationSecret);
    return ccp.GetServiceClientCredentials(domain);
}
```

We can then use this functionality within any activity that derives from our AuthenticateActivity class whenever it needs ServiceClientCredentials.

Retrieving Key Vault Secrets

Azure Key Vault can be used for storing secret values, such as passwords and other keys that need to be kept secure. Using an idea from the ADFSecurePublish project, we can embed placeholders within our custom activity extended properties for values that we would like to populate from Key Vault secrets. For example, if we need to reference a Key Vault secret called “docDBPrimaryKey”, we could add the following extended property with a placeholder:

```json
"documentDbAuthKey": "<KeyVault:docDBPrimaryKey>"
```

We can then replace this within the code for our activity with the respective secret value at runtime, thereby avoiding any secret values being included in our ADF pipeline code base.
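The substitution itself comes down to a regular expression match and replace. The sketch below is a hedged illustration, not the ADFSecurePublish KeyVaultResolver: the `<KeyVault:name>` pattern is an assumption based on the example above (the real KEYVAULTPLACEHOLDERREGEX may differ), KeyVaultPlaceholders is a hypothetical name, and the getSecret delegate stands in for the actual Key Vault lookup, so no Key Vault API calls are shown.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical sketch; not the ADFSecurePublish KeyVaultResolver.
public static class KeyVaultPlaceholders
{
    // Assumed placeholder format: <KeyVault:secretName>, capturing secretName in group 1.
    private static readonly Regex Placeholder = new Regex(@"<KeyVault:([^>]+)>");

    // Replaces every placeholder in target with the value getSecret returns for its secret name.
    // Using a MatchEvaluator means secret values containing '$' are inserted literally,
    // which would not be the case with a replacement pattern string.
    public static string Replace(string target, Func<string, string> getSecret)
    {
        if (target == null) throw new ArgumentNullException(nameof(target));
        if (getSecret == null) throw new ArgumentNullException(nameof(getSecret));

        return Placeholder.Replace(target, match => getSecret(match.Groups[1].Value));
    }
}
```

For instance, with a lookup that returns "abc$1" for docDBPrimaryKey, the extended property value "<KeyVault:docDBPrimaryKey>" becomes "abc$1", while values without a placeholder pass through unchanged.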
The ADFSecurePublish project includes code to do this in the form of a KeyVaultResolver class, which allows authentication against the Key Vault using various means and then fetches the Key Vault secret string value for the identifier required. Again, this is a very common scenario, so we create another derived class, KeyVaultActivity, this time with AuthenticateActivity as the base class so as to make the parent code available.

KeyVaultActivity

The code within our KeyVaultActivity is relatively straightforward, with a simple iteration over our activity’s extended properties, replacing the Key Vault placeholder values where required.

```csharp
/// <summary>
/// Updates the secret placeholders within the extended properties of the activity with their values from Key Vault.
/// </summary>
/// <param name="activity">The activity.</param>
/// <returns>The updated extended properties.</returns>
protected IDictionary<string, string> UpdateExtendedPropertySecrets(Activity activity)
{
    IDictionary<string, string> extendedProperties = ((DotNetActivity)activity.TypeProperties).ExtendedProperties;

    // Copy the dictionary to a list so that we can iterate over the list while modifying the dictionary
    // (iterating over and modifying the dictionary at the same time raises an exception).
    List<KeyValuePair<string, string>> extPropList = new List<KeyValuePair<string, string>>(extendedProperties);

    foreach (KeyValuePair<string, string> item in extPropList)
    {
        // Update the dictionary value for the corresponding list key.
        extendedProperties[item.Key] = ReplacePlaceholderWithSecret(item.Value);
    }

    return extendedProperties;
}

/// <summary>
/// Replaces the KeyVault placeholder in the target string with the respective secret value from Key Vault.
/// </summary>
/// <param name="target">The target string containing the KeyVault placeholder.</param>
/// <returns>The target string with the placeholder replaced.</returns>
protected string ReplacePlaceholderWithSecret(string target)
{
    return keyVaultResolver.ReplacePlaceholderWithSecret(target, KEYVAULTPLACEHOLDERREGEX);
}
```

We can now derive our custom activity from this KeyVaultActivity class and use the encapsulated functionality as desired.
So in the example of a HiveDataValidation activity, we simply use:

```csharp
public class HiveDataValidation : KeyVaultActivity
```

The amended dictionary of extended property values can then be easily referenced within our activity:

```csharp
IDictionary<string, string> extendedProperties = UpdateExtendedPropertySecrets(activity);
string documentDbAuthKey = extendedProperties["documentDbAuthKey"];
```

In Summary

As you can see, this simple inheritance exercise makes developing ADF Custom Activities a whole lot easier. We can soon build up a library of classes based on these to assist with extending Azure Data Factory through Custom Activities.

Coming Soon…

Up shortly, the next instalment in the series, Part 3: Debugging Custom Activities.

Azure Data Factory Custom Activity Development–Part 1: Configuration Settings

This is the first post in a series on Azure Data Factory Custom Activity Development.

Configuration Settings for Custom Activities

A very common requirement is to set up some configuration settings for your custom activity. Unfortunately, as your custom activity will be a dll, this is not easy using the standard DotNet app.config file approach.

The Settings File

The Settings file was introduced back in DotNet 2.0 as an improvement over config files for configuration and customisation settings. In Visual Studio these are found in your solution under “Properties” and have the benefit of a simple UI for editing values. Settings files support all the primitive data types (int, string etc.) as well as types such as System.DateTime and System.TimeSpan, and can also use any type referenced within the project, although for most purposes primitive types will probably suffice. There are two settings scopes, User and Application, for User-specific or Application-wide settings. For our purposes Application-scoped settings are fine, and settings of either scope are visible from within your code. Application settings are read-only at runtime, whereas User settings are writable, allowing the User to make changes to their own settings as needed. It is worth noting that you cannot have more than one setting with the same name, regardless of scope.

Interestingly, these settings are then saved in the app.config file for the assembly, as below, and are readily discoverable and accessible through the Properties.Settings class within your code. Visual Studio also compiles each value into the generated Settings class as a default, which is why the values remain available even though the host process does not load the custom activity dll’s own app.config.
```xml
<applicationSettings>
  <Adatis.ADF.CustomActivities.Properties.Settings>
    <setting name="DocumentDbDnsPrefix" serializeAs="String">
      <value></value>
    </setting>
  </Adatis.ADF.CustomActivities.Properties.Settings>
</applicationSettings>
```

Making use of our Settings file settings is then simply a case of reading these values into static fields within our custom activity class for further use:

```csharp
private static readonly string documentDbDnsPrefix = Properties.Settings.Default.DocumentDbDnsPrefix;
```

So, a nice short one for the start of the series, and hopefully of use to those of us developing ADF Custom Activities who are looking for a simple configuration settings solution.

Next up…

Join me for the next instalment in the series, Part 2: Encapsulating Common Functionality.

Azure Data Factory Custom Activity Development Series

Having spent a fair amount of time recently developing custom activities for Azure Data Factory, I thought it would be useful to share some of my findings in a series of blog posts on the subject. This includes some simple tips, suggestions for code structure and development, and also more advanced topics such as debugging, testing and coding using Cross Application Domains (to allow usage of assembly versions later than those provided by the ADF service).

Posts in the Series

Part 1: Configuration Settings
Part 2: Encapsulating Common Functionality
Part 3: Debugging Custom Activities in Visual Studio
Part 4: Testing ADF Pipelines
Part 5: Using Cross Application Domains in ADF

How to prepare for 70-766 - Perform Big Data Engineering on Microsoft Cloud Services

There is a new exam currently in beta titled "Perform Big Data Engineering on Microsoft Cloud Services (beta)". As with all new exams, there is little content on how to revise for it beyond the exam summary. This exam, however, covers exactly what Adatis specialises in! Microsoft may call this "Big Data Engineering"; we call it "Modern Data Analytics", and we have a few blogs on the subject. You can sign up to the exam here:

Below you will find links to blog posts by Adatis consultants on topics related to all the key objectives of this exam. I will endeavour to keep this up to date with new content added by the team. Good luck with the exam.

Design and Implement Complex Event Processing By Using Azure Stream Analytics (15-20%)

Streaming data is vital to achieving real-time analytics. The following blog posts focus on this and offer an introduction and walkthrough for getting started with Stream Analytics. In a wider Lambda approach to Big Data, streaming enables rapid processing via a “Speed” layer.

Design and Implement Analytics by Using Azure Data Lake (25-30%)

Azure Data Lake Store and Analytics are a vital component of “Modern Data Analytics”. Data that is too large for traditional single-server processing needs distributed parallel computation. Rather than pulling the data to the processing, ADLA pushes the processing to the data. Understanding how to process large volumes of data is one part of the “Batch” layer in Lambda.

Design and Implement Azure SQL Data Warehouse Solutions (15-20%)

Either as an alternative or an accompaniment to Data Lake there is Azure SQL Data Warehouse. If Data Lake is batch across many files, Azure SQL DW is parallel batch over many databases. The key to both services is processing at the storage and not at the compute. The following is an ongoing blog series covering everything from the basics to a deep dive.
Design and Implement Cloud-Based Integration by using Azure Data Factory (15-20%)

If you’re looking for a PaaS solution to move data in Azure, there is really only one option: Azure Data Factory. The following blogs will get you up to speed with ADF.

Manage and Maintain Azure SQL Data Warehouse, Azure Data Lake, Azure Data Factory, and Azure Stream Analytics (20-25%)

Knowing each of the parts is only half the battle; you need to know how, when and why to use each part. What are the best practices?

Azure Data Factory, using the Copy Data task to migrate data from on premise SQL Server to Blob storage.

This is the third blog in an ongoing series on Azure Data Factory. I recommend you start with the following blogs before continuing:

Introduction to Azure Data Factory
Setting up your first Azure Data Factory

In the first blog in this series I talked about working through a use case. Over the next 3 blogs we will look at 3 different methods for migrating data to Azure Blob storage:

1. Using the Azure Data Factory Copy Data Wizard (one table)
2. Using BIML and SSIS (entire database - SSIS)
3. Using Azure Data Factory and PowerShell (entire database - ADF)

I have included the latter two methods because, if you just want to load an entire database into blob storage, it can be quicker to use one of them, either as a one-off or on a scheduled basis. Hand writing all the JSON required to move each table from on premise to blob storage is very time consuming, so depending on whether you need a one-off upload or something on a schedule, options 2 and 3 might help.

Our original use case from An Introduction to Azure Data Factory:

Let's imagine a process where you have an on-premise SQL Server box and you want to move multiple tables to blob storage; from there you then want to issue a stored procedure which will consume that data into an Azure SQL Data Warehouse via PolyBase, as illustrated in the image below.

Linked services: on-premise SQL database, Azure Blob storage, Azure SQL Data Warehouse.
Datasets: table in on-premise SQL database, the blob container, the stored procedure.
Pipelines: pipeline to move data from SQL database to blob storage, pipeline to issue stored procedure.

In this blog we will tackle the first part.

Copying data

We are going to start looking in a bit more detail at the Azure Data Factory (ADF) Copy Data task (CD). CD is still in preview (at the time of writing [01/2017]). Prior to the inclusion of the Copy Data wizard, you had to manually configure ADF artefacts and write most of the JSON for linked services, datasets and pipelines by hand.
The Copy Data task is a wizard for generating a data movement activity pipeline, complete with datasets and linked services. To get started, connect to Azure and navigate to your existing Azure Data Factory (if you do not have an existing ADF, you can follow how to create one here). To begin setting up a Copy Data pipeline, click on the "Copy data (PREVIEW)" option in the ADF panel in Azure. Once you have selected "Copy data" you will be taken to the new ADF interface, which enables the creation and monitoring of ADF pipelines.

The general process for creating an ADF pipeline (prior to the Copy Data task) was:
1. Create an ADF
2. Create the linked service(s)
3. Create a gateway as needed
4. Create your input and output datasets
5. Create a pipeline
6. Monitor the pipeline

When using ADF Copy Data the process is slightly flipped:
1. Create an ADF
2. Configure the properties for the pipeline
3. Create a gateway
4. Configure the linked service(s)
5. Configure the datasets
6. Deploy all configurations
7. Monitor the pipeline

The main difference here is that you do not deploy anything until it has all been configured, which is an advantage, as doing all of this manually is somewhat cumbersome. At present, the Copy Data task is very narrow in its functionality. If you intend to build a more advanced pipeline, you will either need to generate a move task and tweak it, or create it all manually. Copy Data has many shortcomings; for our example the most prevalent is that a movement to blob storage can only sink data to a single folder, not multiple folders. Option 3 in our list of migration methods aims to get around this limitation using PowerShell.

Configure pipeline properties: Once you have selected "Copy data" you will be launched into the new, fresher-looking environment. The Copy Data task is a 4-step process which will guide you through the creation of a data movement pipeline (I want to highlight that this is only used for data movement and not transformation).
This is a great way to get started with ADF without having to understand the JSON or trickier elements such as data slices and scheduling, although we will touch on scheduling as it is quite tricky.

(Image 1 - Configure Properties)

The first screen you will see shows the properties of the pipeline you're creating. It is here you will configure the frequency and schedule of the pipeline. A pipeline is a group of logically related activities.
Task name - This is important and will be used as a prefix for the names of datasets and data stores.
Task description
Task schedule - See below for a more in-depth analysis.
Start time - This date is in UTC.
End time - This date is also in UTC.
For quick conversions to your time zone, I recommend worldtimebuddy.

More on schedules: The Microsoft page about scheduling is incredibly deep and takes a lot of effort to fully digest and understand. I will attempt to condense my understanding of pipeline scheduling into a brief list of key points. You can read more here; I recommend you do read it, as there are a lot of good examples. The key points from this document are:
A schedule can be one-off or run between dates.
It can run on a scheduled basis: minute, hourly, daily or weekly. Every 15 minutes is the minimum.
This forms what is known as a tumbling window. Microsoft defines a tumbling window as "A series of fixed-size, non-overlapping, contiguous time intervals". Tumbling windows are also known as activity windows.
A pipeline schedule's interval needs to be the same as a dataset's availability; however, it does not need to run at the same time.

For our example we will use a frequency of "Daily" and an interval of "1", which will run our pipeline every day. To write this in JSON rather than through the wizard, you would use the following JSON as part of your pipeline.
"scheduler": {"frequency": "Daily","interval": 1} To create a pipeline which will run undefinably you can set the end date time to "12/31/2099 12:00am" which while this is not infinite, the date will outlive ADF. Start date time will default to the time you have created the pipeline (n.b. these dates are expressed in US format MM/DD/YYYY). Creating linked services (Source data store):The next screen is the configuration of the linked sources. ADF is still new and the terminology is somewhat confusing. Depending on tool you're using and sometimes the screen you're looking at, ADF will mix up the names for key parts, anywhere you see the term "data store" assume it is referring to a linked service. For our example we will use the SQL Server option (bottom right of the above image). (Image - SQL Server linked service configuration screen) You should now be able to configure the connection details (via a gateway) to you SQL server database. Connection Name - You can refer to the Adatis naming standard as a referenceLS - Linked serviceMSQL - Microsoft SQL ServerPerson - Table being exported. Gateway - Select an existing or create a new gateway (see below) Server name - For my example I am using my local server with the default instance "." local would also work. If you're connecting to a named instance this will need to be server\InstanceName Database Name - Which database you want to connect to Credential encryption - You have the option to save a credential in Azure or use authentication through the browser. For simplicity I am using the latter. For production, use the former. Authentication type - How to connect to SQL Server, Windows or SQL login. User name Password   Creating and configuring a gateway:In our example we will be connecting to a local version of SQL Server, to connect and read data we will need to create an ADF gateway connection and also install our gateway on the server which has our database (or at least connection to that database). 
You have a few options to create the gateway, but before you can configure any of these you will need to download and install the gateway. You can find the latest version of the gateway here. Once installed, the gateway will be waiting for an access key.

(Image - Microsoft gateway - Awaiting gateway key)

We have 3 options to create an ADF gateway and obtain the key the gateway is expecting:
1. Copy Data configuration page (click create gateway): This will build the gateway and add the name to your pipeline. You will need to take the access key it generates and add that to your installed gateway.
2. Add via Author and deploy: Navigate to Author and deploy on the main panel of ADF in Azure. Click on "...More" and select New data gateway, then configure and deploy. This will return a key. Add the key to the gateway on your server.
3. Via PowerShell: Open a new PowerShell prompt and connect to Azure (Login-AzureRmAccount). Replace $ResourceGroup, $DataFactoryName and $Gateway with your Azure details and run:
New-AzureRmDataFactoryGateway -ResourceGroupName $ResourceGroup -Name $Gateway -DataFactoryName $DataFactoryName -Description $Gateway
This will return a key. Add the key to the gateway on your server.

(Image - A registered gateway)
(Image - Main screen on a registered gateway)

Configuring linked services: Select Next to choose which table(s) you wish to move.

(Image - ADF Copy - Select tables)

You can select one or more tables here. For our example we will be consuming the data using PolyBase, and we want our data to sink to its own container in Azure. As such we cannot move multiple tables at once (at the time of writing this is limited to one sink container).

(Image - ADF data filter)

You will next be asked how you want to filter the data. Each time our pipeline runs we are looking to move the whole table. If we were doing incremental loads, we could select a column which indicates which rows to import each hour. For our example select Filter: None.
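If you did need an incremental load, the usual approach is to give each run a time window and pick up only the rows whose change timestamp falls inside it, with the windows forming the fixed-size, non-overlapping, contiguous tumbling windows described earlier. The Python sketch below illustrates that logic with the standard library's sqlite3 module standing in for the source database. The Person table, its ModifiedDate column and both function names are assumptions for illustration only; in hand-written ADF v1 JSON the equivalent filter is typically built from the WindowStart and WindowEnd system variables rather than in Python.

```python
import sqlite3
from datetime import datetime, timedelta


def tumbling_windows(start, end, size):
    """Yield fixed-size, non-overlapping, contiguous (window_start, window_end) pairs.

    Any partial window left over at the end is dropped.
    """
    current = start
    while current + size <= end:
        yield current, current + size
        current += size


def rows_for_window(conn, window_start, window_end):
    """Return the PersonId values changed inside [window_start, window_end)."""
    sql = (
        "SELECT PersonId FROM Person "
        "WHERE ModifiedDate >= ? AND ModifiedDate < ? "
        "ORDER BY PersonId"
    )
    # Naive datetimes, treated as UTC, compared as ISO-8601 text ("YYYY-MM-DD HH:MM:SS").
    params = (window_start.isoformat(sep=" "), window_end.isoformat(sep=" "))
    return [row[0] for row in conn.execute(sql, params)]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Person (PersonId INTEGER, ModifiedDate TEXT)")
    conn.executemany(
        "INSERT INTO Person VALUES (?, ?)",
        [(1, "2017-01-01 00:30:00"), (2, "2017-01-01 01:00:00"), (3, "2017-01-01 01:59:59")],
    )
    start = datetime(2017, 1, 1)
    # Three hourly windows; each run only sees the rows changed in its own hour.
    for ws, we in tumbling_windows(start, start + timedelta(hours=3), timedelta(hours=1)):
        print(ws, we, rows_for_window(conn, ws, we))
```

Using a half-open interval (greater than or equal to the start, strictly less than the end) means a row stamped exactly on a boundary, such as 01:00:00, is picked up by exactly one window.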
Select Next.

(Image - ADF destination source)

Configuring the destination: On the next screen you will see the list of available sinks (where you can insert data). You will notice the list of sinks is far smaller than the list of sources; at present not all sources can be sinks. For our example select Azure Blob storage.
Connection Name - Based on the Adatis ADF naming standards: LS_ Linked service, ABLB_ Blob storage, Person - the blob container the data will sink to.
Account selection method - Manual / Azure list
Azure subscription - Select your subscription
Storage account name - Select your storage account

(Image - Selecting a blob container)

Select a folder for the file to sink to. I created a folder ahead of time called person.

(Image - ADF file format configuration screen)

Customise your output settings. For now we will just select the defaults to create a CSV. Select Finish to build your pipeline.

(Image - ADF deployment completed screen)

As long as everything has worked you will see the screen above. Congratulations, your pipeline has been deployed. To monitor the pipeline and see what has been created, select the link "click here to monitor your pipeline". You will be taken to a different screen in the ADF portal. We will have more on how to monitor ADF shortly.

Image - (ADF pipeline in

You can check the data has moved successfully using Azure Storage Explorer (ASE). ASE is a great utility for browsing files in blob storage. You can download ASE here.

(Image - Storage Explorer)

I can see that my file is there and is populated as intended. Once a further 24 hours has passed, this file will be overwritten.

So we have seen what we can do with the Copy Data task in Azure. While it is fantastic at basic data movement, Copy Data does not offer much beyond that. I have listed the pains and shortfalls which exist in ADF Copy Data at present.

Limitations of the Copy Data wizard: There are quite a few limitations; some of these are nice-to-haves, others are show stoppers.
The CD action is limited to only a subset of the pipeline activities. As the name suggests, you can only copy, or move, data. There is no transformation wizard.
The menus are very temperamental and regularly do not work.
You cannot name a dataset; InputDataset-8tl was created in my example, which is not helpful.
The name of the pipeline is also not helpful.
You cannot chain multiple activities together.
Each pipeline needs to be created separately.
You can only sink datasets to one blob container.

Now that we have our data in blob storage, we can begin to look at the rest of our solution, where we will create an Azure SQL Data Warehouse with external PolyBase tables. We will use stored procedures to persist the external tables into ASDW.

In the next blog we will look at moving an entire database to Azure Blob storage using SSIS and BIML.

Links

What is Azure Data Factory?

(Image 1 - Azure Data Factory – Transformation Pipeline)

Overview: Azure Data Factory (ADF) is a big data processing platform from Microsoft on the Azure platform. For database developers, the obvious comparison is with Microsoft's SQL Server Integration Services (SSIS). SSIS is an ETL tool (extract data, transform it and load it); ADF is not an ETL tool. ADF is more akin to ELT frameworks (Extract-Load-Transform). While the terms are similar, the process is very different. For those who have worked in the data warehouse arena for a number of years, you will be very familiar with ETL, and switching to ELT can feel somewhat alien and cumbersome. As such, making obvious comparisons to SSIS will only hinder your understanding of the core concepts of ADF. ADF is not a cloud version of SSIS, as many would have you believe; it is both more and less.

On the topic of ETL, Dan Linstedt (the father of Data Vault) published an interesting article, "ETL is Dead! Long Live ELT". In it, Linstedt argues that ETL is now defunct for a number of reasons, the primary one being big data. To enable fast processing of big data, we have seen a shift to parallel processing with tools such as Hadoop's HDFS, Cassandra and Spark and, on the Microsoft side, Data Lake and Azure SQL Data Warehouse. By having our data spread across many nodes, we can push the processing to the data via a MapReduce job. ADF is designed to be a big data processing pipeline, so it makes sense to process where the data lives rather than extracting all our data, transforming it and then loading it in one process. We instead ingest the data, load it into a parallel storage engine (HDFS) and then transform it. If this is the first time you're reading about ELT, I recommend both Linstedt's article, where Linstedt states "ETL truly is DEAD.
Metadata & Lineage are NOT dead, ELT is the future", and also James Serra's blog "The difference between ETL and ELT".

ADF is often described as an "orchestration" tool, and this is because of its two functions. Being ELT and not ETL, the two main functions might not surprise you: moving data and transforming it. When I say transforming data you might assume ADF does the transformation; however, the only action ADF actually performs is to issue a command for a transformation to happen (issuing an EXEC for a stored procedure, or a MapReduce job, for example). It is this issuing of commands which makes the tool an orchestrator and not a multipurpose ETL tool such as SSIS. The data processing landscape is changing: while ADF is still a relatively new tool, a lot of development is ongoing and improvements are being made. ADF is available through the Azure Portal and using the ADF add-in for Visual Studio.

Now we know what ADF does, let's look at how it does it. ADF has 3 main components: linked services, datasets and pipelines.

Core Concepts:

(Image 2 - High level ADF)

The diagram above shows how ADF all slots together. If we follow it starting with a linked service, we can explore this further. A linked service is our connection to some form of resource. This could be an on-premises SQL database, a blob storage container, an Azure SQL Data Warehouse or something else. It is here that we want something to happen. That could be extracting data, sinking data (an ADF term for storing data) or transforming data using stored procedures, MapReduce etc. A dataset is the part of a linked service on which you want an activity to be performed (a table, view or stored procedure). A dataset is not a database, but rather a table or a container in blob storage. A linked service can have one or many datasets. An activity is performed by a pipeline, which will perform an action or actions on one or more datasets.
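The relationships just described can be summarised as a very small data model. The following Python sketch is illustrative only: the class and field names are hypothetical and far simpler than ADF's real JSON definitions. It shows a dataset belonging to exactly one linked service, a linked service having one or many datasets, and a pipeline's activities acting on one or more datasets.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class LinkedService:
    """A connection to a data store or a compute resource."""
    name: str
    kind: str  # illustrative label only, e.g. "SqlServer" or "BlobStorage"


@dataclass
class Dataset:
    """A named piece of data within one linked service (a table, a blob folder, etc.)."""
    name: str
    linked_service: LinkedService


@dataclass
class Activity:
    """An action that reads input datasets and produces output datasets."""
    name: str
    inputs: List[Dataset]
    outputs: List[Dataset]


@dataclass
class Pipeline:
    """A logical group of activities."""
    name: str
    activities: List[Activity] = field(default_factory=list)

    def datasets(self) -> List[Dataset]:
        """Every dataset the pipeline touches, de-duplicated, in first-seen order."""
        seen = {}
        for activity in self.activities:
            for ds in activity.inputs + activity.outputs:
                seen.setdefault(ds.name, ds)
        return list(seen.values())


def datasets_of(service: LinkedService, all_datasets: List[Dataset]) -> List[Dataset]:
    """A linked service can have one or many datasets."""
    return [ds for ds in all_datasets if ds.linked_service is service]


# Hypothetical example: one SQL Server source with two tables, one blob sink.
sql_server = LinkedService("LS_MSQL_Source", "SqlServer")
blob = LinkedService("LS_ABLB_Target", "BlobStorage")
person = Dataset("DS_MSQL_Person", sql_server)
address = Dataset("DS_MSQL_Address", sql_server)
person_blob = Dataset("DS_ABLB_Person", blob)
copy_person = Pipeline("PL_Copy_Person", [Activity("CopyPerson", [person], [person_blob])])

if __name__ == "__main__":
    print([ds.name for ds in datasets_of(sql_server, [person, address, person_blob])])
    print([ds.name for ds in copy_person.datasets()])
```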
Linked services
A linked service is best thought of as a logical data connection, allowing ADF to connect to an external resource. A linked service acts as a representation of a data store or a compute resource. A data store is used as part of a data movement activity. A compute resource is executed as part of a data transformation activity. Click here for more information on the creation and different sources you can use for linked services.

Datasets
A dataset represents a data structure in ADF, each belonging to a linked service. A linked service might be an Azure Blob storage account or an Azure SQL database; the dataset is a representation of the data (a folder of files in blob storage or a table in an Azure database), not the database itself. For a full list of sources and sinks, see the list of links below.

Pipelines
Pipelines offer the power for ADF. There are 2 types of pipeline which support our notion of ELT.
Data movement pipeline - The data movement pipeline does what its name implies: it moves data from one dataset to another. In the Azure documentation these two datasets are referred to as the source and the sink. In a data movement activity, data can be moved from any supported source to any supported sink. For example, you can move data from Azure Blob storage into a SQL database, or from HDFS to an Azure SQL Data Warehouse. This part handles the E and the L of ELT: data is extracted and loaded. The difference from SSIS is that at this point there is no T; no transformation has happened yet.
Data transformation pipeline - The data transformation activities again do as the name implies: transform. As ADF is an orchestration tool, a transformation could be a MapReduce job, a streaming job, a stored procedure or a Data Lake U-SQL query. To fully use ADF you will need to know how you're storing data (HDFS/Data Lake) and then what language you want to use to process the data. Knowing how ADF functions is only half the battle.
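To make the ELT split concrete, here is a minimal Python sketch that uses the standard library's sqlite3 module as a stand-in for the target store. The first function only extracts and loads rows, unchanged, into a staging table (the data movement part); the second runs the transformation inside the database, much as a transformation activity only issues a command such as an EXEC of a stored procedure and lets the target engine do the work. SQLite has no stored procedures, so a plain SQL script stands in for one, and the table and column names are hypothetical.

```python
import sqlite3

# Stand-in for rows extracted from the source system (e.g. an on-premises table).
SOURCE_ROWS = [
    (1, "  ada ", "lovelace"),
    (2, "alan", " turing "),
]


def extract_and_load(conn, rows):
    """E and L: copy rows as-is into a staging table. No transformation yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS stg_person (id INTEGER, first_name TEXT, last_name TEXT)"
    )
    conn.executemany("INSERT INTO stg_person VALUES (?, ?, ?)", rows)
    conn.commit()


def transform(conn):
    """T: runs inside the target engine, like issuing EXEC on a stored procedure."""
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS dim_person (id INTEGER PRIMARY KEY, full_name TEXT);
        INSERT OR REPLACE INTO dim_person (id, full_name)
        SELECT id, UPPER(TRIM(first_name) || ' ' || TRIM(last_name)) FROM stg_person;
        """
    )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    extract_and_load(conn, SOURCE_ROWS)
    transform(conn)
    print(conn.execute("SELECT id, full_name FROM dim_person ORDER BY id").fetchall())
```

Keeping the raw rows in stg_person means the transformation can be rerun or changed later without extracting from the source again, which is one practical attraction of the ELT approach.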
In relationship terms, you can think of the interconnection of linked services, pipelines and datasets as follows: a linked service has one or more datasets (a SQL database with many tables), and a pipeline performs an action on one or more datasets.

An Example:
Let's imagine a process where you have an on-premises SQL Server box and you want to move multiple tables to blob storage. From there you then want to issue a stored procedure which will consume that data into an Azure SQL Data Warehouse via PolyBase, as illustrated in the image below.

(Image 3 - Example Process)

To complete this we would need to create the following artefacts in ADF:

Linked services:
On-premises SQL database
Azure Blob storage
Azure SQL Data Warehouse

Datasets:
Table in on-premises SQL database
The blob container
The stored procedure

Pipelines:
Pipeline to move data from SQL database to blob storage
Pipeline to issue stored procedure

In later blogs we will look at creating this example in full; however, if you want to get started now, the links at the bottom of the page will guide you through setting up ADF.

How to get started:
There are 3 ways to develop for ADF:
1. Azure portal - A semi-rich IDE, all in the Azure portal.
2. Visual Studio with the ADF add-in - A richer development environment. It allows source control of code through TFS and automated deployment of changes. This is valuable because the Azure portal can sometimes get caught in a syntax loop where nothing will deploy; VS will remove and deploy artefacts, which makes this a lot easier. You will sometimes have to delete your ADF and start again, and if your code is in VS, all you need to do is deploy.
3. PowerShell - Anything you can do in the portal, you can do in PowerShell.

My development preference is 2 and 3. I use Visual Studio for all my code, which is source controlled with TFS, then I use PowerShell to build my ADF environment.

Thanks for reading this introduction to ADF. I hope you have found it somewhat insightful.
We have a whole series of blogs looking in depth at different aspects and design patterns for loading a data warehouse with Azure Data Factory. You can read more in our next blog, on setting up your first Azure Data Factory.

Links:
Learning path
Documentation
Introduction to ADF [YouTube]
ADF overview [Channel9]