From 26939f1c029ae6669aae8a30e72de21a9760272c Mon Sep 17 00:00:00 2001 From: SdgJlbl Date: Tue, 11 Jul 2023 12:19:22 +0200 Subject: [PATCH] feat: decouple image builder from worker Signed-off-by: SdgJlbl --- backend/backend/settings/common.py | 1 + backend/builder/__init__.py | 0 backend/builder/apps.py | 5 + backend/builder/image_builder/__init__.py | 0 .../builder/image_builder/image_builder.py | 325 ++++++++++++++++++ .../tests}/test_image_builder.py | 0 .../substrapp/compute_tasks/image_builder.py | 314 +---------------- backend/substrapp/events/reactor.py | 5 + backend/substrapp/tasks/tasks_compute_task.py | 5 +- .../templates/deployment-builder.yaml | 265 ++++++++++++++ charts/substra-backend/values.yaml | 69 +++- 11 files changed, 684 insertions(+), 305 deletions(-) create mode 100644 backend/builder/__init__.py create mode 100644 backend/builder/apps.py create mode 100644 backend/builder/image_builder/__init__.py create mode 100644 backend/builder/image_builder/image_builder.py rename backend/{substrapp/tests/compute_tasks => builder/tests}/test_image_builder.py (100%) create mode 100644 charts/substra-backend/templates/deployment-builder.yaml diff --git a/backend/backend/settings/common.py b/backend/backend/settings/common.py index bc34f2a9c..d8819deb1 100644 --- a/backend/backend/settings/common.py +++ b/backend/backend/settings/common.py @@ -56,6 +56,7 @@ "api", "drf_spectacular", "django_filters", + "builder", ] AUTHENTICATION_BACKENDS = [ diff --git a/backend/builder/__init__.py b/backend/builder/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/builder/apps.py b/backend/builder/apps.py new file mode 100644 index 000000000..966bd8a6e --- /dev/null +++ b/backend/builder/apps.py @@ -0,0 +1,5 @@ +from django.apps import AppConfig + + +class BuilderConfig(AppConfig): + name = "builder" diff --git a/backend/builder/image_builder/__init__.py b/backend/builder/image_builder/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/builder/image_builder/image_builder.py b/backend/builder/image_builder/image_builder.py new file mode 100644 index 000000000..83d801996 --- /dev/null +++ b/backend/builder/image_builder/image_builder.py @@ -0,0 +1,325 @@ +import json +import os +from tempfile import TemporaryDirectory + +import kubernetes +import structlog +from django.conf import settings + +import orchestrator +from substrapp import exceptions +from substrapp.compute_tasks import errors as compute_task_errors +from substrapp.compute_tasks import utils +from substrapp.compute_tasks.compute_pod import Label +from substrapp.compute_tasks.datastore import Datastore +from substrapp.compute_tasks.volumes import get_docker_cache_pvc_name +from substrapp.compute_tasks.volumes import get_worker_subtuple_pvc_name +from substrapp.docker_registry import USER_IMAGE_REPOSITORY +from substrapp.docker_registry import container_image_exists +from substrapp.kubernetes_utils import delete_pod +from substrapp.kubernetes_utils import get_pod_logs +from substrapp.kubernetes_utils import get_security_context +from substrapp.kubernetes_utils import pod_exists +from substrapp.kubernetes_utils import watch_pod +from substrapp.lock_local import lock_resource +from substrapp.models.image_entrypoint import ImageEntrypoint +from substrapp.utils import timeit +from substrapp.utils import uncompress_content + +logger = structlog.get_logger(__name__) + +REGISTRY = settings.REGISTRY +REGISTRY_SCHEME = settings.REGISTRY_SCHEME +NAMESPACE = settings.NAMESPACE 
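+# Kaniko configuration used when launching image-build pods: the executor image,
+# an optional registry-mirror flag, and the secret holding the registry docker config.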
+KANIKO_MIRROR = settings.TASK["KANIKO_MIRROR"]
+KANIKO_IMAGE = settings.TASK["KANIKO_IMAGE"]
+KANIKO_DOCKER_CONFIG_SECRET_NAME = settings.TASK["KANIKO_DOCKER_CONFIG_SECRET_NAME"]
+KANIKO_DOCKER_CONFIG_VOLUME_NAME = "docker-config"
+CELERY_WORKER_CONCURRENCY = settings.CELERY_WORKER_CONCURRENCY
+SUBTUPLE_TMP_DIR = settings.SUBTUPLE_TMP_DIR
+MAX_IMAGE_BUILD_TIME = 3 * 60 * 60  # 3 hours
+KANIKO_CONTAINER_NAME = "kaniko"
+HOSTNAME = settings.HOSTNAME
+
+
+def container_image_tag_from_function(function: orchestrator.Function) -> str:
+    """Build the container image tag from the function checksum.
+
+    Args:
+        function (orchestrator.Function): a function retrieved from the orchestrator
+
+    Returns:
+        str: the container image tag
+    """
+    return f"function-{function.function_address.checksum[:16]}"
+
+
+# main entry point of the builder
+# inputs: channel + function
+def build_image_if_missing(channel: str, function: orchestrator.Function) -> None:
+    """
+    Build the container image and the ImageEntrypoint entry if they don't exist already.
+    """
+    datastore = Datastore(channel=channel)
+    container_image_tag = utils.container_image_tag_from_function(function)
+    with lock_resource("image-build", container_image_tag, ttl=MAX_IMAGE_BUILD_TIME, timeout=MAX_IMAGE_BUILD_TIME):
+        if container_image_exists(container_image_tag):
+            logger.info("Reusing existing image", image=container_image_tag)
+        else:
+            asset_content = datastore.get_function(function)
+            _build_function_image(asset_content, function)
+
+
+def _build_function_image(asset: bytes, function: orchestrator.Function) -> None:
+    """
+    Build a function's container image.
+
+    Perform multiple steps:
+    1. Download the function using the provided asset storage_address/owner. Verify its checksum and uncompress the data
+       to a temporary folder.
+    2. Extract the ENTRYPOINT from the Dockerfile.
+    3. Build the container image using Kaniko.
+    4. Save the ENTRYPOINT to the DB.
+    """
+
+    os.makedirs(SUBTUPLE_TMP_DIR, exist_ok=True)
+
+    with TemporaryDirectory(dir=SUBTUPLE_TMP_DIR) as tmp_dir:
+        # Download source
+        uncompress_content(asset, tmp_dir)
+
+        # Extract ENTRYPOINT from Dockerfile
+        entrypoint = _get_entrypoint_from_dockerfile(tmp_dir)
+
+        # Build image
+        _build_container_image(tmp_dir, utils.container_image_tag_from_function(function))
+
+        # Save entrypoint to DB if the image build was successful
+        ImageEntrypoint.objects.get_or_create(
+            function_checksum=function.function_address.checksum, entrypoint_json=entrypoint
+        )
+
+
+def _get_entrypoint_from_dockerfile(dockerfile_dir: str) -> list[str]:
+    """
+    Get the entrypoint from the ENTRYPOINT instruction in the Dockerfile.
+
+    This is necessary because the user function can have an arbitrary name, e.g. "myfunction.py".
+
+    Example:
+        ENTRYPOINT ["python3", "myfunction.py"]
+    """
+    dockerfile_path = f"{dockerfile_dir}/Dockerfile"
+
+    with open(dockerfile_path, "r") as file:
+        for line in file:
+            if line.startswith("ENTRYPOINT"):
+                try:
+                    res = json.loads(line[len("ENTRYPOINT") :])
+                except json.JSONDecodeError:
+                    res = None
+
+                if not isinstance(res, list):
+                    raise compute_task_errors.BuildError(
+                        "Invalid ENTRYPOINT in function/metric Dockerfile. "
+                        "You must use the exec form in your Dockerfile. 
" + "See https://docs.docker.com/engine/reference/builder/#entrypoint" + ) + return res + + raise compute_task_errors.BuildError("Invalid Dockerfile: Cannot find ENTRYPOINT") + + +def _delete_kaniko_pod(create_pod: bool, k8s_client: kubernetes.client.CoreV1Api, pod_name: str) -> str: + logs = "" + if create_pod: + logs = get_pod_logs(k8s_client, pod_name, KANIKO_CONTAINER_NAME, ignore_pod_not_found=True) + delete_pod(k8s_client, pod_name) + logger.info(logs or "", pod_name=pod_name) + return logs + + +@timeit +def _build_container_image(path: str, tag: str) -> None: + _assert_dockerfile_exist(path) + + kubernetes.config.load_incluster_config() + k8s_client = kubernetes.client.CoreV1Api() + + pod_name = _build_pod_name(tag) + + create_pod = not pod_exists(k8s_client, pod_name) + if create_pod: + try: + logger.info("creating pod: building image", namespace=NAMESPACE, pod=pod_name, image=tag) + pod = _build_pod(path, tag) + k8s_client.create_namespaced_pod(body=pod, namespace=NAMESPACE) + except kubernetes.client.ApiException as e: + raise compute_task_errors.BuildRetryError( + f"Error creating pod {NAMESPACE}/{pod_name}. Reason: {e.reason}, status: {e.status}, body: {e.body}" + ) from e + + try: + watch_pod(k8s_client, pod_name) + + except Exception as e: + # In case of concurrent builds, it may fail. Check if the image exists. + if container_image_exists(tag): + logger.warning( + f"Build of container image {tag} failed, probably because it was done by a concurrent build", + exc_info=True, + ) + return + + logs = _delete_kaniko_pod(create_pod, k8s_client, pod_name) + + if isinstance(e, exceptions.PodTimeoutError): + raise compute_task_errors.BuildRetryError(logs) from e + else: # exceptions.PodError or other + raise compute_task_errors.BuildError(logs) from e + + _delete_kaniko_pod(create_pod, k8s_client, pod_name) + + +def _assert_dockerfile_exist(dockerfile_path): + dockerfile_fullpath = os.path.join(dockerfile_path, "Dockerfile") + if not os.path.exists(dockerfile_fullpath): + raise compute_task_errors.BuildError(f"Dockerfile does not exist : {dockerfile_fullpath}") + + +def _build_pod(dockerfile_mount_path: str, image_tag: str) -> kubernetes.client.V1Pod: + pod_name = _build_pod_name(image_tag) + pod_spec = _build_pod_spec(dockerfile_mount_path, image_tag) + return kubernetes.client.V1Pod( + api_version="v1", + kind="Pod", + metadata=kubernetes.client.V1ObjectMeta( + name=pod_name, + labels={ + Label.PodName: pod_name, + Label.PodType: "image-build", + Label.Component: Label.Component_Compute, + }, + ), + spec=pod_spec, + ) + + +def _build_pod_name(image_tag: str) -> str: + dns_1123_compliant_tag = image_tag.split("/")[-1].replace("_", "-") + return f"kaniko-{dns_1123_compliant_tag}" + + +def _build_pod_spec(dockerfile_mount_path: str, image_tag: str) -> kubernetes.client.V1PodSpec: + container = _build_container(dockerfile_mount_path, image_tag) + pod_affinity = _build_pod_affinity() + + cache_pvc_name = ( + settings.WORKER_PVC_DOCKER_CACHE if settings.WORKER_PVC_IS_HOSTPATH else get_docker_cache_pvc_name() + ) + cache = kubernetes.client.V1Volume( + name="cache", + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(claim_name=cache_pvc_name), + ) + + dockerfile_pvc_name = ( + settings.WORKER_PVC_SUBTUPLE if settings.WORKER_PVC_IS_HOSTPATH else get_worker_subtuple_pvc_name() + ) + dockerfile = kubernetes.client.V1Volume( + name="dockerfile", + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(claim_name=dockerfile_pvc_name), + ) + + 
volumes = [cache, dockerfile] + + if KANIKO_DOCKER_CONFIG_SECRET_NAME: + docker_config = kubernetes.client.V1Volume( + name=KANIKO_DOCKER_CONFIG_VOLUME_NAME, + secret=kubernetes.client.V1SecretVolumeSource( + secret_name=KANIKO_DOCKER_CONFIG_SECRET_NAME, + items=[kubernetes.client.V1KeyToPath(key=".dockerconfigjson", path="config.json")], + ), + ) + volumes.append(docker_config) + + return kubernetes.client.V1PodSpec( + restart_policy="Never", affinity=pod_affinity, containers=[container], volumes=volumes + ) + + +def _build_pod_affinity() -> kubernetes.client.V1Affinity: + return kubernetes.client.V1Affinity( + pod_affinity=kubernetes.client.V1PodAffinity( + required_during_scheduling_ignored_during_execution=[ + kubernetes.client.V1PodAffinityTerm( + label_selector=kubernetes.client.V1LabelSelector( + match_expressions=[ + kubernetes.client.V1LabelSelectorRequirement( + key="statefulset.kubernetes.io/pod-name", operator="In", values=[HOSTNAME] + ) + ] + ), + topology_key="kubernetes.io/hostname", + ) + ] + ) + ) + + +def _build_container(dockerfile_mount_path: str, image_tag: str) -> kubernetes.client.V1Container: + # kaniko build can be launched without privilege but + # it needs some capabilities and to be root + # https://github.com/GoogleContainerTools/kaniko/issues/778 + # https://github.com/GoogleContainerTools/kaniko/issues/778#issuecomment-619112417 + # https://github.com/moby/moby/blob/master/oci/caps/defaults.go + # https://man7.org/linux/man-pages/man7/capabilities.7.html + capabilities = ["CHOWN", "SETUID", "SETGID", "FOWNER", "DAC_OVERRIDE", "SETFCAP"] + container_security_context = get_security_context(root=True, capabilities=capabilities) + args = _build_container_args(dockerfile_mount_path, image_tag) + dockerfile_mount_subpath = dockerfile_mount_path.split("/subtuple/")[-1] + + dockerfile = kubernetes.client.V1VolumeMount( + name="dockerfile", mount_path=dockerfile_mount_path, sub_path=dockerfile_mount_subpath, read_only=True + ) + cache = kubernetes.client.V1VolumeMount(name="cache", mount_path="/cache", read_only=True) + volume_mounts = [dockerfile, cache] + + if KANIKO_DOCKER_CONFIG_SECRET_NAME: + docker_config = kubernetes.client.V1VolumeMount( + name=KANIKO_DOCKER_CONFIG_VOLUME_NAME, mount_path="/kaniko/.docker" + ) + volume_mounts.append(docker_config) + + return kubernetes.client.V1Container( + name=KANIKO_CONTAINER_NAME, + image=KANIKO_IMAGE, + command=None, + args=args, + volume_mounts=volume_mounts, + security_context=container_security_context, + ) + + +def _build_container_args(dockerfile_mount_path: str, image_tag: str) -> list[str]: + dockerfile_fullpath = os.path.join(dockerfile_mount_path, "Dockerfile") + args = [ + f"--dockerfile={dockerfile_fullpath}", + f"--context=dir://{dockerfile_mount_path}", + f"--destination={REGISTRY}/{USER_IMAGE_REPOSITORY}:{image_tag}", + "--cache=true", + "--log-timestamp=true", + "--snapshotMode=redo", + "--push-retry=3", + "--cache-copy-layers", + "--log-format=text", + f"--verbosity={('debug' if settings.LOG_LEVEL == 'DEBUG' else 'info')}", + ] + + if REGISTRY_SCHEME == "http": + args.append("--insecure") + + if KANIKO_MIRROR: + args.append(f"--registry-mirror={REGISTRY}") + if REGISTRY_SCHEME == "http": + args.append("--insecure-pull") + return args diff --git a/backend/substrapp/tests/compute_tasks/test_image_builder.py b/backend/builder/tests/test_image_builder.py similarity index 100% rename from backend/substrapp/tests/compute_tasks/test_image_builder.py rename to backend/builder/tests/test_image_builder.py diff --git 
a/backend/substrapp/compute_tasks/image_builder.py b/backend/substrapp/compute_tasks/image_builder.py index 691788911..ed1494550 100644 --- a/backend/substrapp/compute_tasks/image_builder.py +++ b/backend/substrapp/compute_tasks/image_builder.py @@ -1,310 +1,34 @@ -import json -import os -from tempfile import TemporaryDirectory +import time -import kubernetes import structlog -from django.conf import settings import orchestrator from substrapp import exceptions -from substrapp.compute_tasks import errors as compute_task_errors from substrapp.compute_tasks import utils -from substrapp.compute_tasks.compute_pod import Label -from substrapp.compute_tasks.datastore import Datastore -from substrapp.compute_tasks.volumes import get_docker_cache_pvc_name -from substrapp.compute_tasks.volumes import get_worker_subtuple_pvc_name -from substrapp.docker_registry import USER_IMAGE_REPOSITORY from substrapp.docker_registry import container_image_exists -from substrapp.kubernetes_utils import delete_pod -from substrapp.kubernetes_utils import get_pod_logs -from substrapp.kubernetes_utils import get_security_context -from substrapp.kubernetes_utils import pod_exists -from substrapp.kubernetes_utils import watch_pod -from substrapp.lock_local import lock_resource -from substrapp.models.image_entrypoint import ImageEntrypoint -from substrapp.utils import timeit -from substrapp.utils import uncompress_content logger = structlog.get_logger(__name__) -REGISTRY = settings.REGISTRY -REGISTRY_SCHEME = settings.REGISTRY_SCHEME -NAMESPACE = settings.NAMESPACE -KANIKO_MIRROR = settings.TASK["KANIKO_MIRROR"] -KANIKO_IMAGE = settings.TASK["KANIKO_IMAGE"] -KANIKO_DOCKER_CONFIG_SECRET_NAME = settings.TASK["KANIKO_DOCKER_CONFIG_SECRET_NAME"] -KANIKO_DOCKER_CONFIG_VOLUME_NAME = "docker-config" -CELERY_WORKER_CONCURRENCY = settings.CELERY_WORKER_CONCURRENCY -SUBTUPLE_TMP_DIR = settings.SUBTUPLE_TMP_DIR MAX_IMAGE_BUILD_TIME = 3 * 60 * 60 # 3 hours -KANIKO_CONTAINER_NAME = "kaniko" -HOSTNAME = settings.HOSTNAME +WAITING_TIME = 5 # wait 5 seconds between two queries -def build_image_if_missing(datastore: Datastore, function: orchestrator.Function) -> None: - """ - Build the container image and the ImageEntryPoint entry if they don't exist already - """ +def wait_for_image_built(function: orchestrator.Function): container_image_tag = utils.container_image_tag_from_function(function) - with lock_resource("image-build", container_image_tag, ttl=MAX_IMAGE_BUILD_TIME, timeout=MAX_IMAGE_BUILD_TIME): - if container_image_exists(container_image_tag): - logger.info("Reusing existing image", image=container_image_tag) - else: - asset_content = datastore.get_function(function) - _build_function_image(asset_content, function) - - -def _build_function_image(asset: bytes, function: orchestrator.Function) -> None: - """ - Build a function's container image. - - Perform multiple steps: - 1. Download the function using the provided asset storage_address/owner. Verify its checksum and uncompress the data - to a temporary folder. - 2. Extract the ENTRYPOINT from the Dockerfile. - 3. Build the container image using Kaniko. - 4. 
Save the ENTRYPOINT to the DB - """ - - os.makedirs(SUBTUPLE_TMP_DIR, exist_ok=True) - - with TemporaryDirectory(dir=SUBTUPLE_TMP_DIR) as tmp_dir: - # Download source - uncompress_content(asset, tmp_dir) - - # Extract ENTRYPOINT from Dockerfile - entrypoint = _get_entrypoint_from_dockerfile(tmp_dir) - - # Build image - _build_container_image(tmp_dir, utils.container_image_tag_from_function(function)) - - # Save entrypoint to DB if the image build was successful - ImageEntrypoint.objects.get_or_create( - function_checksum=function.function_address.checksum, entrypoint_json=entrypoint - ) - - -def _get_entrypoint_from_dockerfile(dockerfile_dir: str) -> list[str]: - """ - Get entrypoint from ENTRYPOINT in the Dockerfile. - - This is necessary because the user function can have arbitrary names, ie; "myfunction.py". - - Example: - ENTRYPOINT ["python3", "myfunction.py"] - """ - dockerfile_path = f"{dockerfile_dir}/Dockerfile" - - with open(dockerfile_path, "r") as file: - for line in file: - if line.startswith("ENTRYPOINT"): - try: - res = json.loads(line[len("ENTRYPOINT") :]) - except json.JSONDecodeError: - res = None - - if not isinstance(res, list): - raise compute_task_errors.BuildError( - "Invalid ENTRYPOINT in function/metric Dockerfile. " - "You must use the exec form in your Dockerfile. " - "See https://docs.docker.com/engine/reference/builder/#entrypoint" - ) - return res - - raise compute_task_errors.BuildError("Invalid Dockerfile: Cannot find ENTRYPOINT") - - -def _delete_kaniko_pod(create_pod: bool, k8s_client: kubernetes.client.CoreV1Api, pod_name: str) -> str: - logs = "" - if create_pod: - logs = get_pod_logs(k8s_client, pod_name, KANIKO_CONTAINER_NAME, ignore_pod_not_found=True) - delete_pod(k8s_client, pod_name) - logger.info(logs or "", pod_name=pod_name) - return logs - - -@timeit -def _build_container_image(path: str, tag: str) -> None: - _assert_dockerfile_exist(path) - - kubernetes.config.load_incluster_config() - k8s_client = kubernetes.client.CoreV1Api() - - pod_name = _build_pod_name(tag) - - create_pod = not pod_exists(k8s_client, pod_name) - if create_pod: - try: - logger.info("creating pod: building image", namespace=NAMESPACE, pod=pod_name, image=tag) - pod = _build_pod(path, tag) - k8s_client.create_namespaced_pod(body=pod, namespace=NAMESPACE) - except kubernetes.client.ApiException as e: - raise compute_task_errors.BuildRetryError( - f"Error creating pod {NAMESPACE}/{pod_name}. Reason: {e.reason}, status: {e.status}, body: {e.body}" - ) from e - - try: - watch_pod(k8s_client, pod_name) - - except Exception as e: - # In case of concurrent builds, it may fail. Check if the image exists. 
-        if container_image_exists(tag):
-            logger.warning(
-                f"Build of container image {tag} failed, probably because it was done by a concurrent build",
-                exc_info=True,
-            )
-            return
-
-        logs = _delete_kaniko_pod(create_pod, k8s_client, pod_name)
-
-        if isinstance(e, exceptions.PodTimeoutError):
-            raise compute_task_errors.BuildRetryError(logs) from e
-        else:  # exceptions.PodError or other
-            raise compute_task_errors.BuildError(logs) from e
-
-    _delete_kaniko_pod(create_pod, k8s_client, pod_name)
-
-
-def _assert_dockerfile_exist(dockerfile_path):
-    dockerfile_fullpath = os.path.join(dockerfile_path, "Dockerfile")
-    if not os.path.exists(dockerfile_fullpath):
-        raise compute_task_errors.BuildError(f"Dockerfile does not exist : {dockerfile_fullpath}")
-
-
-def _build_pod(dockerfile_mount_path: str, image_tag: str) -> kubernetes.client.V1Pod:
-    pod_name = _build_pod_name(image_tag)
-    pod_spec = _build_pod_spec(dockerfile_mount_path, image_tag)
-    return kubernetes.client.V1Pod(
-        api_version="v1",
-        kind="Pod",
-        metadata=kubernetes.client.V1ObjectMeta(
-            name=pod_name,
-            labels={
-                Label.PodName: pod_name,
-                Label.PodType: "image-build",
-                Label.Component: Label.Component_Compute,
-            },
-        ),
-        spec=pod_spec,
-    )
-
-
-def _build_pod_name(image_tag: str) -> str:
-    dns_1123_compliant_tag = image_tag.split("/")[-1].replace("_", "-")
-    return f"kaniko-{dns_1123_compliant_tag}"
-
-
-def _build_pod_spec(dockerfile_mount_path: str, image_tag: str) -> kubernetes.client.V1PodSpec:
-    container = _build_container(dockerfile_mount_path, image_tag)
-    pod_affinity = _build_pod_affinity()
-
-    cache_pvc_name = (
-        settings.WORKER_PVC_DOCKER_CACHE if settings.WORKER_PVC_IS_HOSTPATH else get_docker_cache_pvc_name()
-    )
-    cache = kubernetes.client.V1Volume(
-        name="cache",
-        persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(claim_name=cache_pvc_name),
-    )
-
-    dockerfile_pvc_name = (
-        settings.WORKER_PVC_SUBTUPLE if settings.WORKER_PVC_IS_HOSTPATH else get_worker_subtuple_pvc_name()
-    )
-    dockerfile = kubernetes.client.V1Volume(
-        name="dockerfile",
-        persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(claim_name=dockerfile_pvc_name),
-    )
-
-    volumes = [cache, dockerfile]
-
-    if KANIKO_DOCKER_CONFIG_SECRET_NAME:
-        docker_config = kubernetes.client.V1Volume(
-            name=KANIKO_DOCKER_CONFIG_VOLUME_NAME,
-            secret=kubernetes.client.V1SecretVolumeSource(
-                secret_name=KANIKO_DOCKER_CONFIG_SECRET_NAME,
-                items=[kubernetes.client.V1KeyToPath(key=".dockerconfigjson", path="config.json")],
-            ),
+    if container_image_exists(container_image_tag):
+        logger.info("Found existing image", image=container_image_tag)
+    else:
+        attempt = 0
+        # poll the registry every WAITING_TIME seconds, up to MAX_IMAGE_BUILD_TIME in total
+        max_attempts = MAX_IMAGE_BUILD_TIME // WAITING_TIME
+
+        while attempt < max_attempts:
+            if container_image_exists(container_image_tag):
+                logger.info("Found existing image", image=container_image_tag)
+                return
+            attempt += 1
+            time.sleep(WAITING_TIME)
+
+        raise exceptions.PodTimeoutError(
+            f"Build for function {function.key} didn't complete after {MAX_IMAGE_BUILD_TIME} seconds"
         )
-        volumes.append(docker_config)
-
-    return kubernetes.client.V1PodSpec(
-        restart_policy="Never", affinity=pod_affinity, containers=[container], volumes=volumes
-    )
-
-
-def _build_pod_affinity() -> kubernetes.client.V1Affinity:
-    return kubernetes.client.V1Affinity(
-        pod_affinity=kubernetes.client.V1PodAffinity(
-            required_during_scheduling_ignored_during_execution=[
-                kubernetes.client.V1PodAffinityTerm(
-
label_selector=kubernetes.client.V1LabelSelector( - match_expressions=[ - kubernetes.client.V1LabelSelectorRequirement( - key="statefulset.kubernetes.io/pod-name", operator="In", values=[HOSTNAME] - ) - ] - ), - topology_key="kubernetes.io/hostname", - ) - ] - ) - ) - - -def _build_container(dockerfile_mount_path: str, image_tag: str) -> kubernetes.client.V1Container: - # kaniko build can be launched without privilege but - # it needs some capabilities and to be root - # https://github.com/GoogleContainerTools/kaniko/issues/778 - # https://github.com/GoogleContainerTools/kaniko/issues/778#issuecomment-619112417 - # https://github.com/moby/moby/blob/master/oci/caps/defaults.go - # https://man7.org/linux/man-pages/man7/capabilities.7.html - capabilities = ["CHOWN", "SETUID", "SETGID", "FOWNER", "DAC_OVERRIDE", "SETFCAP"] - container_security_context = get_security_context(root=True, capabilities=capabilities) - args = _build_container_args(dockerfile_mount_path, image_tag) - dockerfile_mount_subpath = dockerfile_mount_path.split("/subtuple/")[-1] - - dockerfile = kubernetes.client.V1VolumeMount( - name="dockerfile", mount_path=dockerfile_mount_path, sub_path=dockerfile_mount_subpath, read_only=True - ) - cache = kubernetes.client.V1VolumeMount(name="cache", mount_path="/cache", read_only=True) - volume_mounts = [dockerfile, cache] - - if KANIKO_DOCKER_CONFIG_SECRET_NAME: - docker_config = kubernetes.client.V1VolumeMount( - name=KANIKO_DOCKER_CONFIG_VOLUME_NAME, mount_path="/kaniko/.docker" - ) - volume_mounts.append(docker_config) - - return kubernetes.client.V1Container( - name=KANIKO_CONTAINER_NAME, - image=KANIKO_IMAGE, - command=None, - args=args, - volume_mounts=volume_mounts, - security_context=container_security_context, - ) - - -def _build_container_args(dockerfile_mount_path: str, image_tag: str) -> list[str]: - dockerfile_fullpath = os.path.join(dockerfile_mount_path, "Dockerfile") - args = [ - f"--dockerfile={dockerfile_fullpath}", - f"--context=dir://{dockerfile_mount_path}", - f"--destination={REGISTRY}/{USER_IMAGE_REPOSITORY}:{image_tag}", - "--cache=true", - "--log-timestamp=true", - "--snapshotMode=redo", - "--push-retry=3", - "--cache-copy-layers", - "--log-format=text", - f"--verbosity={('debug' if settings.LOG_LEVEL == 'DEBUG' else 'info')}", - ] - - if REGISTRY_SCHEME == "http": - args.append("--insecure") - - if KANIKO_MIRROR: - args.append(f"--registry-mirror={REGISTRY}") - if REGISTRY_SCHEME == "http": - args.append("--insecure-pull") - return args diff --git a/backend/substrapp/events/reactor.py b/backend/substrapp/events/reactor.py index 4df8b5f8e..a509ef2c5 100644 --- a/backend/substrapp/events/reactor.py +++ b/backend/substrapp/events/reactor.py @@ -9,6 +9,7 @@ import orchestrator.common_pb2 as common_pb2 import orchestrator.computetask_pb2 as computetask_pb2 import orchestrator.event_pb2 as event_pb2 +from builder.image_builder.image_builder import build_image_if_missing from orchestrator import model_pb2 from substrapp.events import handler_compute_engine from substrapp.events import health @@ -69,6 +70,10 @@ def on_computetask_event(payload): ) return + # trigger image build, then add the task to the Celery queue + with get_orchestrator_client(channel_name) as client: + function = client.query_function(orc_task.function_key) + build_image_if_missing(channel=channel_name, function=function) queue_compute_task(channel_name, task=orc_task) diff --git a/backend/substrapp/tasks/tasks_compute_task.py b/backend/substrapp/tasks/tasks_compute_task.py index 
84caec1df..dc0c8f9fd 100644 --- a/backend/substrapp/tasks/tasks_compute_task.py +++ b/backend/substrapp/tasks/tasks_compute_task.py @@ -4,7 +4,6 @@ - Create execution context - Populate asset buffer - Loads assets from the asset buffer -- Build container images - **Execute the compute task** - Save the models/results - Teardown the context @@ -53,7 +52,7 @@ from substrapp.compute_tasks.directories import restore_dir from substrapp.compute_tasks.directories import teardown_task_dirs from substrapp.compute_tasks.execute import execute_compute_task -from substrapp.compute_tasks.image_builder import build_image_if_missing +from substrapp.compute_tasks.image_builder import wait_for_image_built from substrapp.compute_tasks.lock import MAX_TASK_DURATION from substrapp.compute_tasks.lock import acquire_compute_plan_lock from substrapp.compute_tasks.outputs import OutputSaver @@ -256,7 +255,7 @@ def _run( # start build_image timer timer.start() - build_image_if_missing(datastore, ctx.function) + wait_for_image_built(ctx.function) # stop build_image timer _create_task_profiling_step(channel_name, task.key, ComputeTaskSteps.BUILD_IMAGE, timer.stop()) diff --git a/charts/substra-backend/templates/deployment-builder.yaml b/charts/substra-backend/templates/deployment-builder.yaml new file mode 100644 index 000000000..a5adb77f1 --- /dev/null +++ b/charts/substra-backend/templates/deployment-builder.yaml @@ -0,0 +1,265 @@ +{{- $metricsPath := "/tmp/django_metrics" -}} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ template "substra.fullname" . }}-builder + labels: + {{ include "substra.labels" . | nindent 4 }} + app.kubernetes.io/name: {{ template "substra.name" . }}-server +spec: + replicas: {{ .Values.server.replicaCount }} + selector: + matchLabels: + app.kubernetes.io/name: {{ template "substra.name" . }}-server + {{ include "substra.selectorLabels" . | nindent 8}} + template: + metadata: + labels: + app.kubernetes.io/name: {{ template "substra.name" . }}-server + {{ include "substra.labels" . | nindent 8 }} + annotations: + # This will cause the pod to restart if the content of the ConfigMap is updated through Helm + checksum-cm-orchestrator: {{ include (print $.Template.BasePath "/configmap-orchestrator.yaml") . | sha256sum }} + checksum-cm-settings: {{ include (print $.Template.BasePath "/configmap-settings.yaml") . | sha256sum }} + checksum-cm-db: {{ include (print $.Template.BasePath "/configmap-database.yaml") . | sha256sum }} + checksum-secret-objectstore : {{ include (print $.Template.BasePath "/secret-objectstore.yaml") . | sha256sum }} + checksum-secret-database : {{ include (print $.Template.BasePath "/secret-database.yaml") . | sha256sum }} + spec: + {{- if .Values.server.podSecurityContext.enabled }} + securityContext: + fsGroup: {{ .Values.server.podSecurityContext.fsGroup }} + runAsUser: {{ .Values.server.podSecurityContext.runAsUser }} + runAsGroup: {{ .Values.server.podSecurityContext.runAsGroup }} + {{- end }} + {{- with .Values.server.image.pullSecrets }} + imagePullSecrets: + {{- toYaml . 
| nindent 8 }} + {{- end }} + containers: + - name: server + image: {{ include "substra-backend.images.name" (dict "img" .Values.server.image "defaultTag" $.Chart.AppVersion) }} + imagePullPolicy: "{{ .Values.server.image.pullPolicy }}" + command: ["/bin/bash"] + {{- if eq .Values.settings "prod" }} + args: ["-c", "uwsgi --ini uwsgi.ini"] + {{- else }} + args: ["-c", "watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- uwsgi --ini uwsgi.ini --honour-stdin"] + stdin: true + tty: true + {{- end }} + envFrom: + - configMapRef: + name: {{ include "substra.fullname" . }}-orchestrator + - secretRef: + name: {{ include "substra.fullname" . }}-objectstore + - configMapRef: + name: {{ include "substra.fullname" . }}-settings + - configMapRef: + name: {{ include "substra.fullname" . }}-database + - secretRef: + name: {{ include "substra-backend.database.secret-name" . }} + - configMapRef: + name: {{ include "substra.fullname" . }}-oidc + {{- if .Values.oidc.enabled }} + - secretRef: + name: {{ .Values.oidc.clientSecretName }} + {{- end }} + env: + - name: HOST_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + {{- if .Values.server.metrics.enabled }} + - name: ENABLE_METRICS + value: "True" + - name: PROMETHEUS_MULTIPROC_DIR + value: {{ $metricsPath }} + {{- end }} + {{- if .Values.privateCa.enabled }} + - name: REQUESTS_CA_BUNDLE + value: /etc/ssl/certs/ca-certificates.crt + {{- end }} + ports: + - name: http + containerPort: 8000 + protocol: TCP + volumeMounts: + - name: data-servermedias + mountPath: /var/substra/servermedias + - name: statics + mountPath: /usr/src/app/backend/statics + - name: uwsgi + mountPath: /usr/src/app/uwsgi.ini + subPath: uwsgi.ini + readOnly: true + {{- if .Values.privateCa.enabled }} + - mountPath: /etc/ssl/certs + name: ssl-certs + {{- end }} + {{ if .Values.orchestrator.tls.enabled }} + - name: orchestrator-tls-cacert + mountPath: /var/substra/orchestrator/tls/server + {{ if .Values.orchestrator.tls.mtls.enabled }} + - name: orchestrator-tls-client-pair + mountPath: /var/substra/orchestrator/tls/client + {{ end }} + {{ end }} + {{- if .Values.server.metrics.enabled }} + - name: metrics + mountPath: {{ $metricsPath }} + {{- end }} + {{- if .Values.server.livenessProbe.enabled }} + livenessProbe: + httpGet: + path: {{ .Values.server.livenessProbe.path }} + port: http + initialDelaySeconds: {{ .Values.server.livenessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.server.livenessProbe.periodSeconds }} + timeoutSeconds: {{ .Values.server.livenessProbe.timeoutSeconds }} + successThreshold: {{ .Values.server.livenessProbe.successThreshold }} + failureThreshold: {{ .Values.server.livenessProbe.failureThreshold }} + {{- end }} + {{- if .Values.server.readinessProbe.enabled }} + readinessProbe: + httpGet: + path: {{ .Values.server.readinessProbe.path }} + port: http + initialDelaySeconds: {{ .Values.server.readinessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.server.readinessProbe.periodSeconds }} + timeoutSeconds: {{ .Values.server.readinessProbe.timeoutSeconds }} + successThreshold: {{ .Values.server.readinessProbe.successThreshold }} + failureThreshold: {{ .Values.server.readinessProbe.failureThreshold }} + {{- end }} + resources: + {{- toYaml .Values.server.resources | nindent 12 }} + {{- if .Values.server.metrics.enabled }} + - name: metrics-sidecar + image: {{ include "substra-backend.images.name" (dict "img" .Values.server.metrics.image "defaultTag" 
$.Chart.AppVersion) }} + imagePullPolicy: {{ .Values.server.metrics.image.pullPolicy }} + command: ["/bin/bash"] + args: + - "-c" + - "python /usr/src/app/metrics_exporter/server.py" + envFrom: + - configMapRef: + name: {{ include "substra.fullname" . }}-redis + - secretRef: + name: {{ include "substra.fullname" . }}-redis + env: + - name: PROMETHEUS_MULTIPROC_DIR + value: {{ $metricsPath }} + - name: CELERY_MONITORING_ENABLED + value: "True" + ports: + - name: metrics + containerPort: 8001 + protocol: TCP + volumeMounts: + - name: metrics + mountPath: {{ $metricsPath }} + {{- end }} + initContainers: + {{- if .Values.privateCa.enabled }} + - name: add-cert + image: {{ include "common.images.name" .Values.privateCa.image }} + imagePullPolicy: {{ .Values.privateCa.image.pullPolicy }} + securityContext: + runAsUser: 0 + command: ['sh', '-c'] + args: + - | + {{- if .Values.privateCa.image.apkAdd }} + apt update + apt install -y ca-certificates openssl + {{- end }} + update-ca-certificates && cp /etc/ssl/certs/* /tmp/certs/ + volumeMounts: + - mountPath: /usr/local/share/ca-certificates/{{ .Values.privateCa.configMap.fileName }} + name: private-ca + subPath: {{ .Values.privateCa.configMap.fileName }} + - mountPath: /tmp/certs/ + name: ssl-certs + {{- end }} + - name: wait-postgresql + image: jwilder/dockerize:0.6.1 + command: ['dockerize', '-wait', 'tcp://{{ template "substra-backend.database.host" . }}:{{ .Values.database.port }}'] + - name: wait-minio + image: jwilder/dockerize:0.6.1 + command: ['dockerize', '-wait', 'tcp://{{ .Release.Name }}-minio:9000'] + - name: init-migrate + image: {{ include "substra-backend.images.name" (dict "img" .Values.server.image "defaultTag" $.Chart.AppVersion) }} + command: ['python', 'manage.py', 'migrate'] + envFrom: + - configMapRef: + name: {{ include "substra.fullname" . }}-orchestrator + - secretRef: + name: {{ include "substra.fullname" . }}-objectstore + - configMapRef: + name: {{ include "substra.fullname" . }}-database + - configMapRef: + name: {{ include "substra.fullname" . }}-settings + - secretRef: + name: {{ include "substra-backend.database.secret-name" . }} + env: + - name: DJANGO_SETTINGS_MODULE + value: backend.settings.{{ .Values.settings }} + - name: init-collectstatic + image: {{ include "substra-backend.images.name" (dict "img" .Values.server.image "defaultTag" $.Chart.AppVersion) }} + command: ['python', 'manage.py', 'collectstatic', '--noinput'] + envFrom: + - configMapRef: + name: {{ include "substra.fullname" . }}-orchestrator + - configMapRef: + name: {{ include "substra.fullname" . }}-settings + env: + - name: DJANGO_SETTINGS_MODULE + value: backend.settings.{{ .Values.settings }} + volumeMounts: + - name: statics + mountPath: /usr/src/app/backend/statics + volumes: + - name: data-servermedias + persistentVolumeClaim: + claimName: {{ .Values.server.persistence.servermedias.existingClaim | default (print (include "substra.fullname" $) "-servermedias") }} + - name: statics + emptyDir: {} + - name: uwsgi + configMap: + name: {{ include "substra.fullname" . 
}}-server-uwsgi + {{- if .Values.server.metrics.enabled }} + - name: metrics + emptyDir: {} + {{- end }} + {{- if .Values.privateCa.enabled }} + - name: ssl-certs + emptyDir: {} + - name: private-ca + configMap: + name: {{ .Values.privateCa.configMap.name }} + {{- end }} + {{ if .Values.orchestrator.tls.enabled }} + - name: orchestrator-tls-cacert + configMap: + name: {{ .Values.orchestrator.tls.cacert }} + {{ if .Values.orchestrator.tls.mtls.enabled }} + - name: orchestrator-tls-client-pair + secret: + secretName: {{ .Values.orchestrator.tls.mtls.clientCertificate }} + {{ end }} + {{ end }} + {{- with .Values.server.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.server.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.server.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/charts/substra-backend/values.yaml b/charts/substra-backend/values.yaml index 7b6ccb4b2..a6c3d5b58 100644 --- a/charts/substra-backend/values.yaml +++ b/charts/substra-backend/values.yaml @@ -471,6 +471,61 @@ scheduler: runAsGroup: 1001 fsGroup: 1001 + +## @section Builder settings +## @param builder.replicaCount Number of builder replicas +## +builder: + replicaCount: 1 + + ## Substra backend image version + ## @param builder.image.registry Substra backend server image registry + ## @param builder.image.repository Substra backend server image repository + ## @param builder.image.tag Substra backend server image tag (defaults to AppVersion) + ## @param builder.image.pullPolicy Substra backend server image pull policy + ## @param builder.image.pullSecrets Specify image pull secrets + ## + image: + registry: ghcr.io + repository: substra/substra-backend + tag: null + pullPolicy: IfNotPresent + ## Optionally specify an array of imagePullSecrets. + ## Secrets must be created manually in the namespace. + ## + pullSecrets: [] + + ## @param builder.podSecurityContext.enabled Enable security context + ## @param builder.podSecurityContext.runAsUser User ID for the pod + ## @param builder.podSecurityContext.runAsGroup Group ID for the pod + ## @param builder.podSecurityContext.fsGroup FileSystem group ID for the pod + ## + podSecurityContext: + enabled: true + runAsUser: 1001 + runAsGroup: 1001 + fsGroup: 1001 + + + ## @param builder.resources Server container resources requests and limits + ## e.g: + ## resources: + ## limits: + ## cpu: 100m + ## memory: 128Mi + ## requests: + ## cpu: 100m + ## memory: 128Mi + ## + resources: {} + + persistence: + ## @param builder.persistence.storageClass Specify the _StorageClass_ used to provision the volume. Or the default _StorageClass_ will be used. Set it to `-` to disable dynamic provisioning + ## + storageClass: "" + size: 10Gi + + ## @section Substra container registry settings ## containerRegistry: @@ -654,7 +709,7 @@ addAccountOperator: ## @descriptionStart Uses the authorization code flow. ## ## By default, `oidc.users.useRefreshToken` is enabled. This makes sure the user still has an account at the identity provider, without damaging user experience. -## +## ## The way it works is that a OIDC user that spent more than `oidc.users.loginValidityDuration` since their last login must undergo a refresh to keep using their access tokens -- but these refreshes are done in the background if `oidc.users.useRefreshToken` is enabled (otherwise a new manual authorization is necessary). The identity provider must support `offline_access` and configuration discovery. 
## ## With this option active, you can set `oidc.users.loginValidityDuration` to low values (minutes). @@ -666,10 +721,10 @@ oidc: ## @param oidc.enabled Whether to enable OIDC authentication ## enabled: false - + ## @param oidc.clientSecretName The name of a secret containing the keys `OIDC_RP_CLIENT_ID` and `OIDC_RP_CLIENT_SECRET` (client ID and secret, typically issued by the provider) clientSecretName: null - + provider: ## @param oidc.provider.url The identity provider URL (with scheme). url: null @@ -683,10 +738,10 @@ oidc: token: null ## @param oidc.provider.endpoints.user Typically https://provider/me user: null - + ## @param oidc.provider.jwksUri Typically https://provider/jwks. Only required for public-key-based signing algorithms. If not given, read from `/.well-known/openid-configuration` at startup. jwksUri: null - + ## @param oidc.signAlgo Either RS256 or HS256 signAlgo: RS256 users: @@ -708,11 +763,11 @@ database: username: &psql-username postgres ## @param database.auth.password what password to use for connecting password: &psql-password postgres - + ## @param database.auth.credentialsSecretName An alternative to giving username and password; must have `DATABASE_USERNAME` and `DATABASE_PASSWORD` keys. ## credentialsSecretName: null - + ## @param database.host Hostname of the database to connect to (defaults to local) host: null ## @param database.port Port of an external database to connect to
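Consolidated sketch of the build flow after this patch, assembled only from the hunks above (the two fragments are excerpts rather than a new module; channel_name, orc_task and ctx are the variables already used in reactor.py and tasks_compute_task.py):

    # builder side (substrapp/events/reactor.py): build the image, then queue the task
    with get_orchestrator_client(channel_name) as client:
        function = client.query_function(orc_task.function_key)
        build_image_if_missing(channel=channel_name, function=function)
    queue_compute_task(channel_name, task=orc_task)

    # worker side (substrapp/tasks/tasks_compute_task.py): the worker no longer builds images,
    # it only polls the registry until the builder has pushed the image for ctx.function
    wait_for_image_built(ctx.function)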