Skip to content

Commit

Permalink
[CHORE] Add status code to IO integration tests (#1356)
Browse files Browse the repository at this point in the history
Some retry logic only triggers on the string status code in the
`<Code></Code>` section of the XML response from S3, not on the actual
int status code itself

This PR adds the ability to supply this string status code to our local
test fixtures so we can test for these cases.

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Sep 7, 2023
1 parent fa0af29 commit 6f6ca88
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ..utils.responses import get_response

BUCKET_NAME = "get-retries-parquet-bucket"
OBJECT_KEY_URL = "/{status_code}/{num_errors}/{item_id}"
OBJECT_KEY_URL = "/{status_code}/{status_code_str}/{num_errors}/{item_id}"
MOCK_PARQUET_DATA_PATH = generate_parquet_file()

ITEM_ID_TO_NUM_RETRIES: dict[tuple[str, tuple[int, int]], int] = {}
Expand All @@ -33,6 +33,7 @@ async def bucket_head(status_code: int, num_errors: int, item_id: str):
async def retryable_bucket_get(
request: Request,
status_code: int,
status_code_str: str,
num_errors: int,
item_id: str,
range: Annotated[str, Header()],
Expand All @@ -45,7 +46,7 @@ async def retryable_bucket_get(
else:
ITEM_ID_TO_NUM_RETRIES[key] += 1
if ITEM_ID_TO_NUM_RETRIES[key] <= num_errors:
return get_response(request.url, status_code)
return get_response(request.url, status_code, status_code_str)

with open(MOCK_PARQUET_DATA_PATH.name, "rb") as f:
f.seek(start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ..utils.responses import get_response

BUCKET_NAME = "head-retries-parquet-bucket"
OBJECT_KEY_URL = "/{status_code}/{num_errors}/{item_id}"
OBJECT_KEY_URL = "/{status_code}/{status_code_str}/{num_errors}/{item_id}"
MOCK_PARQUET_DATA_PATH = generate_parquet_file()

ITEM_ID_TO_NUM_RETRIES: dict[str, int] = {}
Expand All @@ -19,15 +19,17 @@


@app.head(OBJECT_KEY_URL)
async def retryable_bucket_head(request: Request, status_code: int, num_errors: int, item_id: str):
async def retryable_bucket_head(
request: Request, status_code: int, status_code_str: str, num_errors: int, item_id: str
):
"""Reading of Parquet starts with a head request, which potentially must be retried as well"""
key = item_id
if key not in ITEM_ID_TO_NUM_RETRIES:
ITEM_ID_TO_NUM_RETRIES[key] = 1
else:
ITEM_ID_TO_NUM_RETRIES[key] += 1
if ITEM_ID_TO_NUM_RETRIES[key] <= num_errors:
return get_response(request.url, status_code)
return get_response(request.url, status_code, status_code_str)

return Response(
headers={
Expand All @@ -41,6 +43,7 @@ async def retryable_bucket_head(request: Request, status_code: int, num_errors:
@app.get(OBJECT_KEY_URL)
async def bucket_get(
status_code: int,
status_code_str: str,
num_errors: int,
item_id: str,
range: Annotated[str, Header()],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


def rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded) -> Response:
return get_response(request.url, status_code=503)
return get_response(request.url, status_code=503, status_code_str="SlowDown")


limiter = Limiter(key_func=get_remote_address)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
from fastapi import Response


def get_response(url: str, status_code: int):
def get_response(url: str, status_code: int, code_str: str):
return Response(
status_code=status_code,
content=f"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code></Code>
<Message>This is a mock error message</Message>
<Code>{code_str}</Code>
<Message>{code_str}</Message>
<Resource>{url}</Resource>
<RequestId>4442587FB7D0A2F9</RequestId>
</Error>""",
Expand Down
22 changes: 13 additions & 9 deletions tests/integration/io/parquet/test_reads_local_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@


@pytest.mark.integration()
@pytest.mark.parametrize("status_code", [400, 403, 404])
@pytest.mark.parametrize("status_code", [(400, "foo"), (403, "foo"), (404, "foo")])
@pytest.mark.parametrize("bucket", BUCKETS)
def test_non_retryable_errors(retry_server_s3_config, status_code: int, bucket: str):
data_path = f"s3://{bucket}/{status_code}/1/{uuid.uuid4()}"
def test_non_retryable_errors(retry_server_s3_config, status_code: tuple[int, str], bucket: str):
status_code, status_code_str = status_code
data_path = f"s3://{bucket}/{status_code}/{status_code_str}/1/{uuid.uuid4()}"
with pytest.raises((FileNotFoundError, ValueError)):
Table.read_parquet(data_path, io_config=retry_server_s3_config)

Expand All @@ -22,10 +23,12 @@ def test_non_retryable_errors(retry_server_s3_config, status_code: int, bucket:
@pytest.mark.parametrize(
"status_code",
[
500,
503,
504,
# TODO: [IO-RETRIES] We should also retry correctly on these error codes (PyArrow does retry appropriately here)
# NOTE: Certain errors are only correctly retried when the correct str code is returned in the XML response
# For these cases, we can pass them into our custom S3 endpoint so that it will echo it in the XML response on return
(500, "InternalError"),
(503, "SlowDown"),
(504, "PLACEHOLDER_CODE - should not matter"),
# TODO: [IO-RETRIES] We should also retry correctly on these error codes
# These are marked as retryable error codes, see:
# https://github.com/aws/aws-sdk-cpp/blob/8a9550f1db04b33b3606602ba181d68377f763df/src/aws-cpp-sdk-core/include/aws/core/http/HttpResponse.h#L113-L131
# 509,
Expand All @@ -38,10 +41,11 @@ def test_non_retryable_errors(retry_server_s3_config, status_code: int, bucket:
],
)
@pytest.mark.parametrize("bucket", BUCKETS)
def test_retryable_errors(retry_server_s3_config, status_code: int, bucket: str):
def test_retryable_errors(retry_server_s3_config, status_code: tuple[int, str], bucket: str):
# By default the SDK retries 3 times, so we should be able to tolerate NUM_ERRORS=2
# Tweak this variable appropriately to match the retry policy
NUM_ERRORS = 2
data_path = f"s3://{bucket}/{status_code}/{NUM_ERRORS}/{uuid.uuid4()}"
status_code, status_code_str = status_code
data_path = f"s3://{bucket}/{status_code}/{status_code_str}/{NUM_ERRORS}/{uuid.uuid4()}"

Table.read_parquet(data_path, io_config=retry_server_s3_config)

0 comments on commit 6f6ca88

Please sign in to comment.