PySpark Common Transforms

Quite often I come across transformations that are applicable to several scenarios. So created this reusable Python class that leverages PySpark capabilities to apply common transformation to a dataframe or a subset of columns in a dataframe. The code is in GitHub – bennyaustin/pyspark-utils. There is also an extensive function reference and usage document to go with it. Feel free to use, extend, request features and contribute.

Here is an example of using in a Databricks Notebook. For this example the publicly available NY taxi data set is used. Happy Coding !

# Databricks notebook source
from pyspark.sql import DataFrame,Column,functions,types

# COMMAND ----------

# MAGIC %run "/Common/CommonTransforms"

# COMMAND ----------

dataSchema="VendorID STRING,tpep_pickup_datetime TIMESTAMP,tpep_dropoff_datetime TIMESTAMP,passenger_count INT,trip_distance DOUBLE,RatecodeID STRING,store_and_fwd_flag STRING,PULocationID INT,DOLocationID INT,payment_type INT,fare_amount DOUBLE,extra DOUBLE,mta_tax DOUBLE,tip_amount DOUBLE,tolls_amount DOUBLE,improvement_surcharge DOUBLE,total_amount DOUBLE,congestion_surcharge DOUBLE",schema=dataSchema,header=True)

# COMMAND ----------

input = input.withColumn("sys_date1",lit(20275)) #Date in Julian Format

input = input.withColumn("sys_date2",lit("2020-10-01").cast("date")) #Date in Gregorian Format

# COMMAND ----------


# COMMAND ----------


# Remove duplicates

# Remove duplicates based on key columns

# Remove leading and trailing spaces from all string columns

# Replace Null Value with generic values
output = ct.replaceNull(0)
output = ct.replaceNull("NA")
output = ct.replaceNull("2020-01-01")

# Replace Null value in Timestamp columns
output = ct.replaceNull("1900-01-01T00:00:00","tpep_pickup_datetime")
output = ct.replaceNull("9999-12-31T23:59:59","tpep_dropoff_datetime")

# Replace Null Values with custom defaults
output = ct.replaceNull({"passenger_count":1,"store_and_fwd_flag":"N","tip_amount":0,"tolls_amount":0, "improvement_surcharge":0,"congestion_surcharge":0})

# Convert UTC timestamps to local
output = ct.utc_to_local("Australia/Sydney")
output = ct.utc_to_local("Australia/Sydney",["tpep_pickup_datetime","tpep_dropoff_datetime"])

# Convert local timestamps to UTC
output = ct.local_to_utc("Australia/Sydney")
output = ct.local_to_utc("Australia/Sydney",["tpep_pickup_datetime","tpep_dropoff_datetime"])

# Convert time from one Timezone to another
output = ct.changeTimezone("Australia/Sydney","America/New_York")

# Drop system/non-business columns
output = ct.dropSysColumns("store_and_fwd_flag")

# Add Checksum
output = ct.addChecksumCol("checksum")

# Convert Julian date to Calendar date
output = ct.julian_to_calendar("sys_date1")

# Convert Calendar date to Julian
output =ct.calendar_to_julian("sys_date2")

# Add literal value columns for e.g audit columns
audit={"audit_key":66363,"pipeline_id":"56f63394bb06dd7f6945f636f1d4018bd50f1850", "start_datetime": "2020-10-01 10:00:00", "end_datetime": "2020-10-01 10:02:05"}
output = ct.addLitCols(audit)

# COMMAND ----------


Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s