Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose retry params in partition via api #3724

Merged
merged 11 commits into from
Oct 22, 2024
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
## 0.16.1-dev0
## 0.16.1-dev1

### Enhancements

### Features
* **Request retry parameters in `partition_via_api` function.** Expose retry-mechanism related parameters in the `partition_via_api` function to allow users to configure the retry behavior of the API requests.

### Fixes

Expand Down
108 changes: 107 additions & 1 deletion test_unstructured/partition/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,24 @@
import os
import pathlib
from typing import Any
from unittest.mock import Mock

import pytest
import requests
from partition.api import DEFAULT_RETRIES_MAX_INTERVAL_SEC
from unstructured_client.general import General
from unstructured_client.models import shared
from unstructured_client.models.operations import PartitionRequest
from unstructured_client.models.shared import PartitionParameters
from unstructured_client.utils import retries

from unstructured.documents.elements import ElementType, NarrativeText
from unstructured.partition.api import partition_multiple_via_api, partition_via_api
from unstructured.partition.api import (
DEFAULT_RETRIES_MAX_ELAPSED_TIME_SEC,
get_retries_config,
partition_multiple_via_api,
partition_via_api,
)

from ..unit_utils import ANY, FixtureRequest, example_doc_path, method_mock

Expand Down Expand Up @@ -180,13 +188,110 @@ def test_partition_via_api_image_block_extraction():
assert isinstance(image_data, bytes)


@pytest.mark.skipif(not is_in_ci, reason="Skipping test run outside of CI")
@pytest.mark.skipif(skip_not_on_main, reason="Skipping test run outside of main branch")
def test_partition_via_api_retries_config():
elements = partition_via_api(
filename=example_doc_path("pdf/embedded-images-tables.pdf"),
strategy="fast",
api_key=get_api_key(),
# The url has changed since the 06/24 API release while the sdk defaults to the old url
api_url=API_URL,
retries_initial_interval=5,
retries_max_interval=15,
retries_max_elapsed_time=100,
retries_connection_errors=True,
retries_exponent=1.5,
)

assert len(elements) > 0


# Note(austin) - This test is way too noisy against the hosted api
# def test_partition_via_api_invalid_request_data_kwargs():
# filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "layout-parser-paper-fast.pdf")
# with pytest.raises(SDKError):
# partition_via_api(filename=filename, strategy="not_a_strategy")


def test_retries_config_with_parameters_set():
sdk = Mock()
retries_config = get_retries_config(
retries_connection_errors=True,
retries_exponent=1.75,
retries_initial_interval=20,
retries_max_elapsed_time=1000,
retries_max_interval=100,
sdk=sdk,
)

assert retries_config.retry_connection_errors
assert retries_config.backoff.exponent == 1.75
assert retries_config.backoff.initial_interval == 20
assert retries_config.backoff.max_elapsed_time == 1000
assert retries_config.backoff.max_interval == 100


def test_retries_config_none_parameters_return_empty_config():
sdk = Mock()
retries_config = get_retries_config(
retries_connection_errors=None,
retries_exponent=None,
retries_initial_interval=None,
retries_max_elapsed_time=None,
retries_max_interval=None,
sdk=sdk,
)

assert retries_config is None


def test_retries_config_with_no_parameters_set():
retry_config = retries.RetryConfig(
"backoff", retries.BackoffStrategy(3000, 720000, 1.88, 1800000), True
)
sdk = Mock()
sdk.sdk_configuration.retry_config = retry_config
retries_config = get_retries_config(
retries_connection_errors=True,
retries_exponent=None,
retries_initial_interval=None,
retries_max_elapsed_time=None,
retries_max_interval=None,
sdk=sdk,
)

assert retries_config.retry_connection_errors
assert retries_config.backoff.exponent == 1.88
assert retries_config.backoff.initial_interval == 3000
assert retries_config.backoff.max_elapsed_time == 1800000
assert retries_config.backoff.max_interval == 720000


