Skip to content

Commit

Permalink
fix(upgrade): include cx edges at node_ts explicitly
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Sep 26, 2024
1 parent e23d767 commit fbea005
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions pychunkedgraph/ingest/upgrade/parent_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _get_cx_edges_at_timestamp(node, response, ts):


def _populate_cx_edges_with_timestamps(
cg: ChunkedGraph, layer: int, nodes: list, earliest_ts
cg: ChunkedGraph, layer: int, nodes: list, nodes_ts:list, earliest_ts
):
"""
Collect timestamps of edits from children, since we use the same timestamp
Expand All @@ -63,9 +63,10 @@ def _populate_cx_edges_with_timestamps(
all_children = np.concatenate(list(CHILDREN.values()))
response = cg.client.read_nodes(node_ids=all_children, properties=attrs)
timestamps_d = get_parent_timestamps(cg, nodes)
for node in nodes:
for node, node_ts in zip(nodes, nodes_ts):
CX_EDGES[node] = {}
timestamps = timestamps_d[node]
timestamps.add(node_ts)
for ts in sorted(timestamps):
if ts < earliest_ts:
ts = earliest_ts
Expand All @@ -82,6 +83,7 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts, earliest_ts) -> l
try:
cx_edges_d = CX_EDGES[node][node_ts]
except KeyError:
print(CX_EDGES)
raise KeyError(f"{node}:{node_ts}")
edges = np.concatenate([empty_2d] + list(cx_edges_d.values()))
if edges.size:
Expand Down Expand Up @@ -149,7 +151,7 @@ def update_chunk(
nodes = list(CHILDREN.keys())
random.shuffle(nodes)
nodes_ts = cg.get_node_timestamps(nodes, return_numpy=False, normalize=True)
_populate_cx_edges_with_timestamps(cg, layer, nodes, earliest_ts)
_populate_cx_edges_with_timestamps(cg, layer, nodes, nodes_ts, earliest_ts)

task_size = int(math.ceil(len(nodes) / mp.cpu_count() / 2))
chunked_nodes = chunked(nodes, task_size)
Expand Down

0 comments on commit fbea005

Please sign in to comment.