Skip to content

Commit

Permalink
up-to-date/base-image: cache dockerhub image listing (#47447)
Browse files Browse the repository at this point in the history
## What

* Problems:`up-to-date`: to find the latest base image we have to list all the base images available and fetch their digest. If this operation is not cached it can easily lead to DockerHub rate limiting issue. 
* Solution: Add a `cache_ttl_seconds` parameters to cache the interaction with the `crane` client. In `up-to-date` we set this cache TTL to 1 day.

This pull request introduces a caching mechanism to avoid DockerHub rate limiting and updates various files to support this new feature. The most important changes include adding a cache TTL for base image listings, updating the `CraneClient` and related methods to accept a cache TTL, and adding logging for digest fetching.

### Caching Mechanism:

* [`airbyte-ci/connectors/base_images/README.md`](diffhunk://#diff-2e343fda68ecd85f5023c98f7f9a41af2125e7e1cc12d545bc13ace788a4d075R97-R99): Added a changelog entry for version 1.1.0, mentioning the cache TTL addition.
* [`airbyte-ci/connectors/base_images/base_images/utils/docker.py`](diffhunk://#diff-d92863e5bd18b276dba43c00c727c4f54d65193740f2044c7929948652e2d1b3L38-R48): Modified the `CraneClient` class to accept a `cache_ttl_seconds` parameter and updated the `bare_container` initialization to use this TTL.
* [`airbyte-ci/connectors/base_images/base_images/version_registry.py`](diffhunk://#diff-2b84383522cacca9880ff1dae76f40d08ec437c59208cd856e41f63971f4340dL108-R140): Updated methods to pass the `cache_ttl_seconds` parameter to the `CraneClient`. [[1]](diffhunk://#diff-2b84383522cacca9880ff1dae76f40d08ec437c59208cd856e41f63971f4340dL108-R140) [[2]](diffhunk://#diff-2b84383522cacca9880ff1dae76f40d08ec437c59208cd856e41f63971f4340dL144-R152) [[3]](diffhunk://#diff-2b84383522cacca9880ff1dae76f40d08ec437c59208cd856e41f63971f4340dL249-R274) [[4]](diffhunk://#diff-2b84383522cacca9880ff1dae76f40d08ec437c59208cd856e41f63971f4340dL267-R293)
* [`airbyte-ci/connectors/pipelines/README.md`](diffhunk://#diff-62eccd92928fbcd3d285983bfdaa2b0d4ca49016cb9c2f63d6d9fc968c59c541R853): Documented the use of `cache_ttl` for base image registry listing in version 4.41.8.
* [`airbyte-ci/connectors/pipelines/pyproject.toml`](diffhunk://#diff-087e2c37602bbd6824f875004abddcb4e1a374da12bf84201671ed0900882ce0L7-R7): Bumped the version to 4.41.8.

### Logging Enhancements:

* [`airbyte-ci/connectors/base_images/base_images/utils/docker.py`](diffhunk://#diff-d92863e5bd18b276dba43c00c727c4f54d65193740f2044c7929948652e2d1b3R90-R95): Added logging statements for digest fetching to help identify bottlenecks and improve debugging.
  • Loading branch information
alafanechere authored Oct 28, 2024
1 parent 6988679 commit b4496f0
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 24 deletions.
3 changes: 3 additions & 0 deletions airbyte-ci/connectors/base_images/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ poetry run mypy base_images --check-untyped-defs
```
## CHANGELOG

### 1.1.0
- Add a cache ttl for base image listing to avoid DockerHub rate limiting.

### 1.0.4
- Upgrade Dagger to `0.13.3`

Expand Down
19 changes: 11 additions & 8 deletions airbyte-ci/connectors/base_images/base_images/utils/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import getpass
import os
import time
import uuid
from typing import List, Tuple

Expand Down Expand Up @@ -35,16 +36,16 @@ class CraneClient:
"gcr.io/go-containerregistry/crane/debug:v0.15.1@sha256:f6ddf8e2c47df889e06e33c3e83b84251ac19c8728a670ff39f2ca9e90c4f905"
)

def __init__(self, dagger_client: dagger.Client, docker_credentials: Tuple[str, str]):
def __init__(self, dagger_client: dagger.Client, docker_credentials: Tuple[str, str], cache_ttl_seconds: int = 0):
self.docker_hub_username_secret = dagger_client.set_secret("DOCKER_HUB_USERNAME", docker_credentials[0])
self.docker_hub_username_password = dagger_client.set_secret("DOCKER_HUB_PASSWORD", docker_credentials[1])

self.bare_container = (
dagger_client.container().from_(self.CRANE_IMAGE_ADDRESS)
# We don't want to cache any subsequent commands that might run in this container
# because we want to have fresh output data every time we run this command.
.with_env_variable("CACHE_BUSTER", str(uuid.uuid4()))
)
if cache_ttl_seconds == 0:
cache_buster = str(uuid.uuid4())
else:
cache_buster = str(int(time.time()) // cache_ttl_seconds)

self.bare_container = dagger_client.container().from_(self.CRANE_IMAGE_ADDRESS).with_env_variable("CACHE_BUSTER", cache_buster)

self.authenticated_container = self.login()

Expand All @@ -56,7 +57,6 @@ def login(self) -> dagger.Container:
)

async def digest(self, repository_and_tag: str) -> str:
console.log(f"Fetching digest for {repository_and_tag}...")
return (await self.authenticated_container.with_exec(["digest", repository_and_tag], use_entrypoint=True).stdout()).strip()

async def ls(self, registry_name: str, repository_name: str) -> List[str]:
Expand Down Expand Up @@ -87,7 +87,10 @@ async def get_all_images(self) -> List[published_image.PublishedImage]:
# We want the digest to uniquely identify the image, so we need to fetch it separately with `crane digest`
available_addresses_without_digest = [f"{repository_address}:{tag}" for tag in all_tags]
available_addresses_with_digest = []
console.log(f"Fetching digests for {len(available_addresses_without_digest)} images...")
# TODO: This is a bottleneck, we should parallelize this
for address in available_addresses_without_digest:
digest = await self.crane_client.digest(address)
available_addresses_with_digest.append(f"{address}@{digest}")
console.log(f"Found digests for {len(available_addresses_with_digest)} images in {repository_address}")
return [published_image.PublishedImage.from_address(address) for address in available_addresses_with_digest]
44 changes: 30 additions & 14 deletions airbyte-ci/connectors/base_images/base_images/version_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,31 +105,39 @@ def get_changelog_entries(ConnectorBaseImageClass: Type[AirbyteConnectorBaseImag

@staticmethod
async def get_all_published_base_images(
dagger_client: dagger.Client, docker_credentials: Tuple[str, str], ConnectorBaseImageClass: Type[AirbyteConnectorBaseImage]
dagger_client: dagger.Client,
docker_credentials: Tuple[str, str],
ConnectorBaseImageClass: Type[AirbyteConnectorBaseImage],
cache_ttl_seconds: int = 0,
) -> List[published_image.PublishedImage]:
"""Returns all the published base images for a given base image version class.
Args:
dagger_client (dagger.Client): The dagger client used to build the registry.
docker_credentials (Tuple[str, str]): The docker credentials used to fetch published images from DockerHub.
ConnectorBaseImageClass (Type[AirbyteConnectorBaseImage]): The base image version class bound to the registry.
cache_ttl_seconds (int, optional): The cache time to live in seconds for crane output. Defaults to 0.
Returns:
List[published_image.PublishedImage]: The published base images for a given base image version class.
"""
crane_client = docker.CraneClient(dagger_client, docker_credentials)
crane_client = docker.CraneClient(dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds)
remote_registry = docker.RemoteRepository(crane_client, consts.REMOTE_REGISTRY, ConnectorBaseImageClass.repository) # type: ignore
return await remote_registry.get_all_images()

@staticmethod
async def load(
ConnectorBaseImageClass: Type[AirbyteConnectorBaseImage], dagger_client: dagger.Client, docker_credentials: Tuple[str, str]
ConnectorBaseImageClass: Type[AirbyteConnectorBaseImage],
dagger_client: dagger.Client,
docker_credentials: Tuple[str, str],
cache_ttl_seconds: int = 0,
) -> VersionRegistry:
"""Instantiates a registry by fetching available versions from the remote registry and loading the changelog from disk.
Args:
ConnectorBaseImageClass (Type[AirbyteConnectorBaseImage]): The base image version class bound to the registry.
dagger_client (dagger.Client): The dagger client used to build the registry.
docker_credentials (Tuple[str, str]): The docker credentials used to fetch published images from DockerHub.
cache_ttl_seconds (int, optional): The cache time to live in seconds for crane output. Defaults to 0.
Returns:
VersionRegistry: The registry.
"""
Expand All @@ -141,7 +149,7 @@ async def load(

# Instantiate a crane client and a remote registry to fetch published images from DockerHub
published_docker_images = await VersionRegistry.get_all_published_base_images(
dagger_client, docker_credentials, ConnectorBaseImageClass
dagger_client, docker_credentials, ConnectorBaseImageClass, cache_ttl_seconds=cache_ttl_seconds
)

# Build a dict of published images by version number for easier lookup
Expand Down Expand Up @@ -246,16 +254,24 @@ def latest_not_pre_released_published_entry(self) -> Optional[VersionRegistryEnt
return None


async def get_python_registry(dagger_client: dagger.Client, docker_credentials: Tuple[str, str]) -> VersionRegistry:
return await VersionRegistry.load(AirbytePythonConnectorBaseImage, dagger_client, docker_credentials)
async def get_python_registry(
dagger_client: dagger.Client, docker_credentials: Tuple[str, str], cache_ttl_seconds: int = 0
) -> VersionRegistry:
return await VersionRegistry.load(
AirbytePythonConnectorBaseImage, dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds
)


async def get_manifest_only_registry(dagger_client: dagger.Client, docker_credentials: Tuple[str, str]) -> VersionRegistry:
return await VersionRegistry.load(AirbyteManifestOnlyConnectorBaseImage, dagger_client, docker_credentials)
async def get_manifest_only_registry(
dagger_client: dagger.Client, docker_credentials: Tuple[str, str], cache_ttl_seconds: int = 0
) -> VersionRegistry:
return await VersionRegistry.load(
AirbyteManifestOnlyConnectorBaseImage, dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds
)


async def get_registry_for_language(
dagger_client: dagger.Client, language: ConnectorLanguage, docker_credentials: Tuple[str, str]
dagger_client: dagger.Client, language: ConnectorLanguage, docker_credentials: Tuple[str, str], cache_ttl_seconds: int = 0
) -> VersionRegistry:
"""Returns the registry for a given language.
It is meant to be used externally to get the registry for a given connector language.
Expand All @@ -264,17 +280,17 @@ async def get_registry_for_language(
dagger_client (dagger.Client): The dagger client used to build the registry.
language (ConnectorLanguage): The connector language.
docker_credentials (Tuple[str, str]): The docker credentials used to fetch published images from DockerHub.
cache_ttl_seconds (int, optional): The cache time to live in seconds for crane output. Defaults to 0.
Raises:
NotImplementedError: Raised if the registry for the given language is not implemented yet.
Returns:
VersionRegistry: The registry for the given language.
"""
if language in [ConnectorLanguage.PYTHON, ConnectorLanguage.LOW_CODE]:
return await get_python_registry(dagger_client, docker_credentials)
return await get_python_registry(dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds)
elif language is ConnectorLanguage.MANIFEST_ONLY:
return await get_manifest_only_registry(dagger_client, docker_credentials)
return await get_manifest_only_registry(dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds)
else:
raise NotImplementedError(f"Registry for language {language} is not implemented yet.")

Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/base_images/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "airbyte-connectors-base-images"
version = "1.0.4"
version = "1.1.0"
description = "This package is used to generate and publish the base images for Airbyte Connectors."
authors = ["Augustin Lafanechere <[email protected]>"]
readme = "README.md"
Expand Down
1 change: 1 addition & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ airbyte-ci connectors --language=low-code migrate-to-manifest-only

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- |
| 4.41.8 | [#47447](https://github.com/airbytehq/airbyte/pull/47447) | Use `cache_ttl` for base image registry listing in `up-to-date`. |
| 4.41.7 | [#47444](https://github.com/airbytehq/airbyte/pull/47444) | Remove redundant `--ignore-connector` error from up-to-date. `--metadata-query` can be used instead. |
| 4.41.6 | [#47308](https://github.com/airbytehq/airbyte/pull/47308) | Connector testing: skip incremental acceptance test when the connector is not released. |
| 4.41.5 | [#47255](https://github.com/airbytehq/airbyte/pull/47255) | Fix `DisableProgressiveRollout` following Dagger API change. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class NoBaseImageAddressInMetadataError(Exception):


class UpdateBaseImageMetadata(StepModifyingFiles):

BASE_IMAGE_LIST_CACHE_TTL_SECONDS = 60 * 60 * 24 # 1 day

context: ConnectorContext

title = "Upgrade the base image to the latest version in metadata.yaml"
Expand All @@ -45,6 +48,7 @@ async def get_latest_base_image_address(self) -> Optional[str]:
self.dagger_client,
self.context.connector.language,
(self.context.docker_hub_username.value, self.context.docker_hub_password.value),
cache_ttl_seconds=self.BASE_IMAGE_LIST_CACHE_TTL_SECONDS,
)
return version_registry_for_language.latest_not_pre_released_published_entry.published_docker_image.address
except NotImplementedError:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "4.41.7"
version = "4.41.8"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <[email protected]>"]

Expand Down

0 comments on commit b4496f0

Please sign in to comment.