diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 17a4921..c9ca00f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -53,7 +53,7 @@ jobs: - name: Install python dependencies run: | pip install --disable-pip-version-check --upgrade pip setuptools - pip install -e .[dramatiq,fastapi,auth,celery,fluentbit,sql,sql_sync,s3,api_client,amqp,nanoid,test] ${{ matrix.pins }} + pip install -e .[dramatiq,fastapi,auth,celery,fluentbit,sql,sql_sync,s3,s3_sync,api_client,amqp,nanoid,test] ${{ matrix.pins }} pip list - name: Run tests diff --git a/CHANGES.md b/CHANGES.md index 678b4a6..42a798e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,7 +3,7 @@ ## 0.14.1 (unreleased) ---------------------- -- Nothing changed yet. +- Added synchronous S3 interface (installable through optional `s3_sync` dependency). ## 0.14.0 (2024-05-22) diff --git a/clean_python/s3/__init__.py b/clean_python/s3/__init__.py index e5bf73c..86ed079 100644 --- a/clean_python/s3/__init__.py +++ b/clean_python/s3/__init__.py @@ -1,3 +1,6 @@ from .key_mapper import * # NOQA +from .s3_bucket_options import * # NOQA from .s3_gateway import * # NOQA from .s3_provider import * # NOQA +from .sync_s3_gateway import * # NOQA +from .sync_s3_provider import * # NOQA diff --git a/clean_python/s3/s3_bucket_options.py b/clean_python/s3/s3_bucket_options.py new file mode 100644 index 0000000..d67faf8 --- /dev/null +++ b/clean_python/s3/s3_bucket_options.py @@ -0,0 +1,11 @@ +from clean_python import ValueObject + +__all__ = ["S3BucketOptions"] + + +class S3BucketOptions(ValueObject): + url: str + access_key: str + secret_key: str + bucket: str + region: str | None = None diff --git a/clean_python/s3/s3_provider.py b/clean_python/s3/s3_provider.py index dd710d6..ee7b72f 100644 --- a/clean_python/s3/s3_provider.py +++ b/clean_python/s3/s3_provider.py @@ -3,10 +3,14 @@ import logging from typing import TYPE_CHECKING -import aioboto3 +try: + import aioboto3 +except ImportError: + 
aioboto3 = None + from botocore.client import Config -from clean_python import ValueObject +from .s3_bucket_options import S3BucketOptions if TYPE_CHECKING: from types_aiobotocore_s3.client import S3Client @@ -16,14 +20,6 @@ logger = logging.getLogger(__name__) -class S3BucketOptions(ValueObject): - url: str - access_key: str - secret_key: str - bucket: str - region: str | None = None - - class S3BucketProvider: def __init__(self, options: S3BucketOptions): self.options = options @@ -34,6 +30,7 @@ def bucket(self) -> str: @property def client(self) -> "S3Client": + assert aioboto3 is not None session = aioboto3.Session() return session.client( "s3", diff --git a/clean_python/s3/sync_s3_gateway.py b/clean_python/s3/sync_s3_gateway.py new file mode 100644 index 0000000..484c020 --- /dev/null +++ b/clean_python/s3/sync_s3_gateway.py @@ -0,0 +1,212 @@ +# This module is a copy paste of s3_gateway.py + +import logging +from pathlib import Path + +import inject +from botocore.exceptions import ClientError +from pydantic import AnyHttpUrl + +from clean_python import ctx +from clean_python import DoesNotExist +from clean_python import Filter +from clean_python import Id +from clean_python import Json +from clean_python import PageOptions +from clean_python import SyncGateway + +from .sync_s3_provider import SyncS3BucketProvider + +DEFAULT_EXPIRY = 3600 # in seconds +DEFAULT_TIMEOUT = 1.0 +AWS_LIMIT = 1000 # max s3 keys per request + + +__all__ = ["SyncS3Gateway"] + +logger = logging.getLogger(__name__) + + +class SyncS3Gateway(SyncGateway): + """The interface to S3 Buckets. + + The standard Gateway interface is only partially implemented: + + - get() and filter() return metadata + - add(), update(), upsert() are not implemented + - remove() works as expected + + For actually getting the object data either use the download_file() + or upload_file() or create a presigned url and hand that over to + the client. 
+ """ + + def __init__( + self, + provider_override: SyncS3BucketProvider | None = None, + multitenant: bool = False, + ): + self.provider_override = provider_override + self.multitenant = multitenant + + @property + def provider(self): + return self.provider_override or inject.instance(SyncS3BucketProvider) + + def _id_to_key(self, id: Id) -> str: + if not self.multitenant: + return str(id) + if ctx.tenant is None: + raise RuntimeError(f"{self.__class__} requires a tenant in the context") + return f"tenant-{ctx.tenant.id}/{id}" + + def _key_to_id(self, key: str) -> Id: + return key.split("/", 1)[1] if self.multitenant else key + + def get(self, id: Id) -> Json | None: + try: + result = self.provider.client.head_object( + Bucket=self.provider.bucket, Key=self._id_to_key(id) + ) + except ClientError as e: + if e.response["Error"]["Code"] == "404": + return None + else: + raise e + return { + "id": str(id), + "last_modified": result["LastModified"], + "etag": result["ETag"].strip('"'), + "size": result["ContentLength"], + } + + def filter( + self, + filters: list[Filter], + params: PageOptions | None = PageOptions(limit=AWS_LIMIT), + ) -> list[Json]: + assert params is not None, "pagination is required for S3Gateway" + assert params.limit <= AWS_LIMIT, f"max {AWS_LIMIT} keys for S3Gateway" + assert params.offset == 0, "no 'offset' pagination for S3Gateway" + assert params.order_by == "id", "can order by 'id' only for S3Gateway" + kwargs = { + "Bucket": self.provider.bucket, + "MaxKeys": params.limit, + "Prefix": self.filters_to_prefix(filters), + } + if params.cursor is not None: + kwargs["StartAfter"] = self._id_to_key(params.cursor) + result = self.provider.client.list_objects_v2(**kwargs) + # Example response: + # { + # 'Key': 'object-in-s3', + # 'LastModified': datetime.datetime(..., tzinfo=utc), + # 'ETag': '"acbd18db4cc2f85cedef654fccc4a4d8"', + # 'Size': 3, 'StorageClass': + # 'STANDARD', + # 'Owner': {...} + # } + return [ + { + "id": 
self._key_to_id(x["Key"]), + "last_modified": x["LastModified"], + "etag": x["ETag"].strip('"'), + "size": x["Size"], + } + for x in result.get("Contents", []) + ] + + def remove(self, id: Id) -> bool: + self.provider.client.delete_object( + Bucket=self.provider.bucket, + Key=self._id_to_key(id), + ) + # S3 doesn't tell us if the object was there in the first place + return True + + def remove_multiple(self, ids: list[Id]) -> None: + if len(ids) == 0: + return + assert len(ids) <= AWS_LIMIT, f"max {AWS_LIMIT} keys for S3Gateway" + self.provider.client.delete_objects( + Bucket=self.provider.bucket, + Delete={ + "Objects": [{"Key": self._id_to_key(x)} for x in ids], + "Quiet": True, + }, + ) + + def _create_presigned_url( + self, + id: Id, + client_method: str, + ) -> AnyHttpUrl: + return self.provider.client.generate_presigned_url( + client_method, + Params={"Bucket": self.provider.bucket, "Key": self._id_to_key(id)}, + ExpiresIn=DEFAULT_EXPIRY, + ) + + def create_download_url(self, id: Id) -> AnyHttpUrl: + return self._create_presigned_url(id, "get_object") + + def create_upload_url(self, id: Id) -> AnyHttpUrl: + return self._create_presigned_url(id, "put_object") + + def download_file(self, id: Id, file_path: Path) -> None: + if file_path.exists(): + raise FileExistsError() + try: + self.provider.client.download_file( + Bucket=self.provider.bucket, + Key=self._id_to_key(id), + Filename=str(file_path), + ) + except ClientError as e: + if e.response["Error"]["Code"] == "404": + file_path.unlink(missing_ok=True) + raise DoesNotExist("object") + else: + raise e + + def upload_file(self, id: Id, file_path: Path) -> None: + if not file_path.is_file(): + raise FileNotFoundError() + self.provider.client.upload_file( + Bucket=self.provider.bucket, + Key=self._id_to_key(id), + Filename=str(file_path), + ) + + def filters_to_prefix(self, filters: list[Filter]) -> str: + if len(filters) == 0: + return self._id_to_key("") + elif len(filters) > 1: + raise 
NotImplementedError("More than 1 filter is not supported") + (filter,) = filters + if filter.field == "prefix": + assert len(filter.values) == 1 + return self._id_to_key(filter.values[0]) + else: + raise NotImplementedError(f"Unsupported filter '{filter.field}'") + + def remove_filtered(self, filters: list[Filter]) -> None: + kwargs = { + "Bucket": self.provider.bucket, + "MaxKeys": AWS_LIMIT, + "Prefix": self.filters_to_prefix(filters), + } + while True: + result = self.provider.client.list_objects_v2(**kwargs) + contents = result.get("Contents", []) + if contents: + self.provider.client.delete_objects( + Bucket=self.provider.bucket, + Delete={ + "Objects": [{"Key": x["Key"]} for x in contents], + "Quiet": True, + }, + ) + if len(contents) < AWS_LIMIT: + break + kwargs["StartAfter"] = contents[-1]["Key"] diff --git a/clean_python/s3/sync_s3_provider.py b/clean_python/s3/sync_s3_provider.py new file mode 100644 index 0000000..8e2ce45 --- /dev/null +++ b/clean_python/s3/sync_s3_provider.py @@ -0,0 +1,44 @@ +# (c) Nelen & Schuurmans + +import logging +from typing import TYPE_CHECKING + +import boto3 +from botocore.client import Config + +from .s3_bucket_options import S3BucketOptions + +if TYPE_CHECKING: + from mypy_boto3_s3.client import S3Client + +__all__ = ["S3BucketOptions", "SyncS3BucketProvider"] + +logger = logging.getLogger(__name__) + + +class SyncS3BucketProvider: + def __init__(self, options: S3BucketOptions): + self.options = options + + @property + def bucket(self) -> str: + return self.options.bucket + + @property + def client(self) -> "S3Client": + return boto3.client( + "s3", + endpoint_url=self.options.url, + aws_access_key_id=self.options.access_key, + aws_secret_access_key=self.options.secret_key, + region_name=self.options.region, + config=Config( + s3={"addressing_style": "virtual"}, # "path" will become deprecated + signature_version="s3v4", # for minio + retries={ + "max_attempts": 4, # 1 try and up to 3 retries + "mode": "adaptive", + }, + ), 
+ use_ssl=self.options.url.startswith("https"), + ) diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py index efffaad..4e0dc63 100644 --- a/integration_tests/conftest.py +++ b/integration_tests/conftest.py @@ -1,14 +1,17 @@ # (c) Nelen & Schuurmans import asyncio +import io import multiprocessing import os import time from urllib.error import URLError from urllib.request import urlopen +import boto3 import pytest import uvicorn +from botocore.exceptions import ClientError def pytest_sessionstart(session): @@ -97,3 +100,61 @@ async def fastapi_example_app(): yield f"http://localhost:{port}" finally: p.terminate() + + +@pytest.fixture(scope="session") +def s3_settings(s3_url): + minio_settings = { + "url": s3_url, + "access_key": "cleanpython", + "secret_key": "cleanpython", + "bucket": "cleanpython-test", + "region": None, + } + if not minio_settings["bucket"].endswith("-test"): # type: ignore + pytest.exit("Not running against a test minio bucket?! 😱") + return minio_settings.copy() + + +@pytest.fixture(scope="session") +def s3_bucket(s3_settings): + s3 = boto3.resource( + "s3", + endpoint_url=s3_settings["url"], + aws_access_key_id=s3_settings["access_key"], + aws_secret_access_key=s3_settings["secret_key"], + ) + bucket = s3.Bucket(s3_settings["bucket"]) + + # ensure existence + try: + bucket.create() + except ClientError as e: + if "BucketAlreadyOwnedByYou" in str(e): + pass + return bucket + + +@pytest.fixture +def local_file(tmp_path): + path = tmp_path / "test-upload.txt" + path.write_bytes(b"foo") + return path + + +@pytest.fixture +def object_in_s3(s3_bucket): + s3_bucket.upload_fileobj(io.BytesIO(b"foo"), "object-in-s3") + return "object-in-s3" + + +@pytest.fixture +def object_in_s3_tenant(s3_bucket): + s3_bucket.upload_fileobj(io.BytesIO(b"foo"), "tenant-22/object-in-s3") + return "object-in-s3" + + +@pytest.fixture +def object_in_s3_other_tenant(s3_bucket): + s3_bucket.upload_fileobj(io.BytesIO(b"foo"), "tenant-222/object-in-s3") + 
return "object-in-s3" diff --git a/integration_tests/test_s3_gateway.py b/integration_tests/test_s3_gateway.py index 2a2388a..4a757e7 100644 --- a/integration_tests/test_s3_gateway.py +++ b/integration_tests/test_s3_gateway.py @@ -5,9 +5,7 @@ from datetime import datetime from unittest import mock -import boto3 import pytest -from botocore.exceptions import ClientError from clean_python import DoesNotExist from clean_python import Filter @@ -17,48 +15,15 @@ from clean_python.s3 import S3Gateway -@pytest.fixture(scope="session") -def s3_settings(s3_url): - minio_settings = { - "url": s3_url, - "access_key": "cleanpython", - "secret_key": "cleanpython", - "bucket": "cleanpython-test", - "region": None, - } - if not minio_settings["bucket"].endswith("-test"): # type: ignore - pytest.exit("Not running against a test minio bucket?! 😱") - return minio_settings.copy() - - -@pytest.fixture(scope="session") -def s3_bucket(s3_settings): - s3 = boto3.resource( - "s3", - endpoint_url=s3_settings["url"], - aws_access_key_id=s3_settings["access_key"], - aws_secret_access_key=s3_settings["secret_key"], - ) - bucket = s3.Bucket(s3_settings["bucket"]) - - # ensure existence - try: - bucket.create() - except ClientError as e: - if "BucketAlreadyOwnedByYou" in str(e): - pass - return bucket - - @pytest.fixture -def s3_provider(s3_bucket, s3_settings): +def s3_provider(s3_bucket, s3_settings) -> S3BucketProvider: # wipe contents before each test s3_bucket.objects.all().delete() return S3BucketProvider(S3BucketOptions(**s3_settings)) @pytest.fixture -def s3_gateway(s3_provider): +def s3_gateway(s3_provider) -> S3Gateway: return S3Gateway(s3_provider) diff --git a/integration_tests/test_s3_gateway_multitenant.py b/integration_tests/test_s3_gateway_multitenant.py index fa7df53..a72da50 100644 --- a/integration_tests/test_s3_gateway_multitenant.py +++ b/integration_tests/test_s3_gateway_multitenant.py @@ -4,9 +4,7 @@ import io from datetime import datetime -import boto3 import pytest 
-from botocore.exceptions import ClientError from clean_python import ctx from clean_python import DoesNotExist @@ -18,39 +16,6 @@ from clean_python.s3 import S3Gateway -@pytest.fixture(scope="session") -def s3_settings(s3_url): - minio_settings = { - "url": s3_url, - "access_key": "cleanpython", - "secret_key": "cleanpython", - "bucket": "cleanpython-test", - "region": None, - } - if not minio_settings["bucket"].endswith("-test"): # type: ignore - pytest.exit("Not running against a test minio bucket?! 😱") - return minio_settings.copy() - - -@pytest.fixture(scope="session") -def s3_bucket(s3_settings): - s3 = boto3.resource( - "s3", - endpoint_url=s3_settings["url"], - aws_access_key_id=s3_settings["access_key"], - aws_secret_access_key=s3_settings["secret_key"], - ) - bucket = s3.Bucket(s3_settings["bucket"]) - - # ensure existence - try: - bucket.create() - except ClientError as e: - if "BucketAlreadyOwnedByYou" in str(e): - pass - return bucket - - @pytest.fixture def s3_provider(s3_bucket, s3_settings): # wipe contents before each test @@ -66,25 +31,6 @@ def s3_gateway(s3_provider): return S3Gateway(s3_provider, multitenant=True) -@pytest.fixture -def object_in_s3(s3_bucket): - s3_bucket.upload_fileobj(io.BytesIO(b"foo"), "tenant-22/object-in-s3") - return "object-in-s3" - - -@pytest.fixture -def object_in_s3_other_tenant(s3_bucket): - s3_bucket.upload_fileobj(io.BytesIO(b"foo"), "tenant-222/object-in-s3") - return "object-in-s3" - - -@pytest.fixture -def local_file(tmp_path): - path = tmp_path / "test-upload.txt" - path.write_bytes(b"foo") - return path - - async def test_upload_file_uses_tenant(s3_gateway: S3Gateway, local_file, s3_bucket): object_name = "test-upload-file" @@ -93,10 +39,12 @@ async def test_upload_file_uses_tenant(s3_gateway: S3Gateway, local_file, s3_buc assert s3_bucket.Object("tenant-22/test-upload-file").content_length == 3 -async def test_download_file_uses_tenant(s3_gateway: S3Gateway, object_in_s3, tmp_path): +async def 
test_download_file_uses_tenant( + s3_gateway: S3Gateway, object_in_s3_tenant, tmp_path +): path = tmp_path / "test-download.txt" - await s3_gateway.download_file(object_in_s3, path) + await s3_gateway.download_file(object_in_s3_tenant, path) assert path.read_bytes() == b"foo" @@ -112,10 +60,12 @@ async def test_download_file_different_tenant( assert not path.exists() -async def test_remove_uses_tenant(s3_gateway: S3Gateway, s3_bucket, object_in_s3): - await s3_gateway.remove(object_in_s3) +async def test_remove_uses_tenant( + s3_gateway: S3Gateway, s3_bucket, object_in_s3_tenant +): + await s3_gateway.remove(object_in_s3_tenant) - assert await s3_gateway.get(object_in_s3) is None + assert await s3_gateway.get(object_in_s3_tenant) is None async def test_remove_other_tenant( @@ -170,9 +120,9 @@ async def test_filter_with_cursor_multitenant(s3_gateway: S3Gateway, multiple_ob assert actual[0]["id"] == "raster-2/foo" -async def test_get_multitenant(s3_gateway: S3Gateway, object_in_s3): - actual = await s3_gateway.get(object_in_s3) - assert actual["id"] == object_in_s3 +async def test_get_multitenant(s3_gateway: S3Gateway, object_in_s3_tenant): + actual = await s3_gateway.get(object_in_s3_tenant) + assert actual["id"] == object_in_s3_tenant assert isinstance(actual["last_modified"], datetime) assert actual["etag"] == "acbd18db4cc2f85cedef654fccc4a4d8" assert actual["size"] == 3 diff --git a/integration_tests/test_sync_s3_gateway.py b/integration_tests/test_sync_s3_gateway.py new file mode 100644 index 0000000..48f886d --- /dev/null +++ b/integration_tests/test_sync_s3_gateway.py @@ -0,0 +1,190 @@ +# This module is a copy paste of test_s3_gateway.py + +import io +from datetime import datetime +from unittest import mock + +import pytest + +from clean_python import DoesNotExist +from clean_python import Filter +from clean_python import PageOptions +from clean_python.s3 import S3BucketOptions +from clean_python.s3 import SyncS3BucketProvider +from clean_python.s3 import 
SyncS3Gateway + + +@pytest.fixture +def s3_provider(s3_bucket, s3_settings) -> SyncS3BucketProvider: + # wipe contents before each test + s3_bucket.objects.all().delete() + return SyncS3BucketProvider(S3BucketOptions(**s3_settings)) + + +@pytest.fixture +def s3_gateway(s3_provider) -> SyncS3Gateway: + return SyncS3Gateway(s3_provider) + + +@pytest.fixture +def object_in_s3(s3_bucket): + s3_bucket.upload_fileobj(io.BytesIO(b"foo"), "object-in-s3") + return "object-in-s3" + + +@pytest.fixture +def local_file(tmp_path): + path = tmp_path / "test-upload.txt" + path.write_bytes(b"foo") + return path + + +def test_upload_file(s3_gateway: SyncS3Gateway, local_file): + object_name = "test-upload-file" + + s3_gateway.upload_file(object_name, local_file) + + assert (s3_gateway.get(object_name))["size"] == 3 + + +def test_upload_file_does_not_exist(s3_gateway: SyncS3Gateway, tmp_path): + path = tmp_path / "test-upload.txt" + object_name = "test-upload-file" + + with pytest.raises(FileNotFoundError): + s3_gateway.upload_file(object_name, path) + + +def test_download_file(s3_gateway: SyncS3Gateway, object_in_s3, tmp_path): + path = tmp_path / "test-download.txt" + + s3_gateway.download_file(object_in_s3, path) + + assert path.read_bytes() == b"foo" + + +def test_download_file_path_already_exists( + s3_gateway: SyncS3Gateway, object_in_s3, tmp_path +): + path = tmp_path / "test-download.txt" + path.write_bytes(b"bar") + + with pytest.raises(FileExistsError): + s3_gateway.download_file(object_in_s3, path) + + assert path.read_bytes() == b"bar" + + +def test_download_file_does_not_exist(s3_gateway: SyncS3Gateway, s3_bucket, tmp_path): + path = tmp_path / "test-download-does-not-exist.txt" + + with pytest.raises(DoesNotExist): + s3_gateway.download_file("some-nonexisting", path) + + assert not path.exists() + + +def test_remove(s3_gateway: SyncS3Gateway, s3_bucket, object_in_s3): + s3_gateway.remove(object_in_s3) + + assert s3_gateway.get(object_in_s3) is None + + +def 
test_remove_does_not_exist(s3_gateway: SyncS3Gateway, s3_bucket): + s3_gateway.remove("non-existing") + + +@pytest.fixture +def multiple_objects(s3_bucket): + s3_bucket.upload_fileobj(io.BytesIO(b"a"), "raster-1/bla") + s3_bucket.upload_fileobj(io.BytesIO(b"ab"), "raster-2/bla") + s3_bucket.upload_fileobj(io.BytesIO(b"abc"), "raster-2/foo") + s3_bucket.upload_fileobj(io.BytesIO(b"abcde"), "raster-2/bz") + return ["raster-1/bla", "raster-2/bla", "raster-2/foo", "raster-2/bz"] + + +def test_remove_multiple(s3_gateway: SyncS3Gateway, multiple_objects): + s3_gateway.remove_multiple(multiple_objects[:2]) + + for key in multiple_objects[:2]: + assert s3_gateway.get(key) is None + + for key in multiple_objects[2:]: + assert s3_gateway.get(key) is not None + + +def test_remove_multiple_empty_list(s3_gateway: SyncS3Gateway, s3_bucket): + s3_gateway.remove_multiple([]) + + +def test_remove_filtered_all(s3_gateway: SyncS3Gateway, multiple_objects): + s3_gateway.remove_filtered([]) + + for key in multiple_objects: + assert s3_gateway.get(key) is None + + +def test_remove_filtered_prefix(s3_gateway: SyncS3Gateway, multiple_objects): + s3_gateway.remove_filtered([Filter(field="prefix", values=["raster-2/"])]) + + assert s3_gateway.get(multiple_objects[0]) is not None + for key in multiple_objects[1:]: + assert s3_gateway.get(key) is None + + +@mock.patch("clean_python.s3.sync_s3_gateway.AWS_LIMIT", new=1) +def test_remove_filtered_pagination(s3_gateway: SyncS3Gateway, multiple_objects): + s3_gateway.remove_filtered([Filter(field="prefix", values=["raster-2/"])]) + + assert s3_gateway.get(multiple_objects[0]) is not None + for key in multiple_objects[1:]: + assert s3_gateway.get(key) is None + + +def test_filter(s3_gateway: SyncS3Gateway, multiple_objects): + actual = s3_gateway.filter([], params=PageOptions(limit=10)) + assert len(actual) == 4 + assert actual[0]["id"] == "raster-1/bla" + assert isinstance(actual[0]["last_modified"], datetime) + assert actual[0]["etag"] == 
"0cc175b9c0f1b6a831c399e269772661" + assert actual[0]["size"] == 1 + + +def test_filter_empty(s3_gateway: SyncS3Gateway, s3_bucket): + actual = s3_gateway.filter([], params=PageOptions(limit=10)) + assert actual == [] + + +def test_filter_with_limit(s3_gateway: SyncS3Gateway, multiple_objects): + actual = s3_gateway.filter([], params=PageOptions(limit=2)) + assert len(actual) == 2 + assert actual[0]["id"] == "raster-1/bla" + assert actual[1]["id"] == "raster-2/bla" + + +def test_filter_with_cursor(s3_gateway: SyncS3Gateway, multiple_objects): + actual = s3_gateway.filter([], params=PageOptions(limit=3, cursor="raster-2/bla")) + assert len(actual) == 2 + assert actual[0]["id"] == "raster-2/bz" + assert actual[1]["id"] == "raster-2/foo" + + +def test_filter_by_prefix(s3_gateway: SyncS3Gateway, multiple_objects): + actual = s3_gateway.filter([Filter(field="prefix", values=["raster-1/"])]) + assert len(actual) == 1 + + actual = s3_gateway.filter([Filter(field="prefix", values=["raster-2/"])]) + assert len(actual) == 3 + + +def test_get(s3_gateway: SyncS3Gateway, object_in_s3): + actual = s3_gateway.get(object_in_s3) + assert actual["id"] == "object-in-s3" + assert isinstance(actual["last_modified"], datetime) + assert actual["etag"] == "acbd18db4cc2f85cedef654fccc4a4d8" + assert actual["size"] == 3 + + +def test_get_does_not_exist(s3_gateway: SyncS3Gateway): + actual = s3_gateway.get("non-existing") + assert actual is None diff --git a/integration_tests/test_sync_s3_gateway_multitenant.py b/integration_tests/test_sync_s3_gateway_multitenant.py new file mode 100644 index 0000000..5654d14 --- /dev/null +++ b/integration_tests/test_sync_s3_gateway_multitenant.py @@ -0,0 +1,152 @@ +# This module is a copy paste of test_s3_gateway_multitenant.py + +import io +from datetime import datetime + +import pytest + +from clean_python import ctx +from clean_python import DoesNotExist +from clean_python import Filter +from clean_python import PageOptions +from clean_python import 
Tenant +from clean_python.s3 import S3BucketOptions +from clean_python.s3 import SyncS3BucketProvider +from clean_python.s3 import SyncS3Gateway + + +@pytest.fixture +def s3_provider(s3_bucket, s3_settings): + # wipe contents before each test + s3_bucket.objects.all().delete() + # set up a tenant + ctx.tenant = Tenant(id=22, name="foo") + yield SyncS3BucketProvider(S3BucketOptions(**s3_settings)) + ctx.tenant = None + + +@pytest.fixture +def s3_gateway(s3_provider): + return SyncS3Gateway(s3_provider, multitenant=True) + + +def test_upload_file_uses_tenant(s3_gateway: SyncS3Gateway, local_file, s3_bucket): + object_name = "test-upload-file" + + s3_gateway.upload_file(object_name, local_file) + + assert s3_bucket.Object("tenant-22/test-upload-file").content_length == 3 + + +def test_download_file_uses_tenant( + s3_gateway: SyncS3Gateway, object_in_s3_tenant, tmp_path +): + path = tmp_path / "test-download.txt" + + s3_gateway.download_file(object_in_s3_tenant, path) + + assert path.read_bytes() == b"foo" + + +def test_download_file_different_tenant( + s3_gateway: SyncS3Gateway, s3_bucket, tmp_path, object_in_s3_other_tenant +): + path = tmp_path / "test-download.txt" + + with pytest.raises(DoesNotExist): + s3_gateway.download_file("object-in-s3", path) + + assert not path.exists() + + +def test_remove_uses_tenant(s3_gateway: SyncS3Gateway, s3_bucket, object_in_s3_tenant): + s3_gateway.remove(object_in_s3_tenant) + + assert s3_gateway.get(object_in_s3_tenant) is None + + +def test_remove_other_tenant( + s3_gateway: SyncS3Gateway, s3_bucket, object_in_s3_other_tenant +): + s3_gateway.remove(object_in_s3_other_tenant) + + # it is still there + assert s3_bucket.Object("tenant-222/object-in-s3").content_length == 3 + + +@pytest.fixture +def multiple_objects(s3_bucket): + s3_bucket.upload_fileobj(io.BytesIO(b"a"), "tenant-22/raster-1/bla") + s3_bucket.upload_fileobj(io.BytesIO(b"ab"), "tenant-222/raster-2/bla") + s3_bucket.upload_fileobj(io.BytesIO(b"abc"), 
"tenant-22/raster-2/foo") + s3_bucket.upload_fileobj(io.BytesIO(b"abcde"), "tenant-22/raster-2/bz") + return ["raster-1/bla", "raster-2/bla", "raster-2/foo", "raster-2/bz"] + + +def test_remove_multiple_multitenant( + s3_gateway: SyncS3Gateway, multiple_objects, s3_bucket +): + s3_gateway.remove_multiple(multiple_objects[:2]) + + assert s3_gateway.get(multiple_objects[0]) is None + + # the other-tenant object is still there + assert s3_bucket.Object("tenant-222/raster-2/bla").content_length == 2 + + +def test_filter_multitenant(s3_gateway: SyncS3Gateway, multiple_objects): + actual = s3_gateway.filter([], params=PageOptions(limit=10)) + assert len(actual) == 3 + assert actual[0]["id"] == "raster-1/bla" + + +def test_filter_with_prefix_multitenant(s3_gateway: SyncS3Gateway, multiple_objects): + actual = s3_gateway.filter( + [Filter(field="prefix", values=["raster-2/"])], params=PageOptions(limit=10) + ) + assert len(actual) == 2 + assert actual[0]["id"] == "raster-2/bz" + assert actual[1]["id"] == "raster-2/foo" + + +def test_filter_with_cursor_multitenant(s3_gateway: SyncS3Gateway, multiple_objects): + actual = s3_gateway.filter([], params=PageOptions(limit=3, cursor="raster-2/bz")) + assert len(actual) == 1 + assert actual[0]["id"] == "raster-2/foo" + + +def test_get_multitenant(s3_gateway: SyncS3Gateway, object_in_s3_tenant): + actual = s3_gateway.get(object_in_s3_tenant) + assert actual["id"] == object_in_s3_tenant + assert isinstance(actual["last_modified"], datetime) + assert actual["etag"] == "acbd18db4cc2f85cedef654fccc4a4d8" + assert actual["size"] == 3 + + +def test_get_other_tenant(s3_gateway: SyncS3Gateway, object_in_s3_other_tenant): + actual = s3_gateway.get(object_in_s3_other_tenant) + assert actual is None + + +def test_remove_filtered_all(s3_gateway: SyncS3Gateway, multiple_objects): + s3_gateway.remove_filtered([]) + + # tenant 22 is completely wiped + for i in (0, 2, 3): + assert s3_gateway.get(multiple_objects[i]) is None + + # object of tenant 
222 is still there + ctx.tenant = Tenant(id=222, name="other") + assert s3_gateway.get("raster-2/bla") is not None + + +def test_remove_filtered_prefix(s3_gateway: SyncS3Gateway, multiple_objects): + s3_gateway.remove_filtered([Filter(field="prefix", values=["raster-2/"])]) + + assert s3_gateway.get("raster-1/bla") is not None + assert s3_gateway.get("raster-2/foo") is None + assert s3_gateway.get("raster-2/bz") is None + + # object of tenant 222 is still there + ctx.tenant = Tenant(id=222, name="other") + assert s3_gateway.get("raster-2/bla") is not None diff --git a/pyproject.toml b/pyproject.toml index c156fbf..0db1da1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,9 @@ celery = ["celery==5.3.*"] fluentbit = ["fluent-logger"] sql = ["sqlalchemy==2.0.*", "asyncpg==0.29.*"] sql_sync = ["sqlalchemy==2.0.*", "psycopg2==2.9.*"] -s3 = ["aioboto3==12.2.*", "boto3==1.33.*"] +# help the resolver a bit by copying version pins from aioboto3 / aiobotocore +s3 = ["aioboto3==13.0.*", "aiobotocore==2.13.0", "boto3==1.34.106", "types-aioboto3[s3]"] +s3_sync = ["boto3==1.34.*", "boto3-stubs[s3]"] api_client = ["aiohttp==3.9.*", "urllib3==2.0.*"] profiler = ["yappi"] debugger = ["debugpy"]