Purge errored tasks even if task object is not found (#310)
thomasst authored Dec 7, 2023
1 parent 7496a77 commit ad9466d
Showing 3 changed files with 49 additions and 13 deletions.
44 changes: 32 additions & 12 deletions tasktiger/task.py
@@ -492,14 +492,24 @@ def tasks_from_queue(
         skip: int = 0,
         limit: int = 1000,
         load_executions: int = 0,
+        include_not_found: bool = False,
     ) -> Tuple[int, List["Task"]]:
         """
-        Returns a tuple with the following information:
-        * total items in the queue
-        * tasks from the given queue in the given state, latest first.
-
-        An integer may be passed in the load_executions parameter to indicate
-        how many executions should be loaded (starting from the latest).
+        Return tasks from a queue.
+
+        Args:
+            tiger: TaskTiger instance.
+            queue: Name of the queue.
+            state: State of the task (QUEUED, ACTIVE, SCHEDULED, ERROR).
+            limit: Maximum number of tasks to return.
+            load_executions: Maximum number of executions to load for each task
+                (starting from the latest).
+            include_not_found: Whether to include tasks that cannot be loaded.
+
+        Returns:
+            Tuple with the following information:
+            * total items in the queue
+            * tasks from the given queue in the given state, latest first.
         """

         key = tiger._key(state, queue)
@@ -525,10 +535,14 @@ def tasks_from_queue(
                     )
                 results = pipeline.execute()

-                for serialized_data, serialized_executions, ts in zip(
-                    results[0], results[1:], tss
+                for idx, serialized_data, serialized_executions, ts in zip(
+                    range(len(items)), results[0], results[1:], tss
                 ):
-                    data = json.loads(serialized_data)
+                    if serialized_data is None and include_not_found:
+                        data = {"id": items[idx][0]}
+                    else:
+                        data = json.loads(serialized_data)
+
                     executions = [
                         json.loads(e) for e in serialized_executions if e
                     ]
@@ -544,11 +558,17 @@ def tasks_from_queue(

                     tasks.append(task)
             else:
-                data = tiger.connection.mget(
+                result = tiger.connection.mget(
                     [tiger._key("task", item[0]) for item in items]
                 )
-                for serialized_data, ts in zip(data, tss):
-                    data = json.loads(serialized_data)
+                for idx, serialized_data, ts in zip(
+                    range(len(items)), result, tss
+                ):
+                    if serialized_data is None and include_not_found:
+                        data = {"id": items[idx][0]}
+                    else:
+                        data = json.loads(serialized_data)
+
                     task = Task(
                         tiger, queue=queue, _data=data, _state=state, _ts=ts
                     )
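For orientation (not part of the commit): with the new include_not_found flag, a caller listing tasks gets a stub Task carrying only its id when the underlying Redis task key is missing, rather than a crash on json.loads(None). A minimal sketch, assuming a locally configured TaskTiger instance, the "default" queue, and "error" as the stored name of the ERROR state:

    from tasktiger import TaskTiger
    from tasktiger.task import Task

    tiger = TaskTiger()  # assumes a local Redis; illustrative only

    total, tasks = Task.tasks_from_queue(
        tiger, "default", "error", include_not_found=True
    )
    for task in tasks:
        # A task whose "task" key was deleted comes back with only its id.
        print(task.id)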
7 changes: 6 additions & 1 deletion tasktiger/tasktiger.py
@@ -611,7 +611,12 @@ def errored_tasks() -> Iterable[Task]:
                 task_limit = 5000
                 while total_tasks is None or skip < total_tasks:
                     total_tasks, tasks = Task.tasks_from_queue(
-                        self, queue, ERROR, skip=skip, limit=task_limit
+                        self,
+                        queue,
+                        ERROR,
+                        skip=skip,
+                        limit=task_limit,
+                        include_not_found=True,
                     )
                     for task in tasks:
                         if (
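Why purge_errored_tasks needs the flag, sketched outside the repository's code: for a deleted task key, MGET returns None, and json.loads(None) raises a TypeError, which would previously have aborted the purge loop at the first missing task:

    import json

    serialized_data = None  # what MGET yields for a task key that no longer exists

    try:
        json.loads(serialized_data)  # the pre-change code path
    except TypeError as exc:
        print(f"purge would have aborted here: {exc}")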
11 changes: 11 additions & 0 deletions tests/test_base.py
@@ -965,6 +965,17 @@ def test_purge_errored_tasks_only_errored_unique_task(self):
         assert 1 == self.tiger.purge_errored_tasks()
         self._ensure_queues(queued={"default": 1}, error={"default": 0})

+    def test_purge_errored_tasks_if_task_not_found(self):
+        task = self.tiger.delay(exception_task)
+
+        Worker(self.tiger).run(once=True)
+        self._ensure_queues(error={"default": 1})
+
+        self.tiger.connection.delete(self.tiger._key("task", task.id))
+
+        self.tiger.purge_errored_tasks()
+        self._ensure_queues(error={"default": 0})
+

 class TestTasks(BaseTestCase):
     """
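To run only the new test locally, something along these lines should work (assuming the project's usual pytest setup and a reachable Redis, as in the surrounding tests):

    import pytest

    # Select the new test by name; the path and test id mirror the diff above.
    pytest.main(
        ["tests/test_base.py", "-k", "test_purge_errored_tasks_if_task_not_found"]
    )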
