Jose Mendes' Blog

Lambda vs Azure Databricks Delta Architecture

Historically, when implementing big data processing architectures, Lambda has been the desired approach. However, as technology evolves, new paradigms arise and, with that, more efficient approaches become available, such as the Databricks Delta architecture. In this blog, I’ll describe both architectures and demonstrate how to build a data pipeline in Azure Databricks following the Databricks Delta architecture.

The Lambda architecture, originally defined by Nathan Marz, is a big data processing architecture that combines both batch and real time processing methods. This approach attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data.

From a high-level perspective, the Lambda architecture is as follows.

[Image: high-level view of the Lambda architecture]

A valid approach to using Lambda in Azure could be as demonstrated below (please be aware that there are different options for each layer that I won’t be detailing in this blog).

[Image: example Lambda implementation in Azure]

For the speed layer we could use Azure Stream Analytics, a serverless, scalable event-processing engine that enables us to develop and run real-time analytics on multiple streams of data from sources such as devices, sensors, websites, social media, and other applications.

For the batch layer, we could use Azure Data Lake Storage (ADLS) and Azure Databricks. ADLS is an enterprise-wide, hyper-scale repository for big data analytic workloads that enables us to capture data of any size, type, and ingestion speed in a single place for operational and exploratory analytics. Currently there are two versions of ADLS, Gen 1 and Gen 2, with the latter still in private preview.

Azure Databricks is an Apache Spark-based analytics platform optimized for the Microsoft Azure cloud services platform. It allows us to create streamlined workflows and interactive workspaces that enable collaboration between data scientists, data engineers, and business analysts.

For the serving layer, we could use Azure SQL Data Warehouse, a cloud-based Enterprise Data Warehouse (EDW) that leverages Massively Parallel Processing (MPP) to quickly run complex queries across petabytes of data.

With this architecture, the events are consumed by Azure Stream Analytics and landed in ADLS as flat files, which can be partitioned by hour. Once a file is complete, we can run a batch process in Azure Databricks and store the data in Azure SQL Data Warehouse. To obtain the data that has not yet been captured by the batch process, we can use PolyBase to query the file currently being written and then create a view that unions both tables. Every time that view is queried, the PolyBase table fetches the latest streamed data, meaning we have a real-time query with the capability to obtain the most recent data.
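
As a rough illustration of the batch step described above, here is a minimal sketch that reads an hourly partition from ADLS and writes it to Azure SQL Data Warehouse using the Databricks SQL DW connector (which stages the data through PolyBase). The paths, server, database, table and credentials below are illustrative placeholders, and it assumes Stream Analytics lands the events as JSON files.

#Read one hourly partition landed by Stream Analytics (illustrative path)
batchDF = spark.read.json("adl://mydatalake.azuredatalakestore.net/events/2019/01/01/10/*.json")

#Write the batch to Azure SQL Data Warehouse via the SQL DW connector (illustrative server, database and staging container)
(batchDF.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydw;user=myuser;password=<password>")
  .option("tempDir", "wasbs://staging@mystorageaccount.blob.core.windows.net/tempdir")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "dbo.Events")
  .mode("append")
  .save()
)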

The major problem with the Lambda architecture is that we have to build and maintain two separate pipelines, which can be very complex and, ultimately, make it difficult to combine the processing of batch and real-time data. However, it is now possible to overcome this limitation if we are able to change our approach.

Databricks Delta delivers a powerful transactional storage layer by harnessing the power of Apache Spark and Databricks File System (DBFS). It is a single data management tool that combines the scale of a data lake, the reliability and performance of a data warehouse, and the low latency of streaming in a single system. The core abstraction of Databricks Delta is an optimized Spark table that stores data as parquet files in DBFS and maintains a transaction log that tracks changes to the table.
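
To make that abstraction more concrete, here is a minimal sketch (the path is illustrative) that writes a tiny DataFrame as a Databricks Delta table and lists the transaction log that Delta maintains alongside the parquet files.

#Write a small DataFrame as a Delta table (parquet files plus a _delta_log folder are created under the path)
spark.range(0, 5).write.format("delta").mode("overwrite").save("/delta/quickstart")

#Each commit to the table is recorded as a JSON file in the transaction log
display(dbutils.fs.ls("/delta/quickstart/_delta_log"))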

From a high-level perspective, the Databricks Delta architecture can be described as follows.

[Image: high-level view of the Databricks Delta architecture]

An Azure Databricks Delta Raw table stores the data that is either produced by streaming sources or stored in data lakes. Query tables contain the normalized data from the Raw tables. Summary tables, often used as the source for the presentation layer, contain the aggregated key business metrics that are frequently queried. This unified approach means that there is less complexity due to the removal of storage systems and data management steps, and, more importantly, that queries can be performed on streaming and historical data at the same time.

In the next steps, I’ll demonstrate how to implement the Databricks Delta architecture using a Python notebook.

#If Databricks delta is not enabled in the cluster, run this cell
spark.sql("set spark.databricks.delta.preview.enabled=true")

#Define variables
basePath = "/kafka"
taxiRidesRawPath = basePath + "/taxiRidesRaw.delta"
taxiRidesQueryPath = basePath + "/taxiRidesQuery.delta"
taxiFaresQueryPath = basePath + "/taxiFaresQuery.delta"
taxiSummaryPath = basePath + "/taxiSummary.delta"
checkpointPath = basePath + "/checkpoints"

#Load the Kafka stream into a DataFrame. Checkpoint locations are defined on each writeStream below, so none is needed on the reader
kafkaDF = (spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "192.168.1.4:9092")
   .option("subscribe", "taxirides")
   .option("startingOffsets", "earliest")
   .load()
)

#Kafka transmits information using a key, value, and metadata such as topic and partition. The information we're interested in is the value column. Since this is a binary value, we must first cast it to a StringType and then split the columns.
#Stream into the Raw Databricks Delta directory. By using a checkpoint location, the metadata on which data has already been processed will be maintained so the cluster can be shut down without a loss of information.
from pyspark.sql.types import StructType, StructField,LongType,TimestampType,StringType,FloatType,IntegerType
from pyspark.sql.functions import col, split

(kafkaDF
  .select(split(col("value").cast(StringType()),",").alias("message"))
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointPath + "/taxiRidesRaw")
  .outputMode("append")
  .start(taxiRidesRawPath)
)

#Create and populate the raw delta table. Data is stored in a single column as an array, e.g. ["6","START","2013-01-01 00:00:00","1970-01-01 00:00:00","-73.866135","40.771091","-73.961334","40.764912","6","2013000006","2013000006"]
spark.sql("DROP TABLE IF EXISTS TaxiRidesRaw")
          
spark.sql("""
   CREATE TABLE TaxiRidesRaw
   USING Delta
   LOCATION '{}'
""".format(taxiRidesRawPath))

#Stream into the Query Databricks delta directory. 
(spark.readStream
  .format("delta")
  .load(str(taxiRidesRawPath))
  .select(col("message")[0].cast(IntegerType()).alias("rideId"),
    col("message")[1].cast(StringType()).alias("rideStatus"),
    col("message")[2].cast(TimestampType()).alias("rideEndTime"),
    col("message")[3].cast(TimestampType()).alias("rideStartTime"),
    col("message")[4].cast(FloatType()).alias("startLong"),
    col("message")[5].cast(FloatType()).alias("startLat"),
    col("message")[6].cast(FloatType()).alias("endLong"),
    col("message")[7].cast(FloatType()).alias("endLat"),
    col("message")[8].cast(IntegerType()).alias("passengerCount"),
    col("message")[9].cast(IntegerType()).alias("taxiId"),
    col("message")[10].cast(IntegerType()).alias("driverId"))
  .filter("rideStartTime <> '1970-01-01T00:00:00.000+0000'")
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpointPath + "/taxiRidesQuery")
  .start(taxiRidesQueryPath)
)

#Create and populate the query delta table. Data is no longer in a single column
spark.sql("DROP TABLE IF EXISTS TaxiRidesQuery")
          
spark.sql("""
   CREATE TABLE TaxiRidesQuery
   USING Delta
   LOCATION '{}'
""".format(taxiRidesQueryPath))

#Read the taxi fares parquet files from blob storage and write them to the fares query Delta directory
(spark.read
   .parquet("/mnt/geospatial/kafka/NYC")
   .write
   .format("delta")
   .mode("append")
   .save(taxiFaresQueryPath)
)

#Create and populate the query delta table
spark.sql("DROP TABLE IF EXISTS TaxiFaresQuery")
          
spark.sql("""
   CREATE TABLE TaxiFaresQuery
   USING Delta
   LOCATION '{}'
""".format(taxiFaresQueryPath))

#Load the rides query Delta table as a streaming DataFrame
taxiRidesDF = (spark
                .readStream
                .format("delta")
                .load(str(taxiRidesQueryPath))
               )

#Load the fares query Delta table as a static (batch) DataFrame
taxiFaresDF = (spark
                .read
                .format("delta")
                .load(str(taxiFaresQueryPath))
               )

#Join the streaming data with the batch data. Group by date and taxi driver to obtain the number of rides per day
from pyspark.sql.functions import date_format, col

RidesDf = (taxiRidesDF.join(taxiFaresDF, (taxiRidesDF.taxiId == taxiFaresDF.taxiId) & (taxiRidesDF.driverId == taxiFaresDF.driverId))
            .withColumn("date", date_format(taxiRidesDF.rideStartTime, "yyyyMMdd"))
            .groupBy(col("date"),taxiRidesDF.driverId)
            .count()
            .withColumnRenamed("count","RidesPerDay")
            .writeStream
            .format("delta")
            .outputMode("complete")
            .option("checkpointLocation", checkpointPath + "/taxiSummary")
            .start(taxiSummaryPath)
)

#Create and populate the summary delta table
spark.sql("DROP TABLE IF EXISTS TaxiSummary")
          
spark.sql("""
   CREATE TABLE TaxiSummary
   USING Delta
   LOCATION '{}'
""".format(taxiSummaryPath))

As always, if you have any questions or comments, do let me know.

Geospatial analysis in Azure Databricks – Part II

After my last post on running geospatial analysis in Azure Databricks with Magellan (here), I decided to investigate which other libraries were available and discover whether they performed better or worse.

The first library I investigated was GeoMesa, an Apache-licensed open source suite of tools that enables large-scale geospatial analytics on cloud and distributed computing systems, letting you manage and analyze huge spatio-temporal datasets.

GeoMesa does this by providing spatio-temporal data persistence on top of the Accumulo, HBase, and Cassandra distributed column-oriented databases for massive storage of point, line, and polygon data. It allows rapid access to this data via queries that take full advantage of geographical properties to specify distance and area. GeoMesa also provides support for near real time stream processing of spatio-temporal data by layering spatial semantics on top of the Apache Kafka messaging system (further details here).

Although their website is rich in documentation, I immediately stumbled at the most basic operation: reading a GeoJSON file with the GeoMesa format. The reason is that, in their tutorials, they assume Apache Accumulo, a distributed key/value store, is used as the backing data store. Because I wanted to make sure I could ingest data from either Azure Blob Storage or Azure Data Lake Storage, I decided not to follow their recommendation. As such, after many hours of failed attempts, I decided to abandon the idea of using GeoMesa.

My next option was GeoSpark, a cluster computing system for processing large-scale spatial data. GeoSpark extends Apache Spark/SparkSQL with a set of out-of-the-box Spatial Resilient Distributed Datasets (SRDDs)/SpatialSQL that efficiently load, process, and analyze large-scale spatial data across machines (further details here).

GeoSpark immediately impresses with the possibility of either creating Spatial RDDs and running spatial queries using GeoSpark-core, or creating Spatial SQL/DataFrames to manage spatial data using GeoSparkSQL. Their website contains tutorials that are easy to follow and offers the possibility to chat with the community on Gitter.

In the spirit of keeping my approach as simple as possible, I decided to compare Magellan with GeoSparkSQL, since SparkSQL is easier to use and working with RDDs can be a complex task. However, it is important to highlight that their recommendation is to use GeoSpark core rather than GeoSparkSQL. The reason is that SparkSQL has some limitations, such as not supporting clustered indices, which makes it difficult to expose all GeoSpark core features through it.

The data used in the following test cases was based on the NYC Taxicab datasets to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON to extract the polygons. Both datasets were stored in a blob storage and added to Azure Databricks as a mount point.

The table below details the versions of the libraries and the cluster configurations. There are a couple of points to note:

  • Magellan does not support Apache Spark 2.3+.
  • The Magellan library 1.0.6 is about to be released this month and should cover some of the limitations identified below.
  • The GeoSpark library 1.2.0 is currently available in SNAPSHOT and will hopefully fix the load of multiline GeoJSON files.

Library | Version | Runtime Version | Cluster Specification
Magellan | 1.0.5 | 3.5 LTS (includes Apache Spark 2.2.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled
GeoSpark/GeoSparkSQL | 1.1.3 / 2.3-1.1.3 | 4.3 (includes Apache Spark 2.3.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled

To test the performance of both libraries, I implemented a set of queries and ran each of them three times, recording how long each run took. The table below shows the average duration per test for each library.

DS1 – NYC Neighbourhoods dataset containing the polygons

DS2 – NYC Taxicab dataset containing the geometry points for the month of January 2015

DS3 – NYC Taxicab dataset containing the geometry points for the year of 2015

Test Number | Description | Number of Results | Magellan (avg in sec) | GeoSparkSQL (avg in sec)
1 | Select all rows from DS1 | 310 | 0.86 | 0.69
2 | Select all rows from DS2 | 12.748.986 | 19.82 | 15.64
3 | Select all rows from DS1 where borough is Manhattan | 37 | 2.22 | 0.69
4 | Select all rows from DS2 where total amount is bigger than 20 | 2.111.707 | 18.71 | 17.23
5 | Select 100 rows from DS1 ordered by the distance between one point and all polygons | 100 | N/A* | 0.8
6 | Select all rows from DS1 where a single point is within all polygons | 1 | 1.63 | 0.68
7 | Select all rows from DS1 where one point with buffer 0.1 intersects all polygons | 73 | N/A* | 0.80
8 | Join DS1 and DS2 and select all rows where polygons contain points | 12.492.678 | 29.17 | 1573.8 (~26 min)
9 | Join DS1 and DS2 and select all rows where points are within polygons | 12.492.678 | 29.31 | 1518 (~25 min)
10 | Select all rows from DS3 | 146.113.001 | 187.8 | 155.4
11 | Select all rows from DS3 where total amount is bigger than 20 | 29.333.130 | 94.8 | 119.4
12** | Join DS1 and DS3 and select all rows where points are within polygons | 143.664.028 | 168 | N/A*

* Although the following link mentions that Magellan can perform Distance and Buffer operations, I couldn’t find documentation demonstrating how to perform them, and in the cases I tried, Azure Databricks threw an error indicating the class was not available.

** Considering the time it took to run queries 8 and 9 against DS2 (~1.8GB), I decided not to test the performance against DS3 (~21.3GB), since I already knew the results were not going to be positive.

From the tests above, we can see that GeoSparkSQL generally performs better, except when joining datasets on spatial predicates, where its performance decreases drastically compared with Magellan. On the other hand, Magellan is still an ongoing project and seems to be lacking some basic operations that might be of great importance for some analyses; however, it clearly excels when we need to run spatial analysis on joined datasets.

Based on my experience using the libraries and the tests conducted in this blog, my recommendation would be to use Magellan, since even when GeoSparkSQL was better, the performance gains were not that significant. However, as already mentioned, Magellan might not be an option if the requirements involve operations that are not yet available, such as distances or buffers.

Following is the implementation of the tests using GeoSparkSQL.

//Import Libraries and config session
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
import org.datasyslab.geosparksql.utils.Adapter
import org.datasyslab.geosparksql.UDF.UdfRegistrator
import org.datasyslab.geosparksql.UDT.UdtRegistrator

import org.apache.spark.serializer.KryoSerializer
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.geosparksql.strategy.join.JoinQueryDetector
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Initiate Spark Session
var sparkSession = SparkSession.builder()
                     .appName("NYCTaxis")
                     // Enable GeoSpark custom Kryo serializer
                     .config("spark.serializer", classOf[KryoSerializer].getName)
                     .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
                     .getOrCreate()

//Register GeoSparkSQL
GeoSparkSQLRegistrator.registerAll(sparkSession)

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset. 
var trips = sparkSession.read
             .format("com.databricks.spark.csv")
             .option("header", "true")
             .schema(schema)
             .load("/mnt/geospatial/nyctaxis/*")

trips.createOrReplaceTempView("tripstable")

//Read GeoJSON file
var polygonJsonDF = spark.read
                     .option("multiline", "true")
                     .json("/mnt/geospatial/neighborhoods/neighborhoods.geojson")

//GeoSparkSQL can't read multiline GeoJSON files. This workaround will only work if the file only contains one geometry type (eg. polygons) 
val polygons = polygonJsonDF
                 .select(explode(col("features")).as("feature"))
                 .withColumn("polygon", callUDF("ST_GeomFromGeoJson", to_json(col("feature"))))
                 .select($"polygon", $"feature.properties.borough", $"feature.properties.boroughCode", $"feature.properties.neighborhood")

polygons.createOrReplaceTempView("polygontable")

//Test 1
var polygonAll = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
         """)

polygonAll.count()

//Test 2
var tripsAll = sparkSession.sql(
         """
           | SELECT *
           | FROM tripstable
         """)

tripsAll.count()

//Test 3
var polygonWhere = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE borough = 'Manhattan'
         """)

polygonWhere.count()

//Test 4
var tripsWhere = sparkSession.sql(
         """
           | SELECT *
           | FROM tripstable
           | WHERE total_amount > 20
         """)

tripsWhere.count()

//Test 5
var polygonGeomDistance = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | ORDER BY ST_Distance(polygon, ST_PointFromText('-74.00672149658203, 40.73177719116211', ','))
           | LIMIT 100
         """)
polygonGeomDistance.count()

//Test 6
var polygonGeomWithin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE ST_Within(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','), polygon)
         """)
polygonGeomWithin.show()

//Test 7
var polygonGeomIntersect = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE ST_Intersects(ST_Circle(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','),0.1), polygon)
         """)
polygonGeomIntersect.count()

//Test 8
var polygonContainsJoin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable, tripstable
           | WHERE ST_Contains(polygontable.polygon, ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))))
         """)
polygonContainsJoin.count()

//Test 9
var polygonWithinJoin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable, tripstable
           | WHERE ST_Within(ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))), polygontable.polygon)
         """)

polygonWithinJoin.count()

Following is the implementation of the tests using Magellan.

//Import Libraries
import magellan._
import org.apache.spark.sql.magellan.dsl.expressions._

import org.apache.spark.sql.Row
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset and create a Magellan point 
val trips = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("mode", "DROPMALFORMED")
       .schema(schema)
       .load("/mnt/geospatial/nyctaxis/*")
       .withColumn("point", point($"pickup_longitude",$"pickup_latitude"))

//Read GeoJSON file and define index precision
val neighborhoods = sqlContext.read
       .format("magellan")
       .option("type", "geojson")
       .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
       .select($"polygon",
              $"metadata"("borough").as("borough"),
              $"metadata"("boroughCode").as("boroughCode"),
              $"metadata"("neighborhood").as("neighborhood"))
       .index(30)

//Inject the Magellan spatial join rules into Catalyst
magellan.Utils.injectRules(spark)

//Test 1
neighborhoods.count()

//Test 2
trips.count()

//Test 3
neighborhoods.filter("borough == 'Manhattan'").count()

//Test 4
trips.filter("total_amount > 20").count()

//Test 6
val points = sc.parallelize(Seq((-74.00672149658203, 40.73177719116211))).toDF("x", "y").select(point($"x", $"y").as("point"))
val polygons = neighborhoods.join(points)

polygons.filter($"point" within $"polygon").count()

//Test 8

trips.join(neighborhoods)
         .where($"polygon" >? $"point")
         .count()

//Test 9

trips.join(neighborhoods)
         .where($"point" within $"polygon")
         .count()

Geospatial analysis with Azure Databricks

A few months ago, I wrote a blog demonstrating how to extract and analyse geospatial data in Azure Data Lake Analytics (ADLA) (here). The article aimed to prove that it was possible to run spatial analysis using U-SQL, even though it does not natively support spatial data analytics. The outcome of that experience was positive, however, with serious limitations in terms of performance. ADLA dynamically provisions resources and can perform analytics on terabytes to petabytes of data; however, because it must use the SQL Server Data Types and Spatial assemblies to perform spatial analysis, all the parallelism capabilities are suddenly limited. For example, if you are running an aggregation, ADLA will split the processing between multiple vertices, making it faster; however, when running intersections between points and polygons, because it is a single-threaded SQL operation, it will only use one vertex, and consequently, the job might take hours to complete.

Since I last wrote my blog, the data analytics landscape has changed, and with that, new options became available, namely Azure Databricks. In this blog, I’ll demonstrate how to run spatial analysis and export the results to a mounted point using the Magellan library and Azure Databricks.

Magellan is a distributed execution engine for geospatial analytics on big data. It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries (further details here).

Although their GitHub page mentions that the 1.0.5 Magellan library is available for Apache Spark 2.3+ clusters, I learned through a very difficult process that the only way to make it work in Azure Databricks is with an Apache Spark 2.2.1 cluster with Scala 2.11. The cluster I used for this experiment consisted of a Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled.

In terms of datasets, I used the NYC Taxicab dataset to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON dataset to extract the polygons. Both datasets were stored in a blob storage and added to Azure Databricks as a mount point.
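
For reference, here is a minimal sketch of how a blob storage container can be mounted in Databricks, run once from a Python cell. The account, container and access key below are illustrative placeholders (the key would normally come from a secret scope).

#Mount the blob storage container that holds the datasets (illustrative names)
dbutils.fs.mount(
  source = "wasbs://geospatial@mystorageaccount.blob.core.windows.net",
  mount_point = "/mnt/geospatial",
  extra_configs = {"fs.azure.account.key.mystorageaccount.blob.core.windows.net": "<storage-account-access-key>"}
)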

As always, first we need to import the libraries.

//Import Libraries
import magellan._
import org.apache.spark.sql.magellan.dsl.expressions._

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

Next, we define the schema of the NYC Taxicab dataset and load the data to a DataFrame. While loading the data, we convert the pickup longitude and latitude into a Magellan Point. If there was a need, in the same operation we could also add another Magellan Point from the drop off longitude and latitude.

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))
//Read data from the NYC Taxicab dataset and create a Magellan point 
val trips = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("mode", "DROPMALFORMED")
       .schema(schema)
       .load("/mnt/geospatial/nyctaxis/*")
       .withColumn("point", point($"pickup_longitude",$"pickup_latitude"))

The next step is to load the neighbourhood data. As mentioned in their documentation, Magellan supports the reading of ESRI, GeoJSON, OSM-XML and WKT formats. From the GeoJSON dataset, Magellan will extract a collection of polygons and read the metadata into a Map.

There are three things to notice in the code below: first, the extraction of the polygon; second, the selection of the key corresponding to the neighbourhood name; and finally, the provision of a hint that defines what the index precision should be. This, alongside the injection of a spatial join rule into Catalyst, massively increases the performance of the queries. To get a better understanding of this operation, read this excellent blog.

//Read GeoJSON file and define index precision
val neighborhoods = sqlContext.read
       .format("magellan")
       .option("type", "geojson")
       .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
       .select($"polygon",
        $"metadata"("neighborhood").as("neighborhood"))
       .index(30)

Now that we have our two datasets loaded, we can run our geospatial query to identify in which neighbourhood each pickup point falls. To achieve our goal, we need to join the two DataFrames and apply a within predicate. As a curiosity, if we consider that m represents the number of points (12.748.987), n the number of polygons (310) and p the average number of edges per polygon (104), then, with a complexity of O(mnp), we would roughly perform 400 billion calculations on a single node to determine where each point falls.

//Inject rules and join DataFrames with within predicate
magellan.Utils.injectRules(spark)


val intersected = trips.join(neighborhoods)
         .where($"point" within $"polygon")

The above code does not take longer than one second to execute. It is only when we want to obtain details about our DataFrame that the computing time becomes visible. For example, if we want to know which neighbourhood has the most pickups, we can write the following code, which takes on average 40 seconds.

//Neighbourhoods that received the most pickups
display(intersected 
       .groupBy('neighborhood)
       .count()
       .orderBy($"count".desc))

If we want to save the data and identify which pickups fall inside the NYC neighbourhoods, then we have to rewrite our intersected DataFrame to select all columns except the Magellan Points and Polygons, add a new column to the DataFrame and export the data back to the blob, as shown below.

//Columns to keep (all columns except the Magellan point and polygon)
val tripColumns = Seq($"vendorId",$"pickup_datetime", $"dropoff_datetime", $"passenger_count", $"trip_distance", $"pickup_longitude", $"pickup_latitude", $"rateCodeId", $"store_fwd",$"dropoff_longitude", $"dropoff_latitude",$"payment_type",$"fare_amount", $"extra", $"mta_tax", $"tip_amount", $"tolls_amount", $"improvement_surcharge", $"total_amount")
//Select pickup points that fall inside a neighbourhood
val intersectedTrips = intersected.select(tripColumns: _*)
//Select pickup points that don't fall inside a neighbourhood
val nonIntersected = trips
                       .select(tripColumns: _*)
                       .except(intersectedTrips)
//Add new column intersected_flag
val intersectedFlag = "1"
val nonIntersectedFlag = "0"
val tripsIntersected = intersectedTrips.withColumn("intersected_flag",expr(intersectedFlag))
val tripsNotIntersected = nonIntersected.withColumn("intersected_flag",expr(nonIntersectedFlag))
//Union DataFrames
val allTrips = tripsNotIntersected.union(tripsIntersected)
//Save data to the blob
allTrips.write
   .format("com.databricks.spark.csv")
   .option("header", "true")
   .save("/mnt/geospatial/trips/trips.csv")

In summary, reading a 1.8GB dataset, applying geospatial analysis and exporting the results back to blob storage only took on average 1 minute, which is miles better compared with my previous attempt with U-SQL.

As always, if you have any comments or questions, do let me know.