From 5b7fd31b79fa99812fe3f6329cf07a852322298d Mon Sep 17 00:00:00 2001 From: Ondrej Sedlacek Date: Thu, 18 Jul 2024 14:58:02 +0200 Subject: [PATCH] Database: Unordered inserts and bulk writes. --- dp3/database/database.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index 72208d95..61ffde27 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -466,13 +466,16 @@ def update_master_records(self, etype: str, eids: list[str], records: list[dict] """ master_col = self._master_col_name(etype) try: - self._db[master_col].bulk_write( + res = self._db[master_col].bulk_write( [ ReplaceOne({"_id": eid}, record, upsert=True) for eid, record in zip(eids, records) - ] + ], + ordered=False, ) self.log.debug("Updated master records of %s (%s).", etype, len(eids)) + for error in res.bulk_api_result.get("writeErrors", []): + self.log.error("Error in bulk write: %s", error) except Exception as e: raise DatabaseError(f"Update of master records failed: {e}\n{records}") from e @@ -886,8 +889,15 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime): snapshot_col = self._snapshots_col_name(etype) try: - self._db[snapshot_col].insert_many(snapshots) - self.log.debug(f"Inserted snapshots: {snapshots}") + res = self._db[snapshot_col].insert_many(snapshots, ordered=False) + if len(res.inserted_ids) != len(snapshots): + self.log.error( + "Inserted only %s snapshots when trying to insert %s", + len(res.inserted_ids), + len(snapshots), + ) + else: + self.log.debug(f"Inserted snapshots: {snapshots}") except Exception as e: raise DatabaseError(f"Insert of snapshots failed: {e}\n{snapshots}") from e