Skip to content

Commit

Permalink
SchemaCleaner: process entity removal (#24)
Browse files Browse the repository at this point in the history
Fixes unclear `KeyError` when entity configuration is completely
removed.
Entity removal is now checked for and included in breaking updates.
When the removal is confirmed, `dp3 schema-update` drops all Mongo
collections associated with this entity.
  • Loading branch information
DavidB137 authored Dec 21, 2023
1 parent e4ed694 commit 08bb64f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 19 deletions.
18 changes: 12 additions & 6 deletions dp3/bin/schema_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,25 @@ def main(args):
config.get("processing_core.worker_processes"),
)

prev_schema, config_schema, updates = db.schema_cleaner.get_schema_status()
prev_schema, config_schema, updates, deleted_entites = db.schema_cleaner.get_schema_status()
if prev_schema["schema"] == config_schema["schema"]:
log.info("Schema is OK!")
return

if not updates:
if not updates and not deleted_entites:
db.schema_cleaner.schemas.insert_one(config_schema)
log.info("Updated schema without any changes to master records, OK now!")
return

log.info("Suggested changes to master records:")
for entity, entity_updates in updates.items():
log.info(f"{entity}: {dict(entity_updates)}")
if deleted_entites:
log.info("Suggested removal of entities:")
for entity in deleted_entites:
log.info(f"- {entity}")

if updates:
log.info("Suggested changes to master records:")
for entity, entity_updates in updates.items():
log.info(f"- {entity}: {dict(entity_updates)}")

if args.bypass:
if not confirm_changes(
Expand All @@ -78,6 +84,6 @@ def main(args):
if not confirm_changes("Are you sure you want to apply these changes? (y/[n]): "):
log.info("Aborted schema update.")
return
db.schema_cleaner.execute_updates(updates)
db.schema_cleaner.execute_updates(updates, deleted_entites)
db.schema_cleaner.schemas.insert_one(config_schema)
log.info("Applied suggested changes, updated schema, OK now!")
51 changes: 38 additions & 13 deletions dp3/database/schema_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
# current database schema version
SCHEMA_VERSION = 2

# Collections belonging to entity
# Used when deleting no-longer existing entity.
ENTITY_COLLECTIONS = ["{}#master", "{}#raw", "{}#snapshots"]


class SchemaCleaner:
"""
Expand Down Expand Up @@ -78,12 +82,12 @@ def safe_update_schema(self):
Raises:
ValueError: If conflicting changes are detected.
"""
db_schema, config_schema, updates = self.get_schema_status()
db_schema, config_schema, updates, deleted_entites = self.get_schema_status()
if db_schema["schema"] == config_schema["schema"]:
self.log.info("Schema OK!")
return

if not updates:
if not updates and not deleted_entites:
self.schemas.insert_one(config_schema)
self.log.info("Updated schema, OK now!")
return
Expand All @@ -92,34 +96,34 @@ def safe_update_schema(self):
self.log.warning("Please run `dp3 schema-update` to make the changes.")
raise ValueError("Schema update failed: Conflicting changes detected.")

def get_schema_status(self) -> tuple[dict, dict, dict]:
def get_schema_status(self) -> tuple[dict, dict, dict, list]:
"""
Gets the current schema status.
`database_schema` is the schema document from the database.
`configuration_schema` is the schema document constructed from the current configuration.
`updates` is a dictionary of required updates to each entity.
Returns:
Tuple of (`database_schema`, `configuration_schema`, `updates`).
Tuple of (`database_schema`, `configuration_schema`, `updates`, `deleted_entites`).
"""
schema_doc = self.get_current_schema_doc(infer=True)

current_schema = self.construct_schema_doc()

if schema_doc["schema"] == current_schema:
return schema_doc, schema_doc, {}
return schema_doc, schema_doc, {}, []

new_schema = {
"_id": schema_doc["_id"] + 1,
"schema": current_schema,
"version": SCHEMA_VERSION,
}
updates = self.detect_changes(schema_doc, current_schema)
return schema_doc, new_schema, updates
updates, deleted_entites = self.detect_changes(schema_doc, current_schema)
return schema_doc, new_schema, updates, deleted_entites

def detect_changes(
self, db_schema_doc: dict, current_schema: dict
) -> dict[str, dict[str, dict[str, str]]]:
) -> tuple[dict[str, dict[str, dict[str, str]]], list[str]]:
"""
Detects changes between configured schema and the one saved in the database.
Expand All @@ -128,17 +132,24 @@ def detect_changes(
current_schema: Schema from the configuration.
Returns:
Required updates to each entity.
Tuple of required updates to each entity and list of deleted entites.
"""
if db_schema_doc["schema"] == current_schema:
return {}
return {}, []

if db_schema_doc["version"] != SCHEMA_VERSION:
self.log.info("Schema version changed, skipping detecting changes.")
return {}
return {}, []

updates = {}
deleted_entites = []
for entity, attributes in db_schema_doc["schema"].items():
# Unset deleted entities
if entity not in current_schema:
self.log.info("Schema breaking change: Entity %s was deleted", entity)
deleted_entites.append(entity)
continue

entity_updates = defaultdict(dict)
current_attributes = current_schema[entity]

Expand Down Expand Up @@ -170,9 +181,23 @@ def detect_changes(

if entity_updates:
updates[entity] = entity_updates
return updates

def execute_updates(self, updates: dict[str, dict[str, dict[str, str]]]):
return updates, deleted_entites

def execute_updates(
self, updates: dict[str, dict[str, dict[str, str]]], deleted_entites: list[str]
):
# Delete entities
for entity in deleted_entites:
try:
for col_placeholder in ENTITY_COLLECTIONS:
col = col_placeholder.format(entity)
self.log.info("%s: Deleting collection: %s", entity, col)
self._db[col].drop()
except Exception as e:
raise ValueError(f"Schema update failed: {e}") from e

# Update attributes
for entity, entity_updates in updates.items():
try:
self.log.info(
Expand Down

0 comments on commit 08bb64f

Please sign in to comment.