From 00647a87fb05358e5d695de06784311c4bf06f8a Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Fri, 1 Sep 2023 17:34:34 -0700 Subject: [PATCH] [CHORE] factor io config into common code (#1335) * Factors out IOConfig, S3Config, etc into a common io-config crate. * Reexports them in the daft-io crate for convenience * sadly `daft-dsl` still depends on `daft-io` for the url_download function so we still have to recompile quite a bit. * This should be fixed when we allow for dynamic function registration. --- Cargo.lock | 14 +- Cargo.toml | 1 + src/common/io-config/Cargo.toml | 14 + src/common/io-config/src/azure.rs | 25 ++ src/common/io-config/src/config.rs | 26 ++ src/common/io-config/src/gcs.rs | 23 ++ src/common/io-config/src/lib.rs | 9 + src/common/io-config/src/python.rs | 309 +++++++++++++++++ .../config.rs => common/io-config/src/s3.rs} | 59 +--- src/daft-dsl/Cargo.toml | 3 +- src/daft-dsl/src/functions/uri/mod.rs | 2 +- src/daft-dsl/src/python.rs | 4 +- src/daft-io/Cargo.toml | 3 +- src/daft-io/src/azure_blob.rs | 2 +- src/daft-io/src/google_cloud.rs | 5 +- src/daft-io/src/lib.rs | 3 +- src/daft-io/src/python.rs | 310 +----------------- src/daft-io/src/s3_like.rs | 5 +- src/daft-parquet/src/metadata.rs | 2 +- src/daft-parquet/src/read.rs | 3 +- src/daft-plan/Cargo.toml | 4 +- src/daft-plan/src/source_info.rs | 4 +- 22 files changed, 448 insertions(+), 382 deletions(-) create mode 100644 src/common/io-config/Cargo.toml create mode 100644 src/common/io-config/src/azure.rs create mode 100644 src/common/io-config/src/config.rs create mode 100644 src/common/io-config/src/gcs.rs create mode 100644 src/common/io-config/src/lib.rs create mode 100644 src/common/io-config/src/python.rs rename src/{daft-io/src/config.rs => common/io-config/src/s3.rs} (56%) diff --git a/Cargo.lock b/Cargo.lock index 4f10f18d39..5b673d2c04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -846,6 +846,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "common-io-config" +version = "0.1.10" +dependencies = [ + "common-error", + "pyo3", + "serde", + "serde_json", +] + [[package]] name = "concurrent-queue" version = "2.2.0" @@ -1038,6 +1048,7 @@ version = "0.1.10" dependencies = [ "bincode", "common-error", + "common-io-config", "daft-core", "daft-io", "pyo3", @@ -1062,6 +1073,7 @@ dependencies = [ "azure_storage_blobs", "bytes", "common-error", + "common-io-config", "daft-core", "futures", "google-cloud-storage", @@ -1110,9 +1122,9 @@ dependencies = [ "arrow2", "bincode", "common-error", + "common-io-config", "daft-core", "daft-dsl", - "daft-io", "daft-table", "indexmap 2.0.0", "pyo3", diff --git a/Cargo.toml b/Cargo.toml index 808e9b1d12..c9c6614be5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ inherits = "dev" [workspace] members = [ "src/common/error", + "src/common/io-config", "src/daft-core", "src/daft-io", "src/daft-parquet", diff --git a/src/common/io-config/Cargo.toml b/src/common/io-config/Cargo.toml new file mode 100644 index 0000000000..307de25498 --- /dev/null +++ b/src/common/io-config/Cargo.toml @@ -0,0 +1,14 @@ +[dependencies] +common-error = {path = "../error", default-features = false} +pyo3 = {workspace = true, optional = true} +serde = {workspace = true} +serde_json = {workspace = true} + +[features] +default = ["python"] +python = ["dep:pyo3", "common-error/python"] + +[package] +edition = {workspace = true} +name = "common-io-config" +version = {workspace = true} diff --git a/src/common/io-config/src/azure.rs b/src/common/io-config/src/azure.rs new file mode 100644 index 
0000000000..eaa3442e51
--- /dev/null
+++ b/src/common/io-config/src/azure.rs
@@ -0,0 +1,25 @@
+use std::fmt::Display;
+use std::fmt::Formatter;
+
+use serde::Deserialize;
+use serde::Serialize;
+
+#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
+pub struct AzureConfig {
+    pub storage_account: Option<String>,
+    pub access_key: Option<String>,
+    pub anonymous: bool,
+}
+
+impl Display for AzureConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
+        write!(
+            f,
+            "AzureConfig
+    storage_account: {:?}
+    access_key: {:?}
+    anonymous: {:?}",
+            self.storage_account, self.access_key, self.anonymous
+        )
+    }
+}
diff --git a/src/common/io-config/src/config.rs b/src/common/io-config/src/config.rs
new file mode 100644
index 0000000000..81edbf2a1d
--- /dev/null
+++ b/src/common/io-config/src/config.rs
@@ -0,0 +1,26 @@
+use std::fmt::Display;
+use std::fmt::Formatter;
+
+use serde::Deserialize;
+use serde::Serialize;
+
+use crate::{AzureConfig, GCSConfig, S3Config};
+#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
+pub struct IOConfig {
+    pub s3: S3Config,
+    pub azure: AzureConfig,
+    pub gcs: GCSConfig,
+}
+
+impl Display for IOConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
+        write!(
+            f,
+            "IOConfig:
+{}
+{}
+{}",
+            self.s3, self.azure, self.gcs
+        )
+    }
+}
diff --git a/src/common/io-config/src/gcs.rs b/src/common/io-config/src/gcs.rs
new file mode 100644
index 0000000000..017c23fc2a
--- /dev/null
+++ b/src/common/io-config/src/gcs.rs
@@ -0,0 +1,23 @@
+use std::fmt::Display;
+use std::fmt::Formatter;
+
+use serde::Deserialize;
+use serde::Serialize;
+
+#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
+pub struct GCSConfig {
+    pub project_id: Option<String>,
+    pub anonymous: bool,
+}
+
+impl Display for GCSConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
+        write!(
+            f,
+            "GCSConfig
+    project_id: {:?}
+    anonymous: {:?}",
+            self.project_id, self.anonymous
+        )
+    }
+}
diff --git a/src/common/io-config/src/lib.rs b/src/common/io-config/src/lib.rs
new file mode 100644
index 0000000000..535ac72d40
--- /dev/null
+++ b/src/common/io-config/src/lib.rs
@@ -0,0 +1,9 @@
+#[cfg(feature = "python")]
+pub mod python;
+
+mod azure;
+mod config;
+mod gcs;
+mod s3;
+
+pub use crate::{azure::AzureConfig, config::IOConfig, gcs::GCSConfig, s3::S3Config};
diff --git a/src/common/io-config/src/python.rs b/src/common/io-config/src/python.rs
new file mode 100644
index 0000000000..62492283e4
--- /dev/null
+++ b/src/common/io-config/src/python.rs
@@ -0,0 +1,309 @@
+use common_error::DaftError;
+use pyo3::prelude::*;
+
+use crate::config;
+
+/// Create configurations to be used when accessing an S3-compatible system
+///
+/// Args:
+///     region_name: Name of the region to be used (used when accessing AWS S3), defaults to "us-east-1".
+///         If wrongly provided, Daft will attempt to auto-detect the buckets' region at the cost of extra S3 requests.
+///     endpoint_url: URL to the S3 endpoint, defaults to endpoints to AWS
+///     key_id: AWS Access Key ID, defaults to auto-detection from the current environment
+///     access_key: AWS Secret Access Key, defaults to auto-detection from the current environment
+///     session_token: AWS Session Token, required only if `key_id` and `access_key` are temporary credentials
+///     retry_initial_backoff_ms: Initial backoff duration in milliseconds for an S3 retry, defaults to 1000ms
+///     connect_timeout_ms: Timeout duration to wait to make a connection to S3 in milliseconds, defaults to 60 seconds
+///     read_timeout_ms: Timeout duration to wait to read the first byte from S3 in milliseconds, defaults to 60 seconds
+///     num_tries: Number of attempts to make a connection, defaults to 5
+///     retry_mode: Retry Mode when a request fails, current supported values are `standard` and `adaptive`
+///     anonymous: Whether or not to use "anonymous mode", which will access S3 without any credentials
+///
+/// Example:
+///     >>> io_config = IOConfig(s3=S3Config(key_id="xxx", access_key="xxx"))
+///     >>> daft.read_parquet("s3://some-path", io_config=io_config)
+#[derive(Clone, Default)]
+#[pyclass]
+pub struct S3Config {
+    pub config: crate::S3Config,
+}
+/// Create configurations to be used when accessing Azure Blob Storage
+///
+/// Args:
+///     storage_account: Azure Storage Account, defaults to reading from `AZURE_STORAGE_ACCOUNT` environment variable.
+///     access_key: Azure Secret Access Key, defaults to reading from `AZURE_STORAGE_KEY` environment variable
+///     anonymous: Whether or not to use "anonymous mode", which will access Azure without any credentials
+///
+/// Example:
+///     >>> io_config = IOConfig(azure=AzureConfig(storage_account="dafttestdata", access_key="xxx"))
+///     >>> daft.read_parquet("az://some-path", io_config=io_config)
+#[derive(Clone, Default)]
+#[pyclass]
+pub struct AzureConfig {
+    pub config: crate::AzureConfig,
+}
+
+/// Create configurations to be used when accessing Google Cloud Storage
+///
+/// Args:
+///     project_id: Google Project ID, defaults to reading credentials file or Google Cloud metadata service
+///     anonymous: Whether or not to use "anonymous mode", which will access Google Storage without any credentials
+///
+/// Example:
+///     >>> io_config = IOConfig(gcs=GCSConfig(anonymous=True))
+///     >>> daft.read_parquet("gs://some-path", io_config=io_config)
+#[derive(Clone, Default)]
+#[pyclass]
+pub struct GCSConfig {
+    pub config: crate::GCSConfig,
+}
+
+/// Create configurations to be used when accessing storage
+///
+/// Args:
+///     s3: Configuration to use when accessing URLs with the `s3://` scheme
+///     azure: Configuration to use when accessing URLs with the `az://` or `abfs://` scheme
+///     gcs: Configuration to use when accessing URLs with the `gs://` or `gcs://` scheme
+/// Example:
+///     >>> io_config = IOConfig(s3=S3Config(key_id="xxx", access_key="xxx", num_tries=10), azure=AzureConfig(anonymous=True), gcs=GCSConfig(...))
+///     >>> daft.read_parquet(["s3://some-path", "az://some-other-path", "gs://path3"], io_config=io_config)
+#[derive(Clone, Default)]
+#[pyclass]
+pub struct IOConfig {
+    pub config: config::IOConfig,
+}
+
+#[pymethods]
+impl IOConfig {
+    #[new]
+    pub fn new(s3: Option<S3Config>, azure: Option<AzureConfig>, gcs: Option<GCSConfig>) -> Self {
+        IOConfig {
+            config: config::IOConfig {
+                s3: s3.unwrap_or_default().config,
+                azure: azure.unwrap_or_default().config,
+                gcs: gcs.unwrap_or_default().config,
+            },
+        }
+    }
+
+    pub fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{}", self.config))
+    }
+
+    /// Configuration to be used when accessing s3 URLs
+    #[getter]
+    pub fn s3(&self) -> PyResult<S3Config> {
+        Ok(S3Config {
+            config: self.config.s3.clone(),
+        })
+    }
+
+    /// Configuration to be used when accessing Azure URLs
+    #[getter]
+    pub fn azure(&self) -> PyResult<AzureConfig> {
+        Ok(AzureConfig {
+            config: self.config.azure.clone(),
+        })
+    }
+
+    /// Configuration to be used when accessing Azure URLs
+    #[getter]
+    pub fn gcs(&self) -> PyResult<GCSConfig> {
+        Ok(GCSConfig {
+            config: self.config.gcs.clone(),
+        })
+    }
+
+    #[staticmethod]
+    pub fn from_json(input: &str) -> PyResult<Self> {
+        let config: config::IOConfig = serde_json::from_str(input).map_err(DaftError::from)?;
+        Ok(config.into())
+    }
+
+    pub fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (String,))> {
+        let io_config_module = py.import("daft.io.config")?;
+        let json_string = serde_json::to_string(&self.config).map_err(DaftError::from)?;
+        Ok((
+            io_config_module
+                .getattr("_io_config_from_json")?
+                .to_object(py),
+            (json_string,),
+        ))
+    }
+}
+
+#[pymethods]
+impl S3Config {
+    #[allow(clippy::too_many_arguments)]
+    #[new]
+    pub fn new(
+        region_name: Option<String>,
+        endpoint_url: Option<String>,
+        key_id: Option<String>,
+        session_token: Option<String>,
+        access_key: Option<String>,
+        retry_initial_backoff_ms: Option<u64>,
+        connect_timeout_ms: Option<u64>,
+        read_timeout_ms: Option<u64>,
+        num_tries: Option<u32>,
+        retry_mode: Option<String>,
+        anonymous: Option<bool>,
+    ) -> Self {
+        let def = crate::S3Config::default();
+        S3Config {
+            config: crate::S3Config {
+                region_name: region_name.or(def.region_name),
+                endpoint_url: endpoint_url.or(def.endpoint_url),
+                key_id: key_id.or(def.key_id),
+                session_token: session_token.or(def.session_token),
+                access_key: access_key.or(def.access_key),
+                retry_initial_backoff_ms: retry_initial_backoff_ms
+                    .unwrap_or(def.retry_initial_backoff_ms),
+                connect_timeout_ms: connect_timeout_ms.unwrap_or(def.connect_timeout_ms),
+                read_timeout_ms: read_timeout_ms.unwrap_or(def.read_timeout_ms),
+                num_tries: num_tries.unwrap_or(def.num_tries),
+                retry_mode: retry_mode.or(def.retry_mode),
+                anonymous: anonymous.unwrap_or(def.anonymous),
+            },
+        }
+    }
+
+    pub fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{}", self.config))
+    }
+
+    /// Region to use when accessing AWS S3
+    #[getter]
+    pub fn region_name(&self) -> PyResult<Option<String>> {
+        Ok(self.config.region_name.clone())
+    }
+
+    /// S3-compatible endpoint to use
+    #[getter]
+    pub fn endpoint_url(&self) -> PyResult<Option<String>> {
+        Ok(self.config.endpoint_url.clone())
+    }
+
+    /// AWS Access Key ID
+    #[getter]
+    pub fn key_id(&self) -> PyResult<Option<String>> {
+        Ok(self.config.key_id.clone())
+    }
+
+    /// AWS Session Token
+    #[getter]
+    pub fn session_token(&self) -> PyResult<Option<String>> {
+        Ok(self.config.session_token.clone())
+    }
+
+    /// AWS Secret Access Key
+    #[getter]
+    pub fn access_key(&self) -> PyResult<Option<String>> {
+        Ok(self.config.access_key.clone())
+    }
+
+    /// AWS Retry Initial Backoff Time in Milliseconds
+    #[getter]
+    pub fn retry_initial_backoff_ms(&self) -> PyResult<u64> {
+        Ok(self.config.retry_initial_backoff_ms)
+    }
+
+    /// AWS Connection Timeout in Milliseconds
+    #[getter]
+    pub fn connect_timeout_ms(&self) -> PyResult<u64> {
+        Ok(self.config.connect_timeout_ms)
+    }
+
+    /// AWS Read Timeout in Milliseconds
+    #[getter]
+    pub fn read_timeout_ms(&self) -> PyResult<u64> {
+        Ok(self.config.read_timeout_ms)
+    }
+
+    /// AWS Number Retries
+    #[getter]
+    pub fn num_tries(&self) -> PyResult<u32> {
+        Ok(self.config.num_tries)
+    }
+
+    /// AWS Retry Mode
+    #[getter]
+    pub fn retry_mode(&self) -> PyResult<Option<String>> {
+        Ok(self.config.retry_mode.clone())
+    }
+}
+
+#[pymethods]
+impl AzureConfig {
+    #[allow(clippy::too_many_arguments)]
+    #[new]
+    pub fn new(
+        storage_account: Option<String>,
+        access_key: Option<String>,
+        anonymous: Option<bool>,
+    ) -> Self {
+        let def = crate::AzureConfig::default();
+        AzureConfig {
+            config: crate::AzureConfig {
+                storage_account: storage_account.or(def.storage_account),
+                access_key: access_key.or(def.access_key),
+                anonymous: anonymous.unwrap_or(def.anonymous),
+            },
+        }
+    }
+
+    pub fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{}", self.config))
+    }
+
+    /// Storage Account to use when accessing Azure Storage
+    #[getter]
+    pub fn storage_account(&self) -> PyResult<Option<String>> {
+        Ok(self.config.storage_account.clone())
+    }
+
+    /// Azure Secret Access Key
+    #[getter]
+    pub fn access_key(&self) -> PyResult<Option<String>> {
+        Ok(self.config.access_key.clone())
+    }
+}
+
+#[pymethods]
+impl GCSConfig {
+    #[allow(clippy::too_many_arguments)]
+    #[new]
+    pub fn new(project_id: Option<String>, anonymous: Option<bool>) -> Self {
+        let def = crate::GCSConfig::default();
+        GCSConfig {
+            config: crate::GCSConfig {
+                project_id: project_id.or(def.project_id),
+                anonymous: anonymous.unwrap_or(def.anonymous),
+            },
+        }
+    }
+
+    pub fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{}", self.config))
+    }
+
+    /// Project ID to use when accessing Google Cloud Storage
+    #[getter]
+    pub fn project_id(&self) -> PyResult<Option<String>> {
+        Ok(self.config.project_id.clone())
+    }
+}
+
+impl From<config::IOConfig> for IOConfig {
+    fn from(config: config::IOConfig) -> Self {
+        Self { config }
+    }
+}
+
+pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
+    parent.add_class::<AzureConfig>()?;
+    parent.add_class::<GCSConfig>()?;
+    parent.add_class::<S3Config>()?;
+    parent.add_class::<IOConfig>()?;
+    Ok(())
+}
diff --git a/src/daft-io/src/config.rs b/src/common/io-config/src/s3.rs
similarity index 56%
rename from src/daft-io/src/config.rs
rename to src/common/io-config/src/s3.rs
index 10a0f3d971..57ba13c567 100644
--- a/src/daft-io/src/config.rs
+++ b/src/common/io-config/src/s3.rs
@@ -3,6 +3,7 @@ use std::fmt::Formatter;
 
 use serde::Deserialize;
 use serde::Serialize;
