Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into improve-cli-structure
Browse files Browse the repository at this point in the history
  • Loading branch information
agileshaw committed Dec 13, 2023
2 parents 1f3a0c4 + 6e82a8d commit e99b0d3
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 124 deletions.
66 changes: 30 additions & 36 deletions cou/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from signal import SIGINT, SIGTERM
from typing import Optional

from aioconsole import ainput
from juju.errors import JujuError

from cou.commands import parse_args
Expand All @@ -32,10 +31,9 @@
from cou.steps.analyze import Analysis
from cou.steps.execute import apply_step
from cou.steps.plan import generate_plan, manually_upgrade_data_plane
from cou.utils import progress_indicator
from cou.utils import print_and_debug, progress_indicator, prompt_input
from cou.utils.cli import interrupt_handler
from cou.utils.juju_utils import COUModel
from cou.utils.text_styler import bold, normal

AVAILABLE_OPTIONS = "cas"

Expand Down Expand Up @@ -74,17 +72,6 @@ def _missing_(cls, value: object) -> Enum:
raise ValueError(f"{value} is not a valid member of VerbosityLevel.")


def prompt_message(parameter: str) -> str:
"""Generate eye-catching prompt.
:param parameter: String to show at the prompt with the user options.
:type parameter: str
:return: Prompt string with the user options.
:rtype: str
"""
return normal("\n" + parameter + " (") + bold("y") + normal("/") + bold("N") + normal("): ")


def get_log_level(quiet: bool = False, verbosity: int = 0) -> str:
"""Get a log level based on input options.
Expand All @@ -100,6 +87,28 @@ def get_log_level(quiet: bool = False, verbosity: int = 0) -> str:
return VerbosityLevel(verbosity).name


async def continue_upgrade() -> bool:
"""Determine whether to continue with the upgrade based on user input.
:return: Boolean value indicate whether to continue with the upgrade.
:rtype: bool
"""
input_value = await prompt_input(
["Would you like to start the upgrade?", "Continue"], separator=" ", default="n"
)

match input_value:
case "y" | "yes":
logger.info("Start the upgrade.")
return True
case "n" | "no":
logger.info("Exiting COU without running upgrades.")
case _:
print_and_debug("No valid input provided! Exiting COU without upgrades.")

return False


async def analyze_and_plan(
model_name: Optional[str], backup_database: bool
) -> tuple[Analysis, UpgradePlan]:
Expand Down Expand Up @@ -139,12 +148,11 @@ async def get_upgrade_plan(model_name: Optional[str], backup_database: bool) ->
:type backup_database: bool
"""
analysis_result, upgrade_plan = await analyze_and_plan(model_name, backup_database)
logger.debug(upgrade_plan)
print(upgrade_plan) # print plan to console even in quiet mode
print_and_debug(upgrade_plan)
manually_upgrade_data_plane(analysis_result)
print(
"Please note that the actually upgrade steps could be different "
"because the plan will be re-calculated at upgrade time."
"Please note that the actually upgrade steps could be different if the cloud state "
"changes because the plan will be re-calculated at upgrade time."
)


