
Installing Databricks Cluster Libraries from a Python notebook

Working with interactive clusters in Databricks, you can install libraries manually through the workspace UI. It’s done once and for all; you don’t have to worry about it anymore.

Job clusters are another story. You must take care of library installation before executing the notebooks that reference those libraries. If you’re using Azure Data Factory to orchestrate the whole process, you’re lucky, because appending libraries to job clusters is out-of-the-box functionality. For all other scenarios, the Databricks REST API is one possible option. In fact, you can do this right from a Python notebook. That’s what I’m going to demonstrate in the following lines.

Let’s have a look at the REST API documentation first. We’ll be using only the Cluster Status and Install endpoints. To install a library, we need to build an HTTP request body in JSON format that specifies the library source and its properties. Here’s one example:

{
  "pypi": {
    "package": "simplejson",
    "repo": "http://my-pypi-mirror.com"
  }
}

Here "pypi" is the source and {"package": "simplejson", "repo": "http://my-pypi-mirror.com"} are its properties. For more examples check the documentation here.

Let’s create a new Python notebook and then some functions in it. The code is simple and self-explanatory. Comments are available where needed.

We’ll be using only json and requests libraries:

import json
import requests
from requests.auth import HTTPBasicAuth


Now, let’s create a REST API wrapper function:

def adbAPI(endPoint, body, method = "GET", region = None, token = None):
  """Execute HTTP request against Databricks REST API 2.0"""
  
  domain = region + ".azuredatabricks.net"
  baseURL = "https://%s/api/" % (domain)

  if method.upper() == "GET":
    response = requests.get(
        baseURL + endPoint
      , auth = HTTPBasicAuth("token", token)
      , json = body
    )
  else:
    response = requests.post(
        baseURL + endPoint
      , auth = HTTPBasicAuth("token", token)
      , json = body
    )

  return response

As parameters we’ll take the API endpoint, HTTP request body, HTTP method (GET or POST), Databricks workspace region (westeurope, northeurope, etc.) and, finally, a Databricks token.
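
For example, assuming a known cluster ID (the one below is a placeholder) and a valid token, the wrapper can be called directly:

# Hypothetical call; replace the cluster ID, region and token with real values
response = adbAPI("2.0/libraries/cluster-status?cluster_id=0123-456789-abcde", None, "GET", "westeurope", "dapi********************************")
print(response.status_code, response.json())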

Next is a helper function for translating the library status response into a human readable format:

def describeClusterLibraryState(source, clusterId, status):
  """Converts cluster library status response to a verbose message"""
  
  result_map = {
      "NOT_INSTALLED"       : "{} library is not installed on cluster {}.".format(source.title(), clusterId)
    , "INSTALLED"           : "{} library is already installed on cluster {}.".format(source.title(), clusterId)
    , "PENDING"             : "Pending installation of {} library on cluster {} . . .".format(source, clusterId)
    , "RESOLVING"           : "Retrieving metadata for the installation of {} library on cluster {} . . .".format(source, clusterId)
    , "INSTALLING"          : "Installing {} library on cluster {} . . .".format(source, clusterId)
    , "FAILED"              : "{} library failed to install on cluster {}.".format(source.title(), clusterId)
    , "UNINSTALL_ON_RESTART": "{} library installed on cluster {} has been marked for removal upon cluster restart.".format(source.title(), clusterId)
  }

  return result_map[status.upper()]

The parameters here include the library source, the cluster ID and the API’s status response.
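
For example, with a hypothetical cluster ID:

print(describeClusterLibraryState("maven", "0123-456789-abcde", "PENDING"))
# Pending installation of maven library on cluster 0123-456789-abcde . . .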


Now let’s get to the serious business. Obviously, we need a way to check the current state of a library, to see whether it’s already installed or whether something went wrong during installation:

def getClusterLibraryStatus(source, properties, dbRegion, dbToken, verbose = True):
  """Gets the current library status """
  
  source = source.lower()

  # Validate input
  assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), \
    "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
  assert properties is not None, \
    "Error: Empty properties provided"

  # Get the cluster ID from the Spark environment
  clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

  # Set default result to not installed
  result = describeClusterLibraryState(source, clusterId, "NOT_INSTALLED") if verbose else "NOT_INSTALLED" 

  # Execute REST API request to get the library statuses
  libStatuses = adbAPI("2.0/libraries/cluster-status?cluster_id=" + clusterId, None, "GET", dbRegion, dbToken)

  if libStatuses.status_code == 200:
    statuses = libStatuses.json()
    if "library_statuses" in statuses:
      for status in statuses["library_statuses"]:
        if status["library"] == {source: properties}:
          if verbose:
            result = describeClusterLibraryState(source, clusterId, status["status"])
          else:
            result = status["status"]
  else:
    print(libStatuses)

  return result

Parameters here include the library source and properties, the Databricks region and token, and a verbose flag which, if set to True (the default), makes the function return a human-readable library status.

Finally, we have to create the most important function that will actually install the library:

def installClusterLibrary(source, properties, dbRegion, dbToken):
  """
  Installs a cluster library given correct source and properties are provided
  For examples see https://docs.databricks.com/api/latest/libraries.html#install
  """
  
  source = source.lower()

  # Validate input
  assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), \
    "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
  assert properties is not None, \
    "Error: Empty properties provided"
  
  # Get the cluster ID from the Spark environment
  clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

  status = getClusterLibraryStatus(source, properties, dbRegion, dbToken, False).upper()
  if status != "INSTALLED":
    # Create the HTTP request body based on the cluster ID, library source and properties
    body = {"cluster_id": clusterId, "libraries": [{source: properties}]}
    # Execute REST API request to install the library
    runAPI = adbAPI("2.0/libraries/install", body, "POST", dbRegion, dbToken)
    if runAPI.status_code == 200:
      print("Installation started . . .")
    else:
      print(runAPI)
  else:
    print(describeClusterLibraryState(source, clusterId, status))

  return status

The parameters here are the library source and properties, and the Databricks region and token.


Example function calls (azure-sqldb-spark library hosted on Maven)

Show the library status:

print(getClusterLibraryStatus("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************"))


Install the library:

installClusterLibrary("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************")


One thing to note here is that the library installation starts and runs asynchronously. This means that you can execute the getClusterLibraryStatus function multiple times after installClusterLibrary and get different results until the installation completes (or fails).
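
If you need to block until the installation finishes, a simple polling loop will do. Here’s an illustrative sketch; the time import and the 10-second interval are my additions, not part of the functions above:

import time

library = {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}
token = "dapi********************************"

installClusterLibrary("maven", library, "westeurope", token)

# Poll the raw status until the installation completes or fails
status = ""
while status not in ("INSTALLED", "FAILED"):
  time.sleep(10)
  status = getClusterLibraryStatus("maven", library, "westeurope", token, False).upper()

print(status)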

Parsing nested JSON lists in Databricks using Python

Parsing complex JSON structures is usually not a trivial task. When your destination is a database, what you expect naturally is a flattened result set. Things get more complicated when your JSON source is a web service and the result consists of multiple nested objects including lists in lists and so on. Things get even more complicated if the JSON schema changes over time, which is often a real-life scenario.

We have these wonderful Azure Logic Apps, which help us consistently get the JSON results from various sources. However, Logic Apps are not so good at parsing more complex nested structures. And they definitely don’t like even subtle source schema changes.

Enter Databricks!

With Databricks you get:

  • An easy way to infer the JSON schema and avoid creating it manually
  • Subtle changes in the JSON schema won’t break things
  • The ability to explode nested lists into rows in a very easy way (see the Notebook below)
  • Speed!

Following is an example Databricks Notebook (Python) demonstrating the above claims. The JSON sample consists of an imaginary JSON result set, which contains a list of car models within a list of car vendors within a list of people. We want to flatten this result into a dataframe. Here you go:
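
A minimal sketch of such a notebook, with made-up sample data (the person, vendors and models below are placeholders):

from pyspark.sql.functions import col, explode

# Made-up sample with the structure described above:
# a list of car models within a list of car vendors within a list of people
sampleJson = '{"name": "John", "cars": [{"vendor": "Toyota", "models": ["Corolla", "RAV4"]}, {"vendor": "Ford", "models": ["Focus"]}]}'

# Let Spark infer the JSON schema instead of defining it by hand
df = spark.read.json(sc.parallelize([sampleJson]))

# Explode the nested lists into rows: one row per person / vendor / model
flat = (df
  .select(col("name"), explode(col("cars")).alias("car"))
  .select(col("name"), col("car.vendor").alias("vendor"), explode(col("car.models")).alias("model"))
)

display(flat)

Each explode call turns one nested list into one row per element, which is what produces the flattened result set.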

We've seen here how we can use Databricks to flatten JSON with just a few lines of code.

Keep your eyes open for future Databricks related blogs, which will demonstrate more of the versatility of this great platform.

More on some of the functions used (PySpark 2.3.0 documentation):