Upsert to Azure SQL Datawarehouse using PySpark

At the moment SQL MERGE operation is not available in Azure SQL Data Warehouse. However, it is possible to implement this feature using Azure SQL Data Warehouse connector in Databricks with some PySpark code.

Upsert can be done in 2 ways

  • Update existing records in target that are newer in source
  • Filter out updated records from source
  • Insert just the new records.


  • Delete existing records that are older from target
  • Delete existing records from source that are older than target.
  • Insert remaining records

Before we dive into code, its good to understand how Databricks Azure SQL Datawarehouse connector works. If you haven’t worked with this connector before its a good idea to read this post before you proceed. The connector supports basic read and write operations on Azure SQL Datawarehouse. It also has a preAction and postActions feature which allows execution of valid SQL statements at Datawarehouse before or after a write operation. This is the feature I am going to use for Upserts specifically the postActions. However it comes with a catch, the pre and postActions works only on a new table. So this is how it’s done and I am using the second method for upsert

  • Create a temporary staging table in Azure SQL Datawarehouse in overwrite mode and write the input dataframe.
  • After the dataframe is written, create a postAction on the staging table to delete the records from target table that exist in the staging table and is older than the one in staging table.
  • Create a second postAction to delete the records from staging table that exist at target and is older than the one in target table.
  • At this stage create a third postAction to insert the records from staging table to target table

This is how the PySpark code looks like. I created a function with these parameters

  • df -Input dataframe
  • dwhStagingTable – Azure SQL Data Warehouse Table used to stage the input dataframe for merge/upsert operation.
  • dwhTargetTable – Azure SQL Data Warehouse Target table where the dataframe is merged/upserted.
  • lookupColumns – pipe separated columns that uniquely defines a record in input dataframe
  • deltaName – Name of watermark column in input dataframe if any
  • dwhStagingDistributionColumn – Name of the column used as hash distribution column in staging table of DWH. This column will help improve upsert performance by minimizing data movement provided the dwhTargetTable is also hash distributed on the same column.
def upsertDWH(df,dwhStagingTable,dwhTargetTable,lookupColumns,deltaName,dwhStagingDistributionColumn):
    #STEP1: Derive dynamic delete statement to delete existing record from TARGET if the source record is newer
    lookupCols =lookupColumns.split("|")
    for col in lookupCols:
      whereClause= whereClause + dwhStagingTable  +"."+ col  + "="+ dwhTargetTable +"." + col + " and "

    if deltaName is not None and  len(deltaName) >0:
      #Check if the last updated is greater than existing record
      whereClause= whereClause + dwhStagingTable  +"."+ deltaName  + ">="+ dwhTargetTable +"." + deltaName
      #remove last "and"
      whereClause = whereClause[::-1].replace(reverse_remove,"",1)[::-1]

    deleteSQL = "delete from " + dwhTargetTable + " where exists (select 1 from " + dwhStagingTable + " where " +whereClause +");"

    #STEP2: Delete existing records but outdated records from SOURCE
    for col in lookupCols:
      whereClause= whereClause + dwhTargetTable  +"."+ col  + "="+ dwhStagingTable +"." + col + " and "

    if deltaName is not None and  len(deltaName) >0:
      #Check if the last updated is lesser than existing record
      whereClause= whereClause + dwhTargetTable  +"."+ deltaName  + "> "+ dwhStagingTable +"." + deltaName
      #remove last "and"
      whereClause = whereClause[::-1].replace(reverse_remove,"",1)[::-1]

    deleteOutdatedSQL = "delete from " + dwhStagingTable + " where exists (select 1 from " + dwhTargetTable + " where " + whereClause + " );"

    #STEP3: Insert SQL
    insertSQL ="Insert Into " + dwhTargetTable + " select * from " + dwhStagingTable +";"

    #consolidate post actions SQL
    postActionsSQL = deleteSQL + deleteOutdatedSQL + insertSQL

    sqldwJDBC = "Your JDBC Connection String for Azure SQL Data Warehouse. Preferably from Key Vault"
    tempSQLDWFolder = "A temp folder in Azure Datalake Storage or Blob Storage for temp polybase files "
    #Use Hash Distribution on STG table where possible
    if dwhStagingDistributionColumn is not None and len(dwhStagingDistributionColumn) > 0:
      stgTableOptions ="CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = HASH (" +  dwhStagingDistributionColumn + ")"
    #Upsert/Merge to Target using STG postActions
      .option("url", sqldwJDBC).option("forwardSparkAzureStorageCredentials", "true")\

