Ivan's Blog

Injecting Databricks DataFrame into a Power BI Push Dataset

Using Python and the Power BI REST API


Now, why would you want to do that?

There are some scenarios where this approach may be beneficial. To get the full picture and establish the landscape, let's first answer two questions:

Why a Databricks DataFrame?

Recently, Databricks has become an integral part of the Modern Data Warehouse approach when targeting the Azure cloud. Its wide usage in data transformation begs for a richer variety of data destinations. The usual and most widely used persistence layer is the file store (lake, blob, etc.). That's fine for large volumes of data, but then we have a long way to go before reaching the data presentation layer (additional storage layers, SQL, a Tabular data model, etc.).

What if we want to instantly update a Power BI report directly from Databricks? It could be a small dataset feeding a dashboard for example. It could be business data or data-related metrics that we want to see in (near)real-time. Or we want to monitor the data transformation process continuously in a Databricks streaming scenario.

Why a Push Dataset?

We can do real-time streaming in Power BI by using Streaming or PubNub streaming datasets, which is fine, but let’s see some of the advantages of using a Push dataset:

  • You can build a report on top of a Push dataset
  • You can pin report visuals to a dashboard, and they will update in real-time on each data push
  • Data is stored permanently in Power BI
  • You don’t need a data gateway in place

For a more detailed comparison of all the real-time streaming methods, please check here.

If you think that all this makes sense, then continue further.

Implementation

Since we’ll be utilising the Power BI REST API there are some limitations that we need to be aware of upfront. You can see them here.

To be able to call the Power BI API, you need to register an application and set the proper permissions. Follow the steps here. App registration details below:

App registration details

After creating the app registration, you should grant permissions in the Azure portal:

Azure portal: App registrations

Azure portal: Grant permissions

Azure portal: Update permissions confirmation

Without granting these permissions from the Azure portal you won't be able to get an authorisation token and use the REST API.

Now that we're all set up, let's get straight to the point.

My initial intention was to show you how to build the whole thing step by step in this post. But it quickly became complicated, and I decided that an abstraction was needed to keep things simple. That's why I wrapped up all the “boring” stuff in a Python class called pbiDatasetAPI, which you can find on GitHub here. The comments in the code should be enough to understand the logic. The methods of this class will take care of:

  • Authentication (see the sketch after this list)
  • HTTP requests
  • HTTP requests JSON body generation (data and metadata)
  • Data type conversion (Spark to EDM)
  • Wrapping all the Power BI REST API operations that we need to perform in Python methods
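
Just for context, here is a minimal sketch of what the authentication step typically looks like against Azure AD, assuming the OAuth2 password grant is used (the function name get_access_token and its structure are illustrative – the pbiDatasetAPI class handles this for you):

import requests

# Illustrative sketch: acquire an Azure AD access token for the Power BI REST API
# using the OAuth2 password grant (assumed flow, not necessarily identical to the class internals)
def get_access_token(username, password, application_id):
  response = requests.post(
      "https://login.microsoftonline.com/common/oauth2/token"
    , data = {
          "grant_type": "password"
        , "resource": "https://analysis.windows.net/powerbi/api"
        , "client_id": application_id
        , "username": username
        , "password": password
      }
  )
  response.raise_for_status()
  return response.json()["access_token"]

# The token is then passed as a Bearer header on every REST API call, e.g.:
# headers = {"Authorization": "Bearer " + get_access_token(username, password, application_id)}
# requests.get("https://api.powerbi.com/v1.0/myorg/datasets", headers = headers)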

To start using it, import the notebook (using the above-mentioned URL) into your Databricks workspace:

Databricks workspace: Import Notebooks

Now that the class notebook is imported you can create a new Python notebook in the same folder to test how it’s working. Let’s call it “Inject DataFrame into Power BI Push Dataset”. First, we’ll execute our class notebook:

%run "./pbiDatasetAPI"


Next, we’ll need a DataFrame with data that will be pushed to the Power BI Push dataset. We can use some of the sample datasets which come with Databricks (in this case Amazon reviews):

dfInject = spark.read.parquet('dbfs:/databricks-datasets/amazon/test4K')
dfInject = dfInject.select("brand", "img", "price", "rating", "review", "time").limit(200)


We take just 200 rows, which is enough for this demo.

In the next command we’ll create some variables and instantiate the pbiDatasetAPI class:

# Initialise variables
username = "<USER_EMAIL>"
password = "<USER_PASSWORD>"
application_id = "********-****-****-****-************"  # Power BI application ID
groupId = None  # Set to None if not using Power BI groups
datasetName = "InjectedDataset"  # The name of the Power BI dataset
tableNames = ["AmazonReviews"]  # Table name or list of table names
dataFrames = [dfInject]  # DataFrame or list of DataFrames

# Create a pbiDatasetAPI class instance
pbi = pbiDatasetAPI(username, password, application_id)


You should set your username and password, and the application ID obtained in the previous steps. Optionally, also provide a group ID, or set it to None if you're not using groups (workspaces) on powerbi.com.

The tableNames and dataFrames variables are lists, because we may want to insert multiple DataFrames into multiple tables. In our case it's one DataFrame going into one table.
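
For example, pushing two DataFrames into two tables of the same dataset would look like this (dfSummary and the second table name are hypothetical, just to illustrate the shape of the lists):

# Hypothetical example: two DataFrames pushed into two tables of the same dataset
tableNames = ["AmazonReviews", "AmazonReviewsSummary"]
dataFrames = [dfInject, dfSummary]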

Let’s create a dataset with one table in Power BI:

# Create the dataset and the table in PBI
pbi.executePBIOperation("postdataset", groupId = groupId, datasetName = datasetName, tableNames = tableNames, dataFrames = dataFrames, reCreateIfExists = True)


The next step is to post all the DataFrame’s rows to the Push dataset.

# Get the datasetKey for the dataset (by name)
datasetKey = pbi.executePBIOperation("getdatasetbyname", groupId = groupId, datasetName = datasetName)[0]["id"]

# Insert the contents of the DataFrame in the PBI dataset table
pbi.executePBIOperation("postrows", groupId = groupId, datasetKey = datasetKey, tableNames = tableNames, dataFrames = dataFrames)


This is where the magic happens. One important note here: the dataset key is always unique, while the dataset name is not – we can have multiple datasets with the same name. That's why the lookup by name returns a list and we take the first match with [0].

You can see the newly created dataset on powerbi.com:

powerbi.com: List of datasets

You can create a report on top of this dataset and pin visuals to a dashboard (tiles will be updated automatically upon data change).

Now, let’s try to manipulate the metadata a bit – change the data type of the “rating” column in the DataFrame from double to string and update the Push dataset accordingly:

# Change the data type of the column 'rating' from double to string
dfInjectModified = dfInject.withColumn("rating", dfInject.rating.cast("string"))

# Get the datasetKey for the dataset (by name)
datasetKey = pbi.executePBIOperation("getdatasetbyname", groupId = groupId, datasetName = datasetName)[0]["id"]

# Update the metadata of the Power BI table
pbi.executePBIOperation("puttable", groupId = groupId, datasetKey = datasetKey, tableNames = tableNames, dataFrames = [dfInjectModified])


The only thing remaining now is to try and delete all the rows (it’s all or nothing – we can’t delete some of the rows) from the table in the dataset and then delete the entire dataset:

# Get the datasetKey for the dataset (by name)
datasetKey = pbi.executePBIOperation("getdatasetbyname", groupId = groupId, datasetName = datasetName)[0]["id"]

# Delete all rows from the table(s)
pbi.executePBIOperation("deleterows", groupId = groupId, datasetKey = datasetKey, tableNames = tableNames)


# Get the datasetKey for the dataset (by name)
datasetKey = pbi.executePBIOperation("getdatasetbyname", groupId = groupId, datasetName = datasetName)[0]["id"]

# Delete the dataset
pbi.executePBIOperation("deletedatasetbyid", groupId = groupId, datasetKey = datasetKey)


You can import all the code above as a notebook using this URL.

If you closely examine the Power BI REST API documentation here, you’ll find that the pbiDatasetAPI class is not completely finished yet. There’s more that needs to be done, like:

  • Create measures with DAX
  • Create table relationships
  • Set cross-filtering behaviour
  • etc.

I intend to update the class in the future so that all these features become available. Check GitHub every now and then for updates.

Installing Databricks Cluster Libraries from a Python notebook

Working with interactive clusters in Databricks makes it possible to install libraries manually using the workspace UI. It's done once and for all – you don't have to worry about it anymore.

Job clusters are another story. You must take care of library installation before executing the notebooks that reference these libraries. If you're using Azure Data Factory to orchestrate the whole process, you're lucky, because appending libraries to job clusters is out-of-the-box functionality. For all other scenarios, the Databricks REST API is one possible option – and you can use it right from a Python notebook. That's what I'm going to demonstrate in the following lines.

Let's have a look at the REST API documentation first. We'll be using only the Cluster Status and Install endpoints. To install a library, we need to send an HTTP request body in JSON format containing the library source and its properties. Here's one example:

{
  "pypi": {
    "package": "simplejson",
    "repo": "http://my-pypi-mirror.com"
  }
}

Here "pypi" is the source and {"package": "simplejson", "repo": "http://my-pypi-mirror.com"} are its properties. For more examples check the documentation here.

Let’s create a new Python notebook and then some functions in it. The code is simple and self-explanatory. Comments are available where needed.

We’ll be using only json and requests libraries:

import json
import requests
from requests.auth import HTTPBasicAuth


Now, let’s create a REST API wrapper function:

def adbAPI(endPoint, body, method = "GET", region = None, token = None):
  """Execute HTTP request against Databricks REST API 2.0"""
  
  domain = region + ".azuredatabricks.net"
  baseURL = "https://%s/api/" % (domain)

  if method.upper() == "GET":
    response = requests.get(
        baseURL + endPoint
      , auth = HTTPBasicAuth("token", token)
      , json = body
    )
  else:
    response = requests.post(
        baseURL + endPoint
      , auth = HTTPBasicAuth("token", token)
      , json = body
    )

  return response

As parameters we’ll take the API endpoint, HTTP request body, HTTP method (GET or POST), Databricks workspace region (westeurope, northeurope, etc.) and, finally, a Databricks token.
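
For example, a call listing all clusters in a West Europe workspace might look like this (the token value is a placeholder):

# Hypothetical call: list the clusters in the workspace
response = adbAPI("2.0/clusters/list", None, "GET", "westeurope", "dapi********************************")
print(response.json())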

Next is a helper function for translating the library status response into a human-readable format:

def describeClusterLibraryState(source, clusterId, status):
  """Converts cluster library status response to a verbose message"""
  
  result_map = {
      "NOT_INSTALLED"       : "{} library is not installed on cluster {}.".format(source.title(), clusterId)
    , "INSTALLED"           : "{} library is already installed on cluster {}.".format(source.title(), clusterId)
    , "PENDING"             : "Pending installation of {} library on cluster {} . . .".format(source, clusterId)
    , "RESOLVING"           : "Retrieving metadata for the installation of {} library on cluster {} . . .".format(source, clusterId)
    , "INSTALLING"          : "Installing {} library on cluster {} . . .".format(source, clusterId)
    , "FAILED"              : "{} library failed to install on cluster {}.".format(source.title(), clusterId)
    , "UNINSTALL_ON_RESTART": "{} library installed on cluster {} has been marked for removal upon cluster restart.".format(source.title(), clusterId)
  }

  return result_map[status.upper()]

The parameters here include the library source, the cluster ID and the API’s status response.
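
A quick usage example (the cluster ID below is a made-up placeholder):

# Translate a raw API status into a readable message
print(describeClusterLibraryState("maven", "0123-456789-abcde1", "PENDING"))
# -> Pending installation of maven library on cluster 0123-456789-abcde1 . . .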


Now let's get down to the serious business. Obviously, we need a way to check the current state of the library – to see whether it's already installed or whether something went wrong during installation:

def getClusterLibraryStatus(source, properties, dbRegion, dbToken, verbose = True):
  """Gets the current library status """
  
  source = source.lower()

  # Validate input
  assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), \
    "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
  assert properties is not None, \
    "Error: Empty properties provided"

  # Get the cluster ID from the Spark environment
  clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

  # Set default result to not installed
  result = describeClusterLibraryState(source, clusterId, "NOT_INSTALLED") if verbose else "NOT_INSTALLED" 

  # Execute REST API request to get the library statuses
  libStatuses = adbAPI("2.0/libraries/cluster-status?cluster_id=" + clusterId, None, "GET", dbRegion, dbToken)

  if libStatuses.status_code == 200:
    statuses = libStatuses.json()
    if "library_statuses" in statuses:
      for status in statuses["library_statuses"]:
        if status["library"] == {source: properties}:
          if verbose:
            result = describeClusterLibraryState(source, clusterId, status["status"])
          else:
            result = status["status"]
  else:
    print(libStatuses)

  return result

The parameters here include the library source and properties, the Databricks region and token, and a verbose flag, which, if set to True (the default), makes the function return a human-readable library status.

Finally, we have to create the most important function that will actually install the library:

def installClusterLibrary(source, properties, dbRegion, dbToken):
  """
  Installs a cluster library given correct source and properties are provided
  For examples see https://docs.databricks.com/api/latest/libraries.html#install
  """
  
  source = source.lower()

  # Validate input
  assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), \
    "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
  assert properties is not None, \
    "Error: Empty properties provided"
  
  # Get the cluster ID from the Spark environment
  clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

  status = getClusterLibraryStatus(source, properties, dbRegion, dbToken, False).upper()
  if status != "INSTALLED":
    # Create the HTTP request body based on the cluster ID, library source and properties
    body = {"cluster_id": clusterId, "libraries": [{source: properties}]}
    # Execute REST API request to install the library
    runAPI = adbAPI("2.0/libraries/install", body, "POST", dbRegion, dbToken)
    if runAPI.status_code == 200:
      print("Installation started . . .")
    else:
      print(runAPI)
  else:
    print(describeClusterLibraryState(source, clusterId, status))

  return status

The parameters here are: the library source and properties and the Databricks region and token.


Example function calls (azure-sqldb-spark library hosted on Maven)

Show the library status:

print(getClusterLibraryStatus("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************"))


Install the library:

installClusterLibrary("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************")


One thing to note here is that the library installation starts and runs asynchronously. This means that you can execute the getClusterLibraryStatus function multiple times after installClusterLibrary and get different results until the installation is done (or failed).
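
If you'd rather block until the installation finishes, a simple polling loop like the following sketch will do (the retry count and sleep interval are arbitrary):

import time

# Polling sketch: install the library and wait until it is installed or has failed
libSource = "maven"
libProperties = {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}
dbRegion = "westeurope"
dbToken = "dapi********************************"

installClusterLibrary(libSource, libProperties, dbRegion, dbToken)

for _ in range(30):
  status = getClusterLibraryStatus(libSource, libProperties, dbRegion, dbToken, False).upper()
  if status in ("INSTALLED", "FAILED"):
    break
  time.sleep(10)

print(describeClusterLibraryState(libSource, spark.conf.get("spark.databricks.clusterUsageTags.clusterId"), status))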

Parsing nested JSON lists in Databricks using Python

Parsing complex JSON structures is usually not a trivial task. When your destination is a database, what you expect naturally is a flattened result set. Things get more complicated when your JSON source is a web service and the result consists of multiple nested objects including lists in lists and so on. Things get even more complicated if the JSON schema changes over time, which is often a real-life scenario.

We have these wonderful Azure Logic Apps, which help us consistently get the JSON results from various sources. However, Logic Apps are not so good at parsing more complex nested structures. And they definitely don’t like even subtle source schema changes.

Enter Databricks!

With Databricks you get:

  • An easy way to infer the JSON schema and avoid creating it manually
  • Subtle changes in the JSON schema won’t break things
  • The ability to explode nested lists into rows in a very easy way (see the Notebook below)
  • Speed!

Following is an example Databricks notebook (Python) demonstrating the above claims. The JSON sample consists of an imaginary JSON result set, which contains a list of car models within a list of car vendors within a list of people. We want to flatten this result into a DataFrame. Here you go:
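
Below is a minimal sketch of the approach, with a made-up sample that mirrors the structure described above (people containing car vendors containing car models) – the field names are purely illustrative:

import json
from pyspark.sql.functions import explode

# An imaginary nested JSON result set: car models within car vendors within people
sample = {
  "people": [
    {
      "name": "John",
      "vendors": [
          {"vendor": "Toyota", "models": ["Corolla", "Yaris"]}
        , {"vendor": "Honda", "models": ["Civic"]}
      ]
    },
    {
      "name": "Maria",
      "vendors": [
          {"vendor": "BMW", "models": ["i3", "X5"]}
      ]
    }
  ]
}

# Let Spark infer the JSON schema instead of defining it manually
dfJSON = spark.read.json(sc.parallelize([json.dumps(sample)]))

# Explode the nested lists step by step: people -> vendors -> models
dfFlat = (
  dfJSON
    .select(explode("people").alias("person"))
    .select("person.name", explode("person.vendors").alias("vendor"))
    .select("name", "vendor.vendor", explode("vendor.models").alias("model"))
)

display(dfFlat)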

We've seen here how we can use Databricks to flatten JSON with just a few lines of code.

Keep your eyes open for future Databricks related blogs, which will demonstrate more of the versatility of this great platform.

More on some of the functions used can be found in the PySpark 2.3.0 documentation.