From aa9f5ba352c7f496eed7f82a64290c089f348c71 Mon Sep 17 00:00:00 2001
From: Marcelo Henrique Neppel
Date: Tue, 25 Jul 2023 16:47:26 -0300
Subject: [PATCH] [DPE-2269] Added k8s upgrade handler (#77)

* Added k8s upgrade handler
* Added PyYAML to charm-binary-python-packages
* Removed recursion
* Removed PyYAML from charm-binary-python-packages
* Added docstring for upgrade finished hook
* Added unit name in debug log call
* Moved call to upgrade finished event
* Created _set_rolling_update_partition
* Created resume-upgrade action
* Fixed existing and added part of new unit tests
* Added comments
* Moved set_results call and added missing unit tests
* Removed print call
---
 .gitignore                                  |   1 +
 lib/charms/data_platform_libs/v0/upgrade.py | 144 +++++++++++++++-
 tests/unit/test_upgrade.py                  | 181 +++++++++++++++++++-
 3 files changed, 312 insertions(+), 14 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6d29047b..2b3c9682 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@ tests/integration/database-charm/lib/charms/data_platform_libs/v0/data_interface
 tests/integration/kafka-charm/lib/charms/data_platform_libs/v0/data_interfaces.py
 tests/integration/s3-charm/lib/charms/data_platform_libs/v0/s3.py'
 .vscode/
+.idea/
diff --git a/lib/charms/data_platform_libs/v0/upgrade.py b/lib/charms/data_platform_libs/v0/upgrade.py
index f2cd1b56..90fd92f2 100644
--- a/lib/charms/data_platform_libs/v0/upgrade.py
+++ b/lib/charms/data_platform_libs/v0/upgrade.py
@@ -38,7 +38,7 @@
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 5
+LIBPATCH = 6
 
 PYDEPS = ["pydantic>=1.10,<2"]
 
@@ -353,6 +353,21 @@ def __init__(self, message: str, cause: str, resolution: Optional[str] = None):
         super().__init__(message, cause=cause, resolution=resolution)
 
 
+class KubernetesClientError(UpgradeError):
+    """Exception flagging that a call to the Kubernetes API failed.
+
+    For example, raised when a call to :class:`DataUpgrade._set_rolling_update_partition`
+    fails.
+
+    Args:
+        message: string message to be logged out
+        cause: short human-readable description of the cause of the error
+        resolution: short human-readable instructions for manual error resolution (optional)
+    """
+
+    def __init__(self, message: str, cause: str, resolution: Optional[str] = None):
+        super().__init__(message, cause=cause, resolution=resolution)
+
+
 class VersionError(UpgradeError):
     """Exception flagging that the old `version` fails to meet the new `upgrade_supported`s.
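(Illustration, not part of the diff: a minimal sketch of how charm-side code might raise the new exception. The `do_k8s_call` helper is hypothetical; the `message`/`cause`/`resolution` arguments follow the constructor added above.)

    from charms.data_platform_libs.v0.upgrade import KubernetesClientError

    def do_k8s_call():
        """Hypothetical stand-in for a real Kubernetes API request."""
        raise RuntimeError("connection refused")

    try:
        do_k8s_call()
    except Exception as error:
        # message/cause/resolution mirror the UpgradeError signature above
        raise KubernetesClientError(
            message="Kubernetes API call failed",
            cause=str(error),
            resolution="Grant the charm API access, e.g. with `juju trust`",
        )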
@@ -400,6 +415,10 @@ class UpgradeGrantedEvent(EventBase):
     """
 
 
+class UpgradeFinishedEvent(EventBase):
+    """Used to tell units that they finished the upgrade."""
+
+
 class UpgradeEvents(CharmEvents):
     """Upgrade events.
@@ -407,6 +426,7 @@ class UpgradeEvents(CharmEvents):
     """
 
     upgrade_granted = EventSource(UpgradeGrantedEvent)
+    upgrade_finished = EventSource(UpgradeFinishedEvent)
 
 
 # --- EVENT HANDLER ---
 
@@ -442,11 +462,16 @@ def __init__(
         )
         self.framework.observe(self.charm.on.upgrade_charm, self._on_upgrade_charm)
         self.framework.observe(getattr(self.on, "upgrade_granted"), self._on_upgrade_granted)
+        self.framework.observe(getattr(self.on, "upgrade_finished"), self._on_upgrade_finished)
 
         # actions
         self.framework.observe(
             getattr(self.charm.on, "pre_upgrade_check_action"), self._on_pre_upgrade_check_action
         )
+        if self.substrate == "k8s":
+            self.framework.observe(
+                getattr(self.charm.on, "resume_upgrade_action"), self._on_resume_upgrade_action
+            )
 
     @property
     def peer_relation(self) -> Optional[Relation]:
@@ -556,7 +581,7 @@ def build_upgrade_stack(self) -> List[int]:
         Called by leader unit during :meth:`_on_pre_upgrade_check_action`.
 
         Returns:
-            Iterable of integeter unit.ids, LIFO ordered in upgrade order
+            Iterable of integer unit.ids, LIFO ordered in upgrade order
                 i.e `[5, 2, 4, 1, 3]`, unit `3` upgrades first, `5` upgrades last
         """
         # don't raise if k8s substrate, uses default statefulset order
@@ -597,6 +622,13 @@ def set_unit_completed(self) -> None:
 
         self.peer_relation.data[self.charm.unit].update({"state": "completed"})
 
+        # Emit upgrade_finished event to run unit's post upgrade operations.
+        if self.substrate == "k8s":
+            logger.debug(
+                f"{self.charm.unit.name} has completed the upgrade, emitting `upgrade_finished` event..."
+            )
+            getattr(self.on, "upgrade_finished").emit()
+
     def _on_upgrade_created(self, event: RelationCreatedEvent) -> None:
         """Handler for `upgrade-relation-created` events."""
         if not self.peer_relation:
@@ -654,6 +686,37 @@ def _on_pre_upgrade_check_action(self, event: ActionEvent) -> None:
         logger.info("Setting upgrade-stack to relation data...")
         self.upgrade_stack = built_upgrade_stack
 
+    def _on_resume_upgrade_action(self, event: ActionEvent) -> None:
+        """Handle resume upgrade action.
+
+        Continue the upgrade by setting the partition to the next unit.
+        """
+        if not self.peer_relation:
+            event.fail(message="Could not find upgrade relation.")
+            return
+
+        if not self.charm.unit.is_leader():
+            event.fail(message="Action must be run on the Juju leader.")
+            return
+
+        if not self.upgrade_stack:
+            event.fail(message="Nothing to resume, upgrade stack unset.")
+            return
+
+        # Check whether this is being run after juju refresh was called
+        # (the size of the upgrade stack should match the total number
+        # of units minus one).
+        if len(self.upgrade_stack) != len(self.peer_relation.units):
+            event.fail(message="Upgrade can be resumed only once after juju refresh is called.")
+            return
+
+        try:
+            next_partition = self.upgrade_stack[-1]
+            self._set_rolling_update_partition(partition=next_partition)
+            event.set_results({"message": f"Upgrade will resume on unit {next_partition}"})
+        except KubernetesClientError:
+            event.fail(message="Cannot set rolling update partition.")
+
     def _upgrade_supported_check(self) -> None:
         """Checks if previous versions can be upgraded to new versions.
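(Illustration, not part of the diff: how a k8s charm might drive this flow. The hook name and `_workload_healthy` check are hypothetical; `state`, `set_unit_completed`, and the "upgrading" value come from the library, and on k8s `set_unit_completed` now also emits `upgrade_finished`.)

    def _on_workload_pebble_ready(self, event):  # hypothetical charm hook
        # After `juju refresh` restarts this unit's pod, verify the workload,
        # then mark the unit done; on k8s this emits `upgrade_finished`, which
        # steps the StatefulSet partition down to the next unit.
        if self.upgrade.state == "upgrading" and self._workload_healthy():
            self.upgrade.set_unit_completed()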
@@ -716,7 +779,9 @@ def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None:
             return
 
         # all units sets state to ready
-        self.peer_relation.data[self.charm.unit].update({"state": "ready"})
+        self.peer_relation.data[self.charm.unit].update(
+            {"state": "ready" if self.substrate == "vm" else "upgrading"}
+        )
 
     def on_upgrade_changed(self, event: EventBase) -> None:
         """Handler for `upgrade-relation-changed` events."""
@@ -765,11 +830,80 @@ def on_upgrade_changed(self, event: EventBase) -> None:
 
         # if unit top of stack, emit granted event
         if self.charm.unit == top_unit and top_state in ["ready", "upgrading"]:
             logger.debug(
-                f"{top_unit} is next to upgrade, emitting `upgrade_granted` event and upgrading..."
+                f"{top_unit.name} is next to upgrade, emitting `upgrade_granted` event and upgrading..."
             )
             self.peer_relation.data[self.charm.unit].update({"state": "upgrading"})
             getattr(self.on, "upgrade_granted").emit()
 
-    @abstractmethod
     def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None:
         """Handler for `upgrade-granted` events."""
+        # don't raise if k8s substrate, only return
+        if self.substrate == "k8s":
+            return
+
+        raise NotImplementedError
+
+    def _on_upgrade_finished(self, _) -> None:
+        """Handler for `upgrade-finished` events."""
+        if self.substrate == "vm" or not self.peer_relation:
+            return
+
+        # Emit the upgrade relation changed event in the leader to update the upgrade_stack.
+        if self.charm.unit.is_leader():
+            self.charm.on[self.relation_name].relation_changed.emit(
+                self.model.get_relation(self.relation_name)
+            )
+
+        # This hook shouldn't run for the last unit (the first to be upgraded). For that
+        # unit, the upgrade is resumed through an action instead, after the upgrade
+        # success on that unit has been double-checked.
+        unit_number = int(self.charm.unit.name.split("/")[1])
+        if unit_number == len(self.peer_relation.units):
+            logger.info(
+                f"{self.charm.unit.name} unit upgraded. Evaluate and run `resume-upgrade` action to continue upgrade"
+            )
+            return
+
+        # Also, the hook shouldn't run for the first unit (the last to be upgraded).
+        if unit_number == 0:
+            logger.info(f"{self.charm.unit.name} unit upgraded. Upgrade is complete")
+            return
+
+        try:
+            # Use the unit number instead of the upgrade stack to avoid race conditions
+            # (i.e. the leader updates the upgrade stack after this hook runs).
+            next_partition = unit_number - 1
+            logger.debug(f"Set rolling update partition to unit {next_partition}")
+            self._set_rolling_update_partition(partition=next_partition)
+        except KubernetesClientError:
+            logger.exception("Cannot set rolling update partition")
+            self.set_unit_failed()
+            self.log_rollback_instructions()
+
+    def _set_rolling_update_partition(self, partition: int) -> None:
+        """Patch the StatefulSet's `spec.updateStrategy.rollingUpdate.partition`.
+
+        Args:
+            partition: partition to set.
+
+        K8s only. It should decrement the rolling update strategy partition using code
+        like the following:
+
+            from lightkube.core.client import Client
+            from lightkube.core.exceptions import ApiError
+            from lightkube.resources.apps_v1 import StatefulSet
+
+            try:
+                patch = {"spec": {"updateStrategy": {"rollingUpdate": {"partition": partition}}}}
+                Client().patch(
+                    StatefulSet,
+                    name=self.charm.model.app.name,
+                    namespace=self.charm.model.name,
+                    obj=patch,
+                )
+                logger.debug(f"Kubernetes StatefulSet partition set to {partition}")
+            except ApiError as e:
+                if e.status.code == 403:
+                    cause = "`juju trust` needed"
+                else:
+                    cause = str(e)
+                raise KubernetesClientError("Kubernetes StatefulSet patch failed", cause)
+        """
+        if self.substrate == "vm":
+            return
+
+        raise NotImplementedError
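(Illustration, not part of the diff: the consumer side on k8s. A hedged sketch of a charm subclass that expands the lightkube snippet from the docstring above into a concrete override; the class name and the empty check bodies are hypothetical, while the abstract methods and the patch call follow the library as shown.)

    from charms.data_platform_libs.v0.upgrade import DataUpgrade, KubernetesClientError
    from lightkube.core.client import Client
    from lightkube.core.exceptions import ApiError
    from lightkube.resources.apps_v1 import StatefulSet


    class MyK8sUpgrade(DataUpgrade):
        """Example subclass; pre_upgrade_check and log_rollback_instructions are abstract."""

        def pre_upgrade_check(self) -> None:
            pass  # cluster-specific health checks would go here

        def log_rollback_instructions(self) -> None:
            pass  # cluster-specific rollback guidance would go here

        def _set_rolling_update_partition(self, partition: int) -> None:
            # Patch spec.updateStrategy.rollingUpdate.partition on the app's StatefulSet.
            try:
                patch = {"spec": {"updateStrategy": {"rollingUpdate": {"partition": partition}}}}
                Client().patch(
                    StatefulSet,
                    name=self.charm.model.app.name,
                    namespace=self.charm.model.name,
                    obj=patch,
                )
            except ApiError as e:
                cause = "`juju trust` needed" if e.status.code == 403 else str(e)
                raise KubernetesClientError("Kubernetes StatefulSet patch failed", cause)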
diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py
index a5344f78..ea1e333e 100644
--- a/tests/unit/test_upgrade.py
+++ b/tests/unit/test_upgrade.py
@@ -13,6 +13,7 @@
     BaseModel,
     DataUpgrade,
     DependencyModel,
+    KubernetesClientError,
     VersionError,
     build_complete_sem_ver,
     verify_caret_requirements,
@@ -33,6 +34,8 @@
 GANDALF_ACTIONS = """
 pre-upgrade-check:
   description: "YOU SHALL NOT PASS"
+resume-upgrade:
+  description: "LET IT PASS"
 """
 
 GANDALF_DEPS = {
@@ -376,6 +379,8 @@ def _on_upgrade_granted(self, _):
     with pytest.raises(TypeError):
         GandalfUpgrade(charm=harness.charm, dependency_model=GandalfModel)
 
+
+def test_on_upgrade_granted_raises_not_implemented_vm(harness, mocker):
     # missing on-upgrade-granted
     class GandalfUpgrade(DataUpgrade):
         def pre_upgrade_check(self):
             pass
 
         def log_rollback_instructions(self):
             pass
 
-    with pytest.raises(TypeError):
-        GandalfUpgrade(charm=harness.charm, dependency_model=GandalfModel)
+    gandalf = GandalfUpgrade(charm=harness.charm, dependency_model=GandalfModel)
+    with pytest.raises(NotImplementedError):
+        mock_event = mocker.MagicMock()
+        gandalf._on_upgrade_granted(mock_event)
+
+
+def test_on_upgrade_granted_succeeds_k8s(harness, mocker):
+    class GandalfUpgrade(DataUpgrade):
+        def pre_upgrade_check(self):
+            pass
+
+        def log_rollback_instructions(self):
+            pass
+
+    gandalf = GandalfUpgrade(charm=harness.charm, dependency_model=GandalfModel, substrate="k8s")
+    mock_event = mocker.MagicMock()
+    gandalf._on_upgrade_granted(mock_event)
 
 
 def test_data_upgrade_succeeds(harness):
@@ -403,6 +423,17 @@ def test_build_upgrade_stack_succeeds_k8s(harness):
     gandalf.build_upgrade_stack()
 
 
+def test_set_rolling_update_partition_succeeds_vm(harness):
+    gandalf = GandalfUpgrade(charm=harness.charm, dependency_model=GandalfModel)
+    gandalf._set_rolling_update_partition(0)
+
+
+def test_set_rolling_update_partition_raises_not_implemented_k8s(harness):
+    gandalf = GandalfUpgrade(charm=harness.charm, dependency_model=GandalfModel, substrate="k8s")
+    with pytest.raises(NotImplementedError):
+        gandalf._set_rolling_update_partition(0)
+
+
 def test_set_unit_failed_resets_stack(harness):
     gandalf = GandalfUpgrade(charm=harness.charm, dependency_model=GandalfModel, substrate="k8s")
     harness.add_relation("upgrade", "gandalf")
@@ -416,18 +447,25 @@ def test_set_unit_failed_resets_stack(harness):
     assert not gandalf._upgrade_stack
 
 
-def test_set_unit_completed_resets_stack(harness):
-    gandalf = GandalfUpgrade(charm=harness.charm, dependency_model=GandalfModel, substrate="k8s")
+@pytest.mark.parametrize("substrate,upgrade_finished_call_count", [("vm", 0), ("k8s", 1)])
+def test_set_unit_completed_resets_stack(harness, mocker, substrate, upgrade_finished_call_count):
+    gandalf = GandalfUpgrade(
+        charm=harness.charm, dependency_model=GandalfModel, substrate=substrate
+    )
     harness.add_relation("upgrade", "gandalf")
     gandalf._upgrade_stack = ["1", "2", "3"]
     harness.set_leader(True)
 
     assert gandalf._upgrade_stack
 
+    upgrade_finished_spy = mocker.spy(gandalf, "_on_upgrade_finished")
+
     gandalf.set_unit_completed()
 
     assert not gandalf._upgrade_stack
 
+    assert upgrade_finished_spy.call_count == upgrade_finished_call_count
+
 
 def test_upgrade_created_sets_idle_and_deps(harness):
     gandalf_model = GandalfModel(**GANDALF_DEPS)
@@ -542,7 +580,76 @@
-def test_upgrade_suppported_check_fails(harness):
+def test_resume_upgrade_action_fails_non_leader(harness, mocker):
+    gandalf_model = GandalfModel(**GANDALF_DEPS)
+    harness.charm.upgrade = GandalfUpgrade(
+        charm=harness.charm, dependency_model=gandalf_model, substrate="k8s"
+    )
+    harness.add_relation("upgrade", "gandalf")
+
+    mock_event = mocker.MagicMock()
+    harness.charm.upgrade._on_resume_upgrade_action(mock_event)
+
+    mock_event.fail.assert_called_once()
+
+
+def test_resume_upgrade_action_fails_without_upgrade_stack(harness, mocker):
+    gandalf_model = GandalfModel(**GANDALF_DEPS)
+    harness.charm.upgrade = GandalfUpgrade(
+        charm=harness.charm, dependency_model=gandalf_model, substrate="k8s"
+    )
+    harness.add_relation("upgrade", "gandalf")
+
+    harness.set_leader(True)
+
+    mock_event = mocker.MagicMock()
+    harness.charm.upgrade._on_resume_upgrade_action(mock_event)
+
+    mock_event.fail.assert_called_once()
+
+
+@pytest.mark.parametrize(
+    "upgrade_stack, has_k8s_error, has_succeeded",
+    [([0], False, False), ([0, 1, 2], False, False), ([0, 1], False, True), ([0, 1], True, False)],
+)
+def test_resume_upgrade_action_succeeds_only_when_ran_at_the_right_moment(
+    harness, mocker, upgrade_stack, has_k8s_error, has_succeeded
+):
+    class GandalfUpgrade(DataUpgrade):
+        def pre_upgrade_check(self):
+            pass
+
+        def log_rollback_instructions(self):
+            pass
+
+        def _set_rolling_update_partition(self, partition: int):
+            if has_k8s_error:
+                raise KubernetesClientError("fake message", "fake cause")
+
+    gandalf_model = GandalfModel(**GANDALF_DEPS)
+    harness.charm.upgrade = GandalfUpgrade(
+        charm=harness.charm, dependency_model=gandalf_model, substrate="k8s"
+    )
+    harness.add_relation("upgrade", "gandalf")
+    for number in range(1, 3):
+        harness.add_relation_unit(harness.charm.upgrade.peer_relation.id, f"gandalf/{number}")
+
+    harness.set_leader(True)
+
+    harness.update_relation_data(
+        harness.charm.upgrade.peer_relation.id,
+        "gandalf",
+        {"upgrade-stack": json.dumps(upgrade_stack)},
+    )
+
+    mock_event = mocker.MagicMock()
+    harness.charm.upgrade._on_resume_upgrade_action(mock_event)
+
+    assert mock_event.fail.call_count == (0 if has_succeeded else 1)
+    assert mock_event.set_results.call_count == (1 if has_succeeded else 0)
+
+
+def test_upgrade_supported_check_fails(harness):
     bad_deps = {
         "gandalf_the_white": {
             "dependencies": {"gandalf_the_grey": ">5"},
@@ -565,7 +672,7 @@
     harness.charm.upgrade._upgrade_supported_check()
 
 
-def test_upgrade_suppported_check_succeeds(harness):
+def test_upgrade_supported_check_succeeds(harness):
     good_deps = {
         "gandalf_the_white": {
             "dependencies": {"gandalf_the_grey": ">5"},
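(Illustration, not part of the diff: the timing rule the resume-upgrade tests above exercise, worked through for a hypothetical 3-unit application. `peer_relation.units` excludes the local unit, so the guard in `_on_resume_upgrade_action` accepts the action exactly when the first-upgraded unit has completed and been popped from the stack.)

    units_total = 3
    peer_units = units_total - 1        # peer relation excludes the local unit
    upgrade_stack = [0, 1]              # unit 2 upgraded first and was popped
    assert len(upgrade_stack) == peer_units  # resume is allowed
    next_partition = upgrade_stack[-1]       # partition steps down to unit 1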
@@ -633,9 +740,10 @@
     harness.charm.upgrade._upgrade_supported_check.assert_called_once()
 
 
-def test_upgrade_charm_sets_ready(harness, mocker):
+@pytest.mark.parametrize("substrate,state", [("vm", "ready"), ("k8s", "upgrading")])
+def test_upgrade_charm_sets_right_state(harness, mocker, substrate, state):
     harness.charm.upgrade = GandalfUpgrade(
-        charm=harness.charm, dependency_model=GandalfModel(**GANDALF_DEPS), substrate="k8s"
+        charm=harness.charm, dependency_model=GandalfModel(**GANDALF_DEPS), substrate=substrate
     )
     harness.add_relation("upgrade", "gandalf")
     harness.charm.upgrade.peer_relation.data[harness.charm.unit].update({"state": "idle"})
@@ -645,7 +753,7 @@
     mocker.patch.object(harness.charm.upgrade, "_upgrade_supported_check")
     harness.charm.on.upgrade_charm.emit()
 
-    assert harness.charm.upgrade.state == "ready"
+    assert harness.charm.upgrade.state == state
 
 
 def test_upgrade_changed_fails_unit_if_any_failed(harness, mocker):
@@ -748,6 +856,61 @@ def test_upgrade_changed_emits_upgrade_granted_only_if_top_of_stack(
     assert harness.charm.upgrade.state == post_state
 
 
+@pytest.mark.parametrize(
+    "substrate,is_leader,unit_number,call_count,has_k8s_error",
+    [
+        ("vm", False, 1, 0, False),
+        ("k8s", True, 3, 0, False),
+        ("k8s", True, 0, 0, False),
+        ("k8s", True, 1, 1, False),
+        ("k8s", True, 2, 1, False),
+        ("k8s", False, 1, 1, False),
+        ("k8s", False, 1, 1, True),
+    ],
+)
+def test_upgrade_finished_calls_set_rolling_update_partition_only_for_right_units_on_k8s(
+    harness, mocker, substrate, is_leader, unit_number, call_count, has_k8s_error
+):
+    class GandalfUpgrade(DataUpgrade):
+        def pre_upgrade_check(self):
+            pass
+
+        def log_rollback_instructions(self):
+            pass
+
+        def _set_rolling_update_partition(self, partition: int):
+            if has_k8s_error:
+                raise KubernetesClientError("fake message", "fake cause")
+
+        def set_unit_failed(self):
+            pass
+
+    harness.charm.upgrade = GandalfUpgrade(
+        charm=harness.charm, dependency_model=GandalfModel(**GANDALF_DEPS), substrate=substrate
+    )
+
+    with harness.hooks_disabled():
+        harness.set_leader(is_leader)
+        harness.add_relation("upgrade", "gandalf")
+        harness.charm.unit.name = f"gandalf/{unit_number}"
+        for number in range(4):
+            if number != unit_number:
+                harness.add_relation_unit(
+                    harness.charm.upgrade.peer_relation.id, f"gandalf/{number}"
+                )
+
+    set_partition_spy = mocker.spy(harness.charm.upgrade, "_set_rolling_update_partition")
+    set_unit_failed_spy = mocker.spy(harness.charm.upgrade, "set_unit_failed")
+    log_rollback_instructions_spy = mocker.spy(harness.charm.upgrade, "log_rollback_instructions")
+
+    mock_event = mocker.MagicMock()
+    harness.charm.upgrade._on_upgrade_finished(mock_event)
+
+    assert set_partition_spy.call_count == call_count
+    assert set_unit_failed_spy.call_count == (1 if has_k8s_error else 0)
+    assert log_rollback_instructions_spy.call_count == (1 if has_k8s_error else 0)
+
+
 def test_upgrade_changed_recurses_on_leader_and_clears_stack(harness, mocker):
     harness.charm.upgrade = GandalfUpgrade(
         charm=harness.charm, dependency_model=GandalfModel(**GANDALF_DEPS), substrate="k8s"