Skip to content

Commit

Permalink
Pull-through caching
Browse files Browse the repository at this point in the history
[noissue]
  • Loading branch information
lubosmj committed Jun 15, 2023
1 parent dec4203 commit d4ab757
Show file tree
Hide file tree
Showing 13 changed files with 747 additions and 111 deletions.
73 changes: 73 additions & 0 deletions pulp_container/app/downloaders.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import aiohttp
import asyncio
import hashlib as the_real_hashlib
import json
import re
import tempfile

from gettext import gettext as _

from aiohttp.client_exceptions import ClientResponseError
from logging import getLogger
from multidict import MultiDict
from urllib import parse

from django.conf import settings

from pulpcore.plugin.models import Artifact, Task
from pulpcore.plugin.download import DownloaderFactory, HttpDownloader

log = getLogger(__name__)
Expand Down Expand Up @@ -88,14 +95,50 @@ async def _run(self, handle_401=True, extra_data=None):
return await self._run(handle_401=False, extra_data=extra_data)
else:
raise

to_return = await self._handle_response(response)

await response.release()
self.response_headers = response.headers

if self._close_session_on_finalize:
self.session.close()
return to_return

def _ensure_writer_has_open_file(self):
"""
Create a temporary file on demand.
Create a temporary file when it's actually used, allowing plugin writers to instantiate
many downloaders in memory.
This method sets the path of NamedTemporaryFile dynamically based on whether it is running
from a task or not. Otherwise, permission errors might be raised when Pulp is trying to
download a file from api-app and write to a user space.
"""
if not self._writer:
dir_path = settings.WORKING_DIRECTORY if Task.current() is None else "."
self._writer = tempfile.NamedTemporaryFile(dir=dir_path, delete=False)
self.path = self._writer.name
self._digests = {n: hashlib_new(n) for n in Artifact.DIGEST_FIELDS}
self._size = 0

def fetch(self, extra_data=None):
"""
Run the download synchronously with additional data and return the `DownloadResult`.
Returns:
:class:`~pulpcore.plugin.download.DownloadResult`
or :class:`~aiohttp.client.ClientResponse`
Raises:
Exception: Any fatal exception emitted during downloading
"""
done, _ = asyncio.get_event_loop().run_until_complete(
asyncio.wait([self.run(extra_data=extra_data)])
)
return done.pop().result()

