Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add process execution simulation to test harness #993

Merged
merged 26 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d772657
Add handle_exec to test harness
weiiwang01 Aug 21, 2023
6c6d4af
Fix static check and linting issues
weiiwang01 Aug 21, 2023
e4c1079
Fix some linting issues
weiiwang01 Aug 21, 2023
32f5b19
Use list sort instead of bisect due to Python 3.8
weiiwang01 Aug 21, 2023
9be8ab9
Better handler registration
weiiwang01 Aug 22, 2023
33297a9
Revert changes in wait_output and fix docs
weiiwang01 Aug 22, 2023
648f96d
Update ops/testing.py
weiiwang01 Aug 23, 2023
d3abeb7
Update ops/testing.py
weiiwang01 Aug 23, 2023
f981383
Update ops/testing.py
weiiwang01 Aug 23, 2023
7116791
Update ops/testing.py
weiiwang01 Aug 23, 2023
b429c15
Update ops/testing.py
weiiwang01 Aug 23, 2023
37c104a
Update ops/testing.py
weiiwang01 Aug 23, 2023
2be5318
Update ops/testing.py
weiiwang01 Aug 23, 2023
76dfd77
Update ops/testing.py
weiiwang01 Aug 23, 2023
78df99a
Update ops/testing.py
weiiwang01 Aug 23, 2023
83096f6
Update ops/testing.py
weiiwang01 Aug 23, 2023
7ff8695
Update ops/testing.py
weiiwang01 Aug 23, 2023
5d087bb
Apply suggestions from code review
weiiwang01 Aug 23, 2023
90a49b7
Apply suggestions from code review
weiiwang01 Aug 23, 2023
6742e9b
Update handle_exec document
weiiwang01 Aug 23, 2023
0e36c4c
Remove changes to ops/pebble.py
benhoyt Aug 24, 2023
eea02d6
Minor handle_exec docstring tweaks and fixes
benhoyt Aug 24, 2023
b9e9c69
simplify the exec handler storage and lookup
weiiwang01 Aug 24, 2023
c570f75
Update exception messages in harness
weiiwang01 Aug 24, 2023
4b603af
Minor refactoring
weiiwang01 Aug 24, 2023
031f566
Merge branch 'main' into feat-handle-exec
weiiwang01 Aug 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 312 additions & 2 deletions ops/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import datetime
import fnmatch
import inspect
import io
import ipaddress
import os
import pathlib
Expand All @@ -36,13 +37,15 @@
Any,
AnyStr,
BinaryIO,
Callable,
Dict,
Generic,
Iterable,
List,
Literal,
Mapping,
Optional,
Sequence,
Set,
TextIO,
Tuple,
Expand All @@ -56,6 +59,7 @@
from ops._private import yaml
from ops.charm import CharmBase, CharmMeta, RelationRole
from ops.model import Container, RelationNotFoundError
from ops.pebble import ExecProcess

if TYPE_CHECKING:
from typing_extensions import TypedDict
Expand Down Expand Up @@ -106,6 +110,41 @@
CharmType = TypeVar('CharmType', bound=charm.CharmBase)


@dataclasses.dataclass
class ExecArgs:
"""Represent arguments captured from the :meth:`ops.Container.exec` method call.

These arguments will be passed to the :meth:`Harness.handle_exec` handler function.
See :meth:`ops.pebble.Client.exec` for documentation of properties.
"""
command: List[str]
environment: Dict[str, str]
working_dir: Optional[str]
timeout: Optional[float]
user_id: Optional[int]
user: Optional[str]
group_id: Optional[int]
group: Optional[str]
stdin: Optional[Union[str, bytes]]
encoding: Optional[str]
combine_stderr: bool


@dataclasses.dataclass
class ExecResult:
"""Represents the result of a simulated process execution.

This class is typically used to return the output and exit code from the
:meth:`Harness.handle_exec` result or handler function.
"""
exit_code: int = 0
stdout: Union[str, bytes] = b""
stderr: Union[str, bytes] = b""
benhoyt marked this conversation as resolved.
Show resolved Hide resolved


ExecHandler = Callable[[ExecArgs], Union[None, ExecResult]]


# noinspection PyProtectedMember
class Harness(Generic[CharmType]):
"""This class represents a way to build up the model that will drive a test suite.
Expand Down Expand Up @@ -1544,6 +1583,97 @@ def evaluate_status(self) -> None:
"""
charm._evaluate_status(self.charm)

def handle_exec(self,
container: Union[str, Container],
command_prefix: Sequence[str],
*,
handler: Optional[ExecHandler] = None,
result: Optional[Union[int, str, bytes, ExecResult]] = None):
r"""Register a handler to simulate the Pebble command execution.

This allows a test harness to simulate the behavior of running commands in a container.
When :meth:`ops.Container.exec` is triggered, the registered handler is used to
generate stdout and stderr for the simulated execution.

You can provide either a ``handler`` or a ``result``, but not both:

- A ``handler`` is a function accepting :class:`ops.testing.ExecArgs` and returning
:class:`ops.testing.ExecResult` as the simulated process outcome. For cases that
have side effects but don't return output, the handler can return ``None``, which
is equivalent to returning ``ExecResult()``.

- A ``result`` is for simulations that don't need to inspect the ``exec`` arguments; the
output or exit code is provided directly. Setting ``result`` to str or bytes means
use that string as stdout (with exit code 0); setting ``result`` to int means return
that exit code (and no stdout).

If ``handle_exec`` is called more than once with overlapping command prefixes, the
longest match takes precedence. The registration of an execution handler can be updated by
re-registering with the same command prefix.

The execution handler receives the timeout value in the ``ExecArgs``. If needed,
it can raise a ``TimeoutError`` to inform the harness that a timeout occurred.

If :meth:`ops.Container.exec` is called with ``combine_stderr=True``, the execution
handler should, if required, weave the simulated standard error into the standard output.
The harness checks the result and will raise an exception if stderr is non-empty.

Args:
container: The specified container or its name.
command_prefix: The command prefix to register against.
handler: A handler function that simulates the command's execution.
result: A simplified form to specify the command's simulated result.

Example usage::

# produce no output and return 0 for every command
harness.handle_exec('container', [], result=0)

# simple example that just produces output (exit code 0)
harness.handle_exec('webserver', ['ls', '/etc'], result='passwd\nprofile\n')

# slightly more complex (use stdin)
harness.handle_exec(
'c1', ['sha1sum'],
handler=lambda args: ExecResult(stdout=hashlib.sha1(args.stdin).hexdigest()))

# more complex example using args.command
def docker_handler(args: testing.ExecArgs) -> testing.ExecResult:
match args.command:
case ['docker', 'run', image]:
return testing.ExecResult(stdout=f'running {image}')
case ['docker', 'ps']:
return testing.ExecResult(stdout='CONTAINER ID IMAGE ...')
case _:
return testing.ExecResult(exit_code=1, stderr='unknown command')

harness.handle_exec('database', ['docker'], handler=docker_handler)

# handle timeout
def handle_timeout(args: testing.ExecArgs) -> int:
if args.timeout is not None and args.timeout < 10:
raise TimeoutError
return 0

harness.handle_exec('database', ['foo'], handler=handle_timeout)
"""
if (handler is None and result is None) or (handler is not None and result is not None):
raise TypeError("Either handler or result must be provided, but not both.")
container_name = container if isinstance(container, str) else container.name
if result is None:
pass
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, thanks. Though this confused me a bit at first. How about we put this in an indented block and perform the checks and set the handlers to lambda _: result only if result is not None (then we can skip the slightly awkward inline if-else below):

if result is not None:
    if isinstance(result, int) and not isinstance(result, bool):
        result = ExecResult(exit_code=result)
    elif isinstance(result, (str, bytes)):
        result = ExecResult(stdout=result)
    elif not isinstance(result, ExecResult):
        raise TypeError(
            f"result must be int, str, bytes, or ExecResult, not {result.__class__.__name__}")
    handler = lambda _: result

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, updated. However, I can't change handler = lambda _: result because of the linting checks.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, I hate it when our tools get in the way of clean simple code. I guess we could do this, but I'm not sure it's any better. I leave it up to you. I'll probably merge this tomorrow!

            # autopep8: off
            handler = lambda _: result  # noqa
            # autopep8: on

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three annotations for a single statement might be a little bit too much. I'll probably keep the original statement, thanks.

elif isinstance(result, int) and not isinstance(result, bool):
result = ExecResult(exit_code=result)
elif isinstance(result, (str, bytes)):
result = ExecResult(stdout=result)
elif not isinstance(result, ExecResult):
raise TypeError(
f"result must be int, str, bytes, or ExecResult, not {result.__class__.__name__}")
self._backend._pebble_clients[container_name]._handle_exec(
command_prefix=command_prefix,
handler=(lambda _: result) if handler is None else handler # type: ignore
)


def _get_app_or_unit_name(app_or_unit: AppUnitOrName) -> str:
"""Return name of given application or unit (return strings directly)."""
Expand Down Expand Up @@ -2321,6 +2451,53 @@ def _check_protocol_and_port(self, protocol: str, port: Optional[int]):
raise model.ModelError(f'ERROR invalid protocol "{protocol}", expected "tcp", "udp", or "icmp"\n') # NOQA: test_quote_backslashes


@_copy_docstrings(pebble.ExecProcess)
class _TestingExecProcess:
def __init__(self,
command: List[str],
timeout: Optional[float],
exit_code: Optional[int],
stdin: Union[TextIO, BinaryIO, None],
stdout: Union[TextIO, BinaryIO, None],
stderr: Union[TextIO, BinaryIO, None],
is_timeout: bool):
self._command = command
self._timeout = timeout
self._is_timeout = is_timeout
if exit_code is None and not is_timeout:
raise ValueError("when is_timeout is False, exit_code must not be None")
self._exit_code = exit_code
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr

def wait(self):
if self._is_timeout:
raise pebble.TimeoutError(
f'timed out waiting for change ({self._timeout} seconds)'
)
if self._exit_code != 0:
raise pebble.ExecError(self._command, cast(int, self._exit_code), None, None)

def wait_output(self) -> Tuple[AnyStr, Optional[AnyStr]]:
if self._is_timeout:
raise pebble.TimeoutError(
f'timed out waiting for change ({self._timeout} seconds)'
)
out_value = self.stdout.read() if self.stdout is not None else None
err_value = self.stderr.read() if self.stderr is not None else None
if self._exit_code != 0:
raise pebble.ExecError[AnyStr](self._command,
cast(int, self._exit_code),
cast(Union[AnyStr, None], out_value),
cast(Union[AnyStr, None], err_value))
return cast(AnyStr, out_value), cast(Union[AnyStr, None], err_value)

def send_signal(self, sig: Union[int, str]):
# the process is always terminated when ExecProcess is return in the simulation.
raise BrokenPipeError("[Errno 32] Broken pipe")


