-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use charmcraft credentials to ensure tracks exist
Signed-off-by: Adam Dyess <[email protected]>
- Loading branch information
Showing
4 changed files
with
113 additions
and
53 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/home/addyess/git/charmed-kubernetes/jenkins/cilib/ch.py |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,96 +1,155 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import argparse | ||
import base64 | ||
import logging | ||
import json | ||
import os | ||
import subprocess | ||
import sys | ||
import re | ||
import urllib.request | ||
|
||
from typing import Optional | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
def _charmcraft_auth_to_macaroon(charmcraft_auth: str) -> Optional[str]: | ||
def _base64_json(creds: str) -> Optional[str]: | ||
"""Decode charmcraft auth into the macaroon.""" | ||
try: | ||
bytes = base64.b64decode(charmcraft_auth.strip().encode()) | ||
bytes = base64.b64decode(creds.strip().encode()) | ||
return json.loads(bytes).get("v") | ||
except (base64.binascii.Error, json.JSONDecodeError): | ||
except (base64.binascii.Error, json.JSONDecodeError, UnicodeDecodeError) as e: | ||
log.warning("Failed to decode base64 json: %s", e) | ||
return None | ||
|
||
|
||
def _request(url: str, authorization: str = None): | ||
"""Create a request with the appropriate macaroon.""" | ||
return urllib.request.Request( | ||
url, | ||
method="GET", | ||
headers={ | ||
"Authorization": authorization, | ||
"Accept": "application/json", | ||
"Content-Type": "application/json", | ||
}, | ||
) | ||
|
||
|
||
def _track_or_channel(channel: str): | ||
"""Get the track from a channel.""" | ||
return channel.split("/")[0] if "/" in channel else channel | ||
|
||
|
||
def macaroon(): | ||
"""Get the charmhub macaroon.""" | ||
macaroon = os.getenv("CHARM_MACAROON", "") | ||
if not macaroon and (charmcraft_auth := os.getenv("CHARMCRAFT_AUTH")): | ||
macaroon = _charmcraft_auth_to_macaroon(charmcraft_auth) | ||
def _save_auth_header(auth_header: str) -> str: | ||
"""Save the macaroon for later use.""" | ||
os.environ["CH_AUTH_HEADER"] = auth_header | ||
return auth_header | ||
|
||
|
||
def _load_auth_header() -> Optional[str]: | ||
"""Load the macaroon from the environment.""" | ||
return os.getenv("CH_AUTH_HEADER", None) | ||
|
||
|
||
def charmhub_auth_header(): | ||
"""Get the authentication macaroon.""" | ||
if macaroon := _load_auth_header(): | ||
log.debug("Reusing existing auth header") | ||
return macaroon | ||
if charmcraft_auth := os.getenv("CHARMCRAFT_AUTH"): | ||
log.debug("Trying to use env CHARMCRAFT_AUTH to get macaroon...") | ||
macaroon = _base64_json(charmcraft_auth) | ||
if not macaroon: | ||
log.debug("Trying to use 'charmcraft login' to get macaroon...") | ||
out = subprocess.run( | ||
["charmcraft", "login", "--export", "/dev/fd/2"], | ||
stderr=subprocess.PIPE, | ||
text=True, | ||
check=True, | ||
) | ||
macaroon = _charmcraft_auth_to_macaroon(out.stderr.splitlines()[-1]) | ||
macaroon = _base64_json(out.stderr.splitlines()[-1]) | ||
if not macaroon: | ||
raise ValueError("No charmhub macaroon found") | ||
os.environ["CHARM_MACAROON"] = macaroon | ||
return macaroon | ||
log.error("Cannot load charmcraft macaroon") | ||
raise ValueError("No macaroon found -- Cannot authenticate") | ||
if not isinstance(macaroon, str): | ||
log.error("Macaroon wasn't a str") | ||
raise ValueError("Invalid macaroon found -- Cannot authenticate") | ||
return _save_auth_header(f"Macaroon {macaroon}") | ||
|
||
|
||
def request(url: str): | ||
"""Create a request with the appropriate macaroon.""" | ||
return urllib.request.Request( | ||
url, | ||
method="GET", | ||
headers={ | ||
"Authorization": f"Macaroon {macaroon()}", | ||
"Content-Type": "application/json", | ||
}, | ||
) | ||
|
||
|
||
def info(charm: str): | ||
"""Get charm info.""" | ||
req = request(f"https://api.charmhub.io/v1/charm/{charm}") | ||
def info(kind: str, name: str): | ||
"""Get entity info.""" | ||
req = _request(f"https://api.charmhub.io/v1/{kind}/{name}", charmhub_auth_header()) | ||
with urllib.request.urlopen(req) as resp: | ||
if 200 <= resp.status < 300: | ||
log.debug(f"Got charm info for {charm}") | ||
return json.loads(resp.read()) | ||
raise ValueError(f"Failed to get charm info for {charm}: {resp.status}") | ||
log.debug("Received info for %s '%s'", kind, name) | ||
return json.loads(resp.read()) | ||
|
||
|
||
def create_track(charm: str, track_or_channel: str): | ||
"""Create a track for a charm.""" | ||
req = request(f"https://api.charmhub.io/v1/charm/{charm}/tracks") | ||
def create_track(kind: str, name: str, track_or_channel: str): | ||
"""Create a track for an entity.""" | ||
req = _request( | ||
f"https://api.charmhub.io/v1/{kind}/{name}/tracks", charmhub_auth_header() | ||
) | ||
req.method = "POST" | ||
track = _track_or_channel(track_or_channel) | ||
req.data = json.dumps([{"name": track}]).encode() | ||
with urllib.request.urlopen(req) as resp: | ||
if 200 <= resp.status < 300: | ||
log.info(f"Track {track} created for charm {charm}") | ||
return | ||
raise ValueError(f"Failed to create track {track} for charm {charm}: {resp.read()}") | ||
with urllib.request.urlopen(req): | ||
log.info("Track %-10s created for %5s %s", track, kind, name) | ||
return | ||
|
||
|
||
def ensure_track(charm: str, track_or_channel: str): | ||
"""Ensure a track exists for a charm.""" | ||
charm_info = info(charm) | ||
def ensure_track(kind: str, name: str, track_or_channel: str): | ||
"""Ensure a track exists for a named entity.""" | ||
entity_info = info(kind, name) | ||
track = _track_or_channel(track_or_channel) | ||
charm_tracks = [t["name"] for t in charm_info["metadata"]["tracks"]] | ||
if track in charm_tracks: | ||
log.info(f"Track {track} already exists for charm {charm}") | ||
tracks = [t["name"] for t in entity_info["metadata"]["tracks"]] | ||
if track in tracks: | ||
log.info("Track %-10s exists for %5s %s", track, kind, name) | ||
return | ||
patterns = [t["pattern"] for t in charm_info["metadata"]["track-guardrails"]] | ||
patterns = [t["pattern"] for t in entity_info["metadata"]["track-guardrails"]] | ||
if not any(re.compile(f"^{pattern}$").match(track) for pattern in patterns): | ||
raise ValueError( | ||
f"Track {track} does not match any guardrails for charm {charm}" | ||
f"Track {track} does not match any guardrails for {kind} {name}" | ||
) | ||
return create_track(kind, name, track) | ||
|
||
|
||
return create_track(charm, track) | ||
def ensure_charm_track(charm: str, track: str): | ||
"""Ensure a track exists for a charm.""" | ||
return ensure_track("charm", charm, track) | ||
|
||
|
||
def ensure_snap_track(snap: str, track: str): | ||
"""Ensure a track exists for a snap.""" | ||
return ensure_track("snap", snap, track) | ||
|
||
|
||
def main(): | ||
FORMAT = "%(name)s: %(asctime)s %(levelname)8s - %(message)s" | ||
logging.basicConfig(format=FORMAT) | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument("kind", help="type of the entity", choices=["charm", "snap"]) | ||
parser.add_argument("name", help="name of the entity") | ||
parser.add_argument("track", help="track to ensure") | ||
parser.add_argument( | ||
"-l", | ||
"--log", | ||
dest="loglevel", | ||
default="INFO", | ||
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], | ||
help="Set the logging level", | ||
) | ||
args = parser.parse_args() | ||
if args.loglevel: | ||
log.setLevel(level=args.loglevel.upper()) | ||
ensure_track(args.kind, args.name, args.track) | ||
|
||
|
||
execd = __name__ == "__main__" | ||
logger_name = sys.argv[0] if execd else __name__ | ||
log = logging.getLogger(logger_name) | ||
if execd: | ||
main() | ||
else: | ||
log.setLevel(logging.DEBUG) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters