Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type three scd upserts functionality #127

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

CommanderWahid
Copy link

Add a new functionality to perform a type 3 scd upsert on a target delta table:

  1. Add type_3_scd_upsert function.
  2. Add unit test for type_3_scd_upsert function.
  3. Update README to include the new feature.

- `delta_table` (`DeltaTable`): An object representing the delta table to be upserted.
- `updates_df` (`DataFrame`): The data to be used in order to upsert the target delta table.
- `primary_key` (`str`): The primary key (i.e. business key) uniquely identifiy each row in the target delta table.
- `curr_prev_col_names` (`dict[str,str]`): A dictionary of column names to store current and previous values.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we separate this to two parameters: current_col_name and previous_col_name. I generally prefer exposing public interfaces with full works instead of abbreviations. Is there any advantage to grouping these two arguments in a dictionary?

Copy link
Author

@CommanderWahid CommanderWahid Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think that, it depends on if we want to:

  1. Populate multiple columns at a time when applying the type 3 scd upserts:

Suppose that we have a delta table having this structure:
+----+----+----+-------+--------+------------+-------------+--------------+
|pkey|name|job|prev_job| country|prev_country| continent|prev_continent|
+----+----+---+--------+--------+------------+-------------+--------------+

The columns prev_job, prev_country and prev_continent will store the previous values for the columns job, country and continent after applying the type 3 scd.

Uisng a dictionary as a parameter to store the column names (for current and previous values) is less prone to error than storing them in two separate lists.

Option #1: Using a dictinary --> Triggering the type_3_scd_upsert once
col_names = {"country":"prev_country", "job":"prev_job", "continent":"prev_continent"}
mack.type_3_scd_upsert(delta_table, updates_df, "pkey",col_names)

Option #2: Using two parameters --> Triggering the type_3_scd_upsert multiple times
current_col_names = ["country","job","continent"]
previous_col_names = ["prev_country","prev_job","prev_continent"]
mack.type_3_scd_upsert(delta_table, updates_df, "pkey",current_col_names,previous_col_names)

Here an error on columns order can cause a cartastrophy, like storing the previous values of the column 'job' in the column 'prev_country' (or the opposite):

current_col_names= ["country", "job", "continent"]
previous_col_names= ["prev_job","prev_country","prev_continent"]

  1. Populate one column at a time when applying the type 3 scd upserts

let's use the same delta table structure mentioned above.

In this case we will have something like this:

//type 3 scd upserts for column job
current_col_names = "job"
previous_col_names = "prev_job"
mack.type_3_scd_upsert(delta_table, updates_df, "pkey",current_col_names,previous_col_names)

//type 3 scd upserts for column country
current_col_names = "country"
previous_col_names = "prev_country"
mack.type_3_scd_upsert(delta_table, updates_df, "pkey",current_col_names,previous_col_names)

//type 3 scd upserts for column continent
current_col_names = "continent"
previous_col_names = "prev_continent"
mack.type_3_scd_upsert(delta_table, updates_df, "pkey",current_col_names,previous_col_names)

+----+----+----+-------+--------+------------+-------------+--------------+
|pkey|name|job|prev_job| country|prev_country| continent|prev_continent|
+----+----+---+--------+--------+------------+-------------+--------------+
| 1| A| AA| null| Japan| null| Asia| null|
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some examples where the previous_job column is not null.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okey-dokey

+----+----+---+------------+-------------+
| 1| A1| AA| Japan| Asia| // update on name
| 2| B1|BBB| Peru|South America| // updates on name,job,country,continent --> storing previous values on prev_job,prev_country,prev_continent
| 3| C| CC| New Zeland| Oceania| // updates on country,continent --> storing previous values on prev_country,prev_continent
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should demonstrate these two cases:

  • a row that's in source, but not in target
  • a row that's identical in source and target

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
Just to be sure, do you mean by source the update dataframe and the target the delta table ?

@@ -735,3 +736,119 @@ def rename_delta_table(
delta_table.toDF().write.format("delta").mode("overwrite").saveAsTable(
new_table_name
)

def type_3_scd_upsert(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would the user use this function with an effective_date column?

Copy link
Author

@CommanderWahid CommanderWahid Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recomend to use it as an optional parameter.

If needed, the function have to update the effective_date each time a real update occurs on the current and previous columns.

A structure of a delta table with an effective_date could be something like this:

+----+----+----+-------+------------------+
|pkey|name|job|prev_job|job_effective_date
+----+----+---+--------+------------------+

or much more wider

+----+----+----+-------+------------------+-------+------------+-----------------+---------+-------------------------+
pkey| name| job| prev_job| job_effective_date| country| prev_country| country_effective| continent| prev_continent
+----+----+---+--------+------------------+-------+------------+-----------------+---------+-------------------------+

Now how we will provide the column names representing the effective_date to the function ?

A parameter as a list of list ? --> param = ( (job,prev_job,job_effective_date),(country,prev_country,country_effective), (continent,prev_continent) )

A three parameters: current_col_names, previous_col_names and effective_date_col_names ?

I don't like the second option unless we consider that the function cannot handle multiple scd columns at a time.

@MrPowers
Copy link
Owner

@CommanderWahid - thanks for submitting this PR.

I don't know much about Type 3 SCD, but I am trying to learn here.

I added some comments. Thanks for your patience here. I am going to have to learn as we go and we'll have to collaborate to make sure we get the right abstraction!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants