top of page

Core Layer - Clinical Study Data Maintaning the History Records (SCD2) with timestamp

Requirement

Introduction: The Clinical Study master data contains various study milestones, protocol approvals, drug rollouts, budget estimates, and other critical metrics. Over time, data such as study budget, TA designations, and milestone dates change. To maintain the history of such changes and support accurate reporting, we need to implement Slowly Changing Dimension Type 2 (SCD2) with timestamp-based tracking.

 

Requirements: Create a pyspark Databricks logic, Read only the table "purgo_playground.clinical_study". and assign it to historical_df.

 

Add SCD Type 2 Metadata to Historical Data:

 

  1. Add the following columns to historical_df:

#* effective_start_date: current date minus 100 days.

#* effective_end_date: fixed future date '9999-12-31'.

#* is_active_flag: set to 1 (active).

#* etl_updated_timestamp: current timestamp.

  1. Simulate New Source Data with Changes: Refer the excelsheet and read only the data and update the value.

#* Create updated_source_df by modifying historical_df:

#** For selected study_roll_numbers:

#* Update budget_latest_estimate_amt by adding specific amounts.

#* Update study_conduct_status to new status values.

#* Update drug_latest_name to new drug names.

#** Add/update the following metadata columns:

#* etl_updated_timestamp: current timestamp.

#* effective_start_date: today’s date.

#* effective_end_date: '9999-12-31'.

#* is_active_flag: 1 (active).

  1. Define Keys and Comparison Columns:

#* keys: List containing "study_roll_number" as the primary key.

#* compare_cols: List of columns to track changes:

#** "budget_latest_estimate_amt"

#** "study_conduct_status"

#** "drug_latest_name"

  1. Join New Data with Active Historical Records:

#* Filter historical_df for only active records (is_active_flag = 1).

#* Perform a left join of new data (updated_source_df) with historical data on the key column (study_roll_number).

#* Assign the joined result to joined_df.

  1. Detect Changes Between New and Historical Records:

#* Build a filter condition to detect if any of the compare_cols values have changed.

#* Apply the filter on joined_df to get only changed records.

#* Assign this result to changed_df.

  1. Expire Old Versions of Changed Records:

# From changed_df, select only the historical part (hist.).

#* Update:

#** effective_end_date to today’s date.

#** is_active_flag to 0 (inactive).

#** etl_updated_timestamp to current timestamp.

#* Store this in expired_records.

  1. Insert the New Version of Changed Records:

# From changed_df, select only the new part (new.) as new_records.

  1. Preserve Unchanged Records:

#* Identify records in historical_df that were not updated (i.e., not in changed_df).

#* Use a left anti join to filter them as unchanged_records.

  1. Create Final SCD2 Dataset:

#* Combine:

#** unchanged_records (not changed)

#** expired_records (previous versions of changed records)

#** new_records (latest updated records)

#* Use unionByName for combining.

#* Store the final result in final_scd2_df.

  1. Display Final Data: Sort final_scd2_df by study_roll_number and effective_start_date and display the result.

 

Unity Catalog: “purgo_playground.clinical_study“

Purgo AI Agentic Code

bottom of page