Skip to content

Commit

Permalink
[CHORE] Classify throttle and internal errors as Retryable in Python (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Sep 25, 2024
1 parent 45e2944 commit a8602a2
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 29 deletions.
16 changes: 16 additions & 0 deletions daft/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,19 @@ class SocketError(DaftTransientError):
"""

pass


class ThrottleError(DaftTransientError):
"""Daft Throttle Error
Daft client had a throttle error while making request to server.
"""

pass


class MiscTransientError(DaftTransientError):
"""Daft Misc Transient Error
Daft client had a Misc Transient Error while making request to server.
"""

pass
4 changes: 4 additions & 0 deletions src/common/error/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub enum DaftError {
ByteStreamError(GenericError),
#[error("SocketError {0}")]
SocketError(GenericError),
#[error("ThrottledIo {0}")]
ThrottledIo(GenericError),
#[error("MiscTransient {0}")]
MiscTransient(GenericError),
#[error("DaftError::External {0}")]
External(GenericError),
#[error("DaftError::SerdeJsonError {0}")]
Expand Down
4 changes: 4 additions & 0 deletions src/common/error/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import_exception!(daft.exceptions, ConnectTimeoutError);
import_exception!(daft.exceptions, ReadTimeoutError);
import_exception!(daft.exceptions, ByteStreamError);
import_exception!(daft.exceptions, SocketError);
import_exception!(daft.exceptions, ThrottleError);
import_exception!(daft.exceptions, MiscTransientError);

impl std::convert::From<DaftError> for pyo3::PyErr {
fn from(err: DaftError) -> Self {
Expand All @@ -21,6 +23,8 @@ impl std::convert::From<DaftError> for pyo3::PyErr {
DaftError::ReadTimeout(err) => ReadTimeoutError::new_err(err.to_string()),
DaftError::ByteStreamError(err) => ByteStreamError::new_err(err.to_string()),
DaftError::SocketError(err) => SocketError::new_err(err.to_string()),
DaftError::ThrottledIo(err) => ThrottleError::new_err(err.to_string()),
DaftError::MiscTransient(err) => MiscTransientError::new_err(err.to_string()),
_ => DaftCoreException::new_err(err.to_string()),
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ pub enum Error {
))]
SocketError { path: String, source: DynError },

#[snafu(display("Throttled when trying to read {}\nDetails:\n{:?}", path, source))]
Throttled { path: String, source: DynError },

#[snafu(display("Misc Transient error trying to read {}\nDetails:\n{:?}", path, source))]
MiscTransient { path: String, source: DynError },

#[snafu(display("Unable to convert URL \"{}\" to path", path))]
InvalidUrl {
path: String,
Expand Down Expand Up @@ -150,6 +156,8 @@ impl From<Error> for DaftError {
ReadTimeout { .. } => Self::ReadTimeout(err.into()),
UnableToReadBytes { .. } => Self::ByteStreamError(err.into()),
SocketError { .. } => Self::SocketError(err.into()),
Throttled { .. } => Self::ThrottledIo(err.into()),
MiscTransient { .. } => Self::MiscTransient(err.into()),
// We have to repeat everything above for the case we have an Arc since we can't move the error.
CachedError { ref source } => match source.as_ref() {
NotFound { path, source: _ } => Self::FileNotFound {
Expand All @@ -160,6 +168,8 @@ impl From<Error> for DaftError {
ReadTimeout { .. } => Self::ReadTimeout(err.into()),
UnableToReadBytes { .. } => Self::ByteStreamError(err.into()),
SocketError { .. } => Self::SocketError(err.into()),
Throttled { .. } => Self::ThrottledIo(err.into()),
MiscTransient { .. } => Self::MiscTransient(err.into()),
_ => Self::External(err.into()),
},
_ => Self::External(err.into()),
Expand Down
84 changes: 55 additions & 29 deletions src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use aws_credential_types::{
cache::{CredentialsCache, ProvideCachedCredentials, SharedCredentialsCache},
provider::error::CredentialsError,
};
use aws_sdk_s3 as s3;
use aws_sdk_s3::{operation::put_object::PutObjectError, primitives::ByteStreamError};
use aws_sdk_s3::{
self as s3, error::ProvideErrorMetadata, operation::put_object::PutObjectError,
primitives::ByteStreamError,
};
use aws_sig_auth::signer::SigningRequirements;
use aws_smithy_async::rt::sleep::TokioSleep;
use common_io_config::S3Config;
Expand Down Expand Up @@ -118,10 +120,51 @@ enum Error {
UploadsCannotBeAnonymous {},
}

/// List of AWS error codes that are due to throttling
/// https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
const THROTTLING_ERRORS: &[&str] = &[
"Throttling",
"ThrottlingException",
"ThrottledException",
"RequestThrottledException",
"TooManyRequestsException",
"ProvisionedThroughputExceededException",
"TransactionInProgressException",
"RequestLimitExceeded",
"BandwidthLimitExceeded",
"LimitExceededException",
"RequestThrottled",
"SlowDown",
"PriorRequestNotComplete",
"EC2ThrottledException",
];

impl From<Error> for super::Error {
fn from(error: Error) -> Self {
use Error::*;

fn classify_unhandled_error<
E: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static,
>(
path: String,
err: E,
) -> super::Error {
match err.code() {
Some("InternalError") => super::Error::MiscTransient {
path,
source: err.into(),
},
Some(code) if THROTTLING_ERRORS.contains(&code) => super::Error::Throttled {
path,
source: err.into(),
},
_ => super::Error::Unhandled {
path,
msg: DisplayErrorContext(err).to_string(),
},
}
}

match error {
UnableToOpenFile { path, source } => match source {
SdkError::TimeoutError(_) => Self::ReadTimeout {
Expand All @@ -140,25 +183,20 @@ impl From<Error> for super::Error {
source: source.into(),
}
} else {
Self::UnableToOpenFile {
// who knows what happened here during dispatch, let's just tell the user it's transient
Self::MiscTransient {
path,
source: source.into(),
}
}
}

_ => match source.into_service_error() {
GetObjectError::NoSuchKey(no_such_key) => Self::NotFound {
path,
source: no_such_key.into(),
},
GetObjectError::Unhandled(v) => Self::Unhandled {
path,
msg: DisplayErrorContext(v).to_string(),
},
err => Self::UnableToOpenFile {
path,
source: err.into(),
},
err => classify_unhandled_error(path, err),
},
},
UnableToHeadFile { path, source } => match source {
Expand All @@ -178,7 +216,8 @@ impl From<Error> for super::Error {
source: source.into(),
}
} else {
Self::UnableToOpenFile {
// who knows what happened here during dispatch, let's just tell the user it's transient
Self::MiscTransient {
path,
source: source.into(),
}
Expand All @@ -189,14 +228,7 @@ impl From<Error> for super::Error {
path,
source: no_such_key.into(),
},
HeadObjectError::Unhandled(v) => Self::Unhandled {
path,
msg: DisplayErrorContext(v).to_string(),
},
err => Self::UnableToOpenFile {
path,
source: err.into(),
},
err => classify_unhandled_error(path, err),
},
},
UnableToListObjects { path, source } => match source {
Expand All @@ -216,7 +248,8 @@ impl From<Error> for super::Error {
source: source.into(),
}
} else {
Self::UnableToOpenFile {
// who knows what happened here during dispatch, let's just tell the user it's transient
Self::MiscTransient {
path,
source: source.into(),
}
Expand All @@ -227,14 +260,7 @@ impl From<Error> for super::Error {
path,
source: no_such_key.into(),
},
ListObjectsV2Error::Unhandled(v) => Self::Unhandled {
path,
msg: DisplayErrorContext(v).to_string(),
},
err => Self::UnableToOpenFile {
path,
source: err.into(),
},
err => classify_unhandled_error(path, err),
},
},
InvalidUrl { path, source } => Self::InvalidUrl { path, source },
Expand Down

0 comments on commit a8602a2

Please sign in to comment.