diff --git a/pychunkedgraph/ingest/upgrade/parent_layer.py b/pychunkedgraph/ingest/upgrade/parent_layer.py index 2869fcf8..a7e79b8f 100644 --- a/pychunkedgraph/ingest/upgrade/parent_layer.py +++ b/pychunkedgraph/ingest/upgrade/parent_layer.py @@ -51,7 +51,7 @@ def _get_cx_edges_at_timestamp(node, response, ts): def _populate_cx_edges_with_timestamps( - cg: ChunkedGraph, layer: int, nodes: list, earliest_ts + cg: ChunkedGraph, layer: int, nodes: list, nodes_ts:list, earliest_ts ): """ Collect timestamps of edits from children, since we use the same timestamp @@ -63,9 +63,10 @@ def _populate_cx_edges_with_timestamps( all_children = np.concatenate(list(CHILDREN.values())) response = cg.client.read_nodes(node_ids=all_children, properties=attrs) timestamps_d = get_parent_timestamps(cg, nodes) - for node in nodes: + for node, node_ts in zip(nodes, nodes_ts): CX_EDGES[node] = {} timestamps = timestamps_d[node] + timestamps.add(node_ts) for ts in sorted(timestamps): if ts < earliest_ts: ts = earliest_ts @@ -82,6 +83,7 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts, earliest_ts) -> l try: cx_edges_d = CX_EDGES[node][node_ts] except KeyError: + print(CX_EDGES) raise KeyError(f"{node}:{node_ts}") edges = np.concatenate([empty_2d] + list(cx_edges_d.values())) if edges.size: @@ -149,7 +151,7 @@ def update_chunk( nodes = list(CHILDREN.keys()) random.shuffle(nodes) nodes_ts = cg.get_node_timestamps(nodes, return_numpy=False, normalize=True) - _populate_cx_edges_with_timestamps(cg, layer, nodes, earliest_ts) + _populate_cx_edges_with_timestamps(cg, layer, nodes, nodes_ts, earliest_ts) task_size = int(math.ceil(len(nodes) / mp.cpu_count() / 2)) chunked_nodes = chunked(nodes, task_size)