Nigel Meakins

Nigel Meakins' Blog

Azure Data Factory Custom Activity Development–Part 5: Using Cross Application Domains in ADF

This is the fifth and final post in a series on Azure Data Factory Custom Activity Development.

Introduction

There are times when the assemblies offered by the ADF hosted platform are not what we want, and we need to consider pushing our code into a separate application domain in order to make use of different dlls. Here’s an article on how I did this on a recent project.

The Development Scenario

If you’ve been following the previous posts, you’ll know that as part of an ADF project I created a custom activity for Data Validation that constructs a validation query, runs a Hive job, examines the result of the job (a query output) and then takes some appropriate action (warnings, alerts etc.). Because the activity encapsulates a set of conditional actions, with alerting driven by the validation results, executing the Hive query and the subsequent logic within a single custom activity was deemed a better solution than using a pipeline with the native Hive script runner activity followed by some attempt to execute alerting conditionally via pipeline activities.

The Techie Bits

In order to run the Hive job, you need to use the Microsoft.Azure.Management.HDInsight.Job assembly, which has a dependency on the Microsoft.WindowsAzure.Storage assembly. However, the required version of the Storage assembly is not supported on the ADF batch platform: ADF v1 does not support anything beyond v4.3. The version required by the HDInsight.Job assembly is v6.0, so quite a way along the line in versioning terms. Hmmm. In fairness to Microsoft, this restriction is stated in the following article:

https://docs.microsoft.com/en-us/azure/data-factory/v1/data-factory-use-custom-activities

“Important

Data Factory service launcher requires the 4.3 version of WindowsAzure.Storage. If you add a reference to a later version of Azure Storage assembly in your custom activity project, you see an error when the activity executes. To resolve the error, see Appdomain isolation section.”

If, like me, you have managed to avoid reading this rather large caveat, you will however only be made aware of this when using classes from the assembly that are different to those in v4.3. You can still reference the assembly and use the classes, but when you call something that has a different interface, such as a method that has different parameters between versions, you then find out that your code will not run with that version of the assembly and you will be told that your method signature is not available in WindowsAzure.Storage v4.3. You won’t find this out however until you actually run your code in the ADF platform and get an error. Oh dear. Oh deary deary dear. Not good. Not good at all.

Don’t Panic…

Well as stated above, you have to roll up your DotNet sleeves, and crack on with some Cross App-Domain stuff, involving serializing the classes you need and all sorts of other rather tricky bits of development. Why? Well, in order to run your v6.0 Microsoft.WindowsAzure.Storage assembly, you will need to use a separate application domain with your own required version of the above Storage assembly loaded, thereby removing the ADF platform restriction. Not exactly your average day at the Custom Activity development workbench, but we all love a challenge right? Now you’re probably aware that serialization has been well implemented in DotNet for all sorts of scenarios from remoting to caching and other common use cases that require moving objects in and out of memory spaces. Now, what we need to do is to serialize the objects that will be required across our domains, pass them into a method for the execution of the activity and we’re cooking. Piece of cake.

Ah. The objects passed into the Execute method, being Activity, LinkedService and DataSet are NOT Serializable! But that is really easy for the Microsoft chaps to implement? There are loads of classes in DotNet that have native serialization built in right? With this running in a separate hosted process on ADF, surely it would have made sense to support ISerializable? Well to me it does make perfect sense, but this appears to have been something of an afterthought. Okay bellyaching over. What to do?

Passing Context into Your AppDomain

The entry point for calling our activity is the Execute method for the activity itself, as defined below:

public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)

Now upon reading the AppDomain Isolation solution referenced in the ‘Appdomain isolation’ link above, it suggests creating a ‘context’ class that holds the items of information from the Activity, IEnumerable<Dataset> and IEnumerable<LinkedService> input parameters available in the CustomActivity.Execute method (we are ignoring the IActivityLogger for now, as it doesn’t contain actual state information that we need to consider as context-related for passing across app domains). This context class can then be used to pass the required state to the other application domain. However, do we really want to have to pass every bit of context that we need on an ad-hoc basis across to our Activity? That would become something of a coding pain: every Activity Extended Property string value, connection string, authentication parameter or other piece of state would have to be coded in each time, creating a bespoke context object with various fields to hold this state for each activity that we write. That’s going to add a lot to any ongoing custom activity development effort, with lots of context-specific code just because we want to create custom activities that need a more current version of the offending Storage assembly.

If only they had set that [Serializable] attribute for each of these Activity, Dataset and LinkedService ADF classes, we could push the objects into strings, pass them over to the other app domain and deserialize them safe in the knowledge that we have caught all the available context state.

Custom Serialization to the Rescue

Okay so just because we don’t have this serialization available doesn’t mean we can’t bake our own. ADF makes heavy use of JSON, via the Newtonsoft JSON library. And this is big on serialization, after all that’s what JSON was originally built for. However, if we want to serialize our Activity, LinkedService and Dataset objects, and thereby all required custom activity state passed into our Custom Activity Execute method, we need to understand that there are a lot of abstract classes, interfaces and other non ‘concrete’ (in OOP parlance) classes contained within these three class types, with various Activity, Dataset and LinkedService members making use of them. For the Newtonsoft JSON library to be able to serialize the Activity, LinkedServices and Datasets into strings, and then back into their required classes with all constituent contained objects intact, we are going to need to tell the deserialization process how to materialise these abstract classes and interfaces. That means we will need to tell our deserializer just what they ‘really’ are, as you can’t ask it to create an interface or an abstract class, as we’ve covered in Class Hierarchies, SOLID Code and Json.NET Serialization Part 1.

In short we need to create the required custom converter code to handle serialization into JSON strings and back again for these three required classes and all their problematic contained classes (such as the abstract classes StorageFormat, FileCompression) that are referenced within them. We need to make sure that all dependent classes are covered, with custom converters for any abstract classes and interfaces. If we can do this, then we are assured of having all the information ordinarily available in a non-cross-app domain Custom Activity also available in the Cross-App Domain activity. No need to implement piecemeal custom context handling classes for the various activities that we will create. We can simply use this cross-app domain custom activity as a new base class from which to derive our activities, include the converters within the assembly, and serialize and deserialize our objects whenever we need to pass them across application domains, such as when we need to reference assemblies with versions different to those provided natively within the ADF batch service. Okay, that’s quite a preamble but hopefully it sets the stage for what we need to do.

Custom Converters (Encore Une Fois)

You may recall that at the start of this series we looked at how to use Custom Converters with JSON to deserialize JSON documents into concrete classes rather than abstract classes or interfaces. After trawling through the object model for Activities, Datasets and LinkedServices, it turns out that we need custom converters to handle the following objects:

StorageFormat
Compression
PartitionValue
Dictionary<string, string>

and our entry point objects used within the Execute method: Activity, Dataset and LinkedService.

All other classes involved in our serializing/deserializing of Activities, Datasets and LinkedServices can be auto-converted by Json.NET without need for custom converters.

Some of these Custom Converter classes are more involved than others, as we’ll see shortly. However, once we’ve done this once, we can use this in all code scenarios requiring the Cross App domain functionality.

ReadJson Method

All our custom converters will have the same basic ReadJson method as below:

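		/// <summary>
		/// Reads the JSON representation of the object.
		/// </summary>
		/// <param name="reader">The <see cref="JsonReader"/> to read from.</param>
		/// <param name="objectType">Type of the object.</param>
		/// <param name="existingValue">The existing value of object being read.</param>
		/// <param name="serializer">The calling serializer.</param>
		/// <returns>
		/// The object value.
		/// </returns>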
		public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
		{
			if (reader.TokenType == JsonToken.StartObject)
			{
				// Load JObject from stream
				JObject jObject = JObject.Load(reader);

				// Create target object based on JObject
				var target = Create(objectType, jObject);

				return target;
			}
			else
				return null;
		}

So the same as we saw in our previous post on the subject.
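For reference, ReadJson is only one of the members a Json.NET converter has to supply; CanConvert and WriteJson are also abstract on JsonConverter. The following is a minimal sketch of a complete converter, not the code from the project: Shape, Circle, ShapeConverter and ConverterDemo are hypothetical stand-ins for an abstract ADF model class, its single concrete subclass and their converter, and it assumes that default serialization is good enough on the write side.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical stand-ins for an abstract model class and one concrete subclass.
public abstract class Shape { public string Name { get; set; } }
public class Circle : Shape { public double Radius { get; set; } }

public class ShapeConverter : JsonConverter
{
    // Only claim the abstract base type; concrete types use default handling,
    // which also stops Create from recursing back into this converter.
    public override bool CanConvert(Type objectType)
    {
        return objectType == typeof(Shape);
    }

    // Assumption: default serialization is fine, so opt out of writing.
    public override bool CanWrite { get { return false; } }

    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        if (reader.TokenType != JsonToken.StartObject)
            return null;

        // Load the whole object, then decide which concrete class to build.
        JObject jObject = JObject.Load(reader);
        return Create(objectType, jObject);
    }

    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        throw new NotSupportedException("CanWrite is false, so Json.NET does not call this.");
    }

    public Shape Create(Type objectType, JObject jObject)
    {
        // Only one concrete subclass exists in this sketch.
        return jObject.ToObject<Circle>();
    }
}

public static class ConverterDemo
{
    // Serializes a Circle, then reads it back through the abstract type.
    public static Shape RoundTrip()
    {
        string json = JsonConvert.SerializeObject(new Circle { Name = "c1", Radius = 2.5 });
        return JsonConvert.DeserializeObject<Shape>(json, new ShapeConverter());
    }
}
```

Keeping CanConvert restricted to the abstract type is a deliberate choice here: the concrete classes can then be materialised with a plain ToObject call inside Create.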

Create Methods

The Create method will, however, vary greatly based on the object required to be deserialized.

For the StorageFormatConverter object, it is pretty simple, with a basic case statement to determine the subclass of storage based on the Type property.

		/// <summary>
		/// Creates the object by explicitly parsing Json tokens and constructing property objects that do not have
		/// default constructors, which would otherwise block the creation of the parent object when attempting
		/// auto-deserialization.
		/// </summary>
		/// <param name="objectType">Type of the object.</param>
		/// <param name="jObject">The jObject.</param>
		/// <returns></returns>
		public StorageFormat Create(Type objectType, JObject jObject)
		{
			string typeName = (string)jObject.SelectToken("$..Type");
			switch (typeName.ToUpper())
			{
				case "AVROFORMAT":
					return jObject.SelectToken("$..TypeProperties").ToObject<AvroFormat>();

				case "JSONFORMAT":
					return jObject.SelectToken("$..TypeProperties").ToObject<JsonFormat>();

				case "ORCFORMAT":
					return jObject.SelectToken("$..TypeProperties").ToObject<OrcFormat>();

				case "PARQUETFORMAT":
					return jObject.SelectToken("$..TypeProperties").ToObject<ParquetFormat>();

				case "TEXTFORMAT":
					return jObject.SelectToken("$..TypeProperties").ToObject<TextFormat>();

				default:
					return null;
			}
		}
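To make the SelectToken calls a little less magical, here is a small sketch of how the "$..Type" and "$..TypeProperties" lookups behave. The JSON fragment, the ColumnDelimiter property and the SelectTokenDemo helper are assumptions for illustration only; the real shape is whatever Json.NET emits for the ADF object being serialized.

```csharp
using System;
using Newtonsoft.Json.Linq;

public static class SelectTokenDemo
{
    // Hypothetical fragment, not captured from a real ADF object.
    public const string SampleJson =
        "{ \"Type\": \"TextFormat\", \"TypeProperties\": { \"ColumnDelimiter\": \",\" } }";

    // "$..Type" searches the whole document for a property named Type.
    public static string ReadTypeName(string json)
    {
        JObject jObject = JObject.Parse(json);
        JToken token = jObject.SelectToken("$..Type");

        // SelectToken returns null when nothing matches, so guard before use.
        return token == null ? null : (string)token;
    }

    // The TypeProperties sub-object is what gets handed to ToObject<T>().
    public static string ReadColumnDelimiter(string json)
    {
        JObject jObject = JObject.Parse(json);
        JToken typeProperties = jObject.SelectToken("$..TypeProperties");
        return typeProperties == null ? null : (string)typeProperties["ColumnDelimiter"];
    }
}
```

Because SelectToken returns null when there is no match, a converter that calls ToUpper on the result directly will throw a NullReferenceException if the Type property is missing, so a null check along these lines may be worth adding.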

For the CompressionConverter it is a similar story, again with a case statement based on the Type property, to create the various subclasses of Compression (BZip2Compression, GZipCompression etc.). I’ll leave that one for you to figure out.

For the PartitionValueConverter there is only one class that derives from this, so it doesn’t get any simpler really:

		/// <summary>
		/// Creates the object by explicitly parsing Json tokens and constructing property objects that do not have
		/// default constructors, which would otherwise block the creation of the parent object when attempting
		/// auto-deserialization.
		/// </summary>
		/// <param name="objectType">Type of the object.</param>
		/// <param name="jObject">The jObject.</param>
		/// <returns></returns>
		public PartitionValue Create(Type objectType, JObject jObject)
		{
			return jObject.ToObject<DateTimePartitionValue>();
		}

Same for the DictionaryConverter, where our return type is simply a Dictionary<string, string>.

Things get a bit more interesting for our ActivityConverter, as below.

		/// <summary>
		/// Creates the object by explicitly parsing Json tokens and constructing property objects that do not have
		/// default constructors, which would otherwise block the creation of the parent object when attempting
		/// auto-deserialization.
		/// </summary>
		/// <param name="objectType">Type of the object.</param>
		/// <param name="jObject">The jObject.</param>
		/// <returns></returns>
		public Activity Create(Type objectType, JObject jObject)
		{
			string name = (string)jObject["Name"];
			string description = (string)jObject.SelectToken("$..Description");
			string linkedServiceName = (string)jObject.SelectToken("$..LinkedServiceName");
			string typeName = (string)jObject.SelectToken("$..Type");
			string assemblyName = (string)jObject.SelectToken("$..TypeProperties.AssemblyName");
			string entryPoint = (string)jObject.SelectToken("$..TypeProperties.EntryPoint");
			string packageFile = (string)jObject.SelectToken("$..TypeProperties.PackageFile");
			string packageLinkedService = (string)jObject.SelectToken("$..TypeProperties.PackageLinkedService");
			Dictionary<string, string> extendedProperties = jObject.SelectToken("$..TypeProperties.ExtendedProperties").ToObject<Dictionary<string, string>>();
			DotNetActivity typeProperties = new DotNetActivity(assemblyName, entryPoint, packageFile, packageLinkedService);
			typeProperties.ExtendedProperties = extendedProperties;
			Activity activity = new Activity(typeProperties);
			activity.Name = name;
			activity.Description = description;
			activity.LinkedServiceName = linkedServiceName;
			activity.Inputs = jObject.SelectToken("$..Inputs").ToObject<IList<ActivityInput>>();
			activity.Outputs = jObject.SelectToken("$..Outputs").ToObject<IList<ActivityOutput>>();
			activity.Policy = jObject.SelectToken("$..Policy").ToObject<ActivityPolicy>();
			// The scheduler is read from its own token rather than from the Policy token.
			activity.Scheduler = jObject.SelectToken("$..Scheduler").ToObject<Scheduler>();
			return activity;
		}

Quite a few properties to capture there, just a matter of making sure we have them all covered though.

For the DatasetConverter we have an added complexity with the various Dataset Types that are available, so we have a Create method that calls a GetDatasetType method, as below.

		/// <summary>
		/// Creates the object by explicitly parsing Json tokens and constructing property objects that do not have
		/// default constructors, which would otherwise block the creation of the parent object when attempting
		/// auto-deserialization.
		/// </summary>
		/// <param name="objectType">Type of the object.</param>
		/// <param name="jObject">The jObject.</param>
		/// <returns></returns>
		public Dataset Create(Type objectType, JObject jObject)
		{
			string name = (string)jObject["Name"];
			string description = (string)jObject.SelectToken("$..Description");
			string linkedServiceName = (string)jObject.SelectToken("$..LinkedServiceName");
			string typeName = (string)jObject.SelectToken("$..Type");

			IDictionary<string, JToken> serviceExtraProperties = jObject.SelectToken("$..TypeProperties").ToObject<IDictionary<string, JToken>>();
			DatasetTypeProperties typeProperties = GetDatasetType(jObject);
			Availability availability = jObject.SelectToken("$..Availability").ToObject<Availability>();
			Policy policy = jObject.SelectToken("$..Policy").ToObject<Policy>();
			IList<DataElement> structure = jObject.SelectToken("$..Structure").ToObject<IList<DataElement>>();
			DatasetProperties properties = new DatasetProperties(typeProperties, availability, linkedServiceName);
			properties.Description = description;
			properties.External = (bool?)jObject.SelectToken("$..External");
			properties.Policy = policy;
			properties.Structure = structure;
			return new Dataset(name, properties);
		}

GetDatasetType follows the pattern of using a case statement with a Type property from the JSON…

		public DatasetTypeProperties GetDatasetType(JObject jObject)
		{
			JsonSerializer serializer = new JsonSerializer();
			serializer.Converters.Add(new PartitionValueConverter());
			serializer.Converters.Add(new CompressionConverter());
			serializer.Converters.Add(new StorageFormatConverter());
			string typeName = (string)jObject.SelectToken("$..Type");
			switch (typeName.ToUpper())
			{
				case "AMAZONS3":
					return jObject.SelectToken("$..TypeProperties").ToObject<AmazonS3Dataset>(serializer);

				case "AZUREBLOB":
					return jObject.SelectToken("$..TypeProperties").ToObject<AzureBlobDataset>(serializer);

				case "AZUREDATALAKESTORE":
					return jObject.SelectToken("$..TypeProperties").ToObject<AzureDataLakeStoreDataset>(serializer);

				case "AZURESEARCHINDEX":
					return jObject.SelectToken("$..TypeProperties").ToObject<AzureSearchIndexDataset>(serializer);

				case "AZURESQLDWTABLE":
					return jObject.SelectToken("$..TypeProperties").ToObject<AzureSqlDataWarehouseTableDataset>(serializer);

				case "AZURESQLTABLE":
					return jObject.SelectToken("$..TypeProperties").ToObject<AzureSqlTableDataset>(serializer);

				case "AZURETABLE":
					return jObject.SelectToken("$..TypeProperties").ToObject<AzureTableDataset>(serializer);

				case "CUSTOMDATASET":
					return jObject.SelectToken("$..TypeProperties").ToObject<CustomDataset>(serializer);

				case "DOCUMENTDBCOLLECTION":
					return jObject.SelectToken("$..TypeProperties").ToObject<DocumentDbCollectionDataset>(serializer);

				case "FILESHARE":
					return jObject.SelectToken("$..TypeProperties").ToObject<FileShareDataset>(serializer);

				case "HTTP":
					return jObject.SelectToken("$..TypeProperties").ToObject<HttpDataset>(serializer);

				case "MONGODBCOLLECTION":
					return jObject.SelectToken("$..TypeProperties").ToObject<MongoDbCollectionDataset>(serializer);

				case "CASSANDRATABLE":
					return jObject.SelectToken("$..TypeProperties").ToObject<OnPremisesCassandraTableDataset>(serializer);

				case "ORACLETABLE":
					return jObject.SelectToken("$..TypeProperties").ToObject<OracleTableDataset>(serializer);

				case "RELATIONALTABLE":
					return jObject.SelectToken("$..TypeProperties").ToObject<RelationalTableDataset>(serializer);

				case "SQLSERVERTABLE":
					return jObject.SelectToken("$..TypeProperties").ToObject<SqlServerTableDataset>(serializer);

				case "WEBTABLE":
					return jObject.SelectToken("$..TypeProperties").ToObject<WebTableDataset>(serializer);

				default:
					return null;
			}
		}

There is a similar story for the LinkedServiceConverter class, where we need to consider the LinkedService type (there are quite a lot). Again I’m going to leave that one for you to figure out, based on the pattern we have for the DatasetConverter.

Once we have the ability to serialize and deserialize our Activity, IEnumerable<Dataset> and IEnumerable<LinkedService> objects, we then need to wrap these all up into a containing context class, which we will use within our own Execute method implemented in our base class.

The ActivityContext Class

This will contain the serialized JSON for those objects that we are passing in via our Execute method parameters. Note that the class has been marked as Serializable as this will be travelling between application domains.

	/// <summary>
	/// Container for Custom Activity serialized contextual information for passing
	/// between application domains.
	/// </summary>
	/// <remarks>All context is stored as JSON strings.</remarks>
	[Serializable]
	public class ActivityContext
	{
		/// <summary>
		/// Gets or sets the activity object in JSON form.
		/// </summary>
		/// <value>
		/// The activity json.
		/// </value>
		public string ActivityJson { get; set; }

		/// <summary>
		/// Gets or sets the linked services in JSON form.
		/// </summary>
		/// <value>
		/// The linked services json.
		/// </value>
		public List<string> LinkedServicesJson { get; set; }

		/// <summary>
		/// Gets or sets the datasets in JSON form.
		/// </summary>
		/// <value>
		/// The datasets json.
		/// </value>
		public List<string> DatasetsJson { get; set; }
	}

The CrossAppDomainDotNetActivity Class

Okay, so now to actually implement all this Cross App domain stuff. As mentioned, we’ll create a base class for our activities to derive from, which we will use to encapsulate our serialization functionality. This is based on the example given in the Microsoft article linked above, with the addition of a conditional compilation #if DEBUG block to allow us to use it with our ADFLocalEnvironment debugging harness (see this previous post for details).

public abstract class CrossAppDomainDotNetActivity<TExecutionContext>
		: MarshalByRefObject, IActivityLogger, ICrossAppDomainDotNetActivity<TExecutionContext>, IDotNetActivity
		where TExecutionContext : class
	{
		#region Private Fields

		private IActivityLogger logger;

		#endregion Private Fields

		#region Public Methods

		IDictionary<string, string> IDotNetActivity.Execute(IEnumerable<LinkedService> linkedServices,
			IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
		{
			TExecutionContext context = this.PreExecute(linkedServices, datasets, activity, logger);

			Type myType = this.GetType();
			var assemblyLocation = new FileInfo(myType.Assembly.Location);
			var appDomainSetup = new AppDomainSetup
			{
				ApplicationBase = assemblyLocation.DirectoryName,
				ConfigurationFile = assemblyLocation.Name + ".config"
			};
			AppDomain appDomain = AppDomain.CreateDomain(myType.ToString(), null, appDomainSetup);
			this.logger = logger;
			logger.Write("Assembly Location FullName: {0} Directory: {1}", assemblyLocation.FullName, assemblyLocation.Directory);
			//when running through debugger with ADFLocalEnvironment, cannot cast from the appDomain.CreateInstanceAndUnwrap call.
#if DEBUG
			return Execute(context, logger);
#else
			var proxy = (ICrossAppDomainDotNetActivity<TExecutionContext>)
					appDomain.CreateInstanceAndUnwrap(myType.Assembly.FullName, myType.FullName);
			return proxy.Execute(context, (IActivityLogger)this);
#endif
		}

		public abstract IDictionary<string, string> Execute(TExecutionContext context, IActivityLogger logger);

		public override object InitializeLifetimeService()
		{
			// Ensure that the client-activated object lives as long as the hosting app domain.
			return null;
		}

		void IActivityLogger.Write(string format, params object[] args)
		{
			this.logger.Write(format, args);
		}

		#endregion Public Methods

		#region Protected Methods

		protected virtual TExecutionContext PreExecute(IEnumerable<LinkedService> linkedServices,
					IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
		{
			return null;
		}

		#endregion Protected Methods
	}

You’ll notice that this derives from the MarshalByRefObject class, which is a requirement for marshalling across application domain boundaries. The first statement in the IDotNetActivity.Execute method gets the context object, which stores all our serialized state for the Activity, LinkedServices and Datasets, by calling the PreExecute method. Note however that in this base class PreExecute simply returns null; we need to override it in the derived class, which I’m coming on to. In the #else branch we create a proxy object via appDomain.CreateInstanceAndUnwrap, which is an instance of our derived Custom Activity class (remember this is an abstract class, so when this method is called, myType will refer to the concrete derived class that calls it). We then call the Execute method of this proxy, passing in our context container object together with an IActivityLogger obtained by casting our class (note that the base abstract class implements the IActivityLogger interface, with the required Write method).

Again note that we haven’t implemented the Execute method here either, as this will be implemented in our derived Custom Activity class with the specifics of the activity required. So quite a lot going on here, but it should all make sense when you see the derived Custom Activity class implementation.

First however, just for completeness, here’s the ICrossAppDomainDotNetActivity interface, which we use to ensure our implementing derived classes contain the members required.

	interface ICrossAppDomainDotNetActivity<TExecutionContext>
	{
		IDictionary<string, string> Execute(TExecutionContext context, IActivityLogger logger);
	}
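If the proxy mechanics are unfamiliar, they can be seen in isolation in a sketch like the one below. It is not part of the ADF code: DemoContext, DemoWorker and AppDomainDemo are hypothetical names, and it assumes the full .NET Framework, since creating application domains is not supported on .NET Core or later. The worker derives from MarshalByRefObject so the caller receives a proxy, while the [Serializable] context is copied into the child domain.

```csharp
using System;

// Copied by value across the domain boundary because it is [Serializable].
[Serializable]
public class DemoContext
{
    public string Payload { get; set; }
}

// Accessed via a proxy from the parent domain because it is MarshalByRefObject.
public class DemoWorker : MarshalByRefObject
{
    public string Run(DemoContext context)
    {
        // Executes inside whichever domain created this instance.
        return AppDomain.CurrentDomain.FriendlyName + ":" + context.Payload;
    }
}

public static class AppDomainDemo
{
    public static string RunInChildDomain(string payload)
    {
        AppDomain child = AppDomain.CreateDomain("ChildDomain");
        try
        {
            Type workerType = typeof(DemoWorker);

            // Instantiate the worker in the child domain and get a proxy back.
            var proxy = (DemoWorker)child.CreateInstanceAndUnwrap(
                workerType.Assembly.FullName, workerType.FullName);

            return proxy.Run(new DemoContext { Payload = payload });
        }
        finally
        {
            AppDomain.Unload(child);
        }
    }
}
```

The string that comes back starts with the child domain's friendly name, which confirms that Run executed on the far side of the boundary rather than in the caller's domain.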

CustomActivity Class Derived from CrossAppDomainDotNetActivity

Okay, so now we have all our supporting code, time for the star of the show, the actual Custom Activity that does something.

PreExecute Method

The PreExecute method essentially takes the same input parameters as the non-Cross App Domain Custom Activity Execute method, and serializes these into JSON strings and stores them within an ActivityContext class which it returns. Note that as this is generic for all our activities we can put this in our ActivityBase class from which all our cross application domain Custom Activities will derive.

	public abstract class ActivityBase : CrossAppDomainDotNetActivity<ActivityContext>
	{
		#region Private Fields

		private Configuration configuration;

		#endregion Private Fields

		protected override ActivityContext PreExecute(IEnumerable<LinkedService> linkedServices,
			IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
		{
			// Process ADF artifacts up front as these objects are not serializable across app domain boundaries.
			List<string> datasetsJson = new List<string>();
			List<string> linkedServicesJson = new List<string>();

			datasets.ToList<Dataset>().ForEach(ds =>
			{
				datasetsJson.Add(JsonConvert.SerializeObject(ds));
			});
			linkedServices.ToList<LinkedService>().ForEach(ls =>
			{
				linkedServicesJson.Add(JsonConvert.SerializeObject(ls));
			});
			return new ActivityContext
			{
				ActivityJson = JsonConvert.SerializeObject(activity),
				DatasetsJson = datasetsJson,
				LinkedServicesJson = linkedServicesJson
			};
		}
	}

Execute Method

This is implemented in our actual Custom Activity class, such as the HiveDataValidation class mentioned in previous posts. The Execute method deserializes the ActivityContext object into the required Activity, Dataset and LinkedServices, from which point we are free to continue with our code as if all this crossing of domain boundaries was nothing more than a slightly disturbing dream…

		public override IDictionary<string, string> Execute(ActivityContext context, IActivityLogger logger)
		{
			// Rebuild the linked services from their JSON form.
			List<LinkedService> linkedServices = new List<LinkedService>();
			context.LinkedServicesJson.ForEach(lsJson =>
			{
				LinkedService ls = JsonConvert.DeserializeObject<LinkedService>(lsJson, new LinkedServiceConverter());
				linkedServices.Add(ls);
			});
			// Rebuild the activity itself.
			Activity activity = JsonConvert.DeserializeObject<Activity>(context.ActivityJson, new ActivityConverter());
			// Rebuild the datasets.
			List<Dataset> datasets = new List<Dataset>();
			context.DatasetsJson.ForEach(dsJson =>
			{
				Dataset ds = JsonConvert.DeserializeObject<Dataset>(dsJson, new DatasetConverter());
				datasets.Add(ds);
			});
			// ...the activity-specific logic continues from here.

That’s All Folks

And on that happy note it’s time to wrap up this series on Custom Activity development with Azure Data Factory. I hope my suggestions have helped make your development easier and more productive. Feel free to contact me via email at Nigel.Meakins@Adatis.co.uk or on Twitter @NigelMeakins. Thanks for reading.
