Upsert to Azure Synapse Analytics using PySpark

At the moment the SQL MERGE operation is not available in Azure Synapse Analytics. However, it is possible to implement the same behaviour using the Azure Synapse Analytics connector in Databricks and some PySpark code.

Upsert can be done in two ways:

  • Update existing records in the target that are newer in the source
  • Filter the already-updated records out of the source
  • Insert just the new records

Alternatively,

  • Delete records from the target that are older than their counterparts in the source
  • Delete records from the source that are older than their counterparts in the target
  • Insert the remaining records

This post implements the second approach; a rough sketch of the first approach, for comparison, is shown below.
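
Here is a rough sketch of the first (update-then-insert) approach, assuming hypothetical tables stg_orders (source/staging) and dim_orders (target), a key column order_id, a value column amount and a watermark column last_updated. Note that Synapse restricts some UPDATE ... FROM join syntax, so the exact form may need adjusting:

#Hypothetical sketch of the update-then-insert approach (all table and column names are placeholders)
updateSQL = ("update dim_orders set amount = stg_orders.amount, last_updated = stg_orders.last_updated "
             "from stg_orders "
             "where stg_orders.order_id = dim_orders.order_id "
             "and stg_orders.last_updated >= dim_orders.last_updated;")

insertNewSQL = ("insert into dim_orders select * from stg_orders "
                "where not exists (select 1 from dim_orders "
                "where dim_orders.order_id = stg_orders.order_id);")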

Before we dive into the code, it's good to understand how the Databricks Azure Synapse Analytics connector works. If you haven't worked with this connector before, it's a good idea to read this post before you proceed. The connector supports basic read and write operations on Azure Synapse Analytics. It also has a preActions and postActions feature, which allows valid SQL statements to be executed in Synapse before or after a write operation. This is the feature I am going to use for the upsert, specifically postActions, following the second method described above. There is a catch, however: preActions and postActions work only on a new table.
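
For orientation, a minimal write through the connector with a postActions option looks roughly like the sketch below; the connection string, storage path, table name and postAction statement are placeholders rather than values from this post.

#Minimal sketch: write a dataframe to Synapse and run a SQL statement after the load (placeholder values)
df.write.format("com.databricks.spark.sqldw") \
    .option("url", "<synapse-jdbc-connection-string>") \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("dbTable", "stg_orders") \
    .option("tempDir", "abfss://temp@<storageaccount>.dfs.core.windows.net/synapse") \
    .option("postActions", "update statistics stg_orders;") \
    .mode("overwrite") \
    .save()

With the connector's behavior in mind, this is how the upsert is done: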

  • Create a temporary staging table in Azure Synapse Analytics in overwrite mode and write the input dataframe to it.
  • After the dataframe is written, add a postAction on the staging table that deletes from the target table the records that exist in the staging table and are older than their staging counterparts.
  • Add a second postAction that deletes from the staging table the records that exist in the target table and are older than their target counterparts.
  • Add a third postAction that inserts the remaining records from the staging table into the target table (see the sketch after this list for what these statements look like).
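
To make these steps concrete, here is roughly what the three generated statements would look like for the same hypothetical stg_orders / dim_orders tables, keyed on order_id with a last_updated watermark column:

#Hypothetical example of the generated postActions (table and column names are placeholders)
deleteSQL = ("delete from dim_orders where exists (select 1 from stg_orders "
             "where stg_orders.order_id = dim_orders.order_id "
             "and stg_orders.last_updated >= dim_orders.last_updated);")

deleteOutdatedSQL = ("delete from stg_orders where exists (select 1 from dim_orders "
                     "where dim_orders.order_id = stg_orders.order_id "
                     "and dim_orders.last_updated > stg_orders.last_updated);")

insertSQL = "Insert Into dim_orders select * from stg_orders;"

#The three statements are concatenated and passed to the connector as a single postActions option
postActionsSQL = deleteSQL + deleteOutdatedSQL + insertSQL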

This is what the PySpark code looks like. I created a function with these parameters:

  • df – input dataframe
  • dwhStagingTable – Azure Synapse Analytics table used to stage the input dataframe for the merge/upsert operation
  • dwhTargetTable – Azure Synapse Analytics target table into which the dataframe is merged/upserted
  • lookupColumns – pipe-separated list of columns that uniquely identify a record in the input dataframe
  • deltaName – name of the watermark column in the input dataframe, if any
  • dwhStagingDistributionColumn – name of the column used as the hash-distribution column in the staging table of the DWH. This column helps improve upsert performance by minimizing data movement, provided dwhTargetTable is also hash distributed on the same column.
def upsertDWH(df, dwhStagingTable, dwhTargetTable, lookupColumns, deltaName, dwhStagingDistributionColumn):

    lookupCols = lookupColumns.split("|")

    #STEP1: Build a delete statement that removes records from TARGET
    #       when the staging table holds a newer (or equally new) version of the same record
    whereClause = " and ".join(dwhStagingTable + "." + col + " = " + dwhTargetTable + "." + col
                               for col in lookupCols)
    if deltaName is not None and len(deltaName) > 0:
        #Only delete the target record if the staging record's watermark is at least as recent
        whereClause = whereClause + " and " + dwhStagingTable + "." + deltaName + " >= " + dwhTargetTable + "." + deltaName

    deleteSQL = "delete from " + dwhTargetTable + " where exists (select 1 from " + dwhStagingTable + " where " + whereClause + ");"

    #STEP2: Build a delete statement that removes records from STAGING
    #       that are older than the matching record already in TARGET
    whereClause = " and ".join(dwhTargetTable + "." + col + " = " + dwhStagingTable + "." + col
                               for col in lookupCols)
    if deltaName is not None and len(deltaName) > 0:
        #Drop the staging record if the target record's watermark is newer
        whereClause = whereClause + " and " + dwhTargetTable + "." + deltaName + " > " + dwhStagingTable + "." + deltaName

    deleteOutdatedSQL = "delete from " + dwhStagingTable + " where exists (select 1 from " + dwhTargetTable + " where " + whereClause + ");"
    #print("deleteOutdatedSQL={}".format(deleteOutdatedSQL))

    #STEP3: Insert whatever remains in STAGING into TARGET
    insertSQL = "Insert Into " + dwhTargetTable + " select * from " + dwhStagingTable + ";"
    #print("insertSQL={}".format(insertSQL))

    #Consolidate the postActions SQL; each statement already ends with a semicolon
    postActionsSQL = deleteSQL + deleteOutdatedSQL + insertSQL
    print("postActionsSQL={}".format(postActionsSQL))

    sqldwJDBC = "Your JDBC connection string for Azure Synapse Analytics. Preferably from Key Vault"
    tempSQLDWFolder = "A temp folder in Azure Data Lake Storage or Blob Storage for temporary PolyBase files"

    #Use hash distribution on the staging table where possible
    if dwhStagingDistributionColumn is not None and len(dwhStagingDistributionColumn) > 0:
        stgTableOptions = "CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = HASH (" + dwhStagingDistributionColumn + ")"
    else:
        stgTableOptions = "CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN"

    #Overwrite the staging table; the postActions then merge/upsert the staged data into the target
    df.write.format("com.databricks.spark.sqldw")\
        .option("url", sqldwJDBC)\
        .option("forwardSparkAzureStorageCredentials", "true")\
        .option("dbTable", dwhStagingTable)\
        .option("tableOptions", stgTableOptions)\
        .option("tempDir", tempSQLDWFolder)\
        .option("maxStrLength", 4000)\
        .option("postActions", postActionsSQL)\
        .mode("overwrite").save()

9 thoughts on “Upsert to Azure Synapse Analytics using PySpark”

  1. The MERGE statement is now available in Synapse. Can we use it in the postActions SQL in PySpark to load data into Synapse? I am getting the error “Incorrect syntax near ‘)’” even though the same statement works fine when executed in Synapse.

  2. Hi Austin,

    We have a requirement where the target table should contain only the latest data, and the source contains delta data for a single day.
    So I am thinking of implementing the following two-step logic:
    1) Delete the data from the target that currently exists in the source.
    2) Insert all the data from the source into the target.
    Is there any issue in implementing this logic?

    Thanks a lot for the nice article on this.

      1. Hi Benny,
        I am automating this pipeline, and when I run it for the first time the target table does not exist yet. How should I handle this?
        Do I need to create it manually, or is there an option to create both the staging table and the target table the first time, and from the next run onwards handle it as described above?

      2. I suggest creating both the target and staging tables in Synapse manually. Also note that the table schema must be an exact match between the target and staging tables. Data type, length, precision etc. must be identical because, at the moment, there is a bug in Synapse that causes excessive data movement if the data types don’t match.

    1. Rajaniesh, the timestamp logic prevents overwriting newer data with older data. If you don’t have a timestamp column, you should customize the postActions SQL to fit your requirement.
