Piotr Starczynski's Blog

Azure Databricks - overwriting table that is also being read from

Problem Definition

Recently I ran into an interesting problem in Databricks (non-Delta tables). I tried to read data from a table (a table on top of files), transform it slightly, and write it back to the same location I had been reading from. Attempting to execute such code fails with the exception: “org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from”.

So let's try to answer the question: how do we write into a table (DataFrame) that we are also reading from? This might be a common use case.

This problem is trivial, but it is very confusing if we do not understand how queries are processed in Spark.

Let me explain it with an example.

Let's define a process that will allow us to:

  • Filter out objects that we have already processed.
  • Store the history of loaded objects for 3 days only (the history needs to take a minimum amount of space on the file system).
  • Today's load will become part of the history for tomorrow's load.

The process for this task looks like this:

  1. Read CurrentLoad (the new objects that we want to load in).
  2. Read History limited to the last 3 days (objects that we have already processed).
  3. Create a new set called Unprocessed by removing existing History records from CurrentLoad (done via a left anti join).
  4. Process/transform Unprocessed (an optional step that may or may not use Unprocessed in further processing).
  5. Union only the last 3 days of History with Unprocessed (to maintain only the last 3 days of history and keep the set as small as possible).
  6. Overwrite History.

The above process is not the most efficient way of checking a new load against historic loads, but it is a good example of overwriting a DataFrame/table that we are reading from at the same time.

The diagram of this process looks like this:


So let's see what might go wrong in this process. We need to add the Unprocessed DataFrame (which contains a reference to the History DataFrame) to the existing History, and at the same time we need to remove the first day (to maintain only 3 days of history). This operation may not be as straightforward as it seems. The reason is that Spark uses lazy evaluation by default, meaning the code is not executed immediately. The query is not evaluated until the last moment, that is, when we try to write the data down to the file system. Spark does this so it can optimize queries by building an execution plan, which helps it evaluate queries in the way that gives the best performance. If we delay processing of the Unprocessed DataFrame, it will still hold references to two sets, History and CurrentLoad, while we try to write to History. In other words, we are trying to perform a write operation on a DataFrame we are reading from.

There is a simple solution to this problem.



Materialize the DataFrame that you want to overwrite the data with (HistoryTemp) so that it clears out the dependencies on the tables we read from (History, CurrentLoad). The simplest way to do this is to write the DataFrame (HistoryTemp) to a temporary location on the file system and then re-read the data into a new DataFrame. This enables us to write data into the History location. This approach requires you to set a temp location for your temp data. Make sure that you overwrite your HistoryTemp each time. You could also perform additional clean-up at the end of your script to delete the temp data and metadata (tables, files).

This process looks like this:
1. Calculate the new Unprocessed set (CurrentLoad left anti join History).
2. Union Unprocessed with the current History to persist all records that we want to save (HistoryTemp).
3. Write HistoryTemp (the overwriting set) to a temp location on the file system.
4. Re-read the data we wrote out (HistoryTemp) into a new DataFrame.
5. Write the new DataFrame to your History location. Make sure that the Unprocessed and HistoryTemp sets are not used further in the notebook; if you need to use them, perform the write operation at the end.



You can download the full notebook from the References section.



What did not work for me

I have seen some suggestions on different forums that I tried, and they DID NOT WORK for me; I kept receiving the same exception:

  • Creating a temp view on the data and caching it.
  • Checkpointing the Unprocessed DataFrame.



References

  1. You can download the code to play around with from here:
  2. Users with similar problems were posting here:

Azure Databricks Windowing Functions with Dataframes API

This blog post covers Window Functions in Databricks. I will describe the concept of Window Functions and how to use them with the DataFrame API syntax. If you have not used DataFrames yet, this is probably not the best place to start. Azure Databricks also supports Spark SQL syntax for queries, but that is not covered in this post.

Introduction – What are Window Functions?

