Possible bug in exception behaviour of WhenAllTask #33

Open
ajstewart opened this issue Oct 29, 2024 · 0 comments

To preface this: I'm coming from using the library in Dapr Workflows.

I'm a bit confused about the intended behaviour of WhenAllTask when one of its tasks raises an exception. I'm having trouble catching an exception when using when_all(); in fact, I can't catch it at all.

I've narrowed it down to a couple of places in this code:

class WhenAllTask(CompositeTask[List[T]]):
    """A task that completes when all of its child tasks complete."""

    def __init__(self, tasks: List[Task[T]]):
        super().__init__(tasks)
        self._completed_tasks = 0
        self._failed_tasks = 0

    @property
    def pending_tasks(self) -> int:
        """Returns the number of tasks that have not yet completed."""
        return len(self._tasks) - self._completed_tasks

    def on_child_completed(self, task: Task[T]):
        if self.is_complete:
            raise ValueError('The task has already completed.')
        self._completed_tasks += 1
        if task.is_failed and self._exception is None:
            self._exception = task.get_exception()
            self._is_complete = True
        if self._completed_tasks == len(self._tasks):
            # The order of the result MUST match the order of the tasks provided to the constructor.
            self._result = [task.get_result() for task in self._tasks]
            self._is_complete = True

    def get_completed_tasks(self) -> int:
        return self._completed_tasks

These are the two places where an exception is raised directly:

  1. raise ValueError('The task has already completed.')
  2. self._result = [task.get_result() for task in self._tasks]

Number 2 raises because, as far as I can see, get_result() re-raises the task's associated exception if one is present.

When either of these is raised directly, the workflow immediately transitions to a FAILED status with no chance to catch the exception.
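
To make the two paths concrete, here is a minimal sketch that copies the on_child_completed logic quoted above into stand-in classes. StubTask, WhenAllSketch and raised_by are hypothetical names invented for this illustration and are not part of the library; the sketch also assumes, as described above, that get_result() re-raises a failed task's exception.

```python
from typing import Any, List, Optional


class StubTask:
    """Hypothetical stand-in for a child task; not the library's Task class."""

    def __init__(self, result: Any = None, exception: Optional[Exception] = None):
        self._result = result
        self._exception = exception

    @property
    def is_failed(self) -> bool:
        return self._exception is not None

    def get_exception(self) -> Optional[Exception]:
        return self._exception

    def get_result(self) -> Any:
        # Assumed behaviour, as described in this issue: a failed task re-raises.
        if self._exception is not None:
            raise self._exception
        return self._result


class WhenAllSketch:
    """Hypothetical class that mirrors the on_child_completed logic quoted above."""

    def __init__(self, tasks: List[StubTask]):
        self._tasks = tasks
        self._completed_tasks = 0
        self._exception: Optional[Exception] = None
        self._result: Optional[list] = None
        self._is_complete = False

    def on_child_completed(self, task: StubTask) -> None:
        if self._is_complete:
            raise ValueError('The task has already completed.')
        self._completed_tasks += 1
        if task.is_failed and self._exception is None:
            self._exception = task.get_exception()
            self._is_complete = True
        if self._completed_tasks == len(self._tasks):
            self._result = [t.get_result() for t in self._tasks]
            self._is_complete = True


def raised_by(tasks: List[StubTask]) -> List[str]:
    """Feed each child completion in order; collect the names of exceptions that escape."""
    parent = WhenAllSketch(tasks)
    escaped = []
    for t in tasks:
        try:
            parent.on_child_completed(t)
        except Exception as exc:
            escaped.append(type(exc).__name__)
    return escaped


# Case 1: the failing child completes first, so the next completion hits the is_complete guard.
early_failure = raised_by([StubTask(exception=RuntimeError("boom")), StubTask(result=1)])
# Case 2: the failing child completes last, so get_result() raises while the result list is built.
late_failure = raised_by([StubTask(result=1), StubTask(exception=RuntimeError("boom"))])
```

In this sketch the exception escapes from on_child_completed itself in both orderings, which would be consistent with the workflow failing before user code gets a chance to catch anything.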

While I can see that having the workflow just raise the exception is probably by design, I've come across a use case where I want to handle a fan-out failure, because I need to run a clean-up step before failing the workflow.

However, if you allow _exception to be set and avoid raising these errors directly, with something like:

def on_child_completed(self, task: Task[T]):
    if self.is_complete:
        if self._exception is None:
            self._exception = ValueError('The task has already completed.')

        self._result = []  # I don't know what this should be

    self._completed_tasks += 1
    if task.is_failed and self._exception is None:
        self._failed_tasks += 1
        self._exception = task.get_exception()
        self._is_complete = True
    if self._completed_tasks == len(self._tasks):
        # The order of the result MUST match the order of the tasks provided to the constructor.
        self._result = [task.get_result() if not task.is_failed else task.get_exception() for task in self._tasks]
        self._is_complete = True

I am able to get the behaviour I expect: the workflow task then raises the exception that has been assigned to it, and I can catch it and do what I need to do following the failure.

I'm not sure about that first is_complete check, though: is it necessary? I'm also not sure whether all tasks should be allowed to complete, with the user given the choice of either having the exception raised or having the exceptions included in the result (a bit like asyncio gather tasks, I suppose).
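
For reference, this is the asyncio behaviour I have in mind, shown as a small standalone sketch. It uses only the standard library's asyncio.gather and its return_exceptions parameter; the coroutine names ok and fail are made up for the example, and nothing here is the library's API.

```python
import asyncio


async def ok(value: int) -> int:
    return value


async def fail() -> int:
    raise RuntimeError("boom")


async def gather_raising() -> str:
    # Default mode: the first exception propagates to the awaiting caller,
    # who can catch it and run clean-up.
    try:
        await asyncio.gather(ok(1), fail(), ok(3))
    except RuntimeError as exc:
        return f"caught: {exc}"
    return "no error"


async def gather_collecting() -> list:
    # return_exceptions=True: exceptions are placed in the result list,
    # in the same position as the awaitable that raised them.
    return await asyncio.gather(ok(1), fail(), ok(3), return_exceptions=True)


raising_outcome = asyncio.run(gather_raising())
collected = asyncio.run(gather_collecting())
```

Either mode leaves the decision with the caller, which is the kind of choice I'd like when_all() to offer.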

I would be happy to open a PR for this, but I'm not sure what would be wanted here.
