Skip to content

Commit

Permalink
Merge pull request #197 from noqdev/bug/en-1824-keyboard-buffer
Browse files Browse the repository at this point in the history
EN-1824 EN-1820: Clear keyboard buffer before questionary prompts
  • Loading branch information
castrapel authored Mar 6, 2023
2 parents ebc4a55 + d7fcff3 commit d1c1b36
Showing 1 changed file with 96 additions and 37 deletions.
133 changes: 96 additions & 37 deletions iambic/config/wizard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import asyncio
import contextlib
import functools
import os
import re
import select
import sys
from textwrap import dedent
from typing import Union

import boto3
Expand Down Expand Up @@ -71,6 +74,50 @@
)


def clear_stdin_buffer():
"""Clears the standard input (stdin) buffer.
This function reads and discards any input that may be present in the standard
input (stdin) buffer. This can be useful in cases where previous input may be
interfering with the desired behavior of a subsequent input operation.
Args:
None
Returns:
None
"""
# Warning: This appears to work fine in a real terminal,
# but not VSCode's Debug Terminal

r, _, _ = select.select([sys.stdin], [], [], 0)
while r:
# If there is input waiting, read and discard it.
sys.stdin.readline()
r, _, _ = select.select([sys.stdin], [], [], 0)


def monkeypatch_questionary():
"""Monkeypatches the questionary functions in use to clear stdin buffer."""
original_functions = {
"prompt": questionary.prompt,
"ask": questionary.Question.ask,
"unsafe_ask": questionary.Question.unsafe_ask,
"select": questionary.select,
}

def patched_function(original_function):
@functools.wraps(original_function)
def wrapper(*args, **kwargs):
clear_stdin_buffer()
return original_function(*args, **kwargs)

return wrapper

for function_name, original_function in original_functions.items():
setattr(questionary, function_name, patched_function(original_function))


def set_aws_region(question_text: str, default_val: Union[str, RegionName]) -> str:
default_val = default_val if isinstance(default_val, str) else default_val.value
choices = [default_val] + [e.value for e in RegionName if e.value != default_val]
Expand Down Expand Up @@ -231,11 +278,10 @@ def __init__(self, repo_dir: str):
if not self.hub_account_id:
while True:
self.hub_account_id = set_required_text_value(
"What is the Account ID where you would like to deploy the IAMbic hub role?\n"
"This is the account that will be used to assume into all other accounts by IAMbic.\n"
"If you have an AWS Organization, that would be your hub account.\n"
"However, if you are just trying IAMbic out, you can provide any account.\n"
"Just be sure to delete all IAMbic stacks when/if you decide to use a different account as your hub.",
"To get started with the IAMbic setup wizard, you'll need an AWS account.\n"
"This is where IAMbic will deploy its main role. If you have an AWS Organization, "
"that account will be your hub account.\n"
"Which Account ID should we use to deploy the IAMbic hub role?",
default_val=default_hub_account_id,
)
if is_valid_account_id(self.hub_account_id):
Expand Down Expand Up @@ -337,12 +383,13 @@ def set_aws_profile_name(
available_profiles.insert(0, "None")

if not question_text:
question_text = (
f"Unable to detect default AWS credentials or "
f"they are not for the Hub Account ({self.hub_account_id}).\n"
f"Please specify the profile to use with access to the Hub Account.\n"
f"This identity will require the ability to create "
f"CloudFormation stacks, stack sets, and stack set instances."
question_text = dedent(
f"""
We couldn't find your AWS credentials, or they're not linked to the Hub Account ({self.hub_account_id}).
The specified AWS credentials need to be able to create CloudFormation stacks, stack sets,
and stack set instances.
Please provide an AWS profile to use for this operation, or restart the wizard with valid AWS credentials: """
)

try:
Expand Down Expand Up @@ -373,34 +420,43 @@ def set_aws_profile_name(

def set_boto3_session(self):
self._has_cf_permissions = True
profile_name = self.set_aws_profile_name()
self.boto3_session = boto3.Session(
profile_name=profile_name, region_name=self.default_region
)
try:
self.caller_identity = self.boto3_session.client(
"sts"
).get_caller_identity()
selected_hub_account_id = self.caller_identity.get("Arn").split(":")[4]
if selected_hub_account_id != self.hub_account_id:
log.error(
"The selected profile does not have access to the Hub Account. Please try again.",
required_account_id=self.hub_account_id,
selected_account_id=selected_hub_account_id,
while True:
try:
profile_name = self.set_aws_profile_name()
self.boto3_session = boto3.Session(
profile_name=profile_name, region_name=self.default_region
)
self.set_boto3_session()
except botocore.exceptions.ClientError as err:
log.info(
"Unable to create a session for the provided profile name. Please try again.",
error=str(err),
)
self.set_boto3_session()
self.caller_identity = self.boto3_session.client(
"sts"
).get_caller_identity()
selected_hub_account_id = self.caller_identity.get("Arn").split(":")[4]
if selected_hub_account_id != self.hub_account_id:
log.error(
"The selected profile does not have access to the Hub Account. Please try again.",
required_account_id=self.hub_account_id,
selected_account_id=selected_hub_account_id,
)
continue
except botocore.exceptions.ClientError as err:
log.info(
"Unable to create a session for the provided profile name. Please try again.",
error=str(err),
)
continue

self.profile_name = profile_name
with contextlib.suppress(ClientError, NoCredentialsError):
self.autodetected_org_settings = self.boto3_session.client(
"organizations"
).describe_organization()["Organization"]
except botocore.exceptions.ProfileNotFound as err:
log.info(
"Selected profile doesn't exist. Please try again.",
error=str(err),
)
continue

self.profile_name = profile_name
with contextlib.suppress(ClientError, NoCredentialsError):
self.autodetected_org_settings = self.boto3_session.client(
"organizations"
).describe_organization()["Organization"]
break

def get_boto3_session_for_account(self, account_id: str):
if account_id == self.hub_account_id:
Expand Down Expand Up @@ -1307,3 +1363,6 @@ def run(self): # noqa: C901
)
except KeyboardInterrupt:
...


monkeypatch_questionary()

0 comments on commit d1c1b36

Please sign in to comment.