feat!: decoupled builder (#756)
* feat: decouple image builder from worker

Signed-off-by: SdgJlbl <[email protected]>

* fix: update skaffold config

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: add `ServiceAccount` and modify role

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: improve `wait_for_image_built`

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: build image in new pod

Signed-off-by: Guilhem Barthes <[email protected]>

* chore: rename `deployment-builder.yaml` to `stateful-builder.yaml`

Signed-off-by: Guilhem Barthes <[email protected]>

* chore: rename `stateful-builder.yaml` to `statefulset-builder.yaml`

Signed-off-by: Guilhem Barthes <[email protected]>

* chore: centralize params

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: create `BuildTask`

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: move some values to `builder` module

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: move more code to `builder`

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: remove TaskProfiling as Celery task + save Entrypoint in DB

Signed-off-by: SdgJlbl <[email protected]>

* fix: extract entrypoint from registry

Signed-off-by: SdgJlbl <[email protected]>

* fix: make doc for helm chart

Signed-off-by: SdgJlbl <[email protected]>

* feat: build function at registration (#707)


- [ ] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: SdgJlbl <[email protected]>
Signed-off-by: Guilhem Barthes <[email protected]>
Co-authored-by: SdgJlbl <[email protected]>

* feat: share images between backends (#708)



Signed-off-by: SdgJlbl <[email protected]>

* chore: update helm workflow

Signed-off-by: ThibaultFy <[email protected]>

* chore: add .DS_Store to gitignore

Signed-off-by: ThibaultFy <[email protected]>

* chore: rm DS_Store

Signed-off-by: ThibaultFy <[email protected]>

* chore: rm .DS_Store

Signed-off-by: ThibaultFy <[email protected]>

* [sub]fix: add missing migration poc (#728)

## Description

Add a migration that was missing from the PoC.
This migration alters two things:

- modify `ComputeTaskFailureReport.logs`
- modify `FunctionImage.file`

This migration has been generated automatically with `make migrations`.
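For illustration only, an auto-generated migration of the shape described above typically contains one `AlterField` operation per modified field. The dependency and field arguments below are hypothetical placeholders, not the actual definitions from this PR:

```python
# Hypothetical sketch of an auto-generated migration altering two file fields.
# Dependency and field arguments are placeholders; the real migration differs.
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):
    dependencies = [
        ("substrapp", "0001_initial"),  # placeholder dependency
    ]

    operations = [
        migrations.AlterField(
            model_name="computetaskfailurereport",
            name="logs",
            field=models.FileField(upload_to="logs/"),  # placeholder field definition
        ),
        migrations.AlterField(
            model_name="functionimage",
            name="file",
            field=models.FileField(upload_to="functions/"),  # placeholder field definition
        ),
    ]
```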

## How has this been tested?


## Checklist

- [ ] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

Signed-off-by: Guilhem Barthes <[email protected]>

* [sub]feat: add function events (#714)

- Substra/orchestrator#263

Add function events, needed now that the building of a function is decoupled from the execution of the compute task. For that, this PR adds a `status` field on the Function. It also includes another PR (merged here) to get function build logs working again.

In a future PR, we will change the compute task execution to avoid having to call `wait_for_function_built` in `compute_task()`.
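As a rough illustration of the coupling mentioned above, here is a minimal, self-contained sketch of a polling `wait_for_function_built` helper. Only the helper name comes from the text; the signature, status source, and timeouts are assumptions for the sketch, not the worker's actual code.

```python
# Minimal sketch, assuming hypothetical names and a polling loop; the real worker code differs.
import time
from typing import Callable


def wait_for_function_built(
    get_status: Callable[[], str],
    timeout: float = 600.0,
    poll: float = 5.0,
) -> None:
    """Block until the builder reports the function as ready, or raise on failure/timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()  # e.g. a lookup against the orchestrator or the local DB
        if status == "FUNCTION_STATUS_READY":
            return
        if status in ("FUNCTION_STATUS_FAILED", "FUNCTION_STATUS_CANCELED"):
            raise RuntimeError(f"function build ended in status {status}")
        time.sleep(poll)
    raise TimeoutError(f"function was not built within {timeout} seconds")


if __name__ == "__main__":
    # Toy usage: a status source that becomes ready after two polls.
    statuses = iter(
        ["FUNCTION_STATUS_BUILDING", "FUNCTION_STATUS_BUILDING", "FUNCTION_STATUS_READY"]
    )
    wait_for_function_built(lambda: next(statuses), timeout=5.0, poll=0.1)
    print("function ready, the compute task can now execute")
```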

Fixes FL-1160

As this is going to be merged into a branch that will itself be merged into a POC branch, we use MNIST as a baseline for a working model. We will deal with failing tests on the POC before merging into main.

- [x] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: SdgJlbl <[email protected]>
Signed-off-by: Guilhem Barthes <[email protected]>
Signed-off-by: Guilhem Barthés <[email protected]>
Co-authored-by: SdgJlbl <[email protected]>

* [sub]fix(app/orchestrator/resources): FunctionStatus.FUNCTION_STATUS_CREATED -> FunctionStatus.FUNCTION_STATUS_WAITING (#742)

# Issue

Backend `FunctionStatus` values are not aligned with the [orchestrator definitions](https://github.com/Substra/orchestrator/blob/poc-decoupled-builder/lib/asset/function.proto#L29-L36). In particular, `FunctionStatus.FUNCTION_STATUS_CREATED` leads to the following error:

```txt
ValueError: 'FUNCTION_STATUS_WAITING' is not a valid FunctionStatus
```

## Description

`FunctionStatus.FUNCTION_STATUS_CREATED` -> `FunctionStatus.FUNCTION_STATUS_WAITING`
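To make the mismatch concrete, here is a tiny stand-in sketch (a plain Python `enum`, not the project's actual class) showing why an unknown member name raises the `ValueError` quoted above:

```python
# Stand-in enum illustrating the mismatch; the real FunctionStatus lives in the backend code.
import enum


class FunctionStatus(str, enum.Enum):
    FUNCTION_STATUS_UNKNOWN = "FUNCTION_STATUS_UNKNOWN"
    FUNCTION_STATUS_CREATED = "FUNCTION_STATUS_CREATED"  # backend value before this fix
    FUNCTION_STATUS_BUILDING = "FUNCTION_STATUS_BUILDING"
    FUNCTION_STATUS_READY = "FUNCTION_STATUS_READY"


# The orchestrator emits FUNCTION_STATUS_WAITING, which this enum does not define:
try:
    FunctionStatus("FUNCTION_STATUS_WAITING")
except ValueError as exc:
    print(exc)  # 'FUNCTION_STATUS_WAITING' is not a valid FunctionStatus
```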

## How has this been tested?

Running the Camelyon benchmark on the [poc-builder-flpc](https://substra.org-1.poc-builder-flpc.cg.owkin.tech/compute_plans/a420306f-5719-412b-ab9c-688b7bed9c70/tasks?page=1&ordering=-rank) environment.

## Checklist

- [ ] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: Thibault Camalon <[email protected]>

* fix: builder using builder SA (#754)

* fix: builder using builder SA

Signed-off-by: Guilhem Barthés <[email protected]>

* docs: changelog

Signed-off-by: Guilhem Barthés <[email protected]>

---------

Signed-off-by: Guilhem Barthés <[email protected]>

* fix: rebase changelog

Signed-off-by: Guilhem Barthés <[email protected]>

* fix: adapt to pydantic 2.x.x (#758)

Signed-off-by: Guilhem Barthés <[email protected]>

* [sub]fix(backend/image_transfert/encoder): update pydantic method (#763)

* fix(backend/image_transfert/encoder): update pydantic method

Signed-off-by: Thibault Camalon <[email protected]>

* fix(backend/image_transfer/decoder): parse_raw -> model_validate_json

Signed-off-by: Thibault Camalon <[email protected]>

---------

Signed-off-by: Thibault Camalon <[email protected]>

* [sub]chore: upgrade chart (#765)

* chore(charts): bump chart version

Signed-off-by: Thibault Camalon <[email protected]>

* chore(charts/substra-backend/CHANGELOG): bring back unreleased section

Signed-off-by: Thibault Camalon <[email protected]>

---------

Signed-off-by: Thibault Camalon <[email protected]>

* fix: post-rebase

Signed-off-by: SdgJlbl <[email protected]>

* chore: rationalize migrations

Signed-off-by: SdgJlbl <[email protected]>

* [sub]chore(builder): waitPostgresqlInitContainer (#764)

* fix: builder using builder SA (#754)

* fix: builder using builder SA

Signed-off-by: Guilhem Barthés <[email protected]>

* docs: changelog

Signed-off-by: Guilhem Barthés <[email protected]>

---------

Signed-off-by: Guilhem Barthés <[email protected]>

* chore(charts/substra-backend/templates/statefulset-builder): add init-container waitPostgresqlInitContainer

Signed-off-by: Thibault Camalon <[email protected]>

---------

Signed-off-by: Guilhem Barthés <[email protected]>
Signed-off-by: Thibault Camalon <[email protected]>
Co-authored-by: Guilhem Barthés <[email protected]>

---------

Signed-off-by: SdgJlbl <[email protected]>
Signed-off-by: Guilhem Barthes <[email protected]>
Signed-off-by: ThibaultFy <[email protected]>
Signed-off-by: Guilhem Barthés <[email protected]>
Signed-off-by: Thibault Camalon <[email protected]>
Co-authored-by: SdgJlbl <[email protected]>
Co-authored-by: ThibaultFy <[email protected]>
Co-authored-by: Thibault Camalon <[email protected]>
4 people authored Oct 25, 2023
1 parent b9459ac commit 4c8731f
Showing 98 changed files with 2,818 additions and 871 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -24,6 +24,7 @@ wheels/
.installed.cfg
*.egg
MANIFEST
.DS_Store

# PyInstaller
# Usually these files are written by a python script from a template
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Field `asset_type` on `AssetFailureReport` (based on protobuf enum `orchestrator.FailedAssetKind`) ([#727](https://github.com/Substra/substra-backend/pull/727))
- Celery task `FailableTask` that contains the logic to store the failure report and can be reused across different assets ([#727](https://github.com/Substra/substra-backend/pull/727))
- Add `FunctionStatus` enum ([#714](https://github.com/Substra/substra-backend/pull/714))
- BREAKING: Add `status` on `api.Function` (type `FunctionStatus`) ([#714](https://github.com/Substra/substra-backend/pull/714))

### Changed

- `ComputeTaskFailureReport` renamed to `AssetFailureReport` ([#727](https://github.com/Substra/substra-backend/pull/727))
- Field `AssetFailureReport.compute_task_key` renamed to `asset_key` ([#727](https://github.com/Substra/substra-backend/pull/727))

### Removed

- BREAKING: remove `distributed` Skaffold profile [#768](https://github.com/Substra/substra-backend/pull/768)
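The `FailableTask` mentioned in the changelog entry above can be pictured roughly as a Celery base task whose `on_failure` hook persists a failure report for whichever asset was being processed. This is a hedged sketch under assumed conventions (asset key and kind passed via kwargs, a placeholder `store_failure_report`); the real implementation in `substrapp` differs.

```python
# Hedged sketch of a reusable "failable" Celery base task; storage details are
# placeholders, the real FailableTask in substrapp differs.
from celery import Task


def store_failure_report(asset_key, asset_kind, error, traceback):
    # Placeholder: the real backend persists an AssetFailureReport row plus a log file.
    print(f"failure report for {asset_kind} {asset_key}: {error}")


class FailableTask(Task):
    """Base task recording a failure report for the asset it was processing.

    Assumed convention for this sketch: the asset key and kind are passed as kwargs.
    """

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        store_failure_report(
            asset_key=kwargs.get("asset_key"),
            asset_kind=kwargs.get("asset_kind", "FAILED_ASSET_UNKNOWN"),
            error=str(exc),
            traceback=str(einfo),
        )
        super().on_failure(exc, task_id, args, kwargs, einfo)
```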
2 changes: 1 addition & 1 deletion Makefile
@@ -32,7 +32,7 @@ format: ## Format code
lint: ## Perform a static analysis of the code
flake8 $(SRC_DIRS)
bandit --ini=.bandit
mypy backend/substrapp/tasks/
mypy

.PHONY: shell
shell: ## Start a Python shell for the Django project
30 changes: 26 additions & 4 deletions backend/api/events/sync.py
@@ -33,6 +33,7 @@
from api.serializers import PerformanceSerializer
from orchestrator import client as orc_client
from orchestrator import computetask
from orchestrator import failure_report_pb2

logger = structlog.get_logger(__name__)

@@ -85,13 +86,19 @@ def _create_function(channel: str, data: dict) -> None:
def _on_update_function_event(event: dict) -> None:
"""Process update function event to update local database."""
logger.debug("Syncing function update", asset_key=event["asset_key"], event_id=event["id"])
_update_function(key=event["asset_key"], data=event["function"])
function = event["function"]
_update_function(key=event["asset_key"], name=function["name"], status=function["status"])


def _update_function(key: str, data: dict) -> None:
def _update_function(key: str, *, name: Optional[str] = None, status: Optional[str] = None) -> None:
"""Process update function event to update local database."""
function = Function.objects.get(key=key)
function.name = data["name"]

if name:
function.name = name
if status:
function.status = status

function.save()


@@ -376,7 +383,22 @@ def _disable_model(key: str) -> None:
def _on_create_failure_report(event: dict) -> None:
"""Process create failure report event to update local database."""
logger.debug("Syncing failure report create", asset_key=event["asset_key"], event_id=event["id"])
_update_computetask(key=event["asset_key"], failure_report=event["failure_report"])

asset_key = event["asset_key"]
failure_report = event["failure_report"]
asset_type = failure_report_pb2.FailedAssetKind.Value(failure_report["asset_type"])

if asset_type == failure_report_pb2.FAILED_ASSET_FUNCTION:
# Needed as this field is only in ComputeTask
compute_task_keys = ComputeTask.objects.values_list("key", flat=True).filter(
function_id=asset_key,
status__in=[ComputeTask.Status.STATUS_TODO.value, ComputeTask.Status.STATUS_DOING.value],
)

for task_key in compute_task_keys:
_update_computetask(key=str(task_key), failure_report={"error_type": failure_report.get("error_type")})
else:
_update_computetask(key=asset_key, failure_report=failure_report)


EVENT_CALLBACKS = {
30 changes: 30 additions & 0 deletions backend/api/migrations/0053_function_status.py
@@ -0,0 +1,30 @@
# Generated by Django 4.2.3 on 2023-08-23 13:18

from django.db import migrations
from django.db import models


class Migration(migrations.Migration):
dependencies = [
("api", "0052_remove_metric_from_performance"),
]

operations = [
migrations.AddField(
model_name="function",
name="status",
field=models.CharField(
choices=[
("FUNCTION_STATUS_UNKNOWN", "Function Status Unknown"),
("FUNCTION_STATUS_WAITING", "Function Status Waiting"),
("FUNCTION_STATUS_BUILDING", "Function Status Building"),
("FUNCTION_STATUS_READY", "Function Status Ready"),
("FUNCTION_STATUS_CANCELED", "Function Status Canceled"),
("FUNCTION_STATUS_FAILED", "Function Status Failed"),
],
default="FUNCTION_STATUS_UNKNOWN",
max_length=64,
),
preserve_default=False,
),
]
6 changes: 6 additions & 0 deletions backend/api/models/function.py
@@ -4,6 +4,7 @@
import orchestrator.common_pb2 as common_pb2
from api.models.utils import AssetPermissionMixin
from api.models.utils import URLValidatorWithOptionalTLD
from orchestrator import function_pb2


class FunctionInput(models.Model):
@@ -43,6 +44,10 @@ class Meta:
class Function(models.Model, AssetPermissionMixin):
"""Function represent a function and its associated metadata"""

Status = models.TextChoices(
"Status", [(status_name, status_name) for status_name in function_pb2.FunctionStatus.keys()]
)

key = models.UUIDField(primary_key=True)
name = models.CharField(max_length=100)
description_address = models.URLField(validators=[URLValidatorWithOptionalTLD()])
@@ -57,6 +62,7 @@ class Function(models.Model, AssetPermissionMixin):
creation_date = models.DateTimeField()
metadata = models.JSONField()
channel = models.CharField(max_length=100)
status = models.CharField(max_length=64, choices=Status.choices)

class Meta:
ordering = ["creation_date", "key"] # default order for relations serializations
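For context on the `Status` definition in the diff above: Django's `TextChoices` supports the enum functional API, so the choices can be generated directly from the protobuf enum's member names. Below is a standalone, hedged sketch that replaces `function_pb2.FunctionStatus.keys()` with a hard-coded stand-in list so it runs with only Django installed.

```python
# Standalone sketch mirroring the Function.Status pattern above.
# The key list stands in for function_pb2.FunctionStatus.keys().
from django.db import models

FAKE_PROTO_KEYS = [
    "FUNCTION_STATUS_UNKNOWN",
    "FUNCTION_STATUS_WAITING",
    "FUNCTION_STATUS_BUILDING",
    "FUNCTION_STATUS_READY",
    "FUNCTION_STATUS_CANCELED",
    "FUNCTION_STATUS_FAILED",
]

# Functional enum syntax: each (name, value) pair becomes a member whose label
# is auto-generated from the member name ("Function Status Waiting", ...).
Status = models.TextChoices("Status", [(name, name) for name in FAKE_PROTO_KEYS])

print(Status.FUNCTION_STATUS_WAITING.value)  # FUNCTION_STATUS_WAITING
print(Status.FUNCTION_STATUS_WAITING.label)  # Function Status Waiting
print(Status.choices[1])                     # ('FUNCTION_STATUS_WAITING', 'Function Status Waiting')
```

The `choices=[...]` list in the migration shown earlier is the serialized form of exactly these (value, label) pairs.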
1 change: 1 addition & 0 deletions backend/api/serializers/function.py
@@ -53,6 +53,7 @@ class Meta:
"permissions",
"inputs",
"outputs",
"status",
]

def to_representation(self, instance):
37 changes: 28 additions & 9 deletions backend/api/tests/asset_factory.py
@@ -62,6 +62,7 @@

import datetime
import uuid
from typing import Optional

from django.core import files
from django.utils import timezone
@@ -80,9 +81,10 @@
from api.models import Model
from api.models import Performance
from api.models import TaskProfiling
from substrapp.models import ComputeTaskFailureReport as ComputeTaskLogs
from substrapp.models import AssetFailureReport
from substrapp.models import DataManager as DataManagerFiles
from substrapp.models import DataSample as DataSampleFiles
from substrapp.models import FailedAssetKind
from substrapp.models import Function as FunctionFiles
from substrapp.models import Model as ModelFiles
from substrapp.utils import get_hash
@@ -236,6 +238,7 @@ def create_function(
creation_date=timezone.now(),
owner=owner,
channel=channel,
status=Function.Status.FUNCTION_STATUS_WAITING,
**get_permissions(owner, public),
)

@@ -534,20 +537,36 @@ def create_model_files(
return model_files


def create_computetask_logs(
compute_task_key: uuid.UUID,
logs: files.File = None,
) -> ComputeTaskLogs:
def create_asset_logs(
asset_key: uuid.UUID,
asset_type: FailedAssetKind,
logs: Optional[files.File] = None,
) -> AssetFailureReport:
if logs is None:
logs = files.base.ContentFile("dummy content")

compute_task_logs = ComputeTaskLogs.objects.create(
compute_task_key=compute_task_key,
asset_logs = AssetFailureReport.objects.create(
asset_key=asset_key,
asset_type=asset_type,
logs_checksum=get_hash(logs),
creation_date=timezone.now(),
)
compute_task_logs.logs.save("logs", logs)
return compute_task_logs
asset_logs.logs.save("logs", logs)
return asset_logs


def create_computetask_logs(
compute_task_key: uuid.UUID,
logs: Optional[files.File] = None,
) -> AssetFailureReport:
return create_asset_logs(compute_task_key, FailedAssetKind.FAILED_ASSET_COMPUTE_TASK, logs)


def create_function_logs(
function_key: uuid.UUID,
logs: Optional[files.File] = None,
) -> AssetFailureReport:
return create_asset_logs(function_key, FailedAssetKind.FAILED_ASSET_FUNCTION, logs)


def create_computetask_profiling(compute_task: ComputeTask) -> TaskProfiling:
1 change: 0 additions & 1 deletion backend/api/tests/views/test_views_computetask.py
@@ -259,7 +259,6 @@ class GenericTaskViewTests(ComputeTaskViewTests):
def setUp(self):
super().setUp()
self.url = reverse("api:task-list")
self.maxDiff = None

todo_task = self.compute_tasks[ComputeTask.Status.STATUS_TODO]
waiting_task = self.compute_tasks[ComputeTask.Status.STATUS_WAITING]
@@ -13,11 +13,11 @@
from api.views import utils as view_utils
from organization import authentication as organization_auth
from organization import models as organization_models
from substrapp.models import ComputeTaskFailureReport
from substrapp.models import AssetFailureReport


@pytest.fixture
def compute_task_failure_report() -> tuple[ComputeTask, ComputeTaskFailureReport]:
def asset_failure_report() -> tuple[ComputeTask, AssetFailureReport]:
compute_task = factory.create_computetask(
factory.create_computeplan(),
factory.create_function(),
@@ -41,12 +41,12 @@ def test_download_logs_failure_unauthenticated(api_client: test.APIClient):

@pytest.mark.django_db
def test_download_local_logs_success(
compute_task_failure_report,
asset_failure_report,
authenticated_client: test.APIClient,
):
"""An authorized user download logs located on the organization."""

compute_task, failure_report = compute_task_failure_report
compute_task, failure_report = asset_failure_report
assert compute_task.owner == conf.settings.LEDGER_MSP_ID # local
assert conf.settings.LEDGER_MSP_ID in compute_task.logs_permission_authorized_ids # allowed

@@ -60,12 +60,12 @@ def test_download_local_logs_success(

@pytest.mark.django_db
def test_download_logs_failure_forbidden(
compute_task_failure_report,
asset_failure_report,
authenticated_client: test.APIClient,
):
"""An authenticated user cannot download logs if he is not authorized."""

compute_task, failure_report = compute_task_failure_report
compute_task, failure_report = asset_failure_report
assert compute_task.owner == conf.settings.LEDGER_MSP_ID # local
compute_task.logs_permission_authorized_ids = [] # not allowed
compute_task.save()
@@ -77,12 +77,12 @@ def test_download_logs_failure_forbidden(

@pytest.mark.django_db
def test_download_local_logs_failure_not_found(
compute_task_failure_report,
asset_failure_report,
authenticated_client: test.APIClient,
):
"""An authorized user attempt to download logs that are not referenced in the database."""

compute_task, failure_report = compute_task_failure_report
compute_task, failure_report = asset_failure_report
assert compute_task.owner == conf.settings.LEDGER_MSP_ID # local
assert conf.settings.LEDGER_MSP_ID in compute_task.logs_permission_authorized_ids # allowed
failure_report.delete() # not found
@@ -94,12 +94,12 @@ def test_download_local_logs_failure_not_found(

@pytest.mark.django_db
def test_download_remote_logs_success(
compute_task_failure_report,
asset_failure_report,
authenticated_client: test.APIClient,
):
"""An authorized user download logs on a remote organization by using his organization as proxy."""

compute_task, failure_report = compute_task_failure_report
compute_task, failure_report = asset_failure_report
outgoing_organization = "outgoing-organization"
compute_task.logs_owner = outgoing_organization # remote
compute_task.logs_permission_authorized_ids = [conf.settings.LEDGER_MSP_ID, outgoing_organization] # allowed
@@ -139,13 +139,13 @@ def get_proxy_headers(channel_name: str) -> dict[str, str]:

@pytest.mark.django_db
def test_organization_download_logs_success(
compute_task_failure_report,
asset_failure_report,
api_client: test.APIClient,
incoming_organization_user: organization_auth.OrganizationUser,
):
"""An authorized organization can download logs from another organization."""

compute_task, failure_report = compute_task_failure_report
compute_task, failure_report = asset_failure_report
compute_task.logs_owner = conf.settings.LEDGER_MSP_ID # local (incoming request from remote)
compute_task.logs_permission_authorized_ids = [
conf.settings.LEDGER_MSP_ID,
@@ -166,13 +166,13 @@ def test_organization_download_logs_success(

@pytest.mark.django_db
def test_organization_download_logs_forbidden(
compute_task_failure_report,
asset_failure_report,
api_client: test.APIClient,
incoming_organization_user: organization_auth.OrganizationUser,
):
"""An unauthorized organization cannot download logs from another organization."""

compute_task, failure_report = compute_task_failure_report
compute_task, failure_report = asset_failure_report
compute_task.logs_owner = conf.settings.LEDGER_MSP_ID # local (incoming request from remote)
compute_task.logs_permission_authorized_ids = [conf.settings.LEDGER_MSP_ID] # incoming user not allowed
compute_task.channel = incoming_organization_user.username
6 changes: 6 additions & 0 deletions backend/api/tests/views/test_views_function.py
@@ -104,6 +104,7 @@ def setUp(self):
"outputs": {
"model": {"kind": "ASSET_MODEL", "multiple": False},
},
"status": "FUNCTION_STATUS_WAITING",
},
{
"key": str(aggregate_function.key),
@@ -135,6 +136,7 @@ def setUp(self):
"outputs": {
"model": {"kind": "ASSET_MODEL", "multiple": False},
},
"status": "FUNCTION_STATUS_WAITING",
},
{
"key": str(composite_function.key),
@@ -170,6 +172,7 @@ def setUp(self):
"local": {"kind": "ASSET_MODEL", "multiple": False},
"shared": {"kind": "ASSET_MODEL", "multiple": False},
},
"status": "FUNCTION_STATUS_WAITING",
},
{
"key": str(predict_function.key),
@@ -204,6 +207,7 @@ def setUp(self):
"outputs": {
"predictions": {"kind": "ASSET_MODEL", "multiple": False},
},
"status": "FUNCTION_STATUS_WAITING",
},
{
"key": str(metric_function.key),
@@ -237,6 +241,7 @@ def setUp(self):
"outputs": {
"performance": {"kind": "ASSET_PERFORMANCE", "multiple": False},
},
"status": "FUNCTION_STATUS_WAITING",
},
]

@@ -448,6 +453,7 @@ def mock_orc_response(data):
"function": data["function"],
"inputs": data["inputs"],
"outputs": data["outputs"],
"status": Function.Status.FUNCTION_STATUS_WAITING,
}

function_path = os.path.join(FIXTURE_PATH, filename)

