This blog covers Windowing Functions in Databricks. I will describe the concept of Windowing Functions and how to use them with the DataFrame API syntax. If you have not used DataFrames yet, this is probably not the best place to start. Azure Databricks also supports Spark SQL syntax for queries, but that is not covered in this blog.
Introduction – What are Window Functions?
Databricks was designed to work with large data sets. Distributing the data and parallelizing the work makes queries against that data run very fast. Those queries can be used not only to read data but also to transform it.
There are different kinds of transformations that you can apply to a DataFrame:
- Operations on single rows that produce a single output value (built-in functions, UDFs, and single-column transformations such as casting a string, substring, if statements etc.),
- Exploding arrays of complex data, e.g. explode("col.name"),
- Aggregate Functions – grouping data with groupBy(…) and applying a function to each group, e.g. max(), avg(), min() etc. (columns that are neither in groupBy(…) nor part of an aggregate function cannot exist in the resulting DataFrame),
- Window Functions – allow us to partition data by columns and then apply aggregate or rank functions within each partition separately. This is similar to grouping, but it lets us preserve columns that do not take part in the partitioning. It also allows us to order elements within each partition.
The last kind is the subject of this blog.
Let's say that we have to answer the questions listed below:
- calculate running totals for the data set
- rank data based on some columns
- group rows and assign a row number from 1…N within each group
- calculate the difference between each record and an aggregated value within the group it belongs to, e.g. the difference between the current value and the average of its partition/group, while preserving all the columns of the set that we operate on
Some of these questions could be answered by combining groupBy(…) and join(…), and some of them could be answered with a looping mechanism. They could, but there is a better built-in mechanism that lets us write such queries more neatly. This mechanism is called the Window Function. If you are familiar with window functions in T-SQL, this will not be a new concept for you, as it works in a very similar way.
How is it constructed?
There are three logical parts to a window function:
- Defining which function we want to use to calculate the value, e.g. max("column.name"), row_number() etc.
- Defining which window we want to use to calculate this value. The definition may look different for different kinds of transformations, and some of its components are optional:
  - what we are going to partition by (which columns we use for partitioning, i.e. grouping our data before the calculation is applied; the function is executed within each of these groups),
  - what order is required within each partition before the calculation is made (the columns and the sort direction applied before the function runs; this has little influence on functions like max and min computed over the whole partition, but can have a great impact on functions like row_number, lag and lead),
  - what the scope/boundaries of the window are.
- Using the function in the DataFrame.
What does the window function DataFrame syntax look like?
- Import the required namespaces and libraries – you need to import Window from the pyspark.sql.window namespace before you start using it:
from pyspark.sql import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *
- Define the window specification – use one of the specifications below, depending on the type of window we want to define:
## if we do not want to specify boundaries, a sliding window etc.
windowSpecification = Window.partitionBy(…).orderBy(…)
## if we specify window boundaries based on rows
windowSpecification = Window.partitionBy(…).orderBy(…).rowsBetween(…)
## if we specify window boundaries based on column values
windowSpecification = Window.partitionBy(…).orderBy(…).rangeBetween(…)
Window – the window function object (it always stays as Window – see the import statement).
Partition columns are specified by putting the column names in quotation marks in partitionBy(), e.g. Window.partitionBy("col1","col2",…,"colN").
Sorting columns are specified by putting the column names in quotation marks in orderBy(), e.g. Window.orderBy("col1","col2",…,"colN").
The direction of the sort is specified by wrapping the column name in desc("col.name") or asc("col.name") in the orderBy clause, e.g. Window.orderBy(asc("col1"),desc("col2"),asc("colN")).
- Apply the function over the specified window
In a DataFrame we can use it by creating a new column that holds the value, using withColumn(…) together with over(windowSpecification). Please note that we need to create windowSpecification first.
- Full code can look like this:
### Import statements
from pyspark.sql import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *
### Define window specification
windowSpecification = Window.partitionBy("column.to.partition.by").orderBy(desc("column.to.order.by"))
### Produce a new DataFrame with a new column that holds the RowNumber in each partition
dfResult = dfSourceData.withColumn("RowNumber",row_number().over(windowSpecification))
Functions that we can use with Windowing Functions:
There are plenty of window functions that we can use. Some of them are listed here:
Accessing absolute/relative partition’s records
- lag("col.name") – returns the value of the specified column from the row preceding the current row. For the first row null is returned, as there is no previous row
- lead("col.name") – returns the value of the specified column from the row following the current row. For the last row null is returned, as there is no following row
- first("col.name") – gets the first value from the partition in the current order
- last("col.name") – gets the last value from the partition in the current order
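- row_number() – assigns a row number starting with 1 and increasing by 1 with each row in the partition
- rank() – calculates the rank; rows with equal values share a rank and leave a gap in the ranks that follow
- dense_rank() – calculates the dense rank; like rank(), but without gaps after ties
- ntile(N) – splits the set into N buckets of equal size (if the set cannot be divided equally, the bucket sizes differ by at most one row, with the earlier buckets holding the extra rows)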
- max("col.name") – calculates the max value of the chosen column within the specified partition. Remark: if we bound the window, e.g. from unbounded preceding to the current row, this might return different results depending on the ordering direction (asc, desc)
- min("col.name") – calculates the min value of the chosen column within the specified partition. Remark: if we bound the window, e.g. from unbounded preceding to the current row, this might return different results depending on the ordering direction (asc, desc)
- sum("col.name") – calculates the sum of the chosen column. Can be used for a running total with a bounded window
- avg("col.name") – calculates the average
- count("col.name") – counts the non-null values of the column in the partition
- mean("col.name") – calculates the mean in the partition (an alias of avg)
- collect_list("col.name") – collects the elements of the chosen column in the partition, in the order they reside, and returns them as an array
- collect_set("col.name") – collects the distinct elements of the chosen column in the partition, without ordering, and returns them as an array
Some functions, such as aggregations, behave differently with an orderBy(…) clause than without it:
- with orderBy, e.g. Window.partitionBy(…).orderBy(…) – the value is calculated in a running manner (for the first row value = min(1st record), for the second row value = min(1st & 2nd record), for the third row value = min(1st & 2nd & 3rd record) and so on)
- without orderBy, e.g. Window.partitionBy(…) – the value is calculated across the whole partition, so each record gets the same value, e.g. the average of the column in the partition.
Unbounded Window Functions are those that do not define a boundary in the window specification, that is, they have no rowsBetween() or rangeBetween() statement on the window object. Unbounded window functions execute the function in each partition across all records. In other words, they do not limit the records that the aggregate/rank function is executed on within each partition. They are useful for questions such as ranking rows or comparing each record with an aggregate over its whole partition.
Row Bounded Examples
Row Bounded Window Functions are those that define a boundary in the window specification by using the rowsBetween(…) statement on the window object. This lets us limit the records that the function is executed on in each partition. The limit is specified as an offset counted in records; it does not use the values of the records. The boundary is relative to the current row, and we can bound the window on one side (lower or upper) or on both sides. They are useful for questions like running totals, or whenever we want to make calculations on records positioned relative to the current row.
Range Bounded Examples
Range Bounded Window Functions define a boundary in the window specification by using the rangeBetween(…) statement on the window object. This lets us limit the records that the function is executed on in each partition. The limit is specified as an offset applied to the values of the ordering column rather than as a number of records. The window is therefore dynamic: its size for each row depends on the values that fall inside the range. These functions are useful for questions where the size of the window has to be defined by the values of the column that we order by.
Windowing functions in Databricks can cover a very wide range of use cases. They provide a nice alternative to loops, so if you need to perform looping operations on your data, ask yourself whether the problem would be easier to solve with Windowing Functions. Be aware that windowing functions may cause a large amount of data shuffling, so use them with care.
You can find more information in the links below:
- Data Frames – explains what Dataframes are and how to use them
- Introduction to Window Functions – another good blog about windowing functions (Spark and SQL API)
- Databricks Notebook – you can download all the examples used in this blog and play with them without needing to mount external storage. The data is generated by a script.