Order columns for Data Skipping #76

Open · wants to merge 4 commits into base: main
37 changes: 37 additions & 0 deletions README.md
@@ -76,6 +76,43 @@ You can leverage the upsert code if your SCD table meets these requirements:
* Any change in an attribute column triggers an upsert
* SCD logic is exposed via `effective_time`, `end_time` and `is_current` column (you can also use date or version columns for SCD upserts)

## Order Columns

The `order_columns` function reorders the columns of a DataFrame so that they can be indexed for
Data Skipping: all numeric, date, and timestamp columns are moved to the front. Use the
`user_defined_columns` parameter for columns that should be indexed regardless of their data type.
The Data Skipping configuration of the SparkSession
(`spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols`) is automatically set to
the number of user-defined plus indexable columns.

Suppose you have the following table:

```
+----+----+-------------------+-----+-----+----+
|col1|col2| col3| col4| col5|col6|
+----+----+-------------------+-----+-----+----+
| 1| a|2019-01-01 00:00:00|test1|test4| 3.0|
| 2| b|2019-01-01 00:00:00|test2|test5| 3.0|
| 3| c|2019-01-01 00:00:00|test3|test6| 3.0|
+----+----+-------------------+-----+-----+----+
```

Run the `order_columns` function:

```python
mack.order_columns(data_frame, ["col4"])
```

Here's the resulting DataFrame:

```
+-----+----+-------------------+----+----+-----+
| col4|col1| col3|col6|col2| col5|
+-----+----+-------------------+----+----+-----+
|test1| 1|2019-01-01 00:00:00| 3.0| a|test4|
|test2| 2|2019-01-01 00:00:00| 3.0| b|test5|
|test3| 3|2019-01-01 00:00:00| 3.0| c|test6|
+-----+----+-------------------+----+----+-----+
```
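
You can also pass an optional `user_defined_order` callable that receives the list of column names
and returns them in the exact order you want; in that case the Data Skipping configuration is left
unchanged. A minimal sketch, reusing `data_frame` from above with a hypothetical `my_order` helper:

```python
def my_order(cols):
    # Put col6 first and keep the remaining columns in their original order
    return ["col6"] + [c for c in cols if c != "col6"]

mack.order_columns(data_frame, user_defined_order=my_order)
```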

## Kill duplicates

The `kill_duplicate` function completely removes all duplicate rows from a Delta table.
83 changes: 82 additions & 1 deletion mack/__init__.py
@@ -1,10 +1,19 @@
from itertools import combinations
from typing import List, Union, Dict
from typing import List, Union, Dict, Callable

from delta import DeltaTable
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, concat_ws, count, md5, row_number
from pyspark.sql.types import (
    DateType,
    LongType,
    TimestampType,
    FloatType,
    IntegerType,
    DoubleType,
)
from pyspark.sql.window import Window


@@ -596,3 +605,75 @@ def with_md5_cols(
    if type(df) == DeltaTable:
        df = df.toDF()
    return df.withColumn(output_col_name, md5(concat_ws("||", *cols)))


def order_columns(
    df: DataFrame,
    user_defined_columns: List[str] = None,
    user_defined_order: Callable[[List[str]], List[str]] = None,
) -> DataFrame:
    """
    Reorders the columns of a DataFrame so that the columns Delta Lake can index
    efficiently for Data Skipping (numeric, date and timestamp types) come first.
    When no ``user_defined_order`` is given, the SparkSession default
    ``spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols`` is set
    to the number of user-defined plus indexable columns.

    :param df: DataFrame whose columns should be reordered
    :type df: DataFrame
    :param user_defined_columns: columns that are always placed first, regardless of their data type
    :type user_defined_columns: List[str]
    :param user_defined_order: optional function that receives the DataFrame's column names and
        returns them in the desired order; when provided it takes precedence and the Data Skipping
        configuration is left unchanged
    :type user_defined_order: Callable[[List[str]], List[str]]

    :returns: a new DataFrame with the columns reordered
    :rtype: DataFrame
    """

    if not user_defined_order:

        if user_defined_columns is None:
            user_defined_columns = []

        # Column types that Delta Lake can index efficiently for Data Skipping
        efficient_index_types = [
            IntegerType,
            FloatType,
            DoubleType,
            LongType,
            TimestampType,
            DateType,
        ]

        remaining_columns = [
            field
            for field in df.schema.fields
            if field.name not in user_defined_columns
        ]

        indexable_cols = [
            field.name
            for field in remaining_columns
            if type(field.dataType) in efficient_index_types
        ]
        non_indexable_cols = [
            field.name
            for field in remaining_columns
            if not (type(field.dataType) in efficient_index_types)
        ]

        num_cols = len(user_defined_columns + indexable_cols)

        # Collect statistics for every user-defined and indexable column on newly created tables
        SparkSession.getActiveSession().conf.set(
            "spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols",
            str(num_cols),
        )

        cols = [*user_defined_columns, *indexable_cols, *non_indexable_cols]
    else:
        df_cols = df.columns

        cols = user_defined_order(df_cols)

        for column in cols:
            if column not in df_cols:
                raise TypeError(
                    f"The data frame has these columns {df_cols!r}, but the user-defined order function returned {cols!r}"
                )

    return df.select(*cols)
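
Shown below is a minimal usage sketch (not part of this PR) of how `order_columns` and the session
default interact; `spark`, `data_frame`, and the `/tmp/orders` path are placeholders:

```python
import mack

# Reorder so "col4" plus the numeric/timestamp columns lead the schema.
ordered_df = mack.order_columns(data_frame, ["col4"])

# order_columns set the Delta session default for newly created tables; for the
# README example this is "4" (col4 plus col1, col3, and col6).
print(spark.conf.get(
    "spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols"
))

# Tables created from the reordered DataFrame collect min/max statistics on the
# leading columns, which is what enables Data Skipping.
ordered_df.write.format("delta").save("/tmp/orders")
```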
32 changes: 32 additions & 0 deletions tests/test_public_interface.py
@@ -753,3 +753,35 @@ def test_with_md5_cols(tmp_path):
    chispa.assert_df_equality(
        with_md5, expected_df, ignore_row_order=True, ignore_schema=True
    )


def test_order_columns():
    data = [
        (1, "a", dt(2019, 1, 1), "test1", "test4", 3.0),
        (2, "b", dt(2019, 1, 1), "test2", "test5", 3.0),
        (3, "c", dt(2019, 1, 1), "test3", "test6", 3.0),
    ]
    df = spark.createDataFrame(
        data,
        [
            "col1",
            "col2",
            "col3",
            "col4",
            "col5",
            "col6",
        ],
    )

    actual_df = mack.order_columns(df, ["col4"])

    expected_df = df.select("col4", "col1", "col3", "col6", "col2", "col5")

    chispa.assert_df_equality(actual_df, expected_df)

    def user_defined_sorting(cols):
        return sorted(cols)

    user_sorted_df = mack.order_columns(df, user_defined_order=user_defined_sorting)

    chispa.assert_df_equality(user_sorted_df, df)
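
A possible follow-up test, not part of this PR, for the validation path: when the
`user_defined_order` callable returns a column name the DataFrame does not have, `order_columns`
raises `TypeError`. The sketch assumes `pytest` is importable in this module:

```python
def test_order_columns_rejects_unknown_columns():
    df = spark.createDataFrame([(1, "a")], ["col1", "col2"])

    def bad_order(cols):
        # Deliberately return a column that does not exist in the DataFrame.
        return ["col1", "does_not_exist"]

    with pytest.raises(TypeError):
        mack.order_columns(df, user_defined_order=bad_order)
```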