Jose Mendes

Jose Mendes' Blog

Geospatial analysis in Azure Databricks – Part II

After my last post on running geospatial analysis in Azure Databricks with Magellan (here) I decided to investigate which other libraries were available and discover if they performed better or worse.

The first library I investigated was GeoMesa. an Apache licensed open source suite of tools that enables large-scale geospatial analytics on cloud and distributed computing systems, letting you manage and analyze the huge spatio-temporal datasets.

GeoMesa does this by providing spatio-temporal data persistence on top of the Accumulo, HBase, and Cassandra distributed column-oriented databases for massive storage of point, line, and polygon data. It allows rapid access to this data via queries that take full advantage of geographical properties to specify distance and area. GeoMesa also provides support for near real time stream processing of spatio-temporal data by layering spatial semantics on top of the Apache Kafka messaging system (further details here).

Although their website is rich in documentation, I immediately stumbled in the most basic operation, read a GeoJSON file with the geomesa format. The reason behind this is because, in their tutorials, they assume Apache Accumulo, a distributed key/value store, is used as the backing data store. Because I wanted to make sure I could ingest data from either Azure Blob Storage or Azure Data Lake Storage, I decided to not use their recommendation. As such, after many hours of failed attempts, I decided to abandon the idea of using GeoMesa.

My next option was GeoSpark, a cluster computing system for processing large-scale spatial data. GeoSpark extends Apache Spark / SparkSQL with a set of out-of-the-box Spatial Resilient Distributed Datasets (SRDDs)/ SpatialSQL that efficiently load, process, and analyze large-scale spatial data across machines (further details here ).

GeoSpark immediately impresses with the possibility of either creating Spatial RDDs and run spatial queries using GeoSpark-core or create Spatial SQL/DataFrame to manage spatial data using GeoSparkSQL. Their website contains tutorials that are easy to follow and offers the possibility to chat with the community on gitter.

In the spirit of trying to keep my approach as simple as possible, I decided to compare Magellan with GeoSparkSQL, since SparkSQL is easier to use and working with RDDs can be a complex task, however, it is important to highlight that their recommendation is to use GeoSpark core rather than GeoSparkSQL. The reason for this is because SparkSQL has some limitations, such as not supporting clustered indices, making it difficult to get it exposed to all GeoSpark core features.

The data used in the following test cases was based on the NYC Taxicab datasets to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON to extract the polygons. Both datasets were stored in a blob storage and added to Azure Databricks as a mount point.

The table below details the version of the libraries and clusters configuration. There are a couple of points to notice:

  • Magellan does not support Apache Spark 2.3+.
  • The Magellan library 1.0.6 is about to be released this month and should cover some of the limitations identified below.
  • The GeoSpark library 1.2.0 is currently available in SNAPSHOT and will hopefully fix the load of multiline GeoJSON files.

Library

Version

Runtime Version

Cluster Specification

Magellan

1.0.5

3.5 LTS (includes Apache Spark 2.2.1, Scala 2.11)

Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled

GeoSpark/ GeoSparkSQL

1.1.3 / 2.3-1.1.3

4.3 (includes Apache Spark 2.3.1, Scala 2.11)

Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled

To test the performance of both libraries, I implemented a set of queries and ran them 3 times, registering how long it took on each run. The best results are highlighted in green.

DS1 - NYC Neighbourhoods dataset containing the polygons

DS2 – NYC Taxicab dataset containing the geometry points for the month of January 2015

DS3 – NYC Taxicab dataset containing the geometry points for the year of 2015

Test Number

Description

Number of Results

Magellan

(avg in sec)

GeoSparkSQL

(avg in sec)

1

Select all rows from DS1

310

0.86

0.69

2

Select all rows from DS2

12.748.986

19.82

15.64

3

Select all rows from DS1 where borough is Manhattan

37

2.22

0.69

4

Select all rows from DS2 where total amount is bigger than 20

2.111.707

18.71

17.23

5

Select 100 rows from DS1 ordered by the distance between one point and all polygons

100

N/A*

0.8

6

Select all rows from DS1 where a single point is within all polygons

1

1.63

0.68

7

Select all rows from DS1 where one point with buffer 0.1 intersects all polygons

73

N/A*

0.80

8

Join DS1 and DS2 and select all rows where polygons contains points

12.492.678

29.17

1573.8 (~26min)

9

Join DS1 and DS2 and select all rows where points are within polygons

12.492.678

29.31

1518 (~25min)

10

Select all rows from DS3

146.113.001

187.8

155.4

11

Select all rows from DS3 where total amount is bigger than 20

29.333.130

94.8

119.4

12**

Join DS1 and DS3 and select all rows where points are within polygons

143.664.028

168

N/A*

* Although the following link mentions Magellan can perform Distance and Buffer operations, I couldn’t find documentation demonstrating how to perform them, or, in the cases I tried, Azure Databricks threw an error indicating the class was not available.

** Considering the time it took to run queries 8/9 using DS2 (~1.8GB), I decided to not test the performance against DS3 (~21.3GB), since I already knew the results were not going to be positive.

From the tests above, we can see that GeoSparkSQL is generally better when not performing joins with spatial ranges, where the performance drastically decreases when compared with Magellan. On the other hand, Magellan is still an ongoing project and seems to be lacking some of the basic operations that might be of big importance for some analysis, however, it clearly excels when we need to run spatial analysis in joined datasets.

Based on my experience using the libraries and the tests conducted in this blog, my recommendation would be to use Magellan, since even when GeoSparkSQL was better, the performance gains were not that significant, however, as already referred, Magellan might not be an option if the requirements involve operations that are not yet available, such as distances or buffers

Following is the implementation of the tests using GeoSparkSQL.

//Import Libraries and config session
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
import org.datasyslab.geosparksql.utils.Adapter
import org.datasyslab.geosparksql.UDF.UdfRegistrator
import org.datasyslab.geosparksql.UDT.UdtRegistrator

import org.apache.spark.serializer.KryoSerializer;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.geosparksql.strategy.join.JoinQueryDetector
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Initiate Spark Session
var sparkSession = SparkSession.builder()
                     .appName("NYCTaxis")
                     // Enable GeoSpark custom Kryo serializer
                     .config("spark.serializer", classOf[KryoSerializer].getName)
                     .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
                     .getOrCreate()

//Register GeoSparkSQL
GeoSparkSQLRegistrator.registerAll(sparkSession)

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset. 
var trips = sparkSession.read
             .format("com.databricks.spark.csv")
             .option("header", "true")
             .schema(schema)
             .load("/mnt/geospatial/nyctaxis/*")

trips.createOrReplaceTempView("tripstable")

//Read GeoJSON file
var polygonJsonDF = spark.read
                     .option("multiline", "true")
                     .json("/mnt/geospatial/neighborhoods/neighborhoods.geojson")

//GeoSparkSQL can't read multiline GeoJSON files. This workaround will only work if the file only contains one geometry type (eg. polygons) 
val polygons = polygonJsonDF
                 .select(explode(col("features")).as("feature"))
                 .withColumn("polygon", callUDF("ST_GeomFromGeoJson", to_json(col("feature"))))
                 .select($"polygon", $"feature.properties.borough", $"feature.properties.boroughCode", $"feature.properties.neighborhood")

polygons.createOrReplaceTempView("polygontable")

//Test 1
var polygonAll = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
         """)

polygonAll.count()

//Test 2
var tripsAll = sparkSession.sql(
         """
           | SELECT *
           | FROM tripstable
         """)

tripsAll.count()

//Test 3
var polygonWhere = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE borough = 'Manhattan'
         """)

polygonWhere.count()

//Test 4
var tripsWhere = sparkSession.sql(
         """
           | SELECT *
           | FROM tripstable
           | WHERE total_amount > 20
         """)

tripsWhere.count()

//Test 5
var polygonGeomDistance = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | ORDER BY ST_Distance(polygon, ST_PointFromText('-74.00672149658203, 40.73177719116211', ','))
           | LIMIT 100
         """)
polygonGeomDistance.count()

//Test 6
var polygonGeomWithin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE ST_Within(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','), polygon)
         """)
polygonGeomWithin.show()

//Test 7
var polygonGeomInterset = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE ST_Intersects(ST_Circle(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','),0.1), polygon)
         """)
polygonGeomInterset.count()

//Test 8
var polygonContainsJoin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable, tripstable
           | WHERE ST_Contains(polygontable.polygon, ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))))
         """)
polygonContainsJoin.count()

//Test 9
var polygonWithinJoin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable, tripstable
           | WHERE ST_Within(ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))), polygontable.polygon)
         """)

polygonWithinJoin.count()

Following is the implementation of the tests using Magellan.

//Import Libraries
import magellan._ import org.apache.spark.sql.magellan.dsl.expressions._

import org.apache.spark.sql.Row import org.apache.spark.sql.Column import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset and create a Magellan point 
val trips = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("mode", "DROPMALFORMED")
       .schema(schema)
       .load("/mnt/geospatial/nyctaxis/*")
       .withColumn("point", point($"pickup_longitude",$"pickup_latitude"))

//Read GeoJSON file and define index precision
val neighborhoods = sqlContext.read
       .format("magellan")
       .option("type", "geojson")
       .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
       .select($"polygon",
              $"metadata"("borough").as("borough"),
              $"metadata"("boroughCode").as("boroughCode"),
              $"metadata"("neighborhood").as("neighborhood"))
       .index(30)

//Test 1
magellan.Utils.injectRules(spark)

neighborhoods.count()

//Test 2
trips.count()

//Test 3
neighborhoods.filter("borough == 'Manhattan'").count()

//Test 4
trips.filter("total_amount > 20").count()

//Test 6
val points = sc.parallelize(Seq((-74.00672149658203, 40.73177719116211))).toDF("x", "y").select(point($"x", $"y").as("point"))
val polygons =neighborhoods.join(points)

polygons.filter($"point" within $"polygon").count()

//Test 8

trips.join(neighborhoods)
         .where($"polygon" >? $"point")
         .count()

//Test 9

trips.join(neighborhoods)
         .where($"point" within $"polygon")
         .count()

Loading