def test_retries_config_cascade():
# notice max_interval is set to 0 which is incorrect - so the DEFAULT_RETRIES_MAX_INTERVAL_SEC
# should be used
retry_config = retries.RetryConfig(
"backoff", retries.BackoffStrategy(3000, 0, 1.88, None), True
)
sdk = Mock()
sdk.sdk_configuration.retry_config = retry_config
retries_config = get_retries_config(
retries_connection_errors=False,
retries_exponent=1.75,
retries_initial_interval=20,
retries_max_elapsed_time=None,
retries_max_interval=None,
sdk=sdk,
)

assert not retries_config.retry_connection_errors
assert retries_config.backoff.exponent == 1.75
assert retries_config.backoff.initial_interval == 20
assert retries_config.backoff.max_elapsed_time == DEFAULT_RETRIES_MAX_ELAPSED_TIME_SEC
assert retries_config.backoff.max_interval == DEFAULT_RETRIES_MAX_INTERVAL_SEC


def test_partition_multiple_via_api_with_single_filename(request: FixtureRequest):
partition_mock_ = method_mock(
request, requests, "post", return_value=FakeResponse(status_code=200)
Expand Down Expand Up @@ -522,4 +627,5 @@ def expected_call_():
xml_keep_tags=False,
)
),
None, # retries kwarg
]
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.16.1-dev0" # pragma: no cover
__version__ = "0.16.1-dev1" # pragma: no cover
133 changes: 132 additions & 1 deletion unstructured/partition/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@
import requests
from unstructured_client import UnstructuredClient
from unstructured_client.models import operations, shared
from unstructured_client.utils import retries

from unstructured.documents.elements import Element
from unstructured.logger import logger
from unstructured.partition.common.common import exactly_one
from unstructured.staging.base import elements_from_dicts, elements_from_json

# Default retry configuration taken from the client code
DEFAULT_RETRIES_INITIAL_INTERVAL_SEC = 3000
DEFAULT_RETRIES_MAX_INTERVAL_SEC = 720000
DEFAULT_RETRIES_EXPONENT = 1.5
DEFAULT_RETRIES_MAX_ELAPSED_TIME_SEC = 1800000
DEFAULT_RETRIES_CONNECTION_ERRORS = True


