diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs
index 7b4000dee8..f5f572af5c 100644
--- a/src/daft-csv/src/metadata.rs
+++ b/src/daft-csv/src/metadata.rs
@@ -4,7 +4,7 @@ use arrow2::io::csv::read_async::{infer, infer_schema, AsyncReaderBuilder};
 use async_compat::CompatExt;
 use common_error::DaftResult;
 use daft_core::schema::Schema;
-use daft_io::{get_runtime, GetResult, IOClient};
+use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef};
 use futures::{io::Cursor, AsyncRead, AsyncSeek};
 use tokio::fs::File;
@@ -13,11 +13,13 @@ pub fn read_csv_schema(
     has_header: bool,
     delimiter: Option<u8>,
     io_client: Arc<IOClient>,
+    io_stats: Option<IOStatsRef>,
 ) -> DaftResult<Schema> {
     let runtime_handle = get_runtime(true)?;
     let _rt_guard = runtime_handle.enter();
-    runtime_handle
-        .block_on(async { read_csv_schema_single(uri, has_header, delimiter, io_client).await })
+    runtime_handle.block_on(async {
+        read_csv_schema_single(uri, has_header, delimiter, io_client, io_stats).await
+    })
 }
 
 async fn read_csv_schema_single(
@@ -25,8 +27,12 @@ async fn read_csv_schema_single(
     has_header: bool,
     delimiter: Option<u8>,
     io_client: Arc<IOClient>,
+    io_stats: Option<IOStatsRef>,
 ) -> DaftResult<Schema> {
-    match io_client.single_url_get(uri.to_string(), None).await? {
+    match io_client
+        .single_url_get(uri.to_string(), None, io_stats)
+        .await?
+    {
         GetResult::File(file) => {
             read_csv_schema_from_reader(
                 File::open(file.path).await?.compat(),
@@ -77,7 +83,7 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);
-        let schema = read_csv_schema(file, true, None, io_client.clone())?;
+        let schema = read_csv_schema(file, true, None, io_client.clone(), None)?;
         assert_eq!(
             schema,
             Schema::new(vec![
diff --git a/src/daft-csv/src/python.rs b/src/daft-csv/src/python.rs
index a38d2866b0..2bd3bd4307 100644
--- a/src/daft-csv/src/python.rs
+++ b/src/daft-csv/src/python.rs
@@ -4,7 +4,7 @@ pub mod pylib {
     use std::sync::Arc;
 
     use daft_core::python::schema::PySchema;
-    use daft_io::{get_io_client, python::IOConfig};
+    use daft_io::{get_io_client, python::IOConfig, IOStatsContext};
     use daft_table::python::PyTable;
     use pyo3::{exceptions::PyValueError, pyfunction, PyResult, Python};
@@ -34,6 +34,8 @@ pub mod pylib {
         multithreaded_io: Option<bool>,
     ) -> PyResult<PyTable> {
         py.allow_threads(|| {
+            let io_stats = IOStatsContext::new(format!("read_csv: for uri {uri}"));
+
             let io_client = get_io_client(
                 multithreaded_io.unwrap_or(true),
                 io_config.unwrap_or_default().config.into(),
@@ -46,6 +48,7 @@ pub mod pylib {
                 has_header.unwrap_or(true),
                 str_delimiter_to_byte(delimiter)?,
                 io_client,
+                Some(io_stats),
                 multithreaded_io.unwrap_or(true),
             )?
             .into())
@@ -71,6 +74,7 @@ pub mod pylib {
                 has_header.unwrap_or(true),
                 str_delimiter_to_byte(delimiter)?,
                 io_client,
+                None, // TODO: plumb an IOStatsContext through here as well
             )?)
.into()) }) diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 5bf766656a..0896697434 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -13,7 +13,7 @@ use arrow2::{ use async_compat::CompatExt; use common_error::DaftResult; use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; -use daft_io::{get_runtime, GetResult, IOClient}; +use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef}; use daft_table::Table; use futures::{io::Cursor, AsyncRead, AsyncSeek}; use tokio::fs::File; @@ -27,6 +27,7 @@ pub fn read_csv( has_header: bool, delimiter: Option, io_client: Arc, + io_stats: Option, multithreaded_io: bool, ) -> DaftResult { let runtime_handle = get_runtime(multithreaded_io)?; @@ -40,6 +41,7 @@ pub fn read_csv( has_header, delimiter, io_client, + io_stats, ) .await }) @@ -53,8 +55,12 @@ async fn read_csv_single( has_header: bool, delimiter: Option, io_client: Arc, + io_stats: Option, ) -> DaftResult
{ - match io_client.single_url_get(uri.to_string(), None).await? { + match io_client + .single_url_get(uri.to_string(), None, io_stats) + .await? + { GetResult::File(file) => { read_csv_single_from_reader( File::open(file.path).await?.compat(), @@ -208,7 +214,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv(file, None, None, None, true, None, io_client, true)?; + let table = read_csv(file, None, None, None, true, None, io_client, None, true)?; assert_eq!(table.len(), 100); assert_eq!( table.schema, @@ -231,7 +237,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv(file, None, None, None, true, None, io_client, true)?; + let table = read_csv(file, None, None, None, true, None, io_client, None, true)?; assert_eq!(table.len(), 5000); Ok(()) @@ -246,7 +252,17 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv(file, None, None, Some(10), true, None, io_client, true)?; + let table = read_csv( + file, + None, + None, + Some(10), + true, + None, + io_client, + None, + true, + )?; assert_eq!(table.len(), 10); assert_eq!( table.schema, @@ -277,6 +293,7 @@ mod tests { true, None, io_client, + None, true, )?; assert_eq!(table.len(), 100); diff --git a/src/daft-dsl/src/functions/uri/download.rs b/src/daft-dsl/src/functions/uri/download.rs index da27349c36..b9d5c3661d 100644 --- a/src/daft-dsl/src/functions/uri/download.rs +++ b/src/daft-dsl/src/functions/uri/download.rs @@ -64,6 +64,7 @@ impl FunctionEvaluator for DownloadEvaluator { *raise_error_on_failure, *multi_thread, config.clone(), + None, ), _ => Err(DaftError::ValueError(format!( "Expected 1 input arg, got {}", diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index c3006e02fb..8d64671c4c 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -156,7 +156,7 @@ impl AzureBlobSource { // Flatmap each page of results to a single stream of our standardized FileMetadata. 
responses_stream .map(move |response| { - io_stats.clone().map(|is| is.load_list_requests(1)); + io_stats.clone().map(|is| is.mark_list_requests(1)); (response, protocol.clone()) }) .flat_map(move |(response, protocol)| match response { @@ -430,10 +430,9 @@ impl ObjectSource for AzureBlobSource { .into_error(e) .into() }); - tokio::pin!(stream); - io_stats.map(|is| is.mark_get_requests(1)); + io_stats.as_ref().map(|is| is.mark_get_requests(1)); Ok(GetResult::Stream( - io_stats_on_bytestream(stream, io_stats), + io_stats_on_bytestream(Box::pin(stream), io_stats), None, None, )) @@ -456,7 +455,7 @@ impl ObjectSource for AzureBlobSource { .get_properties() .await .context(UnableToOpenFileSnafu:: { path: uri.into() })?; - io_stats.map(|is| is.mark_head_requests(1)); + io_stats.as_ref().map(|is| is.mark_head_requests(1)); Ok(metadata.blob.properties.content_length as usize) } diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index 40210cf8dc..c7acf5077a 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -237,7 +237,7 @@ impl GCSClientWrapper { .context(UnableToListObjectsSnafu { path: format!("{GCS_SCHEME}://{}/{}", bucket, key), })?; - io_stats.map(|is| is.mark_list_requests(1)); + io_stats.as_ref().map(|is| is.mark_list_requests(1)); let response_items = ls_response.items.unwrap_or_default(); let response_prefixes = ls_response.prefixes.unwrap_or_default(); diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index 7982220105..70a1c24e28 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -195,7 +195,7 @@ impl ObjectSource for HttpSource { let response = response .error_for_status() .context(UnableToOpenFileSnafu:: { path: uri.into() })?; - io_stats.map(|is| is.mark_get_requests(1)); + io_stats.as_ref().map(|is| is.mark_get_requests(1)); let size_bytes = response.content_length().map(|s| s as usize); let stream = response.bytes_stream(); let owned_string = uri.to_owned(); @@ -223,7 +223,7 @@ impl ObjectSource for HttpSource { .error_for_status() .context(UnableToOpenFileSnafu:: { path: uri.into() })?; - io_stats.map(|is| is.mark_head_requests(1)); + io_stats.as_ref().map(|is| is.mark_head_requests(1)); let headers = response.headers(); match headers.get(CONTENT_LENGTH) { @@ -275,7 +275,7 @@ impl ObjectSource for HttpSource { .context(UnableToConnectSnafu:: { path: path.into() })? 
.error_for_status() .with_context(|_| UnableToOpenFileSnafu { path })?; - io_stats.map(|is| is.mark_list_requests(1)); + io_stats.as_ref().map(|is| is.mark_list_requests(1)); // Reconstruct the actual path of the request, which may have been redirected via a 301 // This is important because downstream URL joining logic relies on proper trailing-slashes/index.html diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index ee7992844b..013eb9050c 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -20,6 +20,7 @@ pub use common_io_config::{AzureConfig, IOConfig, S3Config}; pub use object_io::GetResult; #[cfg(feature = "python")] pub use python::register_modules; +pub use stats::{IOStatsContext, IOStatsRef}; use tokio::runtime::RuntimeFlavor; use std::{borrow::Cow, collections::HashMap, hash::Hash, ops::Range, sync::Arc}; @@ -168,16 +169,21 @@ impl IOClient { &self, input: String, range: Option>, + io_stats: Option, ) -> Result { let (scheme, path) = parse_url(&input)?; let source = self.get_source(&scheme).await?; - source.get(path.as_ref(), range).await + source.get(path.as_ref(), range, io_stats).await } - pub async fn single_url_get_size(&self, input: String) -> Result { + pub async fn single_url_get_size( + &self, + input: String, + io_stats: Option, + ) -> Result { let (scheme, path) = parse_url(&input)?; let source = self.get_source(&scheme).await?; - source.get_size(path.as_ref()).await + source.get_size(path.as_ref(), io_stats).await } async fn single_url_download( @@ -185,9 +191,10 @@ impl IOClient { index: usize, input: Option, raise_error_on_failure: bool, + io_stats: Option, ) -> Result> { let value = if let Some(input) = input { - let response = self.single_url_get(input, None).await; + let response = self.single_url_get(input, None, io_stats).await; let res = match response { Ok(res) => res.bytes().await, Err(err) => Err(err), @@ -357,6 +364,7 @@ pub fn _url_download( raise_error_on_failure: bool, multi_thread: bool, config: Arc, + io_stats: Option, ) -> DaftResult { let urls = array.as_arrow().iter(); let name = array.name(); @@ -378,11 +386,12 @@ pub fn _url_download( let fetches = futures::stream::iter(urls.enumerate().map(|(i, url)| { let owned_url = url.map(|s| s.to_string()); let owned_client = io_client.clone(); + let owned_io_stats = io_stats.clone(); tokio::spawn(async move { ( i, owned_client - .single_url_download(i, owned_url, raise_error_on_failure) + .single_url_download(i, owned_url, raise_error_on_failure, owned_io_stats) .await, ) }) @@ -434,6 +443,7 @@ pub fn url_download( raise_error_on_failure: bool, multi_thread: bool, config: Arc, + io_stats: Option, ) -> DaftResult { match series.data_type() { DataType::Utf8 => Ok(_url_download( @@ -442,6 +452,7 @@ pub fn url_download( raise_error_on_failure, multi_thread, config, + io_stats, )? .into_series()), dt => Err(DaftError::TypeError(format!( diff --git a/src/daft-io/src/object_io.rs b/src/daft-io/src/object_io.rs index 7f0a6e6048..c0429c687d 100644 --- a/src/daft-io/src/object_io.rs +++ b/src/daft-io/src/object_io.rs @@ -137,14 +137,14 @@ pub(crate) trait ObjectSource: Sync + Send { ) -> super::Result>> { let uri = uri.to_string(); let s = stream! 
{ - let lsr = self.ls(&uri, posix, None, page_size, io_stats).await?; + let lsr = self.ls(&uri, posix, None, page_size, io_stats.clone()).await?; for fm in lsr.files { yield Ok(fm); } let mut continuation_token = lsr.continuation_token.clone(); while continuation_token.is_some() { - let lsr = self.ls(&uri, posix, continuation_token.as_deref(), page_size, io_stats).await?; + let lsr = self.ls(&uri, posix, continuation_token.as_deref(), page_size, io_stats.clone()).await?; continuation_token = lsr.continuation_token.clone(); for fm in lsr.files { yield Ok(fm); diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs index 5c19315ba2..d7de111795 100644 --- a/src/daft-io/src/object_store_glob.rs +++ b/src/daft-io/src/object_store_glob.rs @@ -393,7 +393,7 @@ pub(crate) async fn glob( .fanout_limit .map(|fanout_limit| fanout_limit / state.current_fanout), state.page_size, - io_stats, + io_stats.clone(), ) .await; @@ -412,7 +412,7 @@ pub(crate) async fn glob( state.current_fragment_idx, stream_dir_count, ), - io_stats, + io_stats.clone(), ); } // Return any Files that match @@ -506,7 +506,7 @@ pub(crate) async fn glob( .fanout_limit .map(|fanout_limit| fanout_limit / state.current_fanout), state.page_size, - io_stats, + io_stats.clone(), ) .await; @@ -529,7 +529,7 @@ pub(crate) async fn glob( stream_dir_count, ) .with_wildcard_mode(), - io_stats, + io_stats.clone(), ); } FileType::File diff --git a/src/daft-io/src/python.rs b/src/daft-io/src/python.rs index 601e4a61ca..1dba8af023 100644 --- a/src/daft-io/src/python.rs +++ b/src/daft-io/src/python.rs @@ -20,7 +20,9 @@ mod py { page_size: Option, ) -> PyResult<&PyList> { let multithreaded_io = multithreaded_io.unwrap_or(true); - let io_stats = IOStatsContext::new(); + let io_stats = IOStatsContext::new(format!("io_glob for {path}")); + let io_stats_handle = io_stats.clone(); + let lsr: DaftResult> = py.allow_threads(|| { let io_client = get_io_client( multithreaded_io, @@ -33,7 +35,12 @@ mod py { runtime_handle.block_on(async move { let source = io_client.get_source(&scheme).await?; let files = source - .glob(path.as_ref(), fanout_limit, page_size, Some(io_stats)) + .glob( + path.as_ref(), + fanout_limit, + page_size, + Some(io_stats_handle), + ) .await? 
.try_collect() .await?; @@ -49,8 +56,6 @@ mod py { dict.set_item("size", file.size)?; to_rtn.push(dict); } - - log::warn!("{:?}", io_stats); Ok(PyList::new(py, to_rtn)) } diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs index a515b3c798..bafd4ebd6d 100644 --- a/src/daft-io/src/s3_like.rs +++ b/src/daft-io/src/s3_like.rs @@ -737,9 +737,18 @@ impl ObjectSource for S3LikeSource { let get_result = self ._get_impl(permit, uri, range, &self.default_region) .await?; - if let GetResult::Stream(stream, num_bytes , permit) = get_result && io_stats.is_some() { - io_stats.map(|is| is.mark_get_requests(1)); - Ok(GetResult::Stream(io_stats_on_bytestream(stream, io_stats), num_bytes, permit)) + + if io_stats.is_some() { + if let GetResult::Stream(stream, num_bytes, permit) = get_result { + io_stats.as_ref().map(|is| is.mark_get_requests(1)); + Ok(GetResult::Stream( + io_stats_on_bytestream(stream, io_stats), + num_bytes, + permit, + )) + } else { + panic!("This should always be a stream"); + } } else { Ok(get_result) } @@ -752,7 +761,7 @@ impl ObjectSource for S3LikeSource { .await .context(UnableToGrabSemaphoreSnafu)?; let head_result = self._head_impl(permit, uri, &self.default_region).await?; - io_stats.map(|is| is.mark_head_requests(1)); + io_stats.as_ref().map(|is| is.mark_head_requests(1)); Ok(head_result) } @@ -825,7 +834,7 @@ impl ObjectSource for S3LikeSource { ) .await? }; - io_stats.map(|is| is.mark_list_requests(1)); + io_stats.as_ref().map(|is| is.mark_list_requests(1)); if lsr.files.is_empty() && key.contains(S3_DELIMITER) { let permit = self @@ -847,7 +856,7 @@ impl ObjectSource for S3LikeSource { page_size, ) .await?; - io_stats.map(|is| is.mark_list_requests(1)); + io_stats.as_ref().map(|is| is.mark_list_requests(1)); let target_path = format!("{scheme}://{bucket}/{key}"); lsr.files.retain(|f| f.filepath == target_path); @@ -880,7 +889,7 @@ impl ObjectSource for S3LikeSource { ) .await? 
}; - io_stats.map(|is| is.mark_list_requests(1)); + io_stats.as_ref().map(|is| is.mark_list_requests(1)); Ok(lsr) } diff --git a/src/daft-io/src/stats.rs b/src/daft-io/src/stats.rs index 7c614c707c..6b467e0368 100644 --- a/src/daft-io/src/stats.rs +++ b/src/daft-io/src/stats.rs @@ -1,15 +1,36 @@ -use std::sync::{atomic, Arc}; +use std::sync::{ + atomic::{self}, + Arc, +}; pub type IOStatsRef = Arc; #[derive(Default, Debug)] pub struct IOStatsContext { + name: String, num_get_requests: atomic::AtomicUsize, num_head_requests: atomic::AtomicUsize, num_list_requests: atomic::AtomicUsize, bytes_read: atomic::AtomicUsize, } +impl Drop for IOStatsContext { + fn drop(&mut self) { + let bytes_read = self.load_bytes_read(); + let num_gets = self.load_get_requests(); + let mean_size = (bytes_read as f64) / (num_gets as f64); + log::warn!( + "IOStatsContext: {}, Gets: {}, Heads: {}, Lists: {}, BytesReads: {}, AvgGetSize: {}", + self.name, + num_gets, + self.load_head_requests(), + self.load_list_requests(), + bytes_read, + mean_size as i64 + ); + } +} + pub(crate) struct IOStatsByteStreamContextHandle { // do not enable Copy or Clone on this struct bytes_read: usize, @@ -17,8 +38,14 @@ pub(crate) struct IOStatsByteStreamContextHandle { } impl IOStatsContext { - pub fn new() -> IOStatsRef { - Arc::new(Default::default()) + pub fn new(name: String) -> IOStatsRef { + Arc::new(IOStatsContext { + name, + num_get_requests: atomic::AtomicUsize::new(0), + num_head_requests: atomic::AtomicUsize::new(0), + num_list_requests: atomic::AtomicUsize::new(0), + bytes_read: atomic::AtomicUsize::new(0), + }) } #[inline] @@ -40,18 +67,18 @@ impl IOStatsContext { } #[inline] - pub fn load_get_requests(&self, num_requests: usize) { - self.num_get_requests.load(atomic::Ordering::Acquire); + pub fn load_get_requests(&self) -> usize { + self.num_get_requests.load(atomic::Ordering::Acquire) } #[inline] - pub fn load_head_requests(&self, num_requests: usize) { - self.num_head_requests.load(atomic::Ordering::Acquire); + pub fn load_head_requests(&self) -> usize { + self.num_head_requests.load(atomic::Ordering::Acquire) } #[inline] - pub fn load_list_requests(&self, num_requests: usize) { - self.num_list_requests.load(atomic::Ordering::Acquire); + pub fn load_list_requests(&self) -> usize { + self.num_list_requests.load(atomic::Ordering::Acquire) } #[inline] @@ -83,7 +110,5 @@ impl IOStatsByteStreamContextHandle { impl Drop for IOStatsByteStreamContextHandle { fn drop(&mut self) { self.inner.mark_bytes_read(self.bytes_read); - - log::warn!("Finished IOStats: {}", self.inner.load_bytes_read()); } } diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index b30e8acf93..22d0c3984b 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, sync::Arc}; use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; -use daft_io::IOClient; +use daft_io::{IOClient, IOStatsRef}; use daft_table::Table; use futures::{future::try_join_all, StreamExt}; use parquet2::{ @@ -142,10 +142,16 @@ pub(crate) fn build_row_ranges( } impl ParquetReaderBuilder { - pub async fn from_uri(uri: &str, io_client: Arc) -> super::Result { + pub async fn from_uri( + uri: &str, + io_client: Arc, + io_stats: Option, + ) -> super::Result { // TODO(sammy): We actually don't need this since we can do negative offsets when reading the metadata - let size = 
io_client.single_url_get_size(uri.into()).await?; - let metadata = read_parquet_metadata(uri, size, io_client).await?; + let size = io_client + .single_url_get_size(uri.into(), io_stats.clone()) + .await?; + let metadata = read_parquet_metadata(uri, size, io_client, io_stats).await?; let num_rows = metadata.num_rows; Ok(ParquetReaderBuilder { uri: uri.into(), @@ -301,7 +307,11 @@ impl ParquetFileReader { Ok(read_planner) } - pub fn prebuffer_ranges(&self, io_client: Arc) -> DaftResult> { + pub fn prebuffer_ranges( + &self, + io_client: Arc, + io_stats: Option, + ) -> DaftResult> { let mut read_planner = self.naive_read_plan()?; // TODO(sammy) these values should be populated by io_client read_planner.add_pass(Box::new(SplitLargeRequestPass { @@ -315,7 +325,7 @@ impl ParquetFileReader { })); read_planner.run_passes()?; - read_planner.collect(io_client) + read_planner.collect(io_client, io_stats) } pub async fn read_from_ranges_into_table( diff --git a/src/daft-parquet/src/metadata.rs b/src/daft-parquet/src/metadata.rs index 974c678fba..92969294da 100644 --- a/src/daft-parquet/src/metadata.rs +++ b/src/daft-parquet/src/metadata.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use daft_io::IOClient; +use daft_io::{IOClient, IOStatsRef}; use parquet2::{metadata::FileMetaData, read::deserialize_metadata}; use snafu::ResultExt; @@ -15,6 +15,7 @@ pub async fn read_parquet_metadata( uri: &str, size: usize, io_client: Arc, + io_stats: Option, ) -> super::Result { const FOOTER_SIZE: usize = 8; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; @@ -25,7 +26,7 @@ pub async fn read_parquet_metadata( let start = size.saturating_sub(default_end_len); let mut data = io_client - .single_url_get(uri.into(), Some(start..size)) + .single_url_get(uri.into(), Some(start..size), io_stats.clone()) .await? .bytes() .await?; @@ -57,7 +58,7 @@ pub async fn read_parquet_metadata( let start = size.saturating_sub(footer_len); data = io_client - .single_url_get(uri.into(), Some(start..size)) + .single_url_get(uri.into(), Some(start..size), io_stats) .await? 
.bytes() .await?; @@ -103,7 +104,7 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let metadata = read_parquet_metadata(file, size, io_client.clone()).await?; + let metadata = read_parquet_metadata(file, size, io_client.clone(), None).await?; assert_eq!(metadata.num_rows, 100); Ok(()) diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 51244c8b24..13988d2620 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -5,7 +5,7 @@ pub mod pylib { ffi::field_to_py, python::{datatype::PyTimeUnit, schema::PySchema, PySeries}, }; - use daft_io::{get_io_client, python::IOConfig}; + use daft_io::{get_io_client, python::IOConfig, IOStatsContext}; use daft_table::python::PyTable; use pyo3::{pyfunction, types::PyModule, PyResult, Python}; use std::{collections::BTreeMap, sync::Arc}; @@ -26,6 +26,8 @@ pub mod pylib { coerce_int96_timestamp_unit: Option, ) -> PyResult { py.allow_threads(|| { + let io_stats = IOStatsContext::new(format!("read_parquet: for uri {uri}")); + let io_client = get_io_client( multithreaded_io.unwrap_or(true), io_config.unwrap_or_default().config.into(), @@ -33,17 +35,19 @@ pub mod pylib { let schema_infer_options = ParquetSchemaInferenceOptions::new( coerce_int96_timestamp_unit.map(|tu| tu.timeunit), ); - Ok(crate::read::read_parquet( + let result = crate::read::read_parquet( uri, columns.as_deref(), start_offset, num_rows, row_groups, io_client, + Some(io_stats.clone()), multithreaded_io.unwrap_or(true), schema_infer_options, )? - .into()) + .into(); + Ok(result) }) } type PyArrowChunks = Vec>; @@ -100,6 +104,7 @@ pub mod pylib { num_rows, row_groups, io_client, + None, multithreaded_io.unwrap_or(true), schema_infer_options, ) @@ -123,6 +128,8 @@ pub mod pylib { coerce_int96_timestamp_unit: Option, ) -> PyResult> { py.allow_threads(|| { + let io_stats = IOStatsContext::new(format!("read_parquet_bulk")); + let io_client = get_io_client( multithreaded_io.unwrap_or(true), io_config.unwrap_or_default().config.into(), @@ -138,6 +145,7 @@ pub mod pylib { num_rows, row_groups, io_client, + Some(io_stats), num_parallel_tasks.unwrap_or(128) as usize, multithreaded_io.unwrap_or(true), &schema_infer_options, @@ -178,6 +186,7 @@ pub mod pylib { num_rows, row_groups, io_client, + None, num_parallel_tasks.unwrap_or(128) as usize, multithreaded_io.unwrap_or(true), schema_infer_options, @@ -211,6 +220,7 @@ pub mod pylib { Ok(Arc::new(crate::read::read_parquet_schema( uri, io_client, + None, schema_infer_options, )?) 
.into()) @@ -229,7 +239,7 @@ pub mod pylib { multithreaded_io.unwrap_or(true), io_config.unwrap_or_default().config.into(), )?; - Ok(crate::read::read_parquet_statistics(&uris.series, io_client)?.into()) + Ok(crate::read::read_parquet_statistics(&uris.series, io_client, None)?.into()) }) } } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 1815a99a54..c9b22b190c 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -7,7 +7,7 @@ use daft_core::{ schema::Schema, DataType, IntoSeries, Series, }; -use daft_io::{get_runtime, parse_url, IOClient, SourceType}; +use daft_io::{get_runtime, parse_url, IOClient, IOStatsRef, SourceType}; use daft_table::Table; use futures::{future::join_all, StreamExt, TryStreamExt}; use itertools::Itertools; @@ -56,6 +56,7 @@ async fn read_parquet_single( num_rows: Option, row_groups: Option>, io_client: Arc, + io_stats: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult
{ let (source_type, fixed_uri) = parse_url(uri)?; @@ -70,7 +71,8 @@ async fn read_parquet_single( ) .await } else { - let builder = ParquetReaderBuilder::from_uri(uri, io_client.clone()).await?; + let builder = + ParquetReaderBuilder::from_uri(uri, io_client.clone(), io_stats.clone()).await?; let builder = builder.set_infer_schema_options(schema_infer_options); let builder = if let Some(columns) = columns { @@ -92,7 +94,7 @@ async fn read_parquet_single( }; let parquet_reader = builder.build()?; - let ranges = parquet_reader.prebuffer_ranges(io_client)?; + let ranges = parquet_reader.prebuffer_ranges(io_client, io_stats)?; Ok(( metadata, parquet_reader.read_from_ranges_into_table(ranges).await?, @@ -172,6 +174,7 @@ async fn read_parquet_single_into_arrow( num_rows: Option, row_groups: Option>, io_client: Arc, + io_stats: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult<(arrow2::datatypes::SchemaRef, Vec)> { let (source_type, fixed_uri) = parse_url(uri)?; @@ -188,7 +191,8 @@ async fn read_parquet_single_into_arrow( .await?; (metadata, Arc::new(schema), all_arrays) } else { - let builder = ParquetReaderBuilder::from_uri(uri, io_client.clone()).await?; + let builder = + ParquetReaderBuilder::from_uri(uri, io_client.clone(), io_stats.clone()).await?; let builder = builder.set_infer_schema_options(schema_infer_options); let builder = if let Some(columns) = columns { @@ -212,7 +216,7 @@ async fn read_parquet_single_into_arrow( let parquet_reader = builder.build()?; let schema = parquet_reader.arrow_schema().clone(); - let ranges = parquet_reader.prebuffer_ranges(io_client)?; + let ranges = parquet_reader.prebuffer_ranges(io_client, io_stats)?; let all_arrays = parquet_reader .read_from_ranges_into_arrow_arrays(ranges) .await?; @@ -306,6 +310,7 @@ pub fn read_parquet( num_rows: Option, row_groups: Option>, io_client: Arc, + io_stats: Option, multithreaded_io: bool, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult
{ @@ -319,6 +324,7 @@ pub fn read_parquet( num_rows, row_groups, io_client, + io_stats, schema_infer_options, ) .await @@ -334,6 +340,7 @@ pub fn read_parquet_into_pyarrow( num_rows: Option, row_groups: Option>, io_client: Arc, + io_stats: Option, multithreaded_io: bool, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult { @@ -347,6 +354,7 @@ pub fn read_parquet_into_pyarrow( num_rows, row_groups, io_client, + io_stats, schema_infer_options, ) .await @@ -361,6 +369,7 @@ pub fn read_parquet_bulk( num_rows: Option, row_groups: Option>>, io_client: Arc, + io_stats: Option, num_parallel_tasks: usize, multithreaded_io: bool, schema_infer_options: &ParquetSchemaInferenceOptions, @@ -388,6 +397,8 @@ pub fn read_parquet_bulk( }; let io_client = io_client.clone(); + let io_stats = io_stats.clone(); + let schema_infer_options = schema_infer_options.clone(); tokio::task::spawn(async move { let columns = owned_columns @@ -402,6 +413,7 @@ pub fn read_parquet_bulk( num_rows, owned_row_group, io_client, + io_stats, schema_infer_options, ) .await?, @@ -428,6 +440,7 @@ pub fn read_parquet_into_pyarrow_bulk( num_rows: Option, row_groups: Option>>, io_client: Arc, + io_stats: Option, num_parallel_tasks: usize, multithreaded_io: bool, schema_infer_options: ParquetSchemaInferenceOptions, @@ -456,6 +469,8 @@ pub fn read_parquet_into_pyarrow_bulk( }; let io_client = io_client.clone(); + let io_stats = io_stats.clone(); + let schema_infer_options = schema_infer_options.clone(); tokio::task::spawn(async move { let columns = owned_columns @@ -470,6 +485,7 @@ pub fn read_parquet_into_pyarrow_bulk( num_rows, owned_row_group, io_client, + io_stats, schema_infer_options, ) .await?, @@ -489,18 +505,24 @@ pub fn read_parquet_into_pyarrow_bulk( pub fn read_parquet_schema( uri: &str, io_client: Arc, + io_stats: Option, schema_inference_options: ParquetSchemaInferenceOptions, ) -> DaftResult { let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); - let builder = runtime_handle - .block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?; + let builder = runtime_handle.block_on(async { + ParquetReaderBuilder::from_uri(uri, io_client.clone(), io_stats).await + })?; let builder = builder.set_infer_schema_options(schema_inference_options); Schema::try_from(builder.build()?.arrow_schema().as_ref()) } -pub fn read_parquet_statistics(uris: &Series, io_client: Arc) -> DaftResult
{ +pub fn read_parquet_statistics( + uris: &Series, + io_client: Arc, + io_stats: Option, +) -> DaftResult
{ let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); @@ -518,9 +540,12 @@ pub fn read_parquet_statistics(uris: &Series, io_client: Arc) -> DaftR let handles_iter = values.iter().map(|uri| { let owned_string = uri.map(|v| v.to_string()); let owned_client = io_client.clone(); + let io_stats = io_stats.clone(); + tokio::spawn(async move { if let Some(owned_string) = owned_string { - let builder = ParquetReaderBuilder::from_uri(&owned_string, owned_client).await?; + let builder = + ParquetReaderBuilder::from_uri(&owned_string, owned_client, io_stats).await?; let num_rows = builder.metadata().num_rows; let num_row_groups = builder.metadata().row_groups.len(); let version_num = builder.metadata().version; @@ -596,6 +621,7 @@ mod tests { None, None, io_client, + None, true, Default::default(), )?; diff --git a/src/daft-parquet/src/read_planner.rs b/src/daft-parquet/src/read_planner.rs index 79133c84cf..f9615024c7 100644 --- a/src/daft-parquet/src/read_planner.rs +++ b/src/daft-parquet/src/read_planner.rs @@ -2,7 +2,7 @@ use std::{fmt::Display, ops::Range, sync::Arc}; use bytes::Bytes; use common_error::DaftResult; -use daft_io::IOClient; +use daft_io::{IOClient, IOStatsRef}; use futures::StreamExt; use tokio::task::JoinHandle; @@ -148,16 +148,22 @@ impl ReadPlanner { Ok(()) } - pub fn collect(self, io_client: Arc) -> DaftResult> { + pub fn collect( + self, + io_client: Arc, + io_stats: Option, + ) -> DaftResult> { let mut entries = Vec::with_capacity(self.ranges.len()); for range in self.ranges { let owned_io_client = io_client.clone(); let owned_url = self.source.clone(); + let owned_io_stats = io_stats.clone(); + let start = range.start; let end = range.end; let join_handle = tokio::spawn(async move { let get_result = owned_io_client - .single_url_get(owned_url, Some(range.clone())) + .single_url_get(owned_url, Some(range.clone()), owned_io_stats) .await?; get_result.bytes().await });
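
Usage note (not part of the patch): the change above threads an optional IOStatsRef through every reader entry point and logs a per-context summary when the last handle is dropped. The sketch below shows how a caller might exercise the new parameter. It is illustrative only; it assumes the paths daft_io::{IOClient, IOConfig, IOStatsContext} and daft_parquet::read::read_parquet are publicly reachable as they appear in the hunks above, and it copies the anonymous-S3 configuration from the tests.

use std::sync::Arc;

use common_error::DaftResult;
use daft_io::{IOClient, IOConfig, IOStatsContext};

fn read_with_stats(uri: &str) -> DaftResult<()> {
    // One context per logical operation; cloning the Arc-backed IOStatsRef is cheap,
    // so the same handle can be shared by the metadata fetch and the range reads.
    let io_stats = IOStatsContext::new(format!("read_parquet: for uri {uri}"));

    let mut io_config = IOConfig::default();
    io_config.s3.anonymous = true;
    let io_client = Arc::new(IOClient::new(io_config.into())?);

    let table = daft_parquet::read::read_parquet(
        uri,
        None,                   // columns
        None,                   // start_offset
        None,                   // num_rows
        None,                   // row_groups
        io_client,
        Some(io_stats.clone()), // the io_stats argument added by this patch
        true,                   // multithreaded_io
        Default::default(),     // schema inference options
    )?;

    // Counters can be read at any time; once the final clone of io_stats drops,
    // the Drop impl in stats.rs logs the same numbers as a one-line summary.
    println!(
        "read {} rows via {} GET requests ({} bytes)",
        table.len(),
        io_stats.load_get_requests(),
        io_stats.load_bytes_read()
    );
    Ok(())
}

The counters are plain AtomicUsize fields behind an Arc, which is also why the diff rewrites the bare io_stats.map(|is| ...) calls as io_stats.as_ref().map(|is| ...): marking a request must not consume the Option where the same handle is still needed afterwards, for example when it is passed on to io_stats_on_bytestream.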