From 6e171fddeef675abc65b135747ee17d093781035 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 12 Jul 2024 13:48:48 +0800 Subject: [PATCH] refactor(core)!: Make oio::Write always write all given buffer (#4880) * Remove returning n in write Signed-off-by: Xuanwo * Fix build Signed-off-by: Xuanwo * Fix tests Signed-off-by: Xuanwo * Fix write Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- core/src/layers/async_backtrace.rs | 4 +- core/src/layers/await_tree.rs | 4 +- core/src/layers/blocking.rs | 2 +- core/src/layers/complete.rs | 7 +- core/src/layers/concurrent_limit.rs | 4 +- core/src/layers/dtrace.rs | 14 +-- core/src/layers/error_context.rs | 14 +-- core/src/layers/logging.rs | 25 ++-- core/src/layers/metrics.rs | 17 +-- core/src/layers/minitrace.rs | 4 +- core/src/layers/oteltrace.rs | 4 +- core/src/layers/prometheus.rs | 20 +-- core/src/layers/prometheus_client.rs | 16 +-- core/src/layers/retry.rs | 8 +- core/src/layers/throttle.rs | 4 +- core/src/layers/timeout.rs | 2 +- core/src/layers/tracing.rs | 4 +- core/src/raw/adapters/kv/backend.rs | 10 +- core/src/raw/adapters/typed_kv/backend.rs | 10 +- core/src/raw/enum_utils.rs | 4 +- core/src/raw/oio/write/api.rs | 30 ++--- core/src/raw/oio/write/append_write.rs | 8 +- core/src/raw/oio/write/block_write.rs | 10 +- core/src/raw/oio/write/multipart_write.rs | 10 +- core/src/raw/oio/write/one_shot_write.rs | 5 +- core/src/raw/oio/write/position_write.rs | 10 +- core/src/raw/oio/write/range_write.rs | 10 +- core/src/services/aliyun_drive/writer.rs | 6 +- core/src/services/alluxio/writer.rs | 7 +- core/src/services/compfs/writer.rs | 24 ++-- core/src/services/fs/writer.rs | 19 ++- core/src/services/ftp/writer.rs | 25 ++-- core/src/services/ghac/writer.rs | 4 +- core/src/services/hdfs/writer.rs | 18 ++- core/src/services/hdfs_native/writer.rs | 2 +- core/src/services/sftp/writer.rs | 13 +- core/src/types/blocking_write/std_writer.rs | 5 +- core/src/types/context/write.rs | 127 ++++---------------- core/src/types/write/writer.rs | 9 +- 39 files changed, 221 insertions(+), 298 deletions(-) diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index 6d77591ca00..290171c29c6 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -169,7 +169,7 @@ impl oio::BlockingRead for AsyncBacktraceWrapper { impl oio::Write for AsyncBacktraceWrapper { #[async_backtrace::framed] - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs).await } @@ -185,7 +185,7 @@ impl oio::Write for AsyncBacktraceWrapper { } impl oio::BlockingWrite for AsyncBacktraceWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs index 7bb20d42fb4..58fd73e8fe4 100644 --- a/core/src/layers/await_tree.rs +++ b/core/src/layers/await_tree.rs @@ -191,7 +191,7 @@ impl oio::BlockingRead for AwaitTreeWrapper { } impl oio::Write for AwaitTreeWrapper { - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { self.inner .write(bs) .instrument_await(format!("opendal::{}", WriteOperation::Write.into_static())) @@ -211,7 +211,7 @@ impl oio::Write for AwaitTreeWrapper { } impl oio::BlockingWrite for AwaitTreeWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 70830981c83..7293ced965a 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -288,7 +288,7 @@ impl oio::BlockingRead for BlockingWrapper { } impl oio::BlockingWrite for BlockingWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.handle.block_on(self.inner.write(bs)) } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index c2156a0418e..68b0340aa71 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -654,7 +654,7 @@ impl oio::Write for CompleteWriter where W: oio::Write, { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; @@ -689,13 +689,12 @@ impl oio::BlockingWrite for CompleteWriter where W: oio::BlockingWrite, { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - let n = w.write(bs)?; - Ok(n) + w.write(bs) } fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index a1a61ad01dc..87ad19b50c9 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -262,7 +262,7 @@ impl oio::BlockingRead for ConcurrentLimitWrapper { } impl oio::Write for ConcurrentLimitWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs).await } @@ -276,7 +276,7 @@ impl oio::Write for ConcurrentLimitWrapper { } impl oio::BlockingWrite for ConcurrentLimitWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs index c71277ed9a6..51b001313b5 100644 --- a/core/src/layers/dtrace.rs +++ b/core/src/layers/dtrace.rs @@ -379,15 +379,14 @@ impl oio::BlockingRead for DtraceLayerWrapper { } impl oio::Write for DtraceLayerWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, writer_write_start, c_path.as_ptr()); self.inner .write(bs) .await - .map(|n| { - probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n); - n + .map(|_| { + probe_lazy!(opendal, writer_write_ok, c_path.as_ptr()); }) .map_err(|err| { probe_lazy!(opendal, writer_write_error, c_path.as_ptr()); @@ -427,14 +426,13 @@ impl oio::Write for DtraceLayerWrapper { } impl oio::BlockingWrite for DtraceLayerWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr()); self.inner .write(bs) - .map(|n| { - probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr(), n); - n + .map(|_| { + probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr()); }) .map_err(|err| { probe_lazy!(opendal, blocking_writer_write_error, c_path.as_ptr()); diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 86ae9dba80c..cabe84b0538 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -385,14 +385,13 @@ impl oio::BlockingRead for ErrorContextWrapper { } impl oio::Write for ErrorContextWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); self.inner .write(bs) .await - .map(|n| { - self.processed += n as u64; - n + .map(|_| { + self.processed += size as u64; }) .map_err(|err| { err.with_operation(WriteOperation::Write) @@ -423,13 +422,12 @@ impl oio::Write for ErrorContextWrapper { } impl oio::BlockingWrite for ErrorContextWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); self.inner .write(bs) - .map(|n| { - self.processed += n as u64; - n + .map(|_| { + self.processed += size as u64; }) .map_err(|err| { err.with_operation(WriteOperation::BlockingWrite) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 1e0d80d2647..507745c6d14 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1072,21 +1072,20 @@ impl LoggingWriter { } impl oio::Write for LoggingWriter { - async fn write(&mut self, bs: Buffer) -> Result { - match self.inner.write(bs.clone()).await { - Ok(n) => { - self.written += n as u64; + async fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + match self.inner.write(bs).await { + Ok(_) => { trace!( target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> input data {}B, write {}B", + "service={} operation={} path={} written={}B -> data write {}B", self.ctx.scheme, WriteOperation::Write, self.path, self.written, - bs.len(), - n, + size, ); - Ok(n) + Ok(()) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1170,21 +1169,19 @@ impl oio::Write for LoggingWriter { } impl oio::BlockingWrite for LoggingWriter { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { match self.inner.write(bs.clone()) { - Ok(n) => { - self.written += n as u64; + Ok(_) => { trace!( target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> input data {}B, write {}B", + "service={} operation={} path={} written={}B -> data write {}B", self.ctx.scheme, WriteOperation::BlockingWrite, self.path, self.written, bs.len(), - n ); - Ok(n) + Ok(()) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 47d7a8a936c..decbad81a8d 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -785,17 +785,17 @@ impl oio::BlockingRead for MetricWrapper { } impl oio::Write for MetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) .await - .map(|n| { - self.bytes_counter.increment(n as u64); + .map(|_| { + self.bytes_counter.increment(size as u64); self.requests_duration_seconds .record(start.elapsed().as_secs_f64()); - n }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); @@ -819,12 +819,13 @@ impl oio::Write for MetricWrapper { } impl oio::BlockingWrite for MetricWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + self.inner .write(bs) - .map(|n| { - self.bytes_counter.increment(n as u64); - n + .map(|_| { + self.bytes_counter.increment(size as u64); }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 983d18b6b8f..bca4a2e584d 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -306,7 +306,7 @@ impl oio::BlockingRead for MinitraceWrapper { } impl oio::Write for MinitraceWrapper { - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static()); self.inner.write(bs) @@ -326,7 +326,7 @@ impl oio::Write for MinitraceWrapper { } impl oio::BlockingWrite for MinitraceWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static()); self.inner.write(bs) diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 6fb5d582ba1..b04ad22fddb 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -284,7 +284,7 @@ impl oio::BlockingRead for OtelTraceWrapper { } impl oio::Write for OtelTraceWrapper { - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { self.inner.write(bs) } @@ -298,7 +298,7 @@ impl oio::Write for OtelTraceWrapper { } impl oio::BlockingWrite for OtelTraceWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index ee740d55c42..a4359ead726 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -742,7 +742,9 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + let labels = self.stats.generate_metric_label( self.scheme.into_static(), WriteOperation::Write.into_static(), @@ -758,12 +760,12 @@ impl oio::Write for PrometheusMetricWrapper { timer.observe_duration(); match res { - Ok(n) => { + Ok(_) => { self.stats .bytes_total .with_label_values(&labels) - .observe(n as f64); - Ok(n) + .observe(size as f64); + Ok(()) } Err(err) => { self.stats.increment_errors_total(self.op, err.kind()); @@ -822,7 +824,9 @@ impl oio::Write for PrometheusMetricWrapper { } impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + let labels = self.stats.generate_metric_label( self.scheme.into_static(), Operation::BlockingWrite.into_static(), @@ -838,12 +842,12 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { timer.observe_duration(); match res { - Ok(n) => { + Ok(_) => { self.stats .bytes_total .with_label_values(&labels) - .observe(n as f64); - Ok(n) + .observe(size as f64); + Ok(()) } Err(err) => { self.stats.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 463e4092750..58fef7e8945 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -627,24 +627,24 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) .await - .map(|n| { + .map(|_| { self.metrics.observe_bytes_total( self.scheme, WriteOperation::Write.into_static(), - n, + size, ); self.metrics.observe_request_duration( self.scheme, WriteOperation::Write.into_static(), start.elapsed(), ); - n }) .map_err(|err| { self.metrics.increment_errors_total( @@ -704,23 +704,23 @@ impl oio::Write for PrometheusMetricWrapper { } impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) - .map(|n| { + .map(|_| { self.metrics.observe_bytes_total( self.scheme, WriteOperation::BlockingWrite.into_static(), - n, + size, ); self.metrics.observe_request_duration( self.scheme, WriteOperation::BlockingWrite.into_static(), start.elapsed(), ); - n }) .map_err(|err| { self.metrics.increment_errors_total( diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index c48307fdbc6..4d42098394a 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -626,7 +626,7 @@ impl oio::BlockingRead for RetryWrapp } impl oio::Write for RetryWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { use backon::RetryableWithContext; let inner = self.take_inner()?; @@ -694,7 +694,7 @@ impl oio::Write for RetryWrapper { } impl oio::BlockingWrite for RetryWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { { || self.inner.as_mut().unwrap().write(bs.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) @@ -938,8 +938,8 @@ mod tests { struct MockWriter {} impl oio::Write for MockWriter { - async fn write(&mut self, bs: Buffer) -> Result { - Ok(bs.len()) + async fn write(&mut self, _: Buffer) -> Result<()> { + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 3b66e327f39..f73f33d3caa 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -191,7 +191,7 @@ impl oio::BlockingRead for ThrottleWrapper { } impl oio::Write for ThrottleWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { @@ -226,7 +226,7 @@ impl oio::Write for ThrottleWrapper { } impl oio::BlockingWrite for ThrottleWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 246049dfbf2..1cbc0c5ac10 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -350,7 +350,7 @@ impl oio::Read for TimeoutWrapper { } impl oio::Write for TimeoutWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let fut = self.inner.write(bs); Self::io_timeout(self.timeout, WriteOperation::Write.into_static(), fut).await } diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 4ba829a6ec7..4a2dc4bc006 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -286,7 +286,7 @@ impl oio::Write for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { self.inner.write(bs) } @@ -312,7 +312,7 @@ impl oio::BlockingWrite for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index bb08e4cb19f..625e7ea9829 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -242,10 +242,9 @@ impl KvWriter { unsafe impl Sync for KvWriter {} impl oio::Write for KvWriter { - async fn write(&mut self, bs: Buffer) -> Result { - let ret = bs.len(); + async fn write(&mut self, bs: Buffer) -> Result<()> { self.buffer.push(bs); - Ok(ret) + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -260,10 +259,9 @@ impl oio::Write for KvWriter { } impl oio::BlockingWrite for KvWriter { - fn write(&mut self, bs: Buffer) -> Result { - let ret = bs.len(); + fn write(&mut self, bs: Buffer) -> Result<()> { self.buffer.push(bs); - Ok(ret) + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index ecce2eb8792..fd6271691b1 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -275,12 +275,11 @@ impl KvWriter { } impl oio::Write for KvWriter { - async fn write(&mut self, bs: Buffer) -> Result { - let size = bs.len(); + async fn write(&mut self, bs: Buffer) -> Result<()> { let mut buf = self.buf.take().unwrap_or_default(); buf.push(bs); self.buf = Some(buf); - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -303,12 +302,11 @@ impl oio::Write for KvWriter { } impl oio::BlockingWrite for KvWriter { - fn write(&mut self, bs: Buffer) -> Result { - let size = bs.len(); + fn write(&mut self, bs: Buffer) -> Result<()> { let mut buf = self.buf.take().unwrap_or_default(); buf.push(bs); self.buf = Some(buf); - Ok(size) + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs index c22411904dc..111da78be09 100644 --- a/core/src/raw/enum_utils.rs +++ b/core/src/raw/enum_utils.rs @@ -70,7 +70,7 @@ impl oio::BlockingRead for TwoWa } impl oio::Write for TwoWays { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { match self { Self::One(v) => v.write(bs).await, Self::Two(v) => v.write(bs).await, @@ -129,7 +129,7 @@ impl o impl oio::Write for ThreeWays { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { match self { Self::One(v) => v.write(bs).await, Self::Two(v) => v.write(bs).await, diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index e6c7c05918e..4ec53adab67 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -77,31 +77,19 @@ pub trait Write: Unpin + Send + Sync { /// /// # Behavior /// - /// - `Ok(n)` means `n` bytes has been written successfully. + /// - `Ok(())` means all bytes has been written successfully. /// - `Err(err)` means error happens and no bytes has been written. - /// - /// It's possible that `n < bs.len()`, caller should pass the remaining bytes - /// repeatedly until all bytes has been written. - #[cfg(not(target_arch = "wasm32"))] - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn write(&mut self, bs: Buffer) -> impl Future>; + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend; /// Close the writer and make sure all data has been flushed. - #[cfg(not(target_arch = "wasm32"))] fn close(&mut self) -> impl Future> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn close(&mut self) -> impl Future>; /// Abort the pending writer. - #[cfg(not(target_arch = "wasm32"))] fn abort(&mut self) -> impl Future> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn abort(&mut self) -> impl Future>; } impl Write for () { - async fn write(&mut self, _: Buffer) -> Result { + async fn write(&mut self, _: Buffer) -> Result<()> { unimplemented!("write is required to be implemented for oio::Write") } @@ -121,7 +109,7 @@ impl Write for () { } pub trait WriteDyn: Unpin + Send + Sync { - fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture>; + fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture>; fn close_dyn(&mut self) -> BoxedFuture>; @@ -129,7 +117,7 @@ pub trait WriteDyn: Unpin + Send + Sync { } impl WriteDyn for T { - fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture> { + fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture> { Box::pin(self.write(bs)) } @@ -143,7 +131,7 @@ impl WriteDyn for T { } impl Write for Box { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.deref_mut().write_dyn(bs).await } @@ -170,14 +158,14 @@ pub trait BlockingWrite: Send + Sync + 'static { /// /// It's possible that `n < bs.len()`, caller should pass the remaining bytes /// repeatedly until all bytes has been written. - fn write(&mut self, bs: Buffer) -> Result; + fn write(&mut self, bs: Buffer) -> Result<()>; /// Close the writer and make sure all data has been flushed. fn close(&mut self) -> Result<()>; } impl BlockingWrite for () { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let _ = bs; unimplemented!("write is required to be implemented for oio::BlockingWrite") @@ -195,7 +183,7 @@ impl BlockingWrite for () { /// /// To make BlockingWriter work as expected, we must add this impl. impl BlockingWrite for Box { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { (**self).write(bs) } diff --git a/core/src/raw/oio/write/append_write.rs b/core/src/raw/oio/write/append_write.rs index 2f48b683079..06c72cc5e2c 100644 --- a/core/src/raw/oio/write/append_write.rs +++ b/core/src/raw/oio/write/append_write.rs @@ -80,7 +80,7 @@ impl oio::Write for AppendWriter where W: AppendWrite, { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let offset = match self.offset { Some(offset) => offset, None => { @@ -91,12 +91,10 @@ where }; let size = bs.len(); - self.inner - .append(offset, size as u64, Buffer::from(bs.to_bytes())) - .await?; + self.inner.append(offset, size as u64, bs).await?; // Update offset after succeed. self.offset = Some(offset + size as u64); - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index 99c76562ca8..cd0ec43b45a 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -162,10 +162,10 @@ impl oio::Write for BlockWriter where W: BlockWrite, { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { if !self.started && self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } // The block upload process has been started. @@ -181,8 +181,8 @@ where }) .await?; self.cache = None; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs index 0d893d7cb32..44a33c7a4ba 100644 --- a/core/src/raw/oio/write/multipart_write.rs +++ b/core/src/raw/oio/write/multipart_write.rs @@ -203,14 +203,14 @@ impl oio::Write for MultipartWriter where W: MultipartWrite, { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let upload_id = match self.upload_id.clone() { Some(v) => v, None => { // Fill cache with the first write. if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } let upload_id = self.w.initiate_part().await?; @@ -234,8 +234,8 @@ where .await?; self.cache = None; self.next_part_number += 1; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index cd056c14614..938973c33a7 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -50,16 +50,15 @@ impl OneShotWriter { } impl oio::Write for OneShotWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { match &self.buffer { Some(_) => Err(Error::new( ErrorKind::Unsupported, "OneShotWriter doesn't support multiple write", )), None => { - let size = bs.len(); self.buffer = Some(bs); - Ok(size) + Ok(()) } } } diff --git a/core/src/raw/oio/write/position_write.rs b/core/src/raw/oio/write/position_write.rs index 3dbf5c93ef2..5aa5ff32943 100644 --- a/core/src/raw/oio/write/position_write.rs +++ b/core/src/raw/oio/write/position_write.rs @@ -124,10 +124,10 @@ impl PositionWriter { } impl oio::Write for PositionWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + let _ = self.fill_cache(bs); + return Ok(()); } let bytes = self.cache.clone().expect("pending write must exist"); @@ -144,8 +144,8 @@ impl oio::Write for PositionWriter { .await?; self.cache = None; self.next_offset += length; - let size = self.fill_cache(bs); - Ok(size) + let _ = self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 67ae619dd92..f44f06ad9cb 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -155,14 +155,14 @@ impl RangeWriter { } impl oio::Write for RangeWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let location = match self.location.clone() { Some(location) => location, None => { // Fill cache with the first write. if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } let location = self.w.initiate_range().await?; @@ -187,8 +187,8 @@ impl oio::Write for RangeWriter { .await?; self.cache = None; self.next_offset += length; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/aliyun_drive/writer.rs b/core/src/services/aliyun_drive/writer.rs index 30ca2ef94e2..461764541fe 100644 --- a/core/src/services/aliyun_drive/writer.rs +++ b/core/src/services/aliyun_drive/writer.rs @@ -51,7 +51,7 @@ impl AliyunDriveWriter { } impl oio::Write for AliyunDriveWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) { (Some(upload_id), Some(file_id)) => (upload_id, file_id), _ => { @@ -94,8 +94,6 @@ impl oio::Write for AliyunDriveWriter { return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); }; - let size = bs.len(); - if let Err(err) = self.core.upload(upload_url, bs).await { if err.kind() != ErrorKind::AlreadyExists { return Err(err); @@ -104,7 +102,7 @@ impl oio::Write for AliyunDriveWriter { self.part_number += 1; - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs index e5ca807b609..f452b6b10bb 100644 --- a/core/src/services/alluxio/writer.rs +++ b/core/src/services/alluxio/writer.rs @@ -43,7 +43,7 @@ impl AlluxioWriter { } impl oio::Write for AlluxioWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let stream_id = match self.stream_id { Some(stream_id) => stream_id, None => { @@ -52,9 +52,8 @@ impl oio::Write for AlluxioWriter { stream_id } }; - self.core - .write(stream_id, Buffer::from(bs.to_bytes())) - .await + self.core.write(stream_id, bs).await?; + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs index 2e12ea2dc77..749cab10ffc 100644 --- a/core/src/services/compfs/writer.rs +++ b/core/src/services/compfs/writer.rs @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{io::Cursor, sync::Arc}; - -use compio::{buf::buf_try, fs::File, io::AsyncWrite}; - use super::core::CompfsCore; use crate::raw::*; use crate::*; +use compio::io::AsyncWriteExt; +use compio::{buf::buf_try, fs::File}; +use std::{io::Cursor, sync::Arc}; #[derive(Debug)] pub struct CompfsWriter { @@ -36,17 +35,22 @@ impl CompfsWriter { } impl oio::Write for CompfsWriter { - async fn write(&mut self, bs: Buffer) -> Result { + /// FIXME + /// + /// the write_all doesn't work correctly if `bs` is non-contiguous. + /// + /// The IoBuf::buf_len() only returns the length of the current buffer. + async fn write(&mut self, bs: Buffer) -> Result<()> { let mut file = self.file.clone(); - let n = self - .core + self.core .exec(move || async move { - let (n, _) = buf_try!(@try file.write(bs).await); - Ok(n) + buf_try!(@try file.write_all(bs).await); + Ok(()) }) .await?; - Ok(n) + + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index b9e84d736c7..d0d1bc66741 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -53,11 +53,15 @@ impl FsWriter { unsafe impl Sync for FsWriter {} impl oio::Write for FsWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("FsWriter must be initialized"); - // TODO: use write_vectored instead. - f.write(bs.chunk()).await.map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -88,10 +92,15 @@ impl oio::Write for FsWriter { } impl oio::BlockingWrite for FsWriter { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("FsWriter must be initialized"); - f.write(bs.chunk()).map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index ec21609c186..f7f6864dd9c 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,7 +53,7 @@ impl FtpWriter { } impl oio::Write for FtpWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let path = if let Some(tmp_path) = &self.tmp_path { tmp_path } else { @@ -69,17 +69,20 @@ impl oio::Write for FtpWriter { )); } - let size = self - .data_stream - .as_mut() - .unwrap() - .write(bs.chunk()) - .await - .map_err(|err| { - Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) - })?; + while bs.has_remaining() { + let n = self + .data_stream + .as_mut() + .unwrap() + .write(bs.chunk()) + .await + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) + })?; + bs.advance(n); + } - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 00bce7fac34..3b58dd0fc2d 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -38,7 +38,7 @@ impl GhacWriter { } impl oio::Write for GhacWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); let offset = self.size; @@ -61,7 +61,7 @@ impl oio::Write for GhacWriter { } self.size += size as u64; - Ok(size) + Ok(()) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 96708c1ab1a..0f60014f410 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -53,10 +53,15 @@ impl HdfsWriter { } impl oio::Write for HdfsWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("HdfsWriter must be initialized"); - f.write(bs.chunk()).await.map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -82,9 +87,14 @@ impl oio::Write for HdfsWriter { } impl oio::BlockingWrite for HdfsWriter { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("HdfsWriter must be initialized"); - f.write(bs.chunk()).map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs index e6fb0205e4f..4cab45b3be4 100644 --- a/core/src/services/hdfs_native/writer.rs +++ b/core/src/services/hdfs_native/writer.rs @@ -31,7 +31,7 @@ impl HdfsNativeWriter { } impl oio::Write for HdfsNativeWriter { - async fn write(&mut self, _bs: Buffer) -> Result { + async fn write(&mut self, _bs: Buffer) -> Result<()> { todo!() } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 69bff86245c..9b86c789577 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -39,8 +39,17 @@ impl SftpWriter { } impl oio::Write for SftpWriter { - async fn write(&mut self, bs: Buffer) -> Result { - self.file.write(bs.chunk()).await.map_err(new_std_io_error) + async fn write(&mut self, mut bs: Buffer) -> Result<()> { + while bs.has_remaining() { + let n = self + .file + .write(bs.chunk()) + .await + .map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/types/blocking_write/std_writer.rs b/core/src/types/blocking_write/std_writer.rs index 5b18467e369..91accbde535 100644 --- a/core/src/types/blocking_write/std_writer.rs +++ b/core/src/types/blocking_write/std_writer.rs @@ -103,10 +103,9 @@ impl Write for StdWriter { return Ok(()); }; - let n = w - .write(Buffer::from(bs)) + w.write(Buffer::from(bs)) .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - self.buf.advance(n); + self.buf.clean(); } } } diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index 557341da3dc..6e2464cfceb 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -18,7 +18,6 @@ use crate::raw::oio::Write; use crate::raw::*; use crate::*; -use bytes::Buf; use std::sync::Arc; /// WriteContext holds the immutable context for give write operation. @@ -136,7 +135,9 @@ impl WriteGenerator { /// Write the entire buffer into writer. pub async fn write(&mut self, mut bs: Buffer) -> Result { let Some(chunk_size) = self.chunk_size else { - return self.w.write_dyn(bs).await; + let size = bs.len(); + self.w.write_dyn(bs).await?; + return Ok(size); }; if self.buffer.len() + bs.len() < chunk_size { @@ -153,10 +154,8 @@ impl WriteGenerator { if !self.exact { let fill_size = bs.len(); self.buffer.push(bs); - let mut buf = self.buffer.take().collect(); - let written = self.w.write_dyn(buf.clone()).await?; - buf.advance(written); - self.buffer.push(buf); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; return Ok(fill_size); } @@ -167,10 +166,8 @@ impl WriteGenerator { // Action: // - write existing buffer in chunk_size to make more rooms for writing data. if self.buffer.len() >= chunk_size { - let mut buf = self.buffer.take().collect(); - let written = self.w.write_dyn(buf.clone()).await?; - buf.advance(written); - self.buffer.push(buf); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; } // Condition @@ -192,8 +189,8 @@ impl WriteGenerator { break; } - let written = self.w.write_dyn(self.buffer.clone().collect()).await?; - self.buffer.advance(written); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; } self.w.close().await @@ -225,7 +222,9 @@ impl WriteGenerator { /// Write the entire buffer into writer. pub fn write(&mut self, mut bs: Buffer) -> Result { let Some(chunk_size) = self.chunk_size else { - return self.w.write(bs); + let size = bs.len(); + self.w.write(bs)?; + return Ok(size); }; if self.buffer.len() + bs.len() < chunk_size { @@ -242,10 +241,8 @@ impl WriteGenerator { if !self.exact { let fill_size = bs.len(); self.buffer.push(bs); - let mut buf = self.buffer.take().collect(); - let written = self.w.write(buf.clone())?; - buf.advance(written); - self.buffer.push(buf); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; return Ok(fill_size); } @@ -256,10 +253,8 @@ impl WriteGenerator { // Action: // - write existing buffer in chunk_size to make more rooms for writing data. if self.buffer.len() >= chunk_size { - let mut buf = self.buffer.take().collect(); - let written = self.w.write(buf.clone())?; - buf.advance(written); - self.buffer.push(buf); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; } // Condition @@ -281,8 +276,8 @@ impl WriteGenerator { break; } - let written = self.w.write(self.buffer.clone().collect())?; - self.buffer.advance(written); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; } self.w.close() @@ -293,8 +288,8 @@ impl WriteGenerator { mod tests { use super::*; use crate::raw::oio::Write; - use bytes::Buf; use bytes::Bytes; + use bytes::{Buf, BufMut}; use log::debug; use pretty_assertions::assert_eq; use rand::thread_rng; @@ -309,13 +304,12 @@ mod tests { } impl Write for MockWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len()); - let chunk = bs.chunk(); let mut buf = self.buf.lock().await; - buf.extend_from_slice(chunk); - Ok(chunk.len()) + buf.put(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -478,83 +472,6 @@ mod tests { Ok(()) } - struct PartialWriter { - buf: Arc>>, - } - - impl Write for PartialWriter { - async fn write(&mut self, mut bs: Buffer) -> Result { - let mut buf = self.buf.lock().await; - - if Buffer::count(&bs) > 1 { - // Always leaves last buffer for non-contiguous buffer. - let mut written = 0; - while Buffer::count(&bs) > 1 { - let chunk = bs.chunk(); - buf.extend_from_slice(chunk); - written += chunk.len(); - bs.advance(chunk.len()); - } - Ok(written) - } else { - let chunk = bs.chunk(); - buf.extend_from_slice(chunk); - Ok(chunk.len()) - } - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - } - - #[tokio::test] - async fn test_inexact_buf_writer_partial_send() -> Result<()> { - let _ = tracing_subscriber::fmt() - .pretty() - .with_test_writer() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - let buf = Arc::new(Mutex::new(vec![])); - let mut w = WriteGenerator::new( - Box::new(PartialWriter { buf: buf.clone() }), - Some(10), - false, - ); - - let mut rng = thread_rng(); - let mut expected = vec![]; - - let mut new_content = |size| { - let mut content = vec![0; size]; - rng.fill_bytes(&mut content); - expected.extend_from_slice(&content); - Bytes::from(content) - }; - - // content < chunk size. - let content = new_content(5); - assert_eq!(5, w.write(content.into()).await?); - // Non-contiguous buffer. - let content = Buffer::from(vec![new_content(3), new_content(2)]); - assert_eq!(5, w.write(content).await?); - - w.close().await?; - - let buf = buf.lock().await; - assert_eq!(buf.len(), expected.len()); - assert_eq!( - format!("{:x}", Sha256::digest(&*buf)), - format!("{:x}", Sha256::digest(&expected)) - ); - Ok(()) - } - #[tokio::test] async fn test_fuzz_exact_buf_writer() -> Result<()> { let _ = tracing_subscriber::fmt() diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs index dc81d3d7164..114ec886aa0 100644 --- a/core/src/types/write/writer.rs +++ b/core/src/types/write/writer.rs @@ -141,6 +141,7 @@ impl Writer { let n = self.inner.write(bs.clone()).await?; bs.advance(n); } + Ok(()) } @@ -153,12 +154,8 @@ impl Writer { /// Optimize this function to avoid unnecessary copy. pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> { let mut bs = bs; - let mut bs = Buffer::from(bs.copy_to_bytes(bs.remaining())); - while !bs.is_empty() { - let n = self.inner.write(bs.clone()).await?; - bs.advance(n); - } - Ok(()) + let bs = Buffer::from(bs.copy_to_bytes(bs.remaining())); + self.write(bs).await } /// Abort the writer and clean up all written data.