Tristan Robinson

Tristan Robinson's Blog

PySpark DataFrame Transformation Chaining

After working with Databricks and PySpark for a while now, it's clear that as much best practice as possible needs to be defined upfront when coding notebooks. While you can get away with quite a bit when writing SQL – which is all too familiar to most of us by now – the transition into other languages (from a BI background) requires a bit more thought in terms of coding standards. In this short blog, I will talk about chaining multiple DataFrame transformations and what I believe to be the best way to structure your code for them.

More often than not, if you have not come from a programming background or are unfamiliar with a language, you will end up writing spaghetti code – essentially code that reads a bit like a book, with no real structure to it, causing debugging/maintenance issues. We've all been there, especially at the start of a project, just wanting to get some output – but it's worth trying to bear in mind some coding rules while designing, to save refactoring a few weeks down the line.

Jumping into the subject matter of the blog, this came about because I noticed some of our project code re-assigned to a new DataFrame on each transformation. While functionally acceptable, it didn't feel like the right way to go about things. The same can be said for re-using the same DataFrame (although this caused other dependency issues). It also left us with huge chunks of code which were only readable by using the comments to see where one transformation stopped and another started.

After a small bit of research I discovered the concept of monkey patching (modifying a class or module at runtime to extend its behaviour) the DataFrame object to include a transform method. This method is missing from PySpark but already exists in the Scala DataFrame API.

The following code can be used to achieve this, and can be stored in a generic wrapper functions notebook to separate it out from your main code. This can then be called to import the functions whenever you need them.

 

# Monkey patch the DataFrame object with a transform method
from pyspark.sql.dataframe import DataFrame

def transform(self, f):
    return f(self)

DataFrame.transform = transform
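As a quick sanity check outside of Spark, the same pattern can be demonstrated on any class – a purely illustrative stand-in DataFrame below – since transform simply applies the supplied function to self:

```python
# Purely illustrative stand-in for a Spark DataFrame.
class FakeDataFrame:
    def __init__(self, rows):
        self.rows = rows

    def transform(self, f):
        # Same one-liner as the monkey-patched method above.
        return f(self)

def double_rows(df):
    return FakeDataFrame([r * 2 for r in df.rows])

def add_one(df):
    return FakeDataFrame([r + 1 for r in df.rows])

# Transformations chain left-to-right, reading like a pipeline.
result = FakeDataFrame([1, 2, 3]).transform(double_rows).transform(add_one)
print(result.rows)  # [3, 5, 7]
```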

 

As part of your main code, we can then wrap the transformations into functions, each taking and returning a DataFrame. A separate statement can then be called specifying transform on the original DataFrame along with the list of functions (transformations) you want to pass in. By using this method, the code is almost self-documenting, as it's clear which transformations you've applied to move a DataFrame from one context into another. The example below only includes 2 transformations, but imagine you have 20+ to implement – this makes the code much easier to read and debug should there be an issue.

 

# Create started/finished timestamps and int dates in UTC as new columns in the DataFrame.
from pyspark.sql.functions import col, when, to_utc_timestamp

def append_utc_dates(df):
  # ConvertDateTimeToIntDate is a project-specific helper UDF.
  df = df.withColumn("finishedTimestampUTC", to_utc_timestamp(col("finished"), "EST")) \
         .withColumn("startedTimestampUTC", to_utc_timestamp(col("started"), "EST")) \
         .withColumn("startedDateUTC", ConvertDateTimeToIntDate(col("startedTimestampUTC"))) \
         .withColumn("finishedDateUTC", ConvertDateTimeToIntDate(col("finishedTimestampUTC")))
  return df

# Adds a new attribute based on hand structure to the DataFrame
def append_BB(df):
  df = df.join(refHandStructure, df["structure"] == refHandStructure["structure"], "left") \
         .select(df["*"], refHandStructure["handStructureName"]) \
         .withColumn("BB", when(col("handStructureName") == "Limit", col("x")).otherwise(col("y")))
  df = df.drop("handStructureName")
  return df

# Perform hand transformations
dfTransformed = dfRaw.transform(append_utc_dates) \
                     .transform(append_BB)
      
display(dfTransformed)

 

You should then use new DataFrames for changes in grain or changes in purpose (not just for each transformation).

This technique probably shouldn’t be used for some of the more basic transformations, especially if you only have 1 or 2, but more so when you have 50/100+ lines of code.  Not everything needs to be wrapped into functions, but it certainly reads better for larger notebooks!

Implementing Enterprise Security in Azure Databricks - Part 2

Following on from my last blog on security within Azure Databricks, which concentrated on the implementation model for data processing for the platform, the following blog concentrates on the alternative – data processing for users.

 

Data Processing for Users

By this, I mean data-related activities that a user is performing interactively, for instance data analysis from Data Lake.

With the addition of Databricks runtime 5.1, released in December 2018, comes the ability to use Azure AD credential pass-through. This is a huge step forward, since there is no longer a need to control user permissions through Databricks groups / Bash and then assign those groups access to secrets in order to access Data Lake at runtime. As mentioned previously – with the current lack of support for AAD within Databricks, ACL activities were done on an individual basis, which was not ideal. By using this feature, you can now pass the authentication through to Data Lake, and as we know, one of the advantages of Data Lake is its tight integration with Active Directory, so this simplifies things. It's worth noting that this feature is currently in public preview, but having tested it thoroughly, I am happy with the implementation/limitations. The feature also requires a premium workspace and only works with high concurrency clusters – both of which you'd expect to use in this scenario.

The other good thing is that it's incredibly easy to enable this functionality, as it is controlled by the cluster configuration process. To enable it, navigate to the cluster configuration page, select runtime 5.1 or higher, and expand the advanced options. At this point, you will see a tick box which needs to be checked (see below). This will add another line into your Spark config. It's actually good to see it implemented this way – it helps to stick to the Microsoft mantra of keeping things simple.
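For reference, the tick box simply adds a single entry to the cluster's Spark config along the lines of the following (worth verifying against your own cluster, as preview behaviour may change):

```
spark.databricks.passthrough.enabled true
```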

 

image

 

Once enabled, only connections into Data Lake via adl:// are valid – any existing connections through the DBFS mount, or through Databricks database tables (unmanaged) that route via the DBFS, will stop working. This is a current limitation and may be fixed at GA (although technically you could re-build the tables using the adl:// path if this was an issue).

Great – so now my ACLs can be controlled entirely within Data Lake without the need for Service Principals! But there's more…

I touched on this in my previous blog, but as part of the access implementation for Data Lake, it is preferable to define Azure Active Directory (AAD) groups to provide further flexibility moving forward. By this I mean creating AAD groups and assigning them to Data Lake, creating a level of abstraction away from users/user AD groups/service principals, so that modifications will not need to be made to Data Lake permissions in future – only to the AAD groups that are already assigned. From experience, by not going down this route, any additional user permissions that need applying later have to be applied across all folders/files, which, depending on the size of the data lake, can take a particularly long time / be awkward to add. Therefore this needs to be done upfront as part of the design!

I would suggest the following conventions for this group setup, an example being AAD_PLATFORM_ADL_ENV_RAW_READER.

  • AAD – to separate out AD/AAD once sync is implemented.
  • PLATFORM – the platform or project/department this group is used by.
  • ADL – the resource on the platform that the group is used by.
  • ENV – the environment on which the resource resides (prod/non-prod).
  • RAW – the layer within lake the permissions will be applied to.
  • READER – the permission the group will have.
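As an illustrative sketch (the helper name and arguments are my own, not part of any official convention), a group name can be composed from these parts:

```python
# Illustrative helper: compose an AAD group name from the convention parts.
def adl_group_name(platform, env, layer, permission):
    """Build a name like AAD_PLATFORM_ADL_PROD_RAW_READER."""
    return "_".join(
        ["AAD", platform.upper(), "ADL", env.upper(), layer.upper(), permission.upper()]
    )

print(adl_group_name("platform", "prod", "raw", "reader"))
# AAD_PLATFORM_ADL_PROD_RAW_READER
```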

You would then need to create the following AAD groups to cover all areas across the Data Lake. This assumes using our standard Data Lake layer/folder pattern first introduced by Ust’s blog back in 2016.

  • AAD_PLATFORM_ADL_PROD_RAW_READER
  • AAD_PLATFORM_ADL_PROD_RAW_WRITER
  • AAD_PLATFORM_ADL_PROD_BASE_READER
  • AAD_PLATFORM_ADL_PROD_BASE_WRITER
  • AAD_PLATFORM_ADL_PROD_ENRICHED_READER
  • AAD_PLATFORM_ADL_PROD_ENRICHED_WRITER
  • AAD_PLATFORM_ADL_PROD_CURATED_READER
  • AAD_PLATFORM_ADL_PROD_CURATED_WRITER
  • AAD_PLATFORM_ADL_PROD_LABORATORY
  • AAD_PLATFORM_ADL_PROD_LIBRARY_READER
  • AAD_PLATFORM_ADL_PROD_ADMIN

The permissions applied to these groups can then be implemented using the following matrix. When adding these permissions, they need to be added to the current folder, and all children, and added as both an access permission entry and a default permission entry. Without this, any changes to Data Lake in the future will not inherit these permissions.
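To make the access/default pairing concrete, here is an illustrative helper (the object ID shown is a placeholder) that builds an ADLS acl-spec string applying a permission to a group as both an access entry and a default entry, so that new children inherit it:

```python
def acl_spec(group_object_id, perms):
    """Build an acl-spec applying `perms` (e.g. 'r-x' or 'rwx') to a group
    as both an access entry and a default entry."""
    return "group:{0}:{1},default:group:{0}:{1}".format(group_object_id, perms)

# Placeholder object ID for, say, AAD_PLATFORM_ADL_PROD_RAW_READER.
print(acl_spec("00000000-0000-0000-0000-000000000000", "r-x"))
```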

 

image

 

Once this is complete, you would add the Users/AD User Groups/Service Principals into these AAD groups to provide access and the pass-through permissions work as expected.

This method separates out both read/write and the Data Lake layers, meaning that unless access is specifically granted, there will be a much more limited footprint of access permissions into Data Lake. Using a combination of this and AD credential pass-through from Databricks provides a suitable solution for implementing security with the Databricks/Data Lake combination.

Implementing Enterprise Security in Azure Databricks - Part 1

In recent weeks, I've spent a fair chunk of time working with different aspects of Databricks, and one topic that consumed a good proportion of that time is the security and authentication aspects of the service. Our scenario was one that I expect most people will come across over the next few years: the integration of Databricks with Azure Data Lake for data processing. In the following blogs (yup, I need 2) I intend to document the way we went about implementing the security model, breaking it up into data processing for a platform and data processing for users – each has a slightly different implementation.

 

Data Processing for Platform

By this, I mean data processing activities that happen as part of a service, orchestrated through something such as Azure Data Factory that happen on a schedule or in real-time.

I expect the very first thing most people will do once they have a Databricks workspace is to mount their Data Lake. It's by far the easiest option for getting your hands on your data, and one that's still worth using to start off with, as long as you understand the consequences. If you need to do this, I will point you in the direction of Hugh's blog post covering the topic, which I've used a number of times now to remember all the different secrets/GUIDs you need. By mounting the Data Lake in your Databricks workspace, you are able to interact with the data as if it were in the DBFS (the Databricks local file system). This proves incredibly powerful for tools such as dbutils, which allow you to perform a vast number of file system operations. Unfortunately, the one downside to this method is that anyone with access to the workspace is able to directly interact with all layers in your Data Lake – not ideal. Since Data Lakes are not treated in the same way as something like an Azure SQL DB, you also won't have those handy recovery options if something happens to your data because of this unrestricted access. This obviously poses a risk for an enterprise solution. It's worth noting that the mount is done once, so your clusters can go down and be spun up again by other users, and they would still be able to access the mount point until it is un-mounted.

With that in mind, it is far better to implement the access to Data Lake directly at runtime using the Spark API. The config for this is very similar to the mounting, except the spark config is set at runtime and therefore if this is not included you will be unable to access the Data Lake. This avoids the issues mentioned above where anyone can access the data, just because they can access the workspace. The following commands can be used to set this up at runtime.

 

# Get Secrets from Databricks
ClientId = dbutils.secrets.get(scope = "ScopeX", key = "ClientId")
ClientSecret = dbutils.secrets.get(scope = "ScopeX", key = "ClientSecret")
DirectoryId = dbutils.secrets.get(scope = "ScopeX", key = "DirectoryId")

# Apply variables to spark config
spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", ClientId)
spark.conf.set("dfs.adls.oauth2.credential", ClientSecret)
spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/" + DirectoryId + "/oauth2/token")

 

You'll notice that as part of this I'm retrieving the secrets/GUIDs I need for the connection from somewhere else – namely the Databricks-backed secrets store. This avoids exposing those secrets in plain text in your notebook – again, not ideal. Secret access is then based on an ACL (access control list), so I can only connect to Data Lake if I'm granted access to the secrets. While it is also possible to connect Databricks to Azure Key Vault and use that as the secrets store instead, when I tried to configure this I was denied based on permissions, and after research I was unable to overcome the issue. Key Vault would be the more ideal option, but unfortunately support is currently limited, and the fact the error message contained spelling mistakes suggests to me the functionality is not yet mature.

To configure the databricks-backed secrets, the easiest method is to use an Azure Bash console and go in via the Databricks CLI. To access the console - within the Azure portal you’ll notice an icon similar to below as part of the top ribbon.

 

image

 

Clicking this will then prompt you to start either a PowerShell or Bash console – which will look similar to below.

 

image

 

The connection into the Databricks CLI can be setup as per the following commands.

 

virtualenv -p /usr/bin/python2.7 databrickscli
source databrickscli/bin/activate
pip install databricks-cli
databricks configure --token 

 

At this point, you'll need to provide both your Databricks host – something similar to https://northeurope.azuredatabricks.net – and a token. The token will need to be generated through your Databricks workspace, under User Settings / Access Tokens. This essentially lets you into the API without the need for a password. The important thing to mention at this point is that the token is a personal access token. While this doesn't impact anything in particular with Azure Bash, it's worth noting that the token is created under your Databricks account, and therefore using it for something such as a linked service into ADF means the connection will authenticate as you. Hopefully over time this will mature, and the use of Managed Identities or Service Principals directly connected into the platform will be possible. As you might have guessed, you will need to make sure the account that generates the token is also included in the ACL to read the secret, otherwise at processing time ADF will not be able to read through Databricks into the Lake.

The configuration of the secrets is then required and can be done using the following commands. Simply replace [client id], [secret value], [directory id] with the necessary values from your service principal.

 

databricks secrets create-scope --scope ScopeX

databricks secrets put --scope ScopeX --key ClientId --string-value [client id]
databricks secrets put --scope ScopeX --key ClientSecret --string-value [secret value]
databricks secrets put --scope ScopeX --key DirectoryId --string-value [directory id]

databricks secrets put-acl --scope ScopeX --principal admins --permission READ

 

I then granted read access on these secrets to the admins group – this just keeps things simple, but you can obviously grant it to individual users or other Databricks groups as well. One of my hopes for 2019 is that the platform is better integrated with AAD moving forward, as currently everyone needs to be named individually. You would then need to manage permissions through Databricks groups, which can only be done via the CLI.

Once this is complete, you can now interact with your Data Lake and authenticate at runtime. It's also worth mentioning that this interaction changes the connection string from dbfs:// to adl://, which may have a knock-on effect if you use certain tools such as dbutils within your code. This is also important to know since, with Databricks runtime 5.1 and AD credential pass-through, you will be unable to access anything other than Data Lake file systems. I'll explain this further in my next blog.

 

Conclusion

Hopefully this will prove useful to anyone venturing onto the platform and provide a basis for implementing a security model. If you wanted to go a step further, you may also want to implement service principal access to Data Lake on a number of levels – both across folders AND read/write. This would add slightly more complexity to the solution but provide an even more secure method, avoiding the scenario where accounts can access the entire Data Lake. In my next blog, I will look at it from a user perspective, which takes on a slightly different implementation.

Spark Streaming in Azure Databricks

Real-time stream processing is becoming more prevalent on modern data platforms, and with a myriad of processing technologies out there, where do you begin? Stream processing involves consuming messages from a queue or files, doing some processing in the middle (querying, filtering, aggregation), and then forwarding the result to a sink – all with minimal latency. This is in direct contrast to batch processing, which usually occurs on an hourly or daily basis. Often, both of these will need to be combined to create a new data set.

In terms of options for real-time stream processing on Azure you have the following:

  • Azure Stream Analytics
  • Spark Streaming / Storm on HDInsight
  • Spark Streaming on Databricks
  • Azure Functions

Stream Analytics is a simple PaaS offering. It connects easily into other Azure resources such as Event Hubs, IoT Hub, and Blob storage, and outputs to the range of resources you'd expect. It has its own intuitive query language, with the added benefit of letting you create functions in JavaScript. Scaling is achieved through partitions, and it has the windowing and late-arrival event support you'd expect from a processing option. For most jobs, this service will be the quickest/easiest to implement, as long as its relatively small set of limitations doesn't block what you want to achieve. It's also worth noting that the service does not currently support Azure network security such as Virtual Networks or IP filtering – I suspect this is only a matter of time, given the preview of this support in Event Hubs.

Both Spark Streaming on HDInsight and Databricks open up the options for configurability and are possibly more suited to an enterprise level data platform, allowing us to use languages such as Scala/Python or even Java for the processing in the middle. The use of these options also allows us to integrate Kafka (an open source alternative to EventHubs) as well as HDFS, and Data Lake as inputs. Scalability is determined by the cluster sizes and the support for other events mentioned above is also included. These options also give us the flexibility for the future, and allow us to adapt moving forward depending on evolving technologies. They also come with the benefit of Azure network security support so we can peer our clusters onto a virtual network.

Lastly – I wouldn't personally use this, but we can also use Functions to achieve the same goal through C#/Node.js. This route, however, does not include support for those temporal/windowing/late-arrival events, since Functions are serverless and act on a per-execution basis.

In the following blog, I’ll be looking at Spark Streaming on Databricks (which is fast becoming my favourite research topic).

A good place to start is to understand the structured streaming model, which I've seen documented a few times now. Essentially, the stream is treated as an unbounded table, with new records from the stream appended as new rows to the table. This allows us to treat both batch and streaming data as tables in a DataFrame, therefore allowing similar queries to be run across them.

 

image

 

At this point, it will be useful to include some code to help explain the process. Before beginning, it's worth mounting your data sink to your Databricks instance so you can reference it as if it were inside the DBFS (Databricks File System) – this is merely a pointer. For more info on this, refer to the Databricks documentation here. Only create a mount point if you want all users in the workspace to have access. If you wish to apply security, you will need to access the store directly (also documented in the same place) and then apply permissions to the notebook accordingly.

As my input for the stream was EventHubs, we can start by defining the read stream. You'll firstly need to add the Maven coordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.2 to the cluster to add the EventHub library and allow the connection. Further options can be added for the consumer group, starting positions (for partitioning), timeouts, and events per trigger. Positions can also be used to define starting and ending points in time so that the stream is not running continuously.

# Imports needed for the event positions and config below
import json
from datetime import datetime as dt

connectionString = "Endpoint=sb://{EVENTHUBNAMESPACE}.servicebus.windows.net/{EVENTHUBNAME};EntityPath={EVENTHUBNAME};SharedAccessKeyName={ACCESSKEYNAME};SharedAccessKey={ACCESSKEY}"

startingEventPosition = {
  "offset": "-1",         # start of stream
  "seqNo": -1,            # not in use
  "enqueuedTime": None,   # not in use
  "isInclusive": True
}

endingEventPosition = {
  "offset": None,                                             # not in use
  "seqNo": -1,                                                # not in use
  "enqueuedTime": dt.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), # point in time
  "isInclusive": True
}

ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString
ehConf['eventhubs.startingPosition'] = json.dumps(startingEventPosition)
ehConf['eventhubs.endingPosition'] = json.dumps(endingEventPosition)

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

The streamed data then follows the schema below – the body, followed by a series of metadata about the streaming message.

 

image

 

It's important to note that the body comes out as a binary stream (this contains our message). We will need to cast the body to a string to deserialise the column into the JSON we are expecting. This can be done by using some Spark SQL to turn the binary into a string as JSON, and then parsing the column into a StructType with a specified schema. If multiple records are coming through in the same message, you will need to explode out the result into separate records. Flattening out the nested columns is also useful, as long as the data frame is still manageable. Spark SQL provides some great functions here to make our life easy.

from pyspark.sql.functions import from_json

rawData = df. \
  selectExpr("cast(body as string) as json"). \
  select(from_json("json", Schema).alias("data")). \
  select("data.*")

While it's entirely possible to construct your schema manually, it's also worth noting that you can take a sample JSON document, read it into a data frame using spark.read.json(path), and then call printSchema() on it to return the inferred schema. This can then be used to create the StructType.

# Inferred schema:

#   root
#    |-- LineTotal: string (nullable = true)
#    |-- OrderQty: string (nullable = true)
#    |-- ProductID: string (nullable = true)
#    |-- SalesOrderDetailID: string (nullable = true)
#    |-- SalesOrderID: string (nullable = true)
#    |-- UnitPrice: string (nullable = true)
#    |-- UnitPriceDiscount: string (nullable = true)
  
from pyspark.sql.types import StructType, StructField, StringType

Schema = StructType([
    StructField('SalesOrderID', StringType(), False),
    StructField('SalesOrderDetailID', StringType(), False),
    StructField('OrderQty', StringType(), False),
    StructField('ProductID', StringType(), False),
    StructField('UnitPrice', StringType(), False),
    StructField('UnitPriceDiscount', StringType(), False),
    StructField('LineTotal', StringType(), False)
])

At this point, you have the data streaming into your data frame. To output to the console you can use display(rawData) to see the data visually, however this is only useful for debugging since the data is not actually going anywhere! To write the stream into somewhere such as Data Lake, you would then use the following code. The checkpoint location can be used to recover from failures when the stream is interrupted, and this is important if this code were to make it to a production environment. Should a cluster fail, the query can be restarted on a new cluster from a specific point and consistently recover, thus enabling exactly-once guarantees. This also means we can change the query (as long as the input source and output schema stay the same) without directly interrupting the stream. Lastly, the trigger will check for new rows in the stream every 10 seconds.

rawData.writeStream \
    .format("json") \
    .outputMode("append") \
    .option("path", PATH) \
    .trigger(processingTime = "10 seconds") \
    .option("checkpointLocation", PATH) \
    .start()

Checking our data lake, you can now see the data has made its way over, broken up by the time intervals specified.

 

image

 

Hopefully this is useful for anyone getting going in this topic area. I'd advise sticking to Python, even though a lot of the Databricks documentation/tutorials use Scala – it was just something that felt more comfortable.

If you intend to do much in this area, I would definitely suggest using the PySpark SQL documentation, which can be found here. This is pretty much a bible for all commands, and I've been referencing it quite a bit. If this is not enough, there is also a cheat sheet available here. Again, very useful for reference while the language is still not ingrained.

Getting Started with Databricks Cluster Pricing

The use of Databricks for data engineering or data analytics workloads is becoming more prevalent as the platform grows, and it has made its way into most of our recent modern data architecture proposals – whether that be PaaS warehouses or data science platforms.

To run any type of workload on the platform, you will need to set up a cluster to do the processing for you. While the Azure-based platform has made this relatively simple for development purposes – i.e. give it a name, select a runtime, select the type of VMs you want and away you go – for production workloads, a bit more thought needs to go into the configuration/cost. In the following blog I'll start by looking at the pricing in a bit more detail, which will aim to add a cost element to the cluster configuration process.

For argument's sake, the work we tend to deliver with Databricks is based on data engineering usage – spinning up resource for an allocated period to perform a task. This is therefore the focus of the following topic.

 

To get started in this area, I think it would be useful to include some definitions.

  • DBU – a Databricks Unit (a unit of processing capability per hour, billed on per-minute usage).
  • Data Engineering Workload – a job that both starts and terminates the cluster it runs on (via the job scheduler).
  • Data Analytics Workload – a non-automated workload, for example running a command manually within a Databricks notebook. Multiple users can share the cluster to perform interactive analysis.
  • Cluster – made up of instances of processing (VMs), consisting of a driver and workers. Workers can either be provisioned upfront, or autoscaled between a minimum and maximum number of workers.
  • Tier – either standard or premium. Premium includes role-based access control, ODBC endpoint authentication, audit logs, and Databricks Delta (a unified data management system).

The billing for clusters primarily depends on the type of workload you initiate and the tier (or functionality) you require. As you might have guessed, data engineering workloads on the standard tier offer the best price.

I’ve taken the DS3 v2 instance (VM) pricing from the Azure Databricks pricing page.

image

 

The pricing can be broken down as follows:

  • Each instance is charged at £0.262/hour. So for example, the cost of a very simple cluster - 1 driver and 2 workers is £0.262/hour x 3 = £0.786/hour. The VM cost does not depend on the workload type/tier.
  • The DBU cost is then calculated at £0.196/hour. So for example, the cost of 3 nodes (as above) is £0.196/hour x 3 = £0.588/hour. This cost does change depending on workload type/tier.
  • The total cost is then £0.786/hour (VM Cost) + £0.588/hour (DBU Cost) = £1.374/hour. Also known as the pay as you go price. Discounts are then added accordingly for reserved processing power.
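The arithmetic above can be wrapped into a small sketch (rates taken from the DS3 v2 example; the function and constant names are my own):

```python
# Example rates for a DS3 v2 instance on a standard data engineering workload.
VM_RATE = 0.262   # GBP per instance-hour
DBU_RATE = 0.196  # GBP per DBU-hour (varies with workload type/tier)

def cluster_cost_per_hour(nodes):
    """Pay-as-you-go hourly cost for `nodes` instances (driver + workers)."""
    return nodes * (VM_RATE + DBU_RATE)

# 1 driver + 2 workers, as in the example above.
print(round(cluster_cost_per_hour(3), 3))  # 1.374
```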

I thought this was worth simplifying, since the pricing page doesn't make it abundantly clear with the way the table is laid out, and it is often overlooked. Given the vast number of options for setting up clusters, it's worth understanding this element to balance cost against time.

The DBU count is merely a reference used to compare the processing power of different VMs and is not directly included in the calculations. It's also worth mentioning that by default Databricks services are set up as premium and can be downgraded to standard only by contacting support. In some cases this can bring massive cost savings, depending on the type of work you are doing on the platform, so please take this into account before spinning up clusters and don't just go with the default.

With regards to configuration, clusters can be set up either in High Concurrency mode (previously known as serverless) or as Standard. High concurrency mode is optimised for concurrent workloads and is therefore more applicable to data analytics workloads and interactive notebooks used by multiple users simultaneously. This piece of configuration does not affect the pricing.

Using this cost model, we can assume that a basic batch ETL run with a driver and 8 worker nodes on relatively small DS3 instances (9 nodes × £0.458/hour ≈ £4.12/hour) would cost around £123.60/month given a standard 1 hour daily ETL window. Hopefully this provides a very simple introduction to the pricing model used by Databricks.

Databricks UDF Performance Comparisons

I've recently been spending quite a bit of time on the Azure Databricks platform, and while learning decided it was worth using it to experiment with some common data warehousing tasks in the form of data cleansing. As Databricks provides us with a platform to run a Spark environment on, it offers cross-platform APIs that allow us to write code in Scala, Python, R, and SQL within the same notebook. As with most things in life, not everything is equal, and there are potential performance differences between them. In this blog, I will explain the tests I produced, with the aim of outlining best practice for Databricks implementations of UDFs of this nature.

Scala is the native language for Spark – and without going into too much detail here, it compiles down to the JVM for processing. Python, on the other hand, provides a wrapper around the API: in reality it is Scala code telling the cluster what to do, with data moving between the Python process and the JVM. Converting these objects between forms that each side can read is called serialisation/deserialisation, and it's expensive, especially over time and across a distributed dataset. The most expensive scenario occurs with UDFs (functions) – the runtime process for which can be seen below. The overhead here is in (4) and (5), reading the data out of and writing it back into JVM memory.

[Image: runtime process for a Python UDF call, showing the serialisation/deserialisation steps]

Using Scala to create the UDFs, the execution process can skip these steps and keep everything native: Scala UDFs operate within the JVM of the executor, so no serialisation or deserialisation is required.

[Image: runtime process for a Scala UDF call, executing entirely within the executor JVM]

 

Experiments

As part of my data for this task I took a list of company names from a data set and ran them through a process to codify them – essentially stripping out the characters which make them unique and converting them to upper case, thus grouping a set of companies together under the same name. For instance, Adatis, Adatis Ltd, and Adatis (Ltd) would all become ADATIS. This is an example of a typical cleansing activity when working with data sets. The dataset in question was around 2.5GB and contained 10.5m rows. The cluster I used was Databricks runtime 4.2 (Spark 2.3.1 / Scala 2.11) with Standard_DS2_v2 VMs for the driver/worker nodes (14GB memory), with autoscaling disabled and limited to 2 workers. I disabled autoscaling because I was seeing wildly inconsistent timings on each run, which impacted the tests. The good news is that with it enabled and using up to 8 workers, the timings were about 20% faster, albeit more erratic from a standard deviation point of view.

The following approaches were tested:

  • Scala program calls Scala UDF via Function
  • Scala program calls Scala UDF via SQL
  • Python program calls Scala UDF via SQL
  • Python program calls Python UDF via Function
  • Python program calls Python Vectorised UDF via Function
  • Python program uses SQL

While it was true in previous versions of Spark that there was a clear performance difference between Scala and Python here, in the latest version of Spark (2.3) the playing field is believed to be much more level, thanks to Apache Arrow in the form of vectorised Pandas UDFs within Python.

As part of the tests I also wanted to use Python to call a Scala UDF via a function, but unfortunately we cannot do this without building the Scala code into a JAR file (via SBT, the Scala build tool) and importing it separately. I considered this too much of an overhead for the purposes of the experiment.

The following code was then used as part of a Databricks notebook to define the tests. A custom function to time the write was required for Scala, whereas Python allows us to use the %timeit magic for a similar purpose.
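The timeIt helper referenced in the Scala cells below is not shown in the notebook snippets; a minimal sketch might look like the following (the name and the seconds-based return value are assumptions – the important part is simply that it forces the write action and measures the elapsed wall-clock time):

```scala
// Minimal sketch of a timeIt helper for the Scala cells.
// Evaluates the by-name block (forcing the write action) and
// returns the elapsed wall-clock time in seconds.
def timeIt[T](block: => T): Double = {
  val start = System.nanoTime()
  block
  (System.nanoTime() - start) / 1e9
}
```

Because the argument is passed by name (`=> T`), the write is not executed until inside the timed region.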

 

Scala program calls Scala UDF via Function

%scala

// Scala program calls Scala UDF via Function

import org.apache.spark.sql.functions.udf
import spark.implicits._

def codifyScalaUdf = udf((s: String) => s.toUpperCase.replace(" ", "").replace("#","").replace(";","").replace("&","").replace(" AND ","").replace(" THE ","").replace("LTD","").replace("LIMITED","").replace("PLC","").replace(".","").replace(",","").replace("[","").replace("]","").replace("LLP","").replace("INC","").replace("CORP",""))

spark.udf.register("ScalaUdf", codifyScalaUdf)

val transformedScalaDf = spark.table("DataTable").select(codifyScalaUdf($"CompanyName").alias("CompanyName"))
val ssfTime = timeIt(transformedScalaDf.write.mode("overwrite").format("parquet").saveAsTable("SSF"))

 

Scala program calls Scala UDF via SQL

%scala

// Scala program calls Scala UDF via SQL

val sss = spark.sql("SELECT ScalaUdf(CompanyName) as a from DataTable where CompanyName is not null")
val sssTime = timeIt(sss.write.mode("overwrite").format("parquet").saveAsTable("SSS"))

 

Python program calls Scala UDF via SQL

# Python program calls Scala UDF via SQL

pss = spark.sql("SELECT ScalaUdf(CompanyName) as a from DataTable where CompanyName is not null")
%timeit -r 1 pss.write.format("parquet").saveAsTable("PSS", mode='overwrite') 

 

Python program calls Python UDF via Function

# Python program calls Python UDF via Function

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

df = spark.table("DataTable")

@udf(StringType())
def pythonCodifyUDF(string):
    if string is None:  # nulls are passed through to row-at-a-time UDFs
        return None
    return (string.upper().replace(" ", "").replace("#","").replace(";","").replace("&","").replace(" AND ","").replace(" THE ","").replace("LTD","").replace("LIMITED","").replace("PLC","").replace(".","").replace(",","").replace("[","").replace("]","").replace("LLP","").replace("INC","").replace("CORP",""))

pyDF = df.select(pythonCodifyUDF(col("CompanyName")).alias("CompanyName")).filter(col("CompanyName").isNotNull())
%timeit -r 1 pyDF.write.format("parquet").saveAsTable("PPF", mode='overwrite')

 

Python program calls Python Vectorised UDF via Function

# Python program calls Python Vectorised UDF via Function

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

df = spark.table("DataTable")

@pandas_udf(returnType=StringType())
def pythonCodifyVecUDF(series):
    # The vectorised UDF receives a pandas Series, so substring removal needs
    # .str.replace (Series.replace only matches whole values); upper-casing
    # first keeps the behaviour consistent with the other implementations
    series = series.str.upper()
    for token in (" ", "#", ";", "&", " AND ", " THE ", "LTD", "LIMITED",
                  "PLC", ".", ",", "[", "]", "LLP", "INC", "CORP"):
        series = series.str.replace(token, "", regex=False)
    return series

pyVecDF = df.select(pythonCodifyVecUDF(col("CompanyName")).alias("CompanyName")).filter(col("CompanyName").isNotNull())
%timeit -r 1 pyVecDF.write.format("parquet").saveAsTable("PVF", mode='overwrite')

 

Python Program uses SQL

# Python Program uses SQL

# upper() is applied to CompanyName before the case-sensitive replace()
# calls so the behaviour matches the UDF implementations above
sql = spark.sql("SELECT replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(upper(CompanyName),' ',''),'&',''),';',''),'#',''),' AND ',''),' THE ',''),'LTD',''),'LIMITED',''),'PLC',''),'.',''),',',''),'[',''),']',''),'LLP',''),'INC',''),'CORP','') as a from DataTable where CompanyName is not null")

%timeit -r 1 sql.write.format("parquet").saveAsTable("SQL", mode='overwrite')

 

Results and Observations

[Image: chart of benchmark timings for each of the six methods]

It was interesting to note the following:

  • The hypothesis above does indeed hold true: the two methods expected to be slowest were indeed the slowest in the experiment, and by a considerable margin.
  • The Scala UDF performs consistently regardless of the method used to call the UDF.
  • The Python vectorised UDF now performs on par with the Scala UDFs and there is a clear difference between the vectorised and non-vectorised Python UDFs.
  • The standard deviation for the vectorised UDF was surprisingly low and the method was performing consistently each run. The non-vectorised Python UDF was the opposite.

To summarise, moving forward – as long as you write your UDFs in Scala or use the vectorised version of the Python UDF, the performance will be similar for this type of activity. It's worth noting that writing UDFs as standard Python functions should definitely be avoided, given the theory and results above. Over time, across a complete solution and with more data, this overhead would add up.