[CHORE] Add s3 fixtures for retrying logic (#1206)
Adds FastAPI fixtures for testing S3 error handling and retry logic.

The server currently exposes two buckets (see the usage sketch after this list):

1. `s3://head-retries-bucket/{status_code}/{num_errors}/{item_id}`: for a given `item_id`, returns the given `status_code` for the first `num_errors` HEAD requests before responding successfully
2. `s3://get-retries-bucket/{status_code}/{num_errors}/{item_id}`: for a given `item_id` and byte range, returns the given `status_code` for the first `num_errors` ranged GET requests before responding successfully
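
As a usage sketch (not part of this commit): the URL scheme maps onto plain HTTP requests, so the fixtures can be exercised directly. This assumes the server is running locally on port 8001 (as mapped in docker-compose.yml below) and uses the third-party `requests` library:

import uuid

import requests

# A fresh item_id ensures the server's error counter starts at zero for this attempt.
item_id = uuid.uuid4()
url = f"http://127.0.0.1:8001/head-retries-bucket/503/2/{item_id}"

# The first two HEAD requests return 503; the third succeeds with 200.
for attempt in range(3):
    response = requests.head(url)
    print(attempt, response.status_code)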

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Aug 1, 2023
1 parent 7a78bd1 commit e95acae
Showing 15 changed files with 282 additions and 0 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/python-package.yml
@@ -215,6 +215,11 @@ jobs:
matrix:
python-version: ['3.7']
daft-runner: [py, ray]
# These permissions are needed to interact with GitHub's OIDC Token endpoint.
# This is used in the step "Assume GitHub Actions AWS Credentials"
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v3
with:
@@ -242,6 +247,12 @@ jobs:
run: |
mkdir -p /tmp/daft-integration-testing/nginx
chmod +rw /tmp/daft-integration-testing/nginx
- name: Assume GitHub Actions AWS Credentials
uses: aws-actions/configure-aws-credentials@v2
with:
aws-region: us-west-2
role-to-assume: ${{ secrets.ACTIONS_AWS_ROLE_ARN }}
role-session-name: DaftPythonPackageGitHubWorkflow
- name: Spin up IO services
uses: isbang/[email protected]
with:
11 changes: 11 additions & 0 deletions tests/integration/docker-compose/Dockerfile.s3_retry_server
@@ -0,0 +1,11 @@
FROM python:3.9

WORKDIR /code

COPY ./retry_server/retry-server-requirements.txt /code/requirements.txt

RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt

COPY ./retry_server /code/app

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
9 changes: 9 additions & 0 deletions tests/integration/docker-compose/docker-compose.yml
@@ -26,3 +26,12 @@ services:
- /tmp/daft-integration-testing/nginx:/app/static:rw
ports:
- 8080:8080

# Custom FastAPI server which mocks an s3 endpoint and serves retryable objects
s3-retry-server:
image: python-s3-retry-server
build:
context: .
dockerfile: Dockerfile.s3_retry_server
ports:
- 8001:8000
Empty file added: tests/integration/docker-compose/retry_server/__init__.py
23 changes: 23 additions & 0 deletions tests/integration/docker-compose/retry_server/main.py
@@ -0,0 +1,23 @@
"""This file defines a FastAPI server that emulates an S3 implementation that serves Parquet files
This S3 implementation serves a small Parquet file at the location: `s3://{bucket-name}/{status_code}/{num_errors}/{item_id}`
1. `status_code` is the HTTP status code it returns when S3 clients hit that endpoint
2. `num_errors` is the number of times it throws that error before successfully returning data for a given requested byte range
3. `item_id` is an ID generated by the clients to uniquely identify one attempt at retrieving this Parquet file
We provide two different buckets, with slightly different behavior:
1. "head-retries-bucket": this bucket throws errors during HEAD operations
2. "get-retries-bucket": this bucket throws errors during the ranged GET operations
"""

from __future__ import annotations

from fastapi import FastAPI

from .routers import get_retries_bucket, head_retries_bucket

app = FastAPI()
app.include_router(get_retries_bucket.router)
app.include_router(head_retries_bucket.router)
20 changes: 20 additions & 0 deletions tests/integration/docker-compose/retry_server/retry-server-requirements.txt
@@ -0,0 +1,20 @@
annotated-types==0.5.0
anyio==3.7.1
click==8.1.6
exceptiongroup==1.1.2
fastapi==0.100.1
h11==0.14.0
httptools==0.6.0
idna==3.4
pydantic==2.1.1
pydantic_core==2.4.0
python-dotenv==1.0.0
PyYAML==6.0.1
sniffio==1.3.0
starlette==0.27.0
typing_extensions==4.7.1
uvicorn==0.23.2
uvloop==0.17.0
watchfiles==0.19.0
websockets==11.0.3
pyarrow
Empty file added: tests/integration/docker-compose/retry_server/routers/__init__.py
60 changes: 60 additions & 0 deletions tests/integration/docker-compose/retry_server/routers/get_retries_bucket.py
@@ -0,0 +1,60 @@
from __future__ import annotations

import os
from typing import Annotated

from fastapi import APIRouter, Header, Response

from ..utils.parquet_generation import generate_parquet_file
from ..utils.responses import get_response

BUCKET_NAME = "get-retries-bucket"
OBJECT_KEY_URL = "/{status_code}/{num_errors}/{item_id}"
MOCK_PARQUET_DATA_PATH = generate_parquet_file()

ITEM_ID_TO_NUM_RETRIES: dict[tuple[str, tuple[int, int]], int] = {}

router = APIRouter(prefix=f"/{BUCKET_NAME}")


@router.head(OBJECT_KEY_URL)
async def bucket_head(status_code: int, num_errors: int, item_id: str):
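    """HEAD requests on this bucket always succeed; only ranged GET requests are retried."""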
return Response(
headers={
"Content-Length": str(os.path.getsize(MOCK_PARQUET_DATA_PATH.name)),
"Content-Type": "binary/octet-stream",
"Accept-Ranges": "bytes",
},
)


@router.get(OBJECT_KEY_URL)
async def retryable_bucket_get(
status_code: int,
num_errors: int,
item_id: str,
range: Annotated[str, Header()],
):
    # Count requests for this (item_id, byte range); return the error response until
    # this range has been requested more than num_errors times
start, end = (int(i) for i in range[len("bytes=") :].split("-"))
key = (item_id, (start, end))
if key not in ITEM_ID_TO_NUM_RETRIES:
ITEM_ID_TO_NUM_RETRIES[key] = 1
else:
ITEM_ID_TO_NUM_RETRIES[key] += 1
if ITEM_ID_TO_NUM_RETRIES[key] <= num_errors:
return get_response(BUCKET_NAME, status_code, num_errors, item_id)

with open(MOCK_PARQUET_DATA_PATH.name, "rb") as f:
f.seek(start)
data = f.read(end - start + 1)

return Response(
status_code=206,
content=data,
headers={
"Content-Length": str(len(data)),
"Content-Type": "binary/octet-stream",
"Content-Range": f"bytes {start}-{end}/{os.path.getsize(MOCK_PARQUET_DATA_PATH.name)}",
},
)
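
As an illustration of the per-byte-range counter above (a sketch, not part of this commit; assumes the server is running locally on port 8001 and uses the third-party `requests` library):

import uuid

import requests

item_id = uuid.uuid4()
url = f"http://127.0.0.1:8001/get-retries-bucket/500/1/{item_id}"

# With num_errors=1, the first GET for a byte range errors and the second succeeds.
print(requests.get(url, headers={"Range": "bytes=0-99"}).status_code)  # 500
print(requests.get(url, headers={"Range": "bytes=0-99"}).status_code)  # 206
# A different byte range has its own counter, so it errors once more.
print(requests.get(url, headers={"Range": "bytes=100-199"}).status_code)  # 500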
60 changes: 60 additions & 0 deletions tests/integration/docker-compose/retry_server/routers/head_retries_bucket.py
@@ -0,0 +1,60 @@
from __future__ import annotations

import os
from typing import Annotated

from fastapi import APIRouter, Header, Response

from ..utils.parquet_generation import generate_parquet_file
from ..utils.responses import get_response

BUCKET_NAME = "head-retries-bucket"
OBJECT_KEY_URL = "/{status_code}/{num_errors}/{item_id}"
MOCK_PARQUET_DATA_PATH = generate_parquet_file()

ITEM_ID_TO_NUM_RETRIES: dict[str, int] = {}

router = APIRouter(prefix=f"/{BUCKET_NAME}")


@router.head(OBJECT_KEY_URL)
async def retryable_bucket_head(status_code: int, num_errors: int, item_id: str):
"""Reading of Parquet starts with a head request, which potentially must be retried as well"""
key = item_id
if key not in ITEM_ID_TO_NUM_RETRIES:
ITEM_ID_TO_NUM_RETRIES[key] = 1
else:
ITEM_ID_TO_NUM_RETRIES[key] += 1
if ITEM_ID_TO_NUM_RETRIES[key] <= num_errors:
return get_response(BUCKET_NAME, status_code, num_errors, item_id)

return Response(
headers={
"Content-Length": str(os.path.getsize(MOCK_PARQUET_DATA_PATH.name)),
"Content-Type": "binary/octet-stream",
"Accept-Ranges": "bytes",
},
)


@router.get(OBJECT_KEY_URL)
async def bucket_get(
status_code: int,
num_errors: int,
item_id: str,
range: Annotated[str, Header()],
):
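    """Ranged GET requests on this bucket always succeed; only HEAD requests are retried."""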
start, end = (int(i) for i in range[len("bytes=") :].split("-"))
with open(MOCK_PARQUET_DATA_PATH.name, "rb") as f:
f.seek(start)
data = f.read(end - start + 1)

return Response(
status_code=206,
content=data,
headers={
"Content-Length": str(len(data)),
"Content-Type": "binary/octet-stream",
"Content-Range": f"bytes {start}-{end}/{os.path.getsize(MOCK_PARQUET_DATA_PATH.name)}",
},
)
Empty file added: tests/integration/docker-compose/retry_server/utils/__init__.py
14 changes: 14 additions & 0 deletions tests/integration/docker-compose/retry_server/utils/parquet_generation.py
@@ -0,0 +1,14 @@
from __future__ import annotations

import tempfile

import pyarrow as pa
import pyarrow.parquet as papq


def generate_parquet_file():
"""Generate a small Parquet file and return the path to the file"""
tmpfile = tempfile.NamedTemporaryFile()
tbl = pa.Table.from_pydict({"foo": [1, 2, 3]})
papq.write_table(tbl, tmpfile.name)
return tmpfile
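
A quick hypothetical check (not part of this commit) that the generated file round-trips through PyArrow; note the returned NamedTemporaryFile must stay referenced, since the underlying file is deleted when the object is garbage-collected:

import pyarrow.parquet as papq

tmpfile = generate_parquet_file()
# Reading the file back yields the original single-column table.
assert papq.read_table(tmpfile.name).to_pydict() == {"foo": [1, 2, 3]}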
19 changes: 19 additions & 0 deletions tests/integration/docker-compose/retry_server/utils/responses.py
@@ -0,0 +1,19 @@
from __future__ import annotations

from fastapi import Response


def get_response(bucket_name: str, status_code: int, num_errors: int, item_id: str):
return Response(
status_code=status_code,
content=f"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code></Code>
<Message>This is a mock error message</Message>
<Resource>/{bucket_name}/{status_code}/{num_errors}/{item_id}</Resource>
<RequestId>4442587FB7D0A2F9</RequestId>
</Error>""",
headers={
"Content-Type": "application/xml",
},
)
8 changes: 8 additions & 0 deletions tests/integration/io/conftest.py
@@ -54,6 +54,14 @@ def nginx_config() -> tuple[str, pathlib.Path]:
)


@pytest.fixture(scope="session")
def retry_server_s3_config() -> daft.io.IOConfig:
"""Returns the URL to the local retry_server fixture"""
return daft.io.IOConfig(
s3=daft.io.S3Config(endpoint_url="http://127.0.0.1:8001"),
)


###
# Mounting utilities: mount data and perform cleanup at the end of each test
###
47 changes: 47 additions & 0 deletions tests/integration/io/parquet/test_reads_local_fixtures.py
@@ -0,0 +1,47 @@
from __future__ import annotations

import uuid

import pytest

from daft.table import Table

BUCKETS = ["head-retries-bucket", "get-retries-bucket"]


@pytest.mark.integration()
@pytest.mark.parametrize("status_code", [400, 403, 404])
@pytest.mark.parametrize("bucket", BUCKETS)
def test_non_retryable_errors(retry_server_s3_config, status_code: int, bucket: str):
data_path = f"s3://{bucket}/{status_code}/1/{uuid.uuid4()}"

with pytest.raises(ValueError):
Table.read_parquet(data_path, io_config=retry_server_s3_config)


@pytest.mark.integration()
@pytest.mark.parametrize(
"status_code",
[
500,
503,
504,
# TODO: We should also retry correctly on these error codes (PyArrow does retry appropriately here)
# These are marked as retryable error codes, see:
# https://github.com/aws/aws-sdk-cpp/blob/8a9550f1db04b33b3606602ba181d68377f763df/src/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h#L113-L131
# 509,
# 408,
# 429,
# 419,
# 440,
# 598,
# 599,
],
)
@pytest.mark.parametrize("bucket", BUCKETS)
def test_retryable_errors(retry_server_s3_config, status_code: int, bucket: str):
    # By default the SDK makes 3 attempts (i.e. 2 retries), so we should be able to tolerate NUM_ERRORS=2
NUM_ERRORS = 2
data_path = f"s3://{bucket}/{status_code}/{NUM_ERRORS}/{uuid.uuid4()}"

Table.read_parquet(data_path, io_config=retry_server_s3_config)
