
[CHORE] Add s3 fixtures for retrying logic #1206

Merged · 9 commits · Aug 1, 2023
Changes from all commits
11 changes: 11 additions & 0 deletions .github/workflows/python-package.yml
@@ -215,6 +215,11 @@ jobs:
matrix:
python-version: ['3.7']
daft-runner: [py, ray]
# These permissions are needed to interact with GitHub's OIDC Token endpoint.
# This is used in the step "Assume GitHub Actions AWS Credentials"
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v3
with:
@@ -242,6 +247,12 @@
run: |
mkdir -p /tmp/daft-integration-testing/nginx
chmod +rw /tmp/daft-integration-testing/nginx
- name: Assume GitHub Actions AWS Credentials
uses: aws-actions/configure-aws-credentials@v2
with:
aws-region: us-west-2
role-to-assume: ${{ secrets.ACTIONS_AWS_ROLE_ARN }}
role-session-name: DaftPythonPackageGitHubWorkflow
- name: Spin up IO services
uses: isbang/[email protected]
with:
11 changes: 11 additions & 0 deletions tests/integration/docker-compose/Dockerfile.s3_retry_server
@@ -0,0 +1,11 @@
FROM python:3.9

WORKDIR /code

COPY ./retry_server/retry-server-requirements.txt /code/requirements.txt

RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt

COPY ./retry_server /code/app

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
9 changes: 9 additions & 0 deletions tests/integration/docker-compose/docker-compose.yml
@@ -26,3 +26,12 @@ services:
- /tmp/daft-integration-testing/nginx:/app/static:rw
ports:
- 8080:8080

# Custom FastAPI server which mocks an s3 endpoint and serves retryable objects
s3-retry-server:
image: python-s3-retry-server
build:
context: .
dockerfile: Dockerfile.s3_retry_server
ports:
- 8001:8000
0 changes: 0 additions & 0 deletions tests/integration/docker-compose/retry_server/__init__.py
Empty file.
23 changes: 23 additions & 0 deletions tests/integration/docker-compose/retry_server/main.py
@@ -0,0 +1,23 @@
"""This file defines a FastAPI server that emulates an S3 implementation that serves Parquet files

This S3 implementation serves a small Parquet file at the location: `s3://{bucket-name}/{status_code}/{num_errors}/{item_id}`

1. `status_code` is the HTTP status code it returns when S3 clients hit that endpoint
2. `num_errors` is the number of times it throws that error before successfully returning data for a given requested byte range
3. `item_id` is an ID generated by the clients to uniquely identify one attempt at retrieving this Parquet file

We provide two different buckets, with slightly different behavior:

1. "head-retries-bucket": this bucket throws errors during HEAD operations
2. "get-retries-bucket": this bucket throws errors during the ranged GET operations
"""

from __future__ import annotations

from fastapi import FastAPI

from .routers import get_retries_bucket, head_retries_bucket

app = FastAPI()
app.include_router(get_retries_bucket.router)
app.include_router(head_retries_bucket.router)
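
The URL scheme above can be sanity-checked by hand once the compose service is up on port 8001 (per the docker-compose.yml mapping above). A minimal sketch using the requests library — not part of this PR; only the port mapping is taken from the compose file:

import uuid

import requests

# Ask the mock bucket for two 503s before it serves bytes 0-99 of the Parquet file.
item_id = uuid.uuid4()
url = f"http://127.0.0.1:8001/get-retries-bucket/503/2/{item_id}"
for attempt in range(3):
    r = requests.get(url, headers={"Range": "bytes=0-99"})
    print(attempt, r.status_code)  # expected: 503, 503, then 206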
20 changes: 20 additions & 0 deletions tests/integration/docker-compose/retry_server/retry-server-requirements.txt
@@ -0,0 +1,20 @@
annotated-types==0.5.0
anyio==3.7.1
click==8.1.6
exceptiongroup==1.1.2
fastapi==0.100.1
h11==0.14.0
httptools==0.6.0
idna==3.4
pydantic==2.1.1
pydantic_core==2.4.0
python-dotenv==1.0.0
PyYAML==6.0.1
sniffio==1.3.0
starlette==0.27.0
typing_extensions==4.7.1
uvicorn==0.23.2
uvloop==0.17.0
watchfiles==0.19.0
websockets==11.0.3
pyarrow
0 changes: 0 additions & 0 deletions tests/integration/docker-compose/retry_server/routers/__init__.py
Empty file.
60 changes: 60 additions & 0 deletions tests/integration/docker-compose/retry_server/routers/get_retries_bucket.py
@@ -0,0 +1,60 @@
from __future__ import annotations

import os
from typing import Annotated

from fastapi import APIRouter, Header, Response

from ..utils.parquet_generation import generate_parquet_file
from ..utils.responses import get_response

BUCKET_NAME = "get-retries-bucket"
OBJECT_KEY_URL = "/{status_code}/{num_errors}/{item_id}"
MOCK_PARQUET_DATA_PATH = generate_parquet_file()

ITEM_ID_TO_NUM_RETRIES: dict[tuple[str, tuple[int, int]], int] = {}

router = APIRouter(prefix=f"/{BUCKET_NAME}")


@router.head(OBJECT_KEY_URL)
async def bucket_head(status_code: int, num_errors: int, item_id: str):
return Response(
headers={
"Content-Length": str(os.path.getsize(MOCK_PARQUET_DATA_PATH.name)),
"Content-Type": "binary/octet-stream",
"Accept-Ranges": "bytes",
},
)


@router.get(OBJECT_KEY_URL)
async def retryable_bucket_get(
status_code: int,
num_errors: int,
item_id: str,
range: Annotated[str, Header()],
):
    # Return the mock error until this (item_id, byte-range) pair has been requested more than num_errors times
start, end = (int(i) for i in range[len("bytes=") :].split("-"))
key = (item_id, (start, end))
if key not in ITEM_ID_TO_NUM_RETRIES:
ITEM_ID_TO_NUM_RETRIES[key] = 1
else:
ITEM_ID_TO_NUM_RETRIES[key] += 1
if ITEM_ID_TO_NUM_RETRIES[key] <= num_errors:
return get_response(BUCKET_NAME, status_code, num_errors, item_id)

with open(MOCK_PARQUET_DATA_PATH.name, "rb") as f:
f.seek(start)
data = f.read(end - start + 1)

return Response(
status_code=206,
content=data,
headers={
"Content-Length": str(len(data)),
"Content-Type": "binary/octet-stream",
"Content-Range": f"bytes {start}-{end}/{os.path.getsize(MOCK_PARQUET_DATA_PATH.name)}",
},
)
60 changes: 60 additions & 0 deletions tests/integration/docker-compose/retry_server/routers/head_retries_bucket.py
@@ -0,0 +1,60 @@
from __future__ import annotations

import os
from typing import Annotated

from fastapi import APIRouter, Header, Response

from ..utils.parquet_generation import generate_parquet_file
from ..utils.responses import get_response

BUCKET_NAME = "head-retries-bucket"
OBJECT_KEY_URL = "/{status_code}/{num_errors}/{item_id}"
MOCK_PARQUET_DATA_PATH = generate_parquet_file()

ITEM_ID_TO_NUM_RETRIES: dict[str, int] = {}

router = APIRouter(prefix=f"/{BUCKET_NAME}")


@router.head(OBJECT_KEY_URL)
async def retryable_bucket_head(status_code: int, num_errors: int, item_id: str):
"""Reading of Parquet starts with a head request, which potentially must be retried as well"""
key = item_id
if key not in ITEM_ID_TO_NUM_RETRIES:
ITEM_ID_TO_NUM_RETRIES[key] = 1
else:
ITEM_ID_TO_NUM_RETRIES[key] += 1
if ITEM_ID_TO_NUM_RETRIES[key] <= num_errors:
return get_response(BUCKET_NAME, status_code, num_errors, item_id)

return Response(
headers={
"Content-Length": str(os.path.getsize(MOCK_PARQUET_DATA_PATH.name)),
"Content-Type": "binary/octet-stream",
"Accept-Ranges": "bytes",
},
)


@router.get(OBJECT_KEY_URL)
async def bucket_get(
status_code: int,
num_errors: int,
item_id: str,
range: Annotated[str, Header()],
):
start, end = (int(i) for i in range[len("bytes=") :].split("-"))
with open(MOCK_PARQUET_DATA_PATH.name, "rb") as f:
f.seek(start)
data = f.read(end - start + 1)

return Response(
status_code=206,
content=data,
headers={
"Content-Length": str(len(data)),
"Content-Type": "binary/octet-stream",
"Content-Range": f"bytes {start}-{end}/{os.path.getsize(MOCK_PARQUET_DATA_PATH.name)}",
},
)
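
The head-retries-bucket mirrors the sketch above, but injects its failures on the initial HEAD while its GET handler never errors. A companion sketch under the same assumptions (service running on port 8001; requests installed):

import uuid

import requests

# HEAD fails num_errors times per item_id, then returns 200 with the object metadata.
url = f"http://127.0.0.1:8001/head-retries-bucket/500/2/{uuid.uuid4()}"
for attempt in range(3):
    r = requests.head(url)
    print(attempt, r.status_code)  # expected: 500, 500, then 200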
0 changes: 0 additions & 0 deletions tests/integration/docker-compose/retry_server/utils/__init__.py
Empty file.
14 changes: 14 additions & 0 deletions tests/integration/docker-compose/retry_server/utils/parquet_generation.py
@@ -0,0 +1,14 @@
from __future__ import annotations

import tempfile

import pyarrow as pa
import pyarrow.parquet as papq


def generate_parquet_file():
"""Generate a small Parquet file and return the path to the file"""
tmpfile = tempfile.NamedTemporaryFile()
tbl = pa.Table.from_pydict({"foo": [1, 2, 3]})
papq.write_table(tbl, tmpfile.name)
return tmpfile
19 changes: 19 additions & 0 deletions tests/integration/docker-compose/retry_server/utils/responses.py
@@ -0,0 +1,19 @@
from __future__ import annotations

from fastapi import Response


def get_response(bucket_name: str, status_code: int, num_errors: int, item_id: str):
return Response(
status_code=status_code,
content=f"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code></Code>
<Message>This is a mock error message</Message>
<Resource>/{bucket_name}/{status_code}/{num_errors}/{item_id}</Resource>
<RequestId>4442587FB7D0A2F9</RequestId>
</Error>""",
headers={
"Content-Type": "application/xml",
},
)
8 changes: 8 additions & 0 deletions tests/integration/io/conftest.py
@@ -54,6 +54,14 @@ def nginx_config() -> tuple[str, pathlib.Path]:
)


@pytest.fixture(scope="session")
def retry_server_s3_config() -> daft.io.IOConfig:
"""Returns the URL to the local retry_server fixture"""
return daft.io.IOConfig(
s3=daft.io.S3Config(endpoint_url="http://127.0.0.1:8001"),
)


###
# Mounting utilities: mount data and perform cleanup at the end of each test
###
47 changes: 47 additions & 0 deletions tests/integration/io/parquet/test_reads_local_fixtures.py
@@ -0,0 +1,47 @@
from __future__ import annotations

import uuid

import pytest

from daft.table import Table

BUCKETS = ["head-retries-bucket", "get-retries-bucket"]


@pytest.mark.integration()
@pytest.mark.parametrize("status_code", [400, 403, 404])
@pytest.mark.parametrize("bucket", BUCKETS)
def test_non_retryable_errors(retry_server_s3_config, status_code: int, bucket: str):
data_path = f"s3://{bucket}/{status_code}/1/{uuid.uuid4()}"

with pytest.raises(ValueError):
Table.read_parquet(data_path, io_config=retry_server_s3_config)


@pytest.mark.integration()
@pytest.mark.parametrize(
"status_code",
[
500,
503,
504,
# TODO: We should also retry correctly on these error codes (PyArrow does retry appropriately here)
# These are marked as retryable error codes, see:
# https://github.com/aws/aws-sdk-cpp/blob/8a9550f1db04b33b3606602ba181d68377f763df/src/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h#L113-L131
# 509,
# 408,
# 429,
# 419,
# 440,
# 598,
# 599,
],
)
@pytest.mark.parametrize("bucket", BUCKETS)
def test_retryable_errors(retry_server_s3_config, status_code: int, bucket: str):
# By default the SDK retries 3 times, so we should be able to tolerate NUM_ERRORS=2
NUM_ERRORS = 2
data_path = f"s3://{bucket}/{status_code}/{NUM_ERRORS}/{uuid.uuid4()}"

Table.read_parquet(data_path, io_config=retry_server_s3_config)
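
A natural companion (not included in this PR) would pin down the failure mode when the injected error count exceeds the SDK's retry budget. The sketch below assumes the exhausted-retries failure surfaces as a ValueError, mirroring the non-retryable test above — that error type is an assumption, not something this PR asserts:

@pytest.mark.integration()
@pytest.mark.parametrize("bucket", BUCKETS)
def test_retries_exhausted_sketch(retry_server_s3_config, bucket: str):
    # Hypothetical: 10 consecutive 503s should outlast the default retry budget of 3,
    # so the read is expected to fail rather than eventually succeed.
    data_path = f"s3://{bucket}/503/10/{uuid.uuid4()}"
    with pytest.raises(ValueError):
        Table.read_parquet(data_path, io_config=retry_server_s3_config)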