From 08bb64f42890e45d256117604c3c48d13819034d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Benko?= Date: Thu, 21 Dec 2023 09:33:06 +0100 Subject: [PATCH] SchemaCleaner: process entity removal (#24) Fixes unclear `KeyError` when entity configuration is completely removed. Entity removal is now checked for and included in breaking updates. When the removal is confirmed, `dp3 schema-update` drops all Mongo collections associated with this entity. --- dp3/bin/schema_update.py | 18 ++++++++---- dp3/database/schema_cleaner.py | 51 +++++++++++++++++++++++++--------- 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/dp3/bin/schema_update.py b/dp3/bin/schema_update.py index e522ac23..f45b0a3c 100644 --- a/dp3/bin/schema_update.py +++ b/dp3/bin/schema_update.py @@ -51,19 +51,25 @@ def main(args): config.get("processing_core.worker_processes"), ) - prev_schema, config_schema, updates = db.schema_cleaner.get_schema_status() + prev_schema, config_schema, updates, deleted_entites = db.schema_cleaner.get_schema_status() if prev_schema["schema"] == config_schema["schema"]: log.info("Schema is OK!") return - if not updates: + if not updates and not deleted_entites: db.schema_cleaner.schemas.insert_one(config_schema) log.info("Updated schema without any changes to master records, OK now!") return - log.info("Suggested changes to master records:") - for entity, entity_updates in updates.items(): - log.info(f"{entity}: {dict(entity_updates)}") + if deleted_entites: + log.info("Suggested removal of entities:") + for entity in deleted_entites: + log.info(f"- {entity}") + + if updates: + log.info("Suggested changes to master records:") + for entity, entity_updates in updates.items(): + log.info(f"- {entity}: {dict(entity_updates)}") if args.bypass: if not confirm_changes( @@ -78,6 +84,6 @@ def main(args): if not confirm_changes("Are you sure you want to apply these changes? (y/[n]): "): log.info("Aborted schema update.") return - db.schema_cleaner.execute_updates(updates) + db.schema_cleaner.execute_updates(updates, deleted_entites) db.schema_cleaner.schemas.insert_one(config_schema) log.info("Applied suggested changes, updated schema, OK now!") diff --git a/dp3/database/schema_cleaner.py b/dp3/database/schema_cleaner.py index 45a903d8..35cd03a3 100644 --- a/dp3/database/schema_cleaner.py +++ b/dp3/database/schema_cleaner.py @@ -15,6 +15,10 @@ # current database schema version SCHEMA_VERSION = 2 +# Collections belonging to entity +# Used when deleting no-longer existing entity. +ENTITY_COLLECTIONS = ["{}#master", "{}#raw", "{}#snapshots"] + class SchemaCleaner: """ @@ -78,12 +82,12 @@ def safe_update_schema(self): Raises: ValueError: If conflicting changes are detected. """ - db_schema, config_schema, updates = self.get_schema_status() + db_schema, config_schema, updates, deleted_entites = self.get_schema_status() if db_schema["schema"] == config_schema["schema"]: self.log.info("Schema OK!") return - if not updates: + if not updates and not deleted_entites: self.schemas.insert_one(config_schema) self.log.info("Updated schema, OK now!") return @@ -92,7 +96,7 @@ def safe_update_schema(self): self.log.warning("Please run `dp3 schema-update` to make the changes.") raise ValueError("Schema update failed: Conflicting changes detected.") - def get_schema_status(self) -> tuple[dict, dict, dict]: + def get_schema_status(self) -> tuple[dict, dict, dict, list]: """ Gets the current schema status. `database_schema` is the schema document from the database. @@ -100,26 +104,26 @@ def get_schema_status(self) -> tuple[dict, dict, dict]: `updates` is a dictionary of required updates to each entity. Returns: - Tuple of (`database_schema`, `configuration_schema`, `updates`). + Tuple of (`database_schema`, `configuration_schema`, `updates`, `deleted_entites`). """ schema_doc = self.get_current_schema_doc(infer=True) current_schema = self.construct_schema_doc() if schema_doc["schema"] == current_schema: - return schema_doc, schema_doc, {} + return schema_doc, schema_doc, {}, [] new_schema = { "_id": schema_doc["_id"] + 1, "schema": current_schema, "version": SCHEMA_VERSION, } - updates = self.detect_changes(schema_doc, current_schema) - return schema_doc, new_schema, updates + updates, deleted_entites = self.detect_changes(schema_doc, current_schema) + return schema_doc, new_schema, updates, deleted_entites def detect_changes( self, db_schema_doc: dict, current_schema: dict - ) -> dict[str, dict[str, dict[str, str]]]: + ) -> tuple[dict[str, dict[str, dict[str, str]]], list[str]]: """ Detects changes between configured schema and the one saved in the database. @@ -128,17 +132,24 @@ def detect_changes( current_schema: Schema from the configuration. Returns: - Required updates to each entity. + Tuple of required updates to each entity and list of deleted entites. """ if db_schema_doc["schema"] == current_schema: - return {} + return {}, [] if db_schema_doc["version"] != SCHEMA_VERSION: self.log.info("Schema version changed, skipping detecting changes.") - return {} + return {}, [] updates = {} + deleted_entites = [] for entity, attributes in db_schema_doc["schema"].items(): + # Unset deleted entities + if entity not in current_schema: + self.log.info("Schema breaking change: Entity %s was deleted", entity) + deleted_entites.append(entity) + continue + entity_updates = defaultdict(dict) current_attributes = current_schema[entity] @@ -170,9 +181,23 @@ def detect_changes( if entity_updates: updates[entity] = entity_updates - return updates - def execute_updates(self, updates: dict[str, dict[str, dict[str, str]]]): + return updates, deleted_entites + + def execute_updates( + self, updates: dict[str, dict[str, dict[str, str]]], deleted_entites: list[str] + ): + # Delete entities + for entity in deleted_entites: + try: + for col_placeholder in ENTITY_COLLECTIONS: + col = col_placeholder.format(entity) + self.log.info("%s: Deleting collection: %s", entity, col) + self._db[col].drop() + except Exception as e: + raise ValueError(f"Schema update failed: {e}") from e + + # Update attributes for entity, entity_updates in updates.items(): try: self.log.info(