From e7d1b13dc01ff051c2849a0077e726c1891a7c2f Mon Sep 17 00:00:00 2001 From: SdgJlbl Date: Tue, 8 Aug 2023 19:24:46 +0200 Subject: [PATCH] feat: share images backend to backend Signed-off-by: SdgJlbl --- backend/api/views/function.py | 5 ++ .../builder/image_builder/image_builder.py | 1 - backend/builder/tasks/tasks_build_image.py | 8 +- backend/substrapp/clients/organization.py | 4 +- .../substrapp/compute_tasks/image_builder.py | 76 ++++++++++++++----- backend/substrapp/events/reactor.py | 9 ++- backend/substrapp/models/__init__.py | 2 + backend/substrapp/models/function.py | 23 ++++++ backend/substrapp/serializers/function.py | 1 + backend/substrapp/tasks/__init__.py | 2 + backend/substrapp/tasks/tasks_compute_task.py | 2 +- backend/substrapp/tasks/tasks_save_image.py | 61 +++++++++++++++ docker/substra-backend/Dockerfile | 1 + 13 files changed, 166 insertions(+), 29 deletions(-) create mode 100644 backend/substrapp/tasks/tasks_save_image.py diff --git a/backend/api/views/function.py b/backend/api/views/function.py index f14d280df..e9a5ece77 100644 --- a/backend/api/views/function.py +++ b/backend/api/views/function.py @@ -206,3 +206,8 @@ def file(self, request, *args, **kwargs): @action(detail=True, url_path="description", url_name="description") def description_(self, request, *args, **kwargs): return self.download_file(request, Function, "description", "description_address") + + @action(detail=True) + def image(self, request, *args, **kwargs): + # TODO fix url + return self.download_file(request, Function, "file", "function_address") diff --git a/backend/builder/image_builder/image_builder.py b/backend/builder/image_builder/image_builder.py index c1c51388c..698bbb5e8 100644 --- a/backend/builder/image_builder/image_builder.py +++ b/backend/builder/image_builder/image_builder.py @@ -121,7 +121,6 @@ def _get_entrypoint_from_dockerfile(dockerfile_dir: str) -> list[str]: raise BuildError("Invalid Dockerfile: Cannot find ENTRYPOINT") - def _delete_kaniko_pod(create_pod: bool, k8s_client: kubernetes.client.CoreV1Api, pod_name: str) -> str: logs = "" if create_pod: diff --git a/backend/builder/tasks/tasks_build_image.py b/backend/builder/tasks/tasks_build_image.py index c70b8c03e..75ca378a8 100644 --- a/backend/builder/tasks/tasks_build_image.py +++ b/backend/builder/tasks/tasks_build_image.py @@ -1,12 +1,10 @@ import structlog - -from builder.exceptions import CeleryNoRetryError - from django.conf import settings import orchestrator from backend.celery import app from builder.exceptions import BuildRetryError +from builder.exceptions import CeleryNoRetryError from builder.image_builder.image_builder import build_image_if_missing from builder.tasks.task import BuildTask @@ -24,7 +22,7 @@ # Ack late and reject on worker lost allows use to # see http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-reject-on-worker-lost # and https://github.com/celery/celery/issues/5106 -def build_image(task: BuildTask, function_serialized: str, channel: str, compute_task_key: str) -> None: +def build_image(task: BuildTask, function_serialized: str, channel: str) -> str: function = orchestrator.Function.parse_raw(function_serialized) attempt = 0 @@ -45,4 +43,4 @@ def build_image(task: BuildTask, function_serialized: str, channel: str, compute else: continue break - + return function_serialized diff --git a/backend/substrapp/clients/organization.py b/backend/substrapp/clients/organization.py index 23084b271..086fc64cc 100644 --- a/backend/substrapp/clients/organization.py +++ b/backend/substrapp/clients/organization.py @@ -178,13 +178,13 @@ def get( channel: str, organization_id: str, url: str, - checksum: str, + checksum: typing.Optional[str], salt: typing.Optional[str] = None, ) -> bytes: """Get asset data.""" content = _http_request(_Method.GET, channel, organization_id, url).content new_checksum = compute_hash(content, key=salt) - if new_checksum != checksum: + if checksum is not None and new_checksum != checksum: raise IntegrityError(f"url {url}: checksum doesn't match {checksum} vs {new_checksum}") return content diff --git a/backend/substrapp/compute_tasks/image_builder.py b/backend/substrapp/compute_tasks/image_builder.py index 811f0f999..f4edb5848 100644 --- a/backend/substrapp/compute_tasks/image_builder.py +++ b/backend/substrapp/compute_tasks/image_builder.py @@ -1,34 +1,76 @@ +import os +import pathlib import time +from tempfile import TemporaryDirectory import structlog from django.conf import settings import orchestrator - +import substrapp.clients.organization as organization_client from builder import exceptions +from image_transfer import push_payload from substrapp.compute_tasks import utils from substrapp.docker_registry import container_image_exists +from substrapp.exceptions import OrganizationHttpError +from substrapp.utils import get_owner logger = structlog.get_logger(__name__) IMAGE_BUILD_TIMEOUT = settings.IMAGE_BUILD_TIMEOUT IMAGE_BUILD_CHECK_DELAY = settings.IMAGE_BUILD_CHECK_DELAY +REGISTRY = settings.REGISTRY +SUBTUPLE_TMP_DIR = settings.SUBTUPLE_TMP_DIR -def wait_for_image_built(function: orchestrator.Function) -> None: +def wait_for_image_built(function: orchestrator.Function, channel: str) -> None: container_image_tag = utils.container_image_tag_from_function(function) - attempt = 0 - # with 60 attempts we wait max 2 min with a pending pod - max_attempts = IMAGE_BUILD_TIMEOUT / IMAGE_BUILD_CHECK_DELAY - while attempt < max_attempts: - # Consider relying on celery task success so we can move `container_image_exists` in builder - if container_image_exists(container_image_tag): - logger.info("Found existing image", image=container_image_tag) - return - - attempt += 1 - time.sleep(IMAGE_BUILD_CHECK_DELAY) - - raise exceptions.PodTimeoutError( - f"Build for function {function.key} didn't complete after {IMAGE_BUILD_TIMEOUT} seconds" - ) + if function.owner == get_owner(): + attempt = 0 + # with 60 attempts we wait max 2 min with a pending pod + max_attempts = IMAGE_BUILD_TIMEOUT / IMAGE_BUILD_CHECK_DELAY + while attempt < max_attempts: + # Consider relying on celery task success so we can move `container_image_exists` in builder + if container_image_exists(container_image_tag): + logger.info("Found existing image", image=container_image_tag) + return + + attempt += 1 + time.sleep(IMAGE_BUILD_CHECK_DELAY) + + raise exceptions.PodTimeoutError( + f"Build for function {function.key} didn't complete after {IMAGE_BUILD_TIMEOUT} seconds" + ) + else: + # Ask the backend owner of the function if it's available + logger.info( + f"Initial function URI {function.function_address.uri}; " + f"modified URI{function.function_address.uri.replace('file', 'image')}" + ) + attempt = 0 + max_attempts = 10 + while attempt < max_attempts: + try: + function_image_content = organization_client.get( + channel=channel, + organization_id=function.owner, + # TODO create a clean Address for function image + url=function.function_address.uri.replace("file", "image"), + checksum=None, + ) + except OrganizationHttpError: + attempt += 1 + time.sleep(5) + + os.makedirs(SUBTUPLE_TMP_DIR, exist_ok=True) + with TemporaryDirectory(dir=SUBTUPLE_TMP_DIR) as tmp_dir: + storage_path = pathlib.Path(tmp_dir) / f"{container_image_tag}.zip" + storage_path.write_bytes(function_image_content) + push_payload(storage_path, registry=REGISTRY, secure=False) + + # + # if exc.status_code == 404: + # raise exceptions.CeleryRetryError(f"Function {function.key} was not found + # on backend {function.owner}") + # else: + # raise exceptions.CeleryNoRetryError("I've got a bad feeling about this.") diff --git a/backend/substrapp/events/reactor.py b/backend/substrapp/events/reactor.py index 8e3ed68bd..ec61cff9f 100644 --- a/backend/substrapp/events/reactor.py +++ b/backend/substrapp/events/reactor.py @@ -19,6 +19,7 @@ from substrapp.task_routing import get_builder_queue from substrapp.tasks.tasks_compute_plan import queue_delete_cp_pod_and_dirs_and_optionally_images from substrapp.tasks.tasks_compute_task import queue_compute_task +from substrapp.tasks.tasks_save_image import save_image_task logger = structlog.get_logger("events") _MY_ORGANIZATION: str = settings.LEDGER_MSP_ID @@ -86,8 +87,7 @@ def on_function_event(payload): logger.info("Processing function", asset_key=asset_key, kind=event_kind) if event_pb2.EventKind.Value(event_kind) == event_pb2.EVENT_ASSET_CREATED: - # TODO: To be replaced by `function.get("owner") == _MY_ORGANIZATION` when building only by owner - if True: + if orc_function.owner == _MY_ORGANIZATION: function_key = orc_function.key builder_queue = get_builder_queue() logger.info( @@ -97,7 +97,10 @@ def on_function_event(payload): ) # TODO switch to function.model_dump_json() as soon as pydantic is updated to > 2.0 build_image.apply_async( - (orc_function.json(), channel_name, function_key), queue=builder_queue, task_id=function_key + (orc_function.json(), channel_name), + queue=builder_queue, + task_id=function_key, + link=save_image_task.s(channel_name=channel_name, function_key=function_key), ) else: diff --git a/backend/substrapp/models/__init__.py b/backend/substrapp/models/__init__.py index 7ca4e9815..793035cba 100644 --- a/backend/substrapp/models/__init__.py +++ b/backend/substrapp/models/__init__.py @@ -3,6 +3,7 @@ from .datamanager import DataManager from .datasample import DataSample from .function import Function +from .function import FunctionImage from .image_entrypoint import ImageEntrypoint from .model import Model from .worker_last_event import WorkerLastEvent @@ -11,6 +12,7 @@ "DataSample", "DataManager", "Function", + "FunctionImage", "Model", "ComputePlanWorkerMapping", "ImageEntrypoint", diff --git a/backend/substrapp/models/function.py b/backend/substrapp/models/function.py index 3eb3eed12..d501c6a1d 100644 --- a/backend/substrapp/models/function.py +++ b/backend/substrapp/models/function.py @@ -10,6 +10,10 @@ def upload_to(instance, filename) -> str: return f"functions/{instance.key}/{filename}" +def upload_to_function(instance, filename) -> str: + return upload_to(instance.function, filename) + + class Function(models.Model): """Storage Data table""" @@ -30,3 +34,22 @@ def save(self, *args, **kwargs) -> None: def __str__(self) -> str: return f"Function with key {self.key}" + + +class FunctionImage(models.Model): + """Serialized Docker image""" + + function = models.OneToOneField(Function, on_delete=models.CASCADE) + file = models.FileField( + storage=settings.FUNCTION_STORAGE, max_length=500, upload_to=upload_to_function + ) # path max length to 500 instead of default 100 + checksum = models.CharField(max_length=64, blank=True) + + def save(self, *args, **kwargs) -> None: + """Use hash of file as checksum""" + if not self.checksum and self.file: + self.checksum = get_hash(self.file) + super().save(*args, **kwargs) + + def __str__(self) -> str: + return f"Function image associated function key {self.function.key}" diff --git a/backend/substrapp/serializers/function.py b/backend/substrapp/serializers/function.py index 4c6bd633c..f7cdcaf2b 100644 --- a/backend/substrapp/serializers/function.py +++ b/backend/substrapp/serializers/function.py @@ -2,6 +2,7 @@ from libs.serializers import DynamicFieldsModelSerializer from substrapp.models import Function +from substrapp.models import FunctionImage from substrapp.serializers.utils import FileSizeValidator from substrapp.serializers.utils import FileValidator diff --git a/backend/substrapp/tasks/__init__.py b/backend/substrapp/tasks/__init__.py index 3dab2c307..58a78d22d 100644 --- a/backend/substrapp/tasks/__init__.py +++ b/backend/substrapp/tasks/__init__.py @@ -5,6 +5,7 @@ from substrapp.tasks.tasks_outputs import remove_transient_outputs_from_orc from substrapp.tasks.tasks_remove_intermediary_models import remove_intermediary_model_from_db from substrapp.tasks.tasks_remove_intermediary_models import remove_intermediary_models_from_buffer +from substrapp.tasks.tasks_save_image import save_image_task __all__ = [ "delete_cp_pod_and_dirs_and_optionally_images", @@ -14,4 +15,5 @@ "remove_intermediary_models_from_buffer", "remove_transient_outputs_from_orc", "remove_intermediary_model_from_db", + "save_image_task", ] diff --git a/backend/substrapp/tasks/tasks_compute_task.py b/backend/substrapp/tasks/tasks_compute_task.py index ada4b5bd3..ca8bf7a61 100644 --- a/backend/substrapp/tasks/tasks_compute_task.py +++ b/backend/substrapp/tasks/tasks_compute_task.py @@ -251,7 +251,7 @@ def _run( # start build_image timer timer.start() - wait_for_image_built(ctx.function) + wait_for_image_built(ctx.function, channel_name) # stop build_image timer _create_task_profiling_step(channel_name, task.key, ComputeTaskSteps.BUILD_IMAGE, timer.stop()) diff --git a/backend/substrapp/tasks/tasks_save_image.py b/backend/substrapp/tasks/tasks_save_image.py new file mode 100644 index 000000000..088b50667 --- /dev/null +++ b/backend/substrapp/tasks/tasks_save_image.py @@ -0,0 +1,61 @@ +import os +import pathlib +from tempfile import TemporaryDirectory + +import structlog +from django.conf import settings +from django.core.files import File + +import orchestrator +from backend.celery import app +from image_transfer import make_payload +from substrapp.compute_tasks import utils +from substrapp.docker_registry import USER_IMAGE_REPOSITORY +from substrapp.models import FunctionImage +from substrapp.tasks.tasks_compute_task import ComputeTask + +REGISTRY = settings.REGISTRY +REGISTRY_SCHEME = settings.REGISTRY_SCHEME +SUBTUPLE_TMP_DIR = settings.SUBTUPLE_TMP_DIR + +logger = structlog.get_logger("worker") + + +@app.task( + bind=True, + acks_late=True, + reject_on_worker_lost=True, + ignore_result=False, + base=ComputeTask, +) +# Ack late and reject on worker lost allows use to +# see http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-reject-on-worker-lost +# and https://github.com/celery/celery/issues/5106 +def save_image_task(self: ComputeTask, function_serialized: str, channel_name: str, function_key: str) -> None: + logger.info(f"Starting save_image_task") + logger.info( + f"Parameters: function_serialized {function_serialized}, channel_name {channel_name}, function_key {function_key}" + ) + # create serialized image + function = orchestrator.Function.parse_raw(function_serialized) + container_image_tag = utils.container_image_tag_from_function(function) + + os.makedirs(SUBTUPLE_TMP_DIR, exist_ok=True) + + logger.info("Serialising the image from the registry") + + with TemporaryDirectory(dir=SUBTUPLE_TMP_DIR) as tmp_dir: + storage_path = pathlib.Path(tmp_dir) / f"{container_image_tag}.zip" + make_payload( + zip_file=storage_path, + docker_images_to_transfer=[f"{USER_IMAGE_REPOSITORY}:{container_image_tag}"], + registry=REGISTRY, + secure=False, + ) + + logger.info("Start saving the serialized image") + # save it + FunctionImage.objects.create( + function_id=function.key, file=File(file=storage_path.open(mode="rb"), name=f"{container_image_tag}.zip") + ) + logger.info("Serialized image saved") diff --git a/docker/substra-backend/Dockerfile b/docker/substra-backend/Dockerfile index f3a999583..1bd6f37a3 100644 --- a/docker/substra-backend/Dockerfile +++ b/docker/substra-backend/Dockerfile @@ -25,6 +25,7 @@ COPY ./backend/users /usr/src/app/users COPY ./backend/orchestrator /usr/src/app/orchestrator COPY ./backend/api /usr/src/app/api COPY ./backend/builder /usr/src/app/builder +COPY ./backend/image_transfer /usr/src/app/image_transfer FROM build AS arm64