
Geospatial analysis in Azure Databricks – Part II

After my last post on running geospatial analysis in Azure Databricks with Magellan (here) I decided to investigate which other libraries were available and discover whether they performed better or worse. The first library I investigated was GeoMesa, an Apache-licensed open source suite of tools that enables large-scale geospatial analytics on cloud and distributed computing systems, letting you manage and analyze huge spatio-temporal datasets. GeoMesa does this by providing spatio-temporal data persistence on top of the Accumulo, HBase and Cassandra distributed column-oriented databases for massive storage of point, line and polygon data. It allows rapid access to this data via queries that take full advantage of geographical properties to specify distance and area. GeoMesa also provides support for near real-time stream processing of spatio-temporal data by layering spatial semantics on top of the Apache Kafka messaging system (further details here).

Although their website is rich in documentation, I immediately stumbled at the most basic operation: reading a GeoJSON file with the GeoMesa format. This is because their tutorials assume that Apache Accumulo, a distributed key/value store, is used as the backing data store. Because I wanted to make sure I could ingest data from either Azure Blob Storage or Azure Data Lake Storage, I decided not to follow their recommendation. After many hours of failed attempts, I abandoned the idea of using GeoMesa.

My next option was GeoSpark, a cluster computing system for processing large-scale spatial data. GeoSpark extends Apache Spark/SparkSQL with a set of out-of-the-box Spatial Resilient Distributed Datasets (SRDDs) and SpatialSQL that efficiently load, process and analyze large-scale spatial data across machines (further details here). GeoSpark immediately impresses with the possibility of either creating Spatial RDDs and running spatial queries using GeoSpark-core, or creating Spatial SQL/DataFrames to manage spatial data using GeoSparkSQL. Their website contains tutorials that are easy to follow and offers the possibility to chat with the community on Gitter.

In the spirit of keeping my approach as simple as possible, I decided to compare Magellan with GeoSparkSQL, since SparkSQL is easier to use and working with RDDs can be a complex task. It is important to highlight, however, that their recommendation is to use GeoSpark core rather than GeoSparkSQL, because SparkSQL has some limitations, such as not supporting clustered indices, which make it difficult to expose all GeoSpark core features through it.

The data used in the following test cases was based on the NYC Taxicab datasets, used to create the geometry points, and the Magellan NYC Neighbourhoods GeoJSON, used to extract the polygons. Both datasets were stored in blob storage and added to Azure Databricks as a mount point (a minimal sketch of the mount set-up is included at the end of this post).

The table below details the versions of the libraries and the cluster configuration. There are a couple of points to note:

- Magellan does not support Apache Spark 2.3+.
- The Magellan library 1.0.6 is about to be released this month and should cover some of the limitations identified below.
- The GeoSpark library 1.2.0 is currently available in SNAPSHOT and will hopefully fix the loading of multiline GeoJSON files.
Library | Version | Runtime Version | Cluster Specification
Magellan | 1.0.5 | 3.5 LTS (includes Apache Spark 2.2.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB memory, 4 cores and auto scaling enabled
GeoSpark / GeoSparkSQL | 1.1.3 / 2.3-1.1.3 | 4.3 (includes Apache Spark 2.3.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB memory, 4 cores and auto scaling enabled

To test the performance of both libraries, I implemented a set of queries and ran each one three times, recording how long every run took. The table below reports the average time in seconds for each library.

DS1 – NYC Neighbourhoods dataset containing the polygons
DS2 – NYC Taxicab dataset containing the geometry points for the month of January 2015
DS3 – NYC Taxicab dataset containing the geometry points for the year of 2015

Test | Description | Number of Results | Magellan (avg in sec) | GeoSparkSQL (avg in sec)
1 | Select all rows from DS1 | 310 | 0.86 | 0.69
2 | Select all rows from DS2 | 12,748,986 | 19.82 | 15.64
3 | Select all rows from DS1 where borough is Manhattan | 37 | 2.22 | 0.69
4 | Select all rows from DS2 where total amount is bigger than 20 | 2,111,707 | 18.71 | 17.23
5 | Select 100 rows from DS1 ordered by the distance between one point and all polygons | 100 | N/A* | 0.8
6 | Select all rows from DS1 where a single point is within all polygons | 1 | 1.63 | 0.68
7 | Select all rows from DS1 where one point with buffer 0.1 intersects all polygons | 73 | N/A* | 0.80
8 | Join DS1 and DS2 and select all rows where polygons contain points | 12,492,678 | 29.17 | 1573.8 (~26 min)
9 | Join DS1 and DS2 and select all rows where points are within polygons | 12,492,678 | 29.31 | 1518 (~25 min)
10 | Select all rows from DS3 | 146,113,001 | 187.8 | 155.4
11 | Select all rows from DS3 where total amount is bigger than 20 | 29,333,130 | 94.8 | 119.4
12 | Join DS1 and DS3 and select all rows where points are within polygons | 143,664,028 | 168 | N/A**

* Although the following link mentions Magellan can perform Distance and Buffer operations, I couldn't find documentation demonstrating how to perform them, or, in the cases I tried, Azure Databricks threw an error indicating the class was not available.

** Considering the time it took to run queries 8 and 9 using DS2 (~1.8GB), I decided not to test the performance against DS3 (~21.3GB), since I already knew the results were not going to be positive.

From the tests above, we can see that GeoSparkSQL is generally faster, except when performing spatial joins, where its performance decreases drastically compared with Magellan.
On the other hand, Magellan is still an ongoing project and seems to be lacking some basic operations that might be of big importance for some analyses; however, it clearly excels when we need to run spatial analysis on joined datasets.

Based on my experience using the libraries and the tests conducted in this blog, my recommendation would be to use Magellan: even where GeoSparkSQL was better, the performance gains were not that significant. However, as already mentioned, Magellan might not be an option if the requirements involve operations that are not yet available, such as distances or buffers.

Following is the implementation of the tests using GeoSparkSQL.

//Import libraries and configure the session
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
import org.datasyslab.geosparksql.utils.Adapter
import org.datasyslab.geosparksql.UDF.UdfRegistrator
import org.datasyslab.geosparksql.UDT.UdtRegistrator
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator //needed by the Kryo registrator configured below
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.geosparksql.strategy.join.JoinQueryDetector
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Initiate the Spark session
var sparkSession = SparkSession.builder()
  .appName("NYCTaxis")
  //Enable the GeoSpark custom Kryo serializer
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
  .getOrCreate()

//Register GeoSparkSQL
GeoSparkSQLRegistrator.registerAll(sparkSession)

//Define the schema for the NYC taxi data
val schema = StructType(Array(
  StructField("vendorId", StringType, false),
  StructField("pickup_datetime", StringType, false),
  StructField("dropoff_datetime", StringType, false),
  StructField("passenger_count", IntegerType, false),
  StructField("trip_distance", DoubleType, false),
  StructField("pickup_longitude", DoubleType, false),
  StructField("pickup_latitude", DoubleType, false),
  StructField("rateCodeId", StringType, false),
  StructField("store_fwd", StringType, false),
  StructField("dropoff_longitude", DoubleType, false),
  StructField("dropoff_latitude", DoubleType, false),
  StructField("payment_type", StringType, false),
  StructField("fare_amount", StringType, false),
  StructField("extra", StringType, false),
  StructField("mta_tax", StringType, false),
  StructField("tip_amount", StringType, false),
  StructField("tolls_amount", StringType, false),
  StructField("improvement_surcharge", StringType, false),
  StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset
var trips = sparkSession.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(schema)
  .load("/mnt/geospatial/nyctaxis/*")

trips.createOrReplaceTempView("tripstable")

//Read the GeoJSON file
var polygonJsonDF = spark.read
  .option("multiline", "true")
  .json("/mnt/geospatial/neighborhoods/neighborhoods.geojson")

//GeoSparkSQL can't read multiline GeoJSON files. This workaround will only work if the file
//contains a single geometry type (e.g. polygons)
val polygons = polygonJsonDF
  .select(explode(col("features")).as("feature"))
  .withColumn("polygon", callUDF("ST_GeomFromGeoJson", to_json(col("feature"))))
  .select($"polygon", $"feature.properties.borough", $"feature.properties.boroughCode", $"feature.properties.neighborhood")

polygons.createOrReplaceTempView("polygontable")

//Test 1
var polygonAll = sparkSession.sql(
  """
    | SELECT *
    | FROM polygontable
  """.stripMargin)
polygonAll.count()

//Test 2
var tripsAll = sparkSession.sql(
  """
    | SELECT *
    | FROM tripstable
  """.stripMargin)
tripsAll.count()

//Test 3
var polygonWhere = sparkSession.sql(
  """
    | SELECT *
    | FROM polygontable
    | WHERE borough = 'Manhattan'
  """.stripMargin)
polygonWhere.count()

//Test 4
var tripsWhere = sparkSession.sql(
  """
    | SELECT *
    | FROM tripstable
    | WHERE total_amount > 20
  """.stripMargin)
tripsWhere.count()

//Test 5
var polygonGeomDistance = sparkSession.sql(
  """
    | SELECT *
    | FROM polygontable
    | ORDER BY ST_Distance(polygon, ST_PointFromText('-74.00672149658203, 40.73177719116211', ','))
    | LIMIT 100
  """.stripMargin)
polygonGeomDistance.count()

//Test 6
var polygonGeomWithin = sparkSession.sql(
  """
    | SELECT *
    | FROM polygontable
    | WHERE ST_Within(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','), polygon)
  """.stripMargin)
polygonGeomWithin.show()

//Test 7
var polygonGeomIntersect = sparkSession.sql(
  """
    | SELECT *
    | FROM polygontable
    | WHERE ST_Intersects(ST_Circle(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','), 0.1), polygon)
  """.stripMargin)
polygonGeomIntersect.count()

//Test 8
var polygonContainsJoin = sparkSession.sql(
  """
    | SELECT *
    | FROM polygontable, tripstable
    | WHERE ST_Contains(polygontable.polygon, ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))))
  """.stripMargin)
polygonContainsJoin.count()

//Test 9
var polygonWithinJoin = sparkSession.sql(
  """
    | SELECT *
    | FROM polygontable, tripstable
    | WHERE ST_Within(ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))), polygontable.polygon)
  """.stripMargin)
polygonWithinJoin.count()

Following is the implementation of the tests using Magellan.

//Import libraries
import magellan._
import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Define the schema for the NYC taxi data
val schema = StructType(Array(
  StructField("vendorId", StringType, false),
  StructField("pickup_datetime", StringType, false),
  StructField("dropoff_datetime", StringType, false),
  StructField("passenger_count", IntegerType, false),
  StructField("trip_distance", DoubleType, false),
  StructField("pickup_longitude", DoubleType, false),
  StructField("pickup_latitude", DoubleType, false),
  StructField("rateCodeId", StringType, false),
  StructField("store_fwd", StringType, false),
  StructField("dropoff_longitude", DoubleType, false),
  StructField("dropoff_latitude", DoubleType, false),
  StructField("payment_type", StringType, false),
  StructField("fare_amount", StringType, false),
  StructField("extra", StringType, false),
  StructField("mta_tax", StringType, false),
  StructField("tip_amount", StringType, false),
  StructField("tolls_amount", StringType, false),
  StructField("improvement_surcharge", StringType, false),
  StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset and create a Magellan point
val trips = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("mode", "DROPMALFORMED")
  .schema(schema)
  .load("/mnt/geospatial/nyctaxis/*")
  .withColumn("point", point($"pickup_longitude", $"pickup_latitude"))

//Read the GeoJSON file and define the index precision
val neighborhoods = sqlContext.read
  .format("magellan")
  .option("type", "geojson")
  .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
  .select($"polygon",
    $"metadata"("borough").as("borough"),
    $"metadata"("boroughCode").as("boroughCode"),
    $"metadata"("neighborhood").as("neighborhood"))
  .index(30)

//Test 1
magellan.Utils.injectRules(spark) //inject Magellan's spatial join rules into the Spark session
neighborhoods.count()

//Test 2
trips.count()

//Test 3
neighborhoods.filter("borough == 'Manhattan'").count()

//Test 4
trips.filter("total_amount > 20").count()

//Test 6
val points = sc.parallelize(Seq((-74.00672149658203, 40.73177719116211))).toDF("x", "y").select(point($"x", $"y").as("point"))
val polygons = neighborhoods.join(points)
polygons.filter($"point" within $"polygon").count()

//Test 8
trips.join(neighborhoods)
  .where($"polygon" >? $"point")
  .count()

//Test 9
trips.join(neighborhoods)
  .where($"point" within $"polygon")
  .count()

Analysis of Spatial Data Using Cosmos DB

Introduction

Recently, while researching Cosmos DB, I came across its in-built capabilities for managing spatial data.

Cosmos DB is Microsoft's globally distributed, multi-model database. It can store various types of data, such as document, graph and key-value, and can elastically scale to cope with varying needs. The piece of Cosmos DB that this post will be discussing is the spatial capability of the document model.

The problem I have chosen to solve using the spatial functionality is working out which airfields are within the range of an aircraft, given its true airspeed and fuel endurance in hours; the range is calculated by multiplying the airspeed by the endurance.

The Data

The first step was to find a data set containing a list of all of the world's airfields; this was found on GitHub at https://github.com/mwgg/Airports. The data set contains the details we need, which are:

- ICAO code – the unique identifier for an airport
- Airport name
- Latitude and longitude of the airport

The next step was to create a Cosmos DB account in the Azure Portal and write a C# app to do some transformations on the data and load the documents into our Cosmos DB collection.

I first created a C# object that matched the structure of the JSON data:

using Newtonsoft.Json;

namespace LoadAirportData
{
    public class Airport
    {
        [JsonProperty("id")]
        public string Id => this.Icao;

        [JsonProperty("icao")]
        public string Icao { get; set; }

        [JsonProperty("iata")]
        public string Iata { get; set; }

        [JsonProperty("name")]
        public string Name { get; set; }

        [JsonProperty("city")]
        public string City { get; set; }

        [JsonProperty("state")]
        public string State { get; set; }

        [JsonProperty("country")]
        public string Country { get; set; }

        [JsonProperty("elevation")]
        public int Elevation { get; set; }

        [JsonProperty("lat")]
        public double Latitude { get; set; }

        [JsonProperty("lon")]
        public double Longitude { get; set; }

        [JsonProperty("tz")]
        public string TimeZone { get; set; }
    }
}

I then created a C# object that matched the structure of the document I wanted to insert into Cosmos DB.
The "Point" data type is used to serialize the latitude and longitude into the GeoJSON structure that Cosmos DB supports:

using Microsoft.Azure.Documents.Spatial;
using Newtonsoft.Json;

namespace LoadAirportData
{
    public class AirportDocument
    {
        public AirportDocument(Airport airport)
        {
            Id = airport.Icao;
            Iata = airport.Iata;
            Name = airport.Name;
            Location = new Point(airport.Longitude, airport.Latitude);
        }

        [JsonProperty("id")]
        public string Id { get; set; }

        [JsonProperty("iata")]
        public string Iata { get; set; }

        [JsonProperty("name")]
        public string Name { get; set; }

        [JsonProperty("location")]
        public Point Location { get; set; }
    }
}

Finally, I created a method that drops the Cosmos DB database, recreates the database and the document collection, and then loads the documents into the collection:

using Microsoft.Azure.Documents.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;
using System.Net;
using System.Collections.Concurrent;

namespace LoadAirportData
{
    public class CosmosDbImporter
    {
        private const string ENDPOINT_URL = "<YOUR ENDPOINT>";
        private const string PRIMARY_KEY = "<YOUR KEY>";
        private const string DATABASE_NAME = "airports";
        private const string COLLECTION_NAME = "airportData";
        private const string IMPORT_PATH = @"C:\Temp\Airports.json";

        public async Task ImportData()
        {
            var documentClient = new DocumentClient(new Uri(ENDPOINT_URL), PRIMARY_KEY);

            // Delete the database if it exists
            try
            {
                await documentClient.DeleteDatabaseAsync(UriFactory.CreateDatabaseUri(DATABASE_NAME));
            }
            catch (DocumentClientException ex)
            {
                if (ex.StatusCode != HttpStatusCode.NotFound)
                    throw;
            }

            // Create the database
            await documentClient.CreateDatabaseIfNotExistsAsync(new Database() { Id = DATABASE_NAME });

            // Create the collection and switch on spatial indexing
            DocumentCollection collection = new DocumentCollection() { Id = COLLECTION_NAME };
            collection.IndexingPolicy = new IndexingPolicy(new SpatialIndex(DataType.Point));
            await documentClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(DATABASE_NAME), collection);

            // Read the file and deserialize to our Airport object
            var data = System.IO.File.ReadAllText(IMPORT_PATH);
            var airports = JsonConvert.DeserializeObject<Dictionary<string, Airport>>(data);

            // Upload documents to Cosmos DB
            await Task.WhenAll(
                from partition in Partitioner.Create(airports.Values).GetPartitions(50)
                select Task.Run(async delegate
                {
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            Console.WriteLine($"Processing {partition.Current.Icao}");
                            var airportDocument = new AirportDocument(partition.Current);
                            await documentClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(DATABASE_NAME, COLLECTION_NAME), airportDocument);
                        }
                    }
                }));
        }
    }
}

One thing to note is that the above code enables spatial indexing when creating the collection; without this enabled, performance is extremely poor when performing spatial queries.

The beauty of Cosmos DB is that it is able to elastically scale to the performance level specified by the user through the number of RUs (request units) allocated to the collection. While loading the data into Cosmos DB I wanted to scale up my database to take advantage of the multithreading in my C# app and speed up my processing, so I just went into the Azure Portal and adjusted the number of RUs allocated to the collection. The change was almost instant and my process immediately sped up.
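For completeness, the same throughput change can also be made from the .NET SDK rather than the portal. The method below is only a minimal sketch, reusing the constants and version 2 SDK types from the importer above; it is not part of the original loading app.

// Minimal sketch (not part of the original app): change the RUs allocated to the collection
// from the .NET SDK. Assumes the DATABASE_NAME / COLLECTION_NAME constants defined above.
public async Task ScaleCollectionAsync(DocumentClient documentClient, int requestUnits)
{
    var collectionUri = UriFactory.CreateDocumentCollectionUri(DATABASE_NAME, COLLECTION_NAME);
    DocumentCollection collection = (await documentClient.ReadDocumentCollectionAsync(collectionUri)).Resource;

    // Each collection has an associated offer that holds its provisioned throughput
    Offer offer = documentClient.CreateOfferQuery()
        .Where(o => o.ResourceLink == collection.SelfLink)
        .AsEnumerable()
        .Single();

    // Replace the offer with one at the new throughput level (requestUnits RU/s)
    await documentClient.ReplaceOfferAsync(new OfferV2(offer, requestUnits));
}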
Once I had finished importing the data I was able to scale my database back down, so I'm no longer paying for any unused capacity.

Querying

Now the data is in Cosmos DB, we can begin to play around with some spatial queries.

To query airfields within a certain distance of a specified point, I can run the following query, which returns all of the airfields within 25km of Blackbushe airport. As you can see, the SQL syntax for querying Cosmos DB is very similar to T-SQL, meaning it's very easy to re-use your SQL Server skills:

SELECT
    airports.id AS ICAO,
    airports.name AS Name,
    ST_DISTANCE(airports.location, {"type": "Point", "coordinates": [-0.8475000262, 51.3238983154]}) AS Distance
FROM airports
WHERE ST_DISTANCE(airports.location, {"type": "Point", "coordinates": [-0.8475000262, 51.3238983154]}) < 25000

The above query returns the following results, which are the 7 airfields that are within 25km of Blackbushe:

[
  { "ICAO": "EGHL", "Name": "Lasham Airport", "Distance": 19964.7890768588 },
  { "ICAO": "EGVO", "Name": "RAF Odiham", "Distance": 11985.957064869535 },
  { "ICAO": "EGTF", "Name": "Fairoaks Airport", "Distance": 20229.321025944442 },
  { "ICAO": "EGLF", "Name": "Farnborough Airport", "Distance": 7286.035340157135 },
  { "ICAO": "EGLK", "Name": "Blackbushe Airport", "Distance": 0 },
  { "ICAO": "EGLM", "Name": "White Waltham Airfield", "Distance": 20312.693531316185 },
  { "ICAO": "EGLP", "Name": "Brimpton Airfield", "Distance": 23311.94703537874 }
]

The App

The next step is to create an application that uses this functionality to find the airfields within the range of an aircraft. To do this I created a basic ASP.NET MVC application with a form where the user enters the base airfield, the true airspeed and the endurance in hours (these fields correspond to the properties of the AirportFinderModel view model; a sketch of the view models is included below). When the form is submitted, the following C# code is executed:

[HttpPost]
public async Task<ActionResult> Index(AirportFinderModel model)
{
    var documentClient = new DocumentClient(new Uri(ENDPOINT_URL), PRIMARY_KEY);

    // Retrieve the document for the base airfield the user selected
    var baseAirfield = await documentClient.ReadDocumentAsync<AirportDocument>(
        UriFactory.CreateDocumentUri(DATABASE_NAME, COLLECTION_NAME, model.BaseAirfield));

    // Range in metres = endurance (hours) * true airspeed (knots) * metres per nautical mile
    var availableDistanceMeters = (model.EnduranceHours * model.TrueAirspeed) * 1852;

    // Spatial LINQ query: airfields whose distance from the base is within the available range
    var result = documentClient
        .CreateDocumentQuery<AirportDocument>(UriFactory.CreateDocumentCollectionUri(DATABASE_NAME, COLLECTION_NAME))
        .Where(a => a.Location.Distance(baseAirfield.Document.Location) <= availableDistanceMeters)
        .ToList();

    return View("Result", new AirportFinderResultModel()
    {
        MapPoints = JsonConvert.SerializeObject(result.Select(a => new MapPoint()
        {
            Title = a.Id,
            Description = a.Name,
            Longitude = a.Location.Position.Longitude,
            Latitude = a.Location.Position.Latitude
        }))
    });
}

The above code connects to Cosmos DB and retrieves the details of the base airfield that was specified. It then calculates the range of the aircraft in meters by multiplying the endurance (in hours) by the true airspeed in knots (nautical miles per hour), and then multiplying that by 1852 (the number of meters in a nautical mile). A LINQ query is then run against Cosmos DB using the built-in spatial functions to find airfields within the specified distance.
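The view models used by the controller (AirportFinderModel, AirportFinderResultModel and MapPoint) are not listed in the post. A minimal sketch of what they might look like is shown below; the class and property names are taken from the controller code, but the property types are assumptions.

// Minimal sketch of the view models assumed by the controller above.
// Names come from the controller code; the property types are assumptions.
public class AirportFinderModel
{
    public string BaseAirfield { get; set; }   // ICAO code of the base airfield, e.g. "EGLK"
    public double TrueAirspeed { get; set; }   // true airspeed in knots
    public double EnduranceHours { get; set; } // fuel endurance in hours
}

public class AirportFinderResultModel
{
    public string MapPoints { get; set; }      // JSON array consumed by the client-side map
}

public class MapPoint
{
    public string Title { get; set; }
    public string Description { get; set; }
    public double Longitude { get; set; }
    public double Latitude { get; set; }
}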
The result is then converted into a JSON array that can be understood by the Google Maps API used on the client side.

The client side uses the Google Maps API to plot the airfields on a map. For example, given a base airfield of Blackbushe (EGLK), a true airspeed of 100kts and an endurance of 4.5 hours, the app plots every airfield reachable within that range.

The current functionality of the app is extremely basic, but it could easily be expanded to make the range calculation more accurate by looking at wind and other factors that affect range. This could be done by creating a polygon representing our range and then using the ST_WITHIN function to find all airfields within that polygon. The functionality could also be enhanced to take other attributes of the airfield into account, deciding whether an airfield is suitable based on runway length and other facilities.

Conclusion

As you can see, using Cosmos DB it is extremely easy to analyse spatial data, and the elastic scaling capability allows you to easily increase the capacity of your application to cope with increased usage and larger amounts of data. The spatial capabilities of Cosmos DB can make it a viable alternative to other databases, such as SQL Server, that aren't always as easy to scale on demand and don't have the flexibility that a document model can give.

As always, if you have any questions or comments then let me know.