From 056e9b425517edb87e78f30e7fce84d51f2fa5f3 Mon Sep 17 00:00:00 2001 From: Lubos Mjachky Date: Fri, 19 May 2023 12:30:33 +0200 Subject: [PATCH] Add pull-through caching closes #507 --- CHANGES/507.feature | 3 + docs/workflows/host.rst | 35 ++++ pulp_container/app/cache.py | 17 +- pulp_container/app/downloaders.py | 16 +- .../0037_create_pull_through_cache_models.py | 43 +++++ pulp_container/app/models.py | 147 +++++++++++++++ pulp_container/app/registry.py | 94 ++++++++-- pulp_container/app/registry_api.py | 119 +++++++++++- pulp_container/app/serializers.py | 48 ++++- pulp_container/app/tasks/__init__.py | 1 + .../app/tasks/download_image_data.py | 147 +++++++++++++++ pulp_container/app/tasks/sync_stages.py | 79 +------- pulp_container/app/utils.py | 75 ++++++++ pulp_container/app/viewsets.py | 177 ++++++++++++++++++ .../functional/api/test_pull_through_cache.py | 122 ++++++++++++ pulp_container/tests/functional/conftest.py | 14 ++ requirements.txt | 2 +- 17 files changed, 1035 insertions(+), 104 deletions(-) create mode 100644 CHANGES/507.feature create mode 100644 pulp_container/app/migrations/0037_create_pull_through_cache_models.py create mode 100644 pulp_container/app/tasks/download_image_data.py create mode 100644 pulp_container/tests/functional/api/test_pull_through_cache.py diff --git a/CHANGES/507.feature b/CHANGES/507.feature new file mode 100644 index 000000000..df2a3bbd8 --- /dev/null +++ b/CHANGES/507.feature @@ -0,0 +1,3 @@ +Added support for pull-through caching. Users can now configure a dedicated distribution and remote +linked to an external registry without specifying a repository name (upstream name). Pulp downloads +missing content automatically if requested and acts as a caching proxy. 
diff --git a/docs/workflows/host.rst b/docs/workflows/host.rst index 867a874ed..c8225fffe 100644 --- a/docs/workflows/host.rst +++ b/docs/workflows/host.rst @@ -117,3 +117,38 @@ Docker Output:: In general, the automatic conversion cannot be performed when the content is not available in the storage. Therefore, it may be successful only if the content was previously synced with the ``immediate`` policy. + + +Pull-Through Caching +-------------------- + +The Pull-Through Caching feature offers an alternative way to host content by leveraging a **remote +registry** as the source of truth. This eliminates the need for repository synchronization, reduces +storage overhead, and ensures up-to-date images. Pulp acts as a **caching proxy** and stores images +in a local repository. + +Administering the caching:: + + # initialize a pull-through remote (the concept of upstream-name is not applicable here) + REMOTE_HREF=$(http ${BASE_ADDR}/pulp/api/v3/remotes/container/pull-through/ name=docker-cache url=https://registry-1.docker.io | jq -r ".pulp_href") + + # create a specialized distribution linked to the initialized remote + http ${BASE_ADDR}/pulp/api/v3/distributions/container/pull-through/ remote=${REMOTE_HREF} name=docker-cache base_path=docker-cache + +Downloading content:: + + podman pull localhost:24817/docker-cache/library/busybox + +In the example above, the image "busybox" is pulled from the "docker-cache" distribution, which acts as +a transparent caching layer. + +By incorporating the Pull-Through Caching feature, administrators can **reduce external network +dependencies** and ensure a more reliable and responsive container deployment system in production +environments. + +.. note:: + Pulp creates repositories that maintain a single repository version for user-pulled images. + Thus, only the latest repository version is retained. For instance, when pulling "debian:10", + a "debian" repository with the "10" tag is established. 
Subsequent pulls such as "debian:11" + result in a new repository version that incorporates both tags while removing the previous + version. Repositories and their content remain manageable through standard API endpoints. diff --git a/pulp_container/app/cache.py b/pulp_container/app/cache.py index 4b9151027..36e7ef77f 100644 --- a/pulp_container/app/cache.py +++ b/pulp_container/app/cache.py @@ -1,8 +1,9 @@ from django.core.exceptions import ObjectDoesNotExist +from django.db.models import F, Value from pulpcore.plugin.cache import CacheKeys, AsyncContentCache, SyncContentCache -from pulp_container.app.models import ContainerDistribution +from pulp_container.app.models import ContainerDistribution, ContainerPullThroughDistribution from pulp_container.app.exceptions import RepositoryNotFound ACCEPT_HEADER_KEY = "accept_header" @@ -67,11 +68,17 @@ def find_base_path_cached(request, cached): return path else: try: - distro = ContainerDistribution.objects.select_related( - "repository", "repository_version" - ).get(base_path=path) + distro = ContainerDistribution.objects.get(base_path=path) except ObjectDoesNotExist: - raise RepositoryNotFound(name=path) + distro = ( + ContainerPullThroughDistribution.objects.annotate(path=Value(path)) + .filter(path__startswith=F("base_path")) + .order_by("-base_path") + .first() + ) + if not distro: + raise RepositoryNotFound(name=path) + return distro.base_path diff --git a/pulp_container/app/downloaders.py b/pulp_container/app/downloaders.py index 4db7ae44d..f6c81fbef 100644 --- a/pulp_container/app/downloaders.py +++ b/pulp_container/app/downloaders.py @@ -5,6 +5,7 @@ import re from aiohttp.client_exceptions import ClientResponseError +from collections import namedtuple from logging import getLogger from multidict import MultiDict from urllib import parse @@ -15,6 +16,8 @@ log = getLogger(__name__) +InMemoryDownloadResult = namedtuple("InMemoryDownloadResult", ["data", "headers", "status_code"]) + class 
RegistryAuthHttpDownloader(HttpDownloader): """ @@ -24,13 +27,14 @@ class RegistryAuthHttpDownloader(HttpDownloader): """ registry_auth = {"bearer": None, "basic": None} - token_lock = asyncio.Lock() def __init__(self, *args, **kwargs): """ Initialize the downloader. """ self.remote = kwargs.pop("remote") + self.token_lock = asyncio.Lock() + super().__init__(*args, **kwargs) async def _run(self, handle_401=True, extra_data=None): @@ -174,6 +178,16 @@ def auth_header(token, basic_auth): return {} +class InMemoryDownloader(RegistryAuthHttpDownloader): + """A downloader class suited for downloading data in-memory.""" + + async def _handle_response(self, response): + data = await response.text() + return InMemoryDownloadResult( + data=data, headers=response.headers, status_code=response.status + ) + + class NoAuthSignatureDownloader(HttpDownloader): """A downloader class suited for signature downloads.""" diff --git a/pulp_container/app/migrations/0037_create_pull_through_cache_models.py b/pulp_container/app/migrations/0037_create_pull_through_cache_models.py new file mode 100644 index 000000000..b247a6cfa --- /dev/null +++ b/pulp_container/app/migrations/0037_create_pull_through_cache_models.py @@ -0,0 +1,43 @@ +# Generated by Django 4.2.6 on 2023-10-25 20:04 + +from django.db import migrations, models +import django.db.models.deletion +import pulpcore.app.models.access_policy + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0108_task_versions'), + ('container', '0036_containerpushrepository_pending_blobs_manifests'), + ] + + operations = [ + migrations.CreateModel( + name='ContainerPullThroughDistribution', + fields=[ + ('distribution_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.distribution')), + ], + options={ + 'permissions': [('manage_roles_containerpullthroughdistribution', 'Can manage role assignments on pull-through cache 
distribution')], + 'default_related_name': '%(app_label)s_%(model_name)s', + }, + bases=('core.distribution', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + migrations.CreateModel( + name='ContainerPullThroughRemote', + fields=[ + ('remote_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.remote')), + ], + options={ + 'permissions': [('manage_roles_containerpullthroughremote', 'Can manage role assignments on pull-through container remote')], + 'default_related_name': '%(app_label)s_%(model_name)s', + }, + bases=('core.remote', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + migrations.AddField( + model_name='containerdistribution', + name='pull_through_distribution', + field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='distributions', to='container.containerpullthroughdistribution'), + ), + ] diff --git a/pulp_container/app/models.py b/pulp_container/app/models.py index beef5f984..dea2db436 100644 --- a/pulp_container/app/models.py +++ b/pulp_container/app/models.py @@ -334,6 +334,33 @@ def noauth_download_factory(self): ) return self._noauth_download_factory + @property + def in_memory_download_factory(self): + """ + A Downloader Factory that stores downloaded data in-memory. + + This downloader should be used in workflows where the size of downloaded content is + reasonably small. For instance, for downloading manifests or manifest lists. + + Upon first access, the InMemoryDownloaderFactory is instantiated and saved internally. + + Returns: + DownloadFactory: The instantiated InMemoryDownloaderFactory to be used by + get_in_memory_downloader(). 
+ + """ + try: + return self._in_memory_download_factory + except AttributeError: + self._in_memory_download_factory = DownloaderFactory( + self, + downloader_overrides={ + "http": downloaders.InMemoryDownloader, + "https": downloaders.InMemoryDownloader, + }, + ) + return self._in_memory_download_factory + def get_downloader(self, remote_artifact=None, url=None, **kwargs): """ Get a downloader from either a RemoteArtifact or URL that is configured with this Remote. @@ -388,6 +415,36 @@ def get_noauth_downloader(self, remote_artifact=None, url=None, **kwargs): **kwargs, ) + def get_in_memory_downloader(self, remote_artifact=None, url=None, **kwargs): + """ + Get an in-memory downloader from either a RemoteArtifact or URL that is provided. + + This method accepts either `remote_artifact` or `url` but not both. At least one is + required. If neither of both are passed a ValueError is raised. + + Args: + remote_artifact (:class:`~pulpcore.app.models.RemoteArtifact`): The RemoteArtifact to + download. + url (str): The URL to download. + kwargs (dict): This accepts the parameters of + :class:`~pulpcore.plugin.download.BaseDownloader`. + + Raises: + ValueError: If neither remote_artifact and url are passed, or if both are passed. + + Returns: + subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that + is configured with the remote settings. + + """ + kwargs["remote"] = self + return super().get_downloader( + remote_artifact=remote_artifact, + url=url, + download_factory=self.in_memory_download_factory, + **kwargs, + ) + @property def namespaced_upstream_name(self): """ @@ -413,6 +470,72 @@ class Meta: ] +class ContainerPullThroughRemote(Remote, AutoAddObjPermsMixin): + """ + A remote for pull-through caching, omitting the requirement for the upstream name. + """ + + TYPE = "pull-through" + + @property + def download_factory(self): + """ + Downloader Factory that maps to custom downloaders which support registry auth. 
+ + Upon first access, the DownloaderFactory is instantiated and saved internally. + + Returns: + DownloadFactory: The instantiated DownloaderFactory to be used by + get_downloader() + + """ + try: + return self._download_factory + except AttributeError: + self._download_factory = DownloaderFactory( + self, + downloader_overrides={ + "http": downloaders.RegistryAuthHttpDownloader, + "https": downloaders.RegistryAuthHttpDownloader, + }, + ) + return self._download_factory + + def get_downloader(self, remote_artifact=None, url=None, **kwargs): + """ + Get a downloader from either a RemoteArtifact or URL that is configured with this Remote. + + This method accepts either `remote_artifact` or `url` but not both. At least one is + required. If neither or both are passed a ValueError is raised. + + Args: + remote_artifact (:class:`~pulpcore.app.models.RemoteArtifact`): The RemoteArtifact to + download. + url (str): The URL to download. + kwargs (dict): This accepts the parameters of + :class:`~pulpcore.plugin.download.BaseDownloader`. + + Raises: + ValueError: If neither remote_artifact and url are passed, or if both are passed. + + Returns: + subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that + is configured with the remote settings. + + """ + kwargs["remote"] = self + return super().get_downloader(remote_artifact=remote_artifact, url=url, **kwargs) + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + permissions = [ + ( + "manage_roles_containerpullthroughremote", + "Can manage role assignments on pull-through container remote", + ), + ] + + class ManifestSigningService(SigningService): """ Signing service used for creating container signatures. 
@@ -565,6 +688,23 @@ def remove_pending_content(self, repository_version): self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content)) +class ContainerPullThroughDistribution(Distribution, AutoAddObjPermsMixin): + """ + A distribution for pull-through caching, referencing normal distributions. + """ + + TYPE = "pull-through" + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + permissions = [ + ( + "manage_roles_containerpullthroughdistribution", + "Can manage role assignments on pull-through cache distribution", + ), + ] + + class ContainerDistribution(Distribution, AutoAddObjPermsMixin): """ A container distribution defines how a repository version is distributed by Pulp's webserver. @@ -595,6 +735,13 @@ class ContainerDistribution(Distribution, AutoAddObjPermsMixin): ) description = models.TextField(null=True) + pull_through_distribution = models.ForeignKey( + ContainerPullThroughDistribution, + related_name="distributions", + on_delete=models.CASCADE, + null=True, + ) + def get_repository_version(self): """ Returns the repository version that is supposed to be served by this ContainerDistribution. 
diff --git a/pulp_container/app/registry.py b/pulp_container/app/registry.py index 706ae8998..719c06b2d 100644 --- a/pulp_container/app/registry.py +++ b/pulp_container/app/registry.py @@ -1,22 +1,35 @@ +import time +import json import logging import os from asgiref.sync import sync_to_async +from urllib.parse import urljoin + from aiohttp import web +from aiohttp.web_exceptions import HTTPTooManyRequests +from django_guid import set_guid +from django_guid.utils import generate_guid from django.conf import settings from django.core.exceptions import ObjectDoesNotExist from multidict import MultiDict from pulpcore.plugin.content import Handler, PathNotResolved -from pulpcore.plugin.models import Content, ContentArtifact +from pulpcore.plugin.models import Content, ContentArtifact, Task from pulpcore.plugin.content import ArtifactResponse +from pulpcore.plugin.tasking import dispatch from pulp_container.app.cache import RegistryContentCache from pulp_container.app.models import ContainerDistribution, Tag, Blob from pulp_container.app.schema_convert import Schema2toSchema1ConverterWrapper -from pulp_container.app.utils import get_accepted_media_types -from pulp_container.constants import BLOB_CONTENT_TYPE, EMPTY_BLOB, MEDIA_TYPE +from pulp_container.app.tasks import download_image_data +from pulp_container.app.utils import ( + calculate_digest, + get_accepted_media_types, + determine_media_type, +) +from pulp_container.constants import BLOB_CONTENT_TYPE, EMPTY_BLOB, MEDIA_TYPE, V2_ACCEPT_HEADERS log = logging.getLogger(__name__) @@ -117,7 +130,67 @@ async def get_tag(self, request): pk__in=await sync_to_async(repository_version.get_content)(), name=tag_name ) except ObjectDoesNotExist: - raise PathNotResolved(tag_name) + if distribution.remote: + remote = await distribution.remote.acast() + + relative_url = "/v2/{name}/manifests/{tag}".format( + name=remote.namespaced_upstream_name, tag=tag_name + ) + tag_url = urljoin(remote.url, relative_url) + downloader = 
remote.get_in_memory_downloader(url=tag_url) + response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS}) + + set_guid(generate_guid()) + task = await sync_to_async(dispatch)( + download_image_data, + exclusive_resources=[repository_version.repository], + kwargs={ + "repository_pk": repository_version.repository.pk, + "remote_pk": remote.pk, + "tag_name": tag_name, + "response_data": response.data, + }, + ) + + # waiting shortly for the task to be completed since a container client could + # request related content (i.e., manifests and blobs) and halt the pull operation + # if the content was not immediately served + for dummy in range(3): + time.sleep(1) + task = await Task.objects.aget(pk=task.pk) + if task.state == "completed": + await task.adelete() + break + elif task.state in ["waiting", "running"]: + continue + else: + error = task.error + await task.adelete() + raise Exception(str(error)) + else: + raise HTTPTooManyRequests() + + try: + manifest_data = json.loads(response.data) + except json.decoder.JSONDecodeError: + raise PathNotResolved(tag_name) + else: + encoded_data = response.data.encode("utf-8") + digest = calculate_digest(encoded_data) + media_type = determine_media_type(manifest_data, response) + + response_headers = { + "Content-Type": media_type, + "Docker-Content-Digest": digest, + } + + # at this time, the manifest artifact was already established, and we can return it + # as it is; meanwhile, the dispatched task has created Manifest/Blob objects and + # relations between them; the said content units are streamed/downloaded on demand + # to a client on a next run + return web.Response(text=response.data, headers=response_headers) + else: + raise PathNotResolved(tag_name) # we do not convert OCI to docker oci_mediatypes = [MEDIA_TYPE.MANIFEST_OCI, MEDIA_TYPE.INDEX_OCI] @@ -155,8 +228,7 @@ async def get_tag(self, request): async def dispatch_tag(self, request, tag, response_headers): """ - Finds an artifact associated with a Tag 
and sends it to the client, otherwise tries - to stream it. + Finds an artifact associated with a Tag and sends it to the client. Args: request(:class:`~aiohttp.web.Request`): The request to prepare a response for. @@ -169,13 +241,8 @@ async def dispatch_tag(self, request, tag, response_headers): streamed back to the client. """ - try: - artifact = await tag.tagged_manifest._artifacts.aget() - except ObjectDoesNotExist: - ca = await sync_to_async(lambda x: x[0])(tag.tagged_manifest.contentartifact_set.all()) - return await self._stream_content_artifact(request, web.StreamResponse(), ca) - else: - return await Registry._dispatch(artifact, response_headers) + artifact = await sync_to_async(tag.tagged_manifest._artifacts.get)() + return await Registry._dispatch(artifact, response_headers) @staticmethod async def dispatch_converted_schema(tag, accepted_media_types, path): @@ -219,7 +286,6 @@ async def get_by_digest(self, request): """ Return a response to the "GET" action. """ - path = request.match_info["path"] digest = "sha256:{digest}".format(digest=request.match_info["digest"]) distribution = await sync_to_async(self._match_distribution)(path) diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index cda1cbcf9..70c35e463 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -11,13 +11,15 @@ import hashlib import re +from aiohttp.client_exceptions import ClientResponseError from itertools import chain -from urllib.parse import urlparse, urlunparse, parse_qs, urlencode +from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode from tempfile import NamedTemporaryFile from django.core.files.storage import default_storage as storage from django.core.files.base import ContentFile, File from django.db import IntegrityError, transaction +from django.db.models import F, Value from django.shortcuts import get_object_or_404 from django.conf import settings @@ -85,6 +87,7 @@ 
SIGNATURE_HEADER, SIGNATURE_PAYLOAD_MAX_SIZE, SIGNATURE_TYPE, + V2_ACCEPT_HEADERS, ) log = logging.getLogger(__name__) @@ -234,7 +237,7 @@ def default_response_headers(self): def get_exception_handler_context(self): """ - Adjust the reder context for exceptions. + Adjust the render context for exceptions. """ context = super().get_exception_handler_context() if context["request"]: @@ -272,7 +275,8 @@ def get_drv_pull(self, path): try: distribution = models.ContainerDistribution.objects.get(base_path=path) except models.ContainerDistribution.DoesNotExist: - raise RepositoryNotFound(name=path) + # get a pull-through cache distribution whose base_path is a substring of the path + return self.get_pull_through_drv(path) if distribution.repository: repository_version = distribution.repository.latest_version() elif distribution.repository_version: @@ -281,6 +285,44 @@ def get_drv_pull(self, path): raise RepositoryNotFound(name=path) return distribution, distribution.repository, repository_version + def get_pull_through_drv(self, path): + root_cache_distribution = ( + models.ContainerPullThroughDistribution.objects.annotate(path=Value(path)) + .filter(path__startswith=F("base_path")) + .order_by("-base_path") + .first() + ) + if not root_cache_distribution: + raise RepositoryNotFound(name=path) + + try: + with transaction.atomic(): + cache_repository, _ = models.ContainerRepository.objects.get_or_create( + name=path, retain_repo_versions=1 + ) + + upstream_name = path.split(root_cache_distribution.base_path, maxsplit=1)[1] + cache_remote, _ = models.ContainerRemote.objects.get_or_create( + name=path, + upstream_name=upstream_name.strip("/"), + url=root_cache_distribution.remote.url, + ) + + cache_distribution, _ = models.ContainerDistribution.objects.get_or_create( + name=path, + base_path=path, + remote=cache_remote, + repository=cache_repository, + ) + except IntegrityError: + # some entities needed to be created, but their keys already exist in the database + # (e.g., a 
repository with the same name as the constructed path) + raise RepositoryNotFound(name=path) + else: + root_cache_distribution.distributions.add(cache_distribution) + + return cache_distribution, cache_repository, cache_repository.latest_version() + def get_dr_push(self, request, path, create=False): """ Get distribution and repository for push access. @@ -973,13 +1015,30 @@ def handle_safe_method(self, request, path, pk): try: tag = models.Tag.objects.get(name=pk, pk__in=repository_version.content) except models.Tag.DoesNotExist: - raise ManifestNotFound(reference=pk) + if distribution.remote: + remote = distribution.remote.cast() + repository = distribution.repository.cast() + manifest = self.fetch_manifest(remote, repository_version, repository, pk) + if manifest is None: + return redirects.redirect_to_content_app("manifests", pk) + + tag = models.Tag(name=pk, tagged_manifest=manifest) + try: + tag.save() + except IntegrityError: + tag = models.Tag.objects.get(name=tag.name, tagged_manifest=manifest) + tag.touch() + + return redirects.redirect_to_content_app("manifests", tag.name) + else: + raise ManifestNotFound(reference=pk) return redirects.issue_tag_redirect(tag) else: try: manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) - except models.Manifest.DoesNotExit: + except models.Manifest.DoesNotExist: + repository = repository.cast() if repository.PUSH_ENABLED: # the manifest might be a part of listed manifests currently being uploaded try: @@ -988,10 +1047,49 @@ def handle_safe_method(self, request, path, pk): except models.Manifest.DoesNotExist: raise ManifestNotFound(reference=pk) else: - ManifestNotFound(reference=pk) + if distribution.remote: + remote = distribution.remote.cast() + manifest = self.fetch_manifest(remote, repository_version, repository, pk) + if manifest is None: + return redirects.redirect_to_content_app("manifests", pk) + + raise ManifestNotFound(reference=pk) + else: + ManifestNotFound(reference=pk) 
return redirects.issue_manifest_redirect(manifest) + def fetch_manifest(self, remote, repository, repository_version, pk): + relative_url = "/v2/{name}/manifests/{pk}".format( + name=remote.namespaced_upstream_name, pk=pk + ) + tag_url = urljoin(remote.url, relative_url) + downloader = remote.get_in_memory_downloader(url=tag_url) + try: + response = downloader.fetch( + extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} + ) + except ClientResponseError as response_error: + try: + return models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) + except models.Manifest.DoesNotExist: + try: + manifest = repository.pending_manifests.get(digest=pk) + manifest.touch() + return manifest + except models.Manifest.DoesNotExist: + pass + + if response_error.status == 429: + # the client could request the manifest outside the docker hub pull limit; + # it is necessary to pass this information back to the client + raise Throttled() + else: + raise ManifestNotFound(reference=pk) + else: + digest = response.headers.get("docker-content-digest") + return models.Manifest.objects.filter(digest=digest).first() + def put(self, request, path, pk=None): """ Responds with the actual manifest @@ -1229,12 +1327,17 @@ def head(self, request, path, pk=None): def get(self, request, path, pk): """Return a signature identified by its sha256 checksum.""" - _, _, repository_version = self.get_drv_pull(path) + _, repository, repository_version = self.get_drv_pull(path) try: manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) except models.Manifest.DoesNotExist: - raise ManifestNotFound(reference=pk) + repository = repository.cast() + try: + manifest = repository.pending_manifests.get(digest=pk) + manifest.touch() + except models.Manifest.DoesNotExist: + raise ManifestNotFound(reference=pk) signatures = models.ManifestSignature.objects.filter( signed_manifest=manifest, pk__in=repository_version.content diff --git 
a/pulp_container/app/serializers.py b/pulp_container/app/serializers.py index 87393a713..51751d65d 100644 --- a/pulp_container/app/serializers.py +++ b/pulp_container/app/serializers.py @@ -277,6 +277,22 @@ class Meta: model = models.ContainerRemote +class ContainerPullThroughRemoteSerializer(RemoteSerializer): + """ + A serializer for a remote used in the pull-through distribution. + """ + + policy = serializers.ChoiceField( + help_text="The policy always mimics the on_demand behaviour when performing pull-through.", + choices=((models.Remote.ON_DEMAND, "When syncing, download just the metadata.")), + default=models.Remote.ON_DEMAND, + ) + + class Meta: + fields = RemoteSerializer.Meta.fields + model = models.ContainerPullThroughRemote + + class ContainerDistributionSerializer(DistributionSerializer): """ A serializer for ContainerDistribution. @@ -309,10 +325,16 @@ class ContainerDistributionSerializer(DistributionSerializer): repository_version = RepositoryVersionRelatedField( required=False, help_text=_("RepositoryVersion to be served"), allow_null=True ) + remote = DetailRelatedField( + required=False, + help_text=_("Remote that can be used to fetch content when using pull-through caching."), + view_name_pattern=r"remotes(-.*/.*)?-detail", + queryset=models.ContainerRemote.objects.all(), + ) def validate(self, data): """ - Validate the ContainterDistribution. + Validate the ContainerDistribution. Make sure there is an instance of ContentRedirectContentGuard always present in validated data. @@ -360,12 +382,36 @@ class Meta: fields = tuple(set(DistributionSerializer.Meta.fields) - {"base_url"}) + ( "repository_version", "registry_path", + "remote", "namespace", "private", "description", ) +class ContainerPullThroughDistributionSerializer(DistributionSerializer): + """ + A serializer for a specialized pull-through distribution referencing sub-distributions. 
+ """ + + remote = DetailRelatedField( + help_text=_("Remote that can be used to fetch content when using pull-through caching."), + view_name_pattern=r"remotes(-.*/.*)-detail", + queryset=models.ContainerPullThroughRemote.objects.all(), + ) + distributions = DetailRelatedField( + many=True, + help_text="Distributions created after pulling content through cache", + view_name="distributions-detail", + queryset=models.ContainerDistribution.objects.all(), + required=False, + ) + + class Meta: + model = models.ContainerPullThroughDistribution + fields = DistributionSerializer.Meta.fields + ("remote", "distributions") + + class TagOperationSerializer(ValidateFieldsMixin, serializers.Serializer): """ A base serializer for tagging and untagging manifests. diff --git a/pulp_container/app/tasks/__init__.py b/pulp_container/app/tasks/__init__.py index 6e4392924..09f335a3d 100644 --- a/pulp_container/app/tasks/__init__.py +++ b/pulp_container/app/tasks/__init__.py @@ -1,3 +1,4 @@ +from .download_image_data import download_image_data # noqa from .builder import build_image_from_containerfile # noqa from .recursive_add import recursive_add_content # noqa from .recursive_remove import recursive_remove_content # noqa diff --git a/pulp_container/app/tasks/download_image_data.py b/pulp_container/app/tasks/download_image_data.py new file mode 100644 index 000000000..f4076a391 --- /dev/null +++ b/pulp_container/app/tasks/download_image_data.py @@ -0,0 +1,147 @@ +import asyncio +import json +import logging + +from tempfile import NamedTemporaryFile + +from asgiref.sync import sync_to_async + +from django.db import IntegrityError + +from pulpcore.plugin.models import Artifact +from pulpcore.plugin.stages import ( + ArtifactDownloader, + ArtifactSaver, + DeclarativeContent, + DeclarativeVersion, + RemoteArtifactSaver, + ResolveContentFutures, + QueryExistingArtifacts, + QueryExistingContents, +) + +from pulp_container.app.models import ContainerRemote, ContainerRepository, Tag +from 
pulp_container.app.utils import determine_media_type_from_json +from pulp_container.constants import MEDIA_TYPE + +from .sync_stages import ContainerContentSaver, ContainerFirstStage + +log = logging.getLogger(__name__) + + +def download_image_data(repository_pk, remote_pk, tag_name, response_data): + repository = ContainerRepository.objects.get(pk=repository_pk) + remote = ContainerRemote.objects.get(pk=remote_pk).cast() + log.info("Pulling cache: repository={r} remote={p}".format(r=repository.name, p=remote.name)) + first_stage = ContainerPullThroughFirstStage(remote, tag_name, response_data) + dv = ContainerPullThroughCacheDeclarativeVersion(first_stage, repository, mirror=False) + return dv.create() + + +class ContainerPullThroughFirstStage(ContainerFirstStage): + """The stage that prepares the pipeline for downloading a specific tag and its data.""" + + def __init__(self, remote, tag_name, response_data): + """Initialize the stage.""" + super().__init__(remote, signed_only=False) + self.tag_name = tag_name + self.response_data = response_data + + async def run(self): + """Run the stage and set declarative content for one tag, its manifest, and blobs.""" + tag_dc = DeclarativeContent(Tag(name=self.tag_name)) + + content_data = json.loads(self.response_data) + with NamedTemporaryFile("w") as temp_file: + temp_file.write(self.response_data) + temp_file.flush() + + artifact = Artifact.init_and_validate(temp_file.name) + try: + await artifact.asave() + except IntegrityError: + artifact = await Artifact.objects.aget(sha256=artifact.sha256) + await sync_to_async(artifact.touch)() + + media_type = determine_media_type_from_json(content_data) + if media_type in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): + list_dc = self.create_tagged_manifest_list( + self.tag_name, artifact, content_data, media_type + ) + for listed_manifest_task in asyncio.as_completed( + [ + self.create_listed_manifest(manifest_data) + for manifest_data in content_data.get("manifests") + ] + 
): + listed_manifest = await listed_manifest_task + man_dc = listed_manifest["manifest_dc"] + list_dc.extra_data["listed_manifests"].append(listed_manifest) + else: + tag_dc.extra_data["tagged_manifest_dc"] = list_dc + for listed_manifest in list_dc.extra_data["listed_manifests"]: + await self.handle_blobs( + listed_manifest["manifest_dc"], listed_manifest["content_data"] + ) + self.manifest_dcs.append(listed_manifest["manifest_dc"]) + self.manifest_list_dcs.append(list_dc) + else: + # Simple tagged manifest + man_dc = self.create_tagged_manifest( + self.tag_name, artifact, content_data, self.response_data, media_type + ) + tag_dc.extra_data["tagged_manifest_dc"] = man_dc + await self.handle_blobs(man_dc, content_data) + self.manifest_dcs.append(man_dc) + + for manifest_dc in self.manifest_dcs: + config_blob_dc = manifest_dc.extra_data.get("config_blob_dc") + if config_blob_dc: + manifest_dc.content.config_blob = await config_blob_dc.resolution() + for blob_dc in manifest_dc.extra_data["blob_dcs"]: + # Just await here. They will be associated in the post_save hook. + await blob_dc.resolution() + await self.put(manifest_dc) + self.manifest_dcs.clear() + + for manifest_list_dc in self.manifest_list_dcs: + for listed_manifest in manifest_list_dc.extra_data["listed_manifests"]: + # Just await here. They will be associated in the post_save hook. + await listed_manifest["manifest_dc"].resolution() + await self.put(manifest_list_dc) + self.manifest_list_dcs.clear() + + tagged_manifest_dc = tag_dc.extra_data["tagged_manifest_dc"] + tag_dc.content.tagged_manifest = await tagged_manifest_dc.resolution() + await self.put(tag_dc) + + +class ContainerPullThroughCacheDeclarativeVersion(DeclarativeVersion): + """ + Subclassed Declarative version that creates a pipeline for caching remote content. + """ + + def pipeline_stages(self, new_version): + """ + Define the "architecture" of caching remote content. 
+ + Args: + new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The + new repository version that is going to be built. + + Returns: + list: List of :class:`~pulpcore.plugin.stages.Stage` instances + + """ + pipeline = [ + self.first_stage, + QueryExistingArtifacts(), + ArtifactDownloader(), + ArtifactSaver(), + QueryExistingContents(), + ContainerContentSaver(), + RemoteArtifactSaver(), + ResolveContentFutures(), + ] + + return pipeline diff --git a/pulp_container/app/tasks/sync_stages.py b/pulp_container/app/tasks/sync_stages.py index 3c1a94413..bd2e856cf 100644 --- a/pulp_container/app/tasks/sync_stages.py +++ b/pulp_container/app/tasks/sync_stages.py @@ -14,12 +14,12 @@ from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage, ContentSaver from pulp_container.constants import ( - V2_ACCEPT_HEADERS, MEDIA_TYPE, SIGNATURE_API_EXTENSION_VERSION, SIGNATURE_HEADER, SIGNATURE_SOURCE, SIGNATURE_TYPE, + V2_ACCEPT_HEADERS, ) from pulp_container.app.models import ( Blob, @@ -34,6 +34,7 @@ urlpath_sanitize, determine_media_type, validate_manifest, + calculate_digest, ) log = logging.getLogger(__name__) @@ -50,6 +51,7 @@ async def _save_artifact(artifact_attributes): return saved_artifact + class ContainerFirstStage(Stage): """ The first stage of a pulp_container sync pipeline. 
@@ -382,7 +384,7 @@ def create_tagged_manifest_list(self, tag_name, saved_artifact, manifest_list_da tag_name (str): A name of a tag saved_artifact (pulpcore.plugin.models.Artifact): A saved manifest's Artifact manifest_list_data (dict): Data about a ManifestList - media_type (str): The type of a manifest + media_type (str): The type of manifest """ digest = f"sha256:{saved_artifact.sha256}" @@ -411,7 +413,7 @@ def create_tagged_manifest(self, tag_name, saved_artifact, manifest_data, raw_da if media_type in (MEDIA_TYPE.MANIFEST_V2, MEDIA_TYPE.MANIFEST_OCI): digest = f"sha256:{saved_artifact.sha256}" else: - digest = self._calculate_digest(raw_data) + digest = calculate_digest(raw_data) manifest = Manifest( digest=digest, schema_version=manifest_data["schemaVersion"], media_type=media_type @@ -649,77 +651,6 @@ def _include_layer(self, layer): return False return True - def _calculate_digest(self, manifest): - """ - Calculate the requested digest of the ImageManifest, given in JSON. - - Args: - manifest (str): The raw JSON representation of the Manifest. - - Returns: - str: The digest of the given ImageManifest - - """ - decoded_manifest = json.loads(manifest) - if "signatures" in decoded_manifest: - # This manifest contains signatures. Unfortunately, the Docker manifest digest - # is calculated on the unsigned version of the Manifest so we need to remove the - # signatures. To do this, we will look at the 'protected' key within the first - # signature. This key indexes a (malformed) base64 encoded JSON dictionary that - # tells us how many bytes of the manifest we need to keep before the signature - # appears in the original JSON and what the original ending to the manifest was after - # the signature block. We will strip out the bytes after this cutoff point, add back the - # original ending, and then calculate the sha256 sum of the transformed JSON to get the - # digest. 
- protected = decoded_manifest["signatures"][0]["protected"] - # Add back the missing padding to the protected block so that it is valid base64. - protected = self._pad_unpadded_b64(protected) - # Now let's decode the base64 and load it as a dictionary so we can get the length - protected = base64.b64decode(protected) - protected = json.loads(protected) - # This is the length of the signed portion of the Manifest, except for a trailing - # newline and closing curly brace. - signed_length = protected["formatLength"] - # The formatTail key indexes a base64 encoded string that represents the end of the - # original Manifest before signatures. We will need to add this string back to the - # trimmed Manifest to get the correct digest. We'll do this as a one liner since it is - # a very similar process to what we've just done above to get the protected block - # decoded. - signed_tail = base64.b64decode(self._pad_unpadded_b64(protected["formatTail"])) - # Now we can reconstruct the original Manifest that the digest should be based on. - manifest = manifest[:signed_length] + signed_tail - - return "sha256:{digest}".format(digest=hashlib.sha256(manifest).hexdigest()) - - def _pad_unpadded_b64(self, unpadded_b64): - """ - Fix bad padding. - - Docker has not included the required padding at the end of the base64 encoded - 'protected' block, or in some encased base64 within it. This function adds the correct - number of ='s signs to the unpadded base64 text so that it can be decoded with Python's - base64 library. - - Args: - unpadded_b64 (str): The unpadded base64 text. - - Returns: - str: The same base64 text with the appropriate number of ='s symbols. - - """ - # The Pulp team has not observed any newlines or spaces within the base64 from Docker, but - # Docker's own code does this same operation so it seemed prudent to include it here. 
def calculate_digest(manifest):
    """
    Calculate the requested digest of the ImageManifest, given in JSON.

    For a manifest without a top-level ``signatures`` key, the digest is the sha256 sum of the
    raw payload. For a signed manifest, the digest is computed on the unsigned version of the
    payload, which is reconstructed from the first signature's ``protected`` block.

    Args:
        manifest (bytes or str): The raw JSON representation of the Manifest. Text is encoded
            as UTF-8 first, because both hashing and the ``formatLength`` cutoff operate on
            bytes.

    Returns:
        str: The digest of the given ImageManifest, formatted as ``sha256:<hexdigest>``.

    Raises:
        ValueError: If the manifest (or its ``protected`` block) is not valid JSON, or if an
            embedded base64 block is malformed.
        KeyError: If a signed manifest lacks ``protected``, ``formatLength`` or ``formatTail``.

    """
    if isinstance(manifest, str):
        manifest = manifest.encode("utf-8")

    decoded_manifest = json.loads(manifest)
    if "signatures" in decoded_manifest:
        # This manifest contains signatures. Unfortunately, the Docker manifest digest
        # is calculated on the unsigned version of the Manifest so we need to remove the
        # signatures. The 'protected' key of the first signature indexes an unpadded,
        # base64url encoded JSON dictionary that tells us how many bytes of the manifest we
        # need to keep before the signature appears in the original JSON (formatLength) and
        # what the original ending of the manifest was (formatTail).
        protected = decoded_manifest["signatures"][0]["protected"]
        # The URL-safe decoder is required here: the plain decoder silently discards the
        # "-" and "_" characters of the base64url alphabet instead of decoding them. The
        # URL-safe decoder still accepts the standard "+" and "/" characters.
        protected = json.loads(base64.urlsafe_b64decode(pad_unpadded_b64(protected)))
        # This is the length of the signed portion of the Manifest, except for a trailing
        # newline and closing curly brace.
        signed_length = protected["formatLength"]
        # The formatTail is the encoded end of the original Manifest before signatures.
        signed_tail = base64.urlsafe_b64decode(pad_unpadded_b64(protected["formatTail"]))
        # Now we can reconstruct the original Manifest that the digest should be based on.
        manifest = manifest[:signed_length] + signed_tail

    return "sha256:{digest}".format(digest=hashlib.sha256(manifest).hexdigest())


def pad_unpadded_b64(unpadded_b64):
    """
    Fix bad padding.

    Docker has not included the required padding at the end of the base64 encoded
    'protected' block, or in some encased base64 within it. This function adds the correct
    number of ='s signs to the unpadded base64 text so that it can be decoded with Python's
    base64 library.

    Args:
        unpadded_b64 (str): The unpadded base64 text.

    Returns:
        str: The same base64 text, stripped of newlines and spaces, with the appropriate
            number of ='s symbols appended.

    Raises:
        ValueError: If the stripped text has a length that leaves a remainder of 1 when
            divided by 4, which no valid base64 text can have.

    """
    # The Pulp team has not observed any newlines or spaces within the base64 from Docker, but
    # Docker's own code does this same operation so it seemed prudent to include it here.
    # See lines 167 to 168 here:
    # https://github.com/docker/libtrust/blob/9cbd2a1374f46905c68a4eb3694a130610adc62a/util.go
    unpadded_b64 = unpadded_b64.replace("\n", "").replace(" ", "")
    remainder = len(unpadded_b64) % 4
    # It is illegal base64 for the remainder to be 1 when the length of the block is
    # divided by 4.
    if remainder == 1:
        raise ValueError("Invalid base64: {t}".format(t=unpadded_b64))
    # Top the text up to the next multiple of four characters (0, 2 or 1 padding signs).
    return unpadded_b64 + "=" * (-remainder % 4)
+ """ + + endpoint_name = "pull-through" + queryset = models.ContainerPullThroughRemote.objects.all() + serializer_class = serializers.ContainerPullThroughRemoteSerializer + queryset_filtering_required_permission = "container.view_containerpullthroughremote" + + DEFAULT_ACCESS_POLICY = { + "statements": [ + { + "action": ["list", "my_permissions"], + "principal": "authenticated", + "effect": "allow", + }, + { + "action": ["create"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_model_perms:container.add_containerpullthroughremote", + }, + { + "action": ["retrieve"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_model_or_obj_perms:container.view_containerpullthroughremote", + }, + { + "action": ["update", "partial_update"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_obj_perms:container.change_containerpullthroughremote", + "has_model_or_obj_perms:container.view_containerpullthroughremote", + ], + }, + { + "action": ["destroy"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_obj_perms:container.delete_containerpullthroughremote", + "has_model_or_obj_perms:container.view_containerpullthroughremote", + ], + }, + { + "action": ["list_roles", "add_role", "remove_role"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_obj_perms:container.manage_roles_containerpullthroughremote" + ], + }, + ], + "creation_hooks": [ + { + "function": "add_roles_for_object_creator", + "parameters": {"roles": "container.containerpullthroughremote_owner"}, + }, + ], + "queryset_scoping": {"function": "scope_queryset"}, + } + LOCKED_ROLES = { + "container.containerpullthroughremote_creator": [ + "container.add_containerpullthroughremote", + ], + "container.containerpullthroughremote_owner": [ + "container.view_containerpullthroughremote", + "container.change_containerpullthroughremote", + 
"container.delete_containerpullthroughremote", + "container.manage_roles_containerpullthroughremote", + ], + "container.containerpullthroughremote_viewer": [ + "container.view_containerpullthroughremote", + ], + } + + class TagOperationsMixin: """ A mixin that adds functionality for creating and deleting tags. @@ -1302,6 +1382,103 @@ def destroy(self, request, pk, **kwargs): return OperationPostponedResponse(async_result, request) +class ContainerPullThroughDistributionViewSet(DistributionViewSet, RolesMixin): + """ + A special pull-through Container Distribution that will reference distributions serving content. + """ + + endpoint_name = "pull-through" + queryset = models.ContainerPullThroughDistribution.objects.all() + serializer_class = serializers.ContainerPullThroughDistributionSerializer + + DEFAULT_ACCESS_POLICY = { + "statements": [ + { + "action": ["list", "my_permissions"], + "principal": "authenticated", + "effect": "allow", + }, + { + "action": ["create"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_namespace_model_perms", + }, + { + "action": ["create"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_namespace_perms:container.add_containerpullthroughdistribution", + }, + { + "action": ["create"], + "principal": "authenticated", + "effect": "allow", + "condition": "namespace_is_username", + }, + { + "action": ["retrieve"], + "principal": "authenticated", + "effect": "allow", + "condition_expression": [ + "has_namespace_or_obj_perms:container.view_containerpullthroughdistribution", + ], + }, + { + "action": ["update", "partial_update"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_namespace_or_obj_perms:container.change_containerpullthroughdistribution", + "has_namespace_or_obj_perms:container.view_containerpullthroughdistribution", + ], + }, + { + "action": ["destroy"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + 
"has_namespace_or_obj_perms:container.delete_containerpullthroughdistribution", + "has_namespace_or_obj_perms:container.view_containerpullthroughdistribution", + ], + }, + { + "action": ["list_roles", "add_role", "remove_role"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_obj_perms:container.manage_roles_containerpullthroughdistribution" + ], + }, + ], + "creation_hooks": [ + { + "function": "add_roles_for_object_creator", + "parameters": { + "roles": "container.containerpullthroughdistribution_owner", + }, + }, + ], + } + LOCKED_ROLES = { + "container.containerpullthroughdistribution_creator": [ + "container.add_containerpullthroughdistribution" + ], + "container.containerpullthroughdistribution_owner": [ + "container.view_containerpullthroughdistribution", + "container.delete_containerpullthroughdistribution", + "container.change_containerpullthroughdistribution", + "container.manage_roles_containerpullthroughdistribution", + ], + "container.containerpullthroughdistribution_collaborator": [ + "container.view_containerpullthroughdistribution", + ], + "container.containerpullthroughdistribution_consumer": [ + "container.view_containerpullthroughdistribution", + ], + } + + class ContainerNamespaceViewSet( NamedModelViewSet, mixins.CreateModelMixin, diff --git a/pulp_container/tests/functional/api/test_pull_through_cache.py b/pulp_container/tests/functional/api/test_pull_through_cache.py new file mode 100644 index 000000000..bc0bb7244 --- /dev/null +++ b/pulp_container/tests/functional/api/test_pull_through_cache.py @@ -0,0 +1,122 @@ +import subprocess +import pytest + +from uuid import uuid4 + +from pulp_container.tests.functional.constants import ( + REGISTRY_V2, + REGISTRY_V2_FEED_URL, + PULP_HELLO_WORLD_REPO, +) + + +@pytest.fixture +def pull_through_distribution( + gen_object_with_cleanup, + container_pull_through_remote_api, + container_pull_through_distribution_api, +): + remote = gen_object_with_cleanup( + 
def test_image_pull(
    add_to_cleanup,
    container_pull_through_distribution_api,
    container_distribution_api,
    container_repository_api,
    container_remote_api,
    container_tag_api,
    registry_client,
    local_registry,
    pull_through_distribution,
):
    """Pull an image through the caching distribution and check the objects created for it."""
    upstream_path = f"{REGISTRY_V2}/{PULP_HELLO_WORLD_REPO}"
    registry_client.pull(f"{upstream_path}:latest")
    remote_image = registry_client.inspect(upstream_path)

    local_image_path = f"{pull_through_distribution.base_path}/{PULP_HELLO_WORLD_REPO}"

    def first_named(api):
        # The first object the given API lists under the local image path.
        return api.list(name=local_image_path).results[0]

    def tag_names(repo):
        # Names of the tags present in the latest version of the given repository.
        listing = container_tag_api.list(repository_version=repo.latest_version_href)
        return [tag.name for tag in listing.results]

    def first_version_href(repo):
        return f"{repo.pulp_href}versions/1/"

    local_registry.pull(f"{local_image_path}:latest")
    local_image = local_registry.inspect(local_image_path)

    # when the client pulls the image, a repository, distribution, and remote is created in
    # the background; therefore, scheduling the cleanup for these entities is necessary
    repository = first_named(container_repository_api)
    add_to_cleanup(container_repository_api, repository.pulp_href)
    remote = first_named(container_remote_api)
    add_to_cleanup(container_remote_api, remote.pulp_href)
    distribution = first_named(container_distribution_api)
    add_to_cleanup(container_distribution_api, distribution.pulp_href)

    assert local_image[0]["Id"] == remote_image[0]["Id"]
    assert ["latest"] == tag_names(repository)

    matching = container_pull_through_distribution_api.list(name=pull_through_distribution.name)
    pull_through_distribution = matching.results[0]
    assert [distribution.pulp_href] == pull_through_distribution.distributions

    assert first_version_href(repository) == repository.latest_version_href

    # 1. test if pulling the same content twice works
    local_registry.pull(f"{local_image_path}:latest")

    repository = first_named(container_repository_api)
    assert first_version_href(repository) == repository.latest_version_href

    # 2. test if pulling new content results into a new version, preserving the old content
    local_registry.pull(f"{local_image_path}:linux")

    repository = first_named(container_repository_api)
    assert f"{repository.pulp_href}versions/2/" == repository.latest_version_href
    assert ["latest", "linux"] == sorted(tag_names(repository))
gen_object_with_cleanup(container_distribution_api, data) + with pytest.raises(subprocess.CalledProcessError): + local_registry.pull(local_image_path) + monitor_task(container_distribution_api.delete(distribution.pulp_href).task) + + assert 0 == len(container_repository_api.list(name=local_image_path).results) + assert 0 == len(container_remote_api.list(name=local_image_path).results) diff --git a/pulp_container/tests/functional/conftest.py b/pulp_container/tests/functional/conftest.py index f12032388..db9bbab2e 100644 --- a/pulp_container/tests/functional/conftest.py +++ b/pulp_container/tests/functional/conftest.py @@ -13,11 +13,13 @@ ApiClient, PulpContainerNamespacesApi, RemotesContainerApi, + RemotesPullThroughApi, RepositoriesContainerApi, RepositoriesContainerPushApi, RepositoriesContainerVersionsApi, RepositoriesContainerPushVersionsApi, DistributionsContainerApi, + DistributionsPullThroughApi, ContentTagsApi, ContentManifestsApi, ContentBlobsApi, @@ -317,6 +319,12 @@ def container_remote_api(container_client): return RemotesContainerApi(container_client) +@pytest.fixture(scope="session") +def container_pull_through_remote_api(container_client): + """Pull through cache container remote API fixture.""" + return RemotesPullThroughApi(container_client) + + @pytest.fixture(scope="session") def container_repository_api(container_client): """Container repository API fixture.""" @@ -347,6 +355,12 @@ def container_distribution_api(container_client): return DistributionsContainerApi(container_client) +@pytest.fixture(scope="session") +def container_pull_through_distribution_api(container_client): + """Pull through cache distribution API Fixture.""" + return DistributionsPullThroughApi(container_client) + + @pytest.fixture(scope="session") def container_tag_api(container_client): """Container tag API fixture.""" diff --git a/requirements.txt b/requirements.txt index 4262f9327..d5385d19e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ 
ecdsa>=0.14,<=0.18.0 jsonschema>=4.4,<4.20 -pulpcore>=3.25.0,<3.40 +pulpcore>=3.30.0,<3.40 pyjwkest>=1.4,<=1.4.2 pyjwt[crypto]>=2.4,<2.9