PII Anonymisation and Self-Joins in U-SQL

It’s been a while since I wrote my last blog post, so I decided to share one of the latest challenges I faced in a project.

Requirements:

  • Anonymise any Personally Identifiable Information (PII) data stored in an Azure Data Lake Store (ADLS);
  • Anonymise any PII data from customers identified in a configuration file.


Details:

  • All PII data should be anonymised as soon as the files land in the DEV or UAT ADLS;
  • The PII data landing in the PROD ADLS should only be anonymised if identified in the configuration file.


Goal:

  • Create a single U-SQL pattern to achieve the requirements.


Step 1

Reference the required assemblies and declare the variables. Pay special attention to the variable @Environment: it is dynamically populated by the Azure Data Factory (ADF) pipeline activity and identifies the environment in which the U-SQL script is executed (illustrated after the snippet below).

REFERENCE ASSEMBLY [USQL.Core];

USING [USQL].[Core].[Utilities];
USING [USQL].[Core].[Anonymisation];

//Set variables
DECLARE @month string = DateTime.UtcNow.ToString("MM");
DECLARE @day string = DateTime.UtcNow.ToString("dd");
DECLARE @year string = DateTime.UtcNow.ToString("yyyy");
DECLARE @schemaVersion int = 1;

//DECLARE EXTERNAL lets the ADF activity parameter override the default value
DECLARE EXTERNAL @Environment string = "DEV";

DECLARE @inputLocation string = "RAW/Sensitive/" + @schemaVersion + "/" + @year + "/" + @month + "/" + @day + "/{*}.csv";
DECLARE @outputLocation string = "RAW/Anonymized/" + @schemaVersion + "/" + @year + "/" + @month + "/" + @day + "/Customers.csv";
DECLARE @configLocation string = "RAW/Config/Configuration.csv";
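
For context, when ADF runs this script, the U-SQL activity passes Environment in as a parameter, which effectively prepends a DECLARE statement to the submitted script; the DECLARE EXTERNAL above is what allows that injected value to take precedence over the "DEV" default. The injected line looks roughly like this (the value shown is illustrative):

//Prepended by the ADF U-SQL activity at runtime
DECLARE @Environment string = "PROD";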


Step 2

Extract the data from the source and configuration files. The configuration file only includes an ID that identifies a customer. Note that the configuration extractor uses silent : true, so problematic rows are skipped rather than failing the job.

//Extract data from source file
@ExtractSourceData =
    EXTRACT [CustomerId] string,
            [FirstName] string,
            [LastName] string,
            [EmailAddress] string,
            [HomeTel] string,
            [MobileNumber] string,
            [Address] string,
            [PostalCode] string
    FROM @inputLocation
    USING Extractors.Text(delimiter : '|', silent : false, quoting : true, skipFirstNRows : 1);

//Extract data from the configuration file
@ExtractConfigurationData =
    EXTRACT [Id] string
    FROM @configLocation
    USING Extractors.Text(silent : true, quoting : true, skipFirstNRows : 1);


Step 3

Create two rowsets: one containing the distinct list of CustomerIds from the source file and the other the distinct list of Ids from the configuration file.

//Obtain a list of distinct CustomerId from source file
@SelectSourceData =
    SELECT DISTINCT [CustomerId]
    FROM @ExtractSourceData;

//Obtain a list of distinct Ids from configuration file
@SelectConfigurationData =
    SELECT DISTINCT [Id]
    FROM @ExtractConfigurationData;


Step 4

This is one of the most important details in this script. U-SQL does not support self-joins, which is what we would need to ensure all data is anonymised in a non-production environment. To overcome this limitation, we create a new rowset that unions the IDs from the source and configuration files.

//Create a new rowset to use in place of a self-join
@UnionIds =
    SELECT [CustomerId], "" AS [Id]
    FROM @SelectSourceData
    UNION
    SELECT "" AS [CustomerId], [Id]
    FROM @SelectConfigurationData;
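
To make the workaround concrete, suppose the source file contains customers 1 and 2 and the configuration file identifies only customer 2 (illustrative values). @UnionIds will then hold three rows:

CustomerId | Id
    1      |
    2      |
           | 2

Joining on [CustomerId] reaches every customer, while joining on [Id] reaches only the configured ones; the next step picks the join column based on the environment.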


Step 5

In this step, we identify which records should and shouldn’t be anonymised. As per the requirements, in a non-production environment we have to anonymise all PII data, whereas in production we should only anonymise the records identified in the configuration file. This could easily be achieved with a self-join, but because U-SQL doesn’t support one, we use the rowset from the previous step instead. Note the conditional in the join: when @Environment is "PROD", A.[CustomerId] is matched against B.[Id], so only the configured customers survive the join; otherwise it is matched against B.[CustomerId], so every customer does.

//Identify records to be anonymised
@FlagAnonymiseRecords =
    SELECT DISTINCT A.[CustomerId],
                    [FirstName],
                    [LastName],
                    [EmailAddress],
                    [HomeTel],
                    [MobileNumber],
                    [Address],
                    [PostalCode]
    FROM @ExtractSourceData AS A
         JOIN @UnionIds AS B
         ON A.[CustomerId] == (@Environment == "PROD" ? B.[Id] : B.[CustomerId]);


//Identify records that shouldn't be anonymised.
//ANTISEMIJOIN works as a SQL NOT IN
@FlagDoNotAnonymiseRecords =
    SELECT DISTINCT [CustomerId],
                    [FirstName],
                    [LastName],
                    [EmailAddress],
                    [HomeTel],
                    [MobileNumber],
                    [Address],
                    [PostalCode]
    FROM @ExtractSourceData AS A
         ANTISEMIJOIN
         (
             SELECT DISTINCT [CustomerId]
             FROM @FlagAnonymiseRecords
         ) AS B
         ON A.[CustomerId] == B.[CustomerId];


Step 6

Now that we have identified the records that should be anonymised, we can start applying the correct masks. This is achieved by using different classes from an assembly registered in Azure Data Lake Analytics (ADLA).
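
If the assembly isn't registered yet, that can be done with a one-off U-SQL job along these lines (the target database and the DLL path are assumptions; adjust them to your environment):

USE DATABASE [master];
CREATE ASSEMBLY IF NOT EXISTS [USQL.Core] FROM "assemblies/USQL.Core.dll";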

//Anonymise data
@AnonymiseData =
    SELECT [CustomerId],
           Utilities.ReturnLenght([FirstName]) == "0" ? [FirstName] : Anonymisation.AnonymiseForename([CustomerId], [FirstName]) AS [FirstName],
           Utilities.ReturnLenght([LastName]) == "0" ? [LastName] : Anonymisation.AnonymiseSurname([CustomerId], [LastName]) AS [LastName],
           Utilities.ReturnLenght([EmailAddress]) == "0" ? [EmailAddress] : Anonymisation.AnonymiseEmail([EmailAddress]) AS [EmailAddress],
           Utilities.ReturnLenght([HomeTel]) == "0" ? [HomeTel] : Anonymisation.AnonymiseNumbers([HomeTel]) AS [HomeTel],
           Utilities.ReturnLenght([MobileNumber]) == "0" ? [MobileNumber] : Anonymisation.AnonymiseNumbers([MobileNumber]) AS [MobileNumber],
           //[Address] is carried through so the schema lines up with the UNION in Step 7;
           //slot in the assembly's address-masking method here if one exists
           [Address],
           Utilities.ReturnLenght([PostalCode]) == "0" ? [PostalCode] : Anonymisation.AnonymisePostalCode([PostalCode]) AS [PostalCode]
    FROM @FlagAnonymiseRecords;
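
The Utilities and Anonymisation classes live in the custom USQL.Core assembly and their implementation isn't shown here. To give an idea of the shape of such a method, below is a minimal C# sketch of a deterministic masker; the class and method names mirror the script, but the hash-based approach is purely an assumption:

using System;
using System.Security.Cryptography;
using System.Text;

public static class Anonymisation
{
    //Sketch only: derives a repeatable pseudonym from the CustomerId and forename,
    //so the same input always anonymises to the same value across files and runs
    public static string AnonymiseForename(string customerId, string forename)
    {
        using (var sha = SHA256.Create())
        {
            byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(customerId + "|" + forename));

            //Use the first 8 bytes of the hash as a hex pseudonym
            return "Anon" + BitConverter.ToString(hash, 0, 8).Replace("-", "");
        }
    }
}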


Step 7

The last step in this process is to union the anonymised and non-anonymised rowsets and output the file to ADLS.

//Union anonymised and non-anonymised data
@FullData =
    SELECT [CustomerId],
           [FirstName],
           [LastName],
           [EmailAddress],
           [HomeTel],
           [MobileNumber],
           [Address],
           [PostalCode]
    FROM @AnonymiseData
    UNION
    SELECT [CustomerId],
           [FirstName],
           [LastName],
           [EmailAddress],
           [HomeTel],
           [MobileNumber],
           [Address],
           [PostalCode]
    FROM @FlagDoNotAnonymiseRecords;

//Select data for output
@Output =
    SELECT [CustomerId],
           [FirstName],
           [LastName],
           [EmailAddress],
           [HomeTel],
           [MobileNumber],
           [Address],
           [PostalCode]
    FROM @FullData;

//Output data to destination
OUTPUT @Output
TO @outputLocation
USING Outputters.Text(outputHeader : true, delimiter : '|', quoting : true);

As always, if you have any questions or comments do let me know.

Extraction and Analysis of GeoSpatial data with Azure Data Lake Analytics

I recently had to implement a solution to prove it was possible to integrate a shape file (.SHP) in Azure Data Lake Store (ADLS) for post geographic spatial analysis using Azure Data Lake Analytics (ADLA).

A shape file is a data set used by a geographic analysis application that stores a collection of geographic features, such as streets or zip code boundaries, in the form of points, lines or area features.

As you may have already figured, storing a shape file in ADLS is not a difficult goal to achieve; the question is how to use ADLA to obtain the geographic data from the file. In this blog I’ll explain how we can extract the data to a supported format, such as CSV, and use it to run geospatial analysis in ADLA, with the support of the spatial data types introduced in SQL Server 2008 (details here).

As always, whenever we face a limitation of ADLA, C# is our best friend. To read the content of a shape file, we start by adding a geospatial assembly to our solution, which, in my case, was the “Catfood” ESRI Shapefile Reader (details here).

The shape file used in this example contains a list of parks in London. The following code demonstrates how to extract the metadata and the geographic shapes to a CSV file. The only shapes extracted are polygons, although it is possible to add more if needed.

using System.Data;
using System.Windows.Forms;
using Catfood.Shapefile;

public static void CreateWorkForThreads()
{
    //Create a new dataset and store the data in a table
    DataSet ds = CreateNewDataSet();
    DataTable dt = ds.Tables[0];

    int i;
    int count = 0;

    // Parse the shapefile and select the columns we are interested in
    using (Shapefile shapefile = new Shapefile(@"path\file.shp"))
    {
        foreach (Shape shape in shapefile)
        {
            string[] metadataNames = shape.GetMetadataNames();
            string geometry = "";
            int countParts = 0;
            int countShape = 0;

            DataRow dr = dt.NewRow();

            //Extract the metadata. The first iteration will extract the names of the columns
            if (metadataNames != null)
            {
                foreach (string metadataName in metadataNames)
                {
                    if (count == 0)
                        dr[metadataName] = metadataName;
                    else
                        dr[metadataName] = shape.GetMetadata(metadataName);
                }

            }

            //Shape is not part of the metadata, so manually defining the name of the column
            if (count == 0)
            {
                dr["shape"] = "shape";
            }
            else
            {
                // cast shape based on the type
                switch (shape.Type)
                {
                    case ShapeType.Point:
                        // a point is just a single x/y point
                        ShapePoint shapePoint = shape as ShapePoint;
                        MessageBox.Show("Point (" + shapePoint.Point.X.ToString() + ", " + shapePoint.Point.Y.ToString() + ")");
                        break;

                    case ShapeType.Polygon:
                        // a polygon contains one or more parts - each part is a list of points which
                        // are clockwise for boundaries and anti-clockwise for holes 
                        // see http://www.esri.com/library/whitepapers/pdfs/shapefile.pdf
                        ShapePolygon shapePolygon = shape as ShapePolygon;
                        foreach (PointD[] part in shapePolygon.Parts)
                        {
                            countShape = 0;

                            if (countParts == 0)
                                geometry = "(";
                            else
                                geometry = geometry + " | (";

                            foreach (PointD point in part)
                            {
                                if (part.Length - 1 != countShape)
                                    geometry = geometry + point.X + " " + point.Y + " |";
                                else
                                    geometry = geometry + point.X + " " + point.Y + " )";

                                countShape++;
                            }

                            countParts++;
                        }
                        break;

                    default:
                        break;
                }

                //Build our Polygon. 
                //Eg. POLYGON((-122.358 47.653, -122.348 47.649| -122.348 47.658, -122.358 47.658, -122.358 47.653))
                dr["shape"] = "POLYGON(" + geometry + ")";
            }

            dt.Rows.Add(dr);
            count++;
        }
    }

    //Extract the data to a csv file
    using (System.IO.StreamWriter sw =
    new System.IO.StreamWriter(@"path\filename.csv"))
    {
        foreach (DataRow row in dt.Rows)
        {
            object[] array = row.ItemArray;
            for (i = 0; i < array.Length - 1; i++)
            {
                sw.Write(array[i].ToString() + ",");
            }
            sw.WriteLine(array[i].ToString());
        }
    }
}

public static DataSet CreateNewDataSet()
{
    DataSet dsTemp = new DataSet();
    DataTable dtTemp = new DataTable("londonparks");
    dtTemp.Columns.Add("id", typeof(string));
    dtTemp.Columns.Add("parkname", typeof(string));
    dtTemp.Columns.Add("street", typeof(string));
    dtTemp.Columns.Add("postcode", typeof(string));
    dtTemp.Columns.Add("shape", typeof(string));
    dsTemp.Tables.Add(dtTemp);

    return dsTemp;
}

Now that we have a valid file that can be processed by ADLA, we can upload it to ADLS and start performing geospatial analysis. To do so, I simply followed the logic described in Sacha’s blog (here).

The following U-SQL takes into consideration a dataset containing details of a courier’s trajectory, tracked on a daily basis. With the following code, we identify whether a courier drove by a park, using the Intersect function. Because we have to cross two datasets, a C# function was created to help evaluate multiple events.

// Reference the assemblies we require in our script.
REFERENCE SYSTEM ASSEMBLY [System.Xml];
REFERENCE ASSEMBLY [SQLServerExtensions].[SqlSpatial];
REFERENCE ASSEMBLY [USQL.Core];

// Once the appropriate assemblies are registered, we can alias them using the USING keyword.
USING Geometry = Microsoft.SqlServer.Types.SqlGeometry;
USING Geography = Microsoft.SqlServer.Types.SqlGeography;
USING SqlChars = System.Data.SqlTypes.SqlChars;
USING [USQL].[Core].[Utilities];

// Extract the list of parks
@parks =
    EXTRACT
        [ID]        string,
        [PARKNAME]  string,
        [STREET]    string,
        [POSTCODE]  string,
        [SHAPE]     string
    FROM "RAW/Parks.csv"
    USING Extractors.Text(delimiter : ',', silent : false, quoting : true, skipFirstNRows : 1);

//Extract data from the file containing the courier trajectory
@trajectories =
    EXTRACT
        GPSDateTimeUTC          DateTime,
        ReceivedDatetimeUTC     DateTime,
        VehicleKey              string,
        Altitude                int,
        Longitude               double,
        Latitude                double,
        Distance                decimal,
        VehicleSpeedMph         decimal
    FROM "CURATED/Trajectory/Trajectory.TXT"
    USING Extractors.Text(delimiter : '|', silent : false, quoting : true, skipFirstNRows : 1);

//Get the list of vehicles that drove by the park. 
@vehicleIntersection =
    SELECT DISTINCT a.*,
                    "1" AS VehicleIntersected
    FROM @trajectories AS a
         CROSS JOIN
             @parks AS b
    WHERE Utilities.Intersect(b.[SHAPE], a.[Longitude], a.[Latitude]).ToString() == "True";

//Get the list of vehicles that didn't drive by the park. 
@vehicleWithoutIntersection =
    SELECT a.*,
           "0" AS VehicleIntersected
    FROM @trajectories AS a
         LEFT JOIN
             @vehicleIntersection AS b
         ON b.VehicleKey == a.VehicleKey
            AND b.GPSDateTimeUTC == a.GPSDateTimeUTC
    WHERE b.VehicleKey IS NULL;

//Union both datasets to get the complete set of data
@finalData =
    SELECT *
    FROM @vehicleIntersection
    UNION ALL
    SELECT *
    FROM @vehicleWithoutIntersection;

//Export the results to a csv file
OUTPUT
   @finalData TO "LABORATORY/GeoSpatialIntersection.csv"
   USING Outputters.Text(outputHeader : true, delimiter : ',', quoting : true);

And here is the C# function, which belongs to the Utilities class in the USQL.Core assembly referenced above. It accepts three parameters and calculates the intersection of a point with a shape.

public static string Intersect(string shape, double longitude, double latitude)
{
    //Because the data came from a CSV file, the coordinates in the polygon were separated by |
    //It is important to use the MakeValid() method to validate any invalid shape;
    //without it, multipolygon shapes in the dataset would make the function throw an error
    var g =
        Geography.STGeomFromText(
            new SqlChars(
                shape.Replace('|', ',')), 4326).MakeValid();

    //SqlGeography.Point expects latitude first, then longitude
    var h = Geography.Point(latitude, longitude, 4326);

    return g.STIntersects(h).ToString();
}
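
For a quick sanity check, the function can also be exercised outside ADLA with made-up coordinates (the ring below is wound counter-clockwise so that the interior is the small square around the origin):

//Square around the origin, with vertices separated by | as in the generated CSV
string shape = "POLYGON((-1 -1 | 1 -1 | 1 1 | -1 1 | -1 -1))";
Console.WriteLine(Intersect(shape, 0, 0));   // True: the origin lies inside the square
Console.WriteLine(Intersect(shape, 10, 50)); // False: well outside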

As always, if you have any questions or comments, do let me know.