Add Origin Information to Namespace Endpoint (SOFTWARE-5629)
CannonLock committed Jul 26, 2023
1 parent 647a110 commit ddbb929
Showing 1 changed file with 61 additions and 9 deletions.
70 changes: 61 additions & 9 deletions src/stashcache.py
@@ -1,5 +1,5 @@
from collections import defaultdict
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Literal

from webapp.common import is_null, PreJSON, XROOTD_CACHE_SERVER, XROOTD_ORIGIN_SERVER
from webapp.exceptions import DataError, ResourceNotRegistered, ResourceMissingService
@@ -25,6 +25,9 @@ def _log_or_raise(suppress_errors: bool, an_exception: BaseException, logmethod=
def _resource_has_cache(resource: Resource) -> bool:
return XROOTD_CACHE_SERVER in resource.service_names

def _resource_has_origin(resource: Resource) -> bool:
return XROOTD_ORIGIN_SERVER in resource.service_names


def _get_resource_with_service(fqdn: Optional[str], service_name: str, topology: Topology,
suppress_errors: bool) -> Optional[Resource]:
@@ -536,18 +539,30 @@ def get_namespaces_info(global_data: GlobalData) -> PreJSON:
"""
# Helper functions
def _cache_resource_dict(r: Resource):
endpoint = f"{r.fqdn}:8000"
auth_endpoint = f"{r.fqdn}:8443"

def _service_resource_dict(
r: Resource,
service_name: Literal[XROOTD_CACHE_SERVER, XROOTD_ORIGIN_SERVER],
auth_port_default: int,
unauth_port_default: int
):
endpoint = f"{r.fqdn}:{unauth_port_default}"
auth_endpoint = f"{r.fqdn}:{auth_port_default}"
for svc in r.services:
if svc.get("Name") == XROOTD_CACHE_SERVER:
if svc.get("Name") == service_name:
if not is_null(svc, "Details", "endpoint_override"):
endpoint = svc["Details"]["endpoint_override"]
if not is_null(svc, "Details", "auth_endpoint_override"):
auth_endpoint = svc["Details"]["auth_endpoint_override"]
break
return {"endpoint": endpoint, "auth_endpoint": auth_endpoint, "resource": r.name}

def _cache_resource_dict(r: Resource):
return _service_resource_dict(r=r, service_name=XROOTD_CACHE_SERVER, auth_port_default=8443, unauth_port_default=8000)

def _origin_resource_dict(r: Resource):
return _service_resource_dict(r=r, service_name=XROOTD_ORIGIN_SERVER, auth_port_default=1095, unauth_port_default=1094)

def _namespace_dict(ns: Namespace):
nsdict = {
"path": ns.path,
@@ -556,33 +571,56 @@ def _namespace_dict(ns: Namespace):
"writebackhost": ns.writeback,
"dirlisthost": ns.dirlist,
"caches": [],
"origins": [],
"credential_generation": get_credential_generation_dict_for_namespace(ns),
}

for cache_name, cache_resource_obj in cache_resource_objs.items():
if (resource_allows_namespace(cache_resource_obj, ns) and
namespace_allows_cache_resource(ns, cache_resource_obj)):
nsdict["caches"].append(cache_resource_dicts[cache_name])

nsdict["caches"].sort(key=lambda d: d["resource"])

for origin_name, origin_resource_obj in origin_resource_objs.items():
if (resource_allows_namespace(origin_resource_obj, ns) and
namespace_allows_origin_resource(ns, origin_resource_obj)):
nsdict["origins"].append(origin_resource_dicts[origin_name])

nsdict["origins"].sort(key=lambda d: d["resource"])

return nsdict

def _resource_has_downed_cache(r: Resource, t: Topology):
def _resource_has_downed_service(
r: Resource,
t: Topology,
service_name: Literal[XROOTD_CACHE_SERVER, XROOTD_ORIGIN_SERVER]
):
if r.name not in t.present_downtimes_by_resource:
return False
downtimes = t.present_downtimes_by_resource[r.name]
for dt in downtimes:
try:
if XROOTD_CACHE_SERVER in dt.service_names:
if service_name in dt.service_names:
return True
except (KeyError, AttributeError):
continue
return False

def _resource_has_downed_cache(r: Resource, t: Topology):
return _resource_has_downed_service(r, t, XROOTD_CACHE_SERVER)

def _resource_has_downed_origin(r: Resource, t: Topology):
return _resource_has_downed_service(r, t, XROOTD_ORIGIN_SERVER)

# End helper functions

topology = global_data.get_topology()
resource_groups: List[ResourceGroup] = topology.get_resource_group_list()
vos_data = global_data.get_vos_data()

# Build a dict of cache resources

cache_resource_objs = {} # type: Dict[str, Resource]
cache_resource_dicts = {} # type: Dict[str, Dict]

@@ -595,12 +633,26 @@ def _resource_has_downed_cache(r: Resource, t: Topology):
cache_resource_objs[resource.name] = resource
cache_resource_dicts[resource.name] = _cache_resource_dict(resource)

# Build a dict of origin resources

origin_resource_objs = {} # type: Dict[str, Resource]
origin_resource_dicts = {} # type: Dict[str, Dict]

for group in resource_groups:
for resource in group.resources:
if (_resource_has_origin(resource)
and resource.is_active
and not _resource_has_downed_origin(resource, topology)
):
origin_resource_objs[resource.name] = resource
origin_resource_dicts[resource.name] = _origin_resource_dict(resource)

result_namespaces = []
for stashcache_obj in vos_data.stashcache_by_vo_name.values():
for namespace in stashcache_obj.namespaces.values():
result_namespaces.append(_namespace_dict(namespace))

return PreJSON({
"caches": list(cache_resource_dicts.values()),
"namespaces": result_namespaces
"caches": sorted(list(cache_resource_dicts.values()), key=lambda x: x["resource"]),
"namespaces": sorted(result_namespaces, key=lambda x: x["path"])
})
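For illustration, here is a minimal sketch of the dicts the consolidated _service_resource_dict helper produces. The resource names and FQDNs below are hypothetical, not taken from the repository; origins default to ports 1094/1095 and caches to 8000/8443, with any Details endpoint_override / auth_endpoint_override taking precedence.

# Hypothetical output of _origin_resource_dict (no overrides set on the service):
example_origin = {
    "endpoint": "origin1.example.edu:1094",       # unauth_port_default for origins
    "auth_endpoint": "origin1.example.edu:1095",  # auth_port_default for origins
    "resource": "EXAMPLE_OSDF_ORIGIN",
}

# Hypothetical output of _cache_resource_dict; an endpoint_override or
# auth_endpoint_override under the service's Details would replace these defaults.
example_cache = {
    "endpoint": "cache1.example.edu:8000",
    "auth_endpoint": "cache1.example.edu:8443",
    "resource": "EXAMPLE_OSDF_CACHE",
}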

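Downstream, each namespace entry gains an "origins" list alongside "caches", and both the top-level caches list and the namespaces list are now returned sorted. A rough sketch of the resulting shape of the namespaces endpoint output follows; the values are illustrative only and several namespace fields are omitted.

# Illustrative shape of the namespaces endpoint response after this change;
# values are hypothetical and not taken from production data.
{
    "caches": [
        {"endpoint": "cache1.example.edu:8000",
         "auth_endpoint": "cache1.example.edu:8443",
         "resource": "EXAMPLE_OSDF_CACHE"},
    ],
    "namespaces": [
        {"path": "/example/namespace",
         "writebackhost": None,
         "dirlisthost": None,
         "caches": [
             {"endpoint": "cache1.example.edu:8000",
              "auth_endpoint": "cache1.example.edu:8443",
              "resource": "EXAMPLE_OSDF_CACHE"},
         ],
         "origins": [  # new field added by this commit
             {"endpoint": "origin1.example.edu:1094",
              "auth_endpoint": "origin1.example.edu:1095",
              "resource": "EXAMPLE_OSDF_ORIGIN"},
         ],
         "credential_generation": None},
    ],
}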