diff --git a/Cargo.lock b/Cargo.lock index ccd6d54e5a..8b6b2435a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,6 +283,7 @@ dependencies = [ "simdutf8", "streaming-iterator", "strength_reduce", + "thiserror", "tokio", "tokio-util", "zstd 0.12.4", @@ -1317,6 +1318,7 @@ dependencies = [ "pyo3", "regex", "serde_json", + "thiserror", ] [[package]] @@ -3790,6 +3792,7 @@ dependencies = [ "serde", "snap 1.1.1", "streaming-decompression", + "thiserror", "tokio", "xxhash-rust", "zstd 0.12.4", diff --git a/Cargo.toml b/Cargo.toml index 1709a3971c..4046665e77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -164,6 +164,7 @@ snafu = {version = "0.7.4", features = ["futures"]} sqlparser = "0.49.0" sysinfo = "0.30.12" test-log = "0.2.16" +thiserror = "1.0.63" tiktoken-rs = "0.5.9" tokio = {version = "1.37.0", features = [ "net", diff --git a/src/arrow2/Cargo.toml b/src/arrow2/Cargo.toml index e65e87b87c..0664947831 100644 --- a/src/arrow2/Cargo.toml +++ b/src/arrow2/Cargo.toml @@ -70,6 +70,7 @@ simdutf8 = "0.1.4" streaming-iterator = {version = "0.1", optional = true} # for division/remainder optimization at runtime strength_reduce = {version = "0.2", optional = true} +thiserror = {workspace = true} zstd = {version = "0.12", optional = true} # parquet support diff --git a/src/arrow2/src/error.rs b/src/arrow2/src/error.rs index e5e55b50c6..3b7eaadf3e 100644 --- a/src/arrow2/src/error.rs +++ b/src/arrow2/src/error.rs @@ -1,24 +1,47 @@ //! Defines [`Error`], representing all errors returned by this crate. -use std::fmt::{Debug, Display, Formatter}; + +use thiserror::Error; /// Enum with all errors in this crate. -#[derive(Debug)] +#[derive(Debug, Error)] #[non_exhaustive] pub enum Error { /// Returned when functionality is not yet available. + #[error("Not yet implemented: {0}")] NotYetImplemented(String), + + #[error("{0}")] + Utf8Error(#[from] simdutf8::basic::Utf8Error), + + #[error("{0}")] + StdUtf8Error(#[from] std::str::Utf8Error), + + #[error("{0}")] + TryReserveError(#[from] std::collections::TryReserveError), + /// Wrapper for an error triggered by a dependency + #[error("External error{0}: {1}")] External(String, Box), + /// Wrapper for IO errors - Io(std::io::Error), + #[error("Io error: {0}")] + Io(#[from] std::io::Error), + /// When an invalid argument is passed to a function. + #[error("Invalid argument error: {0}")] InvalidArgumentError(String), + /// Error during import or export to/from a format + #[error("External format error: {0}")] ExternalFormat(String), + /// Whenever pushing to a container fails because it does not support more entries. /// The solution is usually to use a higher-capacity container-backing type. + #[error("Operation overflew the backing container.")] Overflow, + /// Whenever incoming data from the C data interface, IPC or Flight does not fulfil the Arrow specification. + #[error("{0}")] OutOfSpec(String), } @@ -32,69 +55,10 @@ impl Error { Self::OutOfSpec(msg.into()) } - #[allow(dead_code)] pub(crate) fn nyi>(msg: A) -> Self { Self::NotYetImplemented(msg.into()) } } -impl From<::std::io::Error> for Error { - fn from(error: std::io::Error) -> Self { - Error::Io(error) - } -} - -impl From for Error { - fn from(error: std::str::Utf8Error) -> Self { - Error::External("".to_string(), Box::new(error)) - } -} - -impl From for Error { - fn from(error: std::string::FromUtf8Error) -> Self { - Error::External("".to_string(), Box::new(error)) - } -} - -impl From for Error { - fn from(error: simdutf8::basic::Utf8Error) -> Self { - Error::External("".to_string(), Box::new(error)) - } -} - -impl From for Error { - fn from(_: std::collections::TryReserveError) -> Error { - Error::Overflow - } -} - -impl Display for Error { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Error::NotYetImplemented(source) => { - write!(f, "Not yet implemented: {}", &source) - } - Error::External(message, source) => { - write!(f, "External error{}: {}", message, &source) - } - Error::Io(desc) => write!(f, "Io error: {desc}"), - Error::InvalidArgumentError(desc) => { - write!(f, "Invalid argument error: {desc}") - } - Error::ExternalFormat(desc) => { - write!(f, "External format error: {desc}") - } - Error::Overflow => { - write!(f, "Operation overflew the backing container.") - } - Error::OutOfSpec(message) => { - write!(f, "{message}") - } - } - } -} - -impl std::error::Error for Error {} - /// Typedef for a [`std::result::Result`] of an [`Error`]. pub type Result = std::result::Result; diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml index 3f6b1452f3..8da47b9fee 100644 --- a/src/common/error/Cargo.toml +++ b/src/common/error/Cargo.toml @@ -3,6 +3,7 @@ arrow2 = {workspace = true} pyo3 = {workspace = true, optional = true} regex = {workspace = true} serde_json = {workspace = true} +thiserror = {workspace = true} [features] python = ["dep:pyo3"] diff --git a/src/common/error/src/error.rs b/src/common/error/src/error.rs index deadff3cba..c3ea90f5f2 100644 --- a/src/common/error/src/error.rs +++ b/src/common/error/src/error.rs @@ -1,113 +1,45 @@ -use std::{ - fmt::{Display, Formatter, Result}, - io, -}; +use thiserror::Error; +pub type DaftResult = std::result::Result; pub type GenericError = Box; -#[derive(Debug)] +#[derive(Debug, Error)] pub enum DaftError { + #[error("DaftError::FieldNotFound {0}")] FieldNotFound(String), + #[error("DaftError::SchemaMismatch {0}")] SchemaMismatch(String), + #[error("DaftError::TypeError {0}")] TypeError(String), + #[error("DaftError::ComputeError {0}")] ComputeError(String), - ArrowError(String), + #[error("DaftError::ArrowError {0}")] + ArrowError(#[from] arrow2::error::Error), + #[error("DaftError::ValueError {0}")] ValueError(String), #[cfg(feature = "python")] - PyO3Error(pyo3::PyErr), - IoError(io::Error), - FileNotFound { - path: String, - source: GenericError, - }, + #[error("DaftError::PyO3Error {0}")] + PyO3Error(#[from] pyo3::PyErr), + #[error("DaftError::IoError {0}")] + IoError(#[from] std::io::Error), + #[error("DaftError::FileNotFound {path}: {source}")] + FileNotFound { path: String, source: GenericError }, + #[error("DaftError::InternalError {0}")] InternalError(String), + #[error("ConnectTimeout {0}")] ConnectTimeout(GenericError), + #[error("ReadTimeout {0}")] ReadTimeout(GenericError), + #[error("ByteStreamError {0}")] ByteStreamError(GenericError), + #[error("SocketError {0}")] SocketError(GenericError), + #[error("DaftError::External {0}")] External(GenericError), -} - -impl std::error::Error for DaftError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - DaftError::FieldNotFound(_) - | DaftError::SchemaMismatch(_) - | DaftError::TypeError(_) - | DaftError::ComputeError(_) - | DaftError::ArrowError(_) - | DaftError::ValueError(_) - | DaftError::InternalError(_) => None, - DaftError::IoError(io_error) => Some(io_error), - DaftError::FileNotFound { source, .. } - | DaftError::SocketError(source) - | DaftError::External(source) - | DaftError::ReadTimeout(source) - | DaftError::ConnectTimeout(source) - | DaftError::ByteStreamError(source) => Some(&**source), - #[cfg(feature = "python")] - DaftError::PyO3Error(pyerr) => Some(pyerr), - } - } -} - -impl From for DaftError { - fn from(error: arrow2::error::Error) -> Self { - match error { - arrow2::error::Error::Io(_) => DaftError::ByteStreamError(error.into()), - _ => DaftError::ArrowError(error.to_string()), - } - } -} - -impl From for DaftError { - fn from(error: serde_json::Error) -> Self { - DaftError::IoError(error.into()) - } -} - -impl From for DaftError { - fn from(error: io::Error) -> Self { - DaftError::IoError(error) - } -} - -impl From for DaftError { - fn from(error: regex::Error) -> Self { - DaftError::ValueError(error.to_string()) - } -} - -impl From for DaftError { - fn from(error: std::fmt::Error) -> Self { - DaftError::ComputeError(error.to_string()) - } -} - -pub type DaftResult = std::result::Result; - -impl Display for DaftError { - // `f` is a buffer, and this method must write the formatted string into it - fn fmt(&self, f: &mut Formatter) -> Result { - match self { - Self::FieldNotFound(s) => write!(f, "DaftError::FieldNotFound {s}"), - Self::SchemaMismatch(s) => write!(f, "DaftError::SchemaMismatch {s}"), - Self::TypeError(s) => write!(f, "DaftError::TypeError {s}"), - Self::ComputeError(s) => write!(f, "DaftError::ComputeError {s}"), - Self::ArrowError(s) => write!(f, "DaftError::ArrowError {s}"), - Self::ValueError(s) => write!(f, "DaftError::ValueError {s}"), - Self::InternalError(s) => write!(f, "DaftError::InternalError {s}"), - #[cfg(feature = "python")] - Self::PyO3Error(e) => write!(f, "DaftError::PyO3Error {e}"), - Self::IoError(e) => write!(f, "DaftError::IoError {e}"), - Self::External(e) => write!(f, "DaftError::External {}", e), - Self::FileNotFound { path, source } => { - write!(f, "DaftError::FileNotFound {path}: {source}") - } - Self::ByteStreamError(e) => write!(f, "ByteStreamError: {}", e), - Self::ConnectTimeout(e) => write!(f, "ConnectTimeout: {}", e), - Self::ReadTimeout(e) => write!(f, "ReadTimeout: {}", e), - Self::SocketError(e) => write!(f, "SocketError: {}", e), - } - } + #[error("DaftError::SerdeJsonError {0}")] + SerdeJsonError(#[from] serde_json::Error), + #[error("DaftError::FmtError {0}")] + FmtError(#[from] std::fmt::Error), + #[error("DaftError::RegexError {0}")] + RegexError(#[from] regex::Error), } diff --git a/src/common/error/src/python.rs b/src/common/error/src/python.rs index 0b815ac91b..34c8c3b1fc 100644 --- a/src/common/error/src/python.rs +++ b/src/common/error/src/python.rs @@ -1,13 +1,8 @@ +use pyo3::exceptions::PyFileNotFoundError; use pyo3::import_exception; use crate::DaftError; -impl From for DaftError { - fn from(error: pyo3::PyErr) -> Self { - DaftError::PyO3Error(error) - } -} - import_exception!(daft.exceptions, DaftCoreException); import_exception!(daft.exceptions, DaftTypeError); import_exception!(daft.exceptions, ConnectTimeoutError); @@ -17,8 +12,6 @@ import_exception!(daft.exceptions, SocketError); impl std::convert::From for pyo3::PyErr { fn from(err: DaftError) -> pyo3::PyErr { - use pyo3::exceptions::PyFileNotFoundError; - match err { DaftError::PyO3Error(pyerr) => pyerr, DaftError::FileNotFound { path, source } => { diff --git a/src/daft-core/src/array/ops/utf8.rs b/src/daft-core/src/array/ops/utf8.rs index 08cc491472..9cdccd8ea0 100644 --- a/src/daft-core/src/array/ops/utf8.rs +++ b/src/daft-core/src/array/ops/utf8.rs @@ -804,7 +804,7 @@ impl Utf8Array { } let self_iter = create_broadcasted_str_iter(self, expected_size); - let result = match nchars.len() { + let result: Utf8Array = match nchars.len() { 1 => { let n = nchars.get(0).unwrap(); let n: usize = NumCast::from(n).ok_or_else(|| { @@ -868,7 +868,7 @@ impl Utf8Array { } let self_iter = create_broadcasted_str_iter(self, expected_size); - let result = match nchars.len() { + let result: Utf8Array = match nchars.len() { 1 => { let n = nchars.get(0).unwrap(); let n: usize = NumCast::from(n).ok_or_else(|| { @@ -1009,7 +1009,7 @@ impl Utf8Array { } let self_iter = create_broadcasted_str_iter(self, expected_size); - let result = match n.len() { + let result: Utf8Array = match n.len() { 1 => { let n = n.get(0).unwrap(); let n: usize = NumCast::from(n).ok_or_else(|| { @@ -1256,7 +1256,7 @@ impl Utf8Array { let self_iter = create_broadcasted_str_iter(self, expected_size); let padchar_iter = create_broadcasted_str_iter(padchar, expected_size); - let result = match length.len() { + let result: Utf8Array = match length.len() { 1 => { let len = length.get(0).unwrap(); let len: usize = NumCast::from(len).ok_or_else(|| { diff --git a/src/parquet2/Cargo.toml b/src/parquet2/Cargo.toml index f66b674be5..6d437e1578 100644 --- a/src/parquet2/Cargo.toml +++ b/src/parquet2/Cargo.toml @@ -11,6 +11,7 @@ seq-macro = {version = "0.3", default-features = false} serde = {version = "^1.0", features = ["derive"]} snap = {version = "^1.1", optional = true} streaming-decompression = "0.1" +thiserror = {workspace = true} xxhash-rust = {version = "0.8", optional = true, features = ["xxh64"]} zstd = {version = "^0.12", optional = true, default-features = false} diff --git a/src/parquet2/src/error.rs b/src/parquet2/src/error.rs index d06e62bcae..b2b8b05372 100644 --- a/src/parquet2/src/error.rs +++ b/src/parquet2/src/error.rs @@ -1,5 +1,7 @@ //! Contains [`Error`] +use thiserror::Error; + /// List of features whose non-activation may cause a runtime error. /// Used to indicate which lack of feature caused [`Error::FeatureNotActive`]. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -18,78 +20,61 @@ pub enum Feature { } /// Errors generated by this crate -#[derive(Debug, Clone)] +#[derive(Debug, Error)] #[non_exhaustive] pub enum Error { /// When the parquet file is known to be out of spec. + #[error("File out of specification: {0}")] OutOfSpec(String), + /// Error presented when trying to use a code branch that requires activating a feature. + #[error("The feature \"{1:?}\" needs to be active to {1}")] FeatureNotActive(Feature, String), + /// Error presented when trying to use a feature from parquet that is not yet supported + #[error("Not yet supported: {0}")] FeatureNotSupported(String), + /// When encoding, the user passed an invalid parameter + #[error("Invalid parameter: {0}")] InvalidParameter(String), + /// When decoding or decompressing, the page would allocate more memory than allowed + #[error("Operation would exceed memory use threshold")] WouldOverAllocate, + /// When a transport error occurs when reading data + #[error("Transport error: {0}")] Transport(String), -} -impl Error { - pub(crate) fn oos>(message: I) -> Self { - Self::OutOfSpec(message.into()) - } -} + #[error("Can't deserialize to parquet native type: {0}")] + TryFromSliceError(#[from] std::array::TryFromSliceError), -impl std::error::Error for Error {} + #[error("underlying IO error: {0}")] + IoError(#[from] std::io::Error), -impl std::fmt::Display for Error { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - Error::OutOfSpec(message) => { - write!(fmt, "File out of specification: {}", message) - } - Error::FeatureNotActive(feature, reason) => { - write!( - fmt, - "The feature \"{:?}\" needs to be active to {}", - feature, reason - ) - } - Error::FeatureNotSupported(reason) => { - write!(fmt, "Not yet supported: {}", reason) - } - Error::InvalidParameter(message) => { - write!(fmt, "Invalid parameter: {}", message) - } - Error::WouldOverAllocate => { - write!(fmt, "Operation would exceed memory use threshold") - } - Error::Transport(message) => { - write!(fmt, "Transport error: {}", message) - } - } - } -} + #[error("OOM: {0}")] + TryReserveError(#[from] std::collections::TryReserveError), -#[cfg(feature = "snappy")] -impl From for Error { - fn from(e: snap::Error) -> Error { - Error::OutOfSpec(format!("underlying snap error: {}", e)) - } -} + #[error("Number must be zero or positive: {0}")] + TryFromIntError(#[from] std::num::TryFromIntError), -#[cfg(feature = "lz4_flex")] -impl From for Error { - fn from(e: lz4_flex::block::DecompressError) -> Error { - Error::OutOfSpec(format!("underlying lz4_flex error: {}", e)) - } + #[cfg(feature = "snappy")] + #[error("underlying snap error: {0}")] + SnapError(#[from] snap::Error), + + #[cfg(feature = "lz4_flex")] + #[error("underlying lz4_flex error: {0}")] + DecompressError(#[from] lz4_flex::block::DecompressError), + + #[cfg(feature = "lz4_flex")] + #[error("underlying lz4_flex error: {0}")] + CompressError(#[from] lz4_flex::block::CompressError), } -#[cfg(feature = "lz4_flex")] -impl From for Error { - fn from(e: lz4_flex::block::CompressError) -> Error { - Error::OutOfSpec(format!("underlying lz4_flex error: {}", e)) +impl Error { + pub(crate) fn oos>(message: I) -> Self { + Self::OutOfSpec(message.into()) } } @@ -104,29 +89,5 @@ impl From for Error { } } -impl From for Error { - fn from(e: std::io::Error) -> Error { - Error::Transport(format!("underlying IO error: {}", e)) - } -} - -impl From for Error { - fn from(e: std::collections::TryReserveError) -> Error { - Error::OutOfSpec(format!("OOM: {}", e)) - } -} - -impl From for Error { - fn from(e: std::num::TryFromIntError) -> Error { - Error::OutOfSpec(format!("Number must be zero or positive: {}", e)) - } -} - -impl From for Error { - fn from(e: std::array::TryFromSliceError) -> Error { - Error::OutOfSpec(format!("Can't deserialize to parquet native type: {}", e)) - } -} - /// A specialized `Result` for Parquet errors. pub type Result = std::result::Result; diff --git a/tests/table/json/test_json_query.py b/tests/table/json/test_json_query.py index 6208dfbee1..d3e429fa47 100644 --- a/tests/table/json/test_json_query.py +++ b/tests/table/json/test_json_query.py @@ -75,7 +75,7 @@ def test_json_query_invalid_filter(): def test_json_query_invalid_json(): mp = MicroPartition.from_pydict({"col": ["a", "b", "c"]}) - with pytest.raises(ValueError, match="DaftError::IoError"): + with pytest.raises(ValueError, match="DaftError::SerdeJsonError"): mp.eval_expression_list([col("col").json.query(".a")])