diff --git a/quickwit/quickwit-storage/src/opendal_storage/base.rs b/quickwit/quickwit-storage/src/opendal_storage/base.rs index 7dd10b9bde4..87bcf26db3b 100644 --- a/quickwit/quickwit-storage/src/opendal_storage/base.rs +++ b/quickwit/quickwit-storage/src/opendal_storage/base.rs @@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWriteExt}; use crate::storage::SendableAsync; use crate::{ BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind, - StorageResolverError, StorageResult, + StorageResolverError, StorageResult, STORAGE_METRICS, }; /// OpenDAL based storage implementation. @@ -88,20 +88,35 @@ impl Storage for OpendalStorage { tokio::io::copy(&mut payload_reader, &mut storage_writer).await?; storage_writer.close().await?; + // FIXME: should be payload.len() / ByteSize::mb(8)? + // crate::STORAGE_METRICS.object_storage_put_parts.inc(); + crate::STORAGE_METRICS + .object_storage_upload_num_bytes + .inc_by(payload.len()); + Ok(()) } async fn copy_to(&self, path: &Path, output: &mut dyn SendableAsync) -> StorageResult<()> { let path = path.as_os_str().to_string_lossy(); let mut storage_reader = self.op.reader(&path).await?; - tokio::io::copy(&mut storage_reader, output).await?; + let num_bytes_copied = tokio::io::copy(&mut storage_reader, output).await?; output.flush().await?; + crate::STORAGE_METRICS.object_storage_get_total.inc(); + STORAGE_METRICS + .object_storage_download_num_bytes + .inc_by(num_bytes_copied); + Ok(()) } async fn get_slice(&self, path: &Path, range: Range) -> StorageResult { let path = path.as_os_str().to_string_lossy(); let range = range.start as u64..range.end as u64; + crate::STORAGE_METRICS.object_storage_get_total.inc(); + STORAGE_METRICS + .object_storage_download_num_bytes + .inc_by(range.end - range.start as u64); let storage_content = self.op.read_with(&path).range(range).await?; Ok(OwnedBytes::new(storage_content)) @@ -114,6 +129,10 @@ impl Storage for OpendalStorage { ) -> StorageResult> { let path = path.as_os_str().to_string_lossy(); let range = range.start as u64..range.end as u64; + crate::STORAGE_METRICS.object_storage_get_total.inc(); + STORAGE_METRICS + .object_storage_download_num_bytes + .inc_by(range.end - range.start as u64); let storage_reader = self.op.reader_with(&path).range(range).await?; Ok(Box::new(storage_reader)) @@ -121,7 +140,11 @@ impl Storage for OpendalStorage { async fn get_all(&self, path: &Path) -> StorageResult { let path = path.as_os_str().to_string_lossy(); + crate::STORAGE_METRICS.object_storage_get_total.inc(); let storage_content = self.op.read(&path).await?; + STORAGE_METRICS + .object_storage_download_num_bytes + .inc_by(storage_content.len() as u64); Ok(OwnedBytes::new(storage_content)) }