Skip to content

Commit

Permalink
Database: Add "latest" tag to snapshots.
Browse files Browse the repository at this point in the history
Avoids sorting when fetching latest snapshots.
  • Loading branch information
xsedla1o committed Aug 29, 2024
1 parent 919b240 commit f8cdd93
Showing 1 changed file with 33 additions and 12 deletions.
45 changes: 33 additions & 12 deletions dp3/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,14 @@ def _init_database_schema(self, db_name) -> None:
partialFilterExpression={"count": {"$lt": self._snapshot_bucket_size}},
background=True,
)

# To fetch all the latest snapshots without relying on date
self._db[snapshot_col].create_index(
[("latest", pymongo.DESCENDING), ("_id", pymongo.ASCENDING)],
partialFilterExpression={"latest": {"$eq": True}},
background=True,
)

# To fetch the oversized entities only
self._db[snapshot_col].create_index(
[("oversized", pymongo.DESCENDING)],
Expand Down Expand Up @@ -936,7 +944,7 @@ def get_latest_snapshots(
fulltext_filters = {}

if not generic_filter:
generic_filter = {}
generic_filter: dict[str, Any] = {"latest": True}

# Create base of query
query = generic_filter
Expand All @@ -962,13 +970,9 @@ def get_latest_snapshots(
else:
query["last." + attr] = fulltext_filter

lsd = self._get_latest_snapshots_date()
if lsd is not None:
query["_time_created"] = {"$gt": lsd - self._bucket_delta}

try:
return self._db[snapshot_col].find(query, {"last": 1}).sort(
[("_time_created", pymongo.DESCENDING), ("_id", pymongo.ASCENDING)]
[("_id", pymongo.ASCENDING)]
), self._db[snapshot_col].count_documents(query)
except OperationFailure as e:
raise DatabaseError("Invalid query") from e
Expand Down Expand Up @@ -1164,6 +1168,7 @@ def save_snapshot(self, etype: str, snapshot: dict, ctime: datetime):
"_id": self._snapshot_bucket_id(eid, ctime),
"_time_created": ctime,
"oversized": False,
"latest": True,
},
},
upsert=True,
Expand Down Expand Up @@ -1266,6 +1271,7 @@ def save_snapshots(self, etype: str, snapshots: list[dict], ctime: datetime):
"_id": self._snapshot_bucket_id(eid, ctime),
"_time_created": ctime,
"oversized": False,
"latest": True,
},
},
upsert=True,
Expand All @@ -1288,6 +1294,26 @@ def save_snapshots(self, etype: str, snapshots: list[dict], ctime: datetime):
if upserts:
try:
res = self._db[snapshot_col].bulk_write(upserts, ordered=False)

# Unset latest snapshots if new snapshots were inserted
if res.upserted_count > 0:
unset_latest_updates = []
for upsert_id in res.upserted_ids.values():
eid = upsert_id.rsplit("_#", maxsplit=1)[0]
unset_latest_updates.append(
UpdateOne(
self._snapshot_bucket_eid_filter(eid)
| {"latest": True, "count": self._snapshot_bucket_size},
{"$unset": {"latest": 1}},
)
)
up_res = self._db[snapshot_col].bulk_write(unset_latest_updates)
if up_res.modified_count != res.upserted_count:
self.log.info(
"Upserted the first snapshot for %d entities.",
res.upserted_count - up_res.modified_count,
)

if res.modified_count + res.upserted_count != len(upserts):
self.log.error(
"Some snapshots were not updated, %s != %s",
Expand Down Expand Up @@ -1533,11 +1559,6 @@ def get_distinct_val_count(self, etype: str, attr: str) -> dict[Any, int]:
except KeyError as e:
raise DatabaseError(f"Attribute '{attr}' does not exist") from e

# Find newest fully completed snapshot date
latest_snapshot_date = self._get_latest_snapshots_date()
if latest_snapshot_date is None:
return {}

if attr_spec.t not in AttrType.PLAIN | AttrType.OBSERVATIONS:
raise DatabaseError(f"Attribute '{attr}' isn't plain or observations")

Expand All @@ -1564,7 +1585,7 @@ def get_distinct_val_count(self, etype: str, attr: str) -> dict[Any, int]:
agg_query_group_id += ".eid"

agg_query = [
{"$match": {"_time_created": {"$gt": latest_snapshot_date - self._bucket_delta}}},
{"$match": {"latest": True}},
*unwinding,
{"$group": {"_id": agg_query_group_id, "count": {"$sum": 1}}},
{"$sort": {"_id": 1, "count": -1}},
Expand Down

0 comments on commit f8cdd93

Please sign in to comment.