diff --git a/daft/exceptions.py b/daft/exceptions.py index 6909d9300f..121b06938b 100644 --- a/daft/exceptions.py +++ b/daft/exceptions.py @@ -52,3 +52,19 @@ class SocketError(DaftTransientError): """ pass + + +class ThrottleError(DaftTransientError): + """Daft Throttle Error + Daft client had a throttle error while making request to server. + """ + + pass + + +class MiscTransientError(DaftTransientError): + """Daft Misc Transient Error + Daft client had a Misc Transient Error while making request to server. + """ + + pass diff --git a/src/common/error/src/error.rs b/src/common/error/src/error.rs index c3ea90f5f2..0513d3e112 100644 --- a/src/common/error/src/error.rs +++ b/src/common/error/src/error.rs @@ -34,6 +34,10 @@ pub enum DaftError { ByteStreamError(GenericError), #[error("SocketError {0}")] SocketError(GenericError), + #[error("ThrottledIo {0}")] + ThrottledIo(GenericError), + #[error("MiscTransient {0}")] + MiscTransient(GenericError), #[error("DaftError::External {0}")] External(GenericError), #[error("DaftError::SerdeJsonError {0}")] diff --git a/src/common/error/src/python.rs b/src/common/error/src/python.rs index 917dafdc78..08ad13d8d6 100644 --- a/src/common/error/src/python.rs +++ b/src/common/error/src/python.rs @@ -8,6 +8,8 @@ import_exception!(daft.exceptions, ConnectTimeoutError); import_exception!(daft.exceptions, ReadTimeoutError); import_exception!(daft.exceptions, ByteStreamError); import_exception!(daft.exceptions, SocketError); +import_exception!(daft.exceptions, ThrottleError); +import_exception!(daft.exceptions, MiscTransientError); impl std::convert::From for pyo3::PyErr { fn from(err: DaftError) -> Self { @@ -21,6 +23,8 @@ impl std::convert::From for pyo3::PyErr { DaftError::ReadTimeout(err) => ReadTimeoutError::new_err(err.to_string()), DaftError::ByteStreamError(err) => ByteStreamError::new_err(err.to_string()), DaftError::SocketError(err) => SocketError::new_err(err.to_string()), + DaftError::ThrottledIo(err) => ThrottleError::new_err(err.to_string()), + DaftError::MiscTransient(err) => MiscTransientError::new_err(err.to_string()), _ => DaftCoreException::new_err(err.to_string()), } } diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index 6fdaac2368..8d87f5b767 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -97,6 +97,12 @@ pub enum Error { ))] SocketError { path: String, source: DynError }, + #[snafu(display("Throttled when trying to read {}\nDetails:\n{:?}", path, source))] + Throttled { path: String, source: DynError }, + + #[snafu(display("Misc Transient error trying to read {}\nDetails:\n{:?}", path, source))] + MiscTransient { path: String, source: DynError }, + #[snafu(display("Unable to convert URL \"{}\" to path", path))] InvalidUrl { path: String, @@ -150,6 +156,8 @@ impl From for DaftError { ReadTimeout { .. } => Self::ReadTimeout(err.into()), UnableToReadBytes { .. } => Self::ByteStreamError(err.into()), SocketError { .. } => Self::SocketError(err.into()), + Throttled { .. } => Self::ThrottledIo(err.into()), + MiscTransient { .. } => Self::MiscTransient(err.into()), // We have to repeat everything above for the case we have an Arc since we can't move the error. CachedError { ref source } => match source.as_ref() { NotFound { path, source: _ } => Self::FileNotFound { @@ -160,6 +168,8 @@ impl From for DaftError { ReadTimeout { .. } => Self::ReadTimeout(err.into()), UnableToReadBytes { .. } => Self::ByteStreamError(err.into()), SocketError { .. } => Self::SocketError(err.into()), + Throttled { .. } => Self::ThrottledIo(err.into()), + MiscTransient { .. } => Self::MiscTransient(err.into()), _ => Self::External(err.into()), }, _ => Self::External(err.into()), diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs index 2766011ae7..e6eb829a78 100644 --- a/src/daft-io/src/s3_like.rs +++ b/src/daft-io/src/s3_like.rs @@ -10,8 +10,10 @@ use aws_credential_types::{ cache::{CredentialsCache, ProvideCachedCredentials, SharedCredentialsCache}, provider::error::CredentialsError, }; -use aws_sdk_s3 as s3; -use aws_sdk_s3::{operation::put_object::PutObjectError, primitives::ByteStreamError}; +use aws_sdk_s3::{ + self as s3, error::ProvideErrorMetadata, operation::put_object::PutObjectError, + primitives::ByteStreamError, +}; use aws_sig_auth::signer::SigningRequirements; use aws_smithy_async::rt::sleep::TokioSleep; use common_io_config::S3Config; @@ -118,10 +120,51 @@ enum Error { UploadsCannotBeAnonymous {}, } +/// List of AWS error codes that are due to throttling +/// https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList +const THROTTLING_ERRORS: &[&str] = &[ + "Throttling", + "ThrottlingException", + "ThrottledException", + "RequestThrottledException", + "TooManyRequestsException", + "ProvisionedThroughputExceededException", + "TransactionInProgressException", + "RequestLimitExceeded", + "BandwidthLimitExceeded", + "LimitExceededException", + "RequestThrottled", + "SlowDown", + "PriorRequestNotComplete", + "EC2ThrottledException", +]; + impl From for super::Error { fn from(error: Error) -> Self { use Error::*; + fn classify_unhandled_error< + E: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static, + >( + path: String, + err: E, + ) -> super::Error { + match err.code() { + Some("InternalError") => super::Error::MiscTransient { + path, + source: err.into(), + }, + Some(code) if THROTTLING_ERRORS.contains(&code) => super::Error::Throttled { + path, + source: err.into(), + }, + _ => super::Error::Unhandled { + path, + msg: DisplayErrorContext(err).to_string(), + }, + } + } + match error { UnableToOpenFile { path, source } => match source { SdkError::TimeoutError(_) => Self::ReadTimeout { @@ -140,25 +183,20 @@ impl From for super::Error { source: source.into(), } } else { - Self::UnableToOpenFile { + // who knows what happened here during dispatch, let's just tell the user it's transient + Self::MiscTransient { path, source: source.into(), } } } + _ => match source.into_service_error() { GetObjectError::NoSuchKey(no_such_key) => Self::NotFound { path, source: no_such_key.into(), }, - GetObjectError::Unhandled(v) => Self::Unhandled { - path, - msg: DisplayErrorContext(v).to_string(), - }, - err => Self::UnableToOpenFile { - path, - source: err.into(), - }, + err => classify_unhandled_error(path, err), }, }, UnableToHeadFile { path, source } => match source { @@ -178,7 +216,8 @@ impl From for super::Error { source: source.into(), } } else { - Self::UnableToOpenFile { + // who knows what happened here during dispatch, let's just tell the user it's transient + Self::MiscTransient { path, source: source.into(), } @@ -189,14 +228,7 @@ impl From for super::Error { path, source: no_such_key.into(), }, - HeadObjectError::Unhandled(v) => Self::Unhandled { - path, - msg: DisplayErrorContext(v).to_string(), - }, - err => Self::UnableToOpenFile { - path, - source: err.into(), - }, + err => classify_unhandled_error(path, err), }, }, UnableToListObjects { path, source } => match source { @@ -216,7 +248,8 @@ impl From for super::Error { source: source.into(), } } else { - Self::UnableToOpenFile { + // who knows what happened here during dispatch, let's just tell the user it's transient + Self::MiscTransient { path, source: source.into(), } @@ -227,14 +260,7 @@ impl From for super::Error { path, source: no_such_key.into(), }, - ListObjectsV2Error::Unhandled(v) => Self::Unhandled { - path, - msg: DisplayErrorContext(v).to_string(), - }, - err => Self::UnableToOpenFile { - path, - source: err.into(), - }, + err => classify_unhandled_error(path, err), }, }, InvalidUrl { path, source } => Self::InvalidUrl { path, source },