Adatis

Adatis BI Blogs

Databricks UDF Performance Comparisons

I’ve recently been spending quite a bit of time on the Azure Databricks platform and, while learning, decided to use it to experiment with some common data warehousing tasks in the form of data cleansing. Because Databricks gives us a platform to run a Spark environment on, we can use its cross-language APIs to write code in Scala, Python, R, and SQL within the same notebook. As with most things in life, not everything is equal, and there are potential differences in performance between them. In this blog I will explain the tests I ran, with the aim of outlining best practice for UDFs of this nature in Databricks implementations.

Scala is the native language for Spark and, without going into too much detail here, it compiles directly to JVM bytecode, which is where Spark does its processing. Python, on the other hand, is a wrapper: under the hood, a Python program is telling the JVM-based Spark engine what to do on the cluster, and the data is transformed by Scala code. Converting these objects into a form Python can read is called serialisation/deserialisation, and it’s expensive, especially over time and across a distributed dataset.

The most expensive scenario occurs with UDFs (user-defined functions), the runtime process for which can be seen below.

[Diagram: runtime process for a Python UDF]

The overhead here is in steps (4) and (5), which read the data and write it into JVM memory. When the UDFs are written in Scala, the execution process can skip these steps and keep everything native: Scala UDFs operate within the JVM of the executor, so there is no serialisation or deserialisation.

Experiments

For the data in this task, I took a list of company names from a data set and ran them through a process to codify them: essentially stripping out the characters that make them unique and converting them to upper case, thus grouping a set of companies together under the same name. For instance Adatis, Adatis Ltd, and Adatis (Ltd) would become ADATIS.
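To make the codifying idea concrete, here is a minimal plain-Python sketch that needs no Spark. It is not the code used in the tests: it splits the name into tokens rather than doing chained substring replacement, and the `codify` function and `LEGAL_SUFFIXES` list are hypothetical names introduced purely for illustration.

```python
import re

# Hypothetical suffix list, for illustration only.
LEGAL_SUFFIXES = ("LTD", "LIMITED", "PLC", "LLP", "INC", "CORP")


def codify(name):
    """Return an upper-case grouping key for a company name (None stays None)."""
    if name is None:
        return None
    # Upper-case, then keep only runs of letters and digits as tokens,
    # which drops spaces, brackets and other punctuation.
    tokens = re.findall(r"[A-Z0-9]+", name.upper())
    # Drop tokens that are legal-form suffixes and join what remains.
    return "".join(t for t in tokens if t not in LEGAL_SUFFIXES)


for raw in ["Adatis", "Adatis Ltd", "Adatis (Ltd)"]:
    print(raw, "->", codify(raw))  # each prints "... -> ADATIS"
```

Working on whole tokens means a suffix such as LTD is only removed when it stands alone, which may or may not be what a given cleansing rule wants.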
This is an example of a typical cleansing activity when working with data sets. The dataset in question was around 2.5GB and contained 10.5m rows. The cluster I used was Databricks runtime 4.2 (Spark 2.3.1 / Scala 2.11) with Standard_DS2_v2 VMs for the driver and worker nodes (14GB memory), with autoscaling disabled and limited to 2 workers. I disabled autoscaling because I was seeing wildly inconsistent timings from run to run, which impacted the tests. The good news is that with it enabled and using up to 8 workers, the timings were about 20% faster, albeit more erratic from a standard deviation point of view.

The following approaches were tested:

1. Scala program calls Scala UDF via Function
2. Scala program calls Scala UDF via SQL
3. Python program calls Scala UDF via SQL
4. Python program calls Python UDF via Function
5. Python program calls Python Vectorised UDF via Function
6. Python program uses SQL

While it was true in previous versions of Spark that there was a performance difference between Scala and Python for these, in the latest version of Spark (2.3) it is believed to be more of a level playing field, thanks to Apache Arrow in the form of vectorised Pandas UDFs within Python.

As part of the tests I also wanted to use Python to call a Scala UDF via a function, but unfortunately we cannot do this without building a JAR file of the Scala code and importing it separately. This would be done via SBT (a build tool) by following a separate guide. I considered this too much of an overhead for the purposes of the experiment.

The following code was then used in a Databricks notebook to define the tests. A custom function to time the write was required for Scala, whereas Python allows us to use %timeit for a similar purpose.
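The Scala timing helper itself is not shown in the post. As a rough idea of what such a helper does, here is a minimal Python sketch; the name `time_it` and its return shape are assumptions made for illustration, not the author's implementation.

```python
import time


def time_it(action):
    """Run a zero-argument callable once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()  # the work being measured, e.g. a DataFrame write
    elapsed = time.perf_counter() - start
    return result, elapsed


# Usage with a stand-in workload; in a notebook the lambda would wrap the write.
total, seconds = time_it(lambda: sum(range(1_000_000)))
print(f"took {seconds:.4f}s")
```

Passing the work as a callable matters: the helper has to start the clock before the work runs, which is also why a Scala version would typically take its argument by name.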
Scala program calls Scala UDF via Function

    %scala
    // Scala program calls Scala UDF via Function
    import org.apache.spark.sql.functions.udf

    // Note: spaces are stripped first, so the " AND " and " THE " replacements can never match.
    def codifyScalaUdf = udf((string: String) =>
      string.toUpperCase
        .replace(" ", "").replace("#", "").replace(";", "").replace("&", "")
        .replace(" AND ", "").replace(" THE ", "")
        .replace("LTD", "").replace("LIMITED", "").replace("PLC", "")
        .replace(".", "").replace(",", "").replace("[", "").replace("]", "")
        .replace("LLP", "").replace("INC", "").replace("CORP", ""))

    // Register the UDF so it can also be called from SQL as ScalaUdf
    spark.udf.register("ScalaUdf", codifyScalaUdf)

    val transformedScalaDf = table("DataTable").select(codifyScalaUdf($"CompanyName").alias("CompanyName"))
    val ssfTime = timeIt(transformedScalaDf.write.mode("overwrite").format("parquet").saveAsTable("SSF"))

Scala program calls Scala UDF via SQL

    %scala
    // Scala program calls Scala UDF via SQL
    val sss = spark.sql("SELECT ScalaUdf(CompanyName) as a from DataTable where CompanyName is not null")
    val sssTime = timeIt(sss.write.mode("overwrite").format("parquet").saveAsTable("SSS"))

Python program calls Scala UDF via SQL

    # Python program calls Scala UDF via SQL
    pss = spark.sql("SELECT ScalaUdf(CompanyName) as a from DataTable where CompanyName is not null")
    %timeit -r 1 pss.write.format("parquet").saveAsTable("PSS", mode='overwrite')

Python program calls Python UDF via Function

    # Python program calls Python UDF via Function
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import StringType

    @udf(StringType())
    def pythonCodifyUDF(string):
        return (string.upper()
                .replace(" ", "").replace("#", "").replace(";", "").replace("&", "")
                .replace(" AND ", "").replace(" THE ", "")
                .replace("LTD", "").replace("LIMITED", "").replace("PLC", "")
                .replace(".", "").replace(",", "").replace("[", "").replace("]", "")
                .replace("LLP", "").replace("INC", "").replace("CORP", ""))

    # df is the DataFrame over DataTable; its definition is not shown in the post
    pyDF = df.select(pythonCodifyUDF(col("CompanyName")).alias("CompanyName")).filter(col("CompanyName").isNotNull())
    %timeit -r 1 pyDF.write.format("parquet").saveAsTable("PPF", mode='overwrite')

Python program calls Python Vectorised UDF via Function

    # Python program calls Python Vectorised UDF via Function
    from pyspark.sql.types import StringType
    from pyspark.sql.functions import pandas_udf, col

    # Note: string is a pandas Series here, and Series.replace matches whole values
    # rather than substrings; substring replacement would use string.str.replace.
    @pandas_udf(returnType=StringType())
    def pythonCodifyVecUDF(string):
        return (string
                .replace(" ", "").replace("#", "").replace(";", "").replace("&", "")
                .replace(" AND ", "").replace(" THE ", "")
                .replace("LTD", "").replace("LIMITED", "").replace("PLC", "")
                .replace(".", "").replace(",", "").replace("[", "").replace("]", "")
                .replace("LLP", "").replace("INC", "").replace("CORP", "")).str.upper()

    # df is the DataFrame over DataTable; its definition is not shown in the post
    pyVecDF = df.select(pythonCodifyVecUDF(col("CompanyName")).alias("CompanyName")).filter(col("CompanyName").isNotNull())
    %timeit -r 1 pyVecDF.write.format("parquet").saveAsTable("PVF", mode='overwrite')

Python program uses SQL

    # Python program uses SQL
    sql = spark.sql("SELECT upper(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(CompanyName,' ',''),'&',''),';',''),'#',''),' AND ',''),' THE ',''),'LTD',''),'LIMITED',''),'PLC',''),'.',''),',',''),'[',''),']',''),'LLP',''),'INC',''),'CORP','')) as a from DataTable where CompanyName is not null")
    %timeit -r 1 sql.write.format("parquet").saveAsTable("SQL", mode='overwrite')

Results and Observations

It was interesting to note the following:

1. The hypothesis above does indeed hold true: the 2 methods which were expected to be slowest were the slowest within the experiment, and by a considerable margin.
2. The Scala UDF performs consistently regardless of the method used to call it.
3. The Python vectorised UDF now performs on par with the Scala UDFs, and there is a clear difference between the vectorised and non-vectorised Python UDFs.
4. The standard deviation for the vectorised UDF was surprisingly low, and the method performed consistently on each run. The non-vectorised Python UDF was the opposite.
To summarise: as long as you write your UDFs in Scala or use the vectorised version of the Python UDF, performance will be similar for this type of activity. It’s worth avoiding UDFs written as standard Python functions, for the reasons given in the theory and results above. Over time, across a complete solution and with more data, the extra time would add up.

SQL Server NOT IN Clause - Avoid like the….

Background

Up until recently, I was one of the SQL Server developers with the bad habit of using the NOT IN clause. It is an easy way of finding data in one table that does not exist in another. I thought using NOT IN would help me conceptualise a query result, as well as make the code easier for someone else to read. In fact, although the performance (within an execution plan) is OK, you can pull back incorrect results from the overall query.

The Problem

The NOT IN clause is problematic in only one, but VERY IMPORTANT, way: it does not handle NULLs in the comparison table. If the comparison list contains a NULL, the NOT IN test can never evaluate to true, so no rows are returned. The example creates two tables, queries both to show their contents, and then runs a NOT IN query between them.

[Screenshots: table creation script, query results for both tables, NOT IN query]

As you can see, 0 records were returned. We would expect the record (containing Striker, Andy Cole) in the NewFootyPlayers table to be returned. The NOT IN clause is tripped up by the comparison against NULLs.

NOTE: Adding an additional ‘WHERE Position IS NOT NULL’ filter to the NOT IN subquery would also return the expected record, but a lot of people will forget to add it and spend a substantial amount of time wondering why certain records are missing from their result set.

The Solution(s)

There are a number of clauses or SQL constructs that can be used instead of NOT IN. Although most do not have any major performance benefits, they actually return what is expected. The three examples below all return the one expected record, which is what we wanted in the first place.

[Screenshots: three alternative queries and their shared result]

Recommended Solution

Whilst none of the solutions above cause major performance problems, there is one method that is better than the others. If we are working with hundreds of millions of records in both tables, NOT EXISTS is the most efficient query. Its performance is similar to NOT IN and EXCEPT, and it produces an identical plan, but it is not prone to the potential issues caused by NULLs or duplicates.
I would be interested to see if anyone else has performance tested each query type and whether there are better alternatives to NOT EXISTS. One thing I am certain of, however, is that no one should have to use the NOT IN clause.
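The NULL behaviour described in this post comes from standard SQL three-valued logic, so it can be reproduced outside SQL Server. The sketch below uses Python's built-in sqlite3 module rather than SQL Server. Only NewFootyPlayers, the Position column and the Striker / Andy Cole record come from the post; the FootyPlayers table name, the PlayerName column and the other rows are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- FootyPlayers and PlayerName are hypothetical names; the post's tables are not shown.
    CREATE TABLE FootyPlayers (Position TEXT, PlayerName TEXT);
    CREATE TABLE NewFootyPlayers (Position TEXT, PlayerName TEXT);
    INSERT INTO FootyPlayers VALUES ('Goalkeeper', 'Player A'), (NULL, 'Player B');
    INSERT INTO NewFootyPlayers VALUES ('Goalkeeper', 'Player C'), ('Striker', 'Andy Cole');
""")

# NOT IN against a list containing NULL: the test is never true, so no rows come back.
not_in_rows = conn.execute("""
    SELECT Position, PlayerName FROM NewFootyPlayers
    WHERE Position NOT IN (SELECT Position FROM FootyPlayers)
""").fetchall()

# Filtering the NULLs out of the subquery restores the expected record.
filtered_rows = conn.execute("""
    SELECT Position, PlayerName FROM NewFootyPlayers
    WHERE Position NOT IN (SELECT Position FROM FootyPlayers WHERE Position IS NOT NULL)
""").fetchall()

# NOT EXISTS returns the expected record without needing the extra filter.
not_exists_rows = conn.execute("""
    SELECT n.Position, n.PlayerName FROM NewFootyPlayers AS n
    WHERE NOT EXISTS (SELECT 1 FROM FootyPlayers AS f WHERE f.Position = n.Position)
""").fetchall()

print(not_in_rows)      # []
print(filtered_rows)    # [('Striker', 'Andy Cole')]
print(not_exists_rows)  # [('Striker', 'Andy Cole')]
```

This only demonstrates correctness; it says nothing about the relative performance of the queries on SQL Server.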