Skip to content

Commit

Permalink
[FEAT] Add debug logging to s3 native apis (#1414)
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Sep 23, 2023
1 parent c3cf438 commit e6252f9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,21 @@ pub(crate) async fn recursive_iter(
source: Arc<dyn ObjectSource>,
uri: &str,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
log::debug!(target: "recursive_iter", "starting recursive_iter: with top level of: {uri}");
let (to_rtn_tx, mut to_rtn_rx) = tokio::sync::mpsc::channel(16 * 1024);
fn add_to_channel(
source: Arc<dyn ObjectSource>,
tx: Sender<super::Result<FileMetadata>>,
dir: String,
) {
log::debug!(target: "recursive_iter", "recursive_iter: spawning task to list: {dir}");
tokio::spawn(async move {
let s = source.iter_dir(&dir, None, None).await;
log::debug!(target: "recursive_iter", "started listing task for {dir}");
let mut s = match s {
Ok(s) => s,
Err(e) => {
log::debug!(target: "recursive_iter", "Error occurred when listing {dir}\nerror:\n{e}");
tx.send(Err(e)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel { source: se.into() }
})?;
Expand All @@ -142,6 +146,7 @@ pub(crate) async fn recursive_iter(
.await
.map_err(|e| super::Error::UnableToSendDataOverChannel { source: e.into() })?;
}
log::debug!(target: "recursive_iter", "completed listing task for {dir}");
super::Result::Ok(())
});
}
Expand Down
5 changes: 5 additions & 0 deletions src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ impl S3LikeSource {
range: Option<Range<usize>>,
region: &Region,
) -> super::Result<GetResult> {
log::debug!("S3 get at {uri}, range: {range:?}, in region: {region}");
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let bucket = match parsed.host_str() {
Some(s) => Ok(s),
Expand All @@ -362,6 +363,7 @@ impl S3LikeSource {
}?;
let key = parsed.path();
if let Some(key) = key.strip_prefix('/') {
log::debug!("S3 get parsed uri: {uri} into Bucket: {bucket}, Key: {key}");
let request = self
.get_s3_client(region)
.await?
Expand Down Expand Up @@ -461,6 +463,7 @@ impl S3LikeSource {
uri: &str,
region: &Region,
) -> super::Result<usize> {
log::debug!("S3 head at {uri} in region: {region}");
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;

let bucket = match parsed.host_str() {
Expand All @@ -472,6 +475,7 @@ impl S3LikeSource {
}?;
let key = parsed.path();
if let Some(key) = key.strip_prefix('/') {
log::debug!("S3 head parsed uri: {uri} into Bucket: {bucket}, Key: {key}");
let request = self
.get_s3_client(region)
.await?
Expand Down Expand Up @@ -547,6 +551,7 @@ impl S3LikeSource {
continuation_token: Option<String>,
region: &Region,
) -> super::Result<LSResult> {
log::debug!("S3 list_objects: Bucket: {bucket}, Key: {key}, continuation_token: {continuation_token:?} in region: {region}");
let request = self
.get_s3_client(region)
.await?
Expand Down

0 comments on commit e6252f9

Please sign in to comment.