Skip to content

Commit

Permalink
fix(services/fs,hdfs): fix poll_close when retry
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Feb 4, 2024
1 parent 7d94296 commit d4689a0
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 20 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/test_edge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,31 @@ on:
- ".github/workflows/test_edge.yml"

jobs:
# Edge test job: runs the `file_close_with_retry_on_full_disk` edge crate
# against a tiny loop-mounted filesystem so that writes run out of space.
test_file_close_with_retry_on_full_disk:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4

# Allocate a 512K image file and create a filesystem on it.
- name: Create disk image
run: |
fallocate -l 512K disk.img
mkfs disk.img
# Loop-mount the image at /tmp/test_dir; mounting is done via sudo.
- name: Mount disk image
run: |
mkdir /tmp/test_dir
sudo mount -o loop disk.img /tmp/test_dir
# Grant read/write to all users so the test binary can write into the mount.
- name: Set permissions
run: sudo chmod a+wr /tmp/test_dir

# Run the edge crate with the fs root pointed at the small mounted disk.
- name: Test
working-directory: core/edge/file_close_with_retry_on_full_disk
run: cargo run
env:
OPENDAL_FS_ROOT: /tmp/test_dir
test_file_write_on_full_disk:
runs-on: ubuntu-latest

Expand Down
30 changes: 30 additions & 0 deletions core/edge/file_close_with_retry_on_full_disk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
edition = "2021"
name = "edge_file_close_with_retry_on_full_disk"
publish = false
version = "0.0.0"

license.workspace = true

[dependencies]
futures = "0.3"
opendal = { workspace = true }
rand = "0.8"
tokio = { version = "1", features = ["full"] }
14 changes: 14 additions & 0 deletions core/edge/file_close_with_retry_on_full_disk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# File Close with Retry on Full Disk

Reported by [fs::Writer::poll_close can't be retried multiple times when error occurs](https://github.com/apache/opendal/issues/4058).

Setup:

```shell
fallocate -l 512K disk.img
mkfs disk.img
mkdir ./td
sudo mount -o loop disk.img ./td
sudo chmod a+wr ./td
```

47 changes: 47 additions & 0 deletions core/edge/file_close_with_retry_on_full_disk/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::env;

use opendal::services::Fs;
use opendal::Operator;
use opendal::Result;
use rand::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
let mut builder = Fs::default();
builder.root(&env::var("OPENDAL_FS_ROOT").expect("root must be set for this test"));
let op = Operator::new(builder)?
.layer(RetryLayer::new().with_max_times(3))
.finish();

let size = thread_rng().gen_range(512 * 1024 + 1..4 * 1024 * 1024);
let mut bs = vec![0; size];
thread_rng().fill_bytes(&mut bs);

let mut w = op.writer("/test").await?;
w.write(bs).await?;
let result = w.close().await;
// Write file with size > 512KB should fail
assert!(
result.is_err(),
"write file on full disk should return error"
);

Ok(())
}
51 changes: 37 additions & 14 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct FsWriter<F> {
tmp_path: Option<PathBuf>,

f: Option<F>,
fut: Option<BoxFuture<'static, Result<()>>>,
fut: Option<BoxFuture<'static, (F, Result<()>)>>,
}

impl<F> FsWriter<F> {
Expand Down Expand Up @@ -69,23 +69,35 @@ impl oio::Write for FsWriter<tokio::fs::File> {
if let Some(fut) = self.fut.as_mut() {
let res = ready!(fut.poll_unpin(cx));
self.fut = None;
return Poll::Ready(res);
if let Err(e) = res.1 {
self.f = Some(res.0);
return Poll::Ready(Err(e));
}
return Poll::Ready(Ok(()));
}

let mut f = self.f.take().expect("FsWriter must be initialized");
let tmp_path = self.tmp_path.clone();
let target_path = self.target_path.clone();
self.fut = Some(Box::pin(async move {
f.flush().await.map_err(new_std_io_error)?;
f.sync_all().await.map_err(new_std_io_error)?;
if let Err(e) = f.flush().await.map_err(new_std_io_error) {
// Reserve the original error for retry.
return (f, Err(e));
}
if let Err(e) = f.sync_all().await.map_err(new_std_io_error) {
return (f, Err(e));
}

if let Some(tmp_path) = &tmp_path {
tokio::fs::rename(tmp_path, &target_path)
if let Err(e) = tokio::fs::rename(tmp_path, &target_path)
.await
.map_err(new_std_io_error)?;
.map_err(new_std_io_error)
{
return (f, Err(e));
}
}

Ok(())
(f, Ok(()))
}));
}
}
Expand All @@ -95,21 +107,32 @@ impl oio::Write for FsWriter<tokio::fs::File> {
if let Some(fut) = self.fut.as_mut() {
let res = ready!(fut.poll_unpin(cx));
self.fut = None;
return Poll::Ready(res);
if let Err(e) = res.1 {
self.f = Some(res.0);
return Poll::Ready(Err(e));
}
return Poll::Ready(Ok(()));
}

let _ = self.f.take().expect("FsWriter must be initialized");
let f = self.f.take().expect("FsWriter must be initialized");
let tmp_path = self.tmp_path.clone();
self.fut = Some(Box::pin(async move {
if let Some(tmp_path) = &tmp_path {
tokio::fs::remove_file(tmp_path)
if let Err(e) = tokio::fs::remove_file(tmp_path)
.await
.map_err(new_std_io_error)
{
return (f, Err(e));
}
(f, Ok(()))
} else {
Err(Error::new(
ErrorKind::Unsupported,
"Fs doesn't support abort if atomic_write_dir is not set",
))
(
f,
Err(Error::new(
ErrorKind::Unsupported,
"Fs doesn't support abort if atomic_write_dir is not set",
)),
)
}
}));
}
Expand Down
22 changes: 16 additions & 6 deletions core/src/services/hdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct HdfsWriter<F> {
tmp_path: Option<String>,
f: Option<F>,
client: Arc<hdrs::Client>,
fut: Option<BoxFuture<'static, Result<()>>>,
fut: Option<BoxFuture<'static, (F, Result<()>)>>,
}

/// # Safety
Expand Down Expand Up @@ -76,7 +76,11 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
if let Some(fut) = self.fut.as_mut() {
let res = ready!(fut.poll_unpin(cx));
self.fut = None;
return Poll::Ready(res);
if let Err(e) = res.1 {
self.f = Some(res.0);
return Poll::Ready(Err(e));
}
return Poll::Ready(Ok(()));
}

let mut f = self.f.take().expect("HdfsWriter must be initialized");
Expand All @@ -86,15 +90,21 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
let client = self.client.clone();

self.fut = Some(Box::pin(async move {
f.close().await.map_err(new_std_io_error)?;
if let Err(e) = f.close().await.map_err(new_std_io_error) {
// Reserve the original error for retry.
return (f, Err(e));
}

if let Some(tmp_path) = tmp_path {
client
if let Err(e) = client
.rename_file(&tmp_path, &target_path)
.map_err(new_std_io_error)?;
.map_err(new_std_io_error)
{
return (f, Err(e));
}
}

Ok(())
(f, Ok(()))
}));
}
}
Expand Down

0 comments on commit d4689a0

Please sign in to comment.