Adatis BI Blogs

Getting started with Azure Data Factory Mapping Data Flows

Azure Data Factory V2 is the go-to service for moving large amounts of data within the Azure platform and, until relatively recently, was focussed predominantly on control flow rather than data flow. Those familiar with SQL Server Integration Services will be aware of the difference but, to clarify, control flow is used to orchestrate the activities within a data pipeline whilst data flow is used to physically move and transform the data.

In May 2019, the Azure Data Factory team released the Mapping Data Flows (MDF) functionality into public preview, which effectively moves ADF from being an orchestration tool to a fully fledged ETL tool. This new feature allows developers to configure data transformation logic using a no-code, drag-and-drop approach and implements many of the transformation concepts that existed in SSIS. In fact, there is an excellent comparison between SSIS, SQL and ADF MDF written by Kamil Nowinski, available through this link, which highlights how the three technologies match up. Points to note are that ADF does not have event handlers in the way SSIS does, and also has no equivalent to the script component within data flows. It does have a custom activity which allows you to write C# within the control flow, but currently you cannot write any custom code within a Mapping Data Flow.

Prior to the release of MDF, Data Factory could move data from one place to another with its copy activity, but it could not modify the file in any meaningful way, and external services such as SQL Data Warehouse or Data Lake Analytics (RIP) were needed to fill this gap. The copy activity performed the data movement using the Azure Integration Runtime, which provided the compute power needed to complete the operation. So does that mean that Mapping Data Flows run on the same compute resource? No.
In fact, your graphical data flow is converted into Scala and then compiled into a JAR library to be executed on a Databricks (Spark) cluster which is deployed and managed by Microsoft solely for running your data flow activities. This does mean that any pipelines that utilise MDF have a slight overhead to allow for the cluster to start up and configure, although Microsoft are looking at ways to reduce this. An ETL developer looking to build MDFs does not need to know anything about Databricks, Scala or Spark clusters, although it will certainly help!

So, it's in public preview, so let's get using it! This blog will walk through the process of creating a basic cleaning data flow that will populate a SQL Database table with the values from a delimited text file. To begin, we need some Data Factory objects: anyone familiar with Data Factory will understand we need Linked Services and Datasets to connect to our data and create schemas, and Pipelines to give structure to the activities and create our control flow. In this example I have a small comma-separated file that has some data quality issues, such as:

- Leading and trailing white space
- Weird column names
- Non-standardised NULL values
- Weak data types

The data flow we will create will address all of these issues before depositing the file into a SQL Database. First, we should create a Linked Service that connects to the blob store or data lake account where our dirty file is stored. We then need a Dataset that sits on top of the Linked Service and allows us to locate and read the file using specified parameters such as file path, column delimiter, row delimiter etc. Additionally, in this Dataset we can import the schema of the file so that we have some column names available. Next, we can create a Linked Service that connects to our output SQL Database, and also a Dataset that points to the correct table, again importing the schema.
After creating these objects we also need a pipeline to orchestrate the process; this will ultimately call our fancy new data flow. You only need a single activity in this pipeline, found under the "Move & Transform" heading and called "Data Flow (Preview)". At this point you should have something similar to the following, with the exception of the data flow object under the Data Flows (Preview) tab.

NOTE: I have not gone into much detail about creating these objects as they will be familiar to most ADF developers. For pointers on how to create the above, follow this link.

Now we can begin creating our Data Flow. Firstly, we will add the source and the sink to our data flow; this gives us our bookends as a clear starting point, and we can then play about with the middle bit afterwards. Also, at this point, toggle the switch at the top of the Data Factory UI named "Data Flow Debug". This will start a cluster up for you and avoid you having to wait later on.

Hover over the Data Flows (Preview) header, click on the ellipsis and choose "Add Data Flow". This will take you into the Mapping Data Flow UI where you can begin creating your Data Flows. Remember to set a sensible name for your data flow in the general tab of the data flow UI. Click "Add Source" and we can begin configuring the source activity of our data flow. Provide a clear name that identifies the source. This source is fed from a dataset, so the names should closely match. Choose the dataset that is linked to your blob account and uncheck "Allow schema drift". This is a useful option that allows constantly changing source files to be handled, but we will cover it in a later blog. On the "Source Options" tab you can choose to add actions that occur after completion, such as deleting or moving source files. On the "Projection" tab you can tailor the schema defined in the source dataset.
My preference is to leave these all as strings to avoid any early typing errors, as we will address the types later in the flow. Finally, on the Optimize, Inspect and Data Preview tabs, all defaults can remain the same. Now click on the tiny + icon in the bottom right corner of the source activity and choose "Sink" from the drop-down list. Here we can configure the output of our flow, which in this case will be a SQL Database. Specify the name, again relating it to your sink dataset, and choose the SQL Database dataset created earlier. On the "Settings" tab you can choose which methods can be used by ADF when working with your table. These can be any combination of insertion, deletion, upserting or updating. Also, you can define actions to occur in the database before loading the data, such as recreating the entire table or truncating the existing table before loading into it. Finally, on the "Mapping" tab you can map columns from source to sink. Be aware that any columns that are not strings in your database will not be able to be mapped until the data typing has occurred. Now we have a basic copy structure that does nothing clever yet, but does connect our dirty source file up to our sink dataset. We can begin doing the actual transformation. The first transformation will be to trim white space from columns. Click on the + icon and choose "Derived Column". Within the "Derived Column Settings" tab you should add each of the columns in your source dataset and then enter the following expression for each one in the expression editor: trim({column name}). This expression will remove any whitespace from the column's value, ensuring the database receives a clean value. Now we will standardise any NULL values and also transform columns into their correct data types. To do this, click the + icon again and choose "Derived Column" once more.
Similar to the above step, you can add an entry in the "Derived Column Settings" tab for each column, with the below expression: replace(replace({column name}, ' ', ''), 'Unknown', ''). This expression will replace any blank values with NULL, and any 'Unknown' values will also be replaced with NULL, so that we have some standardisation before loading into the database. Any NULL values already present will be untouched. In my dataset I need to change one column from its originating string data type into an int so that it can be dropped into the DB. Rather than making this change in place, it is best to create a new column, so that you keep the original column alongside the new column with the correct type. Whilst still in the expression editor, hover over any column name in the "OUTPUT SCHEMA" window that is visible on the left-hand side and choose the + icon. This will allow you to add a new column to your data flow, and you can use any of the conversion functions (toInteger, toLong, toString, toDate, toBoolean etc.) to coerce the value into its correct type. At this point you should have four steps that resemble the below screenshot. Once your Data Flow Debug session is online you can debug the data flow and hopefully see the nice clean values pass through into the database. Throughout this process I recommend taking a peek at the Inspect and Data Preview tabs. The Inspect tab gives a bit more information about what steps are taking place on the data in that activity, and the Data Preview tab will show you how the data will look, although the Debug session needs to be active for this to work. Finally, the Optimize tab allows you to set the partitioning of the data using techniques such as round robin, hash and range distribution, although these are out of the scope of this blog.
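To make the cleaning steps concrete, here is a minimal plain-Python sketch of the same logic the data flow expressions perform: trim whitespace, standardise blank and 'Unknown' values to NULL, then derive a typed copy of a column rather than casting in place. The column names and the AgeInt derived column are illustrative assumptions, not taken from the file used above.

```python
# Hypothetical columns ("Name", "Age", "City") standing in for the dirty file.

def clean_value(value):
    """Trim whitespace and standardise blank/'Unknown' values to None (NULL)."""
    trimmed = value.strip()              # equivalent of trim({column name})
    if trimmed in ("", "Unknown"):       # equivalent of the nested replace(...) expression
        return None
    return trimmed

def clean_row(row):
    """Clean every column, then add a typed copy of Age as a new column."""
    cleaned = {name: clean_value(value) for name, value in row.items()}
    # New column rather than an in-place change, mirroring the toInteger() step
    cleaned["AgeInt"] = int(cleaned["Age"]) if cleaned["Age"] is not None else None
    return cleaned

dirty = {"Name": "  Alice ", "Age": " 42 ", "City": "Unknown"}
print(clean_row(dirty))
```

The same idea carries over directly: one derived-column step per expression, with typed values only introduced once the strings are clean.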
Hopefully this brief tutorial has been helpful and has allowed you to gain some early knowledge of Data Factory Mapping Data Flows, so that you can go on to create flows that are tailored to your needs and cover a wider variety of scenarios. Any questions, thoughts or feedback, please catch me on Twitter @MattTheHow.

Injecting Databricks DataFrame into a Power BI Push Dataset

Using Python and the Power BI REST API.

Now, why would you want to do that? There are some scenarios where this approach may be beneficial. To get the full picture and establish the landscape, let's first answer two questions.

Why a Databricks DataFrame?

Recently Databricks became an integral part of the Modern Data Warehouse approach when aiming for the Azure cloud. Its wide usage in data transformation begs for a richer variety of data destinations. The usual and most widely used persistence is the file store (lake, blob, etc.). That's fine with large volumes of data, but then we have to go a long way before reaching the data presentation (additional storage layers, SQL, Tabular data model etc.). What if we want to instantly update a Power BI report directly from Databricks? It could be a small dataset feeding a dashboard, for example. It could be business data or data-related metrics that we want to see in (near) real-time. Or we may want to monitor the data transformation process continuously in a Databricks streaming scenario.

Why a Push Dataset?

We can do real-time streaming in Power BI by using Streaming or PubNub streaming datasets, which is fine, but let's see some of the advantages of using a Push dataset:

- You can build a report on top of a Push dataset
- You can pin report visuals to a dashboard, and they will update in real-time on each data push
- Data is stored permanently in Power BI
- You don't need a data gateway in place

For a more detailed comparison of all the real-time streaming methods, please check here. If you think that all this makes sense, then continue further.

Implementation

Since we'll be utilising the Power BI REST API, there are some limitations that we need to be aware of upfront. You can see them here. In order to be able to call the Power BI API you need to register an application and set proper permissions. Follow the steps here.
App registration details below.

After creating the app registration, you should grant permissions in the Azure portal. Without granting these permissions from the Azure portal you won't be able to get an authorisation token and use the REST API. Now that we're all set up, let's go straight to the point.

My initial intention was to show you how to build the whole thing step by step in this post, but then it became complicated and I decided that an abstraction was needed here to keep things simple. That's why I wrapped up all the "boring" stuff in a Python class called pbiDatasetAPI, which you can find on GitHub here. The comments in the code should be enough to understand the logic. The methods of this class will take care of:

- Authentication
- HTTP requests
- HTTP request JSON body generation (data and metadata)
- Data type conversion (Spark to EDM)
- Wrapping all the Power BI REST API operations that we need to perform in Python functions

In order to start using it you should import a notebook (using the above-mentioned URL) in your Databricks Workspace. Now that the class notebook is imported, you can create a new Python notebook in the same folder to test how it's working. Let's call it "Inject DataFrame into Power BI Push Dataset". First, we'll execute our class notebook:

    %run "./pbiDatasetAPI"

Next, we'll need a DataFrame with data that will be pushed to the Power BI Push dataset.
We can use one of the sample datasets which come with Databricks (in this case Amazon reviews):

    dfInject ='dbfs:/databricks-datasets/amazon/test4K')
    dfInject ="brand", "img", "price", "rating", "review", "time").limit(200)

We take 200 rows, which is just enough. In the next command we'll create some variables and instantiate the pbiDatasetAPI class:

    # Initialise variables
    username = "<USER_EMAIL>"
    password = "<USER_PASSWORD>"
    application_id = "********-****-****-****-************"  # Power BI application ID
    groupId = None                   # Set to None if not using Power BI groups
    datasetName = "InjectedDataset"  # The name of the Power BI dataset
    tableNames = ["AmazonReviews"]   # Table name or list of table names
    dataFrames = [dfInject]          # DataFrame or list of DataFrames

    # Create a pbiDatasetAPI class instance
    pbi = pbiDatasetAPI(username, password, application_id)

You should set your username and password, and the application ID obtained in the previous steps. Optionally also provide a group ID, or set it to None if you're not using groups. The tableNames and dataFrames variables are lists, because we may want to insert multiple DataFrames in multiple tables. In our case it's one DataFrame to one table. Let's create a dataset with one table in Power BI:

    # Create the dataset and the table in PBI
    pbi.executePBIOperation("postdataset", groupId = groupId, datasetName = datasetName, tableNames = tableNames, dataFrames = dataFrames, reCreateIfExists = True)

The next step is to post all the DataFrame's rows to the Push dataset:

    # Get the datasetKey for the dataset (by name)
    datasetKey = pbi.executePBIOperation("getdatasetbyname", groupId = groupId, datasetName = datasetName)[0]["id"]

    # Insert the contents of the DataFrame in the PBI dataset table
    pbi.executePBIOperation("postrows", groupId = groupId, datasetKey = datasetKey, tableNames = tableNames, dataFrames = dataFrames)

This is where the magic happens. One important note here is that the dataset key is always unique.
This does not apply to the dataset's name, which means that we can have multiple datasets with the same name. You can see the newly created dataset in the Power BI service. You can create a report on top of this dataset and pin visuals to a dashboard (tiles will be updated automatically upon data change).

Now, let's try to manipulate the metadata a bit – change the data type of the "rating" column in the DataFrame from double to string and update the Push dataset accordingly:

    # Change the data type of the column 'rating' from double to string
    dfInjectModified = dfInject.withColumn("rating", dfInject.rating.cast("string"))

    # Get the datasetKey for the dataset (by name)
    datasetKey = pbi.executePBIOperation("getdatasetbyname", groupId = groupId, datasetName = datasetName)[0]["id"]

    # Update the metadata of the Power BI table
    pbi.executePBIOperation("puttable", groupId = groupId, datasetKey = datasetKey, tableNames = tableNames, dataFrames = [dfInjectModified])

The only thing remaining now is to try to delete all the rows (it's all or nothing – we can't delete only some of the rows) from the table in the dataset, and then delete the entire dataset:

    # Get the datasetKey for the dataset (by name)
    datasetKey = pbi.executePBIOperation("getdatasetbyname", groupId = groupId, datasetName = datasetName)[0]["id"]

    # Delete all rows from the table(s)
    pbi.executePBIOperation("deleterows", groupId = groupId, datasetKey = datasetKey, tableNames = tableNames)

    # Get the datasetKey for the dataset (by name)
    datasetKey = pbi.executePBIOperation("getdatasetbyname", groupId = groupId, datasetName = datasetName)[0]["id"]

    # Delete the dataset
    pbi.executePBIOperation("deletedatasetbyid", groupId = groupId, datasetKey = datasetKey)

All the code above can be imported in a notebook using this URL. If you closely examine the Power BI REST API documentation here, you'll find that the pbiDatasetAPI class is not completely finished yet.
There's more that needs to be done, like:

- Create measures with DAX
- Create table relationships
- Set cross-filtering behaviour
- etc.

I intend to update the class in the future so that all the features will be available. Check GitHub now and then for updates.
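As a rough illustration of what a "postrows" call has to produce under the hood, here is a stdlib-only sketch of shaping row dicts into the {"rows": [...]} JSON body that the Power BI REST API's PostRows operation expects. The column names are invented for the example; the pbiDatasetAPI class wraps this kind of body generation for you.

```python
import json

def build_postrows_body(rows):
    """Turn a list of row dicts into the JSON body for the PostRows operation."""
    # The API expects an object with a single "rows" key holding an array of rows
    return json.dumps({"rows": rows})

# Hypothetical rows mirroring a couple of columns from the Amazon reviews sample
sample_rows = [
    {"brand": "Acme", "price": 9.99, "rating": 4.5},
    {"brand": "Globex", "price": 19.99, "rating": 3.0},
]
print(build_postrows_body(sample_rows))
```

In the real class, each Spark row is first converted to plain Python values (and Spark types mapped to EDM types for the table definition) before being serialised like this.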

Installing Databricks Cluster Libraries from a Python notebook

Working with interactive clusters in Databricks makes it possible to manually install libraries using the workspace UI. It's done once and for all – you don't have to worry about it anymore. Creating job clusters is another story. You must take care of library installations prior to executing the notebooks which reference these libraries. If you're using Azure Data Factory to orchestrate the whole process you're lucky, because appending libraries to job clusters is an out-of-the-box functionality. For all other scenarios the Databricks REST API is one possible option. In fact, you can do this right from a Python notebook. That's what I'm going to demonstrate in the following lines.

Let's have a look at the REST API documentation first. We'll be using the Cluster Status and Install endpoints only. For installing a library, we need to provide the library source and its properties, so we need to create a proper HTTP request body in JSON format including both. Here's one example:

    {
      "pypi": {
        "package": "simplejson",
        "repo": ""
      }
    }

Here "pypi" is the source and {"package": "simplejson", "repo": ""} are its properties. For more examples check the documentation here.

Let's create a new Python notebook and then some functions in it. The code is simple and self-explanatory. Comments are available where needed.
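Later on, installClusterLibrary will combine such a source/properties pair with the cluster ID into the full request body for the install endpoint. As a quick stdlib-only sketch of that assembly (the cluster ID here is a placeholder, not a real one):

```python
import json

def build_install_body(cluster_id, source, properties):
    """Assemble the Libraries API install request body from source + properties."""
    # The install endpoint expects {"cluster_id": ..., "libraries": [{"<source>": {...}}]}
    return json.dumps({"cluster_id": cluster_id, "libraries": [{source: properties}]})

body = build_install_body("0123-456789-abc123", "pypi", {"package": "simplejson"})
print(body)
```

Keeping this body generation in one place means the same helper works for pypi, maven, cran and file-based (jar/egg/whl) sources alike.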
We'll be using only the json and requests libraries:

    import json
    import requests
    from requests.auth import HTTPBasicAuth

Now, let's create a REST API wrapper function:

    def adbAPI(endPoint, body, method = "GET", region = None, token = None):
        """Execute HTTP request against Databricks REST API 2.0"""
        domain = region + ""
        baseURL = "https://%s/api/" % (domain)

        if method.upper() == "GET":
            response = requests.get(
                baseURL + endPoint
                , auth = HTTPBasicAuth("token", token)
                , json = body
            )
        else:
            response =
                baseURL + endPoint
                , auth = HTTPBasicAuth("token", token)
                , json = body
            )
        return response

As parameters we take the API endpoint, the HTTP request body, the HTTP method (GET or POST), the Databricks workspace region (westeurope, northeurope, etc.) and, finally, a Databricks token.

Next is a helper function for translating the library status response into a human-readable format:

    def describeClusterLibraryState(source, clusterId, status):
        """Converts cluster library status response to a verbose message"""
        result_map = {
            "NOT_INSTALLED"         : "{} library is not installed on cluster {}.".format(source.title(), clusterId)
            , "INSTALLED"            : "{} library is already installed on cluster {}.".format(source.title(), clusterId)
            , "PENDING"              : "Pending installation of {} library on cluster {} . . .".format(source, clusterId)
            , "RESOLVING"            : "Retrieving metadata for the installation of {} library on cluster {} . . .".format(source, clusterId)
            , "INSTALLING"           : "Installing {} library on cluster {} . . .".format(source, clusterId)
            , "FAILED"               : "{} library failed to install on cluster {}.".format(source.title(), clusterId)
            , "UNINSTALL_ON_RESTART" : "{} library installed on cluster {} has been marked for removal upon cluster restart.".format(source.title(), clusterId)
        }
        return result_map[status.upper()]

The parameters here include the library source, the cluster ID and the API's status response. Now let's get to the serious business.
Obviously, we need a way to check the current state of the library, to see whether it's already installed or whether something went wrong during installation:

    def getClusterLibraryStatus(source, properties, dbRegion, dbToken, verbose = True):
        """Gets the current library status"""
        source = source.lower()

        # Validate input
        assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), \
            "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
        assert properties is not None, \
            "Error: Empty properties provided"

        # Get the cluster ID from the Spark environment
        clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

        # Set default result to not installed
        result = describeClusterLibraryState(source, clusterId, "NOT_INSTALLED") if verbose else "NOT_INSTALLED"

        # Execute REST API request to get the library statuses
        libStatuses = adbAPI("2.0/libraries/cluster-status?cluster_id=" + clusterId, None, "GET", dbRegion, dbToken)

        if libStatuses.status_code == 200:
            statuses = libStatuses.json()
            if "library_statuses" in statuses:
                for status in statuses["library_statuses"]:
                    if status["library"] == {source: properties}:
                        if verbose:
                            result = describeClusterLibraryState(source, clusterId, status["status"])
                        else:
                            result = status["status"]
        else:
            print(libStatuses)

        return result

The parameters here include the library source and properties, the Databricks region and token, and a verbose flag, which if set to True (the default) will make the function return a human-readable library status.

Finally, we have to create the most important function, the one that will actually install the library:

    def installClusterLibrary(source, properties, dbRegion, dbToken):
        """
        Installs a cluster library given correct source and properties are provided
        For examples see the Libraries API documentation
        """
        source = source.lower()

        # Validate input
        assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), \
            "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
        assert properties is not None, \
            "Error: Empty properties provided"

        # Get the cluster ID from the Spark environment
        clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

        status = getClusterLibraryStatus(source, properties, dbRegion, dbToken, False).upper()
        if status != "INSTALLED":
            # Create the HTTP request body based on the cluster ID, library source and properties
            body = json.loads('{"cluster_id": "' + clusterId + '", "libraries": [{"' + source + '": ' + json.dumps(properties) + '}]}')
            # Execute REST API request to install the library
            runAPI = adbAPI("2.0/libraries/install", body, "POST", dbRegion, dbToken)
            if runAPI.status_code == 200:
                print("Installation started . . .")
            else:
                print(runAPI)
        else:
            print(describeClusterLibraryState(source, clusterId, status))
        return status

The parameters here are the library source and properties, and the Databricks region and token.

Example function calls (azure-sqldb-spark library hosted on Maven):

Show the library status:

    print(getClusterLibraryStatus("maven", {"coordinates": ""}, "westeurope", "dapi********************************"))

Install the library:

    installClusterLibrary("maven", {"coordinates": ""}, "westeurope", "dapi********************************")

One thing to note here is that the library installation starts and runs asynchronously. This means that you can execute the getClusterLibraryStatus function multiple times after installClusterLibrary and get different results until the installation is done (or has failed).

Azure Databricks - overwriting table that is also being read from

Problem Definition

Recently I ran into an interesting problem in Databricks (non-Delta). I tried to read data from a table (a table on top of a file), slightly transform it and write it back to the same location that I had been reading from. An attempt to execute code like that manifests with the exception:

"org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from"

So let's try to answer the question: how do we write into a table (DataFrame) that we are reading from, as this might be a common use case? The problem is trivial, but it is very confusing if we do not understand how queries are processed in Spark, so I will explain it with an example. Let's define a process that would allow us to:

- Filter out objects that we have already processed
- Store the history of loaded objects for 3 days only (history needs to take a minimum amount of space on the file system)
- Make today's load part of history for tomorrow's load

The process for this task would look like this:

1. Read CurrentLoad (new objects that we want to load in)
2. Read History limited to the last 3 days (objects that we have already processed)
3. Create a new set called Unprocessed by removing existing History records from CurrentLoad (done via a left anti join)
4. Process/transform Unprocessed (an optional step that may or may not use Unprocessed in further processing)
5. Join only the last 3 days of History with Unprocessed (to maintain only the last 3 days of history and keep the set as small as possible)
6. Overwrite History

The above process is not the most efficient way of checking a new load against historic loads, but it is a good example of overwriting a DataFrame/table that we might be reading from at the same time. A diagram of this process would look like this:

So let's see what might go wrong in this process. We need to add the Unprocessed DataFrame (which contains a reference to the History DataFrame) into the existing History and at the same time remove the first day (to maintain only 3 days of history).
This operation might not be as straightforward as it seems. The reason is that Databricks by default uses lazy execution: execution of the code does not happen immediately, and the query is not evaluated until the last moment – that is, when we try to write the data down to the file system. Spark does this to optimise queries by creating an execution plan, which helps Spark evaluate queries in the way that gives the best performance. If we wait with processing of the Unprocessed DataFrame, it will hold references to two sets, History and CurrentLoad, while we try to write to History. This means that we are trying to perform a write operation on a DataFrame we are reading from. There is a simple solution to this problem.

Solution

Materialise the DataFrame that you want to overwrite the data with (HistoryTemp) so that it clears out the dependencies on the tables that we read from (History, CurrentLoad). The simplest way to do so is to write the DataFrame (HistoryTemp) to the file system in a temporary location and then re-read the data into a new DataFrame. This enables us to write data into the History location. This approach requires you to set a temp location for your temp data. Make sure that you overwrite your HistoryTemp each time. You could also perform additional clean-up at the end of your script to delete the temp data and metadata (table, files). The process would look like this:

1. Calculate the new Unprocessed set (CurrentLoad left anti join History)
2. Union Unprocessed with the current History to persist all records that we want to save (HistoryTemp)
3. Output HistoryTemp (the overwriting set) to a temp location on the file system
4. Re-read the data that we output (HistoryTemp) into a new DataFrame
5. Write the new DataFrame to your History location

Make sure that the Unprocessed and HistoryTemp sets are not used further in the notebook; if you need to use them, perform the write operation at the end.

Code

You can download the full notebook from the Reference section.
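As a hedged, plain-Python sketch of the logical heart of the process, with lists of file keys standing in for DataFrames: the left anti join (step 1) and the union that forms the new history (step 2). In real Spark the result would then be written to a temp location and re-read before overwriting History, which is what breaks the circular dependency.

```python
def build_new_history(current_load, history):
    """Left anti join of CurrentLoad against History, then union with History."""
    seen = set(history)
    # Left anti join: keep only keys from CurrentLoad that are NOT in History
    unprocessed = [key for key in current_load if key not in seen]
    # Union: old history plus newly seen keys becomes the new History
    return history + unprocessed

history = ["file1", "file2"]
current_load = ["file2", "file3"]
print(build_new_history(current_load, history))  # file3 is the only new object
```

The 3-day trimming step is omitted here for brevity; in the notebook it is just a date filter applied to History before the union.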
What did not work for me

I have seen some suggestions on different forums that I tried and that DID NOT WORK for me – I was still receiving the same exceptions:

Creating a temp view on the data and caching it:

    Unprocessed.createOrReplaceTempView("unprocessedTemp")
    Unprocessed.cache()

Checkpointing the Unprocessed DataFrame with these commands:

    spark.sparkContext.setCheckpointDir("dbfs:/unprocessed/checkpointDirectory")
    Unprocessed.rdd.checkpoint()

Reference

You can download the code to play around with from here. Other users with similar problems were posting here.

Azure Databricks Windowing Functions with Dataframes API

This blog is going to cover Windowing Functions in Databricks. I will describe the concept of windowing functions and how to use them with the DataFrame API syntax. If you have not used DataFrames yet, this is probably not the best place to start. Azure Databricks also supports Spark SQL syntax to perform queries, but this is not going to be covered in this blog.

Introduction – What are Window Functions?

Databricks was designed to work with large data sets. Data distribution and parallelisation of the work make queries against the data run very fast. Those queries can be used not only to read data but also to transform it. There are different kinds of transformations that you can apply to a DataFrame:

- Operations on single rows that produce single-value output (built-in functions, UDFs, single-column transformations, e.g. cast string, substring, if statements)
- Exploding arrays of complex data, e.g. explode("…")
- Aggregate functions – grouping data with groupBy(…) and applying a function on the groups, e.g. max(), avg(), min() etc. (columns that are not in the groupBy(…) or part of an aggregate function cannot exist in the resulting DataFrame)
- Window functions – allow us to partition data by columns and then apply aggregate or rank functions within each partition separately. This is similar to grouping but allows us to preserve columns that do not take part in the partitioning, and it also allows us to order elements within each partition.

The last kind is the subject of the current blog.

Problem definition

Let's say that we have to answer the questions listed below:

- calculate running totals for the data set
- rank data based on some columns
- group rows and assign a row number from 1…N within each group
- calculate the difference between each record and aggregated values within the group it belongs to, e.g.
the difference between the current value and the average from the partition/group it belongs to, while preserving all the columns from the set we operate on.

Some of these questions could be answered by using a combination of groupBy(…) and join(…), and some could be answered using a looping mechanism. They could, but there is a better built-in mechanism that allows us to write these queries in a neater manner. This mechanism is the window function. If you are familiar with window functions in T-SQL, this is not going to be a new concept for you, as it works in a very similar way.

How is it constructed?

There are three logical parts to a window function:

1. Defining what function we want to use to calculate the value, e.g. max("…"), row_number() etc.
2. Defining what window we want to use to calculate this value. This might look different for different kinds of transformations, and some of the components are optional:
   - what we are going to partition by (which columns we use for partitioning – grouping our data before calculations are applied; the function will be executed within each of these groups)
   - what order is required in each partition before making the calculation (define the columns and ordering direction before the function is applied.
This has little influence on functions like max or min, but may have a great impact on functions like row_number, lag and lead.)
3. Defining the scope/boundaries of the window.

Using the function in a DataFrame

What the window function DataFrame syntax looks like:

Import the required namespaces and libraries – you are required to import the Window function from the pyspark.sql.window namespace before you start using it:

    from pyspark.sql import *
    from pyspark.sql.window import Window
    from pyspark.sql.functions import *

Define the window specification – one of the specifications below, depending on what type of window we want to define:

    ## if we do not want to specify boundaries, sliding window etc.
    windowSpecification = Window.partitionBy(…).orderBy(…)

    ## if we specify window boundaries based on rows
    windowSpecification = Window.partitionBy(…).orderBy(…).rowsBetween(…)

    ## if we specify window boundaries based on column values
    windowSpecification = Window.partitionBy(…).orderBy(…).rangeBetween(…)

Explanations:

- Window – the window function object (always stays as Window – see the import statement)
- Partition columns are specified by putting the names of the columns in quotes in partitionBy(), e.g. Window.partitionBy("col1", "col2", …, "colN")
- Sorting columns are specified by putting the names of the columns in quotes in orderBy(), e.g. Window.orderBy("col1", "col2", …, "colN")
- The direction of the sort is specified by adding desc("…") or asc("…") in the orderBy clause, e.g. Window.orderBy(asc("col1"), desc("col2"), asc("colN"))

Use the below syntax to apply the function over the specified window:

    max("Salary").over(windowSpecification)

In a DataFrame we might use it by creating a new column name that will hold the value.
Please note that we need to create the windowSpecification first. The full code can look like this:

### Import statements
from pyspark.sql import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *

### Define window specification
windowSpecification = Window.partitionBy("").orderBy(desc(""))

### Produce a new DataFrame with a new column that holds the RowNumber in each partition
dfResult = dfSourceData.withColumn("RowNumber", row_number().over(windowSpecification))

Functions that we can use with the window function:

There are plenty of windowing functions we can use. Some of them can be found here:

Accessing absolute/relative partition records
- lag("") – returns the value of the specified column from the row preceding the current row. For the first row, Null is returned, as there is no previous value.
- lead("") – returns the value of the specified column from the row following the current row. For the last row, Null is returned, as there is no next value.
- first("") – gets the first value from the partition in the current order.
- last("") – gets the last value from the partition in the current order.

Ranking functions
- row_number() – assigns a row number, starting at 1 and increasing by 1 with each row in the partition.
- rank() – calculates the rank number.
- dense_rank() – calculates the dense rank.
- ntile(N) – splits the set into N equal buckets (if the set cannot be divided equally, the last bucket will be uneven in size).

Aggregation
- max("") – calculates the max value of the chosen column within the specified partition. Remark: if we bound the window with unbounded preceding and current row, this might return different results depending on the ordering direction (asc, desc).
- min("") – calculates the min value of the chosen column within the specified partition. The same remark about bounded windows and ordering direction applies.
- sum("") – calculates the sum of the chosen column.
It can be used for running totals when the window is bounded.
- avg("") – calculates the average.
- count("") – calculates the count of items in the partition.
- mean("") – calculates the mean in the partition.

Other
- collect_list("") – collects a list of elements of the chosen column in the partition, in the order they reside, and returns the value as an array.
- collect_set("") – collects a set of elements of the chosen column in the partition, without ordering, and returns the value as an array.

Not supported
- countDistinct("")

Remarks

Some functions, such as the aggregations, behave differently with an orderBy(…) clause and differently without it:
- with orderBy, e.g. Window.partitionBy(…).orderBy(…) – the value is calculated in a running manner (for the first row, value = min(1st record); for the second, value = min(1st & 2nd records); for the third, value = min(1st, 2nd & 3rd records); and so on);
- without orderBy, e.g. Window.partitionBy(…) – the value is calculated across the whole partition, so each record gets the same value, e.g. the average of the column across the partition.

Unbounded Examples

Unbounded window functions are those that do not define a boundary on the window specification, that is, they have no rowsBetween() or rangeBetween() statement on the window object. Unbounded window functions execute the function across all records in each partition; in other words, they do not limit the records over which the aggregation/rank function is executed within each partition. They are useful for answering questions like:

Row Bounded Examples

Row-bounded window functions are those that define a boundary on the window specification using a rowsBetween(…) statement on the window object. This lets us limit the records on which we execute the function in each partition. The limit is enforced by specifying an offset as a number of records; it does not use the values of the records. It can be dynamic relative to the current row, and we can bound the window on one side (lower or upper) or on both sides, relative to the current row.
They are useful for answering questions like running totals, or when we want to make calculations on records relative to the current row.

Range Bounded Examples

Range-bounded functions define a boundary on the window specification using a rangeBetween(…) statement on the window object. This lets us limit the records on which we execute the function in each partition. The limit is enforced by specifying an offset based on the values of a column, so it is dynamic: each window is calculated based on the values that determine its size. These functions are useful for answering questions where we need to define the size of the window based on the values of the column that we apply our function to.

Summary

Window functions in Databricks can cover a very wide range of use cases. They provide a nice alternative to loops, so if you need to perform looping operations on your data, ask yourself whether it would be easier to solve the problem with window functions. Be aware that window functions may cause a large amount of data shuffling, so use them with care.

References

You can find more information in the links below:
- Data Frames – explains what DataFrames are and how to use them.
- Introduction to window functions – another good blog about window functions (Spark and SQL API).
- Download Databricks Notebook – you can download all the examples used in this blog and play with them without needing to mount external storage. The data is generated by a script.

PySpark DataFrame Transformation Chaining

After working with Databricks and PySpark for a while now, it's clear there needs to be as much best practice defined upfront as possible when coding notebooks. While you can get away with quite a bit when writing SQL – which is all too familiar to most of us by now – the transition into other languages (from a BI background) requires a bit more thought in terms of coding standards. In this short blog, I will talk about multiple DataFrame transformations and what I believe to be the best way to structure your code for them. More often than not, if you have not come from a programming background or are unfamiliar with a language, you will end up writing spaghetti code: essentially code that reads a bit like a book, with no real structure to it, causing debugging/maintenance issues. We've all been there, especially at the start of a project, just wanting to get some output – but it's worth trying to bear in mind some coding rules while designing, to save refactoring a few weeks down the line. Jumping into the subject matter of the blog, this came about because I noticed that some of our project code, when it came to transformations, was made up of re-assigning to a new DataFrame on each transformation. While functionally acceptable, it didn't feel like the right way to go about things. The same can be said for re-using the same DataFrame (although this caused other dependency issues). It also left us with huge chunks of code that were only readable by using the comments to see where some transformations stopped and others started. After a small bit of research I discovered the concept of monkey patching (modifying or extending a program at runtime) the DataFrame object to include a transform function. This function is missing from PySpark but already exists in the Scala API. The following code can be used to achieve this, and can be stored in a generic wrapper-functions notebook to separate it out from your main code.
This can then be called to import the functions whenever you need them.

# Monkey patch the DataFrame object with a transform method
from pyspark.sql.dataframe import DataFrame

def transform(self, f):
    return f(self)

DataFrame.transform = transform

As part of your main code, we can then wrap the transformations into functions, passing in and returning a DataFrame. A separate statement can then be called specifying transform on the original DataFrame and the list of functions (transformations) you want to pass in. By using this method, the code is almost self-documenting, as it's clear which transformations you've applied to move a DataFrame from one context into another. The example below only includes 2 transformations, but imagine you have 20+ to implement – this makes the code much easier to read and debug should there be an issue.

from pyspark.sql.functions import to_utc_timestamp, col, when

# Create started/finished timestamps and int dates in UTC as new columns in the DataFrame.
# ConvertDateTimeToIntDate and refHandStructure are defined elsewhere in the notebook.
def append_utc_dates(df):
    df = df.withColumn("finishedTimestampUTC", to_utc_timestamp(col("finished"), "EST")) \
           .withColumn("startedTimestampUTC", to_utc_timestamp(col("started"), "EST")) \
           .withColumn("startedDateUTC", ConvertDateTimeToIntDate(col("startedTimestampUTC"))) \
           .withColumn("finishedDateUTC", ConvertDateTimeToIntDate(col("finishedTimestampUTC")))
    return df

# Adds a new attribute based on hand structure to the DataFrame
def append_BB(df):
    df = df.join(refHandStructure, df["structure"] == refHandStructure["structure"], "left") \
           .select(df["*"], refHandStructure["handStructureName"]) \
           .withColumn("BB", when(col("HSName") == "Limit", col("x")).otherwise(col("y")))
    df = df.drop("handStructureName")
    return df

# Perform hand transformations
dfTransformed = dfRaw.transform(append_utc_dates) \
                     .transform(append_BB)

display(dfTransformed)

You should then use new DataFrames for changes in grain or changes in purpose (not just for each transformation).
This technique probably shouldn't be used for the more basic transformations, especially if you only have 1 or 2, but it comes into its own when you have 50-100+ lines of code. Not everything needs to be wrapped into functions, but it certainly reads better for larger notebooks!

Implementing Enterprise Security in Azure Databricks - Part 2

Following on from my last blog on the topic of security within Azure Databricks, which concentrated on the implementation model for data processing for the platform, the following blog concentrates on the alternative – data processing for users.

Data Processing for Users

By this, I mean data-related activities that a user is performing interactively, for instance data analysis from Data Lake. With the arrival of Databricks runtime 5.1, released in December 2018, comes the ability to use Azure AD credential pass-through. This is a huge step forward, since there is no longer a need to control user permissions through Databricks groups / Bash and then assign those groups access to secrets to reach Data Lake at runtime. As mentioned previously, with the current lack of AAD support within Databricks, ACL activities were done on an individual basis, which was not ideal. By using this feature, you can now pass the authentication through to Data Lake, and as we know, one of the advantages of Data Lake is its tight integration with Active Directory, so this simplifies things. It's worth noting that this feature is currently in public preview, but having tested it thoroughly, I am happy with the implementation and its limitations. The feature also requires a premium workspace and only works with high-concurrency clusters – both of which you'd expect to use in this scenario. The other good thing is that it's incredibly easy to enable this functionality, as it is controlled by the cluster configuration process. To enable it, navigate to the cluster configuration page, select runtime 5.1 or higher, and expand the advanced options. At this point, you will see a tick box which needs to be checked (see below). This will add another line of code into your Spark config. It's actually good to see it was implemented in this way – it helps to stick to the Microsoft mantra of keeping things simple.
Once enabled, only connections into Data Lake via adl:// are valid – any existing connections through dbfs, or through Databricks databases mounted to route via dbfs for unmanaged tables, will stop working. This is a current limitation and may be fixed at GA (although technically you could re-build the tables using the adl:// path if this were an issue). Great – so now my ACL can be controlled entirely within Data Lake without the need for service principals! But there's more. I touched on this in my previous blog, but as part of the access implementation for Data Lake, it is preferable to define Azure Active Directory (AAD) groups to provide further flexibility moving forward. By this I mean creating AAD groups and assigning them to Data Lake, so as to create a level of abstraction away from users/user AD groups/service principals, so that modifications will not need to be made to Data Lake permissions in the future – only to the AAD groups that are already assigned. From experience, by not going down this route, any additional user permissions that need applying in future have to be applied across all folders/files, which, depending on the size of the data lake, can take a particularly long time or be awkward to add. Therefore this needs to be done upfront as part of the design! I would suggest the following conventions for this group setup, an example being AAD_PLATFORM_ADL_ENV_RAW_READER:
- AAD – to separate out AD/AAD once sync is implemented.
- PLATFORM – the platform or project/department this group is used by.
- ADL – the resource on the platform that the group is used by.
- ENV – the environment on which the resource resides (prod/non-prod).
- RAW – the layer within the lake the permissions will be applied to.
- READER – the permission the group will have.

You would then need to create the following AAD groups to cover all areas across the Data Lake. This assumes our standard Data Lake layer/folder pattern, first introduced by Ust's blog back in 2016.
- AAD_PLATFORM_ADL_PROD_RAW_READER
- AAD_PLATFORM_ADL_PROD_RAW_WRITER
- AAD_PLATFORM_ADL_PROD_BASE_READER
- AAD_PLATFORM_ADL_PROD_BASE_WRITER
- AAD_PLATFORM_ADL_PROD_ENRICHED_READER
- AAD_PLATFORM_ADL_PROD_ENRICHED_WRITER
- AAD_PLATFORM_ADL_PROD_CURATED_READER
- AAD_PLATFORM_ADL_PROD_CURATED_WRITER
- AAD_PLATFORM_ADL_PROD_LABORATORY
- AAD_PLATFORM_ADL_PROD_LIBRARY_READER
- AAD_PLATFORM_ADL_PROD_ADMIN

The permissions applied to these groups can then be implemented using the following matrix. When adding these permissions, they need to be added to the current folder and all children, and added as both an access permission entry and a default permission entry. Without this, any changes to Data Lake in the future will not inherit these permissions. Once this is complete, you would add the users/AD user groups/service principals into these AAD groups to provide access, and the pass-through permissions work as expected. By using this method, it separates out both read/write and the Data Lake layers, meaning that unless specifically granted, there will be a much more limited footprint in terms of access permissions into Data Lake. Using a combination of this and the AD credential pass-through from Databricks provides a suitable solution for implementing security using the Databricks/Data Lake combo.
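The group list above follows mechanically from the naming convention, so it can be generated rather than typed by hand. A small Python sketch ("PLATFORM" is a placeholder for your real platform/project name):

```python
# Generate the AAD group names for the Data Lake layers following the
# AAD_PLATFORM_ADL_ENV_LAYER_PERMISSION convention described above.
LAYERS = ["RAW", "BASE", "ENRICHED", "CURATED"]

def adl_group_names(platform, env):
    # Reader/writer pairs for each Data Lake layer.
    groups = [f"AAD_{platform}_ADL_{env}_{layer}_{perm}"
              for layer in LAYERS
              for perm in ("READER", "WRITER")]
    # Laboratory, library and admin groups sit outside the reader/writer pairs.
    groups += [f"AAD_{platform}_ADL_{env}_LABORATORY",
               f"AAD_{platform}_ADL_{env}_LIBRARY_READER",
               f"AAD_{platform}_ADL_{env}_ADMIN"]
    return groups

for group in adl_group_names("PLATFORM", "PROD"):
    print(group)
```

Generating the names this way keeps the convention consistent if you later add environments (e.g. non-prod) or additional layers.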

Implementing Enterprise Security in Azure Databricks - Part 1

In recent weeks, I've spent a fair chunk of time working with different aspects of Databricks, and as part of this, one topic that consumed a proportion of that time is the security and authentication aspects of the service. Our scenario was one that I expect most people will come across over the next few years: essentially the integration of Databricks with Azure Data Lake for data processing. In the following blogs (yup, I need 2) I intend to document the way we went about implementing the security model, breaking it up into data processing for a platform and data processing for users – each has a slightly different implementation.

Data Processing for Platform

By this, I mean data processing activities that happen as part of a service, orchestrated through something such as Azure Data Factory, that happen on a schedule or in real time. I expect the very first thing most people will do once they have a Databricks workspace is to mount their Data Lake. It's by far the easiest option for getting your hands on your data, and one that's still worth using to start off with, as long as you understand the consequences. If you need to do this, I will point you in the direction of Hugh's blog post covering the topic, which I've used a number of times now to remember all the different secrets/GUIDs you need. By mounting the Data Lake in your Databricks workspace, you are able to interact with the data as if it were in DBFS (the Databricks local file system). This proves incredibly powerful for tools such as dbutils, which allow you to perform a vast number of file system operations. Unfortunately, the one downside to this method is that anyone with access to the workspace is able to directly interact with all layers in your Data Lake – not ideal. Since Data Lakes are not treated in the same way as something like an Azure SQL DB, you also won't have those handy recovery options if something happens to your data because of this unrestricted access.
This obviously poses a risk for an enterprise solution. It's worth noting that the mount is done once, so your clusters can go down and be spun up again by other users, and they would still be able to access the mount point until it is un-mounted. With that in mind, it is far better to implement access to Data Lake directly at runtime using the Spark API. The config for this is very similar to the mounting, except the Spark config is set at runtime, and therefore if this is not included you will be unable to access the Data Lake. This avoids the issues mentioned above where anyone can access the data just because they can access the workspace. The following commands can be used to set this up at runtime.

# Get secrets from Databricks
ClientId = dbutils.secrets.get(scope = "ScopeX", key = "ClientId")
ClientSecret = dbutils.secrets.get(scope = "ScopeX", key = "ClientSecret")
DirectoryId = dbutils.secrets.get(scope = "ScopeX", key = "DirectoryId")

# Apply variables to spark config
spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("", ClientId)
spark.conf.set("dfs.adls.oauth2.credential", ClientSecret)
spark.conf.set("dfs.adls.oauth2.refresh.url", "" + DirectoryId + "/oauth2/token")

You'll notice that as part of this I'm retrieving the secrets/GUIDs I need for the connection from somewhere else – namely the Databricks-backed secrets store. This avoids exposing those secrets in plain text in your notebook – again, this would not be ideal. The secret access is then based on an ACL (access control list), so I can only connect to Data Lake if I'm granted access to the secrets. While it is also possible to connect Databricks up to Azure Key Vault and use that as the secrets store instead, when I tried to configure this I was denied based on permissions, and after research I was unable to overcome the issue.
Key Vault would be the more ideal option to use, but unfortunately support is currently limited, and the fact that the error message contained spelling mistakes suggests to me the functionality is not yet mature. To configure the Databricks-backed secrets, the easiest method is to use an Azure Bash console and go in via the Databricks CLI. To access the console, within the Azure portal you'll notice an icon similar to the below as part of the top ribbon. Clicking this will then prompt you to start either a PowerShell or Bash console – which will look similar to below. The connection into the Databricks CLI can be set up as per the following commands.

virtualenv -p /usr/bin/python2.7 databrickscli
source databrickscli/bin/activate
pip install databricks-cli
databricks configure --token

At this point, you'll need to provide it with both your Databricks host and a token. The token will need to be generated through your Databricks workspace – under User Settings / Access Tokens. This essentially lets you into the API without the need for a password. The important thing to mention at this point is that the token is a personal access token. While this doesn't impact anything in particular with Azure Bash, it's worth noting that the token is created under your Databricks account, and therefore using this token for something such as a linked service into ADF will use your account to process the connection, authenticating as you. Hopefully over time this will mature, and the use of Managed Identities or Service Principals directly connected into the platform will be possible. As you might have guessed, then, you will need to make sure the account that generates the token is also used within the ACL to read the secret, otherwise at processing time ADF will not be able to read through Databricks into the Lake. The configuration of the secrets is then required and can be done using the following commands.
Simply replace [client id], [secret value] and [directory id] with the necessary values from your service principal.

databricks secrets create-scope --scope ScopeX
databricks secrets put --scope ScopeX --key ClientId --string-value [client id]
databricks secrets put --scope ScopeX --key ClientSecret --string-value [secret value]
databricks secrets put --scope ScopeX --key DirectoryId --string-value [directory id]
databricks secrets put-acl --scope ScopeX --principal admins --permission READ

I then granted access to read these secrets to the admins group – this just keeps things simple, but you can obviously grant it to individual users as well, or to other Databricks groups. One of my hopes for 2019 is that the platform is integrated better into AAD moving forwards, as everyone currently needs to be named individually. You would then need to manage permissions through the groups, which can only be done via the CLI. Once this is complete, you can now interact with your Data Lake and authenticate at runtime. It's also worth mentioning that this interaction changes the connection string from dbfs:// to adl://, which may have a knock-on effect if you use certain tools such as dbutils to do things within your code. This is also important to know since, with Databricks runtime 5.1 and AD credential pass-through, you will be unable to access anything other than Data Lake file systems. I'll explain this further in my next blog.

Conclusion

Hopefully this will prove useful to anyone venturing onto the platform and provide a basis on which to implement a security model. If you wanted to go a step further, you may also want to implement service principal access to Data Lake on a number of levels – both across folders AND read/write. This would add slightly more complexity to the solution but provide an even more secure method, avoiding the scenario where accounts can access the entire Data Lake.
In my next blog, I will look at it from a user perspective, which takes on a slightly different implementation.

Lambda vs Azure Databricks Delta Architecture

Historically, when implementing big data processing architectures, Lambda has been the desired approach; however, as technology evolves, new paradigms arise, and with that, more efficient approaches become available, such as the Databricks Delta architecture. In this blog, I'll describe both architectures and demonstrate how to build a data pipeline in Azure Databricks following the Databricks Delta architecture. The Lambda architecture, originally defined by Nathan Marz, is a big data processing architecture that combines both batch and real-time processing methods. This approach attempts to balance latency, throughput and fault tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. From a high-level perspective, the Lambda architecture is as follows. A valid approach to using Lambda in Azure could be as demonstrated below (please be aware that there are different options for each layer that I won't be detailing in this blog). For the speed layer we could use Azure Stream Analytics, a serverless, scalable event-processing engine that enables the development and running of real-time analytics on multiple streams of data from sources such as devices, sensors, web sites, social media and other applications. For the batch layer, we could use Azure Data Lake Storage (ADLS) and Azure Databricks. ADLS is an enterprise-wide hyper-scale repository for big data analytic workloads that enables us to capture data of any size, type and ingestion speed in one single place for operational and exploratory analytics.
Currently there are two versions of ADLS, Gen 1 and Gen 2, with the latter still in private preview. Azure Databricks is an Apache Spark-based analytics platform optimized for the Microsoft Azure cloud services platform that allows us to create streamlined workflows and interactive workspaces enabling collaboration between data scientists, data engineers and business analysts. For the serving layer, we could use Azure SQL Data Warehouse, a cloud-based Enterprise Data Warehouse (EDW) that leverages Massively Parallel Processing (MPP) to quickly run complex queries across petabytes of data. With this architecture, the events are consumed by Azure Stream Analytics and landed in ADLS as flat files that can be partitioned by hour. Once the processing of the file is complete, we can create a batch process via Azure Databricks and store the data in Azure SQL Data Warehouse. To obtain the data that was not captured by the batch process, we can use Polybase to query the file being updated and then create a view to union both tables. Every time that view is queried, the Polybase table will get the latest streamed data, meaning we have a real-time query with the capability to obtain the most recent data. The major problem with the Lambda architecture is that we have to build two separate pipelines, which can be very complex, and it is ultimately difficult to combine the processing of batch and real-time data; however, it is now possible to overcome this limitation if we are able to change our approach. Databricks Delta delivers a powerful transactional storage layer by harnessing the power of Apache Spark and the Databricks File System (DBFS). It is a single data management tool that combines the scale of a data lake, the reliability and performance of a data warehouse, and the low latency of streaming in a single system.
The core abstraction of Databricks Delta is an optimized Spark table that stores data as parquet files in DBFS and maintains a transaction log that tracks changes to the table. From a high-level perspective, the Databricks Delta architecture can be described as follows. An Azure Databricks Delta Raw table stores the data that is either produced by streaming sources or stored in data lakes. Query tables contain the normalized data from the Raw tables. Summary tables, often used as the source for the presentation layer, contain the aggregated key business metrics that are frequently queried. This unified approach means there is less complexity, due to the removal of storage systems and data management steps, and, more importantly, output queries can be performed on streaming and historical data at the same time. In the next steps, I'll demonstrate how to implement the Databricks Delta architecture using a Python notebook.

# If Databricks Delta is not enabled in the cluster, run this cell
spark.sql("set")

# Define variables
basePath = "/kafka"
taxiRidesRawPath = basePath + "/"
taxiRidesQueryPath = basePath + "/"
taxiFaresQueryPath = basePath + "/"
taxiSummaryPath = basePath + "/"
checkpointPath = basePath + "/checkpoints"

# Load the Kafka stream data to a DataFrame
kafkaDF = (spark
  .readStream
  .option("kafka.bootstrap.servers", "")
  .option("subscribe", "taxirides")
  .option("startingOffsets", "earliest")
  .option("checkpointLocation", "/taxinyc/kafka.checkpoint")
  .format("kafka")
  .load()
)

# Kafka transmits information using a key, value, and metadata such as topic and partition.
# The information we're interested in is the value column. Since this is a binary value,
# we must first cast it to a StringType and then split the columns.

# Stream into the Raw Databricks Delta directory. By using a checkpoint location, the
# metadata on which data has already been processed will be maintained, so the cluster
# can be shut down without a loss of information.
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, StringType, FloatType, IntegerType
from pyspark.sql.functions import col, split

(kafkaDF
 .select(split(col("value").cast(StringType()), ",").alias("message"))
 .writeStream
 .format("delta")
 .option("checkpointLocation", checkpointPath + "/taxiRidesRaw")
 .outputMode("append")
 .start(taxiRidesRawPath)
)

# Create and populate the raw delta table. Data is stored in a single column as an array,
# e.g. ["6","START","2013-01-01 00:00:00","1970-01-01 00:00:00","-73.866135","40.771091","-73.961334","40.764912","6","2013000006","2013000006"]
spark.sql("DROP TABLE IF EXISTS TaxiRidesRaw")
spark.sql("""
  CREATE TABLE TaxiRidesRaw
  USING Delta
  LOCATION '{}'
""".format(taxiRidesRawPath))

# Stream into the Query Databricks Delta directory
(spark.readStream
 .format("delta")
 .load(str(taxiRidesRawPath))
 .select(col("message")[0].cast(IntegerType()).alias("rideId"),
         col("message")[1].cast(StringType()).alias("rideStatus"),
         col("message")[2].cast(TimestampType()).alias("rideEndTime"),
         col("message")[3].cast(TimestampType()).alias("rideStartTime"),
         col("message")[4].cast(FloatType()).alias("startLong"),
         col("message")[5].cast(FloatType()).alias("startLat"),
         col("message")[6].cast(FloatType()).alias("endLong"),
         col("message")[7].cast(FloatType()).alias("endLat"),
         col("message")[8].cast(IntegerType()).alias("passengerCount"),
         col("message")[9].cast(IntegerType()).alias("taxiId"),
         col("message")[10].cast(IntegerType()).alias("driverId"))
 .filter("rideStartTime <> '1970-01-01T00:00:00.000+0000'")
 .writeStream
 .format("delta")
 .outputMode("append")
 .option("checkpointLocation", checkpointPath + "/taxiRidesQuery")
 .start(taxiRidesQueryPath)
)

# Create and populate the query delta table.
# Data is no longer in a single column
spark.sql("DROP TABLE IF EXISTS TaxiRidesQuery")
spark.sql("""
  CREATE TABLE TaxiRidesQuery
  USING Delta
  LOCATION '{}'
""".format(taxiRidesQueryPath))

# Load the data to a DataFrame. The parquet files are stored in a blob storage
taxiFaresDF = (spark.read
               .parquet("/mnt/geospatial/kafka/NYC")
               .write
               .format("delta")
               .mode("append")
               .save(taxiFaresQueryPath)
              )

# Create and populate the query delta table
spark.sql("DROP TABLE IF EXISTS TaxiFaresQuery")
spark.sql("""
  CREATE TABLE TaxiFaresQuery
  USING Delta
  LOCATION '{}'
""".format(taxiFaresQueryPath))

# Load the data to a DataFrame
taxiRidesDF = (spark
               .readStream
               .format("delta")
               .load(str(taxiRidesQueryPath))
              )

# Load the data to a DataFrame
taxiFaresDF = (spark
               .read
               .format("delta")
               .load(str(taxiFaresQueryPath))
              )

# Join the streaming data and the batch data.
# Group by date and taxi driver to obtain the number of rides per day
from pyspark.sql.functions import date_format, col, sum

RidesDf = (taxiRidesDF.join(taxiFaresDF, (taxiRidesDF.taxiId == taxiFaresDF.taxiId) & (taxiRidesDF.driverId == taxiFaresDF.driverId))
           .withColumn("date", date_format(taxiRidesDF.rideStartTime, "yyyyMMdd"))
           .groupBy(col("date"), taxiRidesDF.driverId)
           .count()
           .withColumnRenamed("count", "RidesPerDay")
           .writeStream
           .format("delta")
           .outputMode("complete")
           .option("checkpointLocation", checkpointPath + "taxiSummary")
           .start(taxiSummaryPath)
)

# Create and populate the summary delta table
spark.sql("DROP TABLE IF EXISTS TaxiSummary")
spark.sql("""
  CREATE TABLE TaxiSummary
  USING Delta
  LOCATION '{}'
""".format(taxiSummaryPath))

As always, if you have any questions or comments, do let me know.

Geospatial analysis in Azure Databricks – Part II

After my last post on running geospatial analysis in Azure Databricks with Magellan (here) I decided to investigate which other libraries were available and discover whether they performed better or worse. The first library I investigated was GeoMesa, an Apache-licensed open source suite of tools that enables large-scale geospatial analytics on cloud and distributed computing systems, letting you manage and analyse huge spatio-temporal datasets. GeoMesa does this by providing spatio-temporal data persistence on top of the Accumulo, HBase and Cassandra distributed column-oriented databases for massive storage of point, line and polygon data. It allows rapid access to this data via queries that take full advantage of geographical properties to specify distance and area. GeoMesa also provides support for near real-time stream processing of spatio-temporal data by layering spatial semantics on top of the Apache Kafka messaging system (further details here). Although their website is rich in documentation, I immediately stumbled at the most basic operation: reading a GeoJSON file with the geomesa format. This is because their tutorials assume that Apache Accumulo, a distributed key/value store, is used as the backing data store. Because I wanted to make sure I could ingest data from either Azure Blob Storage or Azure Data Lake Storage, I decided not to follow their recommendation. As such, after many hours of failed attempts, I abandoned the idea of using GeoMesa. My next option was GeoSpark, a cluster computing system for processing large-scale spatial data. GeoSpark extends Apache Spark / SparkSQL with a set of out-of-the-box Spatial Resilient Distributed Datasets (SRDDs) / SpatialSQL that efficiently load, process and analyse large-scale spatial data across machines (further details here).
GeoSpark immediately impresses with the possibility of either creating Spatial RDDs and running spatial queries using GeoSpark-core, or creating Spatial SQL/DataFrames to manage spatial data using GeoSparkSQL. Their website contains tutorials that are easy to follow and offers the possibility to chat with the community on gitter. In the spirit of keeping my approach as simple as possible, I decided to compare Magellan with GeoSparkSQL, since SparkSQL is easier to use and working with RDDs can be a complex task. It is important to highlight, however, that their recommendation is to use GeoSpark core rather than GeoSparkSQL, because SparkSQL has some limitations, such as not supporting clustered indices, which makes it difficult to expose all GeoSpark core features through it. The data used in the following test cases was based on the NYC Taxicab datasets to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON to extract the polygons. Both datasets were stored in a blob storage and added to Azure Databricks as a mount point. The table below details the versions of the libraries and the cluster configurations. There are a couple of points to notice:

- Magellan does not support Apache Spark 2.3+.
- The Magellan library 1.0.6 is about to be released this month and should cover some of the limitations identified below.
- The GeoSpark library 1.2.0 is currently available in SNAPSHOT and will hopefully fix the load of multiline GeoJSON files.

Library | Version | Runtime Version | Cluster Specification
Magellan | 1.0.5 | 3.5 LTS (includes Apache Spark 2.2.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled
GeoSpark/GeoSparkSQL | 1.1.3 / 2.3-1.1.3 | 4.3 (includes Apache Spark 2.3.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled

To test the performance of both libraries, I implemented a set of queries and ran them 3 times, registering how long each run took.
The best results are highlighted in green.

DS1 - NYC Neighbourhoods dataset containing the polygons
DS2 - NYC Taxicab dataset containing the geometry points for the month of January 2015
DS3 - NYC Taxicab dataset containing the geometry points for the year of 2015

Test | Description | Number of Results | Magellan (avg in sec) | GeoSparkSQL (avg in sec)
1 | Select all rows from DS1 | 310 | 0.86 | 0.69
2 | Select all rows from DS2 | 12.748.986 | 19.82 | 15.64
3 | Select all rows from DS1 where borough is Manhattan | 37 | 2.22 | 0.69
4 | Select all rows from DS2 where total amount is bigger than 20 | 2.111.707 | 18.71 | 17.23
5 | Select 100 rows from DS1 ordered by the distance between one point and all polygons | 100 | N/A* | 0.8
6 | Select all rows from DS1 where a single point is within all polygons | 1 | 1.63 | 0.68
7 | Select all rows from DS1 where one point with buffer 0.1 intersects all polygons | 73 | N/A* | 0.80
8 | Join DS1 and DS2 and select all rows where polygons contains points | 12.492.678 | 29.17 | 1573.8 (~26min)
9 | Join DS1 and DS2 and select all rows where points are within polygons | 12.492.678 | 29.31 | 1518 (~25min)
10 | Select all rows from DS3 | 146.113.001 | 187.8 | 155.4
11 | Select all rows from DS3 where total amount is bigger than 20 | 29.333.130 | 94.8 | 119.4
12** | Join DS1 and DS3 and select all rows where points are within polygons | 143.664.028 | 168 | N/A

* Although the following link mentions Magellan can perform Distance and Buffer operations, I couldn't find documentation demonstrating how to perform them, or, in the cases I tried, Azure Databricks threw an error indicating the class was not available.
** Considering the time it took to run queries 8/9 using DS2 (~1.8GB), I decided not to test the performance against DS3 (~21.3GB), since I already knew the results were not going to be positive.

From the tests above, we can see that GeoSparkSQL is generally better, except when performing joins with spatial ranges, where its performance drastically decreases when compared with Magellan.
On the other hand, Magellan is still an ongoing project and seems to be lacking some of the basic operations that might be of big importance for some analyses; however, it clearly excels when we need to run spatial analysis on joined datasets. Based on my experience using the libraries and the tests conducted in this blog, my recommendation would be to use Magellan, since even when GeoSparkSQL was better, the performance gains were not that significant. However, as already mentioned, Magellan might not be an option if the requirements involve operations that are not yet available, such as distances or buffers.

Following is the implementation of the tests using GeoSparkSQL.

//Import Libraries and config session
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
import org.datasyslab.geosparksql.utils.Adapter
import org.datasyslab.geosparksql.UDF.UdfRegistrator
import org.datasyslab.geosparksql.UDT.UdtRegistrator
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.geosparksql.strategy.join.JoinQueryDetector
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Initiate Spark Session
var sparkSession = SparkSession.builder()
                    .appName("NYCTaxis")
                    // Enable GeoSpark custom Kryo serializer
                    .config("spark.serializer", classOf[KryoSerializer].getName)
                    .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
                    .getOrCreate()

//Register GeoSparkSQL
GeoSparkSQLRegistrator.registerAll(sparkSession)

//Define schema for the NYC taxi data
val schema = StructType(Array(
    StructField("vendorId", StringType, false),
    StructField("pickup_datetime", StringType, false),
    StructField("dropoff_datetime", StringType, false),
    StructField("passenger_count", IntegerType, false),
    StructField("trip_distance", DoubleType, false),
    StructField("pickup_longitude", DoubleType, false),
    StructField("pickup_latitude", DoubleType, false),
    StructField("rateCodeId", StringType, false),
    StructField("store_fwd", StringType, false),
    StructField("dropoff_longitude", DoubleType, false),
    StructField("dropoff_latitude", DoubleType, false),
    StructField("payment_type", StringType, false),
    StructField("fare_amount", StringType, false),
    StructField("extra", StringType, false),
    StructField("mta_tax", StringType, false),
    StructField("tip_amount", StringType, false),
    StructField("tolls_amount", StringType, false),
    StructField("improvement_surcharge", StringType, false),
    StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset.
var trips = sparkSession.read
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .schema(schema)
            .load("/mnt/geospatial/nyctaxis/*")

trips.createOrReplaceTempView("tripstable")

//Read GeoJSON file. GeoSparkSQL can't read multiline GeoJSON files directly, so the workaround
//below will only work if the file only contains one geometry type (eg. polygons)
var polygonJsonDF = sparkSession.read
                    .option("multiline", "true")
                    .json("/mnt/geospatial/neighborhoods/neighborhoods.geojson")

val polygons = polygonJsonDF
                .select(explode(col("features")).as("feature"))
                .withColumn("polygon", callUDF("ST_GeomFromGeoJson", to_json(col("feature"))))
                .select($"polygon", $"", $"", $"")

polygons.createOrReplaceTempView("polygontable")

//Test 1
var polygonAll = sparkSession.sql("""
  SELECT *
  FROM polygontable
""")
polygonAll.count()

//Test 2
var tripsAll = sparkSession.sql("""
  SELECT *
  FROM tripstable
""")
tripsAll.count()

//Test 3
var polygonWhere = sparkSession.sql("""
  SELECT *
  FROM polygontable
  WHERE borough = 'Manhattan'
""")
polygonWhere.count()

//Test 4
var tripsWhere = sparkSession.sql("""
  SELECT *
  FROM tripstable
  WHERE total_amount > 20
""")
tripsWhere.count()

//Test 5
var polygonGeomDistance = sparkSession.sql("""
  SELECT *
  FROM polygontable
  ORDER BY ST_Distance(polygon, ST_PointFromText('-74.00672149658203, 40.73177719116211', ','))
  LIMIT 100
""")
polygonGeomDistance.count()

//Test 6
var polygonGeomWithin = sparkSession.sql("""
  SELECT *
  FROM polygontable
  WHERE ST_Within(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','), polygon)
""")

//Test 7
var polygonGeomInterset = sparkSession.sql("""
  SELECT *
  FROM polygontable
  WHERE ST_Intersects(ST_Circle(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','), 0.1), polygon)
""")
polygonGeomInterset.count()

//Test 8
var polygonContainsJoin = sparkSession.sql("""
  SELECT *
  FROM polygontable, tripstable
  WHERE ST_Contains(polygontable.polygon, ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))))
""")
polygonContainsJoin.count()

//Test 9
var polygonWithinJoin = sparkSession.sql("""
  SELECT *
  FROM polygontable, tripstable
  WHERE ST_Within(ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))), polygontable.polygon)
""")
polygonWithinJoin.count()

Following is the implementation of the tests using Magellan.

//Import Libraries
import magellan._
import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Define schema for the NYC taxi data
val schema = StructType(Array(
    StructField("vendorId", StringType, false),
    StructField("pickup_datetime", StringType, false),
    StructField("dropoff_datetime", StringType, false),
    StructField("passenger_count", IntegerType, false),
    StructField("trip_distance", DoubleType, false),
    StructField("pickup_longitude", DoubleType, false),
    StructField("pickup_latitude", DoubleType, false),
    StructField("rateCodeId", StringType, false),
    StructField("store_fwd", StringType, false),
    StructField("dropoff_longitude", DoubleType, false),
    StructField("dropoff_latitude", DoubleType, false),
    StructField("payment_type", StringType, false),
    StructField("fare_amount", StringType, false),
    StructField("extra", StringType, false),
    StructField("mta_tax", StringType, false),
    StructField("tip_amount", StringType, false),
    StructField("tolls_amount", StringType, false),
    StructField("improvement_surcharge", StringType, false),
    StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset and create a Magellan point
val trips = spark.read
      .format("com.databricks.spark.csv")
      .option("mode", "DROPMALFORMED")
      .schema(schema)
      .load("/mnt/geospatial/nyctaxis/*")
      .withColumn("point", point($"pickup_longitude", $"pickup_latitude"))

//Read GeoJSON file and define index precision
val neighborhoods = spark.read
      .format("magellan")
      .option("type", "geojson")
      .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
      .select($"polygon",
              $"metadata"("borough").as("borough"),
              $"metadata"("boroughCode").as("boroughCode"),
              $"metadata"("neighborhood").as("neighborhood"))
      .index(30)

//Test 1
magellan.Utils.injectRules(spark)
neighborhoods.count()

//Test 2
trips.count()

//Test 3
neighborhoods.filter("borough == 'Manhattan'").count()

//Test 4
trips.filter("total_amount > 20").count()

//Test 6
val points = sc.parallelize(Seq((-74.00672149658203, 40.73177719116211))).toDF("x", "y").select(point($"x", $"y").as("point"))
val polygons = neighborhoods.join(points)
polygons.filter($"point" within $"polygon").count()

//Test 8
trips.join(neighborhoods)
     .where($"polygon" >? $"point")
     .count()

//Test 9
trips.join(neighborhoods)
     .where($"point" within $"polygon")
     .count()
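Both sets of tests ultimately hinge on the same primitive: deciding whether a point falls inside a polygon. As a rough illustration of what the within predicate computes (neither Magellan nor GeoSpark implements it this naively; both use spatial indexing to avoid testing every edge), here is a minimal pure-Python ray-casting sketch:

```python
def point_in_polygon(x, y, polygon):
    """Ray-casting test: count how many polygon edges a horizontal ray
    from (x, y) crosses; an odd count means the point lies inside.

    polygon is a list of (x, y) vertices in order.
    """
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does this edge straddle the ray's y coordinate?
        if (y1 > y) != (y2 > y):
            # x coordinate where the edge crosses the ray
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

# Unit square: one point inside, one outside
square = [(0, 0), (1, 0), (1, 1), (0, 1)]
print(point_in_polygon(0.5, 0.5, square))  # True
print(point_in_polygon(1.5, 0.5, square))  # False
```

This linear scan over edges is exactly why the joined queries are so expensive without an index: the cost grows with points × polygons × edges.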

Geospatial analysis with Azure Databricks

A few months ago, I wrote a blog demonstrating how to extract and analyse geospatial data in Azure Data Lake Analytics (ADLA) (here). The article aimed to prove that it was possible to run spatial analysis using U-SQL, even though it does not natively support spatial data analytics. The outcome of that experiment was positive, however, with serious limitations in terms of performance. ADLA dynamically provisions resources and can perform analytics on terabytes to petabytes of data; however, because it must use the SQL Server Data Types and Spatial assemblies to perform spatial analysis, all the parallelism capabilities are suddenly limited. For example, if you are running an aggregation, ADLA will split the processing between multiple vertices, making it faster; however, when running intersections between points and polygons, because it is a single-threaded SQL operation, it will only use one vertex, and consequently, the job might take hours to complete. Since I last wrote my blog, the data analytics landscape has changed, and with that, new options became available, namely Azure Databricks. In this blog, I'll demonstrate how to run spatial analysis and export the results to a mounted point using the Magellan library and Azure Databricks. Magellan is a distributed execution engine for geospatial analytics on big data. It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries (further details here). Although the GitHub page mentions that the 1.0.5 Magellan library is available for Apache Spark 2.3+ clusters, I learned through a very difficult process that the only way to make it work in Azure Databricks is with an Apache Spark 2.2.1 cluster with Scala 2.11. The cluster I used for this experiment consisted of a Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled.
In terms of datasets, I used the NYC Taxicab dataset to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON dataset to extract the polygons. Both datasets were stored in a blob storage and added to Azure Databricks as a mount point. As always, first we need to import the libraries.

//Import Libraries
import magellan._
import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

Next, we define the schema of the NYC Taxicab dataset and load the data to a DataFrame. While loading the data, we convert the pickup longitude and latitude into a Magellan Point. If there was a need, in the same operation we could also add another Magellan Point from the drop off longitude and latitude.

//Define schema for the NYC taxi data
val schema = StructType(Array(
    StructField("vendorId", StringType, false),
    StructField("pickup_datetime", StringType, false),
    StructField("dropoff_datetime", StringType, false),
    StructField("passenger_count", IntegerType, false),
    StructField("trip_distance", DoubleType, false),
    StructField("pickup_longitude", DoubleType, false),
    StructField("pickup_latitude", DoubleType, false),
    StructField("rateCodeId", StringType, false),
    StructField("store_fwd", StringType, false),
    StructField("dropoff_longitude", DoubleType, false),
    StructField("dropoff_latitude", DoubleType, false),
    StructField("payment_type", StringType, false),
    StructField("fare_amount", StringType, false),
    StructField("extra", StringType, false),
    StructField("mta_tax", StringType, false),
    StructField("tip_amount", StringType, false),
    StructField("tolls_amount", StringType, false),
    StructField("improvement_surcharge", StringType, false),
    StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset and create a Magellan point
val trips = spark.read
      .format("com.databricks.spark.csv")
      .option("mode", "DROPMALFORMED")
      .schema(schema)
      .load("/mnt/geospatial/nyctaxis/*")
      .withColumn("point", point($"pickup_longitude", $"pickup_latitude"))

The next step is to load the neighbourhood data. As mentioned in their documentation, Magellan supports the reading of ESRI, GeoJSON, OSM-XML and WKT formats. From the GeoJSON dataset, Magellan will extract a collection of polygons and read the metadata into a Map. There are three things to notice in the code below: first, the extraction of the polygon; second, the selection of the key corresponding to the neighbourhood name; and finally, the provision of a hint that defines what the index precision should be. This operation, along with the injection of a spatial join rule into Catalyst, massively increases the performance of the queries. To have a better understanding of this operation, read this excellent blog.

//Read GeoJSON file and define index precision
val neighborhoods = spark.read
      .format("magellan")
      .option("type", "geojson")
      .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
      .select($"polygon",
              $"metadata"("neighborhood").as("neighborhood"))
      .index(30)

Now that we have our two datasets loaded, we can run our geospatial query to identify in which neighbourhood the pickup points fall. To achieve our goal, we need to join the two DataFrames and apply a within predicate. As a curiosity, if we consider that m represents the number of points (12.748.987), n the number of polygons (310) and p the average number of edges per polygon (104), then, with O(mnp) complexity, we will roughly perform 400 billion calculations on a single node to determine where each point falls.

//Inject rules and join DataFrames with within predicate
magellan.Utils.injectRules(spark)

val intersected = trips.join(neighborhoods)
        .where($"point" within $"polygon")

The above code does not take longer than 1 second to execute.
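The m × n × p estimate can be sanity-checked with a quick back-of-envelope calculation, using the figures quoted above:

```python
# Figures quoted in the text
m = 12_748_987  # number of pickup points
n = 310         # number of neighbourhood polygons
p = 104         # average number of edges per polygon

# O(mnp): in the worst case every point is tested against every edge of every polygon
operations = m * n * p
print(f"{operations:,}")  # 411,027,340,880 -> roughly 400 billion edge tests
```

Spatial indexing and the Catalyst join rule exist precisely to avoid doing anywhere near that many comparisons.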
It is only when we want to obtain details about our DataFrame that the computing time becomes visible. For example, if we want to know which neighbourhood has the most pickups, we can write the following code, which takes on average 40 seconds.

//Neighbourhoods that received the most pickups
display(intersected
       .groupBy('neighborhood)
       .count()
       .orderBy($"count".desc))

If we want to save the data and identify which pickups fall inside the NYC neighbourhoods, then we have to rewrite our intersected DataFrame to select all columns except the Magellan Points and Polygons, add a new column to the DataFrame and export the data back to the blob, as shown below.

//Select pickup points that don't fall inside a neighbourhood
val nonIntersected = trips
                      .select($"vendorId", $"pickup_datetime", $"dropoff_datetime", $"passenger_count", $"trip_distance", $"pickup_longitude", $"pickup_latitude", $"rateCodeId", $"store_fwd", $"dropoff_longitude", $"dropoff_latitude", $"payment_type", $"fare_amount", $"extra", $"mta_tax", $"tip_amount", $"tolls_amount", $"improvement_surcharge", $"total_amount")
                      .except(intersected)

//Add new column intersected_flag
val intersectedFlag = "1"
val nonIntersectedFlag = "0"
val tripsIntersected = intersected.withColumn("intersected_flag", expr(intersectedFlag))
val tripsNotIntersected = nonIntersected.withColumn("intersected_flag", expr(nonIntersectedFlag))

//Union DataFrames
val allTrips = tripsNotIntersected.union(tripsIntersected)

//Save data to the blob
allTrips.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/mnt/geospatial/trips/trips.csv")

In summary, reading a 1.8GB dataset, applying geospatial analysis and exporting it back to the blob storage only took on average 1 min, which is miles better when compared with my previous attempt with U-SQL. As always, if you have any comments or questions, do let me know.

Spark Streaming in Azure Databricks

Real-time stream processing is becoming more prevalent on modern day data platforms, and with a myriad of processing technologies out there, where do you begin? Stream processing involves the consumption of messages from either queues/files, doing some processing in the middle (querying, filtering, aggregation) and then forwarding the result to a sink – all with a minimal latency. This is in direct contrast to batch processing, which usually occurs on an hourly or daily basis. Often, both of these will need to be combined to create a new data set. In terms of options for real-time stream processing on Azure you have the following:

- Azure Stream Analytics
- Spark Streaming / Storm on HDInsight
- Spark Streaming on Databricks
- Azure Functions

Stream Analytics is a simple PaaS offering. It connects easily into other Azure resources such as Event Hubs, IoT Hub, and Blob, and outputs to a range of resources that you'd expect. It has its own intuitive query language, with the added benefit of letting you create functions in JavaScript. Scaling can be achieved by partitions, and it has the windowing and late arrival event support that you'd expect from a processing option. For most jobs, this service will be the quickest/easiest to implement, as long as its relatively small set of limitations falls outside the bounds of what you want to achieve. It's also worth noting that the service does not currently support Azure network security such as Virtual Networks or IP Filtering; I suspect this may only be a matter of time, given the preview of this support in Event Hubs. Both Spark Streaming on HDInsight and Databricks open up the options for configurability and are possibly more suited to an enterprise level data platform, allowing us to use languages such as Scala/Python or even Java for the processing in the middle. The use of these options also allows us to integrate Kafka (an open source alternative to Event Hubs) as well as HDFS and Data Lake as inputs.
Scalability is determined by the cluster sizes, and the support for the other events mentioned above is also included. These options also give us flexibility for the future, allowing us to adapt depending on evolving technologies. They also come with the benefit of Azure network security support, so we can peer our clusters onto a virtual network. Lastly – I wouldn't personally use this, but we can also use Functions to achieve the same goal through C#/Node.js. This route however does not include support for those temporal/windowing/late arrival events, since Functions are serverless and act on a per-execution basis. In the following blog, I'll be looking at Spark Streaming on Databricks (which is fast becoming my favourite research topic). A good place to start is to understand the structured streaming model, which I've seen documented a few times now. Essentially, it treats the stream as an unbounded table, with new records from the stream being appended as new rows to the table. This allows us to treat both batch and streaming data as tables in a DataFrame, therefore allowing similar queries to be run across them. At this point, it will be useful to include some code to help explain the process. Before beginning, it's worth mounting your data sink to your Databricks instance so you can reference it as if it were inside the DBFS (Databricks File System) – this is merely a pointer. For more info on this, refer to the Databricks documentation here. Only create a mount point if you want all users in the workspace to have access. If you wish to apply security, you will need to access the store directly (also documented in the same place) and then apply permissions to the notebook accordingly. As the input for my stream was EventHubs, we can start by defining the reading stream. You'll first need to add the Maven coordinate for the EventHubs library to the cluster to allow the connection.
Further options can be added for the consumer group, starting positions (for partitioning), timeouts and events per trigger. Positions can also be used to define starting and ending points in time so that the stream is not running continuously.

import json
from datetime import datetime

connectionString = "Endpoint=sb://{EVENTHUBNAMESPACE}.servicebus.windows.net/{EVENTHUBNAME};EntityPath={EVENTHUBNAME};SharedAccessKeyName={ACCESSKEYNAME};SharedAccessKey={ACCESSKEY}"

startingEventPosition = {
  "offset": "-1",        # start of stream
  "seqNo": -1,           # not in use
  "enqueuedTime": None,  # not in use
  "isInclusive": True
}

endingEventPosition = {
  "offset": None,  # not in use
  "seqNo": -1,     # not in use
  "enqueuedTime": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),  # point in time
  "isInclusive": True
}

ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString
ehConf['eventhubs.startingPosition'] = json.dumps(startingEventPosition)
ehConf['eventhubs.endingPosition'] = json.dumps(endingEventPosition)

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

The streaming data that is then output follows the following schema – the body followed by a series of metadata about the streaming message. It's important to note that the body comes out as a binary stream (this contains our message). We will need to cast the body to a String to deserialise the column into the JSON that we are expecting. This can be done by using some Spark SQL to turn the binary into a string as JSON and then parsing the column into a StructType with a specified schema. If multiple records are coming through in the same message, you will need to explode out the result into separate records. Flattening out the nested columns is also useful, as long as the data frame is still manageable. Spark SQL provides some great functions here to make our life easy.

rawData = df \
  .selectExpr("cast(body as string) as json") \
  .select(from_json("json", Schema).alias("data")) \
  .select("data.*")

While it's entirely possible to construct your schema manually, it's also worth noting that you can take a sample JSON, read it into a data frame using spark.read.json and then call printSchema() on top of it to return the inferred schema. This can then be used to create the StructType.

# Inferred schema:
# root
#  |-- LineTotal: string (nullable = true)
#  |-- OrderQty: string (nullable = true)
#  |-- ProductID: string (nullable = true)
#  |-- SalesOrderDetailID: string (nullable = true)
#  |-- SalesOrderID: string (nullable = true)
#  |-- UnitPrice: string (nullable = true)
#  |-- UnitPriceDiscount: string (nullable = true)

Schema = StructType([
  StructField('SalesOrderID', StringType(), False),
  StructField('SalesOrderDetailID', StringType(), False),
  StructField('OrderQty', StringType(), False),
  StructField('ProductID', StringType(), False),
  StructField('UnitPrice', StringType(), False),
  StructField('UnitPriceDiscount', StringType(), False),
  StructField('LineTotal', StringType(), False)
])

At this point, you have the data streaming into your data frame. To output to the console you can use display(rawData) to see the data visually. However, this is only useful for debugging, since the data is not actually going anywhere! To write the stream into somewhere such as a data lake you would then use the following code. The checkpoint location can be used to recover from failures when the stream is interrupted, and this is important if this code were to make it to a production environment. Should a cluster fail, the query can be restarted on a new cluster from a specific point and consistently recover, thus enabling exactly-once guarantees. This also means we can change the query as long as the input source and output schema are the same, without directly interrupting the stream. Lastly, the trigger will check for new rows in the stream every 10 seconds.
rawData.writeStream \
  .format("json") \
  .outputMode("append") \
  .option("path", PATH) \
  .trigger(processingTime = "10 seconds") \
  .option("checkpointLocation", PATH) \
  .start()

Checking our data lake, you can now see the data has made its way over, broken up by the time intervals specified. Hopefully this is useful for anyone getting going in this topic area. I'd advise sticking to Python given the extra capability of the PySpark language over Scala, even though a lot of the Databricks documentation / tutorials use Scala; this was just something that felt more comfortable. If you intend to do much in this area I would definitely suggest you use the PySpark SQL documentation, which can be found here. This is pretty much a bible for all commands and I've been referencing it quite a bit. If this is not enough, there is also a cheat sheet available here. Again, very useful for reference when the language is still not ingrained.
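The schema-inference tip above can be illustrated without a running cluster. The sketch below is a deliberately simplified, pure-Python approximation of what spark.read.json's inference does for a single flat record (the field names mirror the sales-order sample used in this post); real inference also merges types across records and handles nested structures:

```python
import json

def infer_spark_fields(sample_json: str) -> dict:
    """Map each top-level JSON field to a rough Spark SQL type name.

    A simplification: Spark infers JSON integers as LongType, floats as
    DoubleType, booleans as BooleanType and strings as StringType.
    """
    type_map = {bool: "BooleanType", int: "LongType",
                float: "DoubleType", str: "StringType"}
    record = json.loads(sample_json)
    return {name: type_map.get(type(value), "StringType")
            for name, value in record.items()}

sample = '{"SalesOrderID": "71774", "OrderQty": "1", "LineTotal": "2024.99"}'
print(infer_spark_fields(sample))
# {'SalesOrderID': 'StringType', 'OrderQty': 'StringType', 'LineTotal': 'StringType'}
```

Because the sample values are all quoted strings, every field comes back as StringType, which is exactly why the inferred schema above shows string for numeric-looking columns like LineTotal.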

Databricks – Cluster Sizing

Introduction
Setting up clusters in Databricks presents you with a raft of different options. Which cluster mode should I use? What driver type should I select? How many worker nodes should I be using? In this blog I will try to answer those questions and give a little insight into how to set up a cluster which exactly meets your needs, allowing you to save money and produce low running times. To do this I will first describe and explain the different options available, then we shall go through some experiments, before finally drawing some conclusions to give you a deeper understanding of how to effectively set up your cluster.

Cluster Types
Databricks has two different types of clusters: Interactive and Job. You can see these when you navigate to the Clusters homepage; all clusters are grouped under either Interactive or Job. When to use each one depends on your specific scenario. Interactive clusters are used to analyse data with notebooks, thus giving you much more visibility and control. These should be used in the development phase of a project. Job clusters are used to run automated workloads using the UI or API. Jobs can be used to schedule notebooks; they are recommended for production use in most projects, with a new cluster created for each run of each job. For the experiments we will go through in this blog we will use existing predefined interactive clusters so that we can fairly assess the performance of each configuration as opposed to start-up time.

Cluster Modes
When creating a cluster, you will notice that there are two cluster modes. Standard is the default and can be used with Python, R, Scala and SQL. The other cluster mode option is High Concurrency. High concurrency provides resource utilisation, isolation for each notebook (by creating a new environment for each one), security, and sharing by multiple concurrently active users. Sharing is accomplished by pre-empting tasks to enforce fair sharing between different users.
Pre-emption can be altered in a variety of ways. To enable it, you must be running Spark 2.2 or above and add the relevant lines to the cluster's Spark Config. It should be noted that High Concurrency does not support Scala.

Enabled – self-explanatory; required to enable pre-emption.
Threshold – the fair share fraction guaranteed. 1.0 will aggressively attempt to guarantee perfect sharing; 0.0 disables pre-emption. 0.5 is the default, meaning at worst a user will get half of their fair share.
Timeout – the amount of time a user is starved before pre-emption starts. A lower value gives more interactive response times, at the expense of cluster efficiency. Recommended to be between 1-100 seconds.
Interval – how often the scheduler will check for pre-emption. This should be less than the timeout above.

Driver Node and Worker Nodes

Clusters have a single driver node and multiple worker nodes. The driver and worker nodes can have different instance types, but by default they are the same. The driver node runs the main function and executes various parallel operations on the worker nodes, while the worker nodes read from and write to the data sources.

When creating a cluster, you can either specify an exact number of workers or specify a minimum and maximum range and allow the number of workers to scale automatically. When autoscaling is enabled the total number of workers will sit between the min and max: if the cluster has pending tasks it scales up, and once there are no pending tasks it scales back down again. This all happens whilst a load is running.

Pricing

If you're going to be playing around with clusters, then it's important you understand how the pricing works. Databricks uses something called a Databricks Unit (DBU), which is a unit of processing capability per hour.
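As an illustration, the four pre-emption settings above map to Spark Config entries along the following lines. The exact key names and values here are my assumption based on the Databricks pre-emption options, so check the documentation for your runtime before relying on them:

```
spark.databricks.preemption.enabled true
spark.databricks.preemption.threshold 0.5
spark.databricks.preemption.timeout 30s
spark.databricks.preemption.interval 5s
```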
Pricing is based upon different tiers; more information can be found here. You will be charged for your driver node and each worker node per hour. You can find out much more about pricing Databricks clusters in my colleague's blog, which can be found here.

Experiment

For the experiments I wanted to use a medium and a large dataset to make it a fair test. I started with the People10M dataset, intending it to be the larger dataset. I created some basic ETL to put it through its paces, so we could effectively compare different configurations. The ETL does the following: read in the data, pivot on the decade of birth, convert the salary to GBP and calculate the average, grouped by gender. The People10M dataset wasn't large enough for my liking; the ETL still ran in under 15 seconds. Therefore, I created a for loop to union the dataset to itself 4 times, taking us from 10 million rows to 160 million rows. The code used can be found below:

# Import relevant functions.
import datetime
from pyspark.sql.functions import year, floor

# Read in the People10m table.
people = spark.sql("select * from clusters.people10m ORDER BY ssn")

# Explode the dataset from 10 million to 160 million rows.
for i in xrange(0, 4):
    people = people.union(people)

# Get decade from birthDate and convert salary to GBP.
people = people.withColumn('decade', floor(year("birthDate") / 10) * 10) \
    .withColumn('salaryGBP', floor(people.salary.cast("float") * 0.753321205))

# Pivot on the decade of birth and sum the converted salary, grouped by gender.
people.groupBy("gender").pivot("decade").sum("salaryGBP").show()

To push it through its paces further and to test parallelism, I used threading to run the above ETL 5 times, which brought the running time to over 5 minutes – perfect!
The following code was used to carry out the orchestration:

from multiprocessing.pool import ThreadPool

pool = ThreadPool(10)
    lambda path:
            "/Users/ Sizing/PeopleETL160M",
            timeout_seconds=1200),
    ["", "", "", "", ""])

To be able to test the different options available, I created 5 different cluster configurations. For each of them the Databricks runtime version was 4.3 (includes Apache Spark 2.3.1, Scala 2.11) and Python v2.

Default – the default cluster configuration at the time of writing: a worker type of Standard_DS3_v2 (14 GB memory, 4 cores), a driver node the same as the workers, and autoscaling enabled with a range of 2 to 8. Total available is 112 GB memory and 32 cores.
Auto scale (large range) – identical to the default but with an autoscaling range of 2 to 14, so the total available is 182 GB memory and 56 cores. I included this to try to understand just how effective the autoscaling is.
Static (few powerful workers) – a worker type of Standard_DS5_v2 (56 GB memory, 16 cores), a driver node the same as the workers, and just 2 worker nodes. Total available is 112 GB memory and 32 cores.
Static (many workers) – the same as the default, except there is a static 8 workers. Total available is 112 GB memory and 32 cores, identical to the Static (few powerful workers) configuration above, which allows us to understand whether a few powerful workers or many weaker workers is more effective.
High Concurrency – a cluster mode of 'High Concurrency', unlike all the others which are 'Standard'. This results in a worker type of Standard_DS13_v2 (56 GB memory, 8 cores), a driver node the same as the workers, and autoscaling enabled with a range of 2 to 8. Total available is 448 GB memory and 64 cores. This cluster also has all of the Spark Config attributes specified earlier in the blog.
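For reference, the Default configuration above could be expressed as a Databricks Clusters API create payload roughly like the following. This is a sketch only: the cluster name and the exact spark_version string are my assumptions, included to illustrate the shape of the request rather than to reproduce the author's setup:

```json
{
  "cluster_name": "default-experiment",
  "spark_version": "4.3.x-scala2.11",
  "node_type_id": "Standard_DS3_v2",
  "driver_node_type_id": "Standard_DS3_v2",
  "autoscale": { "min_workers": 2, "max_workers": 8 }
}
```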
Here we are trying to understand when to use High Concurrency instead of Standard cluster mode.

The results can be seen below, measured in seconds, with a row for each configuration described above. I did three different runs and calculated the average and standard deviation; the rank is based upon the average. Run 1 was always done in the morning, Run 2 in the afternoon and Run 3 in the evening, to make the tests fair and reduce the effects of other clusters running at the same time.

Before we move on to the conclusions, I want to make one important point: different cluster configurations work better or worse depending on the dataset size, so don't discredit the smaller dataset. When you are working with smaller datasets, you can't simply apply what you know about the larger ones.

Comparing the default to the auto scale (large range) configuration shows that, with a large dataset, allowing for more worker nodes really does make a positive difference. With just 1 million rows the difference is negligible, but with 160 million rows it is on average 65% quicker.

Comparing the two static configurations, a few powerful worker nodes versus many less powerful worker nodes, yielded some interesting results. Remember, both have identical total memory and cores. With the small dataset, a few powerful worker nodes resulted in quicker times, the quickest of all configurations in fact. With the larger dataset the opposite is true: having more, less powerful workers is quicker. Whilst this is a fair observation to make, it should be noted that the static configurations have an advantage with these relatively short loading times, as autoscaling takes time.

The final observation concerns the High Concurrency configuration: it is the only configuration to perform quicker on the larger dataset than the smaller one. By quite a significant margin it is the slowest with the smaller dataset.
With the largest dataset it is the second quickest, only losing out, I suspect, to the autoscaling. High Concurrency isolates each notebook, thus enforcing true parallelism. Why the large dataset performs quicker than the smaller dataset requires further investigation and experiments, but it is certainly useful to know that with large datasets, where time of execution is important, High Concurrency can make a real positive impact.

To conclude, I'd like to point out that the default configuration is almost the slowest for both dataset sizes. It is therefore worth spending time contemplating which cluster configuration suits your solution, because choosing the correct one will make runtimes significantly quicker.

Getting Started with Databricks Cluster Pricing

The use of Databricks for data engineering or data analytics workloads is becoming more prevalent as the platform grows, and it has made its way into most of our recent modern data architecture proposals, whether that be PaaS warehouses or data science platforms. To run any type of workload on the platform, you will need to set up a cluster to do the processing for you. While the Azure-based platform has made this relatively simple for development purposes (give it a name, select a runtime, select the type of VMs you want and away you go), for production workloads a bit more thought needs to go into the configuration and cost. In the following blog I'll start by looking at the pricing in a bit more detail, which will add a cost element to the cluster configuration process. For argument's sake, the work that we tend to deliver with Databricks is based on data engineering usage – spinning up resource for an allocated period to perform a task – so this is generally the focus of the following topic.

To get started in this area, some definitions would be useful:

DBU – a Databricks Unit (a unit of processing capability per hour, billed on per-minute usage).
Data Engineering Workload – a job that both starts and terminates the cluster on which it runs (via the job scheduler).
Data Analytics Workload – a non-automated workload, for example running a command manually within a Databricks notebook. Multiple users can share the cluster to perform interactive analysis.
Cluster – made up of instances of processing (VMs) and consists of a driver and workers. Workers can either be provisioned upfront or autoscaled between a min and max number of workers.
Tier – either Standard or Premium. Premium includes role-based access control, ODBC endpoint authentication, audit logs and Databricks Delta (a unified data management system).
The billing for clusters depends primarily on the type of workload you initiate and the tier (or functionality) you require. As you might have guessed, data engineering workloads on the Standard tier offer the best price. I've taken the DS3 v2 instance (VM) pricing from the Azure Databricks pricing page. The pricing can be broken down as follows:

Each instance is charged at £0.262/hour. So, for example, the cost of a very simple cluster of 1 driver and 2 workers is £0.262/hour x 3 = £0.786/hour. The VM cost does not depend on the workload type/tier.
The DBU cost is then calculated at £0.196/hour. So the cost of 3 nodes (as above) is £0.196/hour x 3 = £0.588/hour. This cost does change depending on workload type/tier.
The total cost is then £0.786/hour (VM cost) + £0.588/hour (DBU cost) = £1.374/hour, also known as the pay-as-you-go price. Discounts are then applied for reserved processing power.

I thought this was worth simplifying, since the pricing page doesn't make it abundantly clear with the way the table is laid out, and it is often overlooked. Given the vast number of options available when setting up clusters, it's worth understanding this element to balance cost against time. The DBU count is merely a reference for comparing the processing power of different VMs and is not directly included in the calculations. It's also worth mentioning that by default Databricks services are set up as Premium and can be downgraded to Standard only by contacting support. In some cases this can bring massive cost savings depending on the type of work you are doing on the platform, so please take this into account before spinning up clusters and don't just go with the default. With regards to configuration, clusters can be set up either in High Concurrency mode (previously known as Serverless) or as Standard.
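To make the arithmetic above concrete, here is a minimal sketch of the pay-as-you-go cost model in Python. The rates are the DS3 v2 figures quoted above; the function name and signature are my own, not part of any Databricks API:

```python
def cluster_cost_per_hour(nodes, vm_rate=0.262, dbu_rate=0.196):
    """Pay-as-you-go cost in GBP/hour for a cluster of `nodes`
    instances (driver + workers), using the DS3 v2 rates quoted
    above: each node incurs both a VM charge and a DBU charge."""
    return nodes * (vm_rate + dbu_rate)

# 1 driver + 2 workers = 3 nodes:
print(round(cluster_cost_per_hour(3), 3))  # 1.374 GBP/hour
```

The same function reproduces the monthly figure later in the post: a driver plus 8 workers (9 nodes) for a 1-hour daily window is roughly cluster_cost_per_hour(9) x 30 hours.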
High Concurrency mode is optimised for concurrent workloads and is therefore more applicable to data analytics workloads and interactive notebooks used by multiple users simultaneously. This piece of configuration does not affect the pricing. Using the cost model above, a basic batch ETL run with a driver and 8 worker nodes on relatively small DS3 instances would cost around £123.60/month, given a standard 1-hour daily ETL window. Hopefully this provides a very simple introduction to the pricing model used by Databricks.

Databricks UDF Performance Comparisons

I've recently been spending quite a bit of time on the Azure Databricks platform, and while learning decided it was worth using it to experiment with some common data warehousing tasks in the form of data cleansing. As Databricks provides us with a platform to run a Spark environment, it offers cross-platform APIs that allow us to write code in Scala, Python, R and SQL within the same notebook. As with most things in life, not everything is equal, and there are potential differences in performance between them. In this blog I will explain the tests I produced, with the aim of outlining best practice for Databricks implementations for UDFs of this nature.

Scala is the native language for Spark and, without going into too much detail here, it will compile down faster to the JVM for processing. Python, on the other hand, provides a wrapper around the code: under the hood it is really a Scala program telling the cluster what to do. Converting these objects into a form Python can read is called serialisation/deserialisation, and it's expensive, especially over time and across a distributed dataset. The most expensive scenario occurs with UDFs (functions), where the overhead lies in reading the data into, and writing it back out of, JVM memory. Using Scala to create the UDFs, the execution process can skip these steps and keep everything native: Scala UDFs operate within the JVM of the executor, so we can skip serialisation and deserialisation entirely.

Experiments

As part of my data for this task I took a list of company names from a data set and ran them through a process to codify them, essentially stripping out characters that cause them to be unique and converting them to upper case, thus grouping a set of companies together under the same name. For instance Adatis, Adatis Ltd, and Adatis (Ltd) would all become ADATIS.
This is an example of a typical cleansing activity when working with data sets. The dataset in question was around 2.5 GB and contained 10.5m rows. The cluster I used was Databricks runtime 4.2 (Spark 2.3.1 / Scala 2.11) with Standard_DS2_v2 VMs for the driver/worker nodes (14 GB memory), with autoscaling disabled and limited to 2 workers. I disabled autoscaling because I was seeing wildly inconsistent timings on each run, which impacted the tests. The good news is that with it enabled and using up to 8 workers, the timings were about 20% faster, albeit more erratic from a standard deviation point of view. The following approaches were tested:

Scala program calls Scala UDF via Function
Scala program calls Scala UDF via SQL
Python program calls Scala UDF via SQL
Python program calls Python UDF via Function
Python program calls Python Vectorised UDF via Function
Python program uses SQL

While it was true in previous versions of Spark that there was a difference between Scala and Python, in the latest version (2.3) it is believed to be more of a level playing field thanks to Apache Arrow, in the form of Vectorised Pandas UDFs within Python. As part of the tests I also wanted to use Python to call a Scala UDF via a function, but unfortunately we cannot do this without creating a Jar file of the Scala code and importing it separately. This would be done via SBT (a build tool) using the following guide here; I considered this too much of an overhead for the purposes of the experiment. The following code was then used as part of a Databricks notebook to define the tests. A custom function to time the write was required for Scala, whereas Python allows us to use %timeit for a similar purpose.
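The timing helper itself is not shown in the post; a rough Python equivalent of such a wall-clock timer (my own sketch, not the author's timeIt code) might look like this:

```python
import time

def time_it(action):
    """Run a zero-argument callable and return the elapsed wall-clock
    time in seconds - a rough Python analogue of the custom Scala
    timeIt helper used to time the table writes in the tests below."""
    start = time.time()
    action()
    return time.time() - start
```

For example, time_it(lambda: df.write.saveAsTable("T")) would return how long the write took, much as %timeit does for the Python cells.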
Scala program calls Scala UDF via Function

// Scala program calls Scala UDF via Function
%scala
def codifyScalaUdf = udf((string: String) => string.toUpperCase.replace(" ", "").replace("#","").replace(";","").replace("&","").replace(" AND ","").replace(" THE ","").replace("LTD","").replace("LIMITED","").replace("PLC","").replace(".","").replace(",","").replace("[","").replace("]","").replace("LLP","").replace("INC","").replace("CORP",""))
spark.udf.register("ScalaUdf", codifyScalaUdf)
val transformedScalaDf = table("DataTable").select(codifyScalaUdf($"CompanyName").alias("CompanyName"))
val ssfTime = timeIt(transformedScalaDf.write.mode("overwrite").format("parquet").saveAsTable("SSF"))

Scala program calls Scala UDF via SQL

// Scala program calls Scala UDF via SQL
%scala
val sss = spark.sql("SELECT ScalaUdf(CompanyName) as a from DataTable where CompanyName is not null")
val sssTime = timeIt(sss.write.mode("overwrite").format("parquet").saveAsTable("SSS"))

Python program calls Scala UDF via SQL

# Python program calls Scala UDF via SQL
pss = spark.sql("SELECT ScalaUdf(CompanyName) as a from DataTable where CompanyName is not null")
%timeit -r 1 pss.write.format("parquet").saveAsTable("PSS", mode='overwrite')

Python program calls Python UDF via Function

# Python program calls Python UDF via Function
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

@udf(StringType())
def pythonCodifyUDF(string):
    return (string.upper().replace(" ", "").replace("#","").replace(";","").replace("&","").replace(" AND ","").replace(" THE ","").replace("LTD","").replace("LIMITED","").replace("PLC","").replace(".","").replace(",","").replace("[","").replace("]","").replace("LLP","").replace("INC","").replace("CORP",""))

pyDF ="CompanyName")).alias("CompanyName")).filter(col("CompanyName").isNotNull())
%timeit -r 1 pyDF.write.format("parquet").saveAsTable("PPF", mode='overwrite')

Python program calls Python Vectorised UDF via Function

# Python program calls Python Vectorised UDF via Function
from pyspark.sql.types import StringType
from pyspark.sql.functions import pandas_udf, col

@pandas_udf(returnType=StringType())
def pythonCodifyVecUDF(string):
    return (string.replace(" ", "").replace("#","").replace(";","").replace("&","").replace(" AND ","").replace(" THE ","").replace("LTD","").replace("LIMITED","").replace("PLC","").replace(".","").replace(",","").replace("[","").replace("]","").replace("LLP","").replace("INC","").replace("CORP","")).str.upper()

pyVecDF ="CompanyName")).alias("CompanyName")).filter(col("CompanyName").isNotNull())
%timeit -r 1 pyVecDF.write.format("parquet").saveAsTable("PVF", mode='overwrite')

Python program uses SQL

# Python program uses SQL
sql = spark.sql("SELECT upper(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(CompanyName,' ',''),'&',''),';',''),'#',''),' AND ',''),' THE ',''),'LTD',''),'LIMITED',''),'PLC',''),'.',''),',',''),'[',''),']',''),'LLP',''),'INC',''),'CORP','')) as a from DataTable where CompanyName is not null")
%timeit -r 1 sql.write.format("parquet").saveAsTable("SQL", mode='overwrite')

Results and Observations

It was interesting to note the following:

The hypothesis above does indeed hold true: the two methods expected to be slowest were the slowest within the experiment, and by a considerable margin.
The Scala UDF performs consistently regardless of the method used to call it.
The Python vectorised UDF now performs on par with the Scala UDFs, and there is a clear difference between the vectorised and non-vectorised Python UDFs.
The standard deviation for the vectorised UDF was surprisingly low; the method performed consistently on each run. The non-vectorised Python UDF was the opposite.

To summarise: as long as you write your UDFs in Scala or use the vectorised version of the Python UDF, the performance will be similar for this type of activity.
It's worth noting that you should definitely avoid writing UDFs as standard Python functions, given the theory and results above. Over time, across a complete solution and with more data, this time would add up.

Embedding Databricks Notebooks into a BlogEngine.NET post

Databricks is a buzzword now, which means that each day more and more related content appears on the net. With Databricks notebooks it's very easy to share code. If by any chance you need to share a notebook directly in your blog post, here are some short guidelines on how to do so. If your favourite blog engine happens to be BlogEngine.NET, it's not such a straightforward task. Fear not – these are the steps you need to take:

1. Export your notebook to HTML from the Databricks portal.
2. Make the following text replacements in the exported HTML: replace & with &amp; and " with &quot;
3. Paste the following code into the blog's source, replacing both [[HEIGHT]] placeholders with the notebook's height in pixels (you may need a little trial and error to find the exact value at which the vertical scrollbars disappear) and the [[NOTEBOOK_HTML]] placeholder with the HTML resulting from the previous step:

<iframe height="[[HEIGHT]]px" frameborder="0" scrolling="no" width="100%" style="width: 100%; height: [[HEIGHT]]px;" sandbox="allow-forms allow-pointer-lock allow-popups allow-presentation allow-same-origin allow-scripts allow-top-navigation" srcdoc="[[NOTEBOOK_HTML]]"></iframe>

4. To make everything compatible with Internet Explorer and Edge, at the very end of your blog script, place the wonderful srcdoc-polyfill script between <script></script> tags.

Here is an example notebook with the above steps applied.
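The escaping in step 2 can be scripted rather than done by hand. A minimal sketch (the function name is mine), assuming you have the exported HTML as a string:

```python
def escape_for_srcdoc(html):
    """Escape an HTML string for use inside an iframe's srcdoc attribute.
    The ampersand must be replaced first; otherwise the & introduced by
    the &quot; entity would itself be escaped a second time."""
    return html.replace("&", "&amp;").replace('"', "&quot;")

print(escape_for_srcdoc('<b class="x">A & B</b>'))
# <b class=&quot;x&quot;>A &amp; B</b>
```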
HDD)","num_cores":8.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":85157,"instance_type_id":"Standard_D14_v2","node_type_id":"Standard_D14_v2","description":"Standard_D14_v2","support_cluster_tags":true,"container_memory_mb":106447,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_D14_v2","provider":"Azure","local_disk_size_gb":800,"supports_accelerated_networking":true,"compute_units":16.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":114688,"num_cores":16,"cpu_quota_type":"Standard Dv2 Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"}],"reserved_memory_mb":4800},"memory_mb":114688,"is_hidden":false,"category":"Memory Optimized (Remote HDD)","num_cores":16.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":107407,"instance_type_id":"Standard_D15_v2","node_type_id":"Standard_D15_v2","description":"Standard_D15_v2","support_cluster_tags":true,"container_memory_mb":134259,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_D15_v2","provider":"Azure","local_disk_size_gb":1000,"supports_accelerated_networking":true,"compute_units":20.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":143360,"num_cores":20,"cpu_quota_type":"Standard Dv2 Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"}],"reserved_memory_mb":4800},"memory_mb":143360,"is_hidden":false,"category":"Memory Optimized (Remote 
HDD)","num_cores":20.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":18409,"instance_type_id":"Standard_DS12_v2","node_type_id":"Standard_DS12_v2","description":"Standard_DS12_v2","support_cluster_tags":true,"container_memory_mb":23011,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":342},"node_instance_type":{"instance_type_id":"Standard_DS12_v2","provider":"Azure","local_disk_size_gb":56,"supports_accelerated_networking":true,"compute_units":4.0,"number_of_ips":4,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":28672,"num_cores":4,"cpu_quota_type":"Standard DSv2 Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":16,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":28672,"is_hidden":false,"category":"Memory Optimized","num_cores":4.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":40658,"instance_type_id":"Standard_DS13_v2","node_type_id":"Standard_DS13_v2","description":"Standard_DS13_v2","support_cluster_tags":true,"container_memory_mb":50823,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":342},"node_instance_type":{"instance_type_id":"Standard_DS13_v2","provider":"Azure","local_disk_size_gb":112,"supports_accelerated_networking":true,"compute_units":8.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":57344,"num_cores":8,"cpu_quota_type":"Standard DSv2 Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":32,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":57344,"is_hidden":false,"category":"Memory Optimized","num_cores":8.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":85157,"instance_type_id":"Standard_DS14_v2","node_type_id":"Standard_DS14_v2","description":"Standard_DS14_v2","support_cluster_tags":true,"container_memory_mb":106447,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":342},"node_instance_type":{"instance_type_id":"Standard_DS14_v2","provider":"Azure","local_disk_size_gb":224,"supports_accelerated_networking":true,"compute_units":16.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":114688,"num_cores":16,"cpu_quota_type":"Standard DSv2 Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":114688,"is_hidden":false,"category":"Memory 
Optimized","num_cores":16.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":107407,"instance_type_id":"Standard_DS15_v2","node_type_id":"Standard_DS15_v2","description":"Standard_DS15_v2","support_cluster_tags":true,"container_memory_mb":134259,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":342},"node_instance_type":{"instance_type_id":"Standard_DS15_v2","provider":"Azure","local_disk_size_gb":280,"supports_accelerated_networking":true,"compute_units":20.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":143360,"num_cores":20,"cpu_quota_type":"Standard DSv2 Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":143360,"is_hidden":false,"category":"Memory Optimized","num_cores":20.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":47015,"instance_type_id":"Standard_E8s_v3","node_type_id":"Standard_E8s_v3","description":"Standard_E8s_v3","support_cluster_tags":true,"container_memory_mb":58769,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_E8s_v3","provider":"Azure","local_disk_size_gb":128,"supports_accelerated_networking":true,"compute_units":8.0,"number_of_ips":4,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":65536,"num_cores":8,"cpu_quota_type":"Standard ESv3 Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":16,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":65536,"is_hidden":false,"category":"Memory Optimized","num_cores":8.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":97871,"instance_type_id":"Standard_E16s_v3","node_type_id":"Standard_E16s_v3","description":"Standard_E16s_v3","support_cluster_tags":true,"container_memory_mb":122339,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_E16s_v3","provider":"Azure","local_disk_size_gb":256,"supports_accelerated_networking":true,"compute_units":16.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":131072,"num_cores":16,"cpu_quota_type":"Standard ESv3 Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":32,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":131072,"is_hidden":false,"category":"Memory 
Optimized","num_cores":16.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":199583,"instance_type_id":"Standard_E32s_v3","node_type_id":"Standard_E32s_v3","description":"Standard_E32s_v3","support_cluster_tags":true,"container_memory_mb":249479,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_E32s_v3","provider":"Azure","local_disk_size_gb":512,"supports_accelerated_networking":true,"compute_units":32.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":262144,"num_cores":32,"cpu_quota_type":"Standard ESv3 Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":32,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":262144,"is_hidden":false,"category":"Memory Optimized","num_cores":32.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":21587,"instance_type_id":"Standard_L4s","node_type_id":"Standard_L4s","description":"Standard_L4s","support_cluster_tags":true,"container_memory_mb":26984,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_L4s","provider":"Azure","local_disk_size_gb":678,"supports_accelerated_networking":false,"compute_units":4.0,"number_of_ips":2,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":32768,"num_cores":4,"cpu_quota_type":"Standard LS Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":16,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":32768,"is_hidden":false,"category":"Storage Optimized","num_cores":4.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":47015,"instance_type_id":"Standard_L8s","node_type_id":"Standard_L8s","description":"Standard_L8s","support_cluster_tags":true,"container_memory_mb":58769,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_L8s","provider":"Azure","local_disk_size_gb":1388,"supports_accelerated_networking":false,"compute_units":8.0,"number_of_ips":4,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":65536,"num_cores":8,"cpu_quota_type":"Standard LS Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":32,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":65536,"is_hidden":false,"category":"Storage Optimized","num_cores":8.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":97871,"instance_type_id":"Standard_L16s","node_type_id":"Standard_L16s","description":"Standard_L16s","support_cluster_tags":true,"container_memory_mb":122339,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_L16s","provider":"Azure","local_disk_size_gb":2807,"supports_accelerated_networking":false,"compute_units":16.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":131072,"num_cores":16,"cpu_quota_type":"Standard LS Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":131072,"is_hidden":false,"category":"Storage Optimized","num_cores":16.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":199583,"instance_type_id":"Standard_L32s","node_type_id":"Standard_L32s","description":"Standard_L32s","support_cluster_tags":true,"container_memory_mb":249479,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_L32s","provider":"Azure","local_disk_size_gb":5630,"supports_accelerated_networking":false,"compute_units":32.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":262144,"num_cores":32,"cpu_quota_type":"Standard LS Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":262144,"is_hidden":false,"category":"Storage Optimized","num_cores":32.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":2516,"instance_type_id":"Standard_F4s","node_type_id":"Standard_F4s","description":"Standard_F4s","support_cluster_tags":true,"container_memory_mb":3146,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_F4s","provider":"Azure","local_disk_size_gb":16,"supports_accelerated_networking":true,"compute_units":4.0,"number_of_ips":4,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":8192,"num_cores":4,"cpu_quota_type":"Standard FS Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":16,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":8192,"is_hidden":false,"category":"Compute Optimized","num_cores":4.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":8873,"instance_type_id":"Standard_F8s","node_type_id":"Standard_F8s","description":"Standard_F8s","support_cluster_tags":true,"container_memory_mb":11092,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_F8s","provider":"Azure","local_disk_size_gb":32,"supports_accelerated_networking":true,"compute_units":8.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":16384,"num_cores":8,"cpu_quota_type":"Standard FS Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":32,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":16384,"is_hidden":false,"category":"Compute Optimized","num_cores":8.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":21587,"instance_type_id":"Standard_F16s","node_type_id":"Standard_F16s","description":"Standard_F16s","support_cluster_tags":true,"container_memory_mb":26984,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":350},"node_instance_type":{"instance_type_id":"Standard_F16s","provider":"Azure","local_disk_size_gb":64,"supports_accelerated_networking":true,"compute_units":16.0,"number_of_ips":16,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":32768,"num_cores":16,"cpu_quota_type":"Standard FS Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"},{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":32768,"is_hidden":false,"category":"Compute Optimized","num_cores":16.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":0,"spark_heap_memory":85157,"instance_type_id":"Standard_H16","node_type_id":"Standard_H16","description":"Standard_H16","support_cluster_tags":true,"container_memory_mb":106447,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":8},"node_instance_type":{"instance_type_id":"Standard_H16","provider":"Azure","local_disk_size_gb":2000,"supports_accelerated_networking":false,"compute_units":16.0,"number_of_ips":4,"local_disks":1,"reserved_compute_units":1.0,"gpus":0,"memory_mb":114688,"num_cores":16,"cpu_quota_type":"Standard H Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"}],"reserved_memory_mb":4800},"memory_mb":114688,"is_hidden":false,"category":"Compute Optimized","num_cores":16.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":2,"spark_heap_memory":85157,"instance_type_id":"Standard_NC12","node_type_id":"Standard_NC12","description":"Standard_NC12 (beta)","support_cluster_tags":true,"container_memory_mb":106447,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":48},"node_instance_type":{"instance_type_id":"Standard_NC12","provider":"Azure","local_disk_size_gb":680,"supports_accelerated_networking":false,"compute_units":12.0,"number_of_ips":2,"local_disks":1,"reserved_compute_units":1.0,"gpus":2,"memory_mb":114688,"num_cores":12,"cpu_quota_type":"Standard NC Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":48,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"}],"reserved_memory_mb":4800},"memory_mb":114688,"is_hidden":false,"category":"GPU Accelerated","num_cores":12.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":4,"spark_heap_memory":174155,"instance_type_id":"Standard_NC24","node_type_id":"Standard_NC24","description":"Standard_NC24 (beta)","support_cluster_tags":true,"container_memory_mb":217694,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":48},"node_instance_type":{"instance_type_id":"Standard_NC24","provider":"Azure","local_disk_size_gb":1440,"supports_accelerated_networking":false,"compute_units":24.0,"number_of_ips":4,"local_disks":1,"reserved_compute_units":1.0,"gpus":4,"memory_mb":229376,"num_cores":24,"cpu_quota_type":"Standard NC Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":64,"supported_disk_types":[{"azure_disk_volume_type":"STANDARD_LRS"}],"reserved_memory_mb":4800},"memory_mb":229376,"is_hidden":false,"category":"GPU Accelerated","num_cores":24.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":1,"spark_heap_memory":85157,"instance_type_id":"Standard_NC6s_v3","node_type_id":"Standard_NC6s_v3","description":"Standard_NC6s_v3 (beta)","support_cluster_tags":true,"container_memory_mb":106447,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":0},"node_instance_type":{"instance_type_id":"Standard_NC6s_v3","provider":"Azure","local_disk_size_gb":736,"supports_accelerated_networking":false,"compute_units":6.0,"number_of_ips":4,"local_disks":1,"reserved_compute_units":1.0,"gpus":1,"memory_mb":114688,"num_cores":6,"cpu_quota_type":"Standard NCSv3 Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":12,"supported_disk_types":[{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":114688,"is_hidden":false,"category":"GPU Accelerated","num_cores":6.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":2,"spark_heap_memory":174155,"instance_type_id":"Standard_NC12s_v3","node_type_id":"Standard_NC12s_v3","description":"Standard_NC12s_v3 (beta)","support_cluster_tags":true,"container_memory_mb":217694,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":0},"node_instance_type":{"instance_type_id":"Standard_NC12s_v3","provider":"Azure","local_disk_size_gb":1474,"supports_accelerated_networking":false,"compute_units":12.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":2,"memory_mb":229376,"num_cores":12,"cpu_quota_type":"Standard NCSv3 Family vCPUs","local_disk_type":"AHCI","max_attachable_disks":24,"supported_disk_types":[{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":229376,"is_hidden":false,"category":"GPU Accelerated","num_cores":12.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false},{"display_order":0,"support_ssh":true,"num_gpus":4,"spark_heap_memory":352151,"instance_type_id":"Standard_NC24s_v3","node_type_id":"Standard_NC24s_v3","description":"Standard_NC24s_v3 (beta)","support_cluster_tags":true,"container_memory_mb":440189,"node_info":{"status":["NotEnabledOnSubscription"],"available_core_quota":0},"node_instance_type":{"instance_type_id":"Standard_NC24s_v3","provider":"Azure","local_disk_size_gb":2948,"supports_accelerated_networking":false,"compute_units":24.0,"number_of_ips":8,"local_disks":1,"reserved_compute_units":1.0,"gpus":4,"memory_mb":458752,"num_cores":24,"cpu_quota_type":"Standard NCSv3 Family 
vCPUs","local_disk_type":"AHCI","max_attachable_disks":32,"supported_disk_types":[{"azure_disk_volume_type":"PREMIUM_LRS"}],"reserved_memory_mb":4800},"memory_mb":458752,"is_hidden":false,"category":"GPU Accelerated","num_cores":24.0,"is_io_cache_enabled":false,"support_port_forwarding":true,"support_ebs_volumes":true,"is_deprecated":false}],"default_node_type_id":"Standard_DS3_v2"},"enableDatabaseSupportClusterChoice":true,"enableClusterAcls":true,"notebookRevisionVisibilityHorizon":0,"serverlessClusterProductName":"Serverless Pool","showS3TableImportOption":false,"redirectBrowserOnWorkspaceSelection":true,"maxEbsVolumesPerInstance":10,"enableRStudioUI":false,"isAdmin":true,"deltaProcessingBatchSize":1000,"timerUpdateQueueLength":100,"sqlAclsEnabledMap":{"spark.databricks.acl.enabled":"true","spark.databricks.acl.sqlOnly":"true"},"enableLargeResultDownload":true,"maxElasticDiskCapacityGB":5000,"serverlessDefaultMinWorkers":2,"zoneInfos":[],"enableCustomSpotPricingUIByTier":true,"serverlessClustersEnabled":true,"enableWorkspaceBrowserSorting":true,"enableSentryLogging":false,"enableFindAndReplace":true,"disallowUrlImportExceptFromDocs":false,"defaultStandardClusterModel":{"cluster_name":"","node_type_id":"Standard_DS3_v2","spark_version":"4.0.x-scala2.11","num_workers":null,"autoscale":{"min_workers":2,"max_workers":8},"autotermination_minutes":120,"enable_elastic_disk":false,"default_tags":{"Vendor":"Databricks","Creator":"","ClusterName":null,"ClusterId":""}},"enableEBSVolumesUIForJobs":true,"enablePublishNotebooks":false,"enableBitbucketCloud":true,"shouldShowCommandStatus":true,"createTableInNotebookS3Link":{"url":"","displayName":"S3","workspaceFileName":"S3 
Example"},"sanitizeHtmlResult":true,"enableClusterPinningUI":true,"enableJobAclsConfig":false,"enableFullTextSearch":false,"enableElasticSparkUI":true,"enableNewClustersCreate":true,"clusters":true,"allowRunOnPendingClusters":true,"useAutoscalingByDefault":true,"enableAzureToolbar":true,"enableRequireClusterSettingsUI":true,"fileStoreBase":"FileStore","enableEmailInAzure":true,"enableRLibraries":true,"enableTableAclsConfig":false,"enableSshKeyUIInJobs":true,"enableDetachAndAttachSubMenu":true,"configurableSparkOptionsSpec":[{"keyPattern":"spark\\.kryo(\\.[^\\.]+)+","valuePattern":".*","keyPatternDisplay":"spark.kryo.*","valuePatternDisplay":"*","description":"Configuration options for Kryo serialization"},{"keyPattern":"spark\\.io\\.compression\\.codec","valuePattern":"(lzf|snappy|org\\.apache\\.spark\\.io\\.LZFCompressionCodec|org\\.apache\\.spark\\.io\\.SnappyCompressionCodec)","keyPatternDisplay":"","valuePatternDisplay":"snappy|lzf","description":"The codec used to compress internal data such as RDD partitions, broadcast variables and shuffle outputs."},{"keyPattern":"spark\\.serializer","valuePattern":"(org\\.apache\\.spark\\.serializer\\.JavaSerializer|org\\.apache\\.spark\\.serializer\\.KryoSerializer)","keyPatternDisplay":"spark.serializer","valuePatternDisplay":"org.apache.spark.serializer.JavaSerializer|org.apache.spark.serializer.KryoSerializer","description":"Class to use for serializing objects that will be sent over the network or need to be cached in serialized form."},{"keyPattern":"spark\\.rdd\\.compress","valuePattern":"(true|false)","keyPatternDisplay":"spark.rdd.compress","valuePatternDisplay":"true|false","description":"Whether to compress serialized RDD partitions (e.g. for StorageLevel.MEMORY_ONLY_SER). 
Not all notebook features are available within an iframe, but the main ones, such as syntax highlighting, are intact, and so is the ability to display table results (with column sort capability!). The proposed way of embedding notebook code in a blog post is just the first idea that came to my mind. Maybe there are more clever ways to achieve this. Please let me know in the comments.

Connecting Azure Databricks to Data Lake Store

Just a quick post here to help anyone who needs to integrate their Azure Databricks cluster with Data Lake Store. This is not hard to do, but there are a few steps, so it's worth recording them here in a quick and easy to follow form. This assumes you have already created your Databricks cluster and the Data Lake Store you want to integrate with. If you haven't created your cluster, that's described in a previous blog here which you may find useful.

The objective is to create a mount point: a folder in the lake accessible from Databricks so we can read from and write to ADLS. Here this is done in notebooks in Databricks using Python, but if Scala is your thing then it's just as easy. To create the mount point you need to run the following command:

```python
configs = {"dfs.adls.oauth2.access.token.provider.type": "ClientCredential",
           "dfs.adls.oauth2.client.id": "{YOUR SERVICE CLIENT ID}",
           "dfs.adls.oauth2.credential": "{YOUR SERVICE CREDENTIALS}",
           "dfs.adls.oauth2.refresh.url": "https://login.microsoftonline.com/{YOUR DIRECTORY ID}/oauth2/token"}

dbutils.fs.mount(
  source = "adl://{YOUR DATA LAKE STORE ACCOUNT NAME}.azuredatalakestore.net/{YOUR DIRECTORY NAME}",
  mount_point = "{mountPointPath}",
  extra_configs = configs)
```

So to do this we need to collect together the values to use for:

- {YOUR SERVICE CLIENT ID}
- {YOUR SERVICE CREDENTIALS}
- {YOUR DIRECTORY ID}
- {YOUR DATA LAKE STORE ACCOUNT NAME}
- {YOUR DIRECTORY NAME}
- {mountPointPath}

First the easy ones: my Data Lake Store is called "pythonregression" and I want the folder I am going to use to be '/mnt/python'; these are just my choices. I need the service client id and credentials, so I will create a new Application Registration by going to the Active Directory blade in the Azure portal and clicking "New Application Registration". Fill in your chosen App name; here I have used the name 'MyNewApp' (I know, not really original).
Then press 'Create' to create the App registration. This will only take a few seconds, and you should then see your App registration in the list of available apps. Click on the App you have created to see its details. Make a note of the ApplicationId GUID; this is the SERVICE CLIENT ID you will need. Then from this screen click the "Settings" button and then the "Keys" link. We are going to create a key specifically for this purpose: enter a Key Description, choose a Duration from the drop-down, and when you hit "Save" a key will be produced. Save this key; it's the value you need for YOUR SERVICE CREDENTIALS, and as soon as you leave the blade it will disappear.

We now have everything we need except the DIRECTORY ID. To get the DIRECTORY ID, go back to the Active Directory blade and click on "Properties"; the Directory ID is listed there. One last thing to do: you need to grant the "MyNewApp" App access to the Data Lake Store, otherwise you will get "access forbidden" messages when you try to access ADLS from Databricks. This can be done from Data Explorer in ADLS.

Now we have everything we need to mount the drive. In Databricks, launch a workspace and then create a new notebook (as described in my previous post). Run the command we put together above in the Python notebook. You can then create directories and files in the lake from within your Databricks notebook. If you want to, you can unmount the drive again with dbutils.fs.unmount("/mnt/python"). Something to note:
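With all of the values collected, the config dictionary and source URL from the mount command can be assembled in one place. A minimal sketch only: the helper name, and the GUIDs and key string below, are hypothetical placeholders, not real credentials.

```python
# Sketch: assemble the arguments for the mount command from the values collected above.
# All credential-like values here are hypothetical placeholders.

def build_adls_mount_args(client_id, credential, directory_id,
                          account_name, directory_name=""):
    """Build the (source, configs) pair used by dbutils.fs.mount for Data Lake Store."""
    configs = {
        "dfs.adls.oauth2.access.token.provider.type": "ClientCredential",
        "dfs.adls.oauth2.client.id": client_id,
        "dfs.adls.oauth2.credential": credential,
        "dfs.adls.oauth2.refresh.url":
            "https://login.microsoftonline.com/%s/oauth2/token" % directory_id,
    }
    source = "adl://%s.azuredatalakestore.net/%s" % (account_name, directory_name)
    return source, configs

source, configs = build_adls_mount_args(
    client_id="00000000-0000-0000-0000-000000000000",     # ApplicationId of MyNewApp
    credential="key-saved-from-the-keys-blade",           # the key you saved earlier
    directory_id="11111111-1111-1111-1111-111111111111",  # Directory ID from AAD Properties
    account_name="pythonregression",                      # the Data Lake Store account
)

# On the Databricks cluster you would then run:
# dbutils.fs.mount(source=source, mount_point="/mnt/python", extra_configs=configs)
```

Keeping the assembly in a helper like this makes it easy to mount several folders, or the same lake from several notebooks, without repeating the OAuth boilerplate.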
If you terminate the cluster (terminate meaning shut down), you can restart it and the mounted folder will still be available to you; it doesn't need to be remounted. You can also access the file system from Python as follows:

```python
with open("/dbfs/mnt/python/newdir/iris_labels.txt", "w") as outfile:
    outfile.write("label data goes here")
```

and then write to the file in ADLS as if it were a local file system.

Ok, that was more detailed than I intended when I started, but I hope it was interesting and helpful. Let me know if you have any questions on any of the above, and enjoy the power of Databricks and ADLS combined.
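As a footnote to the local file access shown above, the mapping between a DBFS path (such as the mount point) and the path Python's ordinary file APIs see can be sketched with a tiny helper; the function name is hypothetical, purely to illustrate the convention.

```python
# Sketch of the path convention: a DBFS path such as "/mnt/python" is visible
# to ordinary Python file APIs under the "/dbfs" prefix on the driver.

def dbfs_to_local_path(dbfs_path):
    """Translate an absolute DBFS path into the local path Python file APIs see."""
    if not dbfs_path.startswith("/"):
        raise ValueError("expected an absolute DBFS path")
    return "/dbfs" + dbfs_path

print(dbfs_to_local_path("/mnt/python/newdir/iris_labels.txt"))
# -> /dbfs/mnt/python/newdir/iris_labels.txt
```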