Databricks was designed to work with large data sets. Data distribution and parallelization of the work make queries against the data run very fast. Those queries can be used not only to read data but also to transform it.

There are different kinds of transformations that you can apply to a DataFrame:

  • Operations on single rows that produce a single-value output (built-in functions, UDFs, single-column transformations, e.g. casting strings, substring, if statements),
  • Exploding arrays of complex data, e.g. explode("col.name"),
  • Aggregate functions – grouping data with groupBy(…) and applying a function on the groups, e.g. max(), avg(), min() etc. (columns that are not in groupBy(…) or part of an aggregate function cannot exist in the result DataFrame),
  • Window functions – allow us to partition data by columns and then apply aggregate or rank functions within each partition separately. This is similar to grouping, but it preserves columns that do not take part in the partitioning, and it also allows us to order elements within each partition.

The last kind is the subject of this blog post.

Problem definition

Let's say we have to answer the questions listed below:

  • calculate running totals for a data set,
  • rank data based on some columns,
  • group rows and assign row numbers from 1…N within each group,
  • calculate the difference between each record and aggregated values within the group it belongs to, e.g. the difference between the current value and the average of its partition/group, while preserving all the columns of the set we operate on.

Some of these questions could be answered with a combination of groupBy(…) and join(…); some could be answered with a looping mechanism. They could, but there is a better built-in mechanism that allows us to write these queries in a neater way. This mechanism is called a Window Function. If you are familiar with window functions in T-SQL, this will not be a new concept for you, as it works in a very similar way.


How is it constructed?

There are three logical parts to a window function:

  1. Define the function we want to use to calculate the value, e.g. max("column.name"), row_number() etc.
  2. Define the window over which we want to calculate this value. The specification may look different for different kinds of transformations, and some of the components are optional:
    1. what we are going to partition by (which columns are used for partitioning, i.e. grouping our data before calculations are applied; the function is executed within each of these groups),
    2. what order is required in each partition before the calculation is made (define the columns and ordering direction before the function is applied; it has little influence on functions like max or min, but can have a great impact on functions like row_number, lag, lead),
    3. what the scope/boundaries of the window are.
  3. Use the function on the DataFrame.

What does the window function DataFrame syntax look like?

  1. Import the required namespaces and libraries – you need to import Window from the pyspark.sql.window namespace before you start using it:
    from pyspark.sql import *
    from pyspark.sql.window import Window
    from pyspark.sql.functions import *
  2. Define the window specification – one of the specifications below, depending on what type of window we want to define:
    ## if we do not want to specify boundaries, sliding windows etc.
    windowSpecification = Window.partitionBy(…).orderBy(…)

    ## if we specify window boundaries based on row offsets
    windowSpecification = Window.partitionBy(…).orderBy(…).rowsBetween(…)

    ## if we specify window boundaries based on column values
    windowSpecification = Window.partitionBy(…).orderBy(…).rangeBetween(…)

    Window – the window function object (it always stays as Window – see the import statement).
    Partition columns are specified by putting the column names in quotation marks in partitionBy(), e.g. Window.partitionBy("col1","col2",…,"colN").
    Sorting columns are specified by putting the column names in quotation marks in orderBy(), e.g. Window.orderBy("col1","col2",…,"colN").
    The direction of the sort is specified by adding desc("col.name") or asc("col.name") in the orderBy clause, e.g. Window.orderBy(asc("col1"),desc("col2"),asc("colN")).
  3. Use the syntax below to apply the function over the specified window.

    In a DataFrame we can use it by creating a new column that holds the value. Note that we need to create the windowSpecification first.
  4. The full code can look like this:

    ### Import statements
    from pyspark.sql import *
    from pyspark.sql.window import Window
    from pyspark.sql.functions import *

    ### Define the window specification
    windowSpecification = Window.partitionBy("column.to.partition.by").orderBy(desc("column.to.order.by"))

    ### Produce a new DataFrame with a new column that holds the RowNumber within each partition
    dfResult = dfSourceData.withColumn("RowNumber", row_number().over(windowSpecification))


