Add type three scd upserts functionality #127

Open · wants to merge 1 commit into base: main
57 changes: 57 additions & 0 deletions README.md
@@ -578,6 +578,63 @@ This function is designed to rename a Delta table. It can operate either within
rename_delta_table(existing_delta_table, "new_table_name")
```

## Type 3 SCD Upserts

Perform a Type 3 SCD upsert on a target Delta table. A Type 3 SCD tracks history by storing each tracked column's previous value in a companion column on the same row, rather than adding new rows.

Parameters:

- `delta_table` (`DeltaTable`): An object representing the Delta table to be upserted.
- `updates_df` (`DataFrame`): The data used to upsert the target Delta table.
- `primary_key` (`str`): The primary key (i.e. business key) that uniquely identifies each row in the target Delta table.
- `curr_prev_col_names` (`dict[str, str]`): A dictionary mapping each column that holds a current value to the column that stores its previous value.
**Owner commented:**

Can we separate this into two parameters, `current_col_name` and `previous_col_name`? I generally prefer exposing public interfaces with full words instead of abbreviations. Is there any advantage to grouping these two arguments in a dictionary?

**Author (@CommanderWahid) commented on Oct 17, 2023:**

I think it depends on whether we want to:

1. Populate multiple columns at a time when applying the Type 3 SCD upsert.

   Suppose we have a Delta table with this structure:

   ```
   +----+----+---+--------+-------+------------+---------+--------------+
   |pkey|name|job|prev_job|country|prev_country|continent|prev_continent|
   +----+----+---+--------+-------+------------+---------+--------------+
   ```

   The columns prev_job, prev_country and prev_continent will store the previous values of the columns job, country and continent after applying the Type 3 SCD.

   Using a dictionary to carry the column names (current and previous) is less error-prone than storing them in two separate lists (see the sketch after this comment).

   Option 1: using a dictionary --> triggering type_3_scd_upsert once:

   ```python
   col_names = {"country": "prev_country", "job": "prev_job", "continent": "prev_continent"}
   mack.type_3_scd_upsert(delta_table, updates_df, "pkey", col_names)
   ```

   Option 2: using two list parameters:

   ```python
   current_col_names = ["country", "job", "continent"]
   previous_col_names = ["prev_country", "prev_job", "prev_continent"]
   mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)
   ```

   Here an error in column order can cause a catastrophe, like storing the previous values of the column job in the column prev_country (or the opposite):

   ```python
   current_col_names = ["country", "job", "continent"]
   previous_col_names = ["prev_job", "prev_country", "prev_continent"]
   ```

2. Populate one column at a time when applying the Type 3 SCD upsert.

   Let's use the same Delta table structure mentioned above. In this case we would have something like this:

   ```python
   # Type 3 SCD upsert for column job
   current_col_name = "job"
   previous_col_name = "prev_job"
   mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_name, previous_col_name)

   # Type 3 SCD upsert for column country
   current_col_name = "country"
   previous_col_name = "prev_country"
   mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_name, previous_col_name)

   # Type 3 SCD upsert for column continent
   current_col_name = "continent"
   previous_col_name = "prev_continent"
   mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_name, previous_col_name)
   ```
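
A sketch of the pairing guard the two-list interface would need (hypothetical code, not part of this PR); the dictionary form sidesteps this by making each current/previous pairing explicit at the call site:

```python
def pair_columns(
    current_col_names: list[str], previous_col_names: list[str]
) -> dict[str, str]:
    """Hypothetical helper: pair parallel column-name lists into a mapping.

    A length check catches a missing entry, but nothing can catch two
    correctly sized lists whose elements are in the wrong order, which is
    the failure mode described above.
    """
    if len(current_col_names) != len(previous_col_names):
        raise ValueError(
            "current_col_names and previous_col_names must have the same length"
        )
    return dict(zip(current_col_names, previous_col_names))
```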

  - `Key`: column name for the current value.
  - `Value`: column name for the previous value.


Suppose you have the following Delta table:

```
+----+----+---+--------+-------+------------+-------------+--------------+
|pkey|name|job|prev_job|country|prev_country|    continent|prev_continent|
+----+----+---+--------+-------+------------+-------------+--------------+
|   1|   A| AA|    null|  Japan|        null|         Asia|          null|
|   2|   B| BB|    null| London|        null|       Europe|          null|
|   3|   C| CC|    null| Canada|        null|North America|          null|
+----+----+---+--------+-------+------------+-------------+--------------+
```

**Owner commented:**

Let's add some examples where the `prev_job` column is not null.

**Author replied:**

okey-dokey

The source data to be upserted on the target Delta table:

```
+----+----+---+------------+-------------+
|pkey|name|job|     country|    continent|
+----+----+---+------------+-------------+
|   1|  A1| AA|       Japan|         Asia| // update on name
|   2|  B1|BBB|        Peru|South America| // updates on name, job, country, continent --> previous values stored in prev_job, prev_country, prev_continent
|   3|   C| CC| New Zealand|      Oceania| // updates on country, continent --> previous values stored in prev_country, prev_continent
|   5|   D| DD|South Africa|       Africa| // new row
+----+----+---+------------+-------------+
```

**Owner commented:**

We should demonstrate these two cases:

- a row that's in source, but not in target
- a row that's identical in source and target

**Author replied:**

Yes. Just to be sure: by source do you mean the update DataFrame, and by target the Delta table?

Here's how to perform the Type 3 SCD upsert:

```python
mack.type_3_scd_upsert(delta_table, updates_df, "pkey", {"country": "prev_country", "job": "prev_job", "continent": "prev_continent"})
```

Here's the table after the upsert:

```
+----+----+---+--------+------------+------------+-------------+--------------+
|pkey|name|job|prev_job|     country|prev_country|    continent|prev_continent|
+----+----+---+--------+------------+------------+-------------+--------------+
|   1|  A1| AA|    null|       Japan|        null|         Asia|          null|
|   2|  B1|BBB|      BB|        Peru|      London|South America|        Europe|
|   3|   C| CC|    null| New Zealand|      Canada|      Oceania| North America|
|   5|   D| DD|    null|South Africa|        null|       Africa|          null|
+----+----+---+--------+------------+------------+-------------+--------------+
```
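
A quick way to sanity-check a result like this in a test, as a sketch: this assumes the chispa library for DataFrame comparison and a hand-built `expected_df` (not shown):

```python
from chispa import assert_df_equality

# Hypothetical check: compare the post-upsert table against an expected
# DataFrame built by hand (construction of expected_df omitted).
actual_df = delta_table.toDF()
assert_df_equality(actual_df, expected_df, ignore_row_order=True)
```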

## Dictionary

117 changes: 117 additions & 0 deletions mack/__init__.py
@@ -1,5 +1,6 @@
from itertools import combinations
from typing import List, Union, Dict, Optional
from collections import Counter

from delta import DeltaTable
import pyspark
@@ -735,3 +736,119 @@ def rename_delta_table(
    delta_table.toDF().write.format("delta").mode("overwrite").saveAsTable(
        new_table_name
    )

**Owner commented:**

How would the user use this function with an effective_date column?

**Author (@CommanderWahid) commented on Oct 17, 2023:**

I recommend adding it as an optional parameter.

If provided, the function would have to update the effective_date each time a real change occurs on the current and previous columns.

A Delta table with an effective_date could have a structure like this:

```
+----+----+---+--------+------------------+
|pkey|name|job|prev_job|job_effective_date|
+----+----+---+--------+------------------+
```

or a much wider one:

```
+----+----+---+--------+------------------+-------+------------+-----------------+---------+--------------+
|pkey|name|job|prev_job|job_effective_date|country|prev_country|country_effective|continent|prev_continent|
+----+----+---+--------+------------------+-------+------------+-----------------+---------+--------------+
```

Now, how would we provide the column names representing the effective_date to the function?

A single parameter as a list of tuples? --> param = ((job, prev_job, job_effective_date), (country, prev_country, country_effective), (continent, prev_continent))

Or three parameters: current_col_names, previous_col_names and effective_date_col_names?

I don't like the second option, unless we consider that the function cannot handle multiple SCD columns at a time.

A sketch of the list-of-tuples option appears after the function listing below.

def type_3_scd_upsert(
    delta_table: DeltaTable,
    updates_df: DataFrame,
    primary_key: str,
    curr_prev_col_names: dict[str, str],
) -> None:
"""
Apply scd type 3 updates on a target delta table.

:param delta_table: The target delta table.
:type delta_table: DeltaTable

:param updates_df: The source dataframe that will be used to apply scd type 3 on the target delta table.
:type updates_df: DataFrame

:param primary_key: The primary key (i.e. business key) uniquely identifiy each row in the target delta table.
:type primary_key: str

:param curr_prev_col_names: A dictionary of column names to store current and previous values.
-> Key: Column name for current value.
-> Value: Column name for previous value.
:type curr_prev_col_names: dict[str,str]

:raises TypeError: Raises type error when find a duplication in the items' value of the dictionary 'curr_prev_col_names'.
:raises TypeError: Raises type error when find a key equals to a value in items of the dictionary 'curr_prev_col_names'.
:raises TypeError: Raises type error when required columns are missing in the delta table.
:raises TypeError: Raises type error when required columns are missing in the update dataframe.
"""

    # validate the curr_prev_col_names parameter
    # raise an error in case of duplicated dict values
    count_dict = Counter(curr_prev_col_names.values())
    prev_col_name_duplicates = [
        (key, value)
        for key, value in curr_prev_col_names.items()
        if count_dict[value] > 1
    ]
    if prev_col_name_duplicates:
        raise TypeError(
            f"Found duplicate values in the dictionary curr_prev_col_names: {prev_col_name_duplicates!r}"
        )

    # raise an error when a key equals its value
    keys_equal_to_values = [
        (key, value) for key, value in curr_prev_col_names.items() if key == value
    ]
    if keys_equal_to_values:
        raise TypeError(
            f"Keys cannot be equal to values in the dictionary curr_prev_col_names: {keys_equal_to_values!r}"
        )
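    # For illustration (hypothetical arguments): {"job": "prev_job", "country": "prev_job"}
    # would fail the duplicate-value check above, and {"job": "job"} would fail
    # the key-equals-value check.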

    # validate the existing Delta table
    base_col_names = delta_table.toDF().columns
    required_base_col_names = [primary_key] + [
        col for pair in curr_prev_col_names.items() for col in pair
    ]
    missing_col_names = [
        col for col in required_base_col_names if col not in base_col_names
    ]
    if missing_col_names:
        raise TypeError(
            f"Cannot find these columns {missing_col_names!r} in the base table {base_col_names!r}"
        )

    # validate the updates DataFrame
    updates_col_names = updates_df.columns
    prev_col_names = list(curr_prev_col_names.values())
    # the updates DataFrame must carry every base column except the previous-value columns
    required_updates_col_names = [
        col for col in base_col_names if col not in prev_col_names
    ]
    if sorted(updates_col_names) != sorted(required_updates_col_names):
        raise TypeError(
            f"The updates DataFrame has these columns {updates_col_names!r}, but these columns are required {required_updates_col_names!r}"
        )

    # merge condition
    merge_condition = pyspark.sql.functions.expr(
        f"trg.{primary_key} = src.{primary_key}"
    )

    # update condition: compare every column except the primary key and the
    # previous-value columns (those are derived during staging, not compared)
    updates_attr = [
        attr
        for attr in base_col_names
        if attr not in [primary_key] + prev_col_names
    ]
    # use a null-safe comparison (<=>) so changes to or from NULL also count
    updates_condition = " OR ".join(
        f"NOT (trg.{attr} <=> src.{attr})" for attr in updates_attr
    )

    # rows to be inserted
    previous_state_for_inserts = [f"NULL as {col}" for col in prev_col_names]

    staged_inserts_df = (
        updates_df.alias("inserts")
        .join(delta_table.toDF().alias("trg"), primary_key, "leftanti")
        .selectExpr(["inserts.*"] + previous_state_for_inserts)
    )

    # rows to be updated
    previous_state_for_updates = [
        f"coalesce(nullif(trg.{curr}, updates.{curr}), trg.{prev}) as {prev}"
        for curr, prev in curr_prev_col_names.items()
    ]

    staged_updates_df = (
        updates_df.alias("updates")
        .join(delta_table.toDF().alias("trg"), primary_key)
        .selectExpr(["updates.*"] + previous_state_for_updates)
    )

    # input data = staged_updates_df + staged_inserts_df
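    # Note: union() matches columns by position, not by name; both staged
    # DataFrames project the update columns first and then the previous-value
    # columns in the same dictionary order, so positions line up.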
    staged_inputs_df = staged_updates_df.union(staged_inserts_df)

    # perform the merge
    (
        delta_table.alias("trg")
        .merge(
            source=staged_inputs_df.alias("src"),
            condition=merge_condition,
        )
        .whenMatchedUpdateAll(condition=updates_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )
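
Following up on the effective_date review thread above, a purely hypothetical sketch of the list-of-tuples option; the parameter shape and semantics are assumptions, not part of this PR:

```python
# Hypothetical extension discussed in the review thread: each tuple is
# (current_col, previous_col, optional effective_date_col). Not implemented
# in this PR; names and semantics are assumptions.
scd_cols = [
    ("job", "prev_job", "job_effective_date"),
    ("country", "prev_country", "country_effective"),
    ("continent", "prev_continent", None),  # no effective-date tracking
]
# A possible call shape, if the function ever grew this parameter:
# mack.type_3_scd_upsert(delta_table, updates_df, "pkey", scd_cols)
```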