Skip to content

Commit

Permalink
refactor(core)!: Make oio::Write always write all given buffer (#4880)
Browse files Browse the repository at this point in the history
* Remove returning n in write

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

* Fix tests

Signed-off-by: Xuanwo <[email protected]>

* Fix write

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Jul 12, 2024
1 parent 740928e commit 6e171fd
Show file tree
Hide file tree
Showing 39 changed files with 221 additions and 298 deletions.
4 changes: 2 additions & 2 deletions core/src/layers/async_backtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for AsyncBacktraceWrapper<R> {

impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
#[async_backtrace::framed]
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}

Expand All @@ -185,7 +185,7 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for AsyncBacktraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for AwaitTreeWrapper<R> {
}

impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.write(bs)
.instrument_await(format!("opendal::{}", WriteOperation::Write.into_static()))
Expand All @@ -211,7 +211,7 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
}

impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.handle.block_on(self.inner.write(bs))
}

Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
Expand Down Expand Up @@ -689,13 +689,12 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
let n = w.write(bs)?;

Ok(n)
w.write(bs)
}

fn close(&mut self) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
}

impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}

Expand All @@ -276,7 +276,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}

Expand Down
14 changes: 6 additions & 8 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,15 +379,14 @@ impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
}

impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
self.inner
.write(bs)
.await
.map(|n| {
probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n);
n
.map(|_| {
probe_lazy!(opendal, writer_write_ok, c_path.as_ptr());
})
.map_err(|err| {
probe_lazy!(opendal, writer_write_error, c_path.as_ptr());
Expand Down Expand Up @@ -427,14 +426,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
self.inner
.write(bs)
.map(|n| {
probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr(), n);
n
.map(|_| {
probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr());
})
.map_err(|err| {
probe_lazy!(opendal, blocking_writer_write_error, c_path.as_ptr());
Expand Down
14 changes: 6 additions & 8 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,13 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
}

impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
self.inner
.write(bs)
.await
.map(|n| {
self.processed += n as u64;
n
.map(|_| {
self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::Write)
Expand Down Expand Up @@ -423,13 +422,12 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
}

impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
self.inner
.write(bs)
.map(|n| {
self.processed += n as u64;
n
.map(|_| {
self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
Expand Down
25 changes: 11 additions & 14 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,21 +1072,20 @@ impl<W> LoggingWriter<W> {
}

impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
match self.inner.write(bs.clone()).await {
Ok(n) => {
self.written += n as u64;
async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
match self.inner.write(bs).await {
Ok(_) => {
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> input data {}B, write {}B",
"service={} operation={} path={} written={}B -> data write {}B",
self.ctx.scheme,
WriteOperation::Write,
self.path,
self.written,
bs.len(),
n,
size,
);
Ok(n)
Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand Down Expand Up @@ -1170,21 +1169,19 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}

impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
match self.inner.write(bs.clone()) {
Ok(n) => {
self.written += n as u64;
Ok(_) => {
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> input data {}B, write {}B",
"service={} operation={} path={} written={}B -> data write {}B",
self.ctx.scheme,
WriteOperation::BlockingWrite,
self.path,
self.written,
bs.len(),
n
);
Ok(n)
Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand Down
17 changes: 9 additions & 8 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,17 +785,17 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for MetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let start = Instant::now();
let size = bs.len();

self.inner
.write(bs)
.await
.map(|n| {
self.bytes_counter.increment(n as u64);
.map(|_| {
self.bytes_counter.increment(size as u64);
self.requests_duration_seconds
.record(start.elapsed().as_secs_f64());
n
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
Expand All @@ -819,12 +819,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();

self.inner
.write(bs)
.map(|n| {
self.bytes_counter.increment(n as u64);
n
.map(|_| {
self.bytes_counter.increment(size as u64);
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
self.inner.write(bs)
Expand All @@ -326,7 +326,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner.write(bs)
}

Expand All @@ -298,7 +298,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}

Expand Down
20 changes: 12 additions & 8 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();

let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Write.into_static(),
Expand All @@ -758,12 +760,12 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
timer.observe_duration();

match res {
Ok(n) => {
Ok(_) => {
self.stats
.bytes_total
.with_label_values(&labels)
.observe(n as f64);
Ok(n)
.observe(size as f64);
Ok(())
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
Expand Down Expand Up @@ -822,7 +824,9 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();

let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingWrite.into_static(),
Expand All @@ -838,12 +842,12 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
timer.observe_duration();

match res {
Ok(n) => {
Ok(_) => {
self.stats
.bytes_total
.with_label_values(&labels)
.observe(n as f64);
Ok(n)
.observe(size as f64);
Ok(())
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
Expand Down
Loading

0 comments on commit 6e171fd

Please sign in to comment.