Expand All @@ -166,24 +174,10 @@ async def run_upgrade(
:type quiet: bool
"""
analysis_result, upgrade_plan = await analyze_and_plan(model_name, backup_database)
logger.debug(upgrade_plan)
print(upgrade_plan)

if prompt:
prompt_input = (
await ainput(prompt_message("Would you like to start the upgrade?"))
).casefold()

match prompt_input:
case "y":
logger.info("Start the upgrade.")
case "n":
logger.info("Exiting COU without running upgrades.")
return
case _:
print("No valid input provided! Exiting COU without upgrades.")
logger.debug("No valid input provided! Exiting COU without upgrades.")
return
print_and_debug(upgrade_plan)

if prompt and not await continue_upgrade():
return

# NOTE(rgildein): add handling upgrade plan canceling for SIGINT (ctrl+c) and SIGTERM
loop = asyncio.get_event_loop()
Expand Down
35 changes: 7 additions & 28 deletions cou/steps/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,14 @@
import logging
import sys

from aioconsole import ainput

from cou.steps import ApplicationUpgradePlan, BaseStep, UpgradeStep
from cou.utils import progress_indicator
from cou.utils.text_styler import bold, normal
from cou.utils import print_and_debug, progress_indicator, prompt_input

AVAILABLE_OPTIONS = ["y", "n"]
AVAILABLE_OPTIONS = ["y", "yes", "n", "no"]

logger = logging.getLogger(__name__)


def prompt_message(parameter: str) -> str:
"""Generate eye-catching prompt.
:param parameter: String to show at the prompt with the user options.
:type parameter: str
:return: Prompt string with the user options.
:rtype: str
"""
return (
normal("\n" + parameter + "\nContinue (")
+ bold("y")
+ normal("/")
+ bold("n")
+ normal("): ")
)


async def _run_step(step: BaseStep, prompt: bool, overwrite_progress: bool = False) -> None:
"""Run a step and all its sub-steps.
Expand Down Expand Up @@ -110,19 +90,18 @@ async def apply_step(step: BaseStep, prompt: bool, overwrite_progress: bool = Fa
description_to_prompt = str(step)

result = ""
while result.casefold() not in AVAILABLE_OPTIONS:
while result not in AVAILABLE_OPTIONS:
if not prompt or not step.prompt:
result = "y"
else:
result = (await ainput(prompt_message(description_to_prompt))).casefold()
result = await prompt_input([description_to_prompt, "Continue"])

match result:
case "y":
case "y" | "yes":
logger.info("Running: %s", step.description)
await _run_step(step, prompt, overwrite_progress)
case "n":
case "n" | "no":
logger.info("Aborting plan")
sys.exit(1)
case _:
print("No valid input provided!")
logger.debug("No valid input provided!")
print_and_debug("No valid input provided!")
53 changes: 53 additions & 0 deletions cou/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,63 @@

"""Utilities for charmed-openstack-upgrader."""

import inspect
import logging
import os
from pathlib import Path
from typing import Any, Optional

from aioconsole import ainput
from halo import Halo

from cou.utils.text_styler import bold, normal

COU_DATA = Path(f"/home/{os.getenv('USER')}/.local/share/cou") if os.getenv("USER") else Path(".")
progress_indicator = Halo(spinner="line", placement="right")


def print_and_debug(message: Any) -> None:
"""Print and log message at debug level.
:param message: The object to print and log.
:type message: Any
"""
print(message)

logger = logging.getLogger(inspect.stack()[1].filename)
logger.debug(message)


async def prompt_input(
text_list: list[str],
separator: str = "\n",
choices: Optional[list[str]] = None,
default: str = "",
) -> str:
"""Generate eye-catching prompt.
:param text_list: List of text to show at the prompt with the user options.
:type text_list: list[str]
:param separator: Separator between each text. Default to newline.
:type separator: str
:param choices: List of options to show at the prompt with the user options. If no value
supplied, 'y' and 'n' will be used by default.
:type choices: Optional[list[str]]
:param default: The default choice if user doesn't a provide valid input.
:type default: str
:return: The input value (if any) or the default choice.
:rtype: str
:raise ValueError: raise ValueError if default choice is invalid
"""
if not choices:
choices = ["y", "n"]

message_str = normal(separator).join(normal(text) for text in text_list)
choices_str = normal("/").join(
bold(choice.upper() if choice == default.casefold() else choice) for choice in choices
)
formatted_message = normal("\n") + message_str + normal(" (") + choices_str + normal("): ")

input_value = await ainput(formatted_message)

return (input_value or default).casefold()
68 changes: 35 additions & 33 deletions tests/unit/steps/test_steps_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
UpgradePlan,
UpgradeStep,
)
from cou.steps.execute import _run_step, apply_step, prompt_message
from cou.steps.execute import _run_step, apply_step


@pytest.mark.asyncio
Expand Down Expand Up @@ -98,70 +98,72 @@ async def test_run_step_no_progress_indicator(mock_progress_indicator, plan):


@pytest.mark.asyncio
@pytest.mark.parametrize("input_value", ["n", "N"])
@patch("cou.steps.execute.ainput")
@pytest.mark.parametrize("input_value", ["n", "no"])
@patch("cou.steps.execute.prompt_input")
@patch("cou.steps.execute._run_step")
async def test_apply_step_abort(mock_run_step, mock_input, input_value):
async def test_apply_step_abort(mock_run_step, mock_prompt_input, input_value):
upgrade_step = AsyncMock(spec_set=UpgradeStep())
upgrade_step.description = "Test Step"
mock_input.return_value = input_value
mock_prompt_input.return_value = input_value

with pytest.raises(SystemExit):
await apply_step(upgrade_step, True)

mock_input.assert_awaited_once_with(prompt_message("Test Step"))
mock_prompt_input.assert_awaited_once_with(["Test Step", "Continue"])
mock_run_step.assert_not_awaited()


@pytest.mark.asyncio
@patch("cou.steps.execute.ainput")
@pytest.mark.parametrize("input_value", ["y", "yes"])
@patch("cou.steps.execute.prompt_input")
@patch("cou.steps.execute._run_step")
async def test_apply_step_no_prompt(mock_run_step, mock_input):
async def test_apply_step_continue(mock_run_step, mock_prompt_input, input_value):
upgrade_step = AsyncMock(spec_set=UpgradeStep())
upgrade_step.description = "Test Step"
mock_prompt_input.return_value = input_value

await apply_step(upgrade_step, False)
await apply_step(upgrade_step, True)

mock_input.assert_not_awaited()
mock_run_step.assert_awaited_once_with(upgrade_step, False, False)
mock_prompt_input.assert_awaited_once_with(["Test Step", "Continue"])
mock_run_step.assert_awaited_once_with(upgrade_step, True, False)


@pytest.mark.asyncio
@pytest.mark.parametrize("input_value", ["y", "Y"])
@patch("cou.steps.execute.ainput")
@patch("cou.steps.execute.prompt_input")
@patch("cou.steps.execute._run_step")
async def test_apply_step_continue(mock_run_step, mock_input, input_value):
async def test_apply_step_non_interactive(mock_run_step, mock_prompt_input):
upgrade_step = AsyncMock(spec_set=UpgradeStep())
upgrade_step.description = "Test Step"
mock_input.return_value = input_value

await apply_step(upgrade_step, True)
await apply_step(upgrade_step, False)

mock_input.assert_awaited_once_with(prompt_message("Test Step"))
mock_run_step.assert_awaited_once_with(upgrade_step, True, False)
mock_prompt_input.assert_not_awaited()
mock_run_step.assert_awaited_once_with(upgrade_step, False, False)


@pytest.mark.asyncio
@patch("cou.steps.execute.ainput")
@patch("cou.steps.execute.prompt_input")
@patch("cou.steps.execute._run_step")
async def test_apply_step_nonsense(mock_run_step, mock_input):
@patch("cou.steps.execute.print_and_debug")
async def test_apply_step_nonsense(mock_print_and_debug, mock_run_step, mock_prompt_input):
upgrade_step = AsyncMock(spec_set=UpgradeStep())
upgrade_step.description = "Test Step"
mock_input.side_effect = ["x", "n"]
mock_prompt_input.side_effect = ["x", "n"]

with pytest.raises(SystemExit, match="1"):
await apply_step(upgrade_step, True)

mock_input.assert_has_awaits(
[call(prompt_message("Test Step")), call(prompt_message("Test Step"))]
mock_prompt_input.assert_has_awaits(
[call(["Test Step", "Continue"]), call(["Test Step", "Continue"])]
)
mock_run_step.assert_not_awaited()
mock_print_and_debug.assert_called_once_with("No valid input provided!")


@pytest.mark.asyncio
@patch("cou.steps.execute.ainput")
@patch("cou.steps.execute.prompt_input")
@patch("cou.steps.execute._run_step")
async def test_apply_application_upgrade_plan(mock_run_step, mock_input):
async def test_apply_application_upgrade_plan(mock_run_step, mock_prompt_input):
expected_prompt = (
"Test plan\n\tTest pre-upgrade step\n\tTest upgrade step\n\t" + "Test post-upgrade step\n"
)
Expand All @@ -172,16 +174,16 @@ async def test_apply_application_upgrade_plan(mock_run_step, mock_input):
PostUpgradeStep(description="Test post-upgrade step", coro=AsyncMock()),
]

mock_input.side_effect = ["y"]
mock_prompt_input.side_effect = ["y"]
await apply_step(upgrade_plan, True)

mock_input.assert_has_awaits([call(prompt_message(expected_prompt))])
mock_prompt_input.assert_awaited_once_with([expected_prompt, "Continue"])


@pytest.mark.asyncio
@patch("cou.steps.execute.ainput")
@patch("cou.steps.execute.prompt_input")
@patch("cou.steps.execute._run_step")
async def test_apply_application_upgrade_plan_no_prompt(mock_run_step, mock_input):
async def test_apply_application_upgrade_plan_non_interactive(mock_run_step, mock_prompt_input):
plan_description = "Test plan"
upgrade_plan = ApplicationUpgradePlan(plan_description)
upgrade_plan.sub_steps = [
Expand All @@ -192,20 +194,20 @@ async def test_apply_application_upgrade_plan_no_prompt(mock_run_step, mock_inpu

await apply_step(upgrade_plan, False)

mock_input.assert_not_awaited()
mock_prompt_input.assert_not_awaited()
mock_run_step.assert_awaited()


@pytest.mark.asyncio
@patch("cou.steps.execute.ainput")
@patch("cou.steps.execute.prompt_input")
@patch("cou.steps.execute._run_step")
async def test_apply_empty_step(mock_run_step, mock_input):
async def test_apply_empty_step(mock_run_step, mock_prompt_input):
# upgrade_plan is empty because it has neither coro nor sub-steps
upgrade_plan = ApplicationUpgradePlan("Test plan")

await apply_step(upgrade_plan, True)

mock_input.assert_not_awaited()
mock_prompt_input.assert_not_awaited()
mock_run_step.assert_not_awaited()


Expand Down
Loading

0 comments on commit e99b0d3

Please sign in to comment.