Jose Mendes

Jose Mendes' Blog

Lambda vs Azure Databricks Delta Architecture

Historically, Lambda has been the go-to approach when implementing big data processing architectures. However, as technology evolves, new paradigms arise and more efficient approaches become available, such as the Databricks Delta architecture. In this blog, I’ll describe both architectures and demonstrate how to build a data pipeline in Azure Databricks following the Databricks Delta architecture.

The Lambda architecture, originally defined by Nathan Marz, is a big data processing architecture that combines both batch and real time processing methods. This approach attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data.

From a high-level perspective, the Lambda architecture looks as follows.

image

A valid approach to using Lambda in Azure is demonstrated below (be aware that there are different options for each layer that I won’t detail in this blog).

image

For the speed layer, we could use Azure Stream Analytics, a serverless, scalable event processing engine that enables the development and execution of real-time analytics on multiple streams of data from sources such as devices, sensors, websites, social media, and other applications.

For the batch layer, we could use Azure Data Lake Storage (ADLS) and Azure Databricks. ADLS is an enterprise-wide hyper-scale repository for big data analytic workloads that enables us to capture data of any size, type, and ingestion speed in a single place for operational and exploratory analytics. Currently there are two versions of ADLS, Gen 1 and Gen 2, with the latter still in private preview.

Azure Databricks is an Apache Spark-based analytics platform optimized for Microsoft Azure that allows us to create streamlined workflows and interactive workspaces, enabling collaboration between data scientists, data engineers, and business analysts.

For the serving layer, we could use Azure SQL Data Warehouse, a cloud-based Enterprise Data Warehouse (EDW) that leverages Massively Parallel Processing (MPP) to quickly run complex queries across petabytes of data.

With this architecture, the events are consumed by Azure Stream Analytics and landed in ADLS as flat files that can be partitioned by hour. Once a file is complete, we can run a batch process via Azure Databricks and store the data in Azure SQL Data Warehouse. To obtain the data that was not captured by the batch process, we can use PolyBase to query the file still being updated and create a view that unions both tables. Every time that view is queried, the PolyBase table returns the latest streamed data, meaning we have a real-time query capable of returning the most recent data.

The major problem with the Lambda architecture is that we have to build and maintain two separate pipelines, which can be very complex and, ultimately, make it difficult to combine the processing of batch and real-time data. However, it is now possible to overcome this limitation if we are in a position to change our approach.

Databricks Delta delivers a powerful transactional storage layer by harnessing the power of Apache Spark and Databricks File System (DBFS). It is a single data management tool that combines the scale of a data lake, the reliability and performance of a data warehouse, and the low latency of streaming in a single system. The core abstraction of Databricks Delta is an optimized Spark table that stores data as parquet files in DBFS and maintains a transaction log that tracks changes to the table.
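To make this abstraction more tangible, here is a minimal sketch (assuming a Databricks cluster with Delta enabled and using a hypothetical /tmp/delta_demo path) that writes a small DataFrame in the Delta format and then inspects the transaction log maintained for it.

#Minimal sketch: write a DataFrame as a Delta table and inspect its transaction log
df = spark.range(0, 5)

(df.write
  .format("delta")
  .mode("overwrite")
  .save("/tmp/delta_demo"))

#The data is stored as parquet files and every write adds an entry to the transaction log
display(spark.sql("DESCRIBE HISTORY delta.`/tmp/delta_demo`"))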

From a high-level perspective, the Databricks Delta architecture can be described as follows.

image

A Databricks Delta Raw table stores the data that is either produced by streaming sources or stored in data lakes. Query tables contain the normalized data from the Raw tables. Summary tables, often used as the source for the presentation layer, contain the aggregated key business metrics that are frequently queried. This unified approach means there is less complexity, due to the removal of storage systems and data management steps, and, more importantly, output queries can be performed on streaming and historical data at the same time.

In the next steps, I’ll demonstrate how to implement the Databricks Delta architecture using a Python notebook.

#If Databricks delta is not enabled in the cluster, run this cell
spark.sql("set spark.databricks.delta.preview.enabled=true")

#Define variables
basePath = "/kafka"
taxiRidesRawPath = basePath + "/taxiRidesRaw.delta"
taxiRidesQueryPath = basePath + "/taxiRidesQuery.delta"
taxiFaresQueryPath = basePath + "/taxiFaresQuery.delta"
taxiSummaryPath = basePath + "/taxiSummary.delta"
checkpointPath = basePath + "/checkpoints"

#Load the Kafka stream data to a DataFrame
kafkaDF = (spark
   .readStream
   .option("kafka.bootstrap.servers", "192.168.1.4:9092")
   .option("subscribe", "taxirides")
   .option("startingOffsets", "earliest")
   .option("checkpointLocation", "/taxinyc/kafka.checkpoint")
   .format("kafka")
   .load()
)

#Kafka transmits information using a key, value, and metadata such as topic and partition. The information we're interested in is the value column. Since this is a binary value, we must first cast it to a StringType and then split the columns.
#Stream into the Raw Databricks Delta directory. By using a checkpoint location, the metadata on which data has already been processed will be maintained so the cluster can be shut down without a loss of information.
from pyspark.sql.types import StructType, StructField,LongType,TimestampType,StringType,FloatType,IntegerType
from pyspark.sql.functions import col, split

(kafkaDF
  .select(split(col("value").cast(StringType()),",").alias("message"))
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointPath + "/taxiRidesRaw")
  .outputMode("append")
  .start(taxiRidesRawPath)
)

#Create and populate the raw delta table. Data is stored in a single column as an array, e.g. ["6","START","2013-01-01 00:00:00","1970-01-01 00:00:00","-73.866135","40.771091","-73.961334","40.764912","6","2013000006","2013000006"]
spark.sql("DROP TABLE IF EXISTS TaxiRidesRaw")
          
spark.sql("""
   CREATE TABLE TaxiRidesRaw
   USING Delta
   LOCATION '{}'
""".format(taxiRidesRawPath))

#Stream into the Query Databricks delta directory. 
(spark.readStream
  .format("delta")
  .load(str(taxiRidesRawPath))
  .select(col("message")[0].cast(IntegerType()).alias("rideId"),
    col("message")[1].cast(StringType()).alias("rideStatus"),
    col("message")[2].cast(TimestampType()).alias("rideEndTime"),
    col("message")[3].cast(TimestampType()).alias("rideStartTime"),
    col("message")[4].cast(FloatType()).alias("startLong"),
    col("message")[5].cast(FloatType()).alias("startLat"),
    col("message")[6].cast(FloatType()).alias("endLong"),
    col("message")[7].cast(FloatType()).alias("endLat"),
    col("message")[8].cast(IntegerType()).alias("passengerCount"),
    col("message")[9].cast(IntegerType()).alias("taxiId"),
    col("message")[10].cast(IntegerType()).alias("driverId"))
  .filter("rideStartTime <> '1970-01-01T00:00:00.000+0000'")
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpointPath + "/taxiRidesQuery")
  .start(taxiRidesQueryPath)
)

#Create and populate the query delta table. Data is no longer stored in a single column
spark.sql("DROP TABLE IF EXISTS TaxiRidesQuery")
          
spark.sql("""
   CREATE TABLE TaxiRidesQuery
   USING Delta
   LOCATION '{}'
""".format(taxiRidesQueryPath))

#Read the taxi fares parquet files stored in blob storage and write them to the Fares Query Databricks Delta directory
(spark.read
  .parquet("/mnt/geospatial/kafka/NYC")
  .write
  .format("delta")
  .mode("append")
  .save(taxiFaresQueryPath)
)

#Create and populate the query delta table
spark.sql("DROP TABLE IF EXISTS TaxiFaresQuery")
          
spark.sql("""
   CREATE TABLE TaxiFaresQuery
   USING Delta
   LOCATION '{}'
""".format(taxiFaresQueryPath))

#Load the streaming rides data to a DataFrame
taxiRidesDF = (spark
                .readStream
                .format("delta")
                .load(str(taxiRidesQueryPath))
               )

#Load the batch fares data to a DataFrame
taxiFaresDF = (spark
                .read
                .format("delta")
                .load(str(taxiFaresQueryPath))
               )

#Join the streaming data and the batch data. Group by date and taxi driver to obtain the number of rides per day
from pyspark.sql.functions import date_format, col, sum

RidesDf = (taxiRidesDF.join(taxiFaresDF, (taxiRidesDF.taxiId == taxiFaresDF.taxiId) & (taxiRidesDF.driverId == taxiFaresDF.driverId))
            .withColumn("date", date_format(taxiRidesDF.rideStartTime, "yyyyMMdd"))
            .groupBy(col("date"),taxiRidesDF.driverId)
            .count()
            .withColumnRenamed("count","RidesPerDay")
            .writeStream
            .format("delta")
            .outputMode("complete")
            .option("checkpointLocation", checkpointPath + "taxiSummary")
            .start(taxiSummaryPath)
)

#Create and populate the summary delta table
spark.sql("DROP TABLE IF EXISTS TaxiSummary")
          
spark.sql("""
   CREATE TABLE TaxiSummary
   USING Delta
   LOCATION '{}'
""".format(taxiSummaryPath))

As always, if you have any questions or comments, do let me know.

Geospatial analysis in Azure Databricks – Part II

After my last post on running geospatial analysis in Azure Databricks with Magellan (here), I decided to investigate which other libraries were available and discover whether they performed better or worse.

The first library I investigated was GeoMesa, an Apache-licensed open source suite of tools that enables large-scale geospatial analytics on cloud and distributed computing systems, letting you manage and analyze huge spatio-temporal datasets.

GeoMesa does this by providing spatio-temporal data persistence on top of the Accumulo, HBase, and Cassandra distributed column-oriented databases for massive storage of point, line, and polygon data. It allows rapid access to this data via queries that take full advantage of geographical properties to specify distance and area. GeoMesa also provides support for near real time stream processing of spatio-temporal data by layering spatial semantics on top of the Apache Kafka messaging system (further details here).

Although their website is rich in documentation, I immediately stumbled at the most basic operation: reading a GeoJSON file with the GeoMesa format. The reason is that, in their tutorials, they assume Apache Accumulo, a distributed key/value store, is used as the backing data store. Because I wanted to make sure I could ingest data from either Azure Blob Storage or Azure Data Lake Storage, I decided not to follow their recommendation. As such, after many hours of failed attempts, I abandoned the idea of using GeoMesa.

My next option was GeoSpark, a cluster computing system for processing large-scale spatial data. GeoSpark extends Apache Spark / SparkSQL with a set of out-of-the-box Spatial Resilient Distributed Datasets (SRDDs) / SpatialSQL that efficiently load, process, and analyze large-scale spatial data across machines (further details here).

GeoSpark immediately impresses with the possibility of either creating Spatial RDDs and running spatial queries using GeoSpark core, or creating Spatial SQL/DataFrames to manage spatial data using GeoSparkSQL. Their website contains tutorials that are easy to follow and offers the possibility to chat with the community on Gitter.

In the spirit of keeping my approach as simple as possible, I decided to compare Magellan with GeoSparkSQL, since SparkSQL is easier to use and working with RDDs can be a complex task. However, it is important to highlight that their recommendation is to use GeoSpark core rather than GeoSparkSQL, because SparkSQL has some limitations, such as not supporting clustered indices, which make it difficult to expose all GeoSpark core features.

The data used in the following test cases was based on the NYC Taxicab datasets to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON to extract the polygons. Both datasets were stored in a blob storage and added to Azure Databricks as a mount point.

The table below details the versions of the libraries and the cluster configuration. There are a couple of points to notice:

  • Magellan does not support Apache Spark 2.3+.
  • The Magellan library 1.0.6 is about to be released this month and should cover some of the limitations identified below.
  • The GeoSpark library 1.2.0 is currently available in SNAPSHOT and will hopefully fix the load of multiline GeoJSON files.

Library | Version | Runtime Version | Cluster Specification
Magellan | 1.0.5 | 3.5 LTS (includes Apache Spark 2.2.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled
GeoSpark/GeoSparkSQL | 1.1.3 / 2.3-1.1.3 | 4.3 (includes Apache Spark 2.3.1, Scala 2.11) | Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled

To test the performance of both libraries, I implemented a set of queries and ran each of them 3 times, registering how long each run took. The averages are shown in the table below.

DS1 - NYC Neighbourhoods dataset containing the polygons

DS2 – NYC Taxicab dataset containing the geometry points for the month of January 2015

DS3 – NYC Taxicab dataset containing the geometry points for the year of 2015

Test Number | Description | Number of Results | Magellan (avg in sec) | GeoSparkSQL (avg in sec)
1 | Select all rows from DS1 | 310 | 0.86 | 0.69
2 | Select all rows from DS2 | 12.748.986 | 19.82 | 15.64
3 | Select all rows from DS1 where borough is Manhattan | 37 | 2.22 | 0.69
4 | Select all rows from DS2 where total amount is bigger than 20 | 2.111.707 | 18.71 | 17.23
5 | Select 100 rows from DS1 ordered by the distance between one point and all polygons | 100 | N/A* | 0.8
6 | Select all rows from DS1 where a single point is within all polygons | 1 | 1.63 | 0.68
7 | Select all rows from DS1 where one point with buffer 0.1 intersects all polygons | 73 | N/A* | 0.80
8 | Join DS1 and DS2 and select all rows where polygons contains points | 12.492.678 | 29.17 | 1573.8 (~26min)
9 | Join DS1 and DS2 and select all rows where points are within polygons | 12.492.678 | 29.31 | 1518 (~25min)
10 | Select all rows from DS3 | 146.113.001 | 187.8 | 155.4
11 | Select all rows from DS3 where total amount is bigger than 20 | 29.333.130 | 94.8 | 119.4
12** | Join DS1 and DS3 and select all rows where points are within polygons | 143.664.028 | 168 | N/A*

* Although the following link mentions that Magellan can perform Distance and Buffer operations, I couldn’t find documentation demonstrating how to perform them and, in the cases I tried, Azure Databricks threw an error indicating the class was not available.

** Considering the time it took to run queries 8/9 using DS2 (~1.8GB), I decided not to test the performance against DS3 (~21.3GB), since I already knew the results were not going to be positive.

From the tests above, we can see that GeoSparkSQL is generally faster, except when performing spatial joins, where its performance drastically decreases compared with Magellan. On the other hand, Magellan is still an ongoing project and seems to be lacking some basic operations that might be important for certain analyses; however, it clearly excels when we need to run spatial analysis on joined datasets.

Based on my experience using the libraries and the tests conducted in this blog, my recommendation would be to use Magellan, since even when GeoSparkSQL was better, the performance gains were not that significant. However, as already mentioned, Magellan might not be an option if the requirements involve operations that are not yet available, such as distances or buffers.

Following is the implementation of the tests using GeoSparkSQL.

//Import Libraries and config session
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
import org.datasyslab.geosparksql.utils.Adapter
import org.datasyslab.geosparksql.UDF.UdfRegistrator
import org.datasyslab.geosparksql.UDT.UdtRegistrator

import org.apache.spark.serializer.KryoSerializer;
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.geosparksql.strategy.join.JoinQueryDetector
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Initiate Spark Session
var sparkSession = SparkSession.builder()
                     .appName("NYCTaxis")
                     // Enable GeoSpark custom Kryo serializer
                     .config("spark.serializer", classOf[KryoSerializer].getName)
                     .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
                     .getOrCreate()

//Register GeoSparkSQL
GeoSparkSQLRegistrator.registerAll(sparkSession)

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset. 
var trips = sparkSession.read
             .format("com.databricks.spark.csv")
             .option("header", "true")
             .schema(schema)
             .load("/mnt/geospatial/nyctaxis/*")

trips.createOrReplaceTempView("tripstable")

//Read GeoJSON file
var polygonJsonDF = spark.read
                     .option("multiline", "true")
                     .json("/mnt/geospatial/neighborhoods/neighborhoods.geojson")

//GeoSparkSQL can't read multiline GeoJSON files. This workaround will only work if the file only contains one geometry type (eg. polygons) 
val polygons = polygonJsonDF
                 .select(explode(col("features")).as("feature"))
                 .withColumn("polygon", callUDF("ST_GeomFromGeoJson", to_json(col("feature"))))
                 .select($"polygon", $"feature.properties.borough", $"feature.properties.boroughCode", $"feature.properties.neighborhood")

polygons.createOrReplaceTempView("polygontable")

//Test 1
var polygonAll = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
         """)

polygonAll.count()

//Test 2
var tripsAll = sparkSession.sql(
         """
           | SELECT *
           | FROM tripstable
         """)

tripsAll.count()

//Test 3
var polygonWhere = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE borough = 'Manhattan'
         """)

polygonWhere.count()

//Test 4
var tripsWhere = sparkSession.sql(
         """
           | SELECT *
           | FROM tripstable
           | WHERE total_amount > 20
         """)

tripsWhere.count()

//Test 5
var polygonGeomDistance = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | ORDER BY ST_Distance(polygon, ST_PointFromText('-74.00672149658203, 40.73177719116211', ','))
           | LIMIT 100
         """)
polygonGeomDistance.count()

//Test 6
var polygonGeomWithin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE ST_Within(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','), polygon)
         """)
polygonGeomWithin.show()

//Test 7
var polygonGeomIntersect = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable
           | WHERE ST_Intersects(ST_Circle(ST_PointFromText('-74.00672149658203, 40.73177719116211', ','),0.1), polygon)
         """)
polygonGeomIntersect.count()

//Test 8
var polygonContainsJoin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable, tripstable
           | WHERE ST_Contains(polygontable.polygon, ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))))
         """)
polygonContainsJoin.count()

//Test 9
var polygonWithinJoin = sparkSession.sql(
         """
           | SELECT *
           | FROM polygontable, tripstable
           | WHERE ST_Within(ST_Point(CAST(tripstable.pickup_longitude AS Decimal(24,20)), CAST(tripstable.pickup_latitude AS Decimal(24,20))), polygontable.polygon)
         """)

polygonWithinJoin.count()

Following is the implementation of the tests using Magellan.

//Import Libraries
import magellan._
import org.apache.spark.sql.magellan.dsl.expressions._

import org.apache.spark.sql.Row
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset and create a Magellan point 
val trips = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("mode", "DROPMALFORMED")
       .schema(schema)
       .load("/mnt/geospatial/nyctaxis/*")
       .withColumn("point", point($"pickup_longitude",$"pickup_latitude"))

//Read GeoJSON file and define index precision
val neighborhoods = sqlContext.read
       .format("magellan")
       .option("type", "geojson")
       .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
       .select($"polygon",
              $"metadata"("borough").as("borough"),
              $"metadata"("boroughCode").as("boroughCode"),
              $"metadata"("neighborhood").as("neighborhood"))
       .index(30)

//Inject the Magellan spatial join rules into Catalyst
magellan.Utils.injectRules(spark)

//Test 1
neighborhoods.count()

//Test 2
trips.count()

//Test 3
neighborhoods.filter("borough == 'Manhattan'").count()

//Test 4
trips.filter("total_amount > 20").count()

//Test 6
val points = sc.parallelize(Seq((-74.00672149658203, 40.73177719116211))).toDF("x", "y").select(point($"x", $"y").as("point"))
val polygons = neighborhoods.join(points)

polygons.filter($"point" within $"polygon").count()

//Test 8

trips.join(neighborhoods)
         .where($"polygon" >? $"point")
         .count()

//Test 9

trips.join(neighborhoods)
         .where($"point" within $"polygon")
         .count()

Geospatial analysis with Azure Databricks

A few months ago, I wrote a blog demonstrating how to extract and analyse geospatial data in Azure Data Lake Analytics (ADLA) (here). The article aimed to prove that it was possible to run spatial analysis using U-SQL, even though it does not natively support spatial data analytics. The outcome of that experience was positive, but with serious limitations in terms of performance. ADLA dynamically provisions resources and can perform analytics on terabytes to petabytes of data; however, because it must use the SQL Server Data Types and Spatial assemblies to perform spatial analysis, all the parallelism capabilities are suddenly limited. For example, if you are running an aggregation, ADLA will split the processing between multiple vertices, making it faster, but when running intersections between points and polygons, because it is a single-threaded SQL operation, it will only use one vertex and, consequently, the job might take hours to complete.

Since I last wrote my blog, the data analytics landscape has changed, and with that, new options became available, namely Azure Databricks. In this blog, I’ll demonstrate how to run spatial analysis and export the results to a mount point using the Magellan library and Azure Databricks.

Magellan is a distributed execution engine for geospatial analytics on big data. It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries (further details here).

Although it is mentioned on their GitHub page that the 1.0.5 Magellan library is available for Apache Spark 2.3+ clusters, I learned through a very difficult process that the only way to make it work in Azure Databricks is with an Apache Spark 2.2.1 cluster with Scala 2.11. The cluster I used for this experiment consisted of a Standard_DS3_v2 driver type with 14GB memory, 4 cores and auto scaling enabled.

In terms of datasets, I used the NYC Taxicab dataset to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON dataset to extract the polygons. Both datasets were stored in a blob storage and added to Azure Databricks as a mount point.

As always, first we need to import the libraries.

//Import Libraries
import magellan._
import org.apache.spark.sql.magellan.dsl.expressions._

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

Next, we define the schema of the NYC Taxicab dataset and load the data to a DataFrame. While loading the data, we convert the pickup longitude and latitude into a Magellan Point. If needed, in the same operation we could also add another Magellan Point from the drop-off longitude and latitude.

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))
//Read data from the NYC Taxicab dataset and create a Magellan point 
val trips = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("mode", "DROPMALFORMED")
       .schema(schema)
       .load("/mnt/geospatial/nyctaxis/*")
       .withColumn("point", point($"pickup_longitude",$"pickup_latitude"))

The next step is to load the neighbourhood data. As mentioned in their documentation, Magellan supports the reading of ESRI, GeoJSON, OSM-XML and WKT formats. From the GeoJSON dataset, Magellan will extract a collection of polygons and read the metadata into a Map.

There are three things to notice in the code below: first, the extraction of the polygon; second, the selection of the key corresponding to the neighbourhood name; and finally, the provision of a hint that defines what the index precision should be. This operation, alongside the injection of a spatial join rule into Catalyst, massively increases the performance of the queries. To get a better understanding of this operation, read this excellent blog.

//Read GeoJSON file and define index precision
val neighborhoods = sqlContext.read
       .format("magellan")
       .option("type", "geojson")
       .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
       .select($"polygon",
        $"metadata"("neighborhood").as("neighborhood"))
       .index(30)

Now that we have our two datasets loaded, we can run our geospatial query to identify which neighbourhood each pickup point falls under. To achieve our goal, we need to join the two DataFrames and apply a within predicate. As a curiosity, if we consider that m represents the number of points (12.748.987), n the number of polygons (310), p the average number of edges per polygon (104) and the complexity is O(mnp), then a naive approach would perform roughly 400 billion calculations on a single node to determine where each point falls.
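As a quick sanity check on that figure, the arithmetic is easy to reproduce (a short Python snippet is used here purely for the calculation).

#Back-of-the-envelope cost of a naive point-in-polygon check, using the figures above
m = 12748987  #number of pickup points
n = 310       #number of polygons
p = 104       #average number of edges per polygon

print(m * n * p)  #411,027,340,880, i.e. roughly 4 x 10^11 calculations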

//Inject rules and join DataFrames with within predicate
magellan.Utils.injectRules(spark)


val intersected = trips.join(neighborhoods)
         .where($"point" within $"polygon")

The above code does not take longer than 1 second to execute, because of Spark’s lazy evaluation. It is only when we want to obtain details about our DataFrame that the computing time is visible. For example, if we want to know which neighbourhood has the most pickups, we can write the following code, which takes on average 40 seconds.

//Neighbourhoods that received the most pickups
display(intersected 
       .groupBy('neighborhood)
       .count()
       .orderBy($"count".desc))

If we want to save the data and identify which pickups fall inside the NYC neighbourhoods, then we have to rewrite our intersected DataFrame to select all columns except the Magellan Points and Polygons, add a new column to the DataFrame and export the data back to the blob, as shown below.

//Select all columns except the Magellan Points and Polygons
val tripColumns = Seq($"vendorId", $"pickup_datetime", $"dropoff_datetime", $"passenger_count", $"trip_distance", $"pickup_longitude", $"pickup_latitude", $"rateCodeId", $"store_fwd", $"dropoff_longitude", $"dropoff_latitude", $"payment_type", $"fare_amount", $"extra", $"mta_tax", $"tip_amount", $"tolls_amount", $"improvement_surcharge", $"total_amount")

//Select pickup points that don't fall inside a neighbourhood
val nonIntersected = trips
                       .select(tripColumns: _*)
                       .except(intersected.select(tripColumns: _*))
//Add new column intersected_flag
val intersectedFlag = "1"
val nonIntersectedFlag = "0"
val tripsIntersected = intersected.select(tripColumns: _*).withColumn("intersected_flag", expr(intersectedFlag))
val tripsNotIntersected = nonIntersected.withColumn("intersected_flag", expr(nonIntersectedFlag))
//Union DataFrames
val allTrips = tripsNotIntersected.union(tripsIntersected)
//Save data to the blob
allTrips.write
   .format("com.databricks.spark.csv")
   .option("header", "true")
   .save("/mnt/geospatial/trips/trips.csv")

In summary, reading a 1.8GB dataset, applying geospatial analysis and exporting it back to blob storage took on average only 1 minute, which is miles better than my previous attempt with U-SQL.

As always, if you have any comments or questions, do let me know.

PII Anonymisation and Self-Joins in U-SQL

It’s been a while since I wrote my last blog, so I decided to share one of the latest challenges I faced in a project.

Requirements:

  • Anonymise any Personally Identifiable Information (PII) data stored in an Azure Data Lake Store (ADLS);
  • Anonymise any PII data from customers identified in a configuration file.


Details:

  • All PII data should be anonymised as soon as the files land in the DEV or UAT ADLS;
  • The PII data landing in the PROD ADLS should only be anonymised if identified in the configuration file.


Goal:

  • Create a single U-SQL pattern to achieve the requirements.


Step 1

Reference assemblies and declare variables. Pay special attention to the variable @Environment. It will be dynamically populated by the Azure Data Factory (ADF) pipeline activity and will identify in which environment the U-SQL is executed.

REFERENCE ASSEMBLY [USQL.Core];

USING [USQL].[Core].[Utilities];

USING [USQL].[Core].[Anonymisation];

//Set variables

DECLARE @month string = DateTime.UtcNow.ToString("MM");

DECLARE @day string = DateTime.UtcNow.ToString("dd");

DECLARE @year string = DateTime.UtcNow.ToString("yyyy");

DECLARE @schemaVersion int = 1;

DECLARE @Environment string = "DEV";

DECLARE @inputLocation = "RAW/Sensitive/" + @schemaVersion + "/" + @year + "/" + @month + "/" + @day + "/{*}.csv";

DECLARE @outputLocation = "RAW/Anonymized/" + @schemaVersion + "/" + @year + "/" + @month + "/" + @day + "/Customers.csv";

DECLARE @configLocation = "RAW/Config/Configuration.csv";


Step 2

Extract the data from the source and configuration file. The configuration file only includes an ID that identifies a customer.

//Extract data from source file

@ExtractSourceData =

EXTRACT [CustomerId] string,

[FirstName] string,

[LastName] string,

[EmailAddress] string,

[HomeTel] string,

[MobileNumber] string,

[Address] string,

[PostalCode] string

FROM @inputLocation

USING Extractors.Text(delimiter : '|', silent : false, quoting : true, skipFirstNRows : 1);


//Extract data from the configuration file

@ExtractConfigurationData =

EXTRACT [Id] string

FROM @configLocation

USING Extractors.Text(silent : true, quoting : true, skipFirstNRows : 1);


Step 3

Create two rowsets, one to include the distinct list of CustomerId from the source file and the other to include the distinct list of Id from the configuration file.

//Obtain a list of distinct CustomerId from source file

@SelectSourceData =

SELECT DISTINCT [CustomerId]

FROM @ExtractSourceData;


//Obtain a list of distinct Ids from configuration file

@SelectConfigurationData =

SELECT DISTINCT [Id]

FROM @ExtractConfigurationData;


Step 4

This is one of the most important details in this script. U-SQL does not support self-joins, which we need in order to ensure we anonymise all data when in a non-production environment. To overcome this limitation, we create a new rowset that unions the IDs from the source and configuration files.

//Create a new rowset to use on self-join

@UnionIds =

SELECT [CustomerId], "" AS [Id]

FROM @SelectSourceData

UNION

SELECT "" AS [CustomerId], [Id]

FROM @SelectConfigurationData;


Step 5

In this step, we identify which records should and shouldn’t be anonymised. If you remember from the requirements, when the data is in a non-production environment we have to anonymise all PII data, but when we are in production we should only anonymise the records identified in the configuration file. This could easily be achieved with a self-join; however, because it isn’t supported by U-SQL, we use the rowset from the previous step.

//Identify records to be anonymised

@FlagAnonymiseRecords =

SELECT DISTINCT A.[CustomerId],

[FirstName],

[LastName],

[EmailAddress],

[HomeTel],

[MobileNumber],

[Address],

[PostalCode]

FROM @ExtractSourceData AS A

JOIN @UnionIds AS B

ON A.[CustomerId] == (@Environment == "PROD" ? B.[Id] : B.[CustomerId]);


//Identify records that shouldn't be anonymised.

//ANTISEMIJOIN works as a SQL NOT IN

@FlagDoNotAnonymiseRecords =

SELECT DISTINCT [CustomerId],

[FirstName],

[LastName],

[EmailAddress],

[HomeTel],

[MobileNumber],

[Address],

[PostalCode]

FROM @ExtractSourceData AS A

ANTISEMIJOIN

(

SELECT DISTINCT [CustomerId]

FROM @FlagAnonymiseRecords

) AS B

ON A.[CustomerId] == B.[CustomerId];


Step 6

Now that we have identified the records that should be anonymised, we can start applying the correct masks. This is achieved by using different classes created in an assembly registered in Azure Data Lake Analytics (ADLA).

//Anonymise data

@AnonymiseData =

SELECT [CustomerId],

Utilities.ReturnLenght([FirstName]) == "0" ? [FirstName] : Anonymisation.AnonymiseForename([CustomerId], [FirstName]) AS [FirstName],

Utilities.ReturnLenght([LastName]) == "0" ? [LastName] : Anonymisation.AnonymiseSurname([CustomerId], [LastName]) AS [LastName],

Utilities.ReturnLenght([EmailAddress]) == "0" ? [EmailAddress] : Anonymisation.AnonymiseEmail([EmailAddress]) AS [EmailAddress],

Utilities.ReturnLenght([HomeTel]) == "0" ? [HomeTel] : Anonymisation.AnonymiseNumbers([HomeTel]) AS [HomeTel],

Utilities.ReturnLenght([MobileNumber]) == "0" ? [MobileNumber] : Anonymisation.AnonymiseNumbers([MobileNumber]) AS [MobileNumber],

[Address], //Address is passed through so the schema matches the UNION in step 7; apply an anonymisation method here if one is available in the assembly

Utilities.ReturnLenght([PostalCode]) == "0" ? [PostalCode] : Anonymisation.AnonymisePostalCode([PostalCode]) AS [PostalCode]

FROM @FlagAnonymiseRecords;


Step 7

The last step in this process is to union the anonymised and non-anonymised rowsets and output the file to the ADLS.

//Union anonymised and non-anonymised data

@FullData =

SELECT [CustomerId],

[FirstName],

[LastName],

[EmailAddress],

[HomeTel],

[MobileNumber],

[Address],

[PostalCode]

FROM @AnonymiseData

UNION

SELECT [CustomerId],

[FirstName],

[LastName],

[EmailAddress],

[HomeTel],

[MobileNumber],

[Address],

[PostalCode]

FROM @FlagDoNotAnonymiseRecords;


//Select data for output

@Output =

SELECT [CustomerId],

[FirstName],

[LastName],

[EmailAddress],

[HomeTel],

[MobileNumber],

[Address],

[PostalCode]

FROM @FullData;


//Output data to destination

OUTPUT @Output

TO @outputLocation

USING Outputters.Text(outputHeader : true, delimiter : '|', quoting : true);

As always, if you have any questions or comments do let me know.

How to support your organisation with Azure Cosmos DB Graph (Gremlin)?

Let me start this blog with two questions. 1) How can Joanna, from the HR department, query the mentor/mentee structure of the organisation? 2) How can Robin, from the resourcing department, find the most suitable and available consultant to start a new project next week? I’m sure at this point you are thinking that, to solve both problems, they could simply query the HR and the Resource Planning systems, and you are right, but what if they could get the answer to both questions from a single system? In this blog I’ll demonstrate how to achieve such a requirement using Azure Cosmos DB Graph (Gremlin).

Graph Theory

In graph theory, a graph is an ordered pair comprising a set of vertices and edges. A vertex is the fundamental unit of which graphs are formed and is usually represented by a circle with a label. An edge is represented by a line or arrow extending from one vertex to another.

Graphs can be used to model many types of relations and processes in many different areas. For example, we can represent the link structure of a website as a graph, the web pages as vertices and the links from one page to another as edges.

Azure Cosmos DB

Azure Cosmos DB is a globally distributed multi-model database with the capacity to store various types of data, such as document, graph, key-value and table data. It provides all the necessary tools to elastically scale throughput and storage across any number of Azure's geographic regions (further details here).

Azure Cosmos DB supports a variety of popular APIs to access and query the data, such as SQL, MongoDB, Cassandra, Gremlin (graph) and Table. In this instance, I will focus on the Gremlin (graph) API.

Gremlin

Gremlin is the graph traversal language of Apache TinkerPop, an open source Graph Computing Framework. Gremlin allows users to write complex queries to traverse their graphs by using a composed sequence of steps, with each step performing an operation on the data stream (further details here). There are 4 fundamental steps:

· transform: transform the objects in the stream

· filter: remove objects from the stream

· sideEffect: pass the object, but yield some side effect

· branch: decide which step to take

Scenario

The image on the left is an example of the mentor/mentee structure. If we convert it to a graph (image on the right), we have the people represented as vertices and the mentor relationship as edges.

clip_image002

Now, let’s create the database and the graph. When creating the Azure Cosmos DB, we need to ensure we select the Gremlin (graph) API.

clip_image004

clip_image006

To populate and query our graph, we have three options: the Azure Portal, the Gremlin Console or the Guided Gremlin Tour. The last two tools can be downloaded from the Quick Start section once we create the sample container, or directly downloaded here and here.

clip_image008

In the Azure Portal, CRUD operations can be performed via the Data Explorer UI and Gremlin queries. The results can be visualised as a Graph or in GraphSON format, the Gremlin standard format to represent vertices, edges and properties using JSON.

In the proposed scenario, we will create 7 vertices and 6 edges. The vertices will be labelled as person and will have 3 properties: firstName, position and inBench (which indicates whether a consultant is allocated to a project or not). The edges, labelled as mentor, will define the relation between consultants.

Create a vertex

g.addV('person').property('firstName', 'Tim').property('position', 'Director').property('inBench', '1')

Create an edge

g.V().hasLabel('person').has('firstName', 'Tim').addE('mentor').to(g.V().hasLabel('person').has('firstName', 'Neil'))
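
These queries can be executed in the Data Explorer, but they can also be submitted programmatically. Below is a minimal sketch using the open source gremlinpython driver; the account name, database, graph and key are placeholders, and the exact serializer option may vary with the driver version.

from gremlin_python.driver import client, serializer

#Placeholder endpoint and credentials, taken from the Azure Portal keys blade
gremlin_client = client.Client(
    "wss://<account>.gremlin.cosmosdb.azure.com:443/", "g",
    username="/dbs/<database>/colls/<graph>",
    password="<primary-key>",
    message_serializer=serializer.GraphSONSerializersV2d0())

#Submit the same Gremlin query used above to create a vertex
query = "g.addV('person').property('firstName', 'Tim').property('position', 'Director').property('inBench', '1')"
print(gremlin_client.submitAsync(query).result().all().result())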

Below is how the graph looks when all vertices and edges are created.

clip_image010

Joanna now has all she needs to obtain valuable information. For example, she can traverse the graph and obtain the list of Principal and Senior Consultants that are mentored by Tim.

g.V().hasLabel('person').has('firstName', 'Tim').outE('mentor').inV().hasLabel('person').has('position', within('Principal Consultant','Senior Consultant')).group().by('firstName').by(values('position'))

[

{

"Neil": "Principal Consultant",

"Scott": "Senior Consultant"

}

]

To support Robin, we will add two new types of vertex, Blog and Project. The Blog vertex will be related to the Person vertex and will indicate who wrote blogs and which technology was covered. The Project vertex will be related to the Person vertex and will indicate who worked on a certain project. The Project vertex will have the client, the name of the project, the duration and the technology used as properties.

clip_image012

If Robin needs to find a consultant with experience in Azure Cosmos DB, he can query the graph and verify who either wrote a blog or worked on a previous project with that technology. On top of that, he can filter the results by indicating that he is only interested in consultants that are currently on the bench.

g.V().hasLabel('person').where(outE('worked').or().outE('wrote').has('tech', 'Azure CosmosDB')).has('inBench', '1')

clip_image014

Conclusions

Hopefully, using a very simple scenario, I managed to demonstrate the potential of Azure Cosmos DB Graph to build complex queries, implement powerful graph traversal logic and help the business quickly obtain insights from very complex models.

As always, if you have any questions or comments do let me know.

Can the Custom Vision Service support the calculation of KPIs?

Let’s assume we have a company that distributes alcoholic drinks across the country. To determine their performance, they define a set of KPIs that will evaluate, among others, how many establishments (eg. pubs, bars, …) have their products exposed on the shelf. To achieve this goal, they have a set of sales reps that visit each establishment and take note of which products are exposed. One possible way to track the data is by accessing a mobile application, manually filling in the form and uploading the data, but what if we could automate the process of identifying the products on a shelf by simply uploading a picture? To do that, we would need to apply a machine learning algorithm to classify the image and identify each product.

To prove that the above scenario is achievable, I’ll demonstrate how to create a project using a tool called Custom Vision, a service that allows us to easily build a predictive model with just a few clicks and without the need for deep machine learning knowledge.

What is the Custom Vision Service?

Azure Custom Vision Service is a Microsoft Cognitive Services tool for tagging images using a custom computer vision model. Although very similar to Microsoft’s Computer Vision API, it has the advantage of fine-tuning a predictive model to a specific dataset; however, there are still a couple of minor disadvantages. For example, the service can only identify whether an object is in an image, not where it is within the image.

Build the model

To build a predictive model with this service we can either use the web interface or the REST API with support for C# and Python.
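
As an illustration of the REST route, the sketch below scores an image against a trained project’s prediction endpoint using Python; the URL format, API version and response field names depend on the region and version of the service, so treat them as assumptions rather than the exact values.

import requests

#Placeholder endpoint: the region, API version and project id depend on your Custom Vision resource
prediction_url = "https://<region>.api.cognitive.microsoft.com/customvision/v2.0/Prediction/<project-id>/image"
headers = {"Prediction-Key": "<prediction-key>", "Content-Type": "application/octet-stream"}

with open("shelf_photo.jpg", "rb") as image:
    response = requests.post(prediction_url, headers=headers, data=image.read())

#Each prediction holds a tag and a probability (field names vary slightly between API versions)
for prediction in response.json().get("predictions", []):
    print(prediction["tagName"], prediction["probability"])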

The first step was to create a project by providing a name, a domain and a resource group. When selecting a domain, we can either choose a general domain optimized for a range of images or select a specific domain optimized for a certain scenario. In my case, I selected the Food domain, given I wanted to identify different kinds of bottles.

Below is a description of each domain, as detailed by Microsoft.

clip_image002

The next step was to upload and tag images. Here are a couple of considerations:

- To start the prototype, we need at least two different tags and a couple of images, usually, a minimum of 30 per class

- It is best practice to use a variety of good quality images (different angles, lights, background, size, …) to ensure a better differentiation and accurate results from the classifier. In my case, Google was the source of the images, which was a bit limited in some cases (surprisingly!!!)

- It is best practice to include images that represent what the classifier will find in the real world, rather than images with neutral backgrounds

- It is advised to avoid images with multiple entities. If we upload images with a bottle and a glass, because the classifier will learn the characteristics that the photos have in common, when comparing two images, the classifier might be comparing a bottle + cup with a single bottle

I started by uploading 60 images for 2 classes, Baileys and Others. Once the images were uploaded, I trained my model and obtained the following results.

clip_image004

The Precision and Recall indicators demonstrate how well the classifier is performing. Above, we can see that the analysis is done for the entire model and for each tag. It is important to note that 100% precision is usually not achievable; however, a model with 75%+ Precision and Recall is an indication of an effective model.

Precision – Indicates how likely the classifier is to correctly classify an image. Knowing that we had 60 images, a precision of 84.3% means that roughly 51 of the images were correctly tagged

Recall – Out of all the images that should have been classified correctly, how many did the classifier identify accurately. A recall of 34.5% means that only around 20 images were correctly identified

Probability Threshold – The slider, set by default at 90%, indicates the value used to calculate Precision and Recall. Let’s consider the following example: the probability that image A has a Baileys bottle is 94%. If the probability threshold is 90%, then image A will be taken into consideration as a “correct prediction”.
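
To make the two metrics concrete, here is a small worked example with hypothetical counts, chosen so they roughly line up with the figures above.

#Hypothetical counts for a single tag at a given probability threshold
def precision_recall(tp, fp, fn):
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return precision, recall

print(precision_recall(tp=51, fp=9, fn=97))  #~0.85 precision, ~0.34 recall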

In the example below, we can see that I obtained better results when I changed the probability threshold to 75%. Based on this information, I had two options: either correctly tag the wrongly tagged images or replace them with better ones.

clip_image006

Fortunately, Custom Vision Service has a very handy functionality that, for each iteration, highlights which images confused the model according to the probability threshold value. In the image below, there are two images with a red square. If we hover the mouse over an image, we can see the prediction for each tag. In this case, the prediction is below 90% for both tags, meaning the image was not considered in the Precision and Recall calculation.

clip_image008

After a couple of iterations, we finally obtained a model we could use to run a quick test. When there are multiple iterations, we choose the one with the best results by selecting the option Make Default.

To test the model, I selected two images from the internet showing individual brands. As we can see, the model correctly classified each image.

clip_image010

clip_image012

Since I was happy with the results, I decided to increase the complexity of the model by creating a new tag and uploading a new set of images. After training the model, I noticed the results were not as good as before, since the new images were creating some confusion for the model. It took a couple of iterations until I got an acceptable model. The result of a quick test follows.

clip_image014

I now had a model that could correctly classify 3 different entities, so I decided to increase the challenge and added an image with multiple entities. The result I obtained helped me understand the flaw in my model.

clip_image016

The model identified that the image should be classified as Others; however, although we have a Baileys and a Hendricks bottle in the image, the probability for those two classes was too low. Here is why:

- When uploading images to the project I only used 1 tag per image. Based on that, the model will always try to classify an image with a single tag. As soon as I added more tags per image, my predictions improved

- All the examples used were showing a single entity, ie, only one type of bottle per image, except for the Others category. Example: I uploaded 30 images of Baileys bottles, 30 images of Hendricks bottles, and, for the category Others, 3 different types of bottles

- My model was trained to identify bottles where the image didn’t have multiple entities. As referred above, the model should always be trained with images that represent what the classifier will predict

- The number of images per class didn’t have enough variety. As with any machine learning model, if we increase the number of examples, the model will perform better

Final Considerations

With just a couple of clicks and no deep machine learning knowledge, I was able to create a predictive model that could accurately classify a set of images. With further iterations I could potentially have a model that could achieve the requirements of the scenario proposed at the beginning of this blog; however, would the Custom Vision Service be the right tool? In my opinion, the Computer Vision API would be more adequate, but this is definitely a service with massive potential.

As always, if you have any queries or considerations do let me know.

Azure Event Grid in a Modern Data Warehouse Architecture

In this blog I’ll give a light introduction to Azure Event Grid and demonstrate how it is possible to integrate the service in a modern data warehouse architecture.

Azure Event Grid is a fully managed event routing service that went into general availability on the 30th January 2018. With this service, we can subscribe to any event happening across our Azure resources and take advantage of serverless platforms like Azure Functions and Logic Apps to easily create serverless workflows. It has built-in publishing support for events from services like Blob Storage and Resource Groups, and supports custom webhooks that can publish events to Azure and third-party services.

Following is a list of key terms from this service.

clip_image002

Events – In this context, an Event is a message that contains data describing what happened in the service, e.g. a new file was uploaded to a container in a Blob Storage account. The event will contain information about the file, such as its name.
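
For reference, a Blob Created event roughly follows the shape below (trimmed and shown as a Python dictionary with placeholder values; the full schema contains a few more fields).

#Trimmed illustration of a BlobCreated event; all values are placeholders
blob_created_event = {
    "topic": "/subscriptions/<sub-id>/resourceGroups/<rg>/providers/Microsoft.Storage/storageAccounts/<account>",
    "subject": "/blobServices/default/containers/<container>/blobs/sales.csv",
    "eventType": "Microsoft.Storage.BlobCreated",
    "eventTime": "2018-02-01T10:15:00Z",
    "id": "<event-id>",
    "data": {
        "api": "PutBlob",
        "contentType": "text/csv",
        "contentLength": 524288,
        "blobType": "BlockBlob",
        "url": "https://<account>.blob.core.windows.net/<container>/sales.csv"
    },
    "dataVersion": "",
    "metadataVersion": "1"
}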

Event Publishers – The source of the events published to Event Grid. Following is a list of current and planned event publishers.

Available:

- Azure Subscriptions (management operations)
- Custom Topics
- Event Hubs
- IoT Hub
- Resource Groups (management operations)
- Storage Blob
- Storage General-purpose v2 (GPv2)

Planned:

- Azure Automation
- Azure Active Directory
- API Management
- Logic Apps
- IoT Hub
- Service Bus
- Azure Data Lake Store
- Cosmos DB

Topics - The endpoint where publishers send events

Event Subscriptions – Receives specific events from the Topic and sends them to the Event Handlers

Event Handlers – The receiver of the events routed by the event subscriptions. Following is a list of current and planned event handlers.

Available:

- Azure Automation
- Azure Functions
- Event Hubs
- Logic Apps
- Microsoft Flow
- WebHooks

Planned:

- Fabric Controller
- Service Bus
- Event Hubs
- Azure Data Factory
- Storage Queues


Azure Event Grid can be used like any other message queue service; however, the service stands out when integrated in an event-based architecture. Let’s consider that we have an application that uploads csv files to a Blob Storage several times a day. As soon as a set of files is available, we want to move them to an Azure Data Lake Store (ADLS) to apply data transformations using Azure Data Lake Analytics (ADLA). Azure Data Factory (ADF) will be used to orchestrate the data movement. Finally, we need to extract complete data sets from the ADLS using the PolyBase features in Azure SQL DW and present them as tables to Azure Analysis Services (AAS). A Tabular model hosted in AAS will populate a set of Power BI reports.

In the diagram below, we can see that we can subscribe to the Blob Storage events using an Event Grid subscription and trigger a loading process using a Logic App as soon as a file is uploaded to the Blob Storage.

In the following sections, I’ll detail how we can implement the part of the architecture surrounded by the red square.

image

Blob Storage

Blob storage events are only available in Blob Storage and StorageV2 (general purpose v2) accounts. In this example, I created a Blob Storage account. Once the storage account was deployed, I created the container to which the application uploads the files.

clip_image006

Logic App

In Logic Apps, we subscribed to the events from the Blob Storage and implemented logic to validate whether we had all the required files to start a new data load. If true, we called an Azure Function that triggered an ADF pipeline (an Azure Function was required because, at the time of writing, there wasn’t a Logic App connector for ADF). The ADF pipeline then executed a couple of U-SQL stored procedures that applied data transformations to the ingested data and created our dimension and fact files in the ADLS.
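
For context, a function that starts an ADF pipeline run can be sketched as below. This is a Python illustration using the Data Factory management SDK with placeholder names, not the exact function used in this solution.

import azure.functions as func
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.datafactory import DataFactoryManagementClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    #Placeholder identifiers: replace with your own service principal, subscription and factory details
    credentials = ServicePrincipalCredentials(client_id="<app-id>", secret="<app-secret>", tenant="<tenant-id>")
    adf_client = DataFactoryManagementClient(credentials, "<subscription-id>")

    #Kick off the pipeline that runs the U-SQL stored procedures
    run = adf_client.pipelines.create_run("<resource-group>", "<data-factory>", "<pipeline-name>", parameters={})
    return func.HttpResponse("Started pipeline run " + run.run_id)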

The following screens demonstrate how to create the logic app, the Azure Event Grid trigger and an overview of the workflow with all the requested steps.

clip_image008

When adding the Azure Event Grid trigger, we are prompted to sign in. We should use the account associated with our subscription.

clip_image010

clip_image012

Once we hit save, we can go back to the blob storage account and find that a new Event Grid subscription has been created.

clip_image014

Final Considerations

Azure Event Grid uses a pay-per-event pricing model, meaning we only pay for what we use. The first 100,000 operations per month are free and, beyond that, the price is £0.448 per million operations.
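
As a quick illustration of the pricing model, using the figures quoted above, a workload generating 10 million events per month would cost roughly:

#Rough monthly cost estimate based on the prices quoted above
operations = 10000000
free_tier = 100000
price_per_million = 0.448  #GBP

billable = max(operations - free_tier, 0)
print(round(billable / 1000000 * price_per_million, 2))  #~4.44 GBP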

As always, if you have any queries or considerations do let me know.

Extraction and Analysis of GeoSpatial data with Azure Data Lake Analytics

I recently had to implement a solution to prove that it was possible to integrate a shape file (.SHP) into Azure Data Lake Store (ADLS) for subsequent geospatial analysis using Azure Data Lake Analytics (ADLA).

A shape file is a data set used by a geographic analysis application that stores a collection of geographic features, such as streets or zip code boundaries, in the form of points, lines or area features.

As you may have already figured, storing a shape file in ADLS is not a difficult goal to achieve, however, how can you possibly use ADLA to obtain the geographic data from the file? In this blog I’ll explain how we can extract the data to a supported format, such as CSV, and use it to run geospatial analysis in ADLA, with the support of the spatial data types introduced in SQL Server 2008 (details here).

As always, whenever we face a limitation of ADLA, C# is our best friend. In order to read the content of a shape file, we need to start by adding a geospatial assembly to our solution, which, in my case, was the “Catfood” ESRI Shapefile Reader (details here).

The shape file used in this example contains a list of parks in London. The following code demonstrates how to extract the metadata and the geographic shapes to a CSV file. The only shapes extracted are polygons, although it is possible to add more if needed.

//Namespaces required by the snippet: System.Data for the DataSet/DataTable,
//System.Windows.Forms for MessageBox and Catfood.Shapefile for the ESRI Shapefile reader
using System.Data;
using System.Windows.Forms;
using Catfood.Shapefile;

public static void CreateWorkForThreads()
{
    //Create a new dataset and store the data in a table
    DataSet ds = CreateNewDataSet();
    DataTable dt = ds.Tables[0];

    int i;
    int count = 0;

    // Parse the shapefile and select the columns we are interested in
    using (Shapefile shapefile = new Shapefile(@"path\file.shp"))
    {
        foreach (Shape shape in shapefile)
        {
            string[] metadataNames = shape.GetMetadataNames();
            string geometry = "";
            int countParts = 0;
            int countShape = 0;

            DataRow dr = dt.NewRow();

            //Extract the metadata. The first iteration will extract the name of the columns
            if (metadataNames != null)
            {
                foreach (string metadataName in metadataNames)
                {
                    if (count == 0)
                        dr[metadataName] = metadataName;
                    else
                        dr[metadataName] = shape.GetMetadata(metadataName);
                }

            }

            //Shape is not part of the metadata, so manually defining the name of the column
            if (count == 0)
            {
                dr["shape"] = "shape";
            }
            else
            {
                // cast shape based on the type
                switch (shape.Type)
                {
                    case ShapeType.Point:
                        // a point is just a single x/y point
                        ShapePoint shapePoint = shape as ShapePoint;
                        MessageBox.Show("Point (" + shapePoint.Point.X.ToString() + ", " + shapePoint.Point.Y.ToString() + ")");
                        break;

                    case ShapeType.Polygon:
                        // a polygon contains one or more parts - each part is a list of points which
                        // are clockwise for boundaries and anti-clockwise for holes 
                        // see http://www.esri.com/library/whitepapers/pdfs/shapefile.pdf
                        ShapePolygon shapePolygon = shape as ShapePolygon;
                        foreach (PointD[] part in shapePolygon.Parts)
                        {
                            countShape = 0;

                            if (countParts == 0)
                                geometry = "(";
                            else
                                geometry = geometry + " | (";

                            foreach (PointD point in part)
                            {
                                if (part.Length - 1 != countShape)
                                    geometry = geometry + point.X + " " + point.Y + " |";
                                else
                                    geometry = geometry + point.X + " " + point.Y + " )";

                                countShape++;
                            }

                            countParts++;
                        }
                        break;

                    default:
                        break;
                }

                //Build our Polygon. 
                //Eg. POLYGON((-122.358 47.653, -122.348 47.649| -122.348 47.658, -122.358 47.658, -122.358 47.653))
                dr["shape"] = "POLYGON(" + geometry + ")";
            }

            dt.Rows.Add(dr);
            count++;
        }
    }

    //Extract the data to a csv file
    using (System.IO.StreamWriter sw =
    new System.IO.StreamWriter(@"path\filename.csv"))
    {
        foreach (DataRow row in dt.Rows)
        {
            object[] array = row.ItemArray;
            for (i = 0; i < array.Length - 1; i++)
            {
                sw.Write(array[i].ToString() + ",");
            }
            sw.WriteLine(array[i].ToString());
        }
    }
}

public static DataSet CreateNewDataSet()
{
    DataSet dsTemp = new DataSet();
    DataTable dtTemp = new DataTable("londonparks");
    dtTemp.Columns.Add("id", typeof(string));
    dtTemp.Columns.Add("parkname", typeof(string));
    dtTemp.Columns.Add("street", typeof(string));
    dtTemp.Columns.Add("postcode", typeof(string));
    dtTemp.Columns.Add("shape", typeof(string));
    dsTemp.Tables.Add(dtTemp);

    return dsTemp;
}

Now that we have a valid file that can be processed by ADLA, we can upload it to ADLS and start performing geospatial analysis. To do so, I simply followed the logic described in Sacha’s blog (here).

The following U-SQL takes into consideration a dataset that contains details of the trajectory of a courier, tracked on a daily basis. With this code, we identify whether a courier drove by a park by using the Intersect function. Because we have to cross two datasets, a C# function was created to help evaluate multiple events.

// Reference the assemblies we require in our script.
REFERENCE SYSTEM ASSEMBLY [System.Xml];
REFERENCE ASSEMBLY [SQLServerExtensions].[SqlSpatial];
REFERENCE ASSEMBLY [USQL.Core];

// Once the appropriate assemblies are registered, we can alias them using the USING keyword.
USING Geometry = Microsoft.SqlServer.Types.SqlGeometry;
USING Geography = Microsoft.SqlServer.Types.SqlGeography;
USING SqlChars = System.Data.SqlTypes.SqlChars;
USING [USQL].[Core].[Utilities];

// Extract the list of parks
@parks =
   EXTRACT
      [ID]       	    string,
      [PARKNAME]        string,
      [STREET]	        string,
      [POSTCODE]        string,
      [SHAPE]           string
   FROM "RAW/Parks.csv"
   USING Extractors.Text(delimiter : ',', silent : false, quoting : true, skipFirstNRows : 1);

//Extract data from the file containing the courier trajectory
@trajectories =
    EXTRACT
        GPSDateTimeUTC          DateTime,
        ReceivedDatetimeUTC     DateTime,
        VehicleKey              string,
        Altitude                int,
        Longitude               double,
        Latitude                double,
        Distance                decimal,
        VehicleSpeedMph         decimal
    FROM "CURATED/Trajectory/Trajectory.TXT"
    USING Extractors.Text(delimiter : '|', silent : false, quoting : true, skipFirstNRows : 1);

//Get the list of vehicles that drove by the park. 
@vehicleIntersection =
    SELECT DISTINCT a.*,
                    "1" AS VehicleIntersected
    FROM @trajectories AS a
         CROSS JOIN
             @parks AS b
    WHERE Utilities.Intersect(b.[SHAPE], a.[Longitude], a.[Latitude]).ToString() == "True";

//Get the list of vehicles that didn't drive by the park. 
@vehicleWithoutIntersection =
    SELECT a.*,
           "0" AS VehicleIntersected
    FROM @trajectories AS a
         LEFT JOIN
             @vehicleIntersection AS b
         ON b.VehicleKey == a.VehicleKey
            AND b.GPSDateTimeUTC == a.GPSDateTimeUTC
    WHERE b.VehicleKey IS NULL;

//Union both datasets to get the complete set of data
@finalData =
    SELECT *
    FROM @vehicleIntersection
    UNION ALL
    SELECT *
    FROM @vehicleWithoutIntersection;

//Export the results to a csv file
OUTPUT
   @finalData TO "LABORATORY/GeoSpatialIntersection.csv"
   USING Outputters.Text(outputHeader : true, delimiter : ',', quoting : true);

And here is the C# function. It accepts three parameters and calculates the intersection of a point with a shape.

public static string Intersect(string shape, double longitude, double latitude)
{
	//Because we had a csv file, the coordinates in the polygon were separated by |
	//It is important to use the .MakeValid() method to validate any invalid shape
	//In case the dataset had multipolygon shapes, without MakeValid() the function would throw an error
    var g =
        Geography.STGeomFromText(
            new SqlChars(
                shape.Replace('|',',')), 4326).MakeValid();
    
    var h = Geography.Point(longitude, latitude, 4326);

    return g.STIntersects(h).ToString();
}

As always, if you have any questions or comments, do let me know.

Working with Manual and Automated KPIs in a Tabular Cube

In a recent project, we had to produce a scorecard using a set of manual and calculated KPIs. To obtain the manual KPI figures, we used Master Data Services (MDS) where the users could insert the values, while for the calculated, we used the base measures created in the tabular cube.

So far, this requirement does not look very complicated, however, what if I tell you that the same KPI can be either manually inserted or automatically calculated by the cube? And that we have to present the values for different levels of a hierarchy? And that some of the KPIs are not absolute values but ratios? Now that I have your attention, let’s have a look at the solution we implemented.


How to join manual and automated KPIs?

Because the client couldn’t always provide the data to calculate the base measures, we delivered an MDS model to, among other functionalities, manually insert the numeric values. You can check this blog if you want to know more about the Survey model (http://blogs.adatis.co.uk/tristanrobinson/post/Modelling-Survey-Style-Data).

Since we were working with different markets, the same KPI could either be manually or automatically calculated, which means the cube had to select the appropriate scenario depending on the selected market. In order to achieve this requirement, we created 3 measures.

AutKPI – Using base measures from multiple tables, we defined the DAX code to calculate the KPI

ManKPI – Knowing all the MDS values were in one table, we defined a simple DAX query to sum the values

Actual – This measure was implemented with an IF statement. Eg.

        Actual:=IF(ISBLANK([AutKPI]), [ManKPI], [AutKPI])


How to aggregate ratio KPIs?

Let’s have a look at the example below, where we are calculating the KPI for two levels of a geography hierarchy.

clip_image002

Automated KPI 1 – Europe is naturally aggregating the values from Great Britain and Ireland

Automated KPI 2 - Considering we are using base measures, the cube can properly calculate the KPI at Europe level.

Manual KPI 1 – All manual entries were aggregated with a SUM. Because those are absolute values, the figure for Europe is correct

Manual KPI 2 – Following the same logic as Manual KPI 1, we can see the Europe value is incorrect. Because this is a ratio, we can't simply aggregate the values from the lower levels. The simplest approach to resolve this problem would be to create a new calculation using an AVERAGE function, however, considering the requirements, we had to introduce a weighted average.


Implementing weighted averages

The first step to this approach is to define a weight for each market. Since the values can change according to the user’s needs, we added a new entity to the MDS model.

clip_image004

Now let’s consider the example below showing the weighted approach.

clip_image006

Below is the formula to calculate the KPI at Europe level. For a better understanding, I split it into different steps.

C1 GB: Manual KPI x Weight

C1 Ireland: Manual KPI x Weight

C2: C1 GB + C1 Ireland

C3: GB Weight + Ireland Weight

Europe KPI: C2 / C3
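
In other words, the Europe figure is a weighted average of the market level KPIs:

Europe KPI = (GB KPI x GB Weight + Ireland KPI x Ireland Weight) / (GB Weight + Ireland Weight)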

The scope of the project stated we had to implement the following logic:

· When presenting the KPIs at market level, don’t apply the weighting

· When presenting the KPIs at region level, apply the weighting but only for the ratio KPIs

The biggest challenge of this requirement was to overwrite the aggregation logic of the geography hierarchy. To achieve that, we implemented a dynamic segmentation pattern on the ratio KPIs (more details on this link http://www.daxpatterns.com/dynamic-segmentation/). This approach can be split into four steps.

The first step is the calculation of our numerator.

Num Weighted AutKPI – Because the base measures for our automated KPIs come from different tables, we had to first group our data at region and market level and only then apply the calculation. Eg.

Num Weighted AutKPI:=
                 CALCULATE(
                    SUMX(
                       SUMMARIZE(
                          'KPI Value',
                           Market[Region],
                           Market[Market]
                           ),
                        [AutKPI] * [KPI Weight]
                    )
                 )


Num Weighted ManKPI – In this instance, the grouping was not necessary because we only had one measure to consider. Eg.

Num Weighted ManKPI:=
                 CALCULATE(
                    SUMX(
                       'KPI Value',
                        CALCULATE(
                           SUM ( 'KPI Value'[KPIActual] ) * [KPI Weight],
                           'KPI'[KPI] = "Manual KPI"
                        )
                    )
                 )


The second step is the calculation of our denominator.

Den Weighted AutKPI – Once again, because the weights were stored in a single table no grouping was necessary.

Den Weighted AutKPI:=
                 CALCULATE(
                    SUMX(
                       'KPI Value',
                        CALCULATE([KPI Weight])
                        )
                    ,'KPI'[KPI] = "Automated KPI"
                 )


Den Weighted ManKPI – The same logic applies on this instance.

Den Weighted ManKPI:=
                 CALCULATE(
                    SUMX(
                       'KPI Value',
                        CALCULATE([KPI Weight])
                        )
                    ,'KPI'[KPI] = "Manual KPI"
                 )


The third step is the division of our measures.

Weighted AutKPI:= DIVIDE([Num Weighted AutKPI], [Den Weighted AutKPI])
Weighted ManKPI:= DIVIDE([Num Weighted ManKPI], [Den Weighted ManKPI])


The fourth step is the calculation of our Weighted Actual measure, once again using an IF function.

Weighted Actual:= IF(ISBLANK([Weighted AutKPI]), [Weighted ManKPI], [Weighted AutKPI])


Finally, considering we only wanted to use the weighted measures for a subset of our KPIs, we created a new measure using a SWITCH function. Eg.

Actuals:=
      SWITCH(
         VALUES('KPI'[KPI]),
         "Percentage KPI", [Weighted Actual],
         "Percentage KPI2", [Weighted Actual],
         "Absolute KPI", [Actual],
         "Absolute KPI2", [Actual]
      )


Hopefully, I was able to clearly demonstrate our problem and how we managed to implement a solution to solve it. As always, if you have any questions or comments, do let me know.

How to run a successful testing stage

As we all know, testing is one of the most important stages of an IT project, however, either because the client doesn’t know how to test the solution, because we don’t have sample data to compare against our results, or because there is no general approach we can apply to all projects, testing is sometimes set up for failure. In this blog, I will share the approach adopted on the project I have been working on.

Step 1 - Build test scenarios

This step can only succeed with the help of the Business Analyst or any other person from the business side.

In this project, we are creating a set of KPIs to be used in Power BI and Excel. Considering the end goal, the BA created a set of scenarios (example below) that we used to test our values.

image

 

Step 2 – Create SQL scripts to query the source data

One of the biggest risks of this approach lies in this step. Here, we want to create a set of SQL scripts that follow the logic implemented in the cube. If the logic is wrong, the KPI will show incorrect values, even if we manage to match the results from the source data and the cube. This is where the input of the business user is crucial, since only they will be able to look at the numbers and confirm they are correct.

Building the test script is very simple. All we need to do is set a couple of variables and make sure all the business rules are applied.

USE STAGE

DECLARE @StartDate DATETIME = '20170602'
DECLARE @EndDate DATETIME = '20170603'
DECLARE @OutletUniverse Int

IF OBJECT_ID(N'tempdb..#CallDataAggregated', N'U') IS NOT NULL DROP TABLE #CallDataAggregated;

SELECT
 COUNT(DISTINCT B.VISIT_ID) AS POPCount
INTO #CallDataAggregated
FROM Dms.SalRdCallPerformanceRep A
INNER JOIN Dms.SalVsDailyTimingSum B
	ON B.DIST_CD = A.DIST_CD
		AND B.SLSMAN_CD = A.SLSMAN_CD
		AND B.CUST_CD = A.CUST_CD
		AND B.VISIT_DT = A.VISIT_DT
INNER JOIN Dms.SalSlSalesman C 
		ON C.SLSMAN_CD = A.SLSMAN_CD
		AND C.DIST_CD = A.DIST_CD
WHERE (A.VISIT_DT >= @StartDate AND A.VISIT_DT < @EndDate)
	AND USER_DEFINE1 IN ('DSM',
						'EBSM',
						'HTSR',
						'KAM',
						'OTSR',
						'PTSR',
						'RVR',
						'TMR')
	AND B.VISIT_TYPE IN ('S','E','X')

SELECT @OutletUniverse = 
 MAX(OutletUniverse)
FROM Warehouse.Fct.MarketConfiguration
WHERE MarketKey = 13
 AND (DateKey >= CONVERT(VARCHAR(8),@StartDate,112) AND DateKey < CONVERT(VARCHAR(8),@EndDate,112))

SELECT
  POPCount
 ,@OutletUniverse
 ,(CONVERT(FLOAT,POPCount) / @OutletUniverse) AS Coverage
FROM #CallDataAggregated
 

 

Step 3 – Share the results with the Business Analyst and Testers

Once our testing is complete and the results are approved by the BA, we release the KPIs to UAT. If we are very lucky, we will have a tester who will then carry out their own checks; however, if that is not the case, we will have to do the work for them.

 

Step 4 – Testing session with the business users

To sign off the KPIs, the business users need to agree with the results shown in the cube; however, they don’t always have the time, skills or tools to query the data. To resolve this problem, we created some examples in Excel where we compare the source data with the cube.

KPI UAT Cube – In this sheet, we run a query in the cube for a specific scenario

clip_image002[6]

KPI Source – We query the source data ensuring that all the business rules are applied, which is a risky approach as discussed above

KPI Pivot – We create a Pivot table based on the data from the KPI Source sheet

clip_image004[5]

Once the Excel scenarios are completed, we arrange a session with the business users and demonstrate that the values from the cube match the source data. If they agree with the results, the KPIs are signed off and the testing stage is considered a success.

If you have any questions or thoughts, please leave your comment below.