Upsert to Azure SQL Data Warehouse using PySpark

At the moment the SQL MERGE operation is not available in Azure SQL Data Warehouse. However, it is possible to implement this feature using the Azure SQL Data Warehouse connector in Databricks with some PySpark code.

Upsert can be done in two ways:

  • Update records in the target that have a newer version in the source
  • Filter out the updated records from the source
  • Insert just the new records

Alternatively,

  • Delete records from the target that have a newer version in the source
  • Delete records from the source that are older than the matching target records
  • Insert the remaining source records into the target

Before we dive into the code, it's good to understand how the Databricks Azure SQL Data Warehouse connector works. If you haven't worked with this connector before, it's a good idea to read this post before you proceed. The connector supports basic read and write operations on Azure SQL Data Warehouse. It also has a preActions and postActions feature, which allows valid SQL statements to be executed on the Data Warehouse before or after a write operation. This is the feature I am going to use for upserts, specifically postActions. However, it comes with a catch: preActions and postActions work only on a new table. So this is how it's done, using the second method for upsert:

  • Create a temporary staging table in Azure SQL Data Warehouse in overwrite mode and write the input dataframe to it.
  • After the dataframe is written, add a postAction on the staging table to delete the records from the target table that exist in the staging table and are older than their staging versions.
  • Add a second postAction to delete the records from the staging table that exist in the target table and are older than their target versions.
  • Add a third postAction to insert the remaining records from the staging table into the target table.

This is what the PySpark code looks like. I created a function with these parameters:

  • df – Input dataframe
  • dwhStagingTable – Azure SQL Data Warehouse table used to stage the input dataframe for the merge/upsert operation.
  • dwhTargetTable – Azure SQL Data Warehouse target table into which the dataframe is merged/upserted.
  • lookupColumns – Pipe-separated list of columns that uniquely identify a record in the input dataframe.
  • deltaName – Name of the watermark column in the input dataframe, if any.
  • dwhStagingDistributionColumn – Name of the column used as the hash distribution column in the staging table. This column helps improve upsert performance by minimizing data movement, provided dwhTargetTable is also hash distributed on the same column.
def upsertDWH(df,dwhStagingTable,dwhTargetTable,lookupColumns,deltaName,dwhStagingDistributionColumn):
    
    #STEP1: Derive a dynamic delete statement to remove records from TARGET that have a newer version in the staging (source) table
    lookupCols = lookupColumns.split("|")
    conditions = [dwhStagingTable + "." + col + "=" + dwhTargetTable + "." + col for col in lookupCols]
    if deltaName is not None and len(deltaName) > 0:
      #Only delete when the staging record is at least as new as the existing target record
      conditions.append(dwhStagingTable + "." + deltaName + ">=" + dwhTargetTable + "." + deltaName)
    whereClause = " and ".join(conditions)

    deleteSQL = "delete from " + dwhTargetTable + " where exists (select 1 from " + dwhStagingTable + " where " + whereClause + ");"

    #STEP2: Delete records from the staging (source) table that are older than the matching TARGET records
    conditions = [dwhTargetTable + "." + col + "=" + dwhStagingTable + "." + col for col in lookupCols]
    if deltaName is not None and len(deltaName) > 0:
      #Only delete when the existing target record is strictly newer than the staging record
      conditions.append(dwhTargetTable + "." + deltaName + ">" + dwhStagingTable + "." + deltaName)
    whereClause = " and ".join(conditions)

    deleteOutdatedSQL = "delete from " + dwhStagingTable + " where exists (select 1 from " + dwhTargetTable + " where " + whereClause + " );"
    #print("deleteOutdatedSQL={}".format(deleteOutdatedSQL))

    #STEP3: Insert SQL
    insertSQL ="Insert Into " + dwhTargetTable + " select * from " + dwhStagingTable +";"
    #print("insertSQL={}".format(insertSQL))

    #consolidate post actions SQL
    postActionsSQL = deleteSQL + deleteOutdatedSQL + insertSQL
    print("postActionsSQL={}".format(postActionsSQL))

    sqldwJDBC = "Your JDBC Connection String for Azure SQL Data Warehouse. Preferably from Key Vault"
    tempSQLDWFolder = "A temp folder in Azure Datalake Storage or Blob Storage for temp polybase files "
    
    
    #Use Hash Distribution on STG table where possible
    if dwhStagingDistributionColumn is not None and len(dwhStagingDistributionColumn) > 0:
      stgTableOptions ="CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = HASH (" +  dwhStagingDistributionColumn + ")"
    else:
      stgTableOptions ="CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN"
    
    #Upsert/Merge to Target using STG postActions
    df.write.format("com.databricks.spark.sqldw")\
      .option("url", sqldwJDBC).option("forwardSparkAzureStorageCredentials", "true")\
      .option("dbTable",dwhStagingTable)\
      .option("tableOptions",stgTableOptions)\
      .option("tempDir",tempSQLDWFolder)\
      .option("maxStrLength",4000)\
      .option("postActions",postActionsSQL)\
      .mode("overwrite").save()

Time Zone Conversions in PySpark

PySpark has built-in functions to shift time between time zones. You just need to follow a simple rule: first convert the timestamp from the origin time zone to UTC, which serves as the point of reference, then convert the timestamp from UTC to the required time zone. This way there is no need to maintain lookup tables, and it is a generic method for converting time between time zones, even for those that require a daylight saving offset.
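
For example, here is a minimal sketch using the built-in to_utc_timestamp and from_utc_timestamp functions; the column name and time zones are only for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_utc_timestamp, from_utc_timestamp

spark = SparkSession.builder.getOrCreate()

#Sample data with a timestamp recorded in Melbourne local time (illustrative only)
df = spark.createDataFrame([("2019-01-15 09:30:00",)], ["event_time"]) \
        .selectExpr("cast(event_time as timestamp) as event_time")

#Step 1: shift from the origin time zone to UTC
df = df.withColumn("event_time_utc", to_utc_timestamp("event_time", "Australia/Melbourne"))

#Step 2: shift from UTC to the required time zone; daylight saving offsets are applied automatically
df = df.withColumn("event_time_la", from_utc_timestamp("event_time_utc", "America/Los_Angeles"))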

Continue reading

HdfsBridge::recordReaderFillBuffer – Unexpected error encountered filling record reader buffer: IllegalArgumentException: Must be 12 bytes

Parquet is my preferred format for storing files in the data lake. Parquet's columnar storage and compression make it very efficient for in-memory processing tasks like Spark/Databricks notebooks, while saving cost on storage. Parquet also supports almost all encoding schemes out there. Perhaps the coolest thing about Parquet is that, unlike CSV, there is no such thing as a column/row separator, so there is no need to escape those characters when they appear in the data.

Azure SQL Data Warehouse supports the Parquet data format for External (PolyBase) tables. External tables reference the underlying storage blobs and give the option to query the data lake using SQL. In fact, this is recommended in Microsoft's reference architecture. With some Parquet files, this error gets thrown when the External Table is queried:

HdfsBridge::recordReaderFillBuffer – Unexpected error encountered filling record reader buffer: IllegalArgumentException: Must be 12 bytes

In the absence of a clear exception message, it took a while to figure this out. This error usually happens on a Timestamp column, specifically when the data is in yyyy/MM/dd hh:mm:ss format. For some reason SQL Data Warehouse expects the Timestamp data to be in yyyy-MM-dd hh:mm:ss format. Changing the date separator from / to - resolved the issue, although it must be mentioned that the underlying file is a perfectly valid Parquet file.
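
As a rough illustration of that fix in PySpark, one could rewrite the separator before writing the Parquet file the external table reads; the column and path names below are hypothetical.

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

spark = SparkSession.builder.getOrCreate()

#Illustrative data in the problematic yyyy/MM/dd hh:mm:ss format
df = spark.createDataFrame([("2019/01/15 09:30:00",)], ["last_updated"])

#Change the date separator from / to - before writing the file
fixed = df.withColumn("last_updated", regexp_replace("last_updated", "/", "-"))
fixed.write.mode("overwrite").parquet("/mnt/datalake/curated/mytable")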

Kaggle: TalkingData

A brief retrospective of my submission for the Kaggle data science competition that predicts the gender and age group of a smartphone user based on their usage pattern.

Continue reading

Kaggle: Grupo Bimbo

A brief retrospective of my submission for the Kaggle data science competition that forecasts inventory demand for Grupo Bimbo.

Continue reading

Common Type 2 SCD Anti-patterns

Slowly Changing Dimension (SCD) is great for tracking historical changes to dimension attributes. SCDs have evolved over the years and, besides the conventional type 1 (update), type 2 (add row) and type 3 (add column), there are now extensions up to type 7, including type 0. Almost every DW/BI project has at least a few type 2 dimensions, where a change to an attribute causes the current dimension record to be end dated and a new record to be created with the new value.
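
As a quick illustration (hypothetical keys and values), a type 2 customer dimension might track a customer moving from Sydney to Melbourne like this:

CustomerKey  CustomerId  City       StartDate   EndDate     IsCurrent
1001         C042        Sydney     2017-03-01  2019-06-30  0
2750         C042        Melbourne  2019-07-01  9999-12-31  1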

Continue Reading

Forecasting Exchange Rates Using R Time Series

Time Series is the historical representation of data points collected at periodic intervals of time. Statistical tools like R use forecasting models to analyse historical time series data and predict future values with reasonable accuracy. In this post I will be using R time series to forecast the exchange rate of the Australian dollar, using the daily closing rate of the dollar collected over a period of two years.

Continue reading