@_copy_docstrings(pebble.Client)
class _TestingPebbleClient:
"""This conforms to the interface for pebble.Client but provides canned data.
Expand All @@ -2337,6 +2514,11 @@ def __init__(self, backend: _TestingModelBackend, container_root: pathlib.Path):
self._service_status: Dict[str, pebble.ServiceStatus] = {}
self._root = container_root
self._backend = backend
self._exec_handlers: Dict[Tuple[str, ...], ExecHandler] = {}

def _handle_exec(self, command_prefix: Sequence[str], handler: ExecHandler):
prefix = tuple(command_prefix)
self._exec_handlers[prefix] = handler

def _check_connection(self):
if not self._backend._can_connect(self):
Expand Down Expand Up @@ -2698,8 +2880,136 @@ def remove_path(self, path: str, *, recursive: bool = False):
else:
file_path.unlink()

def exec(self, command, **kwargs): # type:ignore
raise NotImplementedError(self.exec) # type:ignore
def _find_exec_handler(self, command: List[str]) -> Optional[ExecHandler]:
for prefix_len in reversed(range(len(command) + 1)):
weiiwang01 marked this conversation as resolved.
Show resolved Hide resolved
command_prefix = tuple(command[:prefix_len])
if command_prefix in self._exec_handlers:
return self._exec_handlers[command_prefix]
benhoyt marked this conversation as resolved.
Show resolved Hide resolved

def _transform_exec_handler_output(self,
data: Union[str, bytes],
encoding: Optional[str]) -> Union[io.BytesIO, io.StringIO]:
if isinstance(data, bytes):
if encoding is None:
return io.BytesIO(data)
else:
return io.StringIO(data.decode(encoding=encoding))
else:
if encoding is None:
raise ValueError(
f"exec handler must return bytes if encoding is None,"
f"not {data.__class__.__name__}")
else:
return io.StringIO(data)

def exec(
self,
command: List[str],
*,
service_context: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
timeout: Optional[float] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None,
stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None,
stdout: Optional[Union[TextIO, BinaryIO]] = None,
stderr: Optional[Union[TextIO, BinaryIO]] = None,
encoding: Optional[str] = 'utf-8',
combine_stderr: bool = False
) -> ExecProcess[Any]:
self._check_connection()
handler = self._find_exec_handler(command)
if handler is None:
message = "execution handler not found, please register one using Harness.handle_exec"
raise pebble.APIError(
body={}, code=500, status='Internal Server Error', message=message
)
weiiwang01 marked this conversation as resolved.
Show resolved Hide resolved
environment = {} if environment is None else environment
if service_context is not None:
plan = self.get_plan()
if service_context not in plan.services:
message = f'context service "{service_context}" not found'
body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error',
'result': {'message': message}}
raise pebble.APIError(
body=body, code=500, status='Internal Server Error', message=message
)
service = plan.services[service_context]
environment = {**service.environment, **environment}
working_dir = service.working_dir if working_dir is None else working_dir
user = service.user if user is None else user
user_id = service.user_id if user_id is None else user_id
group = service.group if group is None else group
group_id = service.group_id if group_id is None else group_id

if hasattr(stdin, "read"):
stdin = stdin.read() # type: ignore

exec_args = ExecArgs(
command=command,
environment=environment,
working_dir=working_dir,
timeout=timeout,
user_id=user_id,
user=user,
group_id=group_id,
group=group,
stdin=cast(Union[str, bytes, None], stdin),
encoding=encoding,
combine_stderr=combine_stderr
)
proc_stdin = self._transform_exec_handler_output(b"", encoding)
if stdin is not None:
proc_stdin = None
proc_stdout = self._transform_exec_handler_output(b"", encoding)
proc_stderr = self._transform_exec_handler_output(b"", encoding)
try:
result = handler(exec_args)
except TimeoutError:
if timeout is not None:
exec_process = _TestingExecProcess(command=command,
timeout=timeout,
exit_code=None,
stdin=proc_stdin,
stdout=proc_stdout,
stderr=proc_stderr,
is_timeout=True)
return cast(pebble.ExecProcess[Any], exec_process)
else:
raise RuntimeError(
"a TimeoutError occurred in the execution handler, "
"but no timeout value was provided in the execution arguments."
)
if result is None:
exit_code = 0
proc_stdout = self._transform_exec_handler_output(b'', encoding)
proc_stderr = self._transform_exec_handler_output(b'', encoding)
elif isinstance(result, ExecResult):
exit_code = result.exit_code
proc_stdout = self._transform_exec_handler_output(result.stdout, encoding)
proc_stderr = self._transform_exec_handler_output(result.stderr, encoding)
else:
raise TypeError(f"execution handler returned an unexpected type: {type(result)!r}.")
if combine_stderr and proc_stderr.getvalue():
raise ValueError("execution handler returned a non-empty stderr "
"even though combine_stderr is enabled.")
if stdout is not None:
shutil.copyfileobj(cast(io.IOBase, proc_stdout), cast(io.IOBase, stdout))
proc_stdout = None
if stderr is not None:
shutil.copyfileobj(cast(io.IOBase, proc_stderr), cast(io.IOBase, stderr))
proc_stderr = None
exec_process = _TestingExecProcess(command=command,
timeout=timeout,
exit_code=exit_code,
stdin=proc_stdin,
stdout=proc_stdout,
stderr=proc_stderr,
is_timeout=False)
return cast(pebble.ExecProcess[Any], exec_process)

def send_signal(self, sig: Union[int, str], service_names: Iterable[str]):
if not service_names:
Expand Down
Loading
Loading