Skip to content

Commit

Permalink
Store MySQL server version in basebackup_info (basebackup.json) and c…
Browse files Browse the repository at this point in the history
…heck backup version on restore_backup
  • Loading branch information
egor-voynov-aiven committed Aug 20, 2024
1 parent 4bfc957 commit 065fa09
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 5 deletions.
3 changes: 2 additions & 1 deletion myhoard.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@
"copy_threads": 1,
"compress_threads": 1,
"encrypt_threads": 1
}
},
"restrict_backup_version_higher": null
}
4 changes: 4 additions & 0 deletions myhoard/backup_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
DEFAULT_XTRABACKUP_SETTINGS,
ERR_TIMEOUT,
first_contains_gtids_not_in_second,
get_mysql_version,
GtidExecuted,
make_fs_metadata,
mysql_cursor,
Expand Down Expand Up @@ -978,10 +979,12 @@ def _take_basebackup(self) -> None:
# FLUSH BINARY LOGS might take a long time if the server is under heavy load,
# use longer than normal timeout here with multiple retries and increasing timeout.
connect_params = dict(self.mysql_client_params)
mysql_version = None
for retry, multiplier in [(True, 1), (True, 2), (False, 3)]:
try:
connect_params["timeout"] = DEFAULT_MYSQL_TIMEOUT * 5 * multiplier
with mysql_cursor(**connect_params) as cursor:
mysql_version = get_mysql_version(cursor)
cursor.execute("FLUSH BINARY LOGS")
cursor.execute("SELECT @@GLOBAL.gtid_executed AS gtid_executed")
gtid_executed = parse_gtid_range_string(cast(dict, cursor.fetchone())["gtid_executed"])
Expand Down Expand Up @@ -1031,6 +1034,7 @@ def _take_basebackup(self) -> None:
"start_size": self.basebackup_operation.data_directory_size_start,
"start_ts": start_time,
"uploaded_from": self.server_id,
"mysql_version": mysql_version,
}
self.file_storage.store_file_from_memory(
self._build_full_name("basebackup.json"),
Expand Down
4 changes: 2 additions & 2 deletions myhoard/basebackup_operation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from contextlib import suppress
from distutils.version import LooseVersion # pylint:disable=deprecated-module
from myhoard.errors import BlockMismatchError, XtraBackupError
from myhoard.util import get_mysql_version, mysql_cursor
from packaging.version import Version
from rohmu.util import increase_pipe_capacity, set_stream_nonblocking
from typing import Optional

Expand Down Expand Up @@ -156,7 +156,7 @@ def _optimize_tables(self) -> None:
params["timeout"] = CURSOR_TIMEOUT_DURING_OPTIMIZE
with mysql_cursor(**params) as cursor:
version = get_mysql_version(cursor)
if LooseVersion(version) < LooseVersion("8.0.29"):
if Version(version) < Version("8.0.29"):
return

# allow OPTIMIZE TABLE to run on tables without primary keys
Expand Down
10 changes: 10 additions & 0 deletions myhoard/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from http.client import RemoteDisconnected
from httplib2 import ServerNotFoundError
from packaging.version import Version
from rohmu import get_transfer
from rohmu.compressor import DecompressSink
from rohmu.encryptor import DecryptSink
Expand Down Expand Up @@ -48,6 +49,7 @@

class BaseBackup(TypedDict):
end_ts: float
mysql_version: Optional[str]


class Backup(TypedDict):
Expand Down Expand Up @@ -158,6 +160,7 @@ def __init__(
temp_dir,
restore_free_memory_percentage=None,
xtrabackup_settings: Dict[str, int],
restrict_backup_version_higher: Optional[Version] = None,
):
super().__init__()
self.log = logging.getLogger(self.__class__.__name__)
Expand Down Expand Up @@ -232,6 +235,7 @@ def __init__(
self.xtrabackup_settings = xtrabackup_settings
self._get_upload_backup_site()
self._update_mode_tag()
self.restrict_backup_version_higher = restrict_backup_version_higher

def is_log_backed_up(self, *, log_index: int):
return all(
Expand Down Expand Up @@ -306,6 +310,12 @@ def restore_backup(
continue
if not backup["basebackup_info"]:
raise ValueError(f"Backup {backup!r} cannot be restored")
basebackup_mysql_version = backup["basebackup_info"].get("mysql_version")
if basebackup_mysql_version and self.restrict_backup_version_higher:
if Version(basebackup_mysql_version) > self.restrict_backup_version_higher:
raise ValueError(
f"Backup was taken with MySQL version {basebackup_mysql_version} which is higher than allowed {self.restrict_backup_version_higher}: {backup!r}"
)

if backup.get("broken_at"):
raise ValueError(f"Cannot restore a broken backup: {backup!r}")
Expand Down
6 changes: 6 additions & 0 deletions myhoard/myhoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from myhoard.statsd import StatsClient
from myhoard.util import DEFAULT_XTRABACKUP_SETTINGS, detect_running_process_id, wait_for_port
from myhoard.web_server import WebServer
from packaging.version import Version

import argparse
import asyncio
Expand Down Expand Up @@ -192,6 +193,10 @@ async def _start(self):
tags=statsd_config["tags"],
)
mysql = self.config["mysql"]
raw_restrict_backup_version_higher = self.config.get("restrict_backup_version_higher")
restrict_backup_version_higher = (
Version(raw_restrict_backup_version_higher) if raw_restrict_backup_version_higher else None
)
self.controller = Controller(
backup_settings=self.config["backup_settings"],
backup_sites=self.config["backup_sites"],
Expand All @@ -211,6 +216,7 @@ async def _start(self):
stats=statsd,
temp_dir=self.config["temporary_directory"],
xtrabackup_settings=self.config.get("xtrabackup", DEFAULT_XTRABACKUP_SETTINGS),
restrict_backup_version_higher=restrict_backup_version_higher,
)
self.controller.start()
self.web_server = WebServer(
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies = [
"python-snappy == 0.6.1",
"rohmu >= 1.1.2",
"sentry-sdk==1.14.0",
"packaging",
]

[project.optional-dependencies]
Expand Down
1 change: 1 addition & 0 deletions test/test_backup_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def _run_backup_stream_test(session_tmpdir, mysql_master: MySQLConfig, backup_st

assert bs.is_binlog_safe_to_delete(new_binlogs[0])
assert bs.is_log_backed_up(log_index=new_binlogs[0]["local_index"])
assert bs.state["basebackup_info"].get("mysql_version") is not None

# remote_gtid_executed will be updated once the stream notices the new binlog that was uploaded above
wait_for_condition(lambda: bs_observer.state["remote_gtid_executed"] != gtid_executed)
Expand Down
4 changes: 2 additions & 2 deletions test/test_basebackup_operation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from . import build_statsd_client, MySQLConfig, restart_mysql
from distutils.version import LooseVersion # pylint:disable=deprecated-module
from packaging.version import Version
from myhoard.basebackup_operation import BasebackupOperation
from typing import IO
from unittest import SkipTest
Expand Down Expand Up @@ -139,7 +139,7 @@ def stream_handler(_stream):
def test_backup_with_non_optimized_tables(mysql_master: MySQLConfig) -> None:
with myhoard_util.mysql_cursor(**mysql_master.connect_options) as cursor:
version = myhoard_util.get_mysql_version(cursor)
if LooseVersion(version) < LooseVersion("8.0.29"):
if Version(version) < Version("8.0.29"):
raise SkipTest("DB version doesn't need OPTIMIZE TABLE")

def create_test_db(*, db_name: str, table_name: str, add_pk: bool) -> None:
Expand Down
25 changes: 25 additions & 0 deletions test/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from rohmu import get_transfer
from typing import Any, cast, Dict, List, Optional, Set, TypedDict
from unittest.mock import MagicMock, patch
from packaging.version import Version

import contextlib
import datetime
Expand Down Expand Up @@ -1679,6 +1680,30 @@ def restoration_is_failed():
new_controller.stop()


def test_forbid_to_restore_backup(
default_backup_site,
master_controller,
) -> None:
controller, _ = master_controller
controller.restrict_backup_version_higher = Version("8.0.1")
controller.state["backups"] = [
Backup(
basebackup_info=BaseBackup(end_ts=0.0, mysql_version="8.0.30"),
closed_at=None,
completed_at=None,
recovery_site=False,
stream_id="1234",
resumable=False,
site="default",
)
]
with pytest.raises(ValueError, match="Backup was taken with MySQL version 8.0.30 which is higher than allowed 8.0.1:"):
controller.restore_backup(site="default", stream_id="1234")
controller.restrict_backup_version_higher = Version("8.0.30")
controller.restore_backup(site="default", stream_id="1234")
controller.state = controller.Mode.restore


@patch.object(RestoreCoordinator, "MAX_BASEBACKUP_ERRORS", 2)
def test_restore_failed_basebackup_and_retry_with_prior(
default_backup_site,
Expand Down

0 comments on commit 065fa09

Please sign in to comment.