From 6853b0cda6b63e746a24759efdb9fb48b1dedd13 Mon Sep 17 00:00:00 2001 From: Brendon Smith Date: Mon, 11 Nov 2024 16:39:34 -0500 Subject: [PATCH] WIP: Add and test Gunicorn worker --- .github/workflows/ci.yml | 9 +- docs/contributing.md | 1 + inboard/gunicorn_workers.py | 134 +++++++++++++++++ pyproject.toml | 2 + tests/test_gunicorn_workers.py | 262 +++++++++++++++++++++++++++++++++ 5 files changed, 406 insertions(+), 2 deletions(-) create mode 100644 inboard/gunicorn_workers.py create mode 100644 tests/test_gunicorn_workers.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b8ea3e1..a8d3a23 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -110,9 +110,14 @@ jobs: - name: Run Hatch script for code quality checks run: hatch run ${{ env.HATCH_ENV }}:check - name: Run tests - run: hatch run ${{ env.HATCH_ENV }}:coverage run + run: | + export COVERAGE_PROCESS_START="$PWD/pyproject.toml" + hatch run ${{ env.HATCH_ENV }}:coverage run + timeout-minutes: 5 - name: Enforce test coverage - run: hatch run ${{ env.HATCH_ENV }}:coverage report + run: | + hatch run ${{ env.HATCH_ENV }}:coverage combine -q + hatch run ${{ env.HATCH_ENV }}:coverage report - name: Build Python package run: hatch build - name: Upload Python package artifacts diff --git a/docs/contributing.md b/docs/contributing.md index 7ae8bcc..d3ed94d 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -72,6 +72,7 @@ As explained in the [VSCode docs](https://code.visualstudio.com/docs/containers/ - [pytest configuration](https://docs.pytest.org/en/latest/reference/customize.html) is in _[pyproject.toml](https://github.com/br3ndonland/inboard/blob/develop/pyproject.toml)_. - [FastAPI testing](https://fastapi.tiangolo.com/tutorial/testing/) and [Starlette testing](https://www.starlette.io/testclient/) rely on the [Starlette `TestClient`](https://www.starlette.io/testclient/). - Test coverage reports are generated by [coverage.py](https://github.com/nedbat/coveragepy). To generate test coverage reports, first run tests with `coverage run`, then generate a report with `coverage report`. To see interactive HTML coverage reports, run `coverage html` instead of `coverage report`. +- Some of the tests start separate subprocesses. These tests are more complex in some ways, and can take longer, than the standard single-process tests. A [pytest mark](https://docs.pytest.org/en/latest/example/markers.html) is included to help control the behavior of subprocess tests. To run the test suite without subprocess tests, [select tests](https://docs.pytest.org/en/stable/example/markers.html) with `coverage run -m pytest -m "not subprocess"`. Note that test coverage will be lower without the subprocess tests. ## Docker diff --git a/inboard/gunicorn_workers.py b/inboard/gunicorn_workers.py new file mode 100644 index 0000000..25dcfb7 --- /dev/null +++ b/inboard/gunicorn_workers.py @@ -0,0 +1,134 @@ +""" +Copyright © 2017-present, [Encode OSS Ltd](https://www.encode.io/). +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + +import asyncio +import logging +import signal +import sys +from typing import Any + +from gunicorn.arbiter import Arbiter +from gunicorn.workers.base import Worker +from uvicorn.config import Config +from uvicorn.main import Server + + +class UvicornWorker(Worker): + """ + A worker class for Gunicorn that interfaces with an ASGI consumer callable, + rather than a WSGI callable. + """ + + CONFIG_KWARGS: dict[str, Any] = {"loop": "auto", "http": "auto"} + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + logger = logging.getLogger("uvicorn.error") + logger.handlers = self.log.error_log.handlers + logger.setLevel(self.log.error_log.level) + logger.propagate = False + + logger = logging.getLogger("uvicorn.access") + logger.handlers = self.log.access_log.handlers + logger.setLevel(self.log.access_log.level) + logger.propagate = False + + config_kwargs: dict = { + "app": None, + "log_config": None, + "timeout_keep_alive": self.cfg.keepalive, + "timeout_notify": self.timeout, + "callback_notify": self.callback_notify, + "limit_max_requests": self.max_requests, + "forwarded_allow_ips": self.cfg.forwarded_allow_ips, + } + + if self.cfg.is_ssl: + ssl_kwargs = { + "ssl_keyfile": self.cfg.ssl_options.get("keyfile"), + "ssl_certfile": self.cfg.ssl_options.get("certfile"), + "ssl_keyfile_password": self.cfg.ssl_options.get("password"), + "ssl_version": self.cfg.ssl_options.get("ssl_version"), + "ssl_cert_reqs": self.cfg.ssl_options.get("cert_reqs"), + "ssl_ca_certs": self.cfg.ssl_options.get("ca_certs"), + "ssl_ciphers": self.cfg.ssl_options.get("ciphers"), + } + config_kwargs.update(ssl_kwargs) + + if self.cfg.settings["backlog"].value: + config_kwargs["backlog"] = self.cfg.settings["backlog"].value + + config_kwargs.update(self.CONFIG_KWARGS) + + self.config = Config(**config_kwargs) + + def init_process(self) -> None: + self.config.setup_event_loop() + super().init_process() + + def init_signals(self) -> None: + # Reset signals so Gunicorn doesn't swallow subprocess return codes + # other signals are set up by Server.install_signal_handlers() + # See: https://github.com/encode/uvicorn/issues/894 + for s in self.SIGNALS: + signal.signal(s, signal.SIG_DFL) + + signal.signal(signal.SIGUSR1, self.handle_usr1) + # Don't let SIGUSR1 disturb active requests by interrupting system calls + signal.siginterrupt(signal.SIGUSR1, False) + + def _install_sigquit_handler(self) -> None: + """Install a SIGQUIT handler on workers. + + - https://github.com/encode/uvicorn/issues/1116 + - https://github.com/benoitc/gunicorn/issues/2604 + """ + + loop = asyncio.get_running_loop() + loop.add_signal_handler(signal.SIGQUIT, self.handle_exit, signal.SIGQUIT, None) + + async def _serve(self) -> None: + self.config.app = self.wsgi + server = Server(config=self.config) + self._install_sigquit_handler() + await server.serve(sockets=self.sockets) + if not server.started: + sys.exit(Arbiter.WORKER_BOOT_ERROR) + + def run(self) -> None: + return asyncio.run(self._serve()) + + async def callback_notify(self) -> None: + self.notify() + + +class UvicornH11Worker(UvicornWorker): + CONFIG_KWARGS = {"loop": "asyncio", "http": "h11"} diff --git a/pyproject.toml b/pyproject.toml index 9790078..aacdba4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ starlette = [ ] tests = [ "coverage[toml]>=7,<8", + "coverage_enable_subprocess==1.0", "httpx>=0.23,<1", "pytest>=8.1.1,<9", "pytest-mock>=3,<4", @@ -75,6 +76,7 @@ show_missing = true [tool.coverage.run] command_line = "-m pytest" +parallel = true source = ["inboard", "tests"] [tool.hatch.build.targets.sdist] diff --git a/tests/test_gunicorn_workers.py b/tests/test_gunicorn_workers.py new file mode 100644 index 0000000..14da6ca --- /dev/null +++ b/tests/test_gunicorn_workers.py @@ -0,0 +1,262 @@ +from __future__ import annotations + +import signal +import subprocess +import sys +import tempfile +import time +from typing import TYPE_CHECKING + +import httpx +import pytest + +if TYPE_CHECKING: + from collections.abc import Generator + from ssl import SSLContext + from typing import IO + + from uvicorn._types import ( + ASGIReceiveCallable, + ASGISendCallable, + HTTPResponseBodyEvent, + HTTPResponseStartEvent, + LifespanStartupFailedEvent, + Scope, + ) + +pytestmark = pytest.mark.skipif(sys.platform == "win32", reason="requires unix") +gunicorn_arbiter = pytest.importorskip("gunicorn.arbiter", reason="requires gunicorn") +gunicorn_workers = pytest.importorskip( + "inboard.gunicorn_workers", reason="requires gunicorn" +) + + +class Process(subprocess.Popen): + client: httpx.Client + output: IO[bytes] + + def read_output(self) -> str: + self.output.seek(0) + return self.output.read().decode() + + +@pytest.fixture( + params=( + pytest.param(gunicorn_workers.UvicornWorker, marks=pytest.mark.subprocess), + pytest.param(gunicorn_workers.UvicornH11Worker, marks=pytest.mark.subprocess), + ) +) +def worker_class(request: pytest.FixtureRequest) -> str: + """Gunicorn worker class names to test. + + This is a parametrized fixture. When the fixture is used in a test, the test + will be automatically parametrized, running once for each fixture parameter. All + tests using the fixture will be automatically marked with `pytest.mark.subprocess`. + + https://docs.pytest.org/en/latest/how-to/fixtures.html + https://docs.pytest.org/en/latest/proposals/parametrize_with_fixtures.html + """ + worker_class = request.param + return f"{worker_class.__module__}.{worker_class.__name__}" + + +async def app( + scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable +) -> None: + assert scope["type"] == "http" + start_event: HTTPResponseStartEvent = { + "type": "http.response.start", + "status": 204, + "headers": [], + } + body_event: HTTPResponseBodyEvent = { + "type": "http.response.body", + "body": b"", + "more_body": False, + } + await send(start_event) + await send(body_event) + + +@pytest.fixture( + params=( + pytest.param(False, id="TLS off"), + pytest.param(True, id="TLS on"), + ) +) +def gunicorn_process( + request: pytest.FixtureRequest, + tls_ca_certificate_pem_path: str, + tls_ca_ssl_context: SSLContext, + tls_certificate_private_key_path: str, + tls_certificate_server_cert_path: str, + unused_tcp_port: int, + worker_class: str, +) -> Generator[Process, None, None]: + """Yield a subprocess running a Gunicorn arbiter with a Uvicorn worker. + + An instance of `httpx.Client` is available on the `client` attribute. + Output is saved to a temporary file and accessed with `read_output()`. + """ + app = "tests.test_workers:app" + bind = f"127.0.0.1:{unused_tcp_port}" + use_tls: bool = request.param + args = [ + "gunicorn", + "--bind", + bind, + "--graceful-timeout", + "1", + "--log-level", + "debug", + "--worker-class", + worker_class, + "--workers", + "1", + ] + if use_tls is True: + args_for_tls = [ + "--ca-certs", + tls_ca_certificate_pem_path, + "--certfile", + tls_certificate_server_cert_path, + "--keyfile", + tls_certificate_private_key_path, + ] + args.extend(args_for_tls) + base_url = f"https://{bind}" + verify: SSLContext | bool = tls_ca_ssl_context + else: + base_url = f"http://{bind}" + verify = False + args.append(app) + with ( + httpx.Client(base_url=base_url, verify=verify) as client, + tempfile.TemporaryFile() as output, + ): + with Process(args, stdout=output, stderr=output) as process: + time.sleep(1) + assert not process.poll() + process.client = client + process.output = output + yield process + process.terminate() + process.wait(timeout=2) + + +def test_get_request_to_asgi_app(gunicorn_process: Process) -> None: + """Test a GET request to the Gunicorn Uvicorn worker's ASGI app.""" + response = gunicorn_process.client.get("/") + output_text = gunicorn_process.read_output() + assert response.status_code == 204 + assert "uvicorn.workers", "startup complete" in output_text + + +@pytest.mark.parametrize("signal_to_send", gunicorn_arbiter.Arbiter.SIGNALS) +def test_gunicorn_arbiter_signal_handling( + gunicorn_process: Process, signal_to_send: signal.Signals +) -> None: + """Test Gunicorn arbiter signal handling. + + This test iterates over the signals handled by the Gunicorn arbiter, + sends each signal to the process running the arbiter, and asserts that + Gunicorn handles the signal and logs the signal handling event accordingly. + + https://docs.gunicorn.org/en/latest/signals.html + """ + signal_abbreviation = gunicorn_arbiter.Arbiter.SIG_NAMES[signal_to_send] + expected_text = f"Handling signal: {signal_abbreviation}" + gunicorn_process.send_signal(signal_to_send) + time.sleep(0.5) + output_text = gunicorn_process.read_output() + try: + assert expected_text in output_text + except AssertionError: # pragma: no cover + # occasional flakes are seen with certain signals + flaky_signals = [ + getattr(signal, "SIGHUP", None), + getattr(signal, "SIGTERM", None), + getattr(signal, "SIGTTIN", None), + getattr(signal, "SIGTTOU", None), + getattr(signal, "SIGUSR2", None), + getattr(signal, "SIGWINCH", None), + ] + if signal_to_send not in flaky_signals: + time.sleep(2) + output_text = gunicorn_process.read_output() + assert expected_text in output_text + + +async def app_with_lifespan_startup_failure( + scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable +) -> None: + """An ASGI app instance for testing Uvicorn worker boot errors.""" + if scope["type"] == "lifespan": + message = await receive() + if message["type"] == "lifespan.startup": + lifespan_startup_failed_event: LifespanStartupFailedEvent = { + "type": "lifespan.startup.failed", + "message": "ASGI application failed to start", + } + await send(lifespan_startup_failed_event) + + +@pytest.fixture +def gunicorn_process_with_lifespan_startup_failure( + unused_tcp_port: int, worker_class: str +) -> Generator[Process, None, None]: + """Yield a subprocess running a Gunicorn arbiter with a Uvicorn worker. + + Output is saved to a temporary file and accessed with `read_output()`. + The lifespan startup error in the ASGI app helps test worker boot errors. + """ + args = [ + "gunicorn", + "--bind", + f"127.0.0.1:{unused_tcp_port}", + "--graceful-timeout", + "1", + "--log-level", + "debug", + "--worker-class", + worker_class, + "--workers", + "1", + "tests.test_workers:app_with_lifespan_startup_failure", + ] + with tempfile.TemporaryFile() as output: + with Process(args, stdout=output, stderr=output) as process: + time.sleep(1) + process.output = output + yield process + process.terminate() + process.wait(timeout=2) + + +def test_uvicorn_worker_boot_error( + gunicorn_process_with_lifespan_startup_failure: Process, +) -> None: + """Test Gunicorn arbiter shutdown behavior after Uvicorn worker boot errors. + + Previously, if Uvicorn workers raised exceptions during startup, + Gunicorn continued trying to boot workers ([#1066]). To avoid this, + the Uvicorn worker was updated to exit with `Arbiter.WORKER_BOOT_ERROR`, + but no tests were included at that time ([#1077]). This test verifies + that Gunicorn shuts down appropriately after a Uvicorn worker boot error. + + When a worker exits with `Arbiter.WORKER_BOOT_ERROR`, the Gunicorn arbiter will + also terminate, so there is no need to send a separate signal to the arbiter. + + [#1066]: https://github.com/encode/uvicorn/issues/1066 + [#1077]: https://github.com/encode/uvicorn/pull/1077 + """ + expected_text = "Worker failed to boot" + output_text = gunicorn_process_with_lifespan_startup_failure.read_output() + try: + assert expected_text in output_text + assert gunicorn_process_with_lifespan_startup_failure.poll() + except AssertionError: # pragma: no cover + time.sleep(2) + output_text = gunicorn_process_with_lifespan_startup_failure.read_output() + assert expected_text in output_text + assert gunicorn_process_with_lifespan_startup_failure.poll()