Time Zone Conversions in PySpark

PySpark has built-in functions to shift time between time zones. Just need to follow a simple rule. It goes like this. First convert the timestamp from origin time zone to UTC which is a point of reference. Then convert the timestamp from UTC to the required time zone. In this way there is no need to maintain lookup tables and its a generic method to convert time between time zones even for the ones that require daylight savings offset.

Continue reading

HdfsBridge::recordReaderFillBuffer – Unexpected error encountered filling record reader buffer: IllegalArgumentException: Must be 12 bytes

Parquet is my preferred format for storing files in data lake. Parquet’s columnar storage and compression makes it very efficient for in-memory processing tasks like Spark/Databricks notebooks while saving cost on storage. Parquet also supports almost all encoding schemes out there. Perhaps the coolest thing in Parquet is unlike CSV there is no such thing as column/row separator. So there is no need to escape those characters if they are part of data.

Azure SQL Data Warehouse supports Parquet data format for External (PolyBase) tables. External tables reference the underlying storage blobs and gives an option to query the data lake using SQL. In fact this is recommended in Microsoft’s reference architecture. With some Parquet files this error gets thrown when the External Table is queried

HdfsBridge::recordReaderFillBuffer – Unexpected error encountered filling record reader buffer: IllegalArgumentException: Must be 12 bytes

In the absence of clear exception message it took a while to figure this out. This error usually happens on a Timestamp column specifically when the data is in yyyy/MM/dd hh:mm:ss format. For some reason SQL Data Warehouse expects the Timestamp data to be in yyyy-MM-dd hh:mm:ss format. Changing the date separator from / to – resolved this issue, although it must be mentioned the underlying file is a perfectly valid Parquet file.

Kaggle: TalkingData

A brief retrospective of my submission for Kaggle data science competition that predicts the gender and age group of a smartphone user based on their usage pattern.

Continue reading

Kaggle: Grupo Bimbo

A brief retrospective of my submission for Kaggle data science competition that forecasts inventory demand for Grupo Bimbo.

Continue reading

Common Type 2 SCD Anti-patterns

Slowly Changing Dimension (SCD) is great for tracking historical changes to dimension attributes. SCDs have evolved over the years and besides the conventional type 1 (update), type 2 (add row) and type 3 (add column), now there are extensions up to type 7 including type 0. Almost every DW/BI project has at least few type 2 dimensions where a change to an attribute causes the current dimension record to be end dated and creates a new record with the new value.

Continue Reading

Forecasting Exchange Rates Using R Time Series

Time Series is the historical representation of data points collected at periodic intervals of time. Statistical tools like R use forecasting models to analyse historical time series data to predict future values with reasonable accuracy. In this post I will be using R time series to forecast the exchange rate of Australian dollar using daily closing rate of the dollar collected over a period of two years.

Continue reading

Energy Rating Analysis of Air conditioners using R Decision Trees

Decision tree is a data mining model that graphically represents the parameters that are most likely to influence the outcome and the extent of influence. The output is similar to a tree/flowchart with nodes, branches and leaves. The nodes represent the parameters, the branches represent the classification question/decision and the leaves represent the outcome (Screen Capture 1). Internally, decision tree algorithm performs a recursive classification on the input dataset and assigns each record to a segment of the tree where it fits closest.
There are several packages in R that generate decision trees. For this post, I am using the ctree() function available in party package. The data I am using as input is energy rating of household air conditioners.

Continue Reading

R: Box Plot

Box plot is an effective way to visualize the distribution of your data.It only takes a few lines of code in R to come up with a basic box plot.

Continue Reading

Pig: Using CUBE Operator to Analyse Energy Rating of Air Conditioners


CUBE operator in Pig computes all possible combination of the specified fields. In this post I will demonstrate the use of Cube operator to analyse energy rating of air conditioners in Hortonworks Data Platform (HDP). Continue Reading