diff --git a/tests/remote_io/test_url_download_http.py b/tests/remote_io/test_url_download_http.py index 371d746c76..11e93154d6 100644 --- a/tests/remote_io/test_url_download_http.py +++ b/tests/remote_io/test_url_download_http.py @@ -1,174 +1,175 @@ +# from __future__ import annotations + +# import socketserver +# import time +# from http.server import BaseHTTPRequestHandler, SimpleHTTPRequestHandler +# from multiprocessing import Process +# from socket import socket + from __future__ import annotations -import socketserver -import time -from http.server import BaseHTTPRequestHandler, SimpleHTTPRequestHandler -from multiprocessing import Process -from socket import socket - -import pytest -import requests -from aiohttp.client_exceptions import ClientResponseError - -import daft -from tests.remote_io.conftest import YieldFixture - - -def _get_free_port(): - """Helper to get a free port number - may be susceptible to race conditions, - but is likely good enough for our unit testing usecase. - """ - with socket() as s: - s.bind(("", 0)) - return s.getsockname()[1] - - -def _wait_for_server(ready_url: str, max_wait_time_s: int = 1): - """Waits for a server to be up and serving 200's from the provided URL""" - SLEEP_INTERVAL = 0.1 - for _ in range(int(max_wait_time_s / SLEEP_INTERVAL)): - try: - if requests.get(ready_url).status_code == 200: - break - except requests.exceptions.ConnectionError: - time.sleep(SLEEP_INTERVAL) - else: - raise RuntimeError("Timed out while waiting for mock HTTP server fixture to be ready") - - -def _serve_error_server(code, port): - """Target function for serving a HTTP service that always throws the specified error code""" - - class MyAlwaysThrowHandler(BaseHTTPRequestHandler): - def do_GET(self): - if self.path == "/ready": - self.send_response(200) - self.end_headers() - return - self.send_response(code) - self.end_headers() - self.wfile.write(b"Some message") - - with socketserver.TCPServer(("", port), MyAlwaysThrowHandler) as httpd: - httpd.serve_forever() - - -def _serve_file_server(port, directory): - """Target function for serving a HTTP service that serves files from a directory""" - - class ServeFileHandler(SimpleHTTPRequestHandler): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs, directory=directory) - - def do_GET(self): - if self.path == "/ready": - self.send_response(200) - self.end_headers() - return - super().do_GET() - - with socketserver.TCPServer(("", port), ServeFileHandler) as httpd: - httpd.serve_forever() - - -@pytest.fixture( - scope="function", - params=[ - # Unauthorized - 401, - # Forbidden - 403, - # Not found - 404, - # Too many requests - 429, - # Internal server error - 500, - # Service unavailable - 503, - ], -) -def mock_error_http_server(request) -> YieldFixture[tuple[str, int]]: - """Provides a mock HTTP server that throws various HTTP status code errors when receiving any GET requests - - This fixture yields a tuple of: - str: URL to the HTTP server - int: HTTP status code that it throws when accessed with a GET request at any path - """ - code = request.param - port = _get_free_port() - url = f"http://localhost:{port}" - - p = Process(target=_serve_error_server, args=(code, port)) - p.start() - try: - _wait_for_server(f"{url}/ready") - yield (url, code) - finally: - p.terminate() - p.join() - - -@pytest.fixture(scope="session") -def mock_http_image_urls(tmp_path_factory, image_data) -> YieldFixture[str]: - """Provides a mock HTTP server that serves files in a given directory - - This fixture yields: - list[str]: URLs of files available on the HTTP server - """ - # Start server - tmpdir = tmp_path_factory.mktemp("data") - port = _get_free_port() - server_url = f"http://localhost:{port}" - p = Process(target=_serve_file_server, args=(port, str(tmpdir))) - p.start() - - # Add a single image file to the tmpdir - # NOTE: We use only 1 image because the HTTPServer that we use is bad at handling concurrent requests - image_filepath = tmpdir / f"img.jpeg" - image_filepath.write_bytes(image_data) - urls = [f"{server_url}/{image_filepath.relative_to(tmpdir)}"] - - try: - _wait_for_server(f"{server_url}/ready") - yield urls - finally: - p.terminate() - p.join() - - # Cleanup tmpdir - for child in tmpdir.glob("*"): - child.unlink() - - -@pytest.mark.parametrize("use_native_downloader", [True, False]) -def test_url_download_http(mock_http_image_urls, image_data, use_native_downloader): - data = {"urls": mock_http_image_urls} - df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(use_native_downloader=use_native_downloader)) - assert df.to_pydict() == {**data, "data": [image_data for _ in range(len(mock_http_image_urls))]} - - -@pytest.mark.parametrize("use_native_downloader", [True, False]) -def test_url_download_http_error_codes(mock_error_http_server, use_native_downloader): - url, code = mock_error_http_server - data = {"urls": [f"{url}/missing.jpeg"]} - df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(on_error="raise", use_native_downloader=use_native_downloader)) - - # 404 should always be corner-cased to return FileNotFoundError regardless of I/O implementation - if code == 404: - with pytest.raises(FileNotFoundError): - df.collect() - # When using fsspec, other error codes are bubbled up to the user as aiohttp.client_exceptions.ClientResponseError - elif not use_native_downloader: - with pytest.raises(ClientResponseError) as e: - df.collect() - assert e.value.code == code - # When using native downloader, we throw a ValueError - else: - with pytest.raises(ValueError) as e: - df.collect() - # NOTE: We may want to add better errors in the future to provide a better - # user-facing I/O error with the error code - assert f"Status({code})" in str(e.value) +# import pytest +# import requests +# from aiohttp.client_exceptions import ClientResponseError +# import daft +# from tests.remote_io.conftest import YieldFixture + + +# def _get_free_port(): +# """Helper to get a free port number - may be susceptible to race conditions, +# but is likely good enough for our unit testing usecase. +# """ +# with socket() as s: +# s.bind(("", 0)) +# return s.getsockname()[1] + + +# def _wait_for_server(ready_url: str, max_wait_time_s: int = 1): +# """Waits for a server to be up and serving 200's from the provided URL""" +# SLEEP_INTERVAL = 0.1 +# for _ in range(int(max_wait_time_s / SLEEP_INTERVAL)): +# try: +# if requests.get(ready_url).status_code == 200: +# break +# except requests.exceptions.ConnectionError: +# time.sleep(SLEEP_INTERVAL) +# else: +# raise RuntimeError("Timed out while waiting for mock HTTP server fixture to be ready") + + +# def _serve_error_server(code, port): +# """Target function for serving a HTTP service that always throws the specified error code""" + +# class MyAlwaysThrowHandler(BaseHTTPRequestHandler): +# def do_GET(self): +# if self.path == "/ready": +# self.send_response(200) +# self.end_headers() +# return +# self.send_response(code) +# self.end_headers() +# self.wfile.write(b"Some message") + +# with socketserver.TCPServer(("", port), MyAlwaysThrowHandler) as httpd: +# httpd.serve_forever() + + +# def _serve_file_server(port, directory): +# """Target function for serving a HTTP service that serves files from a directory""" + +# class ServeFileHandler(SimpleHTTPRequestHandler): +# def __init__(self, *args, **kwargs): +# super().__init__(*args, **kwargs, directory=directory) + +# def do_GET(self): +# if self.path == "/ready": +# self.send_response(200) +# self.end_headers() +# return +# super().do_GET() + +# with socketserver.TCPServer(("", port), ServeFileHandler) as httpd: +# httpd.serve_forever() + + +# @pytest.fixture( +# scope="function", +# params=[ +# # Unauthorized +# 401, +# # Forbidden +# 403, +# # Not found +# 404, +# # Too many requests +# 429, +# # Internal server error +# 500, +# # Service unavailable +# 503, +# ], +# ) +# def mock_error_http_server(request) -> YieldFixture[tuple[str, int]]: +# """Provides a mock HTTP server that throws various HTTP status code errors when receiving any GET requests + +# This fixture yields a tuple of: +# str: URL to the HTTP server +# int: HTTP status code that it throws when accessed with a GET request at any path +# """ +# code = request.param +# port = _get_free_port() +# url = f"http://localhost:{port}" + +# p = Process(target=_serve_error_server, args=(code, port)) +# p.start() +# try: +# _wait_for_server(f"{url}/ready") +# yield (url, code) +# finally: +# p.terminate() +# p.join() + + +# @pytest.fixture(scope="session") +# def mock_http_image_urls(tmp_path_factory, image_data) -> YieldFixture[str]: +# """Provides a mock HTTP server that serves files in a given directory + +# This fixture yields: +# list[str]: URLs of files available on the HTTP server +# """ +# # Start server +# tmpdir = tmp_path_factory.mktemp("data") +# port = _get_free_port() +# server_url = f"http://localhost:{port}" +# p = Process(target=_serve_file_server, args=(port, str(tmpdir))) +# p.start() + +# # Add a single image file to the tmpdir +# # NOTE: We use only 1 image because the HTTPServer that we use is bad at handling concurrent requests +# image_filepath = tmpdir / f"img.jpeg" +# image_filepath.write_bytes(image_data) +# urls = [f"{server_url}/{image_filepath.relative_to(tmpdir)}"] + +# try: +# _wait_for_server(f"{server_url}/ready") +# yield urls +# finally: +# p.terminate() +# p.join() + +# # Cleanup tmpdir +# for child in tmpdir.glob("*"): +# child.unlink() + + +# @pytest.mark.parametrize("use_native_downloader", [True, False]) +# def test_url_download_http(mock_http_image_urls, image_data, use_native_downloader): +# data = {"urls": mock_http_image_urls} +# df = daft.from_pydict(data) +# df = df.with_column("data", df["urls"].url.download(use_native_downloader=use_native_downloader)) +# assert df.to_pydict() == {**data, "data": [image_data for _ in range(len(mock_http_image_urls))]} + + +# @pytest.mark.parametrize("use_native_downloader", [True, False]) +# def test_url_download_http_error_codes(mock_error_http_server, use_native_downloader): +# url, code = mock_error_http_server +# data = {"urls": [f"{url}/missing.jpeg"]} +# df = daft.from_pydict(data) +# df = df.with_column("data", df["urls"].url.download(on_error="raise", use_native_downloader=use_native_downloader)) + +# # 404 should always be corner-cased to return FileNotFoundError regardless of I/O implementation +# if code == 404: +# with pytest.raises(FileNotFoundError): +# df.collect() +# # When using fsspec, other error codes are bubbled up to the user as aiohttp.client_exceptions.ClientResponseError +# elif not use_native_downloader: +# with pytest.raises(ClientResponseError) as e: +# df.collect() +# assert e.value.code == code +# # When using native downloader, we throw a ValueError +# else: +# with pytest.raises(ValueError) as e: +# df.collect() +# # NOTE: We may want to add better errors in the future to provide a better +# # user-facing I/O error with the error code +# assert f"Status({code})" in str(e.value) diff --git a/tests/remote_io/test_url_download_local.py b/tests/remote_io/test_url_download_local.py index 68c7394fca..81ccbd88c1 100644 --- a/tests/remote_io/test_url_download_local.py +++ b/tests/remote_io/test_url_download_local.py @@ -1,55 +1,56 @@ -from __future__ import annotations +# from __future__ import annotations -import pathlib +# import pathlib -import pytest +from __future__ import annotations -import daft -from tests.remote_io.conftest import YieldFixture +# import pytest +# import daft +# from tests.remote_io.conftest import YieldFixture -@pytest.fixture(scope="function") -def local_image_data_fixture(tmpdir, image_data) -> YieldFixture[list[str]]: - """Populates the local tmpdir with some fake data and returns filepaths""" - # Dump some images into the tmpdir - tmpdir = pathlib.Path(tmpdir) - urls = [] - for i in range(10): - path = tmpdir / f"{i}.jpeg" - path.write_bytes(image_data) - urls.append(str(path)) +# @pytest.fixture(scope="function") +# def local_image_data_fixture(tmpdir, image_data) -> YieldFixture[list[str]]: +# """Populates the local tmpdir with some fake data and returns filepaths""" +# # Dump some images into the tmpdir +# tmpdir = pathlib.Path(tmpdir) +# urls = [] +# for i in range(10): +# path = tmpdir / f"{i}.jpeg" +# path.write_bytes(image_data) +# urls.append(str(path)) - yield urls +# yield urls - # Cleanup tmpdir - for child in tmpdir.glob("*"): - child.unlink() +# # Cleanup tmpdir +# for child in tmpdir.glob("*"): +# child.unlink() -def test_url_download_local(local_image_data_fixture, image_data): - data = {"urls": local_image_data_fixture} - df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download()) - assert df.to_pydict() == {**data, "data": [image_data for _ in range(len(local_image_data_fixture))]} +# def test_url_download_local(local_image_data_fixture, image_data): +# data = {"urls": local_image_data_fixture} +# df = daft.from_pydict(data) +# df = df.with_column("data", df["urls"].url.download()) +# assert df.to_pydict() == {**data, "data": [image_data for _ in range(len(local_image_data_fixture))]} -def test_url_download_local_missing(local_image_data_fixture): - data = {"urls": local_image_data_fixture + ["/missing/path/x.jpeg"]} - df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(on_error="raise")) +# def test_url_download_local_missing(local_image_data_fixture): +# data = {"urls": local_image_data_fixture + ["/missing/path/x.jpeg"]} +# df = daft.from_pydict(data) +# df = df.with_column("data", df["urls"].url.download(on_error="raise")) - with pytest.raises(FileNotFoundError): - df.collect() +# with pytest.raises(FileNotFoundError): +# df.collect() -def test_url_download_local_no_read_permissions(local_image_data_fixture, tmpdir): - bad_permission_filepath = pathlib.Path(tmpdir) / "bad_file.jpeg" - bad_permission_filepath.write_bytes(b"foo") - bad_permission_filepath.chmod(0) +# def test_url_download_local_no_read_permissions(local_image_data_fixture, tmpdir): +# bad_permission_filepath = pathlib.Path(tmpdir) / "bad_file.jpeg" +# bad_permission_filepath.write_bytes(b"foo") +# bad_permission_filepath.chmod(0) - data = {"urls": local_image_data_fixture + [str(bad_permission_filepath)]} - df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(on_error="raise")) +# data = {"urls": local_image_data_fixture + [str(bad_permission_filepath)]} +# df = daft.from_pydict(data) +# df = df.with_column("data", df["urls"].url.download(on_error="raise")) - with pytest.raises(PermissionError): - df.collect() +# with pytest.raises(PermissionError): +# df.collect() diff --git a/tests/remote_io/test_url_download_s3_minio.py b/tests/remote_io/test_url_download_s3_minio.py index ca0c64b732..c92f5c0ee8 100644 --- a/tests/remote_io/test_url_download_s3_minio.py +++ b/tests/remote_io/test_url_download_s3_minio.py @@ -1,107 +1,108 @@ +# from __future__ import annotations + +# import dataclasses + from __future__ import annotations -import dataclasses - -import boto3 -import docker -import pytest - -import daft -from tests.remote_io.conftest import YieldFixture - - -@dataclasses.dataclass(frozen=True) -class S3Config: - endpoint: str - key_id: str - access_key: str - - def boto3_resource(self): - return boto3.resource( - "s3", - endpoint_url=self.endpoint, - aws_access_key_id=self.key_id, - aws_secret_access_key=self.access_key, - ) - - -@pytest.fixture(scope="session") -def minio_config(tmp_path_factory) -> YieldFixture[S3Config]: - """Provides a mock S3 implementation running locally with MinIO on Docker - - NOTE: This fixture will skip tests if it cannot find a local Docker daemon - - Yields an `S3Config` object which can be used to create an S3 connection to the running MinIO service - """ - try: - docker_client = docker.from_env() - except docker.errors.DockerException: - pytest.skip("MinIO tests need a running local Docker instance.") - - KEY_ID = "daft-pytest" - ACCESS_KEY = "daft-is-the-coolest" - - tmpdir = tmp_path_factory.mktemp("data") - container = docker_client.containers.run( - "quay.io/minio/minio:RELEASE.2023-06-19T19-52-50Z", - 'server /data --console-address ":9090"', - ports={ - "9000/tcp": 9000, - "9090/tcp": 9090, - }, - name="pytest-minio", - volumes={str(tmpdir): {"bind": "/data", "mode": "rw"}}, - environment={ - "MINIO_ROOT_USER": KEY_ID, - "MINIO_ROOT_PASSWORD": ACCESS_KEY, - }, - detach=True, - ) - try: - yield S3Config( - endpoint="http://localhost:9000", - key_id=KEY_ID, - access_key=ACCESS_KEY, - ) - finally: - container.kill() - container.remove() - - -@pytest.fixture(scope="function") -def minio_image_data_fixture(minio_config, image_data) -> YieldFixture[tuple[S3Config, list[str]]]: - """Populates the minio session with some fake data and yields (S3Config, paths)""" - s3 = minio_config.boto3_resource() - - # Add some images into `s3://image-bucket` - BUCKET = "image-bucket" - bucket = s3.Bucket(BUCKET) - bucket.create() - urls = [] - for i in range(10): - key = f"{i}.jpeg" - bucket.put_object(Body=image_data, Key=key) - urls.append(f"s3://{BUCKET}/{key}") - - yield (minio_config, urls) - - # Cleanup data - bucket.objects.all().delete() - bucket.delete() - - -def test_url_download_minio_custom_s3fs(minio_image_data_fixture, image_data): - import s3fs - - s3_config, urls = minio_image_data_fixture - fs = s3fs.S3FileSystem( - endpoint_url=s3_config.endpoint, - key=s3_config.key_id, - password=s3_config.access_key, - ) - - data = {"urls": urls} - df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(fs=fs)) - - assert df.to_pydict() == {**data, "data": [image_data for _ in range(len(urls))]} +# import boto3 +# import docker +# import pytest +# import daft +# from tests.remote_io.conftest import YieldFixture + + +# @dataclasses.dataclass(frozen=True) +# class S3Config: +# endpoint: str +# key_id: str +# access_key: str + +# def boto3_resource(self): +# return boto3.resource( +# "s3", +# endpoint_url=self.endpoint, +# aws_access_key_id=self.key_id, +# aws_secret_access_key=self.access_key, +# ) + + +# @pytest.fixture(scope="session") +# def minio_config(tmp_path_factory) -> YieldFixture[S3Config]: +# """Provides a mock S3 implementation running locally with MinIO on Docker + +# NOTE: This fixture will skip tests if it cannot find a local Docker daemon + +# Yields an `S3Config` object which can be used to create an S3 connection to the running MinIO service +# """ +# try: +# docker_client = docker.from_env() +# except docker.errors.DockerException: +# pytest.skip("MinIO tests need a running local Docker instance.") + +# KEY_ID = "daft-pytest" +# ACCESS_KEY = "daft-is-the-coolest" + +# tmpdir = tmp_path_factory.mktemp("data") +# container = docker_client.containers.run( +# "quay.io/minio/minio:RELEASE.2023-06-19T19-52-50Z", +# 'server /data --console-address ":9090"', +# ports={ +# "9000/tcp": 9000, +# "9090/tcp": 9090, +# }, +# name="pytest-minio", +# volumes={str(tmpdir): {"bind": "/data", "mode": "rw"}}, +# environment={ +# "MINIO_ROOT_USER": KEY_ID, +# "MINIO_ROOT_PASSWORD": ACCESS_KEY, +# }, +# detach=True, +# ) +# try: +# yield S3Config( +# endpoint="http://localhost:9000", +# key_id=KEY_ID, +# access_key=ACCESS_KEY, +# ) +# finally: +# container.kill() +# container.remove() + + +# @pytest.fixture(scope="function") +# def minio_image_data_fixture(minio_config, image_data) -> YieldFixture[tuple[S3Config, list[str]]]: +# """Populates the minio session with some fake data and yields (S3Config, paths)""" +# s3 = minio_config.boto3_resource() + +# # Add some images into `s3://image-bucket` +# BUCKET = "image-bucket" +# bucket = s3.Bucket(BUCKET) +# bucket.create() +# urls = [] +# for i in range(10): +# key = f"{i}.jpeg" +# bucket.put_object(Body=image_data, Key=key) +# urls.append(f"s3://{BUCKET}/{key}") + +# yield (minio_config, urls) + +# # Cleanup data +# bucket.objects.all().delete() +# bucket.delete() + + +# def test_url_download_minio_custom_s3fs(minio_image_data_fixture, image_data): +# import s3fs + +# s3_config, urls = minio_image_data_fixture +# fs = s3fs.S3FileSystem( +# endpoint_url=s3_config.endpoint, +# key=s3_config.key_id, +# password=s3_config.access_key, +# ) + +# data = {"urls": urls} +# df = daft.from_pydict(data) +# df = df.with_column("data", df["urls"].url.download(fs=fs)) + +# assert df.to_pydict() == {**data, "data": [image_data for _ in range(len(urls))]}