Skip to content

Commit

Permalink
adds latest flag to get timestamps when roots expire (#500)
Browse files Browse the repository at this point in the history
* adds latest flag to get timestamps when roots expire

* fix: use timestamp from column

* fix: invert mask, create separate function

* Bump version: 2.16.0 → 2.17.0

* fix: handle timestamp from before node was created

* remove version conflict

---------

Co-authored-by: Forrest Collman <[email protected]>
  • Loading branch information
akhileshh and fcollman authored Aug 16, 2024
1 parent 7abeb9b commit 8a3bad8
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 7 deletions.
40 changes: 35 additions & 5 deletions pychunkedgraph/app/segmentation/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
from datetime import datetime
from functools import reduce
from collections import deque

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -1067,7 +1068,34 @@ def handle_is_latest_roots(table_id, is_binary):
return cg.is_latest_roots(node_ids, time_stamp=timestamp)


def handle_root_timestamps(table_id, is_binary):
def _handle_latest(cg, node_ids, timestamp):
latest_mask = cg.is_latest_roots(node_ids, time_stamp=timestamp)
non_latest_ids = node_ids[~latest_mask]
row_dict = cg.client.read_nodes(
node_ids=non_latest_ids,
properties=attributes.Hierarchy.NewParent,
end_time=timestamp,
)

new_roots_ts = []
for n in node_ids:
try:
v = row_dict[n]
new_roots_ts.append(v[-1].timestamp.timestamp()) # sorted descending
except KeyError:
new_roots_ts.append(0)
new_roots_ts = deque(new_roots_ts)

result = []
for x in latest_mask:
if x:
result.append(timestamp.timestamp())
else:
result.append(new_roots_ts.popleft())
return result


def handle_root_timestamps(table_id, is_binary, latest: bool = False):
current_app.request_type = "root_timestamps"
current_app.table_id = table_id

Expand All @@ -1076,11 +1104,13 @@ def handle_root_timestamps(table_id, is_binary):
else:
node_ids = np.array(json.loads(request.data)["node_ids"], dtype=np.uint64)

# Call ChunkedGraph
cg = app_utils.get_cg(table_id)

timestamps = cg.get_node_timestamps(node_ids, return_numpy=False)
return [ts.timestamp() for ts in timestamps]
timestamp = _parse_timestamp("timestamp", time.time(), return_datetime=True)
if latest:
return _handle_latest(cg, node_ids, timestamp)
else:
timestamps = cg.get_node_timestamps(node_ids, return_numpy=False)
return [ts.timestamp() for ts in timestamps]


### OPERATION DETAILS ------------------------------------------------------------
Expand Down
6 changes: 5 additions & 1 deletion pychunkedgraph/app/segmentation/v1/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,11 @@ def handle_is_latest_roots(table_id):
def handle_root_timestamps(table_id):
int64_as_str = request.args.get("int64_as_str", default=False, type=toboolean)
is_binary = request.args.get("is_binary", default=False, type=toboolean)
root_timestamps = common.handle_root_timestamps(table_id, is_binary=is_binary)
latest = request.args.get("latest", default=False, type=toboolean)
is_binary = request.args.get("is_binary", default=False, type=toboolean)
root_timestamps = common.handle_root_timestamps(
table_id, is_binary=is_binary, latest=latest
)
resp = {"timestamp": root_timestamps}

return jsonify_with_kwargs(resp, int64_as_str=int64_as_str)
Expand Down
2 changes: 1 addition & 1 deletion pychunkedgraph/graph/chunkedgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def is_latest_roots(
root_ids: typing.Iterable,
time_stamp: typing.Optional[datetime.datetime] = None,
) -> typing.Iterable:
"""Determines whether root ids are superseeded."""
"""Determines whether root ids are superseded."""
time_stamp = misc_utils.get_valid_timestamp(time_stamp)

row_dict = self.client.read_nodes(
Expand Down

0 comments on commit 8a3bad8

Please sign in to comment.