Commit 8bb192e: Handles SQLite

michael-s-molina committed Aug 6, 2024
1 parent 1a92652

Showing 1 changed file with 62 additions and 24 deletions.

superset/migrations/shared/catalogs.py
@@ -97,8 +97,6 @@ class Slice(Base):

ModelType = Union[Type[Query], Type[SavedQuery], Type[TabState], Type[TableSchema]]

-BATCH_SIZE = 10000
-
MODELS: list[tuple[ModelType, str]] = [
    (Query, "database_id"),
    (SavedQuery, "db_id"),
@@ -124,8 +122,17 @@ def get_known_schemas(database_name: str, session: Session) -> list[str]:
    return sorted({name[0][1:-1].split("].[")[-1] for name in names})
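The comprehension above parses Superset's stored schema-permission strings. A minimal, self-contained illustration of that parsing (the sample permission values are hypothetical, not taken from the migration):

# Hypothetical sample of the parsing done in get_known_schemas: schema
# permissions are "[database].[schema]" strings, and the expression strips
# the outer brackets and keeps only the schema part.
names = [("[examples].[public]",), ("[examples].[information_schema]",)]
schemas = sorted({name[0][1:-1].split("].[")[-1] for name in names})
print(schemas)  # ['information_schema', 'public']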


+def get_batch_size(session: Session) -> int:
+    max_sqlite_in = 999
+    return max_sqlite_in if session.bind.dialect.name == "sqlite" else 10000
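The 999 cap exists because the batched statements below bind one parameter per id on the SQLite path, and SQLite builds compiled with the historical default of SQLITE_MAX_VARIABLE_NUMBER reject statements with more than 999 bound parameters. A standalone sketch of the failure mode (assumes a local SQLite build with that default; newer builds allow 32766 and will not raise):

import sqlite3

# Each element of an IN (...) list is one bound parameter; exceeding the
# build's variable limit raises "too many SQL variables".
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
ids = list(range(1500))
placeholders = ", ".join("?" for _ in ids)
try:
    conn.execute(f"SELECT id FROM t WHERE id IN ({placeholders})", ids)
except sqlite3.OperationalError as exc:
    print(exc)  # raised only on builds with the 999 default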


def print_processed_batch(
-    start_time: datetime, offset: int, total_rows: int, model: ModelType
+    start_time: datetime,
+    offset: int,
+    total_rows: int,
+    model: ModelType,
+    batch_size: int,
) -> None:
    """
    Print the progress of batch processing.
@@ -139,11 +146,12 @@ def print_processed_batch(
        offset (int): The current offset in the batch processing.
        total_rows (int): The total number of rows to process.
        model (ModelType): The model being processed.
+        batch_size (int): The size of the batch being processed.
    """
    elapsed_time = datetime.now() - start_time
    elapsed_seconds = elapsed_time.total_seconds()
    elapsed_formatted = f"{int(elapsed_seconds // 3600):02}:{int((elapsed_seconds % 3600) // 60):02}:{int(elapsed_seconds % 60):02}"
-    rows_processed = min(offset + BATCH_SIZE, total_rows)
+    rows_processed = min(offset + batch_size, total_rows)
    logger.info(
        f"{elapsed_formatted} - {rows_processed:,} of {total_rows:,} {model.__tablename__} rows processed "
        f"({(rows_processed / total_rows) * 100:.2f}%)"
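For illustration only, a migration over 25,000 rows of the query table with a 10,000-row batch size would emit progress lines shaped like this (the timings and counts are invented):

00:00:04 - 10,000 of 25,000 query rows processed (40.00%)
00:00:07 - 20,000 of 25,000 query rows processed (80.00%)
00:00:09 - 25,000 of 25,000 query rows processed (100.00%)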
@@ -158,7 +166,7 @@ def update_catalog_column(
    This function iterates over a list of models defined by MODELS and updates
    the `catalog` column to the specified catalog or None depending on the downgrade
-    parameter. The update is performedin batches to optimize performance and reduce
+    parameter. The update is performed in batches to optimize performance and reduce
    memory usage.

    Parameters:
@@ -186,26 +194,41 @@ def update_catalog_column(
        f"Total rows to be processed for {model.__tablename__}: {total_rows:,}"
    )

+    batch_size = get_batch_size(session)
+
    # Update in batches using row numbers
-    for i in range(0, total_rows, BATCH_SIZE):
+    for i in range(0, total_rows, batch_size):
        subquery = (
            session.query(model.id)
            .filter(getattr(model, column) == database.id)
            .filter(model.catalog == catalog if downgrade else True)
            .order_by(model.id)
            .offset(i)
-            .limit(BATCH_SIZE)
+            .limit(batch_size)
            .subquery()
        )
-        session.execute(
-            sa.update(model)
-            .where(model.id == subquery.c.id)
-            .values(catalog=None if downgrade else catalog)
-            .execution_options(synchronize_session=False)
-        )
+
+        # SQLite does not support multiple-table criteria within UPDATE
+        if session.bind.dialect.name == "sqlite":
+            ids_to_update = [row.id for row in session.query(subquery.c.id).all()]
+            if ids_to_update:
+                session.execute(
+                    sa.update(model)
+                    .where(model.id.in_(ids_to_update))
+                    .values(catalog=None if downgrade else catalog)
+                    .execution_options(synchronize_session=False)
+                )
+        else:
+            session.execute(
+                sa.update(model)
+                .where(model.id == subquery.c.id)
+                .values(catalog=None if downgrade else catalog)
+                .execution_options(synchronize_session=False)
+            )

        # Commit the transaction after each batch
        session.commit()
-        print_processed_batch(start_time, i, total_rows, model)
+        print_processed_batch(start_time, i, total_rows, model, batch_size)
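The branch is needed because SQLite's UPDATE cannot reference another table (here, the paging subquery) in its WHERE clause, so on SQLite the batch's ids are materialized in Python first and fed back through IN (...); that is also why get_batch_size caps SQLite batches at 999 bound parameters. A minimal, self-contained sketch of the fallback pattern in plain SQLAlchemy Core (not from the commit; the table and values are made up):

import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
q = sa.Table(
    "q",
    meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("catalog", sa.String),
)
meta.create_all(engine)

with engine.begin() as conn:
    conn.execute(sa.insert(q), [{"id": i, "catalog": None} for i in range(1, 6)])
    # Materialize the ids for one batch, then update via IN (...), mirroring
    # the SQLite branch above.
    ids = [row.id for row in conn.execute(sa.select(q.c.id).order_by(q.c.id).limit(3))]
    conn.execute(sa.update(q).where(q.c.id.in_(ids)).values(catalog="default"))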


def update_schema_catalog_perms(
@@ -257,8 +280,10 @@ def update_schema_catalog_perms(
        .filter(Database.id == database.id)
        .filter(Slice.datasource_type == "table")
    ):
-        chart.catalog_perm = catalog_perm
-        chart.schema_perm = mapping[chart.datasource_id]
+        # We only care about tables that exist in the mapping
+        if mapping.get(chart.datasource_id) is not None:
+            chart.catalog_perm = catalog_perm
+            chart.schema_perm = mapping[chart.datasource_id]


def delete_models_non_default_catalog(
@@ -292,25 +317,38 @@
        f"Total rows to be processed for {model.__tablename__}: {total_rows:,}"
    )

+    batch_size = get_batch_size(session)
+
    # Update in batches using row numbers
-    for i in range(0, total_rows, BATCH_SIZE):
+    for i in range(0, total_rows, batch_size):
        subquery = (
            session.query(model.id)
            .filter(getattr(model, column) == database.id)
            .filter(model.catalog != catalog)
            .order_by(model.id)
            .offset(i)
-            .limit(BATCH_SIZE)
+            .limit(batch_size)
            .subquery()
        )
-        session.execute(
-            sa.delete(model)
-            .where(model.id == subquery.c.id)
-            .execution_options(synchronize_session=False)
-        )

+        # SQLite does not support multiple-table criteria within DELETE
+        if session.bind.dialect.name == "sqlite":
+            ids_to_delete = [row.id for row in session.query(subquery.c.id).all()]
+            if ids_to_delete:
+                session.execute(
+                    sa.delete(model)
+                    .where(model.id.in_(ids_to_delete))
+                    .execution_options(synchronize_session=False)
+                )
+        else:
+            session.execute(
+                sa.delete(model)
+                .where(model.id == subquery.c.id)
+                .execution_options(synchronize_session=False)
+            )
        # Commit the transaction after each batch
        session.commit()
-        print_processed_batch(start_time, i, total_rows, model)
+        print_processed_batch(start_time, i, total_rows, model, batch_size)
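The DELETE path mirrors the UPDATE fallback: each id in ids_to_delete becomes one bound parameter, so the 999-row SQLite batch size keeps every statement under the per-statement variable limit. A short self-contained sketch of that guarantee (illustrative names, not from the commit):

import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
t = sa.Table("t", meta, sa.Column("id", sa.Integer, primary_key=True))
meta.create_all(engine)

with engine.begin() as conn:
    conn.execute(sa.insert(t), [{"id": i} for i in range(1, 1001)])
    ids_to_delete = list(range(1, 1000))  # 999 ids: the SQLite-safe maximum
    conn.execute(sa.delete(t).where(t.c.id.in_(ids_to_delete)))
    remaining = conn.execute(sa.select(sa.func.count()).select_from(t)).scalar()
    print(remaining)  # 1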


def upgrade_catalog_perms(engines: set[str] | None = None) -> None:
