From fbea0052be13523b1ed192b23cc464b4ad832bdd Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Thu, 26 Sep 2024 16:59:39 +0000 Subject: [PATCH] fix(upgrade): include cx edges at node_ts explicitly --- pychunkedgraph/ingest/upgrade/parent_layer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pychunkedgraph/ingest/upgrade/parent_layer.py b/pychunkedgraph/ingest/upgrade/parent_layer.py index 2869fcf8..a7e79b8f 100644 --- a/pychunkedgraph/ingest/upgrade/parent_layer.py +++ b/pychunkedgraph/ingest/upgrade/parent_layer.py @@ -51,7 +51,7 @@ def _get_cx_edges_at_timestamp(node, response, ts): def _populate_cx_edges_with_timestamps( - cg: ChunkedGraph, layer: int, nodes: list, earliest_ts + cg: ChunkedGraph, layer: int, nodes: list, nodes_ts:list, earliest_ts ): """ Collect timestamps of edits from children, since we use the same timestamp @@ -63,9 +63,10 @@ def _populate_cx_edges_with_timestamps( all_children = np.concatenate(list(CHILDREN.values())) response = cg.client.read_nodes(node_ids=all_children, properties=attrs) timestamps_d = get_parent_timestamps(cg, nodes) - for node in nodes: + for node, node_ts in zip(nodes, nodes_ts): CX_EDGES[node] = {} timestamps = timestamps_d[node] + timestamps.add(node_ts) for ts in sorted(timestamps): if ts < earliest_ts: ts = earliest_ts @@ -82,6 +83,7 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts, earliest_ts) -> l try: cx_edges_d = CX_EDGES[node][node_ts] except KeyError: + print(CX_EDGES) raise KeyError(f"{node}:{node_ts}") edges = np.concatenate([empty_2d] + list(cx_edges_d.values())) if edges.size: @@ -149,7 +151,7 @@ def update_chunk( nodes = list(CHILDREN.keys()) random.shuffle(nodes) nodes_ts = cg.get_node_timestamps(nodes, return_numpy=False, normalize=True) - _populate_cx_edges_with_timestamps(cg, layer, nodes, earliest_ts) + _populate_cx_edges_with_timestamps(cg, layer, nodes, nodes_ts, earliest_ts) task_size = int(math.ceil(len(nodes) / mp.cpu_count() / 2)) chunked_nodes = chunked(nodes, task_size)