Skip to content

Commit

Permalink
wip for iostats
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Oct 13, 2023
1 parent 76f77d7 commit 19a163f
Show file tree
Hide file tree
Showing 18 changed files with 209 additions and 79 deletions.
16 changes: 11 additions & 5 deletions src/daft-csv/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow2::io::csv::read_async::{infer, infer_schema, AsyncReaderBuilder};
use async_compat::CompatExt;
use common_error::DaftResult;
use daft_core::schema::Schema;
use daft_io::{get_runtime, GetResult, IOClient};
use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef};
use futures::{io::Cursor, AsyncRead, AsyncSeek};
use tokio::fs::File;

Expand All @@ -13,20 +13,26 @@ pub fn read_csv_schema(
has_header: bool,
delimiter: Option<u8>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<Schema> {
let runtime_handle = get_runtime(true)?;
let _rt_guard = runtime_handle.enter();
runtime_handle
.block_on(async { read_csv_schema_single(uri, has_header, delimiter, io_client).await })
runtime_handle.block_on(async {
read_csv_schema_single(uri, has_header, delimiter, io_client, io_stats).await
})
}

async fn read_csv_schema_single(
uri: &str,
has_header: bool,
delimiter: Option<u8>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<Schema> {
match io_client.single_url_get(uri.to_string(), None).await? {
match io_client
.single_url_get(uri.to_string(), None, io_stats)
.await?
{
GetResult::File(file) => {
read_csv_schema_from_reader(
File::open(file.path).await?.compat(),
Expand Down Expand Up @@ -77,7 +83,7 @@ mod tests {
io_config.s3.anonymous = true;
let io_client = Arc::new(IOClient::new(io_config.into())?);

let schema = read_csv_schema(file, true, None, io_client.clone())?;
let schema = read_csv_schema(file, true, None, io_client.clone(), None)?;
assert_eq!(
schema,
Schema::new(vec![
Expand Down
6 changes: 5 additions & 1 deletion src/daft-csv/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod pylib {
use std::sync::Arc;

use daft_core::python::schema::PySchema;
use daft_io::{get_io_client, python::IOConfig};
use daft_io::{get_io_client, python::IOConfig, IOStatsContext};
use daft_table::python::PyTable;
use pyo3::{exceptions::PyValueError, pyfunction, PyResult, Python};

Expand Down Expand Up @@ -34,6 +34,8 @@ pub mod pylib {
multithreaded_io: Option<bool>,
) -> PyResult<PyTable> {
py.allow_threads(|| {
let io_stats = IOStatsContext::new(format!("read_csv: for uri {uri}"));

let io_client = get_io_client(
multithreaded_io.unwrap_or(true),
io_config.unwrap_or_default().config.into(),
Expand All @@ -46,6 +48,7 @@ pub mod pylib {
has_header.unwrap_or(true),
str_delimiter_to_byte(delimiter)?,
io_client,
Some(io_stats),
multithreaded_io.unwrap_or(true),
)?
.into())
Expand All @@ -71,6 +74,7 @@ pub mod pylib {
has_header.unwrap_or(true),
str_delimiter_to_byte(delimiter)?,
io_client,
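None, // TODO(wip): no IO stats are collected for this call yet; presumably an IOStatsContext should be created and passed here too, as the table-read path above does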
)?)
.into())
})
Expand Down
27 changes: 22 additions & 5 deletions src/daft-csv/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use arrow2::{
use async_compat::CompatExt;
use common_error::DaftResult;
use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series};
use daft_io::{get_runtime, GetResult, IOClient};
use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef};
use daft_table::Table;
use futures::{io::Cursor, AsyncRead, AsyncSeek};
use tokio::fs::File;
Expand All @@ -27,6 +27,7 @@ pub fn read_csv(
has_header: bool,
delimiter: Option<u8>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
multithreaded_io: bool,
) -> DaftResult<Table> {
let runtime_handle = get_runtime(multithreaded_io)?;
Expand All @@ -40,6 +41,7 @@ pub fn read_csv(
has_header,
delimiter,
io_client,
io_stats,
)
.await
})
Expand All @@ -53,8 +55,12 @@ async fn read_csv_single(
has_header: bool,
delimiter: Option<u8>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<Table> {
match io_client.single_url_get(uri.to_string(), None).await? {
match io_client
.single_url_get(uri.to_string(), None, io_stats)
.await?
{
GetResult::File(file) => {
read_csv_single_from_reader(
File::open(file.path).await?.compat(),
Expand Down Expand Up @@ -208,7 +214,7 @@ mod tests {

let io_client = Arc::new(IOClient::new(io_config.into())?);

let table = read_csv(file, None, None, None, true, None, io_client, true)?;
let table = read_csv(file, None, None, None, true, None, io_client, None, true)?;
assert_eq!(table.len(), 100);
assert_eq!(
table.schema,
Expand All @@ -231,7 +237,7 @@ mod tests {

let io_client = Arc::new(IOClient::new(io_config.into())?);

let table = read_csv(file, None, None, None, true, None, io_client, true)?;
let table = read_csv(file, None, None, None, true, None, io_client, None, true)?;
assert_eq!(table.len(), 5000);

Ok(())
Expand All @@ -246,7 +252,17 @@ mod tests {

let io_client = Arc::new(IOClient::new(io_config.into())?);

let table = read_csv(file, None, None, Some(10), true, None, io_client, true)?;
let table = read_csv(
file,
None,
None,
Some(10),
true,
None,
io_client,
None,
true,
)?;
assert_eq!(table.len(), 10);
assert_eq!(
table.schema,
Expand Down Expand Up @@ -277,6 +293,7 @@ mod tests {
true,
None,
io_client,
None,
true,
)?;
assert_eq!(table.len(), 100);
Expand Down
1 change: 1 addition & 0 deletions src/daft-dsl/src/functions/uri/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl FunctionEvaluator for DownloadEvaluator {
*raise_error_on_failure,
*multi_thread,
config.clone(),
None,
),
_ => Err(DaftError::ValueError(format!(
"Expected 1 input arg, got {}",
Expand Down
9 changes: 4 additions & 5 deletions src/daft-io/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl AzureBlobSource {
// Flatmap each page of results to a single stream of our standardized FileMetadata.
responses_stream
.map(move |response| {
io_stats.clone().map(|is| is.load_list_requests(1));
io_stats.clone().map(|is| is.mark_list_requests(1));
(response, protocol.clone())
})
.flat_map(move |(response, protocol)| match response {
Expand Down Expand Up @@ -430,10 +430,9 @@ impl ObjectSource for AzureBlobSource {
.into_error(e)
.into()
});
tokio::pin!(stream);
io_stats.map(|is| is.mark_get_requests(1));
io_stats.as_ref().map(|is| is.mark_get_requests(1));
Ok(GetResult::Stream(
io_stats_on_bytestream(stream, io_stats),
io_stats_on_bytestream(Box::pin(stream), io_stats),
None,
None,
))
Expand All @@ -456,7 +455,7 @@ impl ObjectSource for AzureBlobSource {
.get_properties()
.await
.context(UnableToOpenFileSnafu::<String> { path: uri.into() })?;
io_stats.map(|is| is.mark_head_requests(1));
io_stats.as_ref().map(|is| is.mark_head_requests(1));

Ok(metadata.blob.properties.content_length as usize)
}
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl GCSClientWrapper {
.context(UnableToListObjectsSnafu {
path: format!("{GCS_SCHEME}://{}/{}", bucket, key),
})?;
io_stats.map(|is| is.mark_list_requests(1));
io_stats.as_ref().map(|is| is.mark_list_requests(1));

let response_items = ls_response.items.unwrap_or_default();
let response_prefixes = ls_response.prefixes.unwrap_or_default();
Expand Down
6 changes: 3 additions & 3 deletions src/daft-io/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl ObjectSource for HttpSource {
let response = response
.error_for_status()
.context(UnableToOpenFileSnafu::<String> { path: uri.into() })?;
io_stats.map(|is| is.mark_get_requests(1));
io_stats.as_ref().map(|is| is.mark_get_requests(1));
let size_bytes = response.content_length().map(|s| s as usize);
let stream = response.bytes_stream();
let owned_string = uri.to_owned();
Expand Down Expand Up @@ -223,7 +223,7 @@ impl ObjectSource for HttpSource {
.error_for_status()
.context(UnableToOpenFileSnafu::<String> { path: uri.into() })?;

io_stats.map(|is| is.mark_head_requests(1));
io_stats.as_ref().map(|is| is.mark_head_requests(1));

let headers = response.headers();
match headers.get(CONTENT_LENGTH) {
Expand Down Expand Up @@ -275,7 +275,7 @@ impl ObjectSource for HttpSource {
.context(UnableToConnectSnafu::<String> { path: path.into() })?
.error_for_status()
.with_context(|_| UnableToOpenFileSnafu { path })?;
io_stats.map(|is| is.mark_list_requests(1));
io_stats.as_ref().map(|is| is.mark_list_requests(1));

// Reconstruct the actual path of the request, which may have been redirected via a 301
// This is important because downstream URL joining logic relies on proper trailing-slashes/index.html
Expand Down
21 changes: 16 additions & 5 deletions src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use common_io_config::{AzureConfig, IOConfig, S3Config};
pub use object_io::GetResult;
#[cfg(feature = "python")]
pub use python::register_modules;
pub use stats::{IOStatsContext, IOStatsRef};
use tokio::runtime::RuntimeFlavor;

use std::{borrow::Cow, collections::HashMap, hash::Hash, ops::Range, sync::Arc};
Expand Down Expand Up @@ -168,26 +169,32 @@ impl IOClient {
&self,
input: String,
range: Option<Range<usize>>,
io_stats: Option<IOStatsRef>,
) -> Result<GetResult> {
let (scheme, path) = parse_url(&input)?;
let source = self.get_source(&scheme).await?;
source.get(path.as_ref(), range).await
source.get(path.as_ref(), range, io_stats).await
}

pub async fn single_url_get_size(&self, input: String) -> Result<usize> {
pub async fn single_url_get_size(
&self,
input: String,
io_stats: Option<IOStatsRef>,
) -> Result<usize> {
let (scheme, path) = parse_url(&input)?;
let source = self.get_source(&scheme).await?;
source.get_size(path.as_ref()).await
source.get_size(path.as_ref(), io_stats).await
}

async fn single_url_download(
&self,
index: usize,
input: Option<String>,
raise_error_on_failure: bool,
io_stats: Option<IOStatsRef>,
) -> Result<Option<bytes::Bytes>> {
let value = if let Some(input) = input {
let response = self.single_url_get(input, None).await;
let response = self.single_url_get(input, None, io_stats).await;
let res = match response {
Ok(res) => res.bytes().await,
Err(err) => Err(err),
Expand Down Expand Up @@ -357,6 +364,7 @@ pub fn _url_download(
raise_error_on_failure: bool,
multi_thread: bool,
config: Arc<IOConfig>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<BinaryArray> {
let urls = array.as_arrow().iter();
let name = array.name();
Expand All @@ -378,11 +386,12 @@ pub fn _url_download(
let fetches = futures::stream::iter(urls.enumerate().map(|(i, url)| {
let owned_url = url.map(|s| s.to_string());
let owned_client = io_client.clone();
let owned_io_stats = io_stats.clone();
tokio::spawn(async move {
(
i,
owned_client
.single_url_download(i, owned_url, raise_error_on_failure)
.single_url_download(i, owned_url, raise_error_on_failure, owned_io_stats)
.await,
)
})
Expand Down Expand Up @@ -434,6 +443,7 @@ pub fn url_download(
raise_error_on_failure: bool,
multi_thread: bool,
config: Arc<IOConfig>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<Series> {
match series.data_type() {
DataType::Utf8 => Ok(_url_download(
Expand All @@ -442,6 +452,7 @@ pub fn url_download(
raise_error_on_failure,
multi_thread,
config,
io_stats,
)?
.into_series()),
dt => Err(DaftError::TypeError(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ pub(crate) trait ObjectSource: Sync + Send {
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
let uri = uri.to_string();
let s = stream! {
let lsr = self.ls(&uri, posix, None, page_size, io_stats).await?;
let lsr = self.ls(&uri, posix, None, page_size, io_stats.clone()).await?;
for fm in lsr.files {
yield Ok(fm);
}

let mut continuation_token = lsr.continuation_token.clone();
while continuation_token.is_some() {
let lsr = self.ls(&uri, posix, continuation_token.as_deref(), page_size, io_stats).await?;
let lsr = self.ls(&uri, posix, continuation_token.as_deref(), page_size, io_stats.clone()).await?;
continuation_token = lsr.continuation_token.clone();
for fm in lsr.files {
yield Ok(fm);
Expand Down
8 changes: 4 additions & 4 deletions src/daft-io/src/object_store_glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ pub(crate) async fn glob(
.fanout_limit
.map(|fanout_limit| fanout_limit / state.current_fanout),
state.page_size,
io_stats,
io_stats.clone(),
)
.await;

Expand All @@ -412,7 +412,7 @@ pub(crate) async fn glob(
state.current_fragment_idx,
stream_dir_count,
),
io_stats,
io_stats.clone(),
);
}
// Return any Files that match
Expand Down Expand Up @@ -506,7 +506,7 @@ pub(crate) async fn glob(
.fanout_limit
.map(|fanout_limit| fanout_limit / state.current_fanout),
state.page_size,
io_stats,
io_stats.clone(),
)
.await;

Expand All @@ -529,7 +529,7 @@ pub(crate) async fn glob(
stream_dir_count,
)
.with_wildcard_mode(),
io_stats,
io_stats.clone(),
);
}
FileType::File
Expand Down
Loading

0 comments on commit 19a163f

Please sign in to comment.