Skip to content

Commit

Permalink
Close #27: Check ns space left before writing
Browse files Browse the repository at this point in the history
Close #4: Remove unhealthy nodes and retry config before writing

Signed-off-by: Lee Smet <[email protected]>
  • Loading branch information
LeeSmet committed Mar 16, 2021
1 parent b41b750 commit 5b303fb
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 52 deletions.
57 changes: 46 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub type ZstorResult<T> = Result<T, ZstorError>;
// Error value carried by `ZstorResult`: a `ZstorErrorKind` plus the underlying error.
#[derive(Debug)]
pub struct ZstorError {
// Category of the failure.
kind: ZstorErrorKind,
// NOTE(review): this is a rendered diff, so two declarations of `internal` are shown.
// The boxed trait object below appears to be the line removed by the commit.
internal: Box<dyn std::error::Error + Send>,
// Underlying cause; appears to be the line added by the commit. `InternalError` has a
// dedicated `Zdb` variant next to the boxed `Other` variant.
internal: InternalError,
}

impl fmt::Display for ZstorError {
Expand All @@ -49,7 +49,10 @@ impl fmt::Display for ZstorError {

impl std::error::Error for ZstorError {
// Exposes the wrapped error as the cause of this error; every path returns `Some`.
// NOTE(review): `Error::cause` is deprecated in std in favour of `Error::source` —
// consider migrating.
fn cause(&self) -> Option<&dyn std::error::Error> {
// NOTE(review): rendered diff — the next line appears to be the removed body, which
// dereferenced the boxed error directly.
Some(&*self.internal)
// Appears to be the added body: pick the inner error out of whichever variant is stored.
match self.internal {
InternalError::Zdb(ref e) => Some(e),
InternalError::Other(ref e) => Some(e.as_ref()),
}
}
}

Expand All @@ -58,13 +61,45 @@ impl ZstorError {
// Builds a `ZstorError` of kind `LocalIO(msg)` that wraps the given `std::io::Error`.
pub fn new_io(msg: String, e: std::io::Error) -> Self {
ZstorError {
kind: ZstorErrorKind::LocalIO(msg),
// NOTE(review): rendered diff — the bare `Box::new(e)` line appears to be the removed
// version; the line after it stores the boxed error in `InternalError::Other`.
internal: Box::new(e),
internal: InternalError::Other(Box::new(e)),
}
}

/// Create a new ZstorError from any kind, with the underlying error included
pub fn new(kind: ZstorErrorKind, internal: Box<dyn std::error::Error + Send>) -> Self {
// NOTE(review): rendered diff — the one-line struct literal appears to be the removed body.
ZstorError { kind, internal }
// Appears to be the added body: the boxed error is always stored as `InternalError::Other`,
// so errors built here never report a `ZdbError` through `zdb_error`.
ZstorError {
kind,
internal: InternalError::Other(internal),
}
}

/// Borrow the wrapped [`zstor_v2::zdb::ZdbError`] when this error holds one.
///
/// Returns `None` when the internal error is any other kind of error.
pub fn zdb_error(&self) -> Option<&ZdbError> {
    if let InternalError::Zdb(ref zdb_err) = self.internal {
        Some(zdb_err)
    } else {
        None
    }
}
}

/// Wrapper error for the ZstorError
///
/// Holds the underlying cause of a `ZstorError`, keeping a `ZdbError` as its concrete type.
#[derive(Debug)]
enum InternalError {
/// A `ZdbError` stored by value; this is the variant `ZstorError::zdb_error` returns.
Zdb(ZdbError),
/// Any other underlying error, type-erased behind a boxed trait object.
Other(Box<dyn std::error::Error + Send>),
}

impl fmt::Display for InternalError {
    /// Writes the wrapped error using that error's own `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            InternalError::Zdb(zdb_err) => write!(f, "{}", zdb_err),
            InternalError::Other(boxed_err) => write!(f, "{}", boxed_err),
        }
    }
}

Expand Down Expand Up @@ -114,7 +149,7 @@ impl From<ZdbError> for ZstorError {
// Converts a `ZdbError` into a `ZstorError` of kind `Storage`.
fn from(e: ZdbError) -> Self {
ZstorError {
kind: ZstorErrorKind::Storage,
// NOTE(review): rendered diff — the `Box::new(e)` line appears to be the removed version.
// The line after it keeps the error typed in `InternalError::Zdb`, which is what
// `zdb_error` matches on.
internal: Box::new(e),
internal: InternalError::Zdb(e),
}
}
}
Expand All @@ -123,7 +158,7 @@ impl From<EtcdError> for ZstorError {
// Converts an `EtcdError` into a `ZstorError` of kind `Metadata`.
fn from(e: EtcdError) -> Self {
ZstorError {
kind: ZstorErrorKind::Metadata,
// NOTE(review): rendered diff — the bare `Box::new(e)` line appears to be the removed
// version; the line after it stores the boxed error in `InternalError::Other`.
internal: Box::new(e),
internal: InternalError::Other(Box::new(e)),
}
}
}
Expand All @@ -132,7 +167,7 @@ impl From<EncodingError> for ZstorError {
// Converts an `EncodingError` into a `ZstorError` of kind `Encoding`.
fn from(e: EncodingError) -> Self {
ZstorError {
kind: ZstorErrorKind::Encoding,
// NOTE(review): rendered diff — the bare `Box::new(e)` line appears to be the removed
// version; the line after it stores the boxed error in `InternalError::Other`.
internal: Box::new(e),
internal: InternalError::Other(Box::new(e)),
}
}
}
Expand All @@ -141,7 +176,7 @@ impl From<EncryptionError> for ZstorError {
// Converts an `EncryptionError` into a `ZstorError` of kind `Encryption`.
fn from(e: EncryptionError) -> Self {
ZstorError {
kind: ZstorErrorKind::Encryption,
// NOTE(review): rendered diff — the bare `Box::new(e)` line appears to be the removed
// version; the line after it stores the boxed error in `InternalError::Other`.
internal: Box::new(e),
internal: InternalError::Other(Box::new(e)),
}
}
}
Expand All @@ -150,7 +185,7 @@ impl From<CompressorError> for ZstorError {
// Converts a `CompressorError` into a `ZstorError` of kind `Compression`.
fn from(e: CompressorError) -> Self {
ZstorError {
kind: ZstorErrorKind::Compression,
// NOTE(review): rendered diff — the bare `Box::new(e)` line appears to be the removed
// version; the line after it stores the boxed error in `InternalError::Other`.
internal: Box::new(e),
internal: InternalError::Other(Box::new(e)),
}
}
}
Expand All @@ -159,7 +194,7 @@ impl From<ConfigError> for ZstorError {
// Converts a `ConfigError` into a `ZstorError` of kind `Config`.
fn from(e: ConfigError) -> Self {
ZstorError {
kind: ZstorErrorKind::Config,
// NOTE(review): rendered diff — the bare `Box::new(e)` line appears to be the removed
// version; the line after it stores the boxed error in `InternalError::Other`.
internal: Box::new(e),
internal: InternalError::Other(Box::new(e)),
}
}
}
Expand All @@ -168,7 +203,7 @@ impl From<JoinError> for ZstorError {
// Converts a `JoinError` into a `ZstorError` of kind `Async`.
fn from(e: JoinError) -> Self {
ZstorError {
kind: ZstorErrorKind::Async,
// NOTE(review): rendered diff — the bare `Box::new(e)` line appears to be the removed
// version; the line after it stores the boxed error in `InternalError::Other`.
internal: Box::new(e),
internal: InternalError::Other(Box::new(e)),
}
}
}
77 changes: 64 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,32 +423,83 @@ async fn store_data(data: Vec<u8>, checksum: Checksum, cfg: &Config) -> ZstorRes
let shards = encoder.encode(data);
trace!("data encoded");

let backends = cfg.shard_stores()?;
// create a local copy of the config which we can modify to remove dead nodes
let mut cfg = cfg.clone();

let shard_len = shards[0].len(); // Safe as a valid encoder needs at least 1 shard

trace!("verifiying backends");

let dbs = loop {
debug!("Finding backend config");
let backends = cfg.shard_stores()?;

let mut failed_shards: usize = 0;
let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());

for backend in backends {
handles.push(tokio::spawn(async move {
let mut db = Zdb::new(backend.clone()).await?;
// check space in backend
match db.free_space().await? {
insufficient if insufficient < shard_len => Err(ZstorError::new(
ZstorErrorKind::Storage,
// TODO nospace error
Box::new(std::io::Error::from(std::io::ErrorKind::Other)),
)),
_ => Ok(db),
}
}));
}

let mut dbs = Vec::new();
for db in join_all(handles).await {
match db? {
Err(e) => {
if let Some(zdbe) = e.zdb_error() {
debug!("could not connect to 0-db: {}", zdbe);
cfg.remove_shard(zdbe.address());
failed_shards += 1;
}
}
Ok(db) => dbs.push(db),
// no error so healthy db backend
}
}

// if we find one we are good
if failed_shards == 0 {
debug!("found valid backend configuration");
break dbs;
}

debug!("Backend config failed");
};

// TODO
trace!("store shards in backends");
let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());

for (backend, (shard_idx, shard)) in backends.into_iter().zip(shards.into_iter().enumerate()) {
let mut metadata = MetaData::new(
cfg.data_shards(),
cfg.parity_shards(),
checksum,
cfg.encryption().clone(),
cfg.compression().clone(),
);

let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());
for (mut db, (shard_idx, shard)) in dbs.into_iter().zip(shards.into_iter().enumerate()) {
handles.push(tokio::spawn(async move {
let mut db = Zdb::new(backend.clone()).await?;
let keys = db.set(&shard).await?;
Ok(ShardInfo::new(
shard_idx,
shard.checksum(),
keys,
backend.clone(),
db.connection_info().await.clone(),
))
}));
}

let mut metadata = MetaData::new(
cfg.data_shards(),
cfg.parity_shards(),
checksum,
cfg.encryption().clone(),
cfg.compression().clone(),
);

for shard_info in try_join_all(handles).await? {
metadata.add_shard(shard_info?);
}
Expand Down
Loading

0 comments on commit 5b303fb

Please sign in to comment.