get all composite key candidates function added #58

Open · wants to merge 1 commit into base: main
20 changes: 20 additions & 0 deletions README.md
@@ -409,6 +409,26 @@ Suppose you have the following Delta table:

Running `mack.find_composite_key_candidates(delta_table)` on that table will return `["col1", "col3"]`.

## Find all valid composite key candidates in a Delta table

The `find_all_composite_key_combos` function helps you find all of the valid composite key candidates that
uniquely identify the rows in your Delta table. It returns a list of the column combinations that can be used as a
composite key.

Suppose you have the following Delta table:

```
+----+----+----+
|x |y |z |
+----+----+----+
| 1| 1| 1|
| 2| 1| 1|
| 3| 2| 1|
+----+----+----+
```

Running `mack.find_all_composite_key_combos(delta_table)` on that table will return `["x", "x,y", "x,z", "x,y,z"]`.
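
For example, a minimal usage sketch (assuming an active `spark` session and that the table above is stored at the hypothetical path `path/to/delta-table`):

```python
from delta import DeltaTable

import mack

# Load the Delta table shown above and list every column combination
# that uniquely identifies its rows.
delta_table = DeltaTable.forPath(spark, "path/to/delta-table")
print(mack.find_all_composite_key_combos(delta_table))
# ['x', 'x,y', 'x,z', 'x,y,z']
```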

## Append md5 column

The `with_md5_cols` function appends an `md5` hash of the specified columns to the DataFrame. This can be used as a unique key if the selected columns form a composite key.
29 changes: 28 additions & 1 deletion mack/__init__.py
@@ -3,7 +3,7 @@

from delta import DeltaTable
import pyspark
-from pyspark.sql.functions import count, col, row_number, md5, concat_ws
+from pyspark.sql.functions import count, col, row_number, md5, concat_ws, collect_set, size, lit
from pyspark.sql.window import Window
from pyspark.sql.dataframe import DataFrame

@@ -384,6 +384,33 @@ def find_composite_key_candidates(
    return list(df_col_excluded.select(*c).columns)


def find_all_composite_key_combos(
    df: Union[DeltaTable, DataFrame], exclude_cols: List[str] = None
):
    # Accept either a DeltaTable or a DataFrame
    if type(df) == DeltaTable:
        df = df.toDF()
    if exclude_cols is None:
        exclude_cols = []
    df_col_excluded = df.drop(*exclude_cols)
    # A column combination is a valid composite key only if its number of
    # distinct values equals the number of distinct rows
    col_select_condition = df_col_excluded.distinct().count()
**robertkossendey** (Collaborator): @souvik-databricks why does it need to distinct? If `.distinct().count() < .count()`, there is no unique combination, or?

**souvik-databricks** (Collaborator, Author), Jan 1, 2023: @robertkossendey `.distinct()` is required because if your data has duplicates in it, the `collect_set()` in line 401 will still match up and we get only the unique combinations of values for the columns.

**robertkossendey** (Collaborator): @souvik-databricks still struggling to understand, could you add a test with a completely duplicated line? Because that should return no combination at all, right?

**souvik-databricks** (Collaborator, Author): @robertkossendey So in that scenario, if you have a duplicate record, that would indicate that the columns you have included in the search do not contain a valid composite key combo. You would have to include more columns to act as part of the composite key.

If the whole dataset contains a full exact duplicate record, it doesn't make sense to keep it, as that is redundant information.

And I will add a test with a duplicate record, with an explanation of the output for that test case in the README.

**robertkossendey** (Collaborator):

> If the whole dataset contains a full exact duplicate record, it doesn't make sense to keep it, as that is redundant information.

@souvik-databricks but isn't that the information that there is no valid key candidate in the provided dataset?

**souvik-databricks** (Collaborator, Author): I was OOO for the last week. @robertkossendey I am back now and will resume the work.

    # Add one concatenated column for every combination of two or more columns
    initcols = df_col_excluded.columns
    for i in range(len(initcols) + 1):
        for c in list(combinations(initcols, i + 2)):
            df_col_excluded = df_col_excluded.withColumn(','.join(c), concat_ws(',', *c))
    # Count the distinct values of every column and every column combination
    finalcols = df_col_excluded.columns
    exprs = [size(collect_set(x)).alias(x) for x in finalcols]
    df_col_excluded = df_col_excluded \
        .withColumn("column_combos ->", lit("distinct_row_counts ->")) \
        .groupBy("column_combos ->") \
        .agg(*exprs)
    # Keep the combinations whose distinct-value count equals the distinct row count
    columns = [
        column for column in df_col_excluded.columns if
        df_col_excluded.select(column).collect()[0][0] == col_select_condition
    ]
    df_col_excluded.select("column_combos ->", *columns).show(truncate=False)
**robertkossendey** (Collaborator): @souvik-databricks we can remove that show, right?

**souvik-databricks** (Collaborator, Author): Yes, we can remove the show.

    return columns
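
To make the review discussion above about `.distinct()` concrete, here is a standalone sketch (illustration only, not part of the diff; assumes a local `SparkSession`) of the comparison the function performs: a column or column combination qualifies as a candidate when its number of distinct values, measured with `size(collect_set(...))`, equals the distinct row count stored in `col_select_condition`.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, size

spark = SparkSession.builder.getOrCreate()
# The sample table from the README
df = spark.createDataFrame([(1, 1, 1), (2, 1, 1), (3, 2, 1)], ["x", "y", "z"])

distinct_rows = df.distinct().count()  # 3 distinct rows
counts = df.agg(
    size(collect_set("x")).alias("x"),  # 3 distinct values -> candidate
    size(collect_set("y")).alias("y"),  # 2 distinct values -> not a candidate
).collect()[0]
print([c for c in ["x", "y"] if counts[c] == distinct_rows])  # ['x']
```

The function applies the same comparison to every concatenated column combination it generates, which is why `x,y`, `x,z`, and `x,y,z` also appear in the result for this table.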


def with_md5_cols(
    df: Union[DeltaTable, DataFrame],
    cols: List[str],
26 changes: 26 additions & 0 deletions tests/test_public_interface.py
@@ -669,6 +669,32 @@ def test_find_composite_key(tmp_path):
    assert composite_keys == expected_keys


def test_find_all_composite_key_combos(tmp_path):
    path = f"{tmp_path}/find_all_composite_key_combos"
    data = [
        (1, 1, 1),
        (2, 1, 1),
        (3, 2, 1),
    ]
    df = spark.createDataFrame(
        data,
        [
            "x",
            "y",
            "z",
        ],
    )
    df.write.format("delta").save(path)

    delta_table = DeltaTable.forPath(spark, path)

    composite_keys = mack.find_all_composite_key_combos(delta_table)

    expected_keys = ['x', 'x,y', 'x,z', 'x,y,z']

    assert composite_keys == expected_keys
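
A possible follow-up test, not part of this PR and with a hypothetical name, covering the `exclude_cols` parameter; with `z` excluded, the same logic leaves only `x` and `x,y` as candidates:

```python
def test_find_all_composite_key_combos_with_exclude_cols(tmp_path):
    # Hypothetical test sketch: exclude "z" before searching for combinations
    path = f"{tmp_path}/find_all_composite_key_combos_exclude"
    df = spark.createDataFrame(
        [(1, 1, 1), (2, 1, 1), (3, 2, 1)],
        ["x", "y", "z"],
    )
    df.write.format("delta").save(path)

    delta_table = DeltaTable.forPath(spark, path)

    composite_keys = mack.find_all_composite_key_combos(delta_table, exclude_cols=["z"])

    assert composite_keys == ["x", "x,y"]
```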


def test_find_composite_key_with_value_error(tmp_path):
    path = f"{tmp_path}/find_composite_key"
    data = [