Functions that we can use with window functions:

There are plenty of window functions that we can use. Some of them are listed below:


  • Accessing absolute/relative records within a partition
    • lag("col.name") – returns the value of the specified column from the row preceding the current row. For the first row in the partition, null is returned, as there is no previous row.

    • lead("col.name") – returns the value of the specified column from the row following the current row. For the last row in the partition, null is returned, as there is no following row.

    • first("col.name") – gets the first value from the partition in the current order.

    • last("col.name") – gets the last value from the partition in the current order.

  • Ranking functions
    • row_number() – assigns a row number starting at 1 and increasing by 1 with each row in the partition.
    • rank() – calculates the rank number (ties receive the same rank, and a gap follows the tied group).
    • dense_rank() – calculates the dense rank (ties receive the same rank, with no gaps).
    • ntile(N) – splits the set into N roughly equal buckets (if the set cannot be divided equally, the bucket sizes will be uneven).
  • Aggregations
    • max("col.name") – calculates the max value of the chosen column within the partition. Remark: if we bound the window, e.g. unbounded preceding to current row, this might return different results depending on the ordering direction (asc, desc).
    • min("col.name") – calculates the min value of the chosen column within the partition. The same remark about bounded windows and ordering direction applies.
    • sum("col.name") – calculates the sum of the chosen column. Can be used for running totals when bounded.
    • avg("col.name") – calculates the average.
    • count("col.name") – calculates the count of items in the partition.
    • mean("col.name") – calculates the mean in the partition.
  • Other
    • collect_list("col.name") – collects the values of the chosen column in the partition, in the order they reside, and returns them as an array.

    • collect_set("col.name") – collects the distinct values of the chosen column in the partition, without ordering, and returns them as an array.

  • Not supported
    • countDistinct("col.name")


Some functions, such as aggregations, behave differently with an orderBy(…) clause than without it:

  • with orderBy, e.g. Window.partitionBy(…).orderBy(…) – the value is calculated in a running manner (for the first row, value = min(1st record); for the second row, value = min(1st & 2nd records); for the third, value = min(1st, 2nd & 3rd records), and so on),
  • without orderBy, e.g. Window.partitionBy(…) – the value is calculated across the whole partition, so each record gets the same value, e.g. the average of the column over the partition.


Unbounded Examples

Unbounded window functions are those that do not define a boundary in the window specification, that is, they have no rowsBetween(…) or rangeBetween(…) statement on the window object. An unbounded window executes the aggregate/rank function across all records within each partition. They are useful to answer questions like:



Row Bounded Examples

Row-bounded window functions are those that define a boundary in the window specification by using the rowsBetween(…) statement on the window object. This lets us limit the records on which the function is executed within each partition. The limit is enforced by specifying an offset as a number of records; it does not use the values of the records. The boundary can be relative to the current row on one side (lower or upper) or on both sides. They are useful for answering questions like running totals, or when we want to make calculations on records relative to the current row.



Range Bounded Examples

Range-bounded window functions define a boundary in the window specification by using the rangeBetween(…) statement on the window object. This lets us limit the records on which the function is executed within each partition. The limit is enforced by specifying an offset based on the values of a column, so it is dynamic: the values determine the size of each window. These functions are useful when we need to define the size of the window based on the values of the column we order by.




Window functions in Databricks cover a very wide range of use cases. They provide a nice alternative to loops, so if you need to perform looping operations on your data, ask yourself whether the problem might be easier to solve with window functions. Be aware that window functions may cause a large amount of data shuffling, so use them with care.



You can find more information in the links below:

  1. Data Frames – explains what DataFrames are and how to use them
  2. Introduction to Window Functions – another good blog post about window functions (Spark and SQL API)



  1. Databricks Notebook – you can download all the examples used in this blog post and play with them without needing to mount external storage. The data is generated by a script.