Skip to content

Commit

Permalink
check first if restore coordinator phase is failed before checking di…
Browse files Browse the repository at this point in the history
…sk full message in test_backup_and_restore_fail_on_disk_full.

[BF-2161]
  • Loading branch information
kathia-barahona committed Aug 10, 2023
1 parent b113bad commit 007d83b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 31 deletions.
8 changes: 2 additions & 6 deletions test/helpers/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) 2023 Aiven, Helsinki, Finland. https://aiven.io/
from contextlib import contextmanager, suppress
from contextlib import contextmanager
from pathlib import Path
from py.path import local as LocalPath
from test import random_basic_string
from typing import Iterator

Expand All @@ -26,7 +25,4 @@ def mount_tmpfs(path: Path, *, megabytes: int) -> Iterator[Path]:

yield sub_dir
finally:
# Delete all files in the tmpfs filesystem before unmounting it.
with suppress(Exception):
LocalPath(sub_dir).remove(rec=1)
subprocess.check_call(["sudo", "umount", str(sub_dir)])
subprocess.check_call(["sudo", "umount", str(sub_dir)])
27 changes: 7 additions & 20 deletions test/helpers/flow_testers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Copyright (c) 2023 Aiven, Helsinki, Finland. https://aiven.io/
from __future__ import annotations

from _pytest.logging import LogCaptureFixture
from functools import partial
from myhoard.backup_stream import BackupStream
from myhoard.controller import Controller
from myhoard.restore_coordinator import RestoreCoordinator
from test import wait_for_condition, while_asserts
from test.helpers.loggers import get_logger_name, log_duration

Expand All @@ -13,10 +14,9 @@
class ControllerFlowTester:
"""Helper class to test the flow of the Controller for a backup or restore."""

def __init__(self, controller: Controller, global_timeout: int = 10, caplog: LogCaptureFixture | None = None) -> None:
def __init__(self, controller: Controller, global_timeout: int = 10) -> None:
self.controller = controller
self.timeout = global_timeout
self.caplog = caplog
self.logger = logging.getLogger(get_logger_name())

@log_duration
Expand All @@ -35,20 +35,15 @@ def wait_for_single_stream(self, *, timeout: int | None = None) -> None:
while_asserts(self._has_single_stream, timeout=timeout)

@log_duration
def wait_for_restore_complete(self, *, timeout: int | None = None) -> None:
def wait_for_restore_phase(self, phase: RestoreCoordinator.Phase, *, timeout: int | None = None) -> None:
timeout = self.timeout if timeout is None else timeout
wait_for_condition(self._restore_complete, timeout=timeout, description="restore complete")
wait_for_condition(partial(self._restore_phase, phase=phase), timeout=timeout, description=f"restore {phase}")

@log_duration
def wait_for_fetched_backup(self, *, timeout: int | None = None) -> None:
timeout = self.timeout if timeout is None else timeout
wait_for_condition(self._has_fetched_backup, timeout=timeout, description="fetched backup")

@log_duration
def wait_for_disk_full_being_logged(self, *, timeout: int | None = None) -> None:
timeout = self.timeout if timeout is None else timeout
wait_for_condition(self._disk_full_being_logged, timeout=timeout, description="disk full being logged")

def _streaming_binlogs(self) -> None:
assert self.controller.backup_streams
assert all(bs.active_phase == BackupStream.ActivePhase.binlog for bs in self.controller.backup_streams), [
Expand All @@ -61,16 +56,8 @@ def _has_multiple_streams(self) -> None:
def _has_single_stream(self) -> None:
assert len(self.controller.backup_streams) == 1

def _restore_complete(self) -> bool:
return self.controller.restore_coordinator is not None and self.controller.restore_coordinator.is_complete()
def _restore_phase(self, phase: RestoreCoordinator.Phase) -> bool:
return self.controller.restore_coordinator is not None and self.controller.restore_coordinator.phase is phase

def _has_fetched_backup(self) -> bool:
return self.controller.state["backups_fetched_at"] != 0

def _disk_full_being_logged(self) -> bool:
if self.caplog is None:
return False
return any(
"DiskFullError('No space left on device. Cannot complete xbstream-extract!')" in record.message
for record in self.caplog.records
)
16 changes: 11 additions & 5 deletions test/local/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def do_restore(
bs = backup_streams[0]

# Restore backup into an empty database.
flow_tester = ControllerFlowTester(target_controller, caplog=caplog)
flow_tester = ControllerFlowTester(target_controller)
target_controller.start()

try:
Expand All @@ -98,15 +98,21 @@ def do_restore(
target_controller.restore_backup(site=bs.site, stream_id=bs.stream_id)

if fail_because_disk_full:
flow_tester.wait_for_disk_full_being_logged()
flow_tester.wait_for_restore_phase(RestoreCoordinator.Phase.failed)

# check if it failed due to full disk
assert caplog is not None, "caplog is required for checking full disk message."
assert any(
"DiskFullError('No space left on device. Cannot complete xbstream-extract!')" in record.message
for record in caplog.records
)

# Check that we have backups, but none of them are broken.
current_backups = sort_completed_backups(target_controller.state["backups"])
assert current_backups
assert all(b["broken_at"] is None for b in current_backups)
assert target_controller.restore_coordinator is not None
assert target_controller.restore_coordinator.phase is RestoreCoordinator.Phase.failed

else:
flow_tester.wait_for_restore_complete()
flow_tester.wait_for_restore_phase(RestoreCoordinator.Phase.completed)
finally:
target_controller.stop()

0 comments on commit 007d83b

Please sign in to comment.