+
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
 pub struct S3Config {
     pub region_name: Option<String>,
@@ -66,61 +67,3 @@ impl Display for S3Config {
         )
     }
 }
-
-#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
-pub struct AzureConfig {
-    pub storage_account: Option<String>,
-    pub access_key: Option<String>,
-    pub anonymous: bool,
-}
-
-impl Display for AzureConfig {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
-        write!(
-            f,
-            "AzureConfig
-    storage_account: {:?}
-    access_key: {:?}
-    anonymous: {:?}",
-            self.storage_account, self.access_key, self.anonymous
-        )
-    }
-}
-
-#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
-pub struct GCSConfig {
-    pub project_id: Option<String>,
-    pub anonymous: bool,
-}
-
-impl Display for GCSConfig {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
-        write!(
-            f,
-            "GCSConfig
-    project_id: {:?}
-    anonymous: {:?}",
-            self.project_id, self.anonymous
-        )
-    }
-}
-
-#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
-pub struct IOConfig {
-    pub s3: S3Config,
-    pub azure: AzureConfig,
-    pub gcs: GCSConfig,
-}
-
-impl Display for IOConfig {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
-        write!(
-            f,
-            "IOConfig:
-{}
-{}
-{}",
-            self.s3, self.azure, self.gcs
-        )
-    }
-}
diff --git a/src/daft-dsl/Cargo.toml b/src/daft-dsl/Cargo.toml
index 11ac44d324..2f20fee890 100644
--- a/src/daft-dsl/Cargo.toml
+++ b/src/daft-dsl/Cargo.toml
@@ -1,6 +1,7 @@
 [dependencies]
 bincode = {workspace = true}
 common-error = {path = "../common/error", default-features = false}