def partition_via_api(
filename: Optional[str] = None,
Expand All @@ -21,6 +29,11 @@ def partition_via_api(
api_url: str = "https://api.unstructured.io/general/v0/general",
api_key: str = "",
metadata_filename: Optional[str] = None,
retries_initial_interval: [int] = None,
retries_max_interval: Optional[int] = None,
retries_exponent: Optional[float] = None,
retries_max_elapsed_time: Optional[int] = None,
retries_connection_errors: Optional[bool] = None,
**request_kwargs: Any,
) -> list[Element]:
"""Partitions a document using the Unstructured REST API. This is equivalent to
Expand All @@ -44,6 +57,21 @@ def partition_via_api(
The URL for the Unstructured API. Defaults to the hosted Unstructured API.
api_key
The API key to pass to the Unstructured API.
retries_initial_interval
Defines the time interval (in seconds) to wait before the first retry in case of a request
failure. Defaults to 3000. If set should be > 0.
retries_max_interval
Defines the maximum time interval (in seconds) to wait between retries (the interval
between retries is increased as using exponential increase algorithm
- this setting limits it). Defaults to 720000. If set should be > 0.
retries_exponent
Defines the exponential factor to increase the interval between retries. Defaults to 1.5.
If set should be > 0.0.
retries_max_elapsed_time
Defines the maximum time (in seconds) to wait for retries. If exceeded, the original
exception is raised. Defaults to 1800000. If set should be > 0.
retries_connection_errors
Defines whether to retry on connection errors. Defaults to True.
request_kwargs
Additional parameters to pass to the data field of the request to the Unstructured API.
For example the `strategy` parameter.
Expand Down Expand Up @@ -87,7 +115,19 @@ def partition_via_api(
partition_parameters=shared.PartitionParameters(files=files, **request_kwargs)
)

response = sdk.general.partition(request=req)
retries_config = get_retries_config(
retries_connection_errors=retries_connection_errors,
retries_exponent=retries_exponent,
retries_initial_interval=retries_initial_interval,
retries_max_elapsed_time=retries_max_elapsed_time,
retries_max_interval=retries_max_interval,
sdk=sdk,
)

response = sdk.general.partition(
request=req,
retries=retries_config,
)

if response.status_code == 200:
return elements_from_json(text=response.raw_response.text)
Expand All @@ -97,6 +137,97 @@ def partition_via_api(
)


def get_retries_config(
retries_connection_errors: Optional[bool],
retries_exponent: Optional[float],
retries_initial_interval: Optional[int],
retries_max_elapsed_time: Optional[int],
retries_max_interval: Optional[int],
sdk: UnstructuredClient,
) -> Optional[retries.RetryConfig]:
"""Constructs a RetryConfig object from the provided parameters. If any of the parameters
are None, the default values are taken from the SDK configuration or the default constants.

If all parameters are None, returns None (and the SDK-managed defaults are used within the
client)

The solution is not perfect as the RetryConfig object does not include the defaults by
itself so we might need to construct it basing on our defaults.

Parameters
----------
retries_connection_errors
Defines whether to retry on connection errors. If not set the
DEFAULT_RETRIES_CONNECTION_ERRORS constant is used.
retries_exponent
Defines the exponential factor to increase the interval between retries.
If set, should be > 0.0 (otherwise the DEFAULT_RETRIES_EXPONENT constant is used)
retries_initial_interval
Defines the time interval to wait before the first retry in case of a request failure.
If set, should be > 0 (otherwise the DEFAULT_RETRIES_INITIAL_INTERVAL_SEC constant is used)
retries_max_elapsed_time
Defines the maximum time to wait for retries. If exceeded, the original exception is raised.
If set, should be > 0 (otherwise the DEFAULT_RETRIES_MAX_ELAPSED_TIME_SEC constant is used)
retries_max_interval
Defines the maximum time interval to wait between retries. If set, should be > 0
(otherwise the DEFAULT_RETRIES_MAX_INTERVAL_SEC constant is used)
sdk
The UnstructuredClient object to take the default values from.
"""
retries_config = None
sdk_default_retries_config = sdk.sdk_configuration.retry_config
if any(
setting is not None
for setting in (
retries_initial_interval,
retries_max_interval,
retries_exponent,
retries_max_elapsed_time,
retries_connection_errors,
)
):

def get_backoff_default(setting_name: str, default_value: Any) -> Any:
if sdk_default_retries_config: # noqa: SIM102
if setting_value := getattr(sdk_default_retries_config.backoff, setting_name):
return setting_value
return default_value

default_retries_connneciton_errors = (
sdk_default_retries_config.retry_connection_errors
if sdk_default_retries_config.retry_connection_errors is not None
else DEFAULT_RETRIES_CONNECTION_ERRORS
)

backoff_strategy = retries.BackoffStrategy(
initial_interval=(
retries_initial_interval
or get_backoff_default("initial_interval", DEFAULT_RETRIES_INITIAL_INTERVAL_SEC)
),
max_interval=(
retries_max_interval
or get_backoff_default("max_interval", DEFAULT_RETRIES_MAX_INTERVAL_SEC)
),
exponent=(
retries_exponent or get_backoff_default("exponent", DEFAULT_RETRIES_EXPONENT)
),
max_elapsed_time=(
retries_max_elapsed_time
or get_backoff_default("max_elapsed_time", DEFAULT_RETRIES_MAX_ELAPSED_TIME_SEC)
),
)
retries_config = retries.RetryConfig(
strategy="backoff",
backoff=backoff_strategy,
retry_connection_errors=(
retries_connection_errors
if retries_connection_errors is not None
else default_retries_connneciton_errors
),
)
return retries_config


def partition_multiple_via_api(
filenames: Optional[list[str]] = None,
content_types: Optional[list[str]] = None,
Expand Down