diff --git a/src/lib.rs b/src/lib.rs index c0bb844..6f3ccda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,7 @@ pub type ZstorResult = Result; #[derive(Debug)] pub struct ZstorError { kind: ZstorErrorKind, - internal: Box, + internal: InternalError, } impl fmt::Display for ZstorError { @@ -49,7 +49,10 @@ impl fmt::Display for ZstorError { impl std::error::Error for ZstorError { fn cause(&self) -> Option<&dyn std::error::Error> { - Some(&*self.internal) + match self.internal { + InternalError::Zdb(ref e) => Some(e), + InternalError::Other(ref e) => Some(e.as_ref()), + } } } @@ -58,13 +61,45 @@ impl ZstorError { pub fn new_io(msg: String, e: std::io::Error) -> Self { ZstorError { kind: ZstorErrorKind::LocalIO(msg), - internal: Box::new(e), + internal: InternalError::Other(Box::new(e)), } } /// Create a new ZstorError from any kind, with the underlying error included pub fn new(kind: ZstorErrorKind, internal: Box) -> Self { - ZstorError { kind, internal } + ZstorError { + kind, + internal: InternalError::Other(internal), + } + } + + /// Return a reference to the embedded [`zstor_v2::zdb::ZdbError`], if this error is caused by + /// a ZdbError, or nothing otherwise. + pub fn zdb_error(&self) -> Option<&ZdbError> { + match self.internal { + InternalError::Zdb(ref e) => Some(e), + _ => None, + } + } +} + +/// Wrapper error for the ZstorError +#[derive(Debug)] +enum InternalError { + Zdb(ZdbError), + Other(Box), +} + +impl fmt::Display for InternalError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}", + match self { + InternalError::Zdb(ref e) => e as &dyn std::error::Error, + InternalError::Other(e) => e.as_ref(), + } + ) } } @@ -114,7 +149,7 @@ impl From for ZstorError { fn from(e: ZdbError) -> Self { ZstorError { kind: ZstorErrorKind::Storage, - internal: Box::new(e), + internal: InternalError::Zdb(e), } } } @@ -123,7 +158,7 @@ impl From for ZstorError { fn from(e: EtcdError) -> Self { ZstorError { kind: ZstorErrorKind::Metadata, - internal: Box::new(e), + internal: InternalError::Other(Box::new(e)), } } } @@ -132,7 +167,7 @@ impl From for ZstorError { fn from(e: EncodingError) -> Self { ZstorError { kind: ZstorErrorKind::Encoding, - internal: Box::new(e), + internal: InternalError::Other(Box::new(e)), } } } @@ -141,7 +176,7 @@ impl From for ZstorError { fn from(e: EncryptionError) -> Self { ZstorError { kind: ZstorErrorKind::Encryption, - internal: Box::new(e), + internal: InternalError::Other(Box::new(e)), } } } @@ -150,7 +185,7 @@ impl From for ZstorError { fn from(e: CompressorError) -> Self { ZstorError { kind: ZstorErrorKind::Compression, - internal: Box::new(e), + internal: InternalError::Other(Box::new(e)), } } } @@ -159,7 +194,7 @@ impl From for ZstorError { fn from(e: ConfigError) -> Self { ZstorError { kind: ZstorErrorKind::Config, - internal: Box::new(e), + internal: InternalError::Other(Box::new(e)), } } } @@ -168,7 +203,7 @@ impl From for ZstorError { fn from(e: JoinError) -> Self { ZstorError { kind: ZstorErrorKind::Async, - internal: Box::new(e), + internal: InternalError::Other(Box::new(e)), } } } diff --git a/src/main.rs b/src/main.rs index ab04013..6a86fe4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -423,32 +423,83 @@ async fn store_data(data: Vec, checksum: Checksum, cfg: &Config) -> ZstorRes let shards = encoder.encode(data); trace!("data encoded"); - let backends = cfg.shard_stores()?; + // craate a local copy of the config which we can modify to remove dead nodes + let mut cfg = cfg.clone(); + + let shard_len = shards[0].len(); // Safe as a valid encoder needs at least 1 shard + + trace!("verifiying backends"); + + let dbs = loop { + debug!("Finding backend config"); + let backends = cfg.shard_stores()?; + + let mut failed_shards: usize = 0; + let mut handles: Vec>> = Vec::with_capacity(shards.len()); + + for backend in backends { + handles.push(tokio::spawn(async move { + let mut db = Zdb::new(backend.clone()).await?; + // check space in backend + match db.free_space().await? { + insufficient if insufficient < shard_len => Err(ZstorError::new( + ZstorErrorKind::Storage, + // TODO nospace error + Box::new(std::io::Error::from(std::io::ErrorKind::Other)), + )), + _ => Ok(db), + } + })); + } + + let mut dbs = Vec::new(); + for db in join_all(handles).await { + match db? { + Err(e) => { + if let Some(zdbe) = e.zdb_error() { + debug!("could not connect to 0-db: {}", zdbe); + cfg.remove_shard(zdbe.address()); + failed_shards += 1; + } + } + Ok(db) => dbs.push(db), + // no error so healthy db backend + } + } + // if we find one we are good + if failed_shards == 0 { + debug!("found valid backend configuration"); + break dbs; + } + + debug!("Backend config failed"); + }; + + // TODO trace!("store shards in backends"); - let mut handles: Vec>> = Vec::with_capacity(shards.len()); - for (backend, (shard_idx, shard)) in backends.into_iter().zip(shards.into_iter().enumerate()) { + let mut metadata = MetaData::new( + cfg.data_shards(), + cfg.parity_shards(), + checksum, + cfg.encryption().clone(), + cfg.compression().clone(), + ); + + let mut handles: Vec>> = Vec::with_capacity(shards.len()); + for (mut db, (shard_idx, shard)) in dbs.into_iter().zip(shards.into_iter().enumerate()) { handles.push(tokio::spawn(async move { - let mut db = Zdb::new(backend.clone()).await?; let keys = db.set(&shard).await?; Ok(ShardInfo::new( shard_idx, shard.checksum(), keys, - backend.clone(), + db.connection_info().await.clone(), )) })); } - let mut metadata = MetaData::new( - cfg.data_shards(), - cfg.parity_shards(), - checksum, - cfg.encryption().clone(), - cfg.compression().clone(), - ); - for shard_info in try_join_all(handles).await? { metadata.add_shard(shard_info?); } diff --git a/src/zdb.rs b/src/zdb.rs index ff70051..c018cc3 100644 --- a/src/zdb.rs +++ b/src/zdb.rs @@ -20,14 +20,13 @@ const MAX_ZDB_CHUNK_SIZE: usize = 2 * 1024 * 1024; /// the remote closed). No reconnection is attempted. pub struct Zdb { conn: Connection, - // connection info tracked to conveniently inspect the remote address. - ci: ConnectionInfo, - ns: Option, + // connection info tracked to conveniently inspect the remote address and namespace. + ci: ZdbConnectionInfo, } impl fmt::Debug for Zdb { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "ZDB at {}", self.ci.addr) + write!(f, "ZDB at {}", self.ci.address) } } @@ -79,12 +78,12 @@ impl Zdb { let client = redis::Client::open(ci.clone()).map_err(|e| ZdbError { kind: ZdbErrorKind::Connect, - remote: ci.addr.to_string(), + remote: info.address, internal: ErrorCause::Redis(e), })?; let mut conn = client.get_async_connection().await.map_err(|e| ZdbError { kind: ZdbErrorKind::Connect, - remote: ci.addr.to_string(), + remote: info.address, internal: ErrorCause::Redis(e), })?; trace!("opened connection to db"); @@ -94,7 +93,7 @@ impl Zdb { .await .map_err(|e| ZdbError { kind: ZdbErrorKind::Connect, - remote: ci.addr.to_string(), + remote: info.address, internal: ErrorCause::Redis(e), })?; trace!("db connection established"); @@ -130,17 +129,13 @@ impl Zdb { } else { ZdbErrorKind::Ns }, - remote: ci.addr.to_string(), + remote: info.address, internal: ErrorCause::Redis(e), })?; trace!("opened namespace"); } - Ok(Self { - conn, - ci, - ns: info.namespace, - }) + Ok(Self { conn, ci: info }) } /// Store some data in the zdb. The generated keys are returned for later retrieval. @@ -161,7 +156,7 @@ impl Zdb { .await .map_err(|e| ZdbError { kind: ZdbErrorKind::Write, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Redis(e), })?; @@ -185,12 +180,12 @@ impl Zdb { .await .map_err(|e| ZdbError { kind: ZdbErrorKind::Read, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Redis(e), })? .ok_or(ZdbError { kind: ZdbErrorKind::Read, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other(format!("missing key {}", key)), })?, ); @@ -216,7 +211,7 @@ impl Zdb { /// Query info about the namespace. pub async fn ns_info(&mut self) -> ZdbResult { let list: String = redis::cmd("NSINFO") - .arg(if let Some(ref ns) = self.ns { + .arg(if let Some(ref ns) = self.ci.namespace { ns } else { "default" @@ -225,7 +220,7 @@ impl Zdb { .await .map_err(|e| ZdbError { kind: ZdbErrorKind::Read, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Redis(e), })?; @@ -251,7 +246,7 @@ impl Zdb { name: kvs["name"].to_string(), entries: kvs["entries"].parse().map_err(|e| ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other(format!("expected entries to be an integer ({})", e)), })?, public: match kvs["public"] { @@ -260,7 +255,7 @@ impl Zdb { _ => { return Err(ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other("expected public to be yes/no".to_string()), }) } @@ -271,14 +266,14 @@ impl Zdb { _ => { return Err(ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other("expected password to be yes/no".to_string()), }) } }, data_size_bytes: kvs["data_size_bytes"].parse().map_err(|e| ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other(format!( "expected data_size_bytes to be an integer ({})", e @@ -286,7 +281,7 @@ impl Zdb { })?, data_limit_bytes: match kvs["data_limits_bytes"].parse().map_err(|e| ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other(format!( "expected data_limit_bytes to be an integer ({})", e @@ -297,7 +292,7 @@ impl Zdb { }, index_size_bytes: kvs["index_size_bytes"].parse().map_err(|e| ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other(format!( "expected index_size_bytes to be an integer ({})", e @@ -309,7 +304,7 @@ impl Zdb { _ => { return Err(ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other( "expected mode to be usermode/sequential".to_string(), ), @@ -319,7 +314,7 @@ impl Zdb { index_disk_freespace_bytes: kvs["index_disk_freespace_bytes"].parse().map_err(|e| { ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other(format!( "expected index_disk_freespace_bytes to be an integer ({})", e @@ -329,7 +324,7 @@ impl Zdb { data_disk_freespace_bytes: kvs["data_disk_freespace_bytes"].parse().map_err(|e| { ZdbError { kind: ZdbErrorKind::Format, - remote: self.ci.addr.to_string(), + remote: self.ci.address, internal: ErrorCause::Other(format!( "expected index_disk_freespace_bytes to be an integer ({})", e @@ -338,6 +333,11 @@ impl Zdb { })?, }) } + + /// Returns the [`zstor_v2::zdb::ZdbConnectionInfo`] object used to connect to this db. + pub async fn connection_info(&self) -> &ZdbConnectionInfo { + &self.ci + } } /// Information about a 0-db namespace, as reported by the db itself. @@ -380,7 +380,7 @@ fn read_le_key(input: &[u8]) -> Key { #[derive(Debug)] pub struct ZdbError { kind: ZdbErrorKind, - remote: String, + remote: SocketAddr, internal: ErrorCause, } @@ -404,6 +404,13 @@ impl std::error::Error for ZdbError { } } +impl ZdbError { + /// The address of the 0-db which caused this error. + pub fn address(&self) -> &SocketAddr { + &self.remote + } +} + /// The cause of a zero db error. #[derive(Debug)] enum ErrorCause {