Skip to content

Commit

Permalink
Ignore unknown tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
nfcampos committed Nov 5, 2024
1 parent de8e487 commit 18a3fa4
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions libs/langgraph/langgraph/pregel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ def _prepare_state_snapshot(
for tid, k, v in saved.pending_writes:
if k in (ERROR, INTERRUPT, SCHEDULED):
continue
if tid not in next_tasks:
continue
next_tasks[tid].writes.append((k, v))
if tasks := [t for t in next_tasks.values() if t.writes]:
apply_writes(saved.checkpoint, channels, tasks, None)
Expand Down Expand Up @@ -615,6 +617,8 @@ async def _aprepare_state_snapshot(
for tid, k, v in saved.pending_writes:
if k in (ERROR, INTERRUPT, SCHEDULED):
continue
if tid not in next_tasks:
continue
next_tasks[tid].writes.append((k, v))
if tasks := [t for t in next_tasks.values() if t.writes]:
apply_writes(saved.checkpoint, channels, tasks, None)
Expand Down Expand Up @@ -891,6 +895,8 @@ def update_state(
for tid, k, v in saved.pending_writes:
if k in (ERROR, INTERRUPT, SCHEDULED):
continue
if tid not in next_tasks:
continue
next_tasks[tid].writes.append((k, v))
if tasks := [t for t in next_tasks.values() if t.writes]:
apply_writes(checkpoint, channels, tasks, None)
Expand Down Expand Up @@ -1073,6 +1079,8 @@ async def aupdate_state(
for tid, k, v in saved.pending_writes:
if k in (ERROR, INTERRUPT, SCHEDULED):
continue
if tid not in next_tasks:
continue
next_tasks[tid].writes.append((k, v))
if tasks := [t for t in next_tasks.values() if t.writes]:
apply_writes(checkpoint, channels, tasks, None)
Expand Down

0 comments on commit 18a3fa4

Please sign in to comment.