From 8a3bad8510d3f7c012c900757d6ab7c630595c83 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Fri, 16 Aug 2024 15:22:10 -0500 Subject: [PATCH] adds latest flag to get timestamps when roots expire (#500) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adds latest flag to get timestamps when roots expire * fix: use timestamp from column * fix: invert mask, create separate function * Bump version: 2.16.0 → 2.17.0 * fix: handle timestamp from before node was created * remove version conflict --------- Co-authored-by: Forrest Collman --- pychunkedgraph/app/segmentation/common.py | 40 +++++++++++++++++--- pychunkedgraph/app/segmentation/v1/routes.py | 6 ++- pychunkedgraph/graph/chunkedgraph.py | 2 +- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/pychunkedgraph/app/segmentation/common.py b/pychunkedgraph/app/segmentation/common.py index 6b430f7b6..08cea92ca 100644 --- a/pychunkedgraph/app/segmentation/common.py +++ b/pychunkedgraph/app/segmentation/common.py @@ -5,6 +5,7 @@ import time from datetime import datetime from functools import reduce +from collections import deque import numpy as np import pandas as pd @@ -1067,7 +1068,34 @@ def handle_is_latest_roots(table_id, is_binary): return cg.is_latest_roots(node_ids, time_stamp=timestamp) -def handle_root_timestamps(table_id, is_binary): +def _handle_latest(cg, node_ids, timestamp): + latest_mask = cg.is_latest_roots(node_ids, time_stamp=timestamp) + non_latest_ids = node_ids[~latest_mask] + row_dict = cg.client.read_nodes( + node_ids=non_latest_ids, + properties=attributes.Hierarchy.NewParent, + end_time=timestamp, + ) + + new_roots_ts = [] + for n in non_latest_ids: # queue only non-latest ids so each popleft() below matches its own id + try: + v = row_dict[n] + new_roots_ts.append(v[-1].timestamp.timestamp()) # sorted descending + except KeyError: + new_roots_ts.append(0) + new_roots_ts = deque(new_roots_ts) + + result = [] + for x in latest_mask: + if x: + result.append(timestamp.timestamp()) + else: +
result.append(new_roots_ts.popleft()) + return result + + +def handle_root_timestamps(table_id, is_binary, latest: bool = False): current_app.request_type = "root_timestamps" current_app.table_id = table_id @@ -1076,11 +1104,13 @@ def handle_root_timestamps(table_id, is_binary): else: node_ids = np.array(json.loads(request.data)["node_ids"], dtype=np.uint64) - # Call ChunkedGraph cg = app_utils.get_cg(table_id) - - timestamps = cg.get_node_timestamps(node_ids, return_numpy=False) - return [ts.timestamp() for ts in timestamps] + timestamp = _parse_timestamp("timestamp", time.time(), return_datetime=True) + if latest: + return _handle_latest(cg, node_ids, timestamp) + else: + timestamps = cg.get_node_timestamps(node_ids, return_numpy=False) + return [ts.timestamp() for ts in timestamps] ### OPERATION DETAILS ------------------------------------------------------------ diff --git a/pychunkedgraph/app/segmentation/v1/routes.py b/pychunkedgraph/app/segmentation/v1/routes.py index 0f7ac9d9c..e9708bf5e 100644 --- a/pychunkedgraph/app/segmentation/v1/routes.py +++ b/pychunkedgraph/app/segmentation/v1/routes.py @@ -532,7 +532,11 @@ def handle_is_latest_roots(table_id): def handle_root_timestamps(table_id): int64_as_str = request.args.get("int64_as_str", default=False, type=toboolean) is_binary = request.args.get("is_binary", default=False, type=toboolean) - root_timestamps = common.handle_root_timestamps(table_id, is_binary=is_binary) + latest = request.args.get("latest", default=False, type=toboolean) + is_binary = request.args.get("is_binary", default=False, type=toboolean) + root_timestamps = common.handle_root_timestamps( + table_id, is_binary=is_binary, latest=latest + ) resp = {"timestamp": root_timestamps} return jsonify_with_kwargs(resp, int64_as_str=int64_as_str) diff --git a/pychunkedgraph/graph/chunkedgraph.py b/pychunkedgraph/graph/chunkedgraph.py index a5b08ff03..210bff50b 100644 --- a/pychunkedgraph/graph/chunkedgraph.py +++ 
b/pychunkedgraph/graph/chunkedgraph.py @@ -517,7 +517,7 @@ def is_latest_roots( root_ids: typing.Iterable, time_stamp: typing.Optional[datetime.datetime] = None, ) -> typing.Iterable: - """Determines whether root ids are superseeded.""" + """Determines whether root ids are superseded.""" time_stamp = misc_utils.get_valid_timestamp(time_stamp) row_dict = self.client.read_nodes(