+common-io-config = {path = "../common/io-config", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
 daft-io = {path = "../daft-io", default-features = false}
 pyo3 = {workspace = true, optional = true}
@@ -10,7 +11,7 @@ serde_json = {workspace = true}
 
 [features]
 default = ["python"]
-python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python"]
+python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "common-io-config/python"]
 
 [package]
 edition = {workspace = true}
diff --git a/src/daft-dsl/src/functions/uri/mod.rs b/src/daft-dsl/src/functions/uri/mod.rs
index d7b022eb4c..c9940ac7a6 100644
--- a/src/daft-dsl/src/functions/uri/mod.rs
+++ b/src/daft-dsl/src/functions/uri/mod.rs
@@ -9,7 +9,7 @@ use crate::Expr;
 
 use super::FunctionEvaluator;
 
-use daft_io::config::IOConfig;
+use common_io_config::IOConfig;
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
 pub enum UriExpr {
diff --git a/src/daft-dsl/src/python.rs b/src/daft-dsl/src/python.rs
index 4599ff56c3..4ed62ff51e 100644
--- a/src/daft-dsl/src/python.rs
+++ b/src/daft-dsl/src/python.rs
@@ -9,7 +9,7 @@ use daft_core::{
     python::{datatype::PyDataType, field::PyField, schema::PySchema},
 };
 
-use daft_io::python::IOConfig;
+use common_io_config::python::IOConfig as PyIOConfig;
 use pyo3::{
     exceptions::PyValueError,
     prelude::*,
@@ -358,7 +358,7 @@ impl PyExpr {
         max_connections: i64,
         raise_error_on_failure: bool,
         multi_thread: bool,
-        config: Option<IOConfig>,
+        config: Option<PyIOConfig>,
     ) -> PyResult<Self> {
         if max_connections <= 0 {
             return Err(PyValueError::new_err(format!(
diff --git a/src/daft-io/Cargo.toml b/src/daft-io/Cargo.toml
index 6158afe73b..07fa06c551 100644
--- a/src/daft-io/Cargo.toml
+++ b/src/daft-io/Cargo.toml
@@ -11,6 +11,7 @@ azure_storage = {version = "0.13.0", features = ["enable_reqwest_rustls"], defau
 azure_storage_blobs = {version = "0.13.1", features = ["enable_reqwest_rustls"], default-features = false}
 bytes = {workspace = true}
 common-error = {path = "../common/error", default-features = false}
+common-io-config = {path = "../common/io-config", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
 futures = {workspace = true}
 google-cloud-storage = {version = "0.13.0", default-features = false, features = ["rustls-tls", "auth"]}
@@ -35,7 +36,7 @@ tempfile = "3.7.1"
 
 [features]
 default = ["python"]
-python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python"]
+python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "common-io-config/python"]
 
 [package]
 edition = {workspace = true}
diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs
index 5c24463e4f..d7ef6cd37c 100644
--- a/src/daft-io/src/azure_blob.rs
+++ b/src/daft-io/src/azure_blob.rs
@@ -6,10 +6,10 @@ use snafu::{IntoError, ResultExt, Snafu};
 use std::{num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc};
 
 use crate::{
-    config::AzureConfig,
     object_io::{LSResult, ObjectSource},
     GetResult,
 };
+use common_io_config::AzureConfig;
 
 #[derive(Debug, Snafu)]
 enum Error {
diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs
index d64d3f077e..9d91f32290 100644
--- a/src/daft-io/src/google_cloud.rs
+++ b/src/daft-io/src/google_cloud.rs
@@ -14,12 +14,11 @@ use snafu::IntoError; use
snafu::ResultExt; use snafu::Snafu; -use crate::config; -use crate::config::GCSConfig; use crate::object_io::LSResult; use crate::object_io::ObjectSource; use crate::s3_like; use crate::GetResult; +use common_io_config::GCSConfig; #[derive(Debug, Snafu)] enum Error { @@ -201,7 +200,7 @@ pub(crate) struct GCSSource { impl GCSSource { async fn build_s3_compat_client() -> super::Result> { - let s3_config = config::S3Config { + let s3_config = common_io_config::S3Config { anonymous: true, endpoint_url: Some("https://storage.googleapis.com".to_string()), ..Default::default() diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index 37fbbc0667..df39d27cef 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -1,7 +1,6 @@ #![feature(async_closure)] mod azure_blob; -pub mod config; mod google_cloud; mod http; mod local; @@ -13,7 +12,7 @@ use lazy_static::lazy_static; #[cfg(feature = "python")] pub mod python; -use config::IOConfig; +pub use common_io_config::{AzureConfig, IOConfig, S3Config}; pub use object_io::GetResult; #[cfg(feature = "python")] pub use python::register_modules; diff --git a/src/daft-io/src/python.rs b/src/daft-io/src/python.rs index ce689aa763..b1f3957650 100644 --- a/src/daft-io/src/python.rs +++ b/src/daft-io/src/python.rs @@ -1,313 +1,18 @@ -use crate::{config, get_io_client, get_runtime, object_io::LSResult, parse_url}; -use common_error::{DaftError, DaftResult}; +use crate::{get_io_client, get_runtime, object_io::LSResult, parse_url}; +use common_error::DaftResult; use pyo3::{ prelude::*, types::{PyDict, PyList}, }; -/// Create configurations to be used when accessing an S3-compatible system -/// -/// Args: -/// region_name: Name of the region to be used (used when accessing AWS S3), defaults to "us-east-1". -/// If wrongly provided, Daft will attempt to auto-detect the buckets' region at the cost of extra S3 requests. -/// endpoint_url: URL to the S3 endpoint, defaults to endpoints to AWS -/// key_id: AWS Access Key ID, defaults to auto-detection from the current environment -/// access_key: AWS Secret Access Key, defaults to auto-detection from the current environment -/// session_token: AWS Session Token, required only if `key_id` and `access_key` are temporary credentials -/// retry_initial_backoff_ms: Initial backoff duration in milliseconds for an S3 retry, defaults to 1000ms -/// connect_timeout_ms: Timeout duration to wait to make a connection to S3 in milliseconds, defaults to 60 seconds -/// read_timeout_ms: Timeout duration to wait to read the first byte from S3 in milliseconds, defaults to 60 seconds -/// num_tries: Number of attempts to make a connection, defaults to 5 -/// retry_mode: Retry Mode when a request fails, current supported values are `standard` and `adaptive` -/// anonymous: Whether or not to use "anonymous mode", which will access S3 without any credentials -/// -/// Example: -/// >>> io_config = IOConfig(s3=S3Config(key_id="xxx", access_key="xxx")) -/// >>> daft.read_parquet("s3://some-path", io_config=io_config) -#[derive(Clone, Default)] -#[pyclass] -pub struct S3Config { - pub config: config::S3Config, -} -/// Create configurations to be used when accessing Azure Blob Storage -/// -/// Args: -/// storage_account: Azure Storage Account, defaults to reading from `AZURE_STORAGE_ACCOUNT` environment variable. 
-/// access_key: Azure Secret Access Key, defaults to reading from `AZURE_STORAGE_KEY` environment variable -/// anonymous: Whether or not to use "anonymous mode", which will access Azure without any credentials -/// -/// Example: -/// >>> io_config = IOConfig(azure=AzureConfig(storage_account="dafttestdata", access_key="xxx")) -/// >>> daft.read_parquet("az://some-path", io_config=io_config) -#[derive(Clone, Default)] -#[pyclass] -pub struct AzureConfig { - pub config: config::AzureConfig, -} - -/// Create configurations to be used when accessing Google Cloud Storage -/// -/// Args: -/// project_id: Google Project ID, defaults to reading credentials file or Google Cloud metadata service -/// anonymous: Whether or not to use "anonymous mode", which will access Google Storage without any credentials -/// -/// Example: -/// >>> io_config = IOConfig(gcs=GCSConfig(anonymous=True)) -/// >>> daft.read_parquet("gs://some-path", io_config=io_config) -#[derive(Clone, Default)] -#[pyclass] -pub struct GCSConfig { - pub config: config::GCSConfig, -} - -/// Create configurations to be used when accessing storage -/// -/// Args: -/// s3: Configuration to use when accessing URLs with the `s3://` scheme -/// azure: Configuration to use when accessing URLs with the `az://` or `abfs://` scheme -/// gcs: Configuration to use when accessing URLs with the `gs://` or `gcs://` scheme -/// Example: -/// >>> io_config = IOConfig(s3=S3Config(key_id="xxx", access_key="xxx", num_tries=10), azure=AzureConfig(anonymous=True), gcs=GCSConfig(...)) -/// >>> daft.read_parquet(["s3://some-path", "az://some-other-path", "gs://path3"], io_config=io_config) -#[derive(Clone, Default)] -#[pyclass] -pub struct IOConfig { - pub config: config::IOConfig, -} - -#[pymethods] -impl IOConfig { - #[new] - pub fn new(s3: Option, azure: Option, gcs: Option) -> Self { - IOConfig { - config: config::IOConfig { - s3: s3.unwrap_or_default().config, - azure: azure.unwrap_or_default().config, - gcs: gcs.unwrap_or_default().config, - }, - } - } - - pub fn __repr__(&self) -> PyResult { - Ok(format!("{}", self.config)) - } - - /// Configuration to be used when accessing s3 URLs - #[getter] - pub fn s3(&self) -> PyResult { - Ok(S3Config { - config: self.config.s3.clone(), - }) - } - - /// Configuration to be used when accessing Azure URLs - #[getter] - pub fn azure(&self) -> PyResult { - Ok(AzureConfig { - config: self.config.azure.clone(), - }) - } - - /// Configuration to be used when accessing Azure URLs - #[getter] - pub fn gcs(&self) -> PyResult { - Ok(GCSConfig { - config: self.config.gcs.clone(), - }) - } - - #[staticmethod] - pub fn from_json(input: &str) -> PyResult { - let config: config::IOConfig = serde_json::from_str(input).map_err(DaftError::from)?; - Ok(config.into()) - } - - pub fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (String,))> { - let io_config_module = py.import("daft.io.config")?; - let json_string = serde_json::to_string(&self.config).map_err(DaftError::from)?; - Ok(( - io_config_module - .getattr("_io_config_from_json")? 
- .to_object(py), - (json_string,), - )) - } -} - -#[pymethods] -impl S3Config { - #[allow(clippy::too_many_arguments)] - #[new] - pub fn new( - region_name: Option, - endpoint_url: Option, - key_id: Option, - session_token: Option, - access_key: Option, - retry_initial_backoff_ms: Option, - connect_timeout_ms: Option, - read_timeout_ms: Option, - num_tries: Option, - retry_mode: Option, - anonymous: Option, - ) -> Self { - let def = config::S3Config::default(); - S3Config { - config: config::S3Config { - region_name: region_name.or(def.region_name), - endpoint_url: endpoint_url.or(def.endpoint_url), - key_id: key_id.or(def.key_id), - session_token: session_token.or(def.session_token), - access_key: access_key.or(def.access_key), - retry_initial_backoff_ms: retry_initial_backoff_ms - .unwrap_or(def.retry_initial_backoff_ms), - connect_timeout_ms: connect_timeout_ms.unwrap_or(def.connect_timeout_ms), - read_timeout_ms: read_timeout_ms.unwrap_or(def.read_timeout_ms), - num_tries: num_tries.unwrap_or(def.num_tries), - retry_mode: retry_mode.or(def.retry_mode), - anonymous: anonymous.unwrap_or(def.anonymous), - }, - } - } - - pub fn __repr__(&self) -> PyResult { - Ok(format!("{}", self.config)) - } - - /// Region to use when accessing AWS S3 - #[getter] - pub fn region_name(&self) -> PyResult> { - Ok(self.config.region_name.clone()) - } - - /// S3-compatible endpoint to use - #[getter] - pub fn endpoint_url(&self) -> PyResult> { - Ok(self.config.endpoint_url.clone()) - } - - /// AWS Access Key ID - #[getter] - pub fn key_id(&self) -> PyResult> { - Ok(self.config.key_id.clone()) - } - - /// AWS Session Token - #[getter] - pub fn session_token(&self) -> PyResult> { - Ok(self.config.session_token.clone()) - } - - /// AWS Secret Access Key - #[getter] - pub fn access_key(&self) -> PyResult> { - Ok(self.config.access_key.clone()) - } - - /// AWS Retry Initial Backoff Time in Milliseconds - #[getter] - pub fn retry_initial_backoff_ms(&self) -> PyResult { - Ok(self.config.retry_initial_backoff_ms) - } - - /// AWS Connection Timeout in Milliseconds - #[getter] - pub fn connect_timeout_ms(&self) -> PyResult { - Ok(self.config.connect_timeout_ms) - } - - /// AWS Read Timeout in Milliseconds - #[getter] - pub fn read_timeout_ms(&self) -> PyResult { - Ok(self.config.read_timeout_ms) - } - - /// AWS Number Retries - #[getter] - pub fn num_tries(&self) -> PyResult { - Ok(self.config.num_tries) - } - - /// AWS Retry Mode - #[getter] - pub fn retry_mode(&self) -> PyResult> { - Ok(self.config.retry_mode.clone()) - } -} - -#[pymethods] -impl AzureConfig { - #[allow(clippy::too_many_arguments)] - #[new] - pub fn new( - storage_account: Option, - access_key: Option, - anonymous: Option, - ) -> Self { - let def = config::AzureConfig::default(); - AzureConfig { - config: config::AzureConfig { - storage_account: storage_account.or(def.storage_account), - access_key: access_key.or(def.access_key), - anonymous: anonymous.unwrap_or(def.anonymous), - }, - } - } - - pub fn __repr__(&self) -> PyResult { - Ok(format!("{}", self.config)) - } - - /// Storage Account to use when accessing Azure Storage - #[getter] - pub fn storage_account(&self) -> PyResult> { - Ok(self.config.storage_account.clone()) - } - - /// Azure Secret Access Key - #[getter] - pub fn access_key(&self) -> PyResult> { - Ok(self.config.access_key.clone()) - } -} - -#[pymethods] -impl GCSConfig { - #[allow(clippy::too_many_arguments)] - #[new] - pub fn new(project_id: Option, anonymous: Option) -> Self { - let def = config::GCSConfig::default(); - 
GCSConfig { - config: config::GCSConfig { - project_id: project_id.or(def.project_id), - anonymous: anonymous.unwrap_or(def.anonymous), - }, - } - } - - pub fn __repr__(&self) -> PyResult { - Ok(format!("{}", self.config)) - } - - /// Project ID to use when accessing Google Cloud Storage - #[getter] - pub fn project_id(&self) -> PyResult> { - Ok(self.config.project_id.clone()) - } -} - -impl From for IOConfig { - fn from(config: config::IOConfig) -> Self { - Self { config } - } -} +pub use common_io_config::python::{AzureConfig, GCSConfig, IOConfig}; #[pyfunction] fn io_list( py: Python, path: String, multithreaded_io: Option, - io_config: Option, + io_config: Option, ) -> PyResult<&PyList> { let lsr: DaftResult = py.allow_threads(|| { let io_client = get_io_client( @@ -335,11 +40,8 @@ fn io_list( Ok(PyList::new(py, to_rtn)) } -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { - parent.add_class::()?; - parent.add_class::()?; - parent.add_class::()?; - parent.add_class::()?; +pub fn register_modules(py: Python, parent: &PyModule) -> PyResult<()> { + common_io_config::python::register_modules(py, parent)?; parent.add_function(wrap_pyfunction!(io_list, parent)?)?; Ok(()) } diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs index 1d9b6a10a2..91c9d6d82e 100644 --- a/src/daft-io/src/s3_like.rs +++ b/src/daft-io/src/s3_like.rs @@ -6,13 +6,13 @@ use reqwest::StatusCode; use s3::operation::head_object::HeadObjectError; use s3::operation::list_objects_v2::ListObjectsV2Error; -use crate::config::S3Config; use crate::object_io::{FileMetadata, FileType, LSResult}; use crate::{InvalidArgumentSnafu, SourceType}; use aws_config::SdkConfig; use aws_credential_types::cache::ProvideCachedCredentials; use aws_credential_types::provider::error::CredentialsError; use aws_sig_auth::signer::SigningRequirements; +use common_io_config::S3Config; use futures::{StreamExt, TryStreamExt}; use s3::client::customize::Response; use s3::config::{Credentials, Region}; @@ -664,8 +664,9 @@ impl ObjectSource for S3LikeSource { mod tests { use crate::object_io::ObjectSource; + use crate::Result; use crate::S3LikeSource; - use crate::{config::S3Config, Result}; + use common_io_config::S3Config; #[tokio::test] async fn test_full_get_from_s3() -> Result<()> { diff --git a/src/daft-parquet/src/metadata.rs b/src/daft-parquet/src/metadata.rs index d135d5501c..974c678fba 100644 --- a/src/daft-parquet/src/metadata.rs +++ b/src/daft-parquet/src/metadata.rs @@ -90,7 +90,7 @@ mod tests { use std::sync::Arc; use common_error::DaftResult; - use daft_io::{config::IOConfig, IOClient}; + use daft_io::{IOClient, IOConfig}; use super::read_parquet_metadata; diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index c352ba4038..997b5bef5d 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -308,7 +308,8 @@ mod tests { use std::sync::Arc; use common_error::DaftResult; - use daft_io::{config::IOConfig, IOClient}; + + use daft_io::{IOClient, IOConfig}; use super::read_parquet; #[test] diff --git a/src/daft-plan/Cargo.toml b/src/daft-plan/Cargo.toml index d1c91358bf..1b8a39b350 100644 --- a/src/daft-plan/Cargo.toml +++ b/src/daft-plan/Cargo.toml @@ -2,9 +2,9 @@ arrow2 = {workspace = true, features = ["chrono-tz", "compute_take", "compute_cast", "compute_aggregate", "compute_if_then_else", "compute_sort", "compute_filter", "compute_temporal", "compute_comparison", "compute_arithmetics", "compute_concatenate", "io_ipc"]} bincode = {workspace = true} common-error 
= {path = "../common/error", default-features = false} +common-io-config = {path = "../common/io-config", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} -daft-io = {path = "../daft-io", default-features = false} daft-table = {path = "../daft-table", default-features = false} indexmap = {workspace = true} pyo3 = {workspace = true, optional = true} @@ -14,7 +14,7 @@ snafu = {workspace = true} [features] default = ["python"] -python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-io/python", "daft-table/python"] +python = ["dep:pyo3", "common-error/python", "common-io-config/python", "daft-core/python", "daft-dsl/python", "daft-table/python"] [package] edition = {workspace = true} diff --git a/src/daft-plan/src/source_info.rs b/src/daft-plan/src/source_info.rs index 297a91aa12..4e75150389 100644 --- a/src/daft-plan/src/source_info.rs +++ b/src/daft-plan/src/source_info.rs @@ -5,17 +5,17 @@ use std::{ use arrow2::array::Array; use common_error::DaftResult; +use common_io_config::IOConfig; use daft_core::{ impl_bincode_py_state_serialization, schema::{Schema, SchemaRef}, Series, }; -use daft_io::config::IOConfig; use daft_table::Table; #[cfg(feature = "python")] use { - daft_io::python::IOConfig as PyIOConfig, + common_io_config::python::IOConfig as PyIOConfig, daft_table::python::PyTable, pyo3::{ exceptions::{PyKeyError, PyValueError},