Upsert to Azure SQL Datawarehouse using PySpark

At the moment SQL MERGE operation is not available in Azure SQL Data Warehouse. However, it is possible to implement this feature using Azure SQL Data Warehouse connector in Databricks with some PySpark code.

Upsert can be done in 2 ways

  • Update existing records in target that are newer in source
  • Filter out updated records from source
  • Insert just the new records.

Alternatively,

  • Delete existing records that are older from target
  • Delete existing records from source that are older than target.
  • Insert remaining records

Before we dive into code, its good to understand how Databricks Azure SQL Datawarehouse connector works. If you haven’t worked with this connector before its a good idea to read this post before you proceed. The connector supports basic read and write operations on Azure SQL Datawarehouse. It also has a preAction and postActions feature which allows execution of valid SQL statements at Datawarehouse before or after a write operation. This is the feature I am going to use for Upserts specifically the postActions. However it comes with a catch, the pre and postActions works only on a new table. So this is how it’s done and I am using the second method for upsert

  • Create a temporary staging table in Azure SQL Datawarehouse in overwrite mode and write the input dataframe.
  • After the dataframe is written, create a postAction on the staging table to delete the records from target table that exist in the staging table and is older than the one in staging table.
  • Create a second postAction to delete the records from staging table that exist at target and is older than the one in target table.
  • At this stage create a third postAction to insert the records from staging table to target table

This is how the PySpark code looks like. I created a function with these parameters

  • df -Input dataframe
  • dwhStagingTable – Azure SQL Data Warehouse Table used to stage the input dataframe for merge/upsert operation.
  • dwhTargetTable – Azure SQL Data Warehouse Target table where the dataframe is merged/upserted.
  • lookupColumns – pipe separated columns that uniquely defines a record in input dataframe
  • deltaName – Name of watermark column in input dataframe if any
  • dwhStagingDistributionColumn – Name of the column used as hash distribution column in staging table of DWH. This column will help improve upsert performance by minimizing data movement provided the dwhTargetTable is also hash distributed on the same column.
def upsertDWH(df,dwhStagingTable,dwhTargetTable,lookupColumns,deltaName,dwhStagingDistributionColumn):
    
    #STEP1: Derive dynamic delete statement to delete existing record from TARGET if the source record is newer
    lookupCols =lookupColumns.split("|")
    whereClause=""
    for col in lookupCols:
      whereClause= whereClause + dwhStagingTable  +"."+ col  + "="+ dwhTargetTable +"." + col + " and "

    if deltaName is not None and  len(deltaName) >0:
      #Check if the last updated is greater than existing record
      whereClause= whereClause + dwhStagingTable  +"."+ deltaName  + ">="+ dwhTargetTable +"." + deltaName
    else:
      #remove last "and"
      remove="and"
      reverse_remove=remove[::-1]
      whereClause = whereClause[::-1].replace(reverse_remove,"",1)[::-1]

    deleteSQL = "delete from " + dwhTargetTable + " where exists (select 1 from " + dwhStagingTable + " where " +whereClause +");"

    #STEP2: Delete existing records but outdated records from SOURCE
    whereClause=""
    for col in lookupCols:
      whereClause= whereClause + dwhTargetTable  +"."+ col  + "="+ dwhStagingTable +"." + col + " and "

    if deltaName is not None and  len(deltaName) >0:
      #Check if the last updated is lesser than existing record
      whereClause= whereClause + dwhTargetTable  +"."+ deltaName  + "> "+ dwhStagingTable +"." + deltaName
    else:
      #remove last "and"
      remove="and"
      reverse_remove=remove[::-1]
      whereClause = whereClause[::-1].replace(reverse_remove,"",1)[::-1]

    deleteOutdatedSQL = "delete from " + dwhStagingTable + " where exists (select 1 from " + dwhTargetTable + " where " + whereClause + " );"
    #print("deleteOutdatedSQL={}".format(deleteOutdatedSQL))

    #STEP3: Insert SQL
    insertSQL ="Insert Into " + dwhTargetTable + " select * from " + dwhStagingTable +";"
    #print("insertSQL={}".format(insertSQL))

    #consolidate post actions SQL
    postActionsSQL = deleteSQL + deleteOutdatedSQL + insertSQL
    print("postActionsSQL={}".format(postActionsSQL))

    sqldwJDBC = "Your JDBC Connection String for Azure SQL Data Warehouse. Preferably from Key Vault"
    tempSQLDWFolder = "A temp folder in Azure Datalake Storage or Blob Storage for temp polybase files "
    
    
    #Use Hash Distribution on STG table where possible
    if dwhStagingDistributionColumn is not None and len(dwhStagingDistributionColumn) > 0:
      stgTableOptions ="CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = HASH (" +  dwhStagingDistributionColumn + ")"
    else:
      stgTableOptions ="CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN"
    
    #Upsert/Merge to Target using STG postActions
    df.write.format("com.databricks.spark.sqldw")\
      .option("url", sqldwJDBC).option("forwardSparkAzureStorageCredentials", "true")\
      .option("dbTable",dwhStagingTable)\
      .option("tableOptions",stgTableOptions)\
      .option("tempDir",tempSQLDWFolder)\
      .option("maxStrLength",4000)\
      .option("postActions",postActionsSQL)\
      .mode("overwrite").save()

Time Zone Conversions in PySpark

PySpark has built-in functions to shift time between time zones. Just need to follow a simple rule. It goes like this. First convert the timestamp from origin time zone to UTC which is a point of reference. Then convert the timestamp from UTC to the required time zone. In this way there is no need to maintain lookup tables and its a generic method to convert time between time zones even for the ones that require daylight savings offset.

Continue reading