Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/lambda functions #230

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
8 changes: 8 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[*]
end_of_line = lf
insert_final_newline = true

[*.py]
indent_style = space
indent_size = 4
trim_trailing_whitespace = true
38 changes: 38 additions & 0 deletions examples/lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from simpleflow import Workflow
from simpleflow.lambda_function import LambdaFunction
from simpleflow.swf.task import LambdaFunctionTask

"""
The lambda function is:

from __future__ import print_function

import json

print('Loading function')


def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=2))
return 42
"""


class LambdaWorkflow(Workflow):
name = 'basic'
version = 'example'
task_list = 'example'
lambda_role = 'arn:aws:iam::111111000000:role/swf-lambda' # optional, overridable (--lambda-role)

def run(self):
future = self.submit(
LambdaFunctionTask(
LambdaFunction(
'hello-world-python',
idempotent=True,
),
8,
foo='bar',
)
)
print(future.result)
10 changes: 10 additions & 0 deletions simpleflow/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ def transform_input(wf_input):
type=comma_separated_list,
required=False,
help='Tags for the workflow execution.')
@click.option('--lambda-role',
required=False,
help='Lambda role.')
@click.option('--decision-tasks-timeout',
required=False,
help='Timeout for the decision tasks.')
Expand All @@ -183,6 +186,7 @@ def start_workflow(workflow,
task_list,
execution_timeout,
tags,
lambda_role,
decision_tasks_timeout,
input,
input_file,
Expand Down Expand Up @@ -210,6 +214,7 @@ def start_workflow(workflow,
execution_timeout=execution_timeout,
input=wf_input,
tag_list=tags,
lambda_role=lambda_role or workflow_class.lambda_role,
decision_tasks_timeout=decision_tasks_timeout,
)
print('{workflow_id} {run_id}'.format(
Expand Down Expand Up @@ -481,6 +486,9 @@ def create_unique_task_list(workflow_id=''):
type=comma_separated_list,
required=False,
help='Tags identifying the workflow execution.')
@click.option('--lambda-role',
required=False,
help='Lambda role.')
@click.option('--decision-tasks-timeout',
required=False,
help='Decision tasks timeout.')
Expand Down Expand Up @@ -515,6 +523,7 @@ def standalone(context,
workflow_id,
execution_timeout,
tags,
lambda_role,
decision_tasks_timeout,
input,
input_file,
Expand Down Expand Up @@ -617,6 +626,7 @@ def standalone(context,
task_list,
execution_timeout,
tags,
lambda_role,
decision_tasks_timeout,
format.input(wf_input),
None,
Expand Down
90 changes: 90 additions & 0 deletions simpleflow/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class History(object):
:type _markers: collections.OrderedDict[str, list[dict[str, Any]]]
:ivar _timers: timer events
:type _timers: dict[str, dict[str, Any]]]
:ivar _lambda_functions: activity events
:type _lambda_functions: collections.OrderedDict[str, dict[str, Any]]
:ivar _tasks: ordered list of tasks/etc
:type _tasks: list[dict[str, Any]]
"""
Expand All @@ -40,6 +42,7 @@ def __init__(self, history):
self._signaled_workflows = collections.defaultdict(list)
self._markers = collections.OrderedDict()
self._timers = {}
self._lambda_functions = collections.OrderedDict()
self._tasks = []
self._cancel_requested = None
self._cancel_failed = None
Expand Down Expand Up @@ -119,6 +122,14 @@ def cancel_failed_decision_task_completed_event_id(self):
"""
return self._cancel_failed['decision_task_completed_event_id'] if self._cancel_failed else None

@property
def lambda_functions(self):
"""
:return: lambda_functions
:rtype: collections.OrderedDict[str, dict[str, Any]]
"""
return self._lambda_functions

@property
def signaled_workflows(self):
"""
Expand Down Expand Up @@ -639,6 +650,84 @@ def parse_decision_event(self, events, event):
if event.state == 'completed':
self.completed_decision_id = event.id

def parse_lambda_function_event(self, events, event):
"""
Parse a lambda function event.
:param events:
:param event:
"""
def get_lambda():
scheduled_event_id = events[event.scheduled_event_id - 1]
return self._lambda_functions[scheduled_event_id.lambda_id]

if event.state == 'scheduled':
lambda_function = {
'type': 'lambda_function',
'id': event.lambda_id,
'name': event.lambda_name,
'input': event.input,
'state': event.state,
'start_to_close_timeout': getattr(event, 'start_to_close_timeout', None),
'scheduled_id': event.id,
'scheduled_timestamp': event.timestamp,
}
self._lambda_functions[event.lambda_id] = lambda_function
elif event.state == 'schedule_failed':
lambda_function = {
'type': 'lambda_function',
'id': event.lambda_id,
'name': event.lambda_name,
'state': event.state,
'schedule_failed_id': event.id,
'schedule_failed_timestamp': event.timestamp,
}
self._lambda_functions[event.lambda_id] = lambda_function
elif event.state == 'started':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'started_id': event.id,
'started_timestamp': event.timestamp,
})
elif event.state == 'start_failed':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'cause': event.cause,
'message': getattr(event, 'message', ''),
'start_failed_id': event.id,
'start_failed_timestamp': event.timestamp,
'retry': lambda_function.get('retry', -1) + 1,
})
elif event.state == 'completed':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'result': getattr(event, 'result', None),
'completed_id': event.id,
'completed_timestamp': event.timestamp,
})
elif event.state == 'failed':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'reason': getattr(event, 'reason', ''),
'details': getattr(event, 'details', ''),
'failed_id': event.id,
'failed_timestamp': event.timestamp,
'retry': lambda_function.get('retry', -1) + 1,
})
elif event.state == 'timed_out':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'timeout_type': getattr(event, 'timeout_type', 'START_TO_CLOSE'),
'timeout_value': lambda_function['start_to_close_timeout'],
'timed_out_id': event.id,
'timed_out_timestamp': event.timestamp,
'retry': lambda_function.get('retry', -1) + 1,
})

TYPE_TO_PARSER = {
'ActivityTask': parse_activity_event,
'DecisionTask': parse_decision_event,
Expand All @@ -647,6 +736,7 @@ def parse_decision_event(self, events, event):
'ExternalWorkflowExecution': parse_external_workflow_event,
'Marker': parse_marker_event,
'Timer': parse_timer_event,
'LambdaFunction': parse_lambda_function_event,
}

def parse(self):
Expand Down
14 changes: 14 additions & 0 deletions simpleflow/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from simpleflow.base import Submittable


class LambdaFunction(Submittable):
def __init__(self,
name,
start_to_close_timeout=None,
idempotent=None,
is_python_function=True,
):
self.name = name
self.start_to_close_timeout = start_to_close_timeout
self.idempotent = idempotent
self.is_python_function = is_python_function
86 changes: 83 additions & 3 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
MarkerTask,
TimerTask,
CancelTimerTask,
LambdaFunctionTask,
)
from simpleflow.utils import (
hex_hash,
Expand Down Expand Up @@ -383,6 +384,51 @@ def _get_future_from_child_workflow_event(self, event):

return future

def _get_future_from_lambda_function_event(self, event):
"""

:param event: child workflow event
:type event: dict[str, Any]
:return:
:rtype: futures.Future
"""
future = futures.Future()
state = event['state']

if state == 'scheduled':
pass
elif state == 'schedule_failed':
logger.info('failed to schedule {}: {}'.format(
event['name'],
event['cause'],
))
return None
elif state == 'started':
future.set_running()
elif state == 'completed':
future.set_finished(format.decode(event['result']))
elif state == 'failed':
future.set_exception(exceptions.TaskFailed(
name=event['id'],
reason=event['reason'],
details=event.get('details'),
))
elif state == 'start_failed':
future.set_exception(exceptions.TaskFailed(
name=event['id'],
reason=event['cause'],
details=event.get('message'),
))
elif state == 'timed_out':
future.set_exception(exceptions.TimeoutError(
event['timeout_type'],
None,
))
else:
logger.warning('Unknown state: %s', state)

return future

def _get_future_from_marker_event(self, a_task, event):
"""Maps a marker event to a Future with the corresponding
state.
Expand Down Expand Up @@ -520,6 +566,19 @@ def find_child_workflow_event(self, a_task, history):
"""
return history.child_workflows.get(a_task.id)

def find_lambda_function_event(self, a_task, history):
"""
Get the event corresponding to a lambda function, if any.

:param a_task:
:type a_task: LambdaFunctionTask
:param history:
:type history: simpleflow.history.History
:return:
:rtype: Optional[dict]
"""
return history.lambda_functions.get(a_task.id)

def find_signal_event(self, a_task, history):
"""
Get the event corresponding to a signal, if any.
Expand Down Expand Up @@ -593,6 +652,7 @@ def find_timer_event(self, a_task, history):
MarkerTask: find_marker_event,
TimerTask: find_timer_event,
CancelTimerTask: find_timer_event,
LambdaFunctionTask: find_lambda_function_event,
}

def find_event(self, a_task, history):
Expand Down Expand Up @@ -794,12 +854,30 @@ def find_timer_associated_with(self, event, swf_task):
def get_retry_task_timer_id(swf_task):
return '__simpleflow_task_{}'.format(str(swf_task.id))

def resume_lambda_function(self, a_task, event):
"""
Resume a child workflow.

:param a_task:
:type a_task: LambdaTask
:param event:
:type event: dict
:return:
:rtype: simpleflow.futures.Future
"""
future = self._get_future_from_lambda_function_event(event)

if future.finished and future.exception:
raise future.exception

return future

def schedule_task(self, a_task, task_list=None):
"""
Let a task schedule itself.
If too many decisions are in flight, add a timer decision and raise ExecutionBlocked.
:param a_task:
:type a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask
:type a_task: ActivityTask | WorkflowTask | SignalTask [ MarkerTask [ TimerTask | CancelTimerTask | LambdaFunctionTask # noqa
:param task_list:
:type task_list: Optional[str]
:raise: exceptions.ExecutionBlocked if too many decisions waiting
Expand Down Expand Up @@ -862,6 +940,7 @@ def _add_start_timer_decision(self, id, timeout=0):
'external_workflow': get_future_from_external_workflow_event,
'marker': _get_future_from_marker_event,
'timer': _get_future_from_timer_event,
'lambda_function': resume_lambda_function,
}

def resume(self, a_task, *args, **kwargs):
Expand Down Expand Up @@ -1391,8 +1470,9 @@ def get_event_details(self, event_type, event_name):

def handle_cancel_requested(self):
decision = swf.models.decision.WorkflowExecutionDecision()
is_current_decision = self._history.completed_decision_id < self._history.cancel_requested_id
should_cancel = self._workflow.should_cancel(self._history)
history = self._history
is_current_decision = history.completed_decision_id < history.cancel_requested_id
should_cancel = self._workflow.should_cancel(history)
if not should_cancel:
return None # ignore cancel
if is_current_decision:
Expand Down
Loading