Skip to content

Commit

Permalink
Improve inspect context and detect undefined task logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Yogesh Kumar committed Jun 6, 2023
1 parent 676ce3e commit 2cd3065
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
14 changes: 12 additions & 2 deletions orquesta/specs/native/v1/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ def detect_undefined_tasks(self, parent=None):
# Identify the undefined task in task transitions.
result = []
traversed = []
queued_task = []
q = queue.Queue()

for task in self.get_start_tasks():
Expand Down Expand Up @@ -412,8 +413,12 @@ def detect_undefined_tasks(self, parent=None):
continue

if self.has_task(next_task_name):
if next_task_name not in RESERVED_TASK_NAMES + traversed:
if (
next_task_name not in RESERVED_TASK_NAMES + traversed
and next_task_name not in queued_task
):
q.put(next_task_name)
queued_task.append(next_task_name)
else:
entry = {
"message": 'The task "%s" is not defined.' % next_task_name,
Expand Down Expand Up @@ -523,6 +528,7 @@ def inspect_context(self, parent=None):
ctxs = {}
errors = []
traversed = []
task_ctx_map = {}
parent_ctx = parent.get("ctx", []) if parent else []
rolling_ctx = list(set(parent_ctx))
q = queue.Queue()
Expand Down Expand Up @@ -592,7 +598,11 @@ def inspect_context(self, parent=None):
next_task_spec = self.get_task(next_task_name)

if not next_task_spec.has_join():
q.put((next_task_name, branch_ctx))
if next_task_name not in task_ctx_map or task_ctx_map[next_task_name] != set(
branch_ctx
):
q.put((next_task_name, branch_ctx))
task_ctx_map[next_task_name] = set(branch_ctx)
else:
next_task_ctx = ctxs.get(next_task_name, [])
ctxs[next_task_name] = list(set(next_task_ctx + branch_ctx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_serialization_function_of_data_size(self):
conductor.deserialize(conductor.serialize())

def test_runtime_function_of_splits_count(self):
num_tasks = 25
num_tasks = 75

wf_def = {"input": ["data"], "tasks": {}}

Expand All @@ -100,7 +100,33 @@ def test_runtime_function_of_splits_count(self):
t2 = datetime.datetime.utcnow()

delta = t2 - t1
self.assertLess(delta.seconds, 3)
self.assertLess(delta.seconds, 2)

def test_inspect_performance(self):
num_tasks = 75

wf_def = {"input": ["data"], "tasks": {}}

for i in range(1, num_tasks):
task_name = "t" + str(i)
next_task_name = "t" + str(i + 1)
transition = [
{"when": "<% succeeded() %>", "do": next_task_name},
{"when": "<% failed() %>", "do": next_task_name},
]
wf_def["tasks"][task_name] = {"action": "core.noop", "next": transition}

wf_def["tasks"]["t%d" % num_tasks] = {"action": "core.noop"}
wf_def["tasks"]["t%d" % (num_tasks + 1)] = {"action": "core.noop"}

spec = native_specs.WorkflowSpec(wf_def)

t1 = datetime.datetime.utcnow()
self.assertDictEqual(spec.inspect(), {})
t2 = datetime.datetime.utcnow()

delta = t2 - t1
self.assertLess(delta.seconds, 2)


class WorkflowConductorWithItemsStressTest(test_base.WorkflowConductorWithItemsTest):
Expand Down

0 comments on commit 2cd3065

Please sign in to comment.