Skip to content

Commit

Permalink
Workaround for s3 issue (#36)
Browse files Browse the repository at this point in the history
* Workaround for s3 issue

rfs can upload/download multiple files in parallel but it seems
this s3 client can't handle this correctly so random issues appear
during upload

this trick is to force the store to do synched read/write but the
problem is it hurts pefromance badly.

Instead we should find a better client since this issue is too old
(2018)

https://github.com/awslabs/aws-sdk-rust/tree/main/sdk/s3

* Luckily updating the client to latest RC fixed the issue
  • Loading branch information
muhamadazmy authored Oct 27, 2023
1 parent ecccba9 commit 3ad935b
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 93 deletions.
141 changes: 76 additions & 65 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ simple_logger = {version = "1.0.1", optional = true}
daemonize = { version = "0.5", optional = true }
tempfile = { version = "3.3.0", optional = true }
workers = { git="https://github.com/threefoldtech/tokio-worker-pool.git" }
rust-s3 = "0.33"
rust-s3 = "0.34.0-rc3"
openssl = { version = "0.10", features = ["vendored"] }
regex = "1.9.6"

Expand Down
9 changes: 7 additions & 2 deletions src/fungi/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::{

use sqlx::{sqlite::SqliteRow, FromRow, Row, SqlitePool};

use crate::store;

const ID_LEN: usize = 32;
const KEY_LEN: usize = 32;
const TYPE_MASK: u32 = nix::libc::S_IFMT;
Expand Down Expand Up @@ -41,7 +43,7 @@ static SCHEMA: &str = include_str!("../../schema/schema.sql");

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("failed to execute query: {0:#}")]
#[error("failed to execute query: {0}")]
SqlError(#[from] sqlx::Error),

#[error("invalid hash length")]
Expand All @@ -53,7 +55,10 @@ pub enum Error {
#[error("io error: {0:#}")]
IO(#[from] std::io::Error),

#[error("{0:#}")]
#[error("store error: {0}")]
Store(#[from] store::Error),

#[error("unknown meta error: {0}")]
Anyhow(#[from] anyhow::Error),
}

Expand Down
14 changes: 7 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ struct PackOptions {
#[clap(short, long, action=ArgAction::Append)]
store: Vec<String>,

/// no_strip_password strips password from store url, otherwise password will be stored in the fl and then shipped.
/// Some stores like ZDB has a public namespace which means writing requires a password
#[clap(long, default_value_t = false)]
/// no_strip_password strips password from store url, otherwise password will be stored in the fl and then shipped.
/// Some stores like ZDB has a public namespace which means writing requires a password
#[clap(long, default_value_t = false)]
no_strip_password: bool,

/// target directory to upload
Expand Down Expand Up @@ -168,11 +168,11 @@ fn mount(opts: MountOptions) -> Result<()> {
}

match daemon.execute() {
daemonize::Outcome::Parent(Ok(_)) => {
daemonize::Outcome::Parent(result) => {
result.context("daemonize")?;
wait_child(target, pid_file);
return Ok(());
}
daemonize::Outcome::Parent(Err(err)) => anyhow::bail!("failed to daemonize: {}", err),
_ => {}
}
}
Expand Down Expand Up @@ -203,11 +203,11 @@ fn wait_child(target: String, mut pid_file: tempfile::NamedTempFile) {
}
let mut buf = String::new();
if let Err(e) = pid_file.read_to_string(&mut buf) {
error!("failed to read pid_file: {}", e);
error!("failed to read pid_file: {:#}", e);
}
let pid = buf.parse::<i32>();
match pid {
Err(e) => error!("failed to parse pid_file contents {}: {}", buf, e),
Err(e) => error!("failed to parse pid_file contents {}: {:#}", buf, e),
Ok(v) => {
let _ = signal::kill(Pid::from_raw(v), Signal::SIGTERM);
} // probably the child exited on its own
Expand Down
7 changes: 2 additions & 5 deletions src/pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,7 @@ where
}

// write block to remote store
let block = self
.store
.set(&self.buffer[..size])
.await
.context("failed to store blob")?;
let block = self.store.set(&self.buffer[..size]).await?;

// write block info to meta
self.writer.block(ino, &block.id, &block.key).await?;
Expand All @@ -233,6 +229,7 @@ where
type Output = ();

async fn run(&mut self, (ino, path): Self::Input) -> Self::Output {
log::info!("uploading {:?}", path);
if let Err(err) = self.upload(ino, &path).await {
log::error!("failed to upload file ({:?}): {:#}", path, err);
}
Expand Down
Loading

0 comments on commit 3ad935b

Please sign in to comment.