async def update_token(self, response_auth_header, used_token, repo_name):
"""
Update the Bearer token to be used with all requests.
Expand Down Expand Up @@ -246,3 +289,33 @@ class to be instantiated.
kwargs["throttler"] = self._remote.download_throttler if self._remote.rate_limit else None

return download_class(url, **options, **kwargs)


def hashlib_new(name, *args, **kwargs):
"""
A wrapper around the real `hashlib.new()` providing only trusted hashers.
The `ALLOWED_CONTENT_CHECKSUMS` setting identifies which hashers are allowed for use by Pulp.
This function raises an exception if a hasher is requested which is not allowed, and otherwise,
returns the standard hasher from `hashlib.new()`.
Args:
name: The name of the hasher to be instantiated.
*args: args to be passed along to the real `hashlib.new()`.
**kwargs: kwargs to be passed along to the real `hashlib.new()`
Returns:
An instantiated hasher, if it is allowed according to `ALLOWED_CONTENT_CHECKSUMS` setting.
Raises:
An exception if the name of the hasher is not in the `ALLOWED_CONTENT_CHECKSUMS` settings.
"""
if name not in settings.ALLOWED_CONTENT_CHECKSUMS:
raise Exception(
_(
"Hasher {} attempted to be used but is not in the `ALLOWED_CONTENT_CHECKSUMS` "
"setting"
).format(name)
)
return the_real_hashlib.new(name, *args, **kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Generated by Django 4.2.2 on 2023-06-15 09:50

from django.db import migrations, models
import django.db.models.deletion
import pulpcore.app.models.access_policy


class Migration(migrations.Migration):

dependencies = [
('core', '0107_distribution_hidden'),
('container', '0036_containerpushrepository_pending_blobs_manifests'),
]

operations = [
migrations.CreateModel(
name='ContainerPullThroughDistribution',
fields=[
('distribution_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.distribution')),
('private', models.BooleanField(default=False, help_text='Restrict pull access to explicitly authorized users. Defaults to unrestricted pull access.')),
],
options={
'default_related_name': '%(app_label)s_%(model_name)s',
},
bases=('core.distribution', pulpcore.app.models.access_policy.AutoAddObjPermsMixin),
),
migrations.CreateModel(
name='ContainerPullThroughRemote',
fields=[
('remote_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.remote')),
('upstream_name', models.TextField(db_index=True)),
],
options={
'default_related_name': '%(app_label)s_%(model_name)s',
},
bases=('core.remote', pulpcore.app.models.access_policy.AutoAddObjPermsMixin),
),
migrations.AddField(
model_name='containerrepository',
name='pending_blobs',
field=models.ManyToManyField(related_name='pending_blobs', to='container.blob'),
),
migrations.AddField(
model_name='containerrepository',
name='pending_manifests',
field=models.ManyToManyField(to='container.manifest'),
),
migrations.AddField(
model_name='containerrepository',
name='pending_tags',
field=models.ManyToManyField(to='container.tag'),
),
migrations.AddField(
model_name='containerrepository',
name='remaining_blobs',
field=models.ManyToManyField(related_name='remaining_blobs', to='container.blob'),
),
migrations.AddField(
model_name='containerdistribution',
name='pull_through_distribution',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='distributions', to='container.containerpullthroughdistribution'),
),
]
137 changes: 137 additions & 0 deletions pulp_container/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tempfile
import time
from logging import getLogger
from pathlib import PurePath

from django.db import models
from django.conf import settings
Expand Down Expand Up @@ -105,6 +106,10 @@ class Manifest(Content):
through_fields=("image_manifest", "manifest_list"),
)

@staticmethod
def init_from_artifact_and_relative_path(artifact, relative_path):
pass

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
unique_together = ("digest",)
Expand Down Expand Up @@ -401,6 +406,33 @@ def namespaced_upstream_name(self):
else:
return self.upstream_name

def get_remote_artifact_url(self, relative_path=None, request=None):
"""
TODO: ensure that the functionality is not affected by keywords included within the path
"""
if "manifests" in request.path:
if "tag_name" in request.match_info:
tag_name = request.match_info["tag_name"]
return os.path.join(self.url, "v2", relative_path, "manifests", tag_name)
elif "digest" in request.match_info:
digest = "sha256:{digest}".format(digest=request.match_info["digest"])
return os.path.join(self.url, "v2", relative_path, "manifests", digest)
elif "blobs" in request.path:
digest = "sha256:{digest}".format(digest=request.match_info["digest"])
return os.path.join(self.url, "v2", relative_path, "blobs", digest)

def get_remote_artifact_content_type(self, relative_path=None):
"""
TODO: re-evaluate the need of this method
"""
if relative_path:
type_path = PurePath(relative_path)
if type_path.match("manifests/.*"):
return Manifest
elif type_path.match("blobs/.*"):
return Blob
return None

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
permissions = [
Expand All @@ -411,6 +443,68 @@ class Meta:
]


class ContainerPullThroughRemote(Remote, AutoAddObjPermsMixin):
"""
TODO: Add permissions.
"""

TYPE = "pull-through"

upstream_name = models.TextField(db_index=True)

@property
def download_factory(self):
"""
Downloader Factory that maps to custom downloaders which support registry auth.
Upon first access, the DownloaderFactory is instantiated and saved internally.
Returns:
DownloadFactory: The instantiated DownloaderFactory to be used by
get_downloader()
"""
try:
return self._download_factory
except AttributeError:
self._download_factory = DownloaderFactory(
self,
downloader_overrides={
"http": downloaders.RegistryAuthHttpDownloader,
"https": downloaders.RegistryAuthHttpDownloader,
},
)
return self._download_factory

def get_downloader(self, remote_artifact=None, url=None, **kwargs):
"""
Get a downloader from either a RemoteArtifact or URL that is configured with this Remote.
This method accepts either `remote_artifact` or `url` but not both. At least one is
required. If neither or both are passed a ValueError is raised.
Args:
remote_artifact (:class:`~pulpcore.app.models.RemoteArtifact`): The RemoteArtifact to
download.
url (str): The URL to download.
kwargs (dict): This accepts the parameters of
:class:`~pulpcore.plugin.download.BaseDownloader`.
Raises:
ValueError: If neither remote_artifact and url are passed, or if both are passed.
Returns:
subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that
is configured with the remote settings.
"""
kwargs["remote"] = self
return super().get_downloader(remote_artifact=remote_artifact, url=url, **kwargs)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"


class ManifestSigningService(SigningService):
"""
Signing service used for creating container signatures.
Expand Down Expand Up @@ -485,6 +579,13 @@ class ContainerRepository(
ManifestSigningService, on_delete=models.SET_NULL, null=True
)

# temporary relations used for uncommitted pull-through cache operations
pending_tags = models.ManyToManyField(Tag)
pending_manifests = models.ManyToManyField(Manifest)
pending_blobs = models.ManyToManyField(Blob, related_name="pending_blobs")
# digests of remaining blobs to be attached to pending manifests
remaining_blobs = models.ManyToManyField(Blob, related_name="remaining_blobs")

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
permissions = [
Expand All @@ -507,6 +608,16 @@ def finalize_new_version(self, new_version):
"""
remove_duplicates(new_version)
validate_repo_version(new_version)
self.remove_pending_content(new_version)

def remove_pending_content(self, repository_version):
"""Remove pending blobs and manifests when committing the content to the repository."""
added_content = repository_version.added(
base_version=repository_version.base_version
).values_list("pk")
self.pending_tags.remove(*Tag.objects.filter(pk__in=added_content))
self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content))
self.pending_blobs.remove(*Blob.objects.filter(pk__in=added_content))


class ContainerPushRepository(Repository, AutoAddObjPermsMixin):
Expand Down Expand Up @@ -563,6 +674,25 @@ def remove_pending_content(self, repository_version):
self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content))


class ContainerPullThroughDistribution(Distribution, AutoAddObjPermsMixin):
"""
TODO: Add permissions.
"""

TYPE = "pull-through"

private = models.BooleanField(
default=False,
help_text=_(
"Restrict pull access to explicitly authorized users. "
"Defaults to unrestricted pull access."
),
)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"


class ContainerDistribution(Distribution, AutoAddObjPermsMixin):
"""
A container distribution defines how a repository version is distributed by Pulp's webserver.
Expand Down Expand Up @@ -593,6 +723,13 @@ class ContainerDistribution(Distribution, AutoAddObjPermsMixin):
)
description = models.TextField(null=True)

pull_through_distribution = models.ForeignKey(
ContainerPullThroughDistribution,
related_name="distributions",
on_delete=models.CASCADE,
null=True,
)

def get_repository_version(self):
"""
Returns the repository version that is supposed to be served by this ContainerDistribution.
Expand Down
6 changes: 6 additions & 0 deletions pulp_container/app/redirects.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def issue_blob_redirect(self, blob):
"""
return self.redirect_to_content_app("blobs", blob.digest)

def issue_pull_through_manifests_redirect(self, pk):
return self.redirect_to_content_app("manifests", pk)

def issue_pull_through_blobs_redirect(self, pk):
return self.redirect_to_content_app("blobs", pk)


class S3StorageRedirects(CommonRedirects):
"""
Expand Down
Loading

0 comments on commit d4ab757

Please sign in to comment.