Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flakiness in test_backup_and_restore_fail_on_disk_full #167

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions test/helpers/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) 2023 Aiven, Helsinki, Finland. https://aiven.io/
from contextlib import contextmanager, suppress
from contextlib import contextmanager
from pathlib import Path
from py.path import local as LocalPath
from test import random_basic_string
from typing import Iterator

Expand All @@ -26,7 +25,4 @@ def mount_tmpfs(path: Path, *, megabytes: int) -> Iterator[Path]:

yield sub_dir
finally:
# Delete all files in the tmpfs filesystem before unmounting it.
with suppress(Exception):
LocalPath(sub_dir).remove(rec=1)
subprocess.check_call(["sudo", "umount", str(sub_dir)])
Comment on lines -29 to -32
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ended up removing this, because it was raising an exception and pytest was complaining since it was not able to delete temporary directories

subprocess.check_call(["sudo", "umount", str(sub_dir)])
27 changes: 7 additions & 20 deletions test/helpers/flow_testers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Copyright (c) 2023 Aiven, Helsinki, Finland. https://aiven.io/
from __future__ import annotations

from _pytest.logging import LogCaptureFixture
from functools import partial
from myhoard.backup_stream import BackupStream
from myhoard.controller import Controller
from myhoard.restore_coordinator import RestoreCoordinator
from test import wait_for_condition, while_asserts
from test.helpers.loggers import get_logger_name, log_duration

Expand All @@ -13,10 +14,9 @@
class ControllerFlowTester:
"""Helper class to test the flow of the Controller for a backup or restore."""

def __init__(self, controller: Controller, global_timeout: int = 10, caplog: LogCaptureFixture | None = None) -> None:
def __init__(self, controller: Controller, global_timeout: int = 10) -> None:
self.controller = controller
self.timeout = global_timeout
self.caplog = caplog
self.logger = logging.getLogger(get_logger_name())

@log_duration
Expand All @@ -35,20 +35,15 @@ def wait_for_single_stream(self, *, timeout: int | None = None) -> None:
while_asserts(self._has_single_stream, timeout=timeout)

@log_duration
def wait_for_restore_complete(self, *, timeout: int | None = None) -> None:
def wait_for_restore_phase(self, phase: RestoreCoordinator.Phase, *, timeout: int | None = None) -> None:
timeout = self.timeout if timeout is None else timeout
wait_for_condition(self._restore_complete, timeout=timeout, description="restore complete")
wait_for_condition(partial(self._restore_phase, phase=phase), timeout=timeout, description=f"restore {phase}")

@log_duration
def wait_for_fetched_backup(self, *, timeout: int | None = None) -> None:
timeout = self.timeout if timeout is None else timeout
wait_for_condition(self._has_fetched_backup, timeout=timeout, description="fetched backup")

@log_duration
def wait_for_disk_full_being_logged(self, *, timeout: int | None = None) -> None:
timeout = self.timeout if timeout is None else timeout
wait_for_condition(self._disk_full_being_logged, timeout=timeout, description="disk full being logged")

def _streaming_binlogs(self) -> None:
assert self.controller.backup_streams
assert all(bs.active_phase == BackupStream.ActivePhase.binlog for bs in self.controller.backup_streams), [
Expand All @@ -61,16 +56,8 @@ def _has_multiple_streams(self) -> None:
def _has_single_stream(self) -> None:
assert len(self.controller.backup_streams) == 1

def _restore_complete(self) -> bool:
return self.controller.restore_coordinator is not None and self.controller.restore_coordinator.is_complete()
def _restore_phase(self, phase: RestoreCoordinator.Phase) -> bool:
return self.controller.restore_coordinator is not None and self.controller.restore_coordinator.phase is phase

def _has_fetched_backup(self) -> bool:
return self.controller.state["backups_fetched_at"] != 0

def _disk_full_being_logged(self) -> bool:
if self.caplog is None:
return False
return any(
"DiskFullError('No space left on device. Cannot complete xbstream-extract!')" in record.message
for record in self.caplog.records
)
16 changes: 11 additions & 5 deletions test/local/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def do_restore(
bs = backup_streams[0]

# Restore backup into an empty database.
flow_tester = ControllerFlowTester(target_controller, caplog=caplog)
flow_tester = ControllerFlowTester(target_controller)
target_controller.start()

try:
Expand All @@ -98,15 +98,21 @@ def do_restore(
target_controller.restore_backup(site=bs.site, stream_id=bs.stream_id)

if fail_because_disk_full:
flow_tester.wait_for_disk_full_being_logged()
flow_tester.wait_for_restore_phase(RestoreCoordinator.Phase.failed)

# check if it failed due to full disk
assert caplog is not None, "caplog is required for checking full disk message."
assert any(
"DiskFullError('No space left on device. Cannot complete xbstream-extract!')" in record.message
for record in caplog.records
)

# Check that we have backups, but none of them are broken.
current_backups = sort_completed_backups(target_controller.state["backups"])
assert current_backups
assert all(b["broken_at"] is None for b in current_backups)
assert target_controller.restore_coordinator is not None
assert target_controller.restore_coordinator.phase is RestoreCoordinator.Phase.failed

else:
flow_tester.wait_for_restore_complete()
flow_tester.wait_for_restore_phase(RestoreCoordinator.Phase.completed)
finally:
target_controller.stop()
Loading