Skip to content

Commit

Permalink
feat: share images backend to backend
Browse files Browse the repository at this point in the history
Signed-off-by: SdgJlbl <[email protected]>
  • Loading branch information
SdgJlbl committed Aug 14, 2023
1 parent 5dbc214 commit e7d1b13
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 29 deletions.
5 changes: 5 additions & 0 deletions backend/api/views/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,8 @@ def file(self, request, *args, **kwargs):
@action(detail=True, url_path="description", url_name="description")
def description_(self, request, *args, **kwargs):
return self.download_file(request, Function, "description", "description_address")

@action(detail=True)
def image(self, request, *args, **kwargs):
# TODO fix url
return self.download_file(request, Function, "file", "function_address")
1 change: 0 additions & 1 deletion backend/builder/image_builder/image_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def _get_entrypoint_from_dockerfile(dockerfile_dir: str) -> list[str]:
raise BuildError("Invalid Dockerfile: Cannot find ENTRYPOINT")



def _delete_kaniko_pod(create_pod: bool, k8s_client: kubernetes.client.CoreV1Api, pod_name: str) -> str:
logs = ""
if create_pod:
Expand Down
8 changes: 3 additions & 5 deletions backend/builder/tasks/tasks_build_image.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import structlog

from builder.exceptions import CeleryNoRetryError

from django.conf import settings

import orchestrator
from backend.celery import app
from builder.exceptions import BuildRetryError
from builder.exceptions import CeleryNoRetryError
from builder.image_builder.image_builder import build_image_if_missing
from builder.tasks.task import BuildTask

Expand All @@ -24,7 +22,7 @@
# Ack late and reject on worker lost allows use to
# see http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-reject-on-worker-lost
# and https://github.com/celery/celery/issues/5106
def build_image(task: BuildTask, function_serialized: str, channel: str, compute_task_key: str) -> None:
def build_image(task: BuildTask, function_serialized: str, channel: str) -> str:
function = orchestrator.Function.parse_raw(function_serialized)

attempt = 0
Expand All @@ -45,4 +43,4 @@ def build_image(task: BuildTask, function_serialized: str, channel: str, compute
else:
continue
break

return function_serialized
4 changes: 2 additions & 2 deletions backend/substrapp/clients/organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,13 @@ def get(
channel: str,
organization_id: str,
url: str,
checksum: str,
checksum: typing.Optional[str],
salt: typing.Optional[str] = None,
) -> bytes:
"""Get asset data."""
content = _http_request(_Method.GET, channel, organization_id, url).content
new_checksum = compute_hash(content, key=salt)
if new_checksum != checksum:
if checksum is not None and new_checksum != checksum:
raise IntegrityError(f"url {url}: checksum doesn't match {checksum} vs {new_checksum}")
return content

Expand Down
76 changes: 59 additions & 17 deletions backend/substrapp/compute_tasks/image_builder.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,76 @@
import os
import pathlib
import time
from tempfile import TemporaryDirectory

import structlog
from django.conf import settings

import orchestrator

import substrapp.clients.organization as organization_client
from builder import exceptions
from image_transfer import push_payload
from substrapp.compute_tasks import utils
from substrapp.docker_registry import container_image_exists
from substrapp.exceptions import OrganizationHttpError
from substrapp.utils import get_owner

logger = structlog.get_logger(__name__)

IMAGE_BUILD_TIMEOUT = settings.IMAGE_BUILD_TIMEOUT
IMAGE_BUILD_CHECK_DELAY = settings.IMAGE_BUILD_CHECK_DELAY
REGISTRY = settings.REGISTRY
SUBTUPLE_TMP_DIR = settings.SUBTUPLE_TMP_DIR


def wait_for_image_built(function: orchestrator.Function) -> None:
def wait_for_image_built(function: orchestrator.Function, channel: str) -> None:
container_image_tag = utils.container_image_tag_from_function(function)
attempt = 0
# with 60 attempts we wait max 2 min with a pending pod
max_attempts = IMAGE_BUILD_TIMEOUT / IMAGE_BUILD_CHECK_DELAY
while attempt < max_attempts:
# Consider relying on celery task success so we can move `container_image_exists` in builder
if container_image_exists(container_image_tag):
logger.info("Found existing image", image=container_image_tag)
return

attempt += 1
time.sleep(IMAGE_BUILD_CHECK_DELAY)

raise exceptions.PodTimeoutError(
f"Build for function {function.key} didn't complete after {IMAGE_BUILD_TIMEOUT} seconds"
)
if function.owner == get_owner():
attempt = 0
# with 60 attempts we wait max 2 min with a pending pod
max_attempts = IMAGE_BUILD_TIMEOUT / IMAGE_BUILD_CHECK_DELAY
while attempt < max_attempts:
# Consider relying on celery task success so we can move `container_image_exists` in builder
if container_image_exists(container_image_tag):
logger.info("Found existing image", image=container_image_tag)
return

attempt += 1
time.sleep(IMAGE_BUILD_CHECK_DELAY)

raise exceptions.PodTimeoutError(
f"Build for function {function.key} didn't complete after {IMAGE_BUILD_TIMEOUT} seconds"
)
else:
# Ask the backend owner of the function if it's available
logger.info(
f"Initial function URI {function.function_address.uri}; "
f"modified URI{function.function_address.uri.replace('file', 'image')}"
)
attempt = 0
max_attempts = 10
while attempt < max_attempts:
try:
function_image_content = organization_client.get(
channel=channel,
organization_id=function.owner,
# TODO create a clean Address for function image
url=function.function_address.uri.replace("file", "image"),
checksum=None,
)
except OrganizationHttpError:
attempt += 1
time.sleep(5)

os.makedirs(SUBTUPLE_TMP_DIR, exist_ok=True)
with TemporaryDirectory(dir=SUBTUPLE_TMP_DIR) as tmp_dir:
storage_path = pathlib.Path(tmp_dir) / f"{container_image_tag}.zip"
storage_path.write_bytes(function_image_content)
push_payload(storage_path, registry=REGISTRY, secure=False)

#
# if exc.status_code == 404:
# raise exceptions.CeleryRetryError(f"Function {function.key} was not found
# on backend {function.owner}")
# else:
# raise exceptions.CeleryNoRetryError("I've got a bad feeling about this.")
9 changes: 6 additions & 3 deletions backend/substrapp/events/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from substrapp.task_routing import get_builder_queue
from substrapp.tasks.tasks_compute_plan import queue_delete_cp_pod_and_dirs_and_optionally_images
from substrapp.tasks.tasks_compute_task import queue_compute_task
from substrapp.tasks.tasks_save_image import save_image_task

logger = structlog.get_logger("events")
_MY_ORGANIZATION: str = settings.LEDGER_MSP_ID
Expand Down Expand Up @@ -86,8 +87,7 @@ def on_function_event(payload):
logger.info("Processing function", asset_key=asset_key, kind=event_kind)

if event_pb2.EventKind.Value(event_kind) == event_pb2.EVENT_ASSET_CREATED:
# TODO: To be replaced by `function.get("owner") == _MY_ORGANIZATION` when building only by owner
if True:
if orc_function.owner == _MY_ORGANIZATION:
function_key = orc_function.key
builder_queue = get_builder_queue()
logger.info(
Expand All @@ -97,7 +97,10 @@ def on_function_event(payload):
)
# TODO switch to function.model_dump_json() as soon as pydantic is updated to > 2.0
build_image.apply_async(
(orc_function.json(), channel_name, function_key), queue=builder_queue, task_id=function_key
(orc_function.json(), channel_name),
queue=builder_queue,
task_id=function_key,
link=save_image_task.s(channel_name=channel_name, function_key=function_key),
)

else:
Expand Down
2 changes: 2 additions & 0 deletions backend/substrapp/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .datamanager import DataManager
from .datasample import DataSample
from .function import Function
from .function import FunctionImage
from .image_entrypoint import ImageEntrypoint
from .model import Model
from .worker_last_event import WorkerLastEvent
Expand All @@ -11,6 +12,7 @@
"DataSample",
"DataManager",
"Function",
"FunctionImage",
"Model",
"ComputePlanWorkerMapping",
"ImageEntrypoint",
Expand Down
23 changes: 23 additions & 0 deletions backend/substrapp/models/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ def upload_to(instance, filename) -> str:
return f"functions/{instance.key}/{filename}"


def upload_to_function(instance, filename) -> str:
return upload_to(instance.function, filename)


class Function(models.Model):
"""Storage Data table"""

Expand All @@ -30,3 +34,22 @@ def save(self, *args, **kwargs) -> None:

def __str__(self) -> str:
return f"Function with key {self.key}"


class FunctionImage(models.Model):
"""Serialized Docker image"""

function = models.OneToOneField(Function, on_delete=models.CASCADE)
file = models.FileField(
storage=settings.FUNCTION_STORAGE, max_length=500, upload_to=upload_to_function
) # path max length to 500 instead of default 100
checksum = models.CharField(max_length=64, blank=True)

def save(self, *args, **kwargs) -> None:
"""Use hash of file as checksum"""
if not self.checksum and self.file:
self.checksum = get_hash(self.file)
super().save(*args, **kwargs)

def __str__(self) -> str:
return f"Function image associated function key {self.function.key}"
1 change: 1 addition & 0 deletions backend/substrapp/serializers/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from libs.serializers import DynamicFieldsModelSerializer
from substrapp.models import Function
from substrapp.models import FunctionImage
from substrapp.serializers.utils import FileSizeValidator
from substrapp.serializers.utils import FileValidator

Expand Down
2 changes: 2 additions & 0 deletions backend/substrapp/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from substrapp.tasks.tasks_outputs import remove_transient_outputs_from_orc
from substrapp.tasks.tasks_remove_intermediary_models import remove_intermediary_model_from_db
from substrapp.tasks.tasks_remove_intermediary_models import remove_intermediary_models_from_buffer
from substrapp.tasks.tasks_save_image import save_image_task

__all__ = [
"delete_cp_pod_and_dirs_and_optionally_images",
Expand All @@ -14,4 +15,5 @@
"remove_intermediary_models_from_buffer",
"remove_transient_outputs_from_orc",
"remove_intermediary_model_from_db",
"save_image_task",
]
2 changes: 1 addition & 1 deletion backend/substrapp/tasks/tasks_compute_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def _run(
# start build_image timer
timer.start()

wait_for_image_built(ctx.function)
wait_for_image_built(ctx.function, channel_name)

# stop build_image timer
_create_task_profiling_step(channel_name, task.key, ComputeTaskSteps.BUILD_IMAGE, timer.stop())
Expand Down
61 changes: 61 additions & 0 deletions backend/substrapp/tasks/tasks_save_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import pathlib
from tempfile import TemporaryDirectory

import structlog
from django.conf import settings
from django.core.files import File

import orchestrator
from backend.celery import app
from image_transfer import make_payload
from substrapp.compute_tasks import utils
from substrapp.docker_registry import USER_IMAGE_REPOSITORY
from substrapp.models import FunctionImage
from substrapp.tasks.tasks_compute_task import ComputeTask

REGISTRY = settings.REGISTRY
REGISTRY_SCHEME = settings.REGISTRY_SCHEME
SUBTUPLE_TMP_DIR = settings.SUBTUPLE_TMP_DIR

logger = structlog.get_logger("worker")


@app.task(
bind=True,
acks_late=True,
reject_on_worker_lost=True,
ignore_result=False,
base=ComputeTask,
)
# Ack late and reject on worker lost allows use to
# see http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-reject-on-worker-lost
# and https://github.com/celery/celery/issues/5106
def save_image_task(self: ComputeTask, function_serialized: str, channel_name: str, function_key: str) -> None:
logger.info(f"Starting save_image_task")
logger.info(
f"Parameters: function_serialized {function_serialized}, channel_name {channel_name}, function_key {function_key}"
)
# create serialized image
function = orchestrator.Function.parse_raw(function_serialized)
container_image_tag = utils.container_image_tag_from_function(function)

os.makedirs(SUBTUPLE_TMP_DIR, exist_ok=True)

logger.info("Serialising the image from the registry")

with TemporaryDirectory(dir=SUBTUPLE_TMP_DIR) as tmp_dir:
storage_path = pathlib.Path(tmp_dir) / f"{container_image_tag}.zip"
make_payload(
zip_file=storage_path,
docker_images_to_transfer=[f"{USER_IMAGE_REPOSITORY}:{container_image_tag}"],
registry=REGISTRY,
secure=False,
)

logger.info("Start saving the serialized image")
# save it
FunctionImage.objects.create(
function_id=function.key, file=File(file=storage_path.open(mode="rb"), name=f"{container_image_tag}.zip")
)
logger.info("Serialized image saved")
1 change: 1 addition & 0 deletions docker/substra-backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ COPY ./backend/users /usr/src/app/users
COPY ./backend/orchestrator /usr/src/app/orchestrator
COPY ./backend/api /usr/src/app/api
COPY ./backend/builder /usr/src/app/builder
COPY ./backend/image_transfer /usr/src/app/image_transfer

FROM build AS arm64

Expand Down

0 comments on commit e7d1b13

Please sign in to comment.