diff --git a/CHANGELOG.md b/CHANGELOG.md index 865fd2c53..a7b2f543f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ * Adding `dry_run` parameter to `Workflow.run()`. It allows to test resources, dependencies and infrastructure while ignoring user task code. * Added `orq reset` as a shortcut for `orq down`, `orq up` +* New CLI output formatting for a subset of commands. 🧟 *Deprecations* diff --git a/setup.cfg b/setup.cfg index a3af7244a..91b80d80d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -56,6 +56,7 @@ install_requires = inquirer~=3.0 tabulate pygments~=2.0 + rich~=13.5 # For automatic login aiohttp~=3.8 # Decoding JWTs diff --git a/src/orquestra/sdk/_base/cli/_services/_down.py b/src/orquestra/sdk/_base/cli/_services/_down.py index 92301cd04..7b93a4e6f 100644 --- a/src/orquestra/sdk/_base/cli/_services/_down.py +++ b/src/orquestra/sdk/_base/cli/_services/_down.py @@ -1,6 +1,7 @@ ################################################################################ # © Copyright 2023 Zapata Computing Inc. ################################################################################ +import subprocess from typing import Optional from orquestra.sdk.schema.responses import ServiceResponse @@ -39,15 +40,40 @@ def on_cmd_call( manage_ray=manage_ray, manage_all=manage_all ) - with self._presenter.show_progress( - resolved_services, label="Stopping" - ) as progress: - for service in progress: - service.down() + success = True + responses = [] - services = [ - ServiceResponse(name=svc.name, is_running=svc.is_running(), info=None) - for svc in resolved_services - ] + with self._presenter.progress_spinner("Stopping"): + for service in resolved_services: + try: + service.down() + responses.append( + ServiceResponse( + name=service.name, + is_running=service.is_running(), + info=None, + ) + ) + except subprocess.CalledProcessError as e: + success = False + responses.append( + ServiceResponse( + name=service.name, + is_running=True, + info="\n".join( + [ + "command:", + str(e.cmd), + "stdout:", + *e.stdout.decode().splitlines(), + "stderr:", + *e.stderr.decode().splitlines(), + ] + ), + ) + ) - self._presenter.show_services(services=services) + if success: + self._presenter.show_services(responses) + else: + self._presenter.show_failure(responses) diff --git a/src/orquestra/sdk/_base/cli/_services/_up.py b/src/orquestra/sdk/_base/cli/_services/_up.py index c1a77ca78..58f649a77 100644 --- a/src/orquestra/sdk/_base/cli/_services/_up.py +++ b/src/orquestra/sdk/_base/cli/_services/_up.py @@ -43,10 +43,8 @@ def on_cmd_call( responses = [] success = True - with self._presenter.show_progress( - resolved_services, label="Starting" - ) as progress: - for service in progress: + with self._presenter.progress_spinner("Starting"): + for service in resolved_services: try: service.up() diff --git a/src/orquestra/sdk/_base/cli/_task/_logs.py b/src/orquestra/sdk/_base/cli/_task/_logs.py index f83ae7765..8870eb712 100644 --- a/src/orquestra/sdk/_base/cli/_task/_logs.py +++ b/src/orquestra/sdk/_base/cli/_task/_logs.py @@ -21,7 +21,8 @@ class Action: def __init__( self, - presenter=_presenters.WrappedCorqOutputPresenter(), + logs_presenter=_presenters.LogsPresenter(), + error_presenter=_presenters.WrappedCorqOutputPresenter(), dumper=_dumpers.LogsDumper(), wf_run_repo=_repos.WorkflowRunRepo(), config_resolver: t.Optional[_arg_resolvers.WFConfigResolver] = None, @@ -43,14 +44,15 @@ def __init__( ) # output - self._presenter = presenter + self._logs_presenter = logs_presenter + self._error_presenter = 
error_presenter self._dumper = dumper def on_cmd_call(self, *args, **kwargs): try: self._on_cmd_call_with_exceptions(*args, **kwargs) except Exception as e: - self._presenter.show_error(e) + self._error_presenter.show_error(e) def _on_cmd_call_with_exceptions( self, @@ -85,6 +87,6 @@ def _on_cmd_call_with_exceptions( wf_run_id=resolved_wf_run_id, dir_path=download_dir, ) - self._presenter.show_dumped_wf_logs(dump_path) + self._logs_presenter.show_dumped_wf_logs(dump_path) else: - self._presenter.show_logs(logs) + self._logs_presenter.show_logs(logs) diff --git a/src/orquestra/sdk/_base/cli/_ui/_presenters.py b/src/orquestra/sdk/_base/cli/_ui/_presenters.py index 916e9c3a9..fac04ad97 100644 --- a/src/orquestra/sdk/_base/cli/_ui/_presenters.py +++ b/src/orquestra/sdk/_base/cli/_ui/_presenters.py @@ -6,83 +6,55 @@ mostly adapters over the corq's formatters. """ import os -import pprint import sys import typing as t import webbrowser from contextlib import contextmanager from functools import singledispatchmethod from pathlib import Path -from typing import Iterable, Iterator, List, Sequence +from typing import Optional, Sequence import click +from rich.box import SIMPLE_HEAVY +from rich.console import Console, Group, RenderableType +from rich.live import Live +from rich.pretty import Pretty +from rich.rule import Rule +from rich.spinner import Spinner +from rich.table import Column, Table from tabulate import tabulate -from orquestra.sdk._base import _dates, _env, _services, serde +from orquestra.sdk._base import _dates, _env, serde from orquestra.sdk._base._dates import Instant from orquestra.sdk._base._logs._interfaces import LogOutput, WorkflowLogs from orquestra.sdk.schema import responses from orquestra.sdk.schema.configs import ConfigName, RuntimeConfiguration, RuntimeName from orquestra.sdk.schema.ir import ArtifactFormat -from orquestra.sdk.schema.workflow_run import ( - TaskInvocationId, - WorkflowRun, - WorkflowRunId, - WorkflowRunOnlyID, -) +from orquestra.sdk.schema.workflow_run import TaskInvocationId, WorkflowRunId from . import _errors from . import _models as ui_models -from ._corq_format import per_command -class WrappedCorqOutputPresenter: - """ - Uses corq's responses and formatters for pretty-printing dorq data. - """ - - def show_wf_runs_list(self, wf_runs: List[WorkflowRun]): - resp = responses.GetWorkflowRunResponse( - meta=responses.ResponseMetadata( - success=True, - code=responses.ResponseStatusCode.OK, - message="Success", - ), - workflow_runs=wf_runs, - ) - per_command.pretty_print_response(resp, project_dir=None) - - def show_submitted_wf_run(self, wf_run_id: WorkflowRunId): - resp = responses.SubmitWorkflowDefResponse( - meta=responses.ResponseMetadata( - success=True, - code=responses.ResponseStatusCode.OK, - message="Success", - ), - workflow_runs=[WorkflowRunOnlyID(id=wf_run_id)], - ) - per_command.pretty_print_response(resp, project_dir=None) +class RichPresenter: + def __init__(self, console: Optional[Console] = None): + self._console = console or Console() - def show_stopped_wf_run(self, wf_run_id: WorkflowRunId): - click.echo(f"Workflow run {wf_run_id} stopped.") + @contextmanager + def progress_spinner(self, spinner_label: str = "Loading"): + with Live( + Spinner("dots", spinner_label), console=self._console, transient=True + ) as live: + yield live - def show_dumped_wf_logs( - self, path: Path, log_type: t.Optional[WorkflowLogs.WorkflowLogTypeName] = None - ): - """ - Tell the user where logs have been saved. - Args: - path: The path to the dump file. 
- log_type: additional information identify the type of logs saved. - """ - click.echo( - f"Workflow {log_type.value + ' ' if log_type else ''}logs saved at {path}" - ) +class LogsPresenter(RichPresenter): + """ + Present workflow and task logs + """ @singledispatchmethod - @staticmethod - def _format_logs(*args) -> t.List[str]: + def _rich_logs(*args) -> RenderableType: """ Format the logs into a list of strings to be printed. """ @@ -90,29 +62,30 @@ def _format_logs(*args) -> t.List[str]: f"No log lines constructor for args {args}" ) # pragma: no cover - @_format_logs.register(dict) + @_rich_logs.register(dict) @staticmethod - def _(logs: dict) -> t.List[str]: - log_lines = [] - for invocation_id, invocation_logs in logs.items(): - log_lines.append(f"task-invocation-id: {invocation_id}") - log_lines.extend(WrappedCorqOutputPresenter._format_logs(invocation_logs)) - return log_lines + def _(logs: dict) -> RenderableType: + from rich.console import Group - @_format_logs.register(list) - @staticmethod - def _(logs: list) -> t.List[str]: - return logs + renderables = [] + for invocation_id, invocation_logs in logs.items(): + renderables.append( + Group( + f"[bold]{invocation_id}[/bold]", + LogsPresenter._rich_logs(invocation_logs), + ) + ) + return Group(*renderables) - @_format_logs.register(LogOutput) + @_rich_logs.register(LogOutput) @staticmethod - def _(logs: LogOutput) -> t.List[str]: - output = [] + def _(logs: LogOutput) -> RenderableType: + table = Table("Stream", "Content", show_header=False, box=SIMPLE_HEAVY) if len(logs.out) > 0: - output.extend(["stdout:", *logs.out]) + table.add_row("[bold blue]stdout[/bold blue]", "\n".join(logs.out)) if len(logs.err) > 0: - output.extend(["stderr:", *logs.err]) - return output + table.add_row("[bold red]stderr[/bold red]", "\n".join(logs.err)) + return table def show_logs( self, @@ -121,35 +94,60 @@ def show_logs( ): """ Present logs to the user. - """ - _logs = self._format_logs(logs) - resp = responses.GetLogsResponse( - meta=responses.ResponseMetadata( - success=True, - code=responses.ResponseStatusCode.OK, - message="Successfully got workflow run logs.", - ), - logs=_logs, - ) + Args: + logs: The logs to display, this may be in a dictionary or as a plain + LogOutput object + log_type: An optional name used to split multiple log types + """ + _logs = self._rich_logs(logs) + renderables = [_logs] if log_type: _log_type = f"{log_type.value} logs".replace("_", " ") - click.echo(f"=== {_log_type.upper()} " + "=" * (75 - len(_log_type))) - per_command.pretty_print_response(resp, project_dir=None) - if log_type: - click.echo("=" * 80 + "\n\n") + renderables.insert(0, Rule(_log_type, align="left")) + renderables.append(Rule()) + self._console.print(Group(*renderables)) + + def show_dumped_wf_logs( + self, path: Path, log_type: t.Optional[WorkflowLogs.WorkflowLogTypeName] = None + ): + """ + Tell the user where logs have been saved. + + Args: + path: The path to the dump file. + log_type: additional information identify the type of logs saved. + """ + self._console.print( + f"Workflow {log_type.value + ' ' if log_type else ''}" + f"logs saved at [bold]{path}[/bold]" + ) + + +class WrappedCorqOutputPresenter: + """ + Uses corq's responses and formatters for pretty-printing dorq data. 
+ """ + + def show_stopped_wf_run(self, wf_run_id: WorkflowRunId): + click.echo(f"Workflow run {wf_run_id} stopped.") def show_error(self, exception: Exception): status_code = _errors.pretty_print_exception(exception) - sys.exit(status_code.value) def show_message(self, message: str): click.echo(message=message) -class ArtifactPresenter: +class ArtifactPresenter(RichPresenter): + def _values_table(self, values: t.Sequence[t.Any]) -> Table: + table = Table("Index", "Type", "Pretty Printed", box=SIMPLE_HEAVY) + for i, value in enumerate(values): + table.add_row(str(i), f"{type(value)}", Pretty(value)) + return table + def show_task_outputs( self, values: t.Sequence[t.Any], @@ -162,16 +160,11 @@ def show_task_outputs( Args: values: plain, deserialized artifact values. """ - click.echo( - f"In workflow {wf_run_id}, task invocation {task_inv_id} produced " - f"{len(values)} outputs." + header = ( + f"In workflow {wf_run_id}, task invocation {task_inv_id} " + f"produced {len(values)} outputs." ) - - for value_i, value in enumerate(values): - click.echo() - click.echo(f"Output {value_i}. Object type: {type(value)}") - click.echo("Pretty printed value:") - click.echo(pprint.pformat(value)) + self._console.print(Group(header, self._values_table(values))) def show_workflow_outputs( self, values: t.Sequence[t.Any], wf_run_id: WorkflowRunId @@ -182,13 +175,8 @@ def show_workflow_outputs( Args: values: plain, deserialized artifact values. """ - click.echo(f"Workflow run {wf_run_id} has {len(values)} outputs.") - - for value_i, value in enumerate(values): - click.echo() - click.echo(f"Output {value_i}. Object type: {type(value)}") - click.echo("Pretty printed value:") - click.echo(pprint.pformat(value)) + header = f"Workflow run {wf_run_id} has {len(values)} outputs." + self._console.print(Group(header, self._values_table(values))) def show_dumped_artifact(self, dump_details: serde.DumpDetails): """ @@ -207,46 +195,29 @@ def show_dumped_artifact(self, dump_details: serde.DumpDetails): else: format_name = dump_details.format.name - click.echo(f"Artifact saved at {dump_details.file_path} " f"as {format_name}.") - - -class ServicePresenter: - @contextmanager - def show_progress( - self, services: Sequence[_services.Service], *, label: str - ) -> Iterator[Iterable[_services.Service]]: - """ - Starts a progress bar on the context enter. + self._console.print( + f"Artifact saved at {dump_details.file_path} " f"as {format_name}." + ) - Yields an iterable of services; when you iterate over it, the progress bar is - advanced. 
- """ - with click.progressbar( - services, - show_eta=False, - item_show_func=lambda svc: f"{label} {svc.name}" - if svc is not None - else None, - ) as bar: - yield bar +class ServicePresenter(RichPresenter): def show_services(self, services: Sequence[responses.ServiceResponse]): - click.echo( - tabulate( - [ - [ - click.style(svc.name, bold=True), - click.style("Running", fg="green") - if svc.is_running - else click.style("Not Running", fg="red"), - svc.info, - ] - for svc in services - ], - colalign=("right",), - tablefmt="plain", - ), + status_table = Table( + Column("Service", style="bold"), + Column("Status"), + Column("Info", style="blue"), + box=SIMPLE_HEAVY, + show_header=False, ) + for svc in services: + status_table.add_row( + svc.name, + "[green]Running[/green]" + if svc.is_running + else "[red]Not Running[/red]", + svc.info or "", + ) + self._console.print(status_table) def show_failure(self, service_responses: Sequence[responses.ServiceResponse]): self.show_services(service_responses) @@ -346,58 +317,70 @@ def _format_tasks_succeeded(summary: ui_models.WFRunSummary) -> str: return f"{summary.n_tasks_succeeded} / {summary.n_task_invocations_total}" -class WFRunPresenter: - def show_wf_run(self, summary: ui_models.WFRunSummary): - click.echo("Workflow overview") - click.echo( - tabulate( - [ - ["workflow def name", summary.wf_def_name], - ["run ID", summary.wf_run_id], - ["status", summary.wf_run_status.state.name], - [ - "start time", - _format_datetime(summary.wf_run_status.start_time), - ], - [ - "end time", - _format_datetime(summary.wf_run_status.end_time), - ], - ["tasks succeeded", _format_tasks_succeeded(summary)], - ] - ) +class WFRunPresenter(RichPresenter): + def show_submitted_wf_run(self, wf_run_id: WorkflowRunId): + self._console.print( + f"[green]Workflow Submitted![/green] Run ID: [bold]{wf_run_id}[/bold]" ) - click.echo() - task_rows = [ - ["function", "invocation ID", "status", "start_time", "end_time", "message"] - ] + def get_wf_run(self, summary: ui_models.WFRunSummary): + summary_table = Table( + Column(style="bold", justify="right"), + Column(), + show_header=False, + box=SIMPLE_HEAVY, + ) + summary_table.add_row("Workflow Def Name", summary.wf_def_name) + summary_table.add_row("Run ID", summary.wf_run_id) + summary_table.add_row("Status", summary.wf_run_status.state.name) + summary_table.add_row( + "Start Time", _format_datetime(summary.wf_run_status.start_time) + ) + summary_table.add_row( + "End Time", _format_datetime(summary.wf_run_status.end_time) + ) + summary_table.add_row("Succeeded Tasks", _format_tasks_succeeded(summary)) + task_details = Table( + "Function", + "Invocation ID", + "Status", + "Start Time", + "End Time", + "Message", + box=SIMPLE_HEAVY, + ) for task_row in summary.task_rows: - task_rows.append( - [ - task_row.task_fn_name, - task_row.inv_id, - task_row.status.state.value, - _format_datetime(task_row.status.start_time), - _format_datetime(task_row.status.end_time), - task_row.message or "", - ] + task_details.add_row( + task_row.task_fn_name, + task_row.inv_id, + task_row.status.state.name, + _format_datetime(task_row.status.start_time), + _format_datetime(task_row.status.end_time), + task_row.message, ) - click.echo("Task details") - click.echo(tabulate(task_rows, headers="firstrow")) + title = Rule("Workflow Overview", align="left") + task_title = Rule("Task Details", align="left") + return Group(title, summary_table, task_title, task_details) + + def show_wf_run(self, summary: ui_models.WFRunSummary): + 
self._console.print(self.get_wf_run(summary)) def show_wf_list(self, summary: ui_models.WFList): - rows = [["Workflow Run ID", "Status", "Tasks Succeeded", "Start Time"]] - for model_row in summary.wf_rows: - rows.append( - [ - model_row.workflow_run_id, - model_row.status, - model_row.tasks_succeeded, - _format_datetime(model_row.start_time), - ] + table = Table( + "Workflow Run ID", + "Status", + "Succeeded Tasks", + "Start Time", + box=SIMPLE_HEAVY, + ) + for run in summary.wf_rows: + table.add_row( + run.workflow_run_id, + run.status, + run.tasks_succeeded, + _format_datetime(run.start_time), ) - click.echo(tabulate(rows, headers="firstrow")) + self._console.print(table) class PromptPresenter: diff --git a/src/orquestra/sdk/_base/cli/_workflow/_logs.py b/src/orquestra/sdk/_base/cli/_workflow/_logs.py index 60fe4e58f..69bcf9cec 100644 --- a/src/orquestra/sdk/_base/cli/_workflow/_logs.py +++ b/src/orquestra/sdk/_base/cli/_workflow/_logs.py @@ -23,7 +23,8 @@ class Action: def __init__( self, - presenter=_presenters.WrappedCorqOutputPresenter(), + logs_presenter=_presenters.LogsPresenter(), + error_presenter=_presenters.WrappedCorqOutputPresenter(), dumper=_dumpers.LogsDumper(), wf_run_repo=_repos.WorkflowRunRepo(), config_resolver: t.Optional[_arg_resolvers.WFConfigResolver] = None, @@ -41,7 +42,8 @@ def __init__( ) # output - self._presenter = presenter + self._logs_presenter = logs_presenter + self._error_presenter = error_presenter self._dumper = dumper def on_cmd_call( @@ -65,7 +67,7 @@ def on_cmd_call( other=other, ) except Exception as e: - self._presenter.show_error(e) + self._error_presenter.show_error(e) def _on_cmd_call_with_exceptions( self, @@ -114,6 +116,6 @@ def _on_cmd_call_with_exceptions( log_type=log_type, ) - self._presenter.show_dumped_wf_logs(dump_path, log_type=log_type) + self._logs_presenter.show_dumped_wf_logs(dump_path, log_type=log_type) else: - self._presenter.show_logs(log, log_type=log_type) + self._logs_presenter.show_logs(log, log_type=log_type) diff --git a/src/orquestra/sdk/_base/cli/_workflow/_submit.py b/src/orquestra/sdk/_base/cli/_workflow/_submit.py index 0efca1256..d5757fde0 100644 --- a/src/orquestra/sdk/_base/cli/_workflow/_submit.py +++ b/src/orquestra/sdk/_base/cli/_workflow/_submit.py @@ -23,7 +23,8 @@ class Action: def __init__( self, prompter=_prompts.Prompter(), - presenter=_presenters.WrappedCorqOutputPresenter(), + submit_presenter=_presenters.WFRunPresenter(), + error_presenter=_presenters.WrappedCorqOutputPresenter(), wf_def_repo=_repos.WorkflowDefRepo(), wf_run_repo=_repos.WorkflowRunRepo(), config_resolver: t.Optional[_arg_resolvers.ConfigResolver] = None, @@ -31,7 +32,8 @@ def __init__( ): # text IO self._prompter = prompter - self._presenter = presenter + self._submit_presenter = submit_presenter + self._error_presenter = error_presenter # data sources self._wf_run_repo = wf_run_repo @@ -57,7 +59,7 @@ def on_cmd_call( module, name, config, workspace_id, project_id, force ) except Exception as e: - self._presenter.show_error(e) + self._error_presenter.show_error(e) def _on_cmd_call_with_exceptions( self, @@ -145,4 +147,4 @@ def _on_cmd_call_with_exceptions( # abort return - self._presenter.show_submitted_wf_run(wf_run_id) + self._submit_presenter.show_submitted_wf_run(wf_run_id) diff --git a/tests/cli/services/test_down.py b/tests/cli/services/test_down.py new file mode 100644 index 000000000..a210abd36 --- /dev/null +++ b/tests/cli/services/test_down.py @@ -0,0 +1,101 @@ 
+################################################################################
+# © Copyright 2023 Zapata Computing Inc.
+################################################################################
+"""
+Unit tests for ``orq services down`` CLI action.
+"""
+import inspect
+import subprocess
+from unittest.mock import create_autospec
+
+import pytest
+
+from orquestra.sdk._base import _services
+from orquestra.sdk._base.cli import _arg_resolvers
+from orquestra.sdk._base.cli._services import _down
+from orquestra.sdk._base.cli._ui import _presenters
+from orquestra.sdk.schema.responses import ServiceResponse
+
+
+class TestAction:
+    """
+    Test boundary::
+
+        [_down.Action]->[ServicePresenter]
+    """
+
+    class TestPassingAllValues:
+        @staticmethod
+        @pytest.fixture
+        def service():
+            service = create_autospec(_services.Service)
+            service.name = "testing"
+            service.is_running.return_value = False
+
+            return service
+
+        @staticmethod
+        @pytest.fixture
+        def action(service):
+            service_resolver = create_autospec(_arg_resolvers.ServiceResolver)
+            service_resolver.resolve.return_value = [service]
+            presenter = create_autospec(_presenters.ServicePresenter)
+
+            action = _down.Action(
+                presenter=presenter,
+                service_resolver=service_resolver,
+            )
+
+            return action
+
+        @staticmethod
+        def test_success(service: _services.Service, action):
+            # When
+            action.on_cmd_call(manage_ray=None, manage_all=None)
+
+            # Then
+            action._presenter.progress_spinner.assert_called_with("Stopping")
+            action._presenter.show_services.assert_called_with(
+                services=[
+                    ServiceResponse(name=service.name, is_running=False, info=None)
+                ]
+            )
+
+        @staticmethod
+        def test_failure(service, action: _down.Action):
+            # Given
+            service.down.side_effect = subprocess.CalledProcessError(
+                returncode=1,
+                cmd=[
+                    "ray",
+                    "stop",
+                ],
+                output=b"",
+                stderr=inspect.cleandoc(
+                    """
+                    Could not terminate `...` due to ...
+                    """
+                ).encode(),
+            )
+
+            # When
+            action.on_cmd_call(manage_ray=None, manage_all=None)
+
+            # Then
+            action._presenter.show_failure.assert_called_with(
+                [
+                    ServiceResponse(
+                        name=service.name,
+                        is_running=True,
+                        info=inspect.cleandoc(
+                            """
+                            command:
+                            ['ray', 'stop']
+                            stdout:
+                            stderr:
+                            Could not terminate `...` due to ...
+ """ # noqa: E501 + ), + ) + ] + ) diff --git a/tests/cli/services/test_up.py b/tests/cli/services/test_up.py index 59d64b1b9..7ed615488 100644 --- a/tests/cli/services/test_up.py +++ b/tests/cli/services/test_up.py @@ -6,7 +6,6 @@ """ import inspect import subprocess -from contextlib import contextmanager from unittest.mock import create_autospec import pytest @@ -39,13 +38,7 @@ def service(): def action(service): service_resolver = create_autospec(_arg_resolvers.ServiceResolver) service_resolver.resolve.return_value = [service] - - @contextmanager - def progress_ctx(self, label): - yield [service] - presenter = create_autospec(_presenters.ServicePresenter) - presenter.show_progress = progress_ctx action = _up.Action( presenter=presenter, @@ -60,6 +53,7 @@ def test_success(service: _services.Service, action): action.on_cmd_call(manage_ray=None, manage_all=None) # Then + action._presenter.progress_spinner.assert_called_with("Starting") action._presenter.show_services.assert_called_with( services=[ ServiceResponse(name=service.name, is_running=True, info="Started!") diff --git a/tests/cli/task/test_task_logs.py b/tests/cli/task/test_task_logs.py index 4802fe68d..c00fad9e0 100644 --- a/tests/cli/task/test_task_logs.py +++ b/tests/cli/task/test_task_logs.py @@ -6,7 +6,7 @@ """ from pathlib import Path -from unittest.mock import create_autospec +from unittest.mock import Mock, create_autospec import pytest @@ -18,7 +18,10 @@ from orquestra.sdk._base.cli._dumpers import LogsDumper from orquestra.sdk._base.cli._repos import WorkflowRunRepo from orquestra.sdk._base.cli._task import _logs -from orquestra.sdk._base.cli._ui._presenters import WrappedCorqOutputPresenter +from orquestra.sdk._base.cli._ui._presenters import ( + LogsPresenter, + WrappedCorqOutputPresenter, +) class TestAction: @@ -44,7 +47,8 @@ def action(): resolved_invocation_id = "" # Mocks - presenter = create_autospec(WrappedCorqOutputPresenter) + logs_presenter = create_autospec(LogsPresenter) + error_presenter = create_autospec(WrappedCorqOutputPresenter) dumper = create_autospec(LogsDumper) dumped_path = "" @@ -65,7 +69,8 @@ def action(): task_inv_id_resolver.resolve.return_value = resolved_invocation_id return _logs.Action( - presenter=presenter, + logs_presenter=logs_presenter, + error_presenter=error_presenter, dumper=dumper, wf_run_repo=wf_run_repo, config_resolver=config_resolver, @@ -94,7 +99,7 @@ def test_no_download_dir(action): # Then # We should pass input CLI args to config resolver. - action._presenter.show_error.assert_not_called() + action._error_presenter.show_error.assert_not_called() action._config_resolver.resolve.assert_called_with(wf_run_id, config) @@ -119,7 +124,7 @@ def test_no_download_dir(action): # We expect printing the workflow run returned from the repo. logs = action._wf_run_repo.get_task_logs.return_value - action._presenter.show_logs.assert_called_with(logs) + action._logs_presenter.show_logs.assert_called_with(logs) # We don't expect any dumps. assert action._dumper.dump.mock_calls == [] @@ -145,7 +150,7 @@ def test_download_dir_passed(action): # Then # We should pass input CLI args to config resolver. - action._presenter.show_error.assert_not_called() + action._error_presenter.show_error.assert_not_called() action._config_resolver.resolve.assert_called_with(wf_run_id, config) # We should pass resolved_config to run ID resolver. 
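
(Reviewer note: not part of the patch.) The hunks above split the single `presenter` dependency into a rich-based `logs_presenter` for happy-path output and a legacy corq `error_presenter` for failures, both injected through the `Action` constructor and stubbed in tests with `unittest.mock.create_autospec`. A minimal sketch of that pattern, assuming the repo's presenter classes are importable; the `Action` body here is a stripped-down illustration, not the SDK's actual class:

    from unittest.mock import create_autospec

    from orquestra.sdk._base.cli._ui._presenters import (
        LogsPresenter,
        WrappedCorqOutputPresenter,
    )

    class Action:
        def __init__(self, logs_presenter, error_presenter):
            # Happy-path output goes through the rich-based presenter;
            # errors keep flowing through the legacy corq presenter.
            self._logs_presenter = logs_presenter
            self._error_presenter = error_presenter

        def on_cmd_call(self, *args, **kwargs):
            try:
                self._on_cmd_call_with_exceptions(*args, **kwargs)
            except Exception as e:
                self._error_presenter.show_error(e)

        def _on_cmd_call_with_exceptions(self, *args, **kwargs):
            # Illustrative placeholder: resolve args, fetch logs, then
            # call self._logs_presenter.show_logs(...).
            ...

    # In tests, autospec'd doubles keep the call signatures honest:
    action = Action(
        logs_presenter=create_autospec(LogsPresenter),
        error_presenter=create_autospec(WrappedCorqOutputPresenter),
    )
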
@@ -168,7 +173,7 @@ def test_download_dir_passed(action): ) # Do not print logs to stdout - action._presenter.show_logs.assert_not_called() + action._logs_presenter.show_logs.assert_not_called() # Expect dumping logs to the FS. logs = action._wf_run_repo.get_task_logs.return_value @@ -177,4 +182,31 @@ def test_download_dir_passed(action): ) dumped_path = action._dumper.dump.return_value - action._presenter.show_dumped_wf_logs.assert_called_with(dumped_path) + action._logs_presenter.show_dumped_wf_logs.assert_called_with(dumped_path) + + @staticmethod + def test_failure(action, monkeypatch): + # Given + # CLI inputs + wf_run_id = "" + config = "" + download_dir = Path("/tmp/my/awesome/dir") + task_inv_id = "" + fn_name = "" + exception = Exception("") + monkeypatch.setattr( + action, "_on_cmd_call_with_exceptions", Mock(side_effect=exception) + ) + + # When + action.on_cmd_call( + wf_run_id=wf_run_id, + config=config, + download_dir=download_dir, + fn_name=fn_name, + task_inv_id=task_inv_id, + ) + + # Then + # We should pass input CLI args to config resolver. + action._error_presenter.show_error.assert_called_with(exception) diff --git a/tests/cli/ui/data/list_wf_runs.txt b/tests/cli/ui/data/list_wf_runs.txt new file mode 100644 index 000000000..f5f678c25 --- /dev/null +++ b/tests/cli/ui/data/list_wf_runs.txt @@ -0,0 +1,6 @@ + + Workflow Run ID Status Succeeded Tasks Start Time + ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + wf1 mocked status x/y Fri Feb 24 08:26:07 2023 + wf2 mocked status x/y + diff --git a/tests/cli/ui/data/wf_runs/running.txt b/tests/cli/ui/data/wf_runs/running.txt index 8dfbb8c78..d71b75006 100644 --- a/tests/cli/ui/data/wf_runs/running.txt +++ b/tests/cli/ui/data/wf_runs/running.txt @@ -1,15 +1,16 @@ -Workflow overview ------------------ ------------------------ -workflow def name hello_orq -run ID wf.1 -status RUNNING -start time Fri Feb 24 08:26:07 2023 -end time -tasks succeeded 1 / 2 ------------------ ------------------------ - -Task details -function invocation ID status start_time end_time message -------------- --------------- --------- ------------------------ ------------------------ --------- -generate_data inv-1-gen-dat SUCCEEDED Fri Feb 24 08:26:07 2023 Fri Feb 24 08:26:07 2023 -train_model inv-2-tra-mod RUNNING Fri Feb 24 08:26:07 2023 +Workflow Overview ────────────────────────────────────────────────────────────────────────────────────────────────────── + + Workflow Def Name hello_orq + Run ID wf.1 + Status RUNNING + Start Time Fri Feb 24 08:26:07 2023 + End Time + Succeeded Tasks 1 / 2 + +Task Details ─────────────────────────────────────────────────────────────────────────────────────────────────────────── + + Function Invocation ID Status Start Time End Time Message + ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + generate_data inv-1-gen-dat SUCCEEDED Fri Feb 24 08:26:07 2023 Fri Feb 24 08:26:07 2023 + train_model inv-2-tra-mod RUNNING Fri Feb 24 08:26:07 2023 + diff --git a/tests/cli/ui/data/wf_runs/waiting.txt b/tests/cli/ui/data/wf_runs/waiting.txt index 2f486462d..fc85ec45c 100644 --- a/tests/cli/ui/data/wf_runs/waiting.txt +++ b/tests/cli/ui/data/wf_runs/waiting.txt @@ -1,13 +1,14 @@ -Workflow overview ------------------ ------------------------ -workflow def name hello_orq -run ID wf.1 -status WAITING -start time Fri Feb 24 08:26:07 2023 -end time -tasks succeeded 0 / 21 ------------------ ------------------------ - -Task details -function 
invocation ID status start_time end_time message ----------- --------------- -------- ------------ ---------- --------- +Workflow Overview ────────────────────────────────────────────────────────────────────────────────────────────────────── + + Workflow Def Name hello_orq + Run ID wf.1 + Status WAITING + Start Time Fri Feb 24 08:26:07 2023 + End Time + Succeeded Tasks 0 / 21 + +Task Details ─────────────────────────────────────────────────────────────────────────────────────────────────────────── + + Function Invocation ID Status Start Time End Time Message + ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + diff --git a/tests/cli/ui/test_presenters.py b/tests/cli/ui/test_presenters.py index b37041940..5bbf175fc 100644 --- a/tests/cli/ui/test_presenters.py +++ b/tests/cli/ui/test_presenters.py @@ -4,10 +4,12 @@ import sys import typing as t from datetime import datetime, timezone +from io import StringIO from pathlib import Path from unittest.mock import Mock, create_autospec import pytest +from rich.console import Console from orquestra import sdk from orquestra.sdk._base import serde @@ -17,7 +19,6 @@ from orquestra.sdk._base.cli._ui import _errors from orquestra.sdk._base.cli._ui import _models as ui_models from orquestra.sdk._base.cli._ui import _presenters -from orquestra.sdk._base.cli._ui._corq_format import per_command from orquestra.sdk.schema.configs import RuntimeConfiguration from orquestra.sdk.schema.ir import ArtifactFormat from orquestra.sdk.schema.responses import ResponseStatusCode, ServiceResponse @@ -41,261 +42,262 @@ def sys_exit_mock(monkeypatch): return exit_mock -class TestWrappedCorqOutputPresenter: - class TestPassingDataToCorq: - """ - Tests WrappedCorqOutputPresenter's methods that delegate formatting outputs to - the older, corq formatters. 
- """ +CONSOLE_WIDTH = 120 - @staticmethod - def test_show_submitted_wf_run(monkeypatch): - # Given - pretty_print_mock = Mock() - monkeypatch.setattr(per_command, "pretty_print_response", pretty_print_mock) - - wf_run_id = "wf.1" - presenter = _presenters.WrappedCorqOutputPresenter() - - # When - presenter.show_submitted_wf_run(wf_run_id) +@pytest.fixture +def test_console(): + return Console(file=StringIO(), width=CONSOLE_WIDTH) - # Then - called_args = pretty_print_mock.call_args.args - response_model = called_args[0] - assert response_model.workflow_runs[0].id == wf_run_id - @staticmethod - def test_show_logs_with_dict(monkeypatch): - # Given - pretty_print_mock = Mock() - monkeypatch.setattr(per_command, "pretty_print_response", pretty_print_mock) - task_invocation = "my_task_invocation" - task_logs = LogOutput(out=["my_log"], err=[]) - logs = {task_invocation: task_logs} - - presenter = _presenters.WrappedCorqOutputPresenter() +class TestLogsPresenter: + @staticmethod + @pytest.fixture + def rule(): + def _inner(prefix: t.Optional[str] = None): + line = "─" * CONSOLE_WIDTH + if prefix is None: + return line + return f"{prefix} {line[len(prefix)+1:]}" - # When - presenter.show_logs(logs) + return _inner - # Then - called_args = pretty_print_mock.call_args.args - response_model = called_args[0] - assert "stdout:" in response_model.logs - assert task_logs.out[0] in response_model.logs - assert task_invocation in response_model.logs[0] + @staticmethod + def test_show_logs_with_dict(test_console: Console): + # Given + task_invocation = "my_task_invocation" + task_logs = LogOutput(out=["my_log"], err=[]) + logs = {task_invocation: task_logs} - @staticmethod - def test_show_logs_with_logoutput(monkeypatch): - # Given - pretty_print_mock = Mock() - monkeypatch.setattr(per_command, "pretty_print_response", pretty_print_mock) - logs = LogOutput(out=["my_log"], err=[]) + presenter = _presenters.LogsPresenter(console=test_console) - presenter = _presenters.WrappedCorqOutputPresenter() + # When + presenter.show_logs(logs) - # When - presenter.show_logs(logs) + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + assert "my_task_invocation" in output + assert "stdout" in output + assert "my_log" in output - # Then - called_args = pretty_print_mock.call_args.args - response_model = called_args[0] - assert "stdout:" in response_model.logs - assert logs.out[0] in response_model.logs + @staticmethod + def test_show_logs_with_logoutput(test_console: Console): + # Given + logs = LogOutput(out=["my_log"], err=[]) + presenter = _presenters.LogsPresenter(console=test_console) - @staticmethod - def test_print_stdout_when_available(monkeypatch): - # Given - pretty_print_mock = Mock() - monkeypatch.setattr(per_command, "pretty_print_response", pretty_print_mock) - logs = LogOutput(out=["my_log"], err=[]) + # When + presenter.show_logs(logs) - presenter = _presenters.WrappedCorqOutputPresenter() + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + assert "stdout" in output + assert "my_log" in output - # When - presenter.show_logs(logs) + @staticmethod + def test_print_stdout_when_available(test_console: Console): + # Given + logs = LogOutput(out=["my_log"], err=[]) + presenter = _presenters.LogsPresenter(console=test_console) - # Then - called_args = pretty_print_mock.call_args.args - response_model = called_args[0] - assert "stdout:" in response_model.logs - assert "stderr:" not in response_model.logs + # When + 
presenter.show_logs(logs) - @staticmethod - def test_print_stderr_when_available(monkeypatch): - # Given - pretty_print_mock = Mock() - monkeypatch.setattr(per_command, "pretty_print_response", pretty_print_mock) - logs = LogOutput(out=[], err=["my_log"]) + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + assert "stdout" in output + assert "stderr" not in output - presenter = _presenters.WrappedCorqOutputPresenter() + @staticmethod + def test_print_stderr_when_available(test_console: Console): + # Given + logs = LogOutput(out=[], err=["my_log"]) + presenter = _presenters.LogsPresenter(console=test_console) - # When - presenter.show_logs(logs) + # When + presenter.show_logs(logs) - # Then - called_args = pretty_print_mock.call_args.args - response_model = called_args[0] - assert "stdout:" not in response_model.logs - assert "stderr:" in response_model.logs + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + assert "stdout" not in output + assert "stderr" in output - @staticmethod - def test_print_both_when_available(monkeypatch): - # Given - pretty_print_mock = Mock() - monkeypatch.setattr(per_command, "pretty_print_response", pretty_print_mock) - logs = LogOutput(out=["my_log"], err=["my_log"]) + @staticmethod + def test_print_both_when_available(test_console: Console): + # Given + logs = LogOutput(out=["my_log"], err=["my_log"]) + presenter = _presenters.LogsPresenter(console=test_console) - presenter = _presenters.WrappedCorqOutputPresenter() + # When + presenter.show_logs(logs) - # When - presenter.show_logs(logs) + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + assert "stdout" in output + assert "stderr" in output - # Then - called_args = pretty_print_mock.call_args.args - response_model = called_args[0] - assert "stdout:" in response_model.logs - assert "stderr:" in response_model.logs + @staticmethod + def test_show_dumped_wf_logs(test_console: Console): + # Given + dummy_path: Path = Path("/my/cool/path") + presenter = _presenters.LogsPresenter(console=test_console) - class TestPrinting: - """ - Tests WrappedCorqOutputPresenter's methods that print outputs directly. 
- """ + # When + presenter.show_dumped_wf_logs(dummy_path) - @staticmethod - def test_stopped_wf_run(capsys): - # Given - wf_run_id = "wf.1" - presenter = _presenters.WrappedCorqOutputPresenter() + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + assert f"Workflow logs saved at {dummy_path}" in output - # When - presenter.show_stopped_wf_run(wf_run_id) + @staticmethod + def test_with_mapped_logs(test_console, rule: t.Callable[..., str]): + # Given + logs = { + "": LogOutput( + out=[ + "", + "", + ], + err=[], + ) + } + presenter = _presenters.LogsPresenter(console=test_console) - # Then - captured = capsys.readouterr() - assert f"Workflow run {wf_run_id} stopped" in captured.out + # When + presenter.show_logs(logs) - @staticmethod - def test_show_dumped_wf_logs(capsys): - # Given - dummy_path: Path = Path("/my/cool/path") - presenter = _presenters.WrappedCorqOutputPresenter() + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + for line in [ + "", + "stdout", + "", + "", + ]: + assert line in output + assert rule() not in output - # When - presenter.show_dumped_wf_logs(dummy_path) + @staticmethod + def test_with_logoutput_logs(test_console: Console): + # Given + logs = LogOutput(out=["", ""], err=[]) + presenter = _presenters.LogsPresenter(console=test_console) - # Then - captured = capsys.readouterr() - assert f"Workflow logs saved at {dummy_path}" in captured.out + # When + presenter.show_logs(logs) - class TestShowLogs: - @staticmethod - def test_with_mapped_logs(capsys): - # Given - logs = { - "": LogOutput( - out=[ - "", - "", - ], - err=[], - ) - } - presenter = _presenters.WrappedCorqOutputPresenter() + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + " ", + " stdout ", + " ", + " ", + "", + ] + ) + assert output == expected - # When - presenter.show_logs(logs) + @staticmethod + def test_with_log_type(test_console, rule: t.Callable[..., str]): + # Given + logs = LogOutput(out=["", ""], err=[]) + log_type = Mock(value="") + presenter = _presenters.LogsPresenter(console=test_console) - # Then - captured = capsys.readouterr() - for line in [ - "task-invocation-id: ", - "stdout:", - "", - "", - ]: - assert line in captured.out - assert "=" * 80 not in captured.out + # When + presenter.show_logs(logs, log_type=log_type) - @staticmethod - def test_with_logoutput_logs(capsys): - # Given - logs = LogOutput( - out=["", ""], err=[] - ) - presenter = _presenters.WrappedCorqOutputPresenter() + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + rule(" logs"), + " ", + " stdout ", + " ", + " ", + rule(), + "", + ] + ) + assert expected == output - # When - presenter.show_logs(logs) + @staticmethod + def test_stderr_output(test_console: Console): + # Given + logs = LogOutput(out=[], err=["", ""]) + presenter = _presenters.LogsPresenter(console=test_console) - # Then - captured = capsys.readouterr() - assert ( - "stdout:\n\n\n" - in captured.out - ) - assert "=" * 80 not in captured.out + # When + presenter.show_logs(logs) - @staticmethod - def test_with_log_type(capsys): - # Given - logs = LogOutput( - out=["", ""], err=[] - ) - log_type = Mock(value="") - presenter = _presenters.WrappedCorqOutputPresenter() + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + " ", + " stderr ", + 
" ", + " ", + "", + ] + ) + assert output == expected - # When - presenter.show_logs(logs, log_type=log_type) + @staticmethod + def test_both_output(test_console: Console): + # Given + logs = LogOutput(out=[""], err=[""]) + presenter = _presenters.LogsPresenter(console=test_console) - # Then - captured = capsys.readouterr() - for line in [ - "=== LOGS ===================================================", # noqa: E501 - "stdout:", - "", - "", - "================================================================================", # noqa: E501 - ]: - assert line in captured.out + # When + presenter.show_logs(logs) - @staticmethod - def test_stderr_output(capsys): - # Given - logs = LogOutput( - out=[], err=["", ""] - ) - presenter = _presenters.WrappedCorqOutputPresenter() + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + " ", + " stdout ", + " stderr ", + " ", + "", + ] + ) + assert output == expected - # When - presenter.show_logs(logs) - # Then - captured = capsys.readouterr() - assert ( - "stderr:\n\n\n" - in captured.out - ) +class TestWrappedCorqOutputPresenter: + class TestPrinting: + """ + Tests WrappedCorqOutputPresenter's methods that print outputs directly. + """ @staticmethod - def test_both_output(capsys): + def test_stopped_wf_run(capsys): # Given - logs = LogOutput( - out=[""], err=[""] - ) + wf_run_id = "wf.1" presenter = _presenters.WrappedCorqOutputPresenter() # When - presenter.show_logs(logs) + presenter.show_stopped_wf_run(wf_run_id) # Then captured = capsys.readouterr() - assert ( - "stdout:\n\nstderr:\n\n" - in captured.out - ) + assert f"Workflow run {wf_run_id} stopped" in captured.out @staticmethod def test_handling_error(monkeypatch, sys_exit_mock): @@ -324,145 +326,189 @@ def test_handling_error(monkeypatch, sys_exit_mock): class TestArtifactPresenter: class TestDumpedWFResult: @staticmethod - def test_json(capsys): + def test_json(test_console: Console): # Given details = serde.DumpDetails( file_path=Path("tests/some-path/wf.1234_1.json"), format=ArtifactFormat.JSON, ) - presenter = _presenters.ArtifactPresenter() + presenter = _presenters.ArtifactPresenter(console=test_console) # When presenter.show_dumped_artifact(details) # Then - captured = capsys.readouterr() + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() # We can't assert on the full path because separators are # platform-dependent. - assert "Artifact saved at tests" in captured.out - assert "wf.1234_1.json as a text json file." in captured.out + assert "Artifact saved at tests" in output + assert "wf.1234_1.json as a text json file." in output @staticmethod - def test_pickle(capsys): + def test_pickle(test_console: Console): # Given details = serde.DumpDetails( file_path=Path("tests/some-path/wf.1234_1.pickle"), format=ArtifactFormat.ENCODED_PICKLE, ) - presenter = _presenters.ArtifactPresenter() + presenter = _presenters.ArtifactPresenter(console=test_console) # When presenter.show_dumped_artifact(details) # Then - captured = capsys.readouterr() + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() # We can't assert on the full path because separators are # platform-dependent. - assert "Artifact saved at tests" in captured.out - assert "wf.1234_1.pickle as a binary pickle file." in captured.out + assert "Artifact saved at tests" in output + assert "wf.1234_1.pickle as a binary pickle file." 
in output @staticmethod - def test_other_format(capsys): + def test_other_format(test_console: Console): # Given details = serde.DumpDetails( file_path=Path("tests/some-path/wf.1234_1.npz"), format=ArtifactFormat.NUMPY_ARRAY, ) - presenter = _presenters.ArtifactPresenter() + presenter = _presenters.ArtifactPresenter(console=test_console) # When presenter.show_dumped_artifact(details) # Then - captured = capsys.readouterr() + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() # We can't assert on the full path because separators are # platform-dependent. - assert "Artifact saved at tests" in captured.out - assert "wf.1234_1.npz as NUMPY_ARRAY." in captured.out + assert "Artifact saved at tests" in output + assert "wf.1234_1.npz as NUMPY_ARRAY." in output @staticmethod - def test_show_workflow_outputs(capsys): + @pytest.mark.skipif( + sys.platform.startswith("win32"), + reason="Windows uses different symbols than macOS and Linux", + ) + def test_show_workflow_outputs(test_console: Console): # Given values = [set([21, 38]), {"hello": "there"}] wf_run_id = "wf.1234" - presenter = _presenters.ArtifactPresenter() + presenter = _presenters.ArtifactPresenter(console=test_console) # When presenter.show_workflow_outputs(values, wf_run_id) # Then - captured = capsys.readouterr() - assert ( - "Workflow run wf.1234 has 2 outputs.\n" - "\n" - "Output 0. Object type: \n" - "Pretty printed value:\n" - "{21, 38}\n" - "\n" - "Output 1. Object type: \n" - "Pretty printed value:\n" - "{'hello': 'there'}\n" - ) == captured.out + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + "Workflow run wf.1234 has 2 outputs.", + " ", + " Index Type Pretty Printed ", + " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ ", + " 0 {21, 38} ", + " 1 {'hello': 'there'} ", + " ", + "", + ] + ) + assert output == expected @staticmethod - def test_show_task_outputs(capsys): + @pytest.mark.skipif( + sys.platform.startswith("win32"), + reason="Windows uses different symbols than macOS and Linux", + ) + def test_show_task_outputs(test_console: Console): # Given values = [set([21, 38]), {"hello": "there"}] wf_run_id = "wf.1234" task_inv_id = "inv6" - presenter = _presenters.ArtifactPresenter() + presenter = _presenters.ArtifactPresenter(console=test_console) # When presenter.show_task_outputs(values, wf_run_id, task_inv_id) # Then - captured = capsys.readouterr() - assert ( - "In workflow wf.1234, task invocation inv6 produced 2 outputs.\n" - "\n" - "Output 0. Object type: \n" - "Pretty printed value:\n" - "{21, 38}\n" - "\n" - "Output 1. 
Object type: \n" - "Pretty printed value:\n" - "{'hello': 'there'}\n" - ) == captured.out + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + "In workflow wf.1234, task invocation inv6 produced 2 outputs.", + " ", + " Index Type Pretty Printed ", + " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ ", + " 0 {21, 38} ", + " 1 {'hello': 'there'} ", + " ", + "", + ] + ) + assert output == expected class TestServicesPresenter: class TestShowServices: - def test_running(self, capsys): + def test_running(self, test_console: Console): # Given services = [ServiceResponse(name="mocked", is_running=True, info=None)] - presenter = _presenters.ServicePresenter() + presenter = _presenters.ServicePresenter(console=test_console) # When presenter.show_services(services) # Then - captured = capsys.readouterr() - assert "mocked Running" in captured.out + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + " ", + " mocked Running ", + " ", + "", + ] + ) + assert output == expected - def test_not_running(self, capsys): + def test_not_running(self, test_console: Console): # Given services = [ServiceResponse(name="mocked", is_running=False, info=None)] - presenter = _presenters.ServicePresenter() + presenter = _presenters.ServicePresenter(console=test_console) # When presenter.show_services(services) # Then - captured = capsys.readouterr() - assert "mocked Not Running" in captured.out + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + " ", + " mocked Not Running ", + " ", + "", + ] + ) + assert output == expected - def test_with_info(self, capsys): + def test_with_info(self, test_console: Console): # Given services = [ ServiceResponse(name="mocked", is_running=False, info="something") ] - presenter = _presenters.ServicePresenter() + presenter = _presenters.ServicePresenter(console=test_console) # When presenter.show_services(services) # Then - captured = capsys.readouterr() - assert "mocked Not Running something" in captured.out + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + expected = "\n".join( + [ + " ", + " mocked Not Running something ", + " ", + "", + ] + ) + assert output == expected @staticmethod def test_show_failure(capsys, sys_exit_mock): @@ -604,6 +650,26 @@ def test_rpint_configs_list(capsys): class TestWorkflowRunPresenter: @staticmethod + def test_show_submitted_wf_run(test_console: Console): + # Given + wf_run_id = "wf.1" + + presenter = _presenters.WFRunPresenter(console=test_console) + + # When + presenter.show_submitted_wf_run(wf_run_id) + + # Then + expected = "Workflow Submitted! 
Run ID: wf.1\n" + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + assert output == expected + + @staticmethod + @pytest.mark.skipif( + sys.platform.startswith("win32"), + reason="Windows uses different symbols than macOS and Linux", + ) @pytest.mark.parametrize( "summary,expected_path", [ @@ -660,10 +726,10 @@ class TestWorkflowRunPresenter: ], ) def test_show_wf_run( - monkeypatch, capsys, summary: ui_models.WFRunSummary, expected_path: Path + monkeypatch, test_console, summary: ui_models.WFRunSummary, expected_path: Path ): # Given - presenter = _presenters.WFRunPresenter() + presenter = _presenters.WFRunPresenter(console=test_console) monkeypatch.setattr( _presenters, "_format_datetime", @@ -673,10 +739,50 @@ def test_show_wf_run( presenter.show_wf_run(summary) # Then - captured = capsys.readouterr() + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() + + expected = expected_path.read_text() + assert output == expected + + @staticmethod + @pytest.mark.skipif( + sys.platform.startswith("win32"), + reason="Windows uses different symbols than macOS and Linux", + ) + def test_show_wf_list(monkeypatch: pytest.MonkeyPatch, test_console: Console): + # Given + expected_path = DATA_DIR / "list_wf_runs.txt" + summary = ui_models.WFList( + wf_rows=[ + ui_models.WFList.WFRow( + workflow_run_id="wf1", + status="mocked status", + tasks_succeeded="x/y", + start_time=UTC_INSTANT, + ), + ui_models.WFList.WFRow( + workflow_run_id="wf2", + status="mocked status", + tasks_succeeded="x/y", + start_time=None, + ), + ] + ) + presenter = _presenters.WFRunPresenter(console=test_console) + monkeypatch.setattr( + _presenters, + "_format_datetime", + lambda x: "Fri Feb 24 08:26:07 2023" if x else "", + ) + # When + presenter.show_wf_list(summary) + # Then + assert isinstance(test_console.file, StringIO) + output = test_console.file.getvalue() expected = expected_path.read_text() - assert captured.out == expected + assert output == expected class TestPromptPresenter: diff --git a/tests/cli/workflow/test_logs.py b/tests/cli/workflow/test_logs.py index 84387988e..57a3ccfa8 100644 --- a/tests/cli/workflow/test_logs.py +++ b/tests/cli/workflow/test_logs.py @@ -14,7 +14,10 @@ from orquestra.sdk._base.cli._arg_resolvers import WFConfigResolver, WFRunResolver from orquestra.sdk._base.cli._dumpers import LogsDumper from orquestra.sdk._base.cli._repos import WorkflowRunRepo -from orquestra.sdk._base.cli._ui._presenters import WrappedCorqOutputPresenter +from orquestra.sdk._base.cli._ui._presenters import ( + LogsPresenter, + WrappedCorqOutputPresenter, +) from orquestra.sdk._base.cli._workflow import _logs @@ -40,7 +43,8 @@ def action(): resolved_config = "" # Mocks - presenter = create_autospec(WrappedCorqOutputPresenter) + logs_presenter = create_autospec(LogsPresenter) + error_presenter = create_autospec(WrappedCorqOutputPresenter) dumper = create_autospec(LogsDumper) wf_run_repo = create_autospec(WorkflowRunRepo) @@ -62,7 +66,8 @@ def action(): wf_run_resolver.resolve_id.return_value = resolved_id action = _logs.Action( - presenter=presenter, + logs_presenter=logs_presenter, + error_presenter=error_presenter, dumper=dumper, wf_run_repo=wf_run_repo, config_resolver=config_resolver, @@ -100,7 +105,7 @@ def test_no_download_dir(action, task_switch, system_switch, env_setup_switch): # Then # We should pass input CLI args to config resolver. 
- action._presenter.show_error.assert_not_called() + action._error_presenter.show_error.assert_not_called() action._config_resolver.resolve.assert_called_with(wf_run_id, config) # We should pass resolved_config to run ID resolver. @@ -127,23 +132,23 @@ def test_no_download_dir(action, task_switch, system_switch, env_setup_switch): # We expect printing the workflow run returned from the repo. if task_switch: - print(action._presenter.show_logs.call_args_list) + print(action._logs_presenter.show_logs.call_args_list) task_logs = action._wf_run_repo.get_wf_logs.return_value.per_task - action._presenter.show_logs.assert_any_call( + action._logs_presenter.show_logs.assert_any_call( task_logs, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.PER_TASK ), if system_switch: sys_logs = action._wf_run_repo.get_wf_logs.return_value.system - action._presenter.show_logs.assert_any_call( + action._logs_presenter.show_logs.assert_any_call( sys_logs, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.SYSTEM ) if env_setup_switch: env_setup_logs = action._wf_run_repo.get_wf_logs.return_value.env_setup - action._presenter.show_logs.assert_any_call( + action._logs_presenter.show_logs.assert_any_call( env_setup_logs, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.ENV_SETUP, ) - assert action._presenter.show_logs.call_count == sum( + assert action._logs_presenter.show_logs.call_count == sum( [task_switch, system_switch, env_setup_switch] ) @@ -185,7 +190,7 @@ def test_download_dir_passed( # Then # We should pass input CLI args to config resolver. - action._presenter.show_error.assert_not_called() + action._error_presenter.show_error.assert_not_called() action._config_resolver.resolve.assert_called_with(wf_run_id, config) # We should pass resolved_config to run ID resolver. @@ -219,7 +224,7 @@ def test_download_dir_passed( download_dir, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.PER_TASK, ) - action._presenter.show_dumped_wf_logs.assert_any_call( + action._logs_presenter.show_dumped_wf_logs.assert_any_call( dumped_path, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.PER_TASK, ) @@ -231,7 +236,7 @@ def test_download_dir_passed( download_dir, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.SYSTEM, ) - action._presenter.show_dumped_wf_logs.assert_any_call( + action._logs_presenter.show_dumped_wf_logs.assert_any_call( dumped_path, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.SYSTEM ) if env_setup_switch: @@ -242,16 +247,43 @@ def test_download_dir_passed( download_dir, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.ENV_SETUP, ) - action._presenter.show_dumped_wf_logs.assert_any_call( + action._logs_presenter.show_dumped_wf_logs.assert_any_call( dumped_path, log_type=_logs.WorkflowLogs.WorkflowLogTypeName.ENV_SETUP, ) assert action._dumper.dump.call_count == sum( [task_switch, system_switch, env_setup_switch] ) - assert action._presenter.show_dumped_wf_logs.call_count == sum( + assert action._logs_presenter.show_dumped_wf_logs.call_count == sum( [task_switch, system_switch, env_setup_switch] ) # Do not print logs to stdout - action._presenter.show_logs.assert_not_called() + action._logs_presenter.show_logs.assert_not_called() + + @staticmethod + def test_failure(action, monkeypatch): + # Given + # CLI inputs + wf_run_id = "" + config = "" + download_dir = Path("/tmp/my/awesome/dir") + exception = Exception("") + monkeypatch.setattr( + action, "_on_cmd_call_with_exceptions", Mock(side_effect=exception) + ) + + # When + action.on_cmd_call( + wf_run_id=wf_run_id, + config=config, + download_dir=download_dir, + 
task="", + system="", + env_setup="", + other="", + ) + + # Then + # We should pass input CLI args to config resolver. + action._error_presenter.show_error.assert_called_with(exception) diff --git a/tests/cli/workflow/test_submit.py b/tests/cli/workflow/test_submit.py index c473855d5..9344b56a9 100644 --- a/tests/cli/workflow/test_submit.py +++ b/tests/cli/workflow/test_submit.py @@ -51,7 +51,8 @@ def test_success(force: bool): project = "project'" prompter = create_autospec(_prompts.Prompter) - presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) + submit_presenter = create_autospec(_presenters.WFRunPresenter) + error_presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) wf_run_id = "wf.test" wf_run_repo = create_autospec(_repos.WorkflowRunRepo) @@ -65,7 +66,8 @@ def test_success(force: bool): action = _submit.Action( prompter=prompter, - presenter=presenter, + submit_presenter=submit_presenter, + error_presenter=error_presenter, wf_run_repo=wf_run_repo, wf_def_repo=wf_def_repo, ) @@ -90,7 +92,7 @@ def test_success(force: bool): ) # We expect telling the user the wf run ID. - presenter.show_submitted_wf_run.assert_called_with(wf_run_id) + submit_presenter.show_submitted_wf_run.assert_called_with(wf_run_id) class TestOmittingName: @staticmethod @@ -109,7 +111,8 @@ def test_multiple_wf_defs_in_module(force: bool): prompter = create_autospec(_prompts.Prompter) prompter.choice.return_value = selected_name - presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) + submit_presenter = create_autospec(_presenters.WFRunPresenter) + error_presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) wf_run_id = "wf.test" wf_run_repo = create_autospec(_repos.WorkflowRunRepo) @@ -125,7 +128,8 @@ def test_multiple_wf_defs_in_module(force: bool): action = _submit.Action( prompter=prompter, - presenter=presenter, + submit_presenter=submit_presenter, + error_presenter=error_presenter, wf_run_repo=wf_run_repo, wf_def_repo=wf_def_repo, ) @@ -151,7 +155,7 @@ def test_multiple_wf_defs_in_module(force: bool): ) # We expect telling the user the wf run ID. - presenter.show_submitted_wf_run.assert_called_with(wf_run_id) + submit_presenter.show_submitted_wf_run.assert_called_with(wf_run_id) @staticmethod @pytest.mark.parametrize("force", [False, True]) @@ -167,8 +171,8 @@ def test_single_wf_def(force: bool): prompter = create_autospec(_prompts.Prompter) - presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) - presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) + submit_presenter = create_autospec(_presenters.WFRunPresenter) + error_presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) wf_run_id = "wf.test" wf_run_repo = create_autospec(_repos.WorkflowRunRepo) @@ -184,7 +188,8 @@ def test_single_wf_def(force: bool): action = _submit.Action( prompter=prompter, - presenter=presenter, + submit_presenter=submit_presenter, + error_presenter=error_presenter, wf_run_repo=wf_run_repo, wf_def_repo=wf_def_repo, ) @@ -210,7 +215,7 @@ def test_single_wf_def(force: bool): ) # We expect telling the user the wf run ID. 
- presenter.show_submitted_wf_run.assert_called_with(wf_run_id) + submit_presenter.show_submitted_wf_run.assert_called_with(wf_run_id) @staticmethod @pytest.mark.parametrize("force", [False, True]) @@ -223,7 +228,8 @@ def test_no_wf_defs_in_module(force: bool): project = "project" prompter = create_autospec(_prompts.Prompter) - presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) + submit_presenter = create_autospec(_presenters.WFRunPresenter) + error_presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) wf_run_repo = create_autospec(_repos.WorkflowRunRepo) wf_def_repo = create_autospec(_repos.WorkflowDefRepo) wf_def_repo.get_worklow_names.side_effect = ( @@ -232,7 +238,8 @@ def test_no_wf_defs_in_module(force: bool): action = _submit.Action( prompter=prompter, - presenter=presenter, + submit_presenter=submit_presenter, + error_presenter=error_presenter, wf_run_repo=wf_run_repo, wf_def_repo=wf_def_repo, ) @@ -252,7 +259,7 @@ def test_no_wf_defs_in_module(force: bool): # We expect presenting the error. _assert_called_with_type( - presenter.show_error, exceptions.NoWorkflowDefinitionsFound + error_presenter.show_error, exceptions.NoWorkflowDefinitionsFound ) @staticmethod @@ -265,7 +272,8 @@ def test_invalid_module(force: bool): project = "project" prompter = create_autospec(_prompts.Prompter) - presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) + submit_presenter = create_autospec(_presenters.WFRunPresenter) + error_presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) wf_run_repo = create_autospec(_repos.WorkflowRunRepo) sys_path = [module, "foo", "bar"] @@ -278,7 +286,8 @@ def test_invalid_module(force: bool): action = _submit.Action( prompter=prompter, - presenter=presenter, + submit_presenter=submit_presenter, + error_presenter=error_presenter, wf_run_repo=wf_run_repo, wf_def_repo=wf_def_repo, ) @@ -298,7 +307,7 @@ def test_invalid_module(force: bool): # We expect telling the user about the error. _assert_called_with_type( - presenter.show_error, exceptions.WorkflowDefinitionModuleNotFound + error_presenter.show_error, exceptions.WorkflowDefinitionModuleNotFound ) class TestDirtyRepo: @@ -322,7 +331,8 @@ def test_no_force(): # Simulate a user saying "yes" prompter.confirm.return_value = True - presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) + submit_presenter = create_autospec(_presenters.WFRunPresenter) + error_presenter = create_autospec(_presenters.WrappedCorqOutputPresenter) wf_run_id = "wf.test" wf_run_repo = create_autospec(_repos.WorkflowRunRepo) @@ -345,7 +355,8 @@ def _fake_submit_method(*args, ignore_dirty_repo, **kwargs): action = _submit.Action( prompter=prompter, - presenter=presenter, + submit_presenter=submit_presenter, + error_presenter=error_presenter, wf_run_repo=wf_run_repo, wf_def_repo=wf_def_repo, ) @@ -363,7 +374,7 @@ def _fake_submit_method(*args, ignore_dirty_repo, **kwargs): prompter.confirm.assert_called() # We expect telling the user the wf run ID. 
-        presenter.show_submitted_wf_run.assert_called_with(wf_run_id)
+        submit_presenter.show_submitted_wf_run.assert_called_with(wf_run_id)

     @staticmethod
     def test_force():
@@ -376,7 +387,8 @@ def test_force():
         force = True

         prompter = create_autospec(_prompts.Prompter)
-        presenter = create_autospec(_presenters.WrappedCorqOutputPresenter)
+        submit_presenter = create_autospec(_presenters.WFRunPresenter)
+        error_presenter = create_autospec(_presenters.WrappedCorqOutputPresenter)

         wf_run_id = "wf.test"
         wf_run_repo = create_autospec(_repos.WorkflowRunRepo)
@@ -399,7 +411,8 @@ def _fake_submit_method(*args, ignore_dirty_repo, **kwargs):

         action = _submit.Action(
             prompter=prompter,
-            presenter=presenter,
+            submit_presenter=submit_presenter,
+            error_presenter=error_presenter,
             wf_run_repo=wf_run_repo,
             wf_def_repo=wf_def_repo,
         )
@@ -417,7 +430,7 @@ def _fake_submit_method(*args, ignore_dirty_repo, **kwargs):

         prompter.confirm.assert_not_called()

         # We expect telling the user the wf run ID.
-        presenter.show_submitted_wf_run.assert_called_with(wf_run_id)
+        submit_presenter.show_submitted_wf_run.assert_called_with(wf_run_id)

 class TestProjectResolve:
     @pytest.mark.parametrize("workspace_support", [True, False])
@@ -428,7 +441,8 @@ def test_workspace_and_project_resolve(self, workspace_support):
         config = "cluster_z"

         prompter = create_autospec(_prompts.Prompter)
-        presenter = create_autospec(_presenters.WrappedCorqOutputPresenter)
+        submit_presenter = create_autospec(_presenters.WFRunPresenter)
+        error_presenter = create_autospec(_presenters.WrappedCorqOutputPresenter)

         wf_run_id = "wf.test"
         wf_run_repo = create_autospec(_repos.WorkflowRunRepo)
@@ -454,7 +468,8 @@ def test_workspace_and_project_resolve(self, workspace_support):

         action = _submit.Action(
             prompter=prompter,
-            presenter=presenter,
+            submit_presenter=submit_presenter,
+            error_presenter=error_presenter,
             wf_run_repo=wf_run_repo,
             wf_def_repo=wf_def_repo,
             spaces_resolver=spaces_resolver,
@@ -491,4 +506,4 @@ def test_workspace_and_project_resolve(self, workspace_support):
         )

         # We expect telling the user the wf run ID.
-        presenter.show_submitted_wf_run.assert_called_with(wf_run_id)
+        submit_presenter.show_submitted_wf_run.assert_called_with(wf_run_id)
diff --git a/tests/runtime/performance/test_cli_perf.py b/tests/runtime/performance/test_cli_perf.py
index 639b60a76..7bad72018 100644
--- a/tests/runtime/performance/test_cli_perf.py
+++ b/tests/runtime/performance/test_cli_perf.py
@@ -85,7 +85,7 @@ def orq_workflow_run(ray_cluster, orq_project_dir):
     output = _run_orq_command(["wf", "submit", "-c", "local", "workflow_defs"])
     # Parse the stdout to get the workflow ID
     stdout = output.stdout.decode()
-    match = re.match("Workflow submitted! Run ID: (?P<wf_run_id>.*)", stdout)
+    match = re.match("Workflow Submitted! Run ID: (?P<wf_run_id>.*)", stdout)
    assert match is not None
    workflow_id = match.groupdict().get("wf_run_id")
    assert workflow_id is not None
diff --git a/tests/sdk/test_consistent_return_shapes.py b/tests/sdk/test_consistent_return_shapes.py
index 5a10b7284..7ec968645 100644
--- a/tests/sdk/test_consistent_return_shapes.py
+++ b/tests/sdk/test_consistent_return_shapes.py
@@ -25,6 +25,7 @@
 import re
 import shutil
 import subprocess
+import sys
 import tempfile
 import typing as t
 from pathlib import Path
@@ -428,6 +429,10 @@ def test_consistent_returns_for_multiple_values(
     "mock_config_env_var",
     "mock_db_env_var",
 )
+@pytest.mark.skipif(
+    sys.platform.startswith("win32"),
+    reason="Windows uses different symbols than macOS and Linux",
+)
 @pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning")
 @pytest.mark.slow
 class TestCLI:
@@ -450,11 +455,11 @@ def test_consistent_returns_for_single_value(
         )

         m = re.match(
-            r"Workflow submitted! Run ID: (?P<run_id>.*)", run_ray.stdout.decode()
+            r"Workflow Submitted! Run ID: (?P<run_id>.*)", run_ray.stdout.decode()
         )
         assert m is not None
         run_id_ray = m.group("run_id").strip()
-        assert "Workflow submitted!" in run_ce.stdout.decode()
+        assert "Workflow Submitted!" in run_ce.stdout.decode()

         # WHEN
         results_ray = (
@@ -481,9 +486,10 @@ def test_consistent_returns_for_single_value(
         assert [line.strip() for line in results_ce] == [
             f"Workflow run {mock_ce_run_single} has 1 outputs.",
             "",
-            "Output 0. Object type: <class 'list'>",
-            "Pretty printed value:",
-            f"{single_result_vanilla}",
+            "Index Type Pretty Printed",
+            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
+            "0 <class 'list'> [1, 2, 3]",
+            "",
             "",
         ]

@@ -508,11 +514,11 @@ def test_consistent_returns_for_multiple_values(
         )

         m = re.match(
-            r"Workflow submitted! Run ID: (?P<run_id>.*)", run_ray.stdout.decode()
+            r"Workflow Submitted! Run ID: (?P<run_id>.*)", run_ray.stdout.decode()
         )
         assert m is not None
         run_id_ray = m.group("run_id").strip()
-        assert "Workflow submitted!" in run_ce.stdout.decode()
+        assert "Workflow Submitted!" in run_ce.stdout.decode()

         # WHEN
         results_ray = (
@@ -539,13 +545,11 @@ def test_consistent_returns_for_multiple_values(
         assert [line.strip() for line in results_ce] == [
             f"Workflow run {mock_ce_run_multiple} has 2 outputs.",
             "",
-            "Output 0. Object type: <class 'list'>",
-            "Pretty printed value:",
-            f"{multiple_result_vanilla[0]}",
+            "Index Type Pretty Printed",
+            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
+            "0 <class 'list'> [1, 2, 3]",
+            "1 <class 'list'> [1, 2, 3]",
             "",
-            "Output 1. Object type: <class 'list'>",
-            "Pretty printed value:",
-            f"{multiple_result_vanilla[1]}",
             "",
         ]

@@ -582,7 +586,7 @@ def test_consistent_downloads_for_single_value(
         ), f"STDOUT: {run_ce.stdout.decode()},\n\nSTDERR: {run_ce.stderr.decode()}"

         m = re.match(
-            r"Workflow submitted! Run ID: (?P<run_id>.*)", run_ray.stdout.decode()
+            r"Workflow Submitted! Run ID: (?P<run_id>.*)", run_ray.stdout.decode()
         )
         assert m is not None
         run_id_ray = m.group("run_id").strip()
@@ -654,7 +658,7 @@ def test_consistent_downloads_for_multiple_values(
         )

         m = re.match(
-            r"Workflow submitted! Run ID: (?P<run_id>.*)", run_ray.stdout.decode()
+            r"Workflow Submitted! Run ID: (?P<run_id>.*)", run_ray.stdout.decode()
         )
         assert m is not None
         run_id_ray = m.group("run_id").strip()
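
For reference, the "Index / Type / Pretty Printed" layout that the updated assertions expect can be reproduced with rich's table API. The sketch below is illustrative only: the show_workflow_outputs helper is a hypothetical stand-in, not the SDK's actual presenter method.

# Minimal sketch, assuming a hypothetical helper name; not the SDK's
# implementation. It approximates the table asserted in
# tests/sdk/test_consistent_return_shapes.py.
from rich.box import SIMPLE_HEAVY
from rich.console import Console
from rich.pretty import Pretty
from rich.table import Table


def show_workflow_outputs(values):
    # SIMPLE_HEAVY draws a heavy rule under the header row and no outer
    # border, which is why the expected output contains a "━━━" line.
    table = Table("Index", "Type", "Pretty Printed", box=SIMPLE_HEAVY)
    for i, value in enumerate(values):
        # str(type(value)) renders as, e.g., "<class 'list'>".
        table.add_row(str(i), str(type(value)), Pretty(value))
    Console().print(table)


# Printing two list outputs yields rows like "0  <class 'list'>  [1, 2, 3]".
show_workflow_outputs([[1, 2, 3], [1, 2, 3]])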