PySpark Common Transforms

Quite often I come across transformations that are applicable to several scenarios. So created this reusable Python class that leverages PySpark capabilities to apply common transformation to a dataframe or a subset of columns in a dataframe. The code is in GitHub – bennyaustin/pyspark-utils. There is also an extensive function reference and usage document to go with it. Feel free to use, extend, request features and contribute.

Here is an example of using CommonTransform.py in a Databricks Notebook. For this example the publicly available NY taxi data set is used. Happy Coding !

# Databricks notebook source
from pyspark.sql import DataFrame,Column,functions,types

# COMMAND ----------

# MAGIC %run "/Common/CommonTransforms"

# COMMAND ----------

dataPath="/FileStore/tables/yellow_tripdata_2020_01_100k.csv"
dataSchema="VendorID STRING,tpep_pickup_datetime TIMESTAMP,tpep_dropoff_datetime TIMESTAMP,passenger_count INT,trip_distance DOUBLE,RatecodeID STRING,store_and_fwd_flag STRING,PULocationID INT,DOLocationID INT,payment_type INT,fare_amount DOUBLE,extra DOUBLE,mta_tax DOUBLE,tip_amount DOUBLE,tolls_amount DOUBLE,improvement_surcharge DOUBLE,total_amount DOUBLE,congestion_surcharge DOUBLE"
input=spark.read.csv(path=dataPath,schema=dataSchema,header=True)

# COMMAND ----------

input = input.withColumn("sys_date1",lit(20275)) #Date in Julian Format

input = input.withColumn("sys_date2",lit("2020-10-01").cast("date")) #Date in Gregorian Format


# COMMAND ----------

display(input)

# COMMAND ----------

ct=CommonTransforms(input)

# Remove duplicates
output=ct.deDuplicate()

# Remove duplicates based on key columns
output=ct.deDuplicate(["VendorID","tpep_pickup_datetime","tpep_dropoff_datetime"])

# Remove leading and trailing spaces from all string columns
output=ct.trim()

# Replace Null Value with generic values
output = ct.replaceNull(0)
output = ct.replaceNull("NA")
output = ct.replaceNull("2020-01-01")

# Replace Null value in Timestamp columns
output = ct.replaceNull("1900-01-01T00:00:00","tpep_pickup_datetime")
output = ct.replaceNull("9999-12-31T23:59:59","tpep_dropoff_datetime")

# Replace Null Values with custom defaults
output = ct.replaceNull({"passenger_count":1,"store_and_fwd_flag":"N","tip_amount":0,"tolls_amount":0, "improvement_surcharge":0,"congestion_surcharge":0})

# Convert UTC timestamps to local
output = ct.utc_to_local("Australia/Sydney")
output = ct.utc_to_local("Australia/Sydney",["tpep_pickup_datetime","tpep_dropoff_datetime"])


# Convert local timestamps to UTC
output = ct.local_to_utc("Australia/Sydney")
output = ct.local_to_utc("Australia/Sydney",["tpep_pickup_datetime","tpep_dropoff_datetime"])

# Convert time from one Timezone to another
output = ct.changeTimezone("Australia/Sydney","America/New_York")

# Drop system/non-business columns
output = ct.dropSysColumns("store_and_fwd_flag")

# Add Checksum
output = ct.addChecksumCol("checksum")

# Convert Julian date to Calendar date
output = ct.julian_to_calendar("sys_date1")

# Convert Calendar date to Julian
output =ct.calendar_to_julian("sys_date2")

# Add literal value columns for e.g audit columns
audit={"audit_key":66363,"pipeline_id":"56f63394bb06dd7f6945f636f1d4018bd50f1850", "start_datetime": "2020-10-01 10:00:00", "end_datetime": "2020-10-01 10:02:05"}
output = ct.addLitCols(audit)

# COMMAND ----------

display(output)

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s