Adatis BI Blogs

Geospatial analysis in Azure Databricks – Part II

After my last post on running geospatial analysis in Azure Databricks with Magellan (here), I decided to investigate which other libraries were available and find out whether they performed better or worse.

The first library I investigated was GeoMesa, an Apache-licensed open source suite of tools that enables large-scale geospatial analytics on cloud and distributed computing systems, letting you manage and analyze huge spatio-temporal datasets. GeoMesa does this by providing spatio-temporal data persistence on top of the Accumulo, HBase, and Cassandra distributed column-oriented databases for massive storage of point, line, and polygon data. It allows rapid access to this data via queries that take full advantage of geographical properties to specify distance and area. GeoMesa also provides support for near real time stream processing of spatio-temporal data by layering spatial semantics on top of the Apache Kafka messaging system (further details here).

Although their website is rich in documentation, I immediately stumbled on the most basic operation: reading a GeoJSON file with the geomesa format. The reason is that their tutorials assume Apache Accumulo, a distributed key/value store, is used as the backing data store. Because I wanted to make sure I could ingest data from either Azure Blob Storage or Azure Data Lake Storage, I decided not to follow their recommendation. After many hours of failed attempts, I abandoned the idea of using GeoMesa.

My next option was GeoSpark, a cluster computing system for processing large-scale spatial data. GeoSpark extends Apache Spark / SparkSQL with a set of out-of-the-box Spatial Resilient Distributed Datasets (SRDDs) / SpatialSQL that efficiently load, process, and analyze large-scale spatial data across machines (further details here).
GeoSpark immediately impresses with the possibility of either creating Spatial RDDs and running spatial queries using GeoSpark-core, or creating Spatial SQL/DataFrames to manage spatial data using GeoSparkSQL. Their website contains tutorials that are easy to follow and offers the possibility to chat with the community on Gitter.

In the spirit of keeping my approach as simple as possible, I decided to compare Magellan with GeoSparkSQL, since SparkSQL is easier to use and working with RDDs can be a complex task. It is important to highlight, however, that their recommendation is to use GeoSpark core rather than GeoSparkSQL. The reason is that SparkSQL has some limitations, such as not supporting clustered indices, which make it difficult to expose all GeoSpark core features through it.

The data used in the following test cases was based on the NYC Taxicab datasets to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON to extract the polygons. Both datasets were stored in blob storage and added to Azure Databricks as a mount point.

The table below details the versions of the libraries and the cluster configuration. There are a few points to notice:

- Magellan does not support Apache Spark 2.3+.
- The Magellan library 1.0.6 is about to be released this month and should cover some of the limitations identified below.
- The GeoSpark library 1.2.0 is currently available in SNAPSHOT and will hopefully fix the load of multiline GeoJSON files.

| Library | Version | Runtime Version | Cluster Specification |
|---|---|---|---|
| Magellan | 1.0.5 | 3.5 LTS (includes Apache Spark 2.2.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled |
| GeoSpark / GeoSparkSQL | 1.1.3 / 2.3-1.1.3 | 4.3 (includes Apache Spark 2.3.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled |

To test the performance of both libraries, I implemented a set of queries and ran each of them 3 times, recording how long each run took. The table below reports the average time in seconds for each library; lower is better.

DS1 - NYC Neighbourhoods dataset containing the polygons
DS2 - NYC Taxicab dataset containing the geometry points for the month of January 2015
DS3 - NYC Taxicab dataset containing the geometry points for the year of 2015

| Test Number | Description | Number of Results | Magellan (avg in sec) | GeoSparkSQL (avg in sec) |
|---|---|---|---|---|
| 1 | Select all rows from DS1 | 310 | 0.86 | 0.69 |
| 2 | Select all rows from DS2 | 12,748,986 | 19.82 | 15.64 |
| 3 | Select all rows from DS1 where borough is Manhattan | 37 | 2.22 | 0.69 |
| 4 | Select all rows from DS2 where total amount is bigger than 20 | 2,111,707 | 18.71 | 17.23 |
| 5 | Select 100 rows from DS1 ordered by the distance between one point and all polygons | 100 | N/A* | 0.8 |
| 6 | Select all rows from DS1 where a single point is within all polygons | 1 | 1.63 | 0.68 |
| 7 | Select all rows from DS1 where one point with buffer 0.1 intersects all polygons | 73 | N/A* | 0.80 |
| 8 | Join DS1 and DS2 and select all rows where polygons contains points | 12,492,678 | 29.17 | 1573.8 (~26min) |
| 9 | Join DS1 and DS2 and select all rows where points are within polygons | 12,492,678 | 29.31 | 1518 (~25min) |
| 10 | Select all rows from DS3 | 146,113,001 | 187.8 | 155.4 |
| 11 | Select all rows from DS3 where total amount is bigger than 20 | 29,333,130 | 94.8 | 119.4 |
| 12** | Join DS1 and DS3 and select all rows where points are within polygons | 143,664,028 | 168 | N/A** |

* Although the following link mentions Magellan can perform Distance and Buffer operations, I couldn’t find documentation demonstrating how to perform them, or, in the cases I tried, Azure Databricks threw an error indicating the class was not available.

** Considering the time it took to run queries 8/9 using DS2 (~1.8GB), I decided not to test the performance against DS3 (~21.3GB), since I already knew the results were not going to be positive.

From the tests above, we can see that GeoSparkSQL is generally better when not performing joins with spatial ranges, where its performance decreases drastically compared with Magellan.
On the other hand, Magellan is still an ongoing project and seems to be lacking some basic operations that might be important for some analyses. However, it clearly excels when we need to run spatial analysis on joined datasets.

Based on my experience using the libraries and the tests conducted in this blog, my recommendation would be to use Magellan, since even when GeoSparkSQL was better, the performance gains were not that significant. However, as already mentioned, Magellan might not be an option if the requirements involve operations that are not yet available, such as distances or buffers.

Following is the implementation of the tests using GeoSparkSQL.

    //Import Libraries and config session
    import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
    import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
    import org.datasyslab.geosparksql.utils.Adapter
    import org.datasyslab.geosparksql.UDF.UdfRegistrator
    import org.datasyslab.geosparksql.UDT.UdtRegistrator
    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.geosparksql.strategy.join.JoinQueryDetector
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    //Initiate Spark Session
    val sparkSession = SparkSession.builder()
        .appName("NYCTaxis")
        // Enable GeoSpark custom Kryo serializer
        .config("spark.serializer", classOf[KryoSerializer].getName)
        .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
        .getOrCreate()

    //Register GeoSparkSQL
    GeoSparkSQLRegistrator.registerAll(sparkSession)

    //Define schema for the NYC taxi data
    val schema = StructType(Array(
        StructField("vendorId", StringType, false),
        StructField("pickup_datetime", StringType, false),
        StructField("dropoff_datetime", StringType, false),
        StructField("passenger_count", IntegerType, false),
        StructField("trip_distance", DoubleType, false),
        StructField("pickup_longitude", DoubleType, false),
        StructField("pickup_latitude", DoubleType, false),
        StructField("rateCodeId", StringType, false),
        StructField("store_fwd", StringType, false),
        StructField("dropoff_longitude", DoubleType, false),
        StructField("dropoff_latitude", DoubleType, false),
        StructField("payment_type", StringType, false),
        StructField("fare_amount", StringType, false),
        StructField("extra", StringType, false),
        StructField("mta_tax", StringType, false),
        StructField("tip_amount", StringType, false),
        StructField("tolls_amount", StringType, false),
        StructField("improvement_surcharge", StringType, false),
        StructField("total_amount", DoubleType, false)))

    //Read data from the NYC Taxicab dataset.
    val trips = sparkSession.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .schema(schema)
        .load("/mnt/geospatial/nyctaxis/*")

    trips.createOrReplaceTempView("tripstable")

    //Read GeoJSON file
    val polygonJsonDF = spark.read
        .option("multiline", "true")
        .json("/mnt/geospatial/neighborhoods/neighborhoods.geojson")

    //GeoSparkSQL can't read multiline GeoJSON files. This workaround will only work if the file only contains one geometry type (eg. polygons)
    val polygons = polygonJsonDF
        .select(explode(col("features")).as("feature"))
        .withColumn("polygon", callUDF("ST_GeomFromGeoJson", to_json(col("feature"))))
        .select($"polygon", $"feature.properties.borough", $"feature.properties.boroughCode", $"feature.properties.neighborhood")

    polygons.createOrReplaceTempView("polygontable")

    //Test 1
    //stripMargin removes the leading "|" characters so that Spark receives plain SQL
    val polygonAll = sparkSession.sql(
        """
          | SELECT *
          | FROM polygontable
        """.stripMargin)
    polygonAll.count()

    //Test 2
    val tripsAll = sparkSession.sql(
        """
          | SELECT *
          | FROM tripstable
        """.stripMargin)
    tripsAll.count()

    //Test 3
    val polygonWhere = sparkSession.sql(
        """
          | SELECT *
          | FROM polygontable
          | WHERE borough = 'Manhattan'
        """.stripMargin)
    polygonWhere.count()

    //Test 4
    val tripsWhere = sparkSession.sql(
        """
          | SELECT *
          | FROM tripstable
          | WHERE total_amount > 20
        """.stripMargin)
    tripsWhere.count()

    //Test 5
    val polygonGeomDistance = sparkSession.sql(
        """
          | SELECT *
          | FROM polygontable
          | ORDER BY ST_Distance(polygon, ST_PointFromText('-74.00672149658203, 40.73177719116211', ','))
          | LIMIT 100
        """.stripMargin)
    polygonGeomDistance.count()

    //Test 6
    val polygonGeomWithin = sparkSession.sql(
        """
          | SELECT *
          | FROM polygontable
          | WHERE ST_Within(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','), polygon)
        """.stripMargin)
    polygonGeomWithin.show()

    //Test 7
    val polygonGeomIntersect = sparkSession.sql(
        """
          | SELECT *
          | FROM polygontable
          | WHERE ST_Intersects(ST_Circle(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','),0.1), polygon)
        """.stripMargin)
    polygonGeomIntersect.count()

    //Test 8
    val polygonContainsJoin = sparkSession.sql(
        """
          | SELECT *
          | FROM polygontable, tripstable
          | WHERE ST_Contains(polygontable.polygon, ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))))
        """.stripMargin)
    polygonContainsJoin.count()

    //Test 9
    val polygonWithinJoin = sparkSession.sql(
        """
          | SELECT *
          | FROM polygontable, tripstable
          | WHERE ST_Within(ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))), polygontable.polygon)
        """.stripMargin)
    polygonWithinJoin.count()

Following is the implementation of the tests using Magellan.

    //Import Libraries
    import magellan._
    import org.apache.spark.sql.magellan.dsl.expressions._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    //Define schema for the NYC taxi data
    val schema = StructType(Array(
        StructField("vendorId", StringType, false),
        StructField("pickup_datetime", StringType, false),
        StructField("dropoff_datetime", StringType, false),
        StructField("passenger_count", IntegerType, false),
        StructField("trip_distance", DoubleType, false),
        StructField("pickup_longitude", DoubleType, false),
        StructField("pickup_latitude", DoubleType, false),
        StructField("rateCodeId", StringType, false),
        StructField("store_fwd", StringType, false),
        StructField("dropoff_longitude", DoubleType, false),
        StructField("dropoff_latitude", DoubleType, false),
        StructField("payment_type", StringType, false),
        StructField("fare_amount", StringType, false),
        StructField("extra", StringType, false),
        StructField("mta_tax", StringType, false),
        StructField("tip_amount", StringType, false),
        StructField("tolls_amount", StringType, false),
        StructField("improvement_surcharge", StringType, false),
        StructField("total_amount", DoubleType, false)))

    //Read data from the NYC Taxicab dataset and create a Magellan point
    val trips = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("mode", "DROPMALFORMED")
        .schema(schema)
        .load("/mnt/geospatial/nyctaxis/*")
        .withColumn("point", point($"pickup_longitude",$"pickup_latitude"))

    //Read GeoJSON file and define index precision
    val neighborhoods = sqlContext.read
        .format("magellan")
        .option("type", "geojson")
        .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
        .select($"polygon",
                $"metadata"("borough").as("borough"),
                $"metadata"("boroughCode").as("boroughCode"),
                $"metadata"("neighborhood").as("neighborhood"))
        .index(30)

    //Test 1
    magellan.Utils.injectRules(spark)
    neighborhoods.count()

    //Test 2
    trips.count()

    //Test 3
    neighborhoods.filter("borough == 'Manhattan'").count()

    //Test 4
    trips.filter("total_amount > 20").count()

    //Test 6
    val points = sc.parallelize(Seq((-74.00672149658203, 40.73177719116211))).toDF("x", "y").select(point($"x", $"y").as("point"))
    val polygons = neighborhoods.join(points)
    polygons.filter($"point" within $"polygon").count()

    //Test 8
    trips.join(neighborhoods)
        .where($"polygon" >? $"point")
        .count()

    //Test 9
    trips.join(neighborhoods)
        .where($"point" within $"polygon")
        .count()
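The timings reported in this post came from running each query three times and averaging the elapsed time. A minimal sketch of such a helper is shown below. The names `TimingSketch`, `timeSeconds` and `averageSeconds` are hypothetical and not part of Magellan or GeoSpark, and wall-clock timing with `System.nanoTime` is only an assumption about how such measurements could be taken, not necessarily how the figures in the table were obtained.

```scala
object TimingSketch {
  /** Runs `action` once and returns the elapsed wall-clock time in seconds. */
  def timeSeconds[A](action: => A): Double = {
    val start = System.nanoTime()
    action // evaluates the by-name argument, e.g. a DataFrame count()
    (System.nanoTime() - start) / 1e9
  }

  /** Runs `action` `runs` times and returns the mean elapsed time in seconds. */
  def averageSeconds[A](runs: Int)(action: => A): Double = {
    require(runs > 0, "runs must be positive")
    val timings = (1 to runs).map(_ => timeSeconds(action))
    timings.sum / timings.size
  }
}
```

In a notebook this could be used as `TimingSketch.averageSeconds(3) { polygonAll.count() }`. Note that Spark may cache data between runs, so the first run can be slower than the following ones.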

Geospatial analysis with Azure Databricks

A few months ago, I wrote a blog demonstrating how to extract and analyse geospatial data in Azure Data Lake Analytics (ADLA) (here). The article aimed to prove that it was possible to run spatial analysis using U-SQL, even though it does not natively support spatial data analytics. The outcome of that experience was positive, but with serious limitations in terms of performance. ADLA dynamically provisions resources and can perform analytics on terabytes to petabytes of data. However, because it must use the SQL Server Data Types and Spatial assemblies to perform spatial analysis, its parallelism capabilities are suddenly limited. For example, if you are running an aggregation, ADLA will split the processing between multiple vertices, making it faster. When running intersections between points and polygons, however, because it is a SQL threaded operation, it will only use one vertex and, consequently, the job might take hours to complete.

Since I last wrote my blog, the data analytics landscape has changed, and with that, new options became available, namely Azure Databricks. In this blog, I’ll demonstrate how to run spatial analysis and export the results to a mounted point using the Magellan library and Azure Databricks.

Magellan is a distributed execution engine for geospatial analytics on big data. It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries (further details here).

Although people mentioned on its GitHub page that the 1.0.5 Magellan library is available for Apache Spark 2.3+ clusters, I learned through a very difficult process that the only way to make it work in Azure Databricks is to use an Apache Spark 2.2.1 cluster with Scala 2.11. The cluster I used for this experiment consisted of a Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled.
In terms of datasets, I used the NYC Taxicab dataset to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON dataset to extract the polygons. Both datasets were stored in blob storage and added to Azure Databricks as a mount point.

As always, first we need to import the libraries.

    //Import Libraries
    import magellan._
    import org.apache.spark.sql.magellan.dsl.expressions._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

Next, we define the schema of the NYC Taxicab dataset and load the data into a DataFrame. While loading the data, we convert the pickup longitude and latitude into a Magellan Point. If needed, we could also add another Magellan Point from the drop-off longitude and latitude in the same operation.

    //Define schema for the NYC taxi data
    val schema = StructType(Array(
        StructField("vendorId", StringType, false),
        StructField("pickup_datetime", StringType, false),
        StructField("dropoff_datetime", StringType, false),
        StructField("passenger_count", IntegerType, false),
        StructField("trip_distance", DoubleType, false),
        StructField("pickup_longitude", DoubleType, false),
        StructField("pickup_latitude", DoubleType, false),
        StructField("rateCodeId", StringType, false),
        StructField("store_fwd", StringType, false),
        StructField("dropoff_longitude", DoubleType, false),
        StructField("dropoff_latitude", DoubleType, false),
        StructField("payment_type", StringType, false),
        StructField("fare_amount", StringType, false),
        StructField("extra", StringType, false),
        StructField("mta_tax", StringType, false),
        StructField("tip_amount", StringType, false),
        StructField("tolls_amount", StringType, false),
        StructField("improvement_surcharge", StringType, false),
        StructField("total_amount", DoubleType, false)))

    //Read data from the NYC Taxicab dataset and create a Magellan point
    val trips = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("mode", "DROPMALFORMED")
        .schema(schema)
        .load("/mnt/geospatial/nyctaxis/*")
        .withColumn("point", point($"pickup_longitude",$"pickup_latitude"))

The next step is to load the neighbourhood data. As mentioned in their documentation, Magellan supports the reading of ESRI, GeoJSON, OSM-XML and WKT formats. From the GeoJSON dataset, Magellan will extract a collection of polygons and read the metadata into a Map.

There are three things to notice in the code below: first, the extraction of the polygon; second, the selection of the key corresponding to the neighbourhood name; and finally, the provision of a hint that defines what the index precision should be. This hint, alongside the injection of a spatial join rule into Catalyst, massively increases the performance of the queries. To have a better understanding of this operation, read this excellent blog.

    //Read GeoJSON file and define index precision
    val neighborhoods = sqlContext.read
        .format("magellan")
        .option("type", "geojson")
        .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
        .select($"polygon",
                $"metadata"("neighborhood").as("neighborhood"))
        .index(30)

Now that we have our two datasets loaded, we can run our geospatial query to identify which neighbourhood each pickup point falls in. To achieve our goal, we need to join the two DataFrames and apply a within predicate. As a curiosity, if m is the number of points (12,748,987), n the number of polygons (310) and p the average number of edges per polygon (104), a naive O(mnp) approach would perform roughly 411 billion calculations on a single node to determine where each point falls.

    //Inject rules and join DataFrames with within predicate
    magellan.Utils.injectRules(spark)

    val intersected = trips.join(neighborhoods)
        .where($"point" within $"polygon")

The above code does not take longer than 1 second to execute.
It is only when we want to obtain details about our DataFrame that the computing time is visible. For example, if we want to know which neighbourhood has the most pickups, we can write the following code, which takes 40 seconds on average.

    //Neighbourhoods that received the most pickups
    display(intersected
        .groupBy('neighborhood)
        .count()
        .orderBy($"count".desc))

If we want to save the data and identify which pickups fall inside the NYC neighbourhoods, then we have to rewrite our intersected DataFrame to select all columns except the Magellan Points and Polygons, add a new column to the DataFrame and export the data back to the blob, as shown below.

    //Trip columns to keep (everything except the Magellan point and polygon)
    val tripColumns = Seq("vendorId", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", "rateCodeId", "store_fwd", "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount").map(col)

    //Select pickup points that fall inside a neighbourhood
    //(both sides of except and union need the same columns)
    val intersectedTrips = intersected.select(tripColumns: _*)

    //Select pickup points that don't fall inside a neighbourhood
    val nonIntersected = trips
        .select(tripColumns: _*)
        .except(intersectedTrips)

    //Add new column intersected_flag
    val intersectedFlag = "1"
    val nonIntersectedFlag = "0"
    val tripsIntersected = intersectedTrips.withColumn("intersected_flag", expr(intersectedFlag))
    val tripsNotIntersected = nonIntersected.withColumn("intersected_flag", expr(nonIntersectedFlag))

    //Union DataFrames
    val allTrips = tripsNotIntersected.union(tripsIntersected)

    //Save data to the blob
    allTrips.write
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save("/mnt/geospatial/trips/trips.csv")

In summary, reading a 1.8GB dataset, applying geospatial analysis and exporting the result back to blob storage took only 1 minute on average, which is miles better than my previous attempt with U-SQL.

As always, if you have any comments or questions, do let me know.
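To make the O(mnp) cost mentioned earlier more concrete, here is a minimal sketch of the brute-force approach that a spatial index helps to avoid: a ray-casting point-in-polygon test that visits every edge of a polygon, plus a helper that multiplies out the number of edge checks. This is plain Scala written for illustration only. The names `NaiveJoinCostSketch`, `Point`, `contains` and `edgeChecks` are hypothetical, and the code is not how Magellan implements its within predicate.

```scala
object NaiveJoinCostSketch {
  final case class Point(x: Double, y: Double)

  /** Ray-casting test: visits each edge once, so it is O(p) in the number of edges. */
  def contains(polygon: IndexedSeq[Point], pt: Point): Boolean = {
    var inside = false
    var j = polygon.length - 1
    for (i <- polygon.indices) {
      val a = polygon(i)
      val b = polygon(j)
      // Does a horizontal ray starting at pt and going right cross the edge (b, a)?
      val crosses = (a.y > pt.y) != (b.y > pt.y) &&
        pt.x < (b.x - a.x) * (pt.y - a.y) / (b.y - a.y) + a.x
      if (crosses) inside = !inside
      j = i
    }
    inside
  }

  /** Edge checks for a brute-force join: every point against every edge of every polygon. */
  def edgeChecks(points: Long, polygons: Long, avgEdges: Long): Long =
    points * polygons * avgEdges
}
```

With the figures quoted in this post, `NaiveJoinCostSketch.edgeChecks(12748987L, 310L, 104L)` evaluates to 411,027,340,880, or about 4.1 × 10^11 edge checks for the January 2015 data.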