Skip to content

Commit

Permalink
Database: Unordered inserts and bulk writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
xsedla1o committed Jul 18, 2024
1 parent 3c556af commit 5b7fd31
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions dp3/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,16 @@ def update_master_records(self, etype: str, eids: list[str], records: list[dict]
"""
master_col = self._master_col_name(etype)
try:
self._db[master_col].bulk_write(
res = self._db[master_col].bulk_write(
[
ReplaceOne({"_id": eid}, record, upsert=True)
for eid, record in zip(eids, records)
]
],
ordered=False,
)
self.log.debug("Updated master records of %s (%s).", etype, len(eids))
for error in res.bulk_api_result.get("writeErrors", []):
self.log.error("Error in bulk write: %s", error)
except Exception as e:
raise DatabaseError(f"Update of master records failed: {e}\n{records}") from e

Expand Down Expand Up @@ -886,8 +889,15 @@ def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime):

snapshot_col = self._snapshots_col_name(etype)
try:
self._db[snapshot_col].insert_many(snapshots)
self.log.debug(f"Inserted snapshots: {snapshots}")
res = self._db[snapshot_col].insert_many(snapshots, ordered=False)
if len(res.inserted_ids) != len(snapshots):
self.log.error(
"Inserted only %s snapshots when trying to insert %s",
len(res.inserted_ids),
len(snapshots),
)
else:
self.log.debug(f"Inserted snapshots: {snapshots}")
except Exception as e:
raise DatabaseError(f"Insert of snapshots failed: {e}\n{snapshots}") from e

Expand Down

0 comments on commit 5b7fd31

Please sign in to comment.