Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update durabletask protos, set custom status #31

Merged
merged 5 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v0.2.0 (Unreleased)

### New

- Support for orchestration custom status ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)

### Updates

- Updated `durabletask-protobuf` submodule reference to latest

## v0.1.1a1

### New
Expand Down
7 changes: 5 additions & 2 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def __init__(self, *,
def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
input: Union[TInput, None] = None,
instance_id: Union[str, None] = None,
start_at: Union[datetime, None] = None) -> str:
start_at: Union[datetime, None] = None,
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -113,7 +114,9 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
instanceId=instance_id if instance_id else uuid.uuid4().hex,
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=wrappers_pb2.StringValue(value=""))
version=wrappers_pb2.StringValue(value=""),
orchestrationIdReusePolicy=reuse_id_policy,
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
Expand Down
385 changes: 203 additions & 182 deletions durabletask/internal/orchestrator_service_pb2.py

Large diffs are not rendered by default.

1,244 changes: 650 additions & 594 deletions durabletask/internal/orchestrator_service_pb2.pyi

Large diffs are not rendered by default.

418 changes: 330 additions & 88 deletions durabletask/internal/orchestrator_service_pb2_grpc.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def is_replaying(self) -> bool:
"""
pass

@abstractmethod
def set_custom_status(self, custom_status: str) -> None:
    """Record a custom status string for this orchestration instance.

    Args:
        custom_status: The status text to associate with the orchestration.
    """
    ...

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
"""Create a Timer Task to fire after at the specified deadline.
Expand Down
22 changes: 17 additions & 5 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
TypeVar, Union)

import grpc
from google.protobuf import empty_pb2
from google.protobuf import empty_pb2, wrappers_pb2

import durabletask.internal.helpers as ph
import durabletask.internal.helpers as pbh
Expand Down Expand Up @@ -188,8 +188,8 @@ def stop(self):
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
try:
executor = _OrchestrationExecutor(self._registry, self._logger)
actions = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions)
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=result.actions, customStatus=wrappers_pb2.StringValue(value=result.custom_status))
except Exception as ex:
self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}")
failure_details = pbh.new_failure_details(ex)
Expand Down Expand Up @@ -242,6 +242,7 @@ def __init__(self, instance_id: str):
self._pending_events: Dict[str, List[task.CompletableTask]] = {}
self._new_input: Optional[Any] = None
self._save_events = False
self._custom_status: str = ""

def run(self, generator: Generator[task.Task, Any, Any]):
self._generator = generator
Expand Down Expand Up @@ -355,6 +356,9 @@ def is_replaying(self) -> bool:
def current_utc_datetime(self, value: datetime):
self._current_utc_datetime = value

def set_custom_status(self, custom_status: str) -> None:
    """Store the caller-supplied custom status on the context.

    The stored value is surfaced in the execution results for this
    orchestration round; setting it again replaces the previous value.
    """
    # Last writer wins: each call simply overwrites the held status.
    self._custom_status = custom_status

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
return self.create_timer_internal(fire_at)

Expand Down Expand Up @@ -457,6 +461,14 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
self.set_continued_as_new(new_input, save_events)


class ExecutionResults:
    """Outputs produced by a single orchestrator execution pass.

    Attributes:
        actions: The orchestrator actions to be returned to the sidecar.
        custom_status: The orchestration's current custom status text.
    """
    actions: List[pb.OrchestratorAction]
    custom_status: str

    def __init__(self, actions: List[pb.OrchestratorAction], custom_status: str):
        # Plain value holder: both fields are stored as-is, no copying.
        self.custom_status = custom_status
        self.actions = actions

class _OrchestrationExecutor:
_generator: Optional[task.Orchestrator] = None

Expand All @@ -466,7 +478,7 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
self._is_suspended = False
self._suspended_events: List[pb.HistoryEvent] = []

def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> List[pb.OrchestratorAction]:
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> ExecutionResults:
if not new_events:
raise task.OrchestrationStateError("The new history event list must have at least one event in it.")

Expand Down Expand Up @@ -501,7 +513,7 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
actions = ctx.get_actions()
if self._logger.level <= logging.DEBUG:
self._logger.debug(f"{instance_id}: Returning {len(actions)} action(s): {_get_action_summary(actions)}")
return actions
return ExecutionResults(actions=actions, custom_status=ctx._custom_status)

def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
if self._is_suspended and _is_suspendable(event):
Expand Down
23 changes: 23 additions & 0 deletions tests/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,26 @@ def throw_activity(ctx: task.ActivityContext, _):
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
assert state.failure_details.stack_trace is not None
assert throw_activity_counter == 4

def test_custom_status():
    """End-to-end test: a custom status set by the orchestrator must be
    persisted by the sidecar and surfaced (JSON-serialized) on the final
    orchestration state."""

    def empty_orchestrator(ctx: task.OrchestrationContext, _):
        ctx.set_custom_status("foobaz")

    # Start a worker, which will connect to the sidecar in a background thread
    with worker.TaskHubGrpcWorker() as w:
        w.add_orchestrator(empty_orchestrator)
        w.start()

        c = client.TaskHubGrpcClient()
        id = c.schedule_new_orchestration(empty_orchestrator)
        state = c.wait_for_orchestration_completion(id, timeout=30)

        assert state is not None
        assert state.name == task.get_name(empty_orchestrator)
        assert state.instance_id == id
        assert state.failure_details is None
        assert state.runtime_status == client.OrchestrationStatus.COMPLETED
        assert state.serialized_input is None
        assert state.serialized_output is None
        # Fix: compare by value with `==`, not identity with `is`. Using `is`
        # against a string literal relies on CPython interning (unreliable) and
        # raises a SyntaxWarning on Python 3.8+; the custom status is stored as
        # a JSON-encoded string, hence the surrounding quotes.
        assert state.serialized_custom_status == "\"foobaz\""
Loading
Loading