diff --git a/Jenkinsfile b/Jenkinsfile index 744873217c..983d94d8f6 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -140,6 +140,7 @@ pipeline { anyOf { branch 'master' branch 'release/*' + branch 'trying' expression { run_linter == false } } } diff --git a/io-engine-tests/src/compose/rpc/v1.rs b/io-engine-tests/src/compose/rpc/v1.rs index dc0eb6cabd..5bea2a2020 100644 --- a/io-engine-tests/src/compose/rpc/v1.rs +++ b/io-engine-tests/src/compose/rpc/v1.rs @@ -51,6 +51,7 @@ pub struct RpcHandle { pub host: host::HostRpcClient, pub nexus: nexus::NexusRpcClient, pub snapshot: snapshot::SnapshotRpcClient, + pub testing: testing::TestingRpcClient, } impl RpcHandle { @@ -99,11 +100,17 @@ impl RpcHandle { nexus::NexusRpcClient::connect(format!("http://{endpoint}")) .await .unwrap(); + let snapshot = snapshot::SnapshotRpcClient::connect(format!("http://{endpoint}")) .await .unwrap(); + let testing = + testing::TestingRpcClient::connect(format!("http://{endpoint}")) + .await + .unwrap(); + Ok(Self { name, endpoint, @@ -114,6 +121,7 @@ impl RpcHandle { host, nexus, snapshot, + testing, }) } } diff --git a/io-engine-tests/src/lib.rs b/io-engine-tests/src/lib.rs index 1d305734d8..145d397a1f 100644 --- a/io-engine-tests/src/lib.rs +++ b/io-engine-tests/src/lib.rs @@ -32,6 +32,7 @@ pub mod nvme; pub mod nvmf; pub mod pool; pub mod replica; +pub mod testing; pub use compose::MayastorTest; diff --git a/io-engine-tests/src/nexus.rs b/io-engine-tests/src/nexus.rs index 77e65ed4f3..3251306c56 100644 --- a/io-engine-tests/src/nexus.rs +++ b/io-engine-tests/src/nexus.rs @@ -9,16 +9,12 @@ use super::{ ChildStateReason, CreateNexusRequest, DestroyNexusRequest, - InjectNexusFaultRequest, - InjectedFault, - ListInjectedNexusFaultsRequest, ListNexusOptions, Nexus, PublishNexusRequest, RebuildHistoryRecord, RebuildHistoryRequest, RemoveChildNexusRequest, - RemoveInjectedNexusFaultRequest, }, SharedRpcHandle, Status, @@ -277,6 +273,16 @@ impl NexusBuilder { 
self.online_child_bdev(&self.replica_uri(r)).await } + pub async fn online_child_replica_wait( + &self, + r: &ReplicaBuilder, + d: Duration, + ) -> Result<(), Status> { + self.online_child_replica(r).await?; + self.wait_replica_state(r, ChildState::Online, None, d) + .await + } + pub async fn offline_child_bdev( &self, bdev: &str, @@ -301,53 +307,22 @@ impl NexusBuilder { self.offline_child_bdev(&self.replica_uri(r)).await } - pub async fn inject_nexus_fault( - &self, - inj_uri: &str, - ) -> Result<(), Status> { - self.rpc() - .lock() - .await - .nexus - .inject_nexus_fault(InjectNexusFaultRequest { - uuid: self.uuid(), - uri: inj_uri.to_owned(), - }) - .await - .map(|r| r.into_inner()) - } - - pub async fn remove_injected_nexus_fault( + pub async fn offline_child_replica_wait( &self, - inj_uri: &str, + r: &ReplicaBuilder, + d: Duration, ) -> Result<(), Status> { - self.rpc() - .lock() - .await - .nexus - .remove_injected_nexus_fault(RemoveInjectedNexusFaultRequest { - uuid: self.uuid(), - uri: inj_uri.to_owned(), - }) - .await - .map(|r| r.into_inner()) - } - - pub async fn list_injected_faults( - &self, - ) -> Result, Status> { - self.rpc() - .lock() - .await - .nexus - .list_injected_nexus_faults(ListInjectedNexusFaultsRequest { - uuid: self.uuid(), - }) - .await - .map(|r| r.into_inner().injections) + self.offline_child_replica(r).await?; + self.wait_replica_state( + r, + ChildState::Degraded, + Some(ChildStateReason::ByClient), + d, + ) + .await } - pub async fn inject_fault_at_replica( + pub async fn add_injection_at_replica( &self, r: &ReplicaBuilder, inj_params: &str, @@ -362,7 +337,7 @@ impl NexusBuilder { })?; let inj_uri = format!("inject://{dev}?{inj_params}",); - self.inject_nexus_fault(&inj_uri).await?; + super::testing::add_injection(self.rpc(), &inj_uri).await?; Ok(inj_uri) } diff --git a/io-engine-tests/src/testing.rs b/io-engine-tests/src/testing.rs new file mode 100644 index 0000000000..fb48ce1af1 --- /dev/null +++ 
b/io-engine-tests/src/testing.rs @@ -0,0 +1,49 @@ +use super::compose::rpc::v1::{ + testing::{ + AddInjectionRequest, + Injection, + ListInjectionsRequest, + RemoveInjectionRequest, + }, + SharedRpcHandle, + Status, +}; + +pub async fn add_injection( + rpc: SharedRpcHandle, + inj_uri: &str, +) -> Result<(), Status> { + rpc.lock() + .await + .testing + .add_injection(AddInjectionRequest { + uri: inj_uri.to_owned(), + }) + .await + .map(|r| r.into_inner()) +} + +pub async fn remove_injection( + rpc: SharedRpcHandle, + inj_uri: &str, +) -> Result<(), Status> { + rpc.lock() + .await + .testing + .remove_injection(RemoveInjectionRequest { + uri: inj_uri.to_owned(), + }) + .await + .map(|r| r.into_inner()) +} + +pub async fn list_injections( + rpc: SharedRpcHandle, +) -> Result, Status> { + rpc.lock() + .await + .testing + .list_injections(ListInjectionsRequest {}) + .await + .map(|r| r.into_inner().injections) +} diff --git a/io-engine/Cargo.toml b/io-engine/Cargo.toml index f397afdc8d..a547cf90bc 100644 --- a/io-engine/Cargo.toml +++ b/io-engine/Cargo.toml @@ -9,8 +9,8 @@ build = "build.rs" [features] default = ["spdk-async-qpair-connect", "spdk-subsystem-events"] -io-engine-testing = ["nexus-fault-injection"] -nexus-fault-injection = [] # Enables nexus-level fault injection code. +io-engine-testing = ["fault-injection"] +fault-injection = [] # Enables fault injection code. nexus-io-tracing = [] # Enables nexus I/O tracing code. 
spdk-async-qpair-connect = [] spdk-subsystem-events = [] diff --git a/io-engine/src/bdev/device.rs b/io-engine/src/bdev/device.rs index a4108d8c92..1ff2cdcdff 100644 --- a/io-engine/src/bdev/device.rs +++ b/io-engine/src/bdev/device.rs @@ -14,21 +14,23 @@ use once_cell::sync::{Lazy, OnceCell}; use spdk_rs::{ libspdk::{ - iovec, spdk_bdev_flush, spdk_bdev_free_io, spdk_bdev_io, - spdk_bdev_readv_blocks, + spdk_bdev_readv_blocks_with_flags, spdk_bdev_reset, spdk_bdev_unmap_blocks, spdk_bdev_write_zeroes_blocks, spdk_bdev_writev_blocks, + SPDK_NVME_IO_FLAGS_UNWRITTEN_READ_FAIL, }, nvme_admin_opc, + AsIoVecPtr, BdevOps, DmaBuf, DmaError, IoType, + IoVec, }; use crate::{ @@ -50,7 +52,7 @@ use crate::{ IoCompletionCallbackArg, IoCompletionStatus, NvmeStatus, - ReadMode, + ReadOptions, SnapshotParams, ToErrno, UntypedBdev, @@ -60,6 +62,14 @@ use crate::{ lvs::Lvol, }; +#[cfg(feature = "fault-injection")] +use crate::core::fault_injection::{ + inject_completion_error, + inject_submission_error, + FaultDomain, + InjectIoCtx, +}; + /// TODO type EventDispatcherMap = HashMap; @@ -250,13 +260,12 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { DmaBuf::new(size, self.device.alignment()) } - async fn read_at_ex( + async fn read_at( &self, offset: u64, buffer: &mut DmaBuf, - mode: Option, ) -> Result { - self.handle.read_at_ex(offset, buffer, mode).await + self.handle.read_at(offset, buffer).await } async fn write_at( @@ -269,35 +278,56 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { fn readv_blocks( &self, - iov: *mut iovec, - iovcnt: i32, + iovs: &mut [IoVec], offset_blocks: u64, num_blocks: u64, + opts: ReadOptions, cb: IoCompletionCallback, cb_arg: IoCompletionCallbackArg, ) -> Result<(), CoreError> { + let flags = match opts { + ReadOptions::None => 0, + ReadOptions::UnwrittenFail => { + SPDK_NVME_IO_FLAGS_UNWRITTEN_READ_FAIL + } + }; + let ctx = alloc_bdev_io_ctx( IoType::Read, IoCtx { device: self.device, cb, cb_arg, + #[cfg(feature = "fault-injection")] + 
inj_op: InjectIoCtx::with_iovs( + self.get_device(), + IoType::Read, + offset_blocks, + num_blocks, + iovs, + ), }, offset_blocks, num_blocks, )?; + #[cfg(feature = "fault-injection")] + inject_submission_error(FaultDomain::BlockDevice, unsafe { + &(*ctx).inj_op + })?; + let (desc, chan) = self.handle.io_tuple(); let rc = unsafe { - spdk_bdev_readv_blocks( + spdk_bdev_readv_blocks_with_flags( desc, chan, - iov, - iovcnt, + iovs.as_io_vec_mut_ptr(), + iovs.len() as i32, offset_blocks, num_blocks, Some(bdev_io_completion), ctx as *mut c_void, + flags, ) }; @@ -314,8 +344,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { fn writev_blocks( &self, - iov: *mut iovec, - iovcnt: i32, + iovs: &[IoVec], offset_blocks: u64, num_blocks: u64, cb: IoCompletionCallback, @@ -327,18 +356,31 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { device: self.device, cb, cb_arg, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx::with_iovs( + self.get_device(), + IoType::Write, + offset_blocks, + num_blocks, + iovs, + ), }, offset_blocks, num_blocks, )?; + #[cfg(feature = "fault-injection")] + inject_submission_error(FaultDomain::BlockDevice, unsafe { + &(*ctx).inj_op + })?; + let (desc, chan) = self.handle.io_tuple(); let rc = unsafe { spdk_bdev_writev_blocks( desc, chan, - iov, - iovcnt, + iovs.as_ptr() as *mut _, + iovs.len() as i32, offset_blocks, num_blocks, Some(bdev_io_completion), @@ -368,6 +410,8 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { device: self.device, cb, cb_arg, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx::default(), }, 0, 0, @@ -405,6 +449,8 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { device: self.device, cb, cb_arg, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx::default(), }, offset_blocks, num_blocks, @@ -446,6 +492,8 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { device: self.device, cb, cb_arg, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx::default(), }, offset_blocks, 
num_blocks, @@ -542,6 +590,8 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { device: self.device, cb, cb_arg, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx::default(), }, 0, 0, @@ -575,6 +625,8 @@ struct IoCtx { device: SpdkBlockDevice, cb: IoCompletionCallback, cb_arg: IoCompletionCallbackArg, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx, } /// TODO @@ -656,6 +708,10 @@ extern "C" fn bdev_io_completion( IoCompletionStatus::from(NvmeStatus::from(child_bio)) }; + #[cfg(feature = "fault-injection")] + let status = + inject_completion_error(FaultDomain::BlockDevice, &bio.inj_op, status); + (bio.cb)(&bio.device, status, bio.cb_arg); // Free ctx. diff --git a/io-engine/src/bdev/nexus/mod.rs b/io-engine/src/bdev/nexus/mod.rs index afcaa09c56..399fbea744 100644 --- a/io-engine/src/bdev/nexus/mod.rs +++ b/io-engine/src/bdev/nexus/mod.rs @@ -12,7 +12,6 @@ mod nexus_bdev_rebuild; mod nexus_bdev_snapshot; mod nexus_channel; mod nexus_child; -mod nexus_injection; mod nexus_io; mod nexus_io_log; mod nexus_io_subsystem; @@ -48,7 +47,6 @@ pub use nexus_child::{ FaultReason, NexusChild, }; -pub use nexus_injection::{Injection, InjectionOp}; use nexus_io::{NexusBio, NioCtx}; use nexus_io_log::{IOLog, IOLogChannel}; use nexus_io_subsystem::NexusIoSubsystem; diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs index 25e0a4ebbd..b289312115 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev.rs @@ -22,7 +22,6 @@ use uuid::Uuid; use super::{ nexus_err, - nexus_injection::Injections, nexus_lookup_name_uuid, DrEvent, Error, @@ -265,9 +264,6 @@ pub struct Nexus<'n> { event_sink: Option, /// Rebuild history of all children of this nexus instance. pub(super) rebuild_history: parking_lot::Mutex>, - /// TODO - #[allow(dead_code)] - pub(super) injections: Injections, /// Flag to control shutdown from I/O path. pub(crate) shutdown_requested: AtomicCell, /// Prevent auto-Unpin. 
@@ -377,7 +373,6 @@ impl<'n> Nexus<'n> { nexus_uuid: Default::default(), event_sink: None, rebuild_history: parking_lot::Mutex::new(Vec::new()), - injections: Injections::new(), shutdown_requested: AtomicCell::new(false), _pin: Default::default(), }; @@ -450,12 +445,12 @@ impl<'n> Nexus<'n> { } /// Returns nexus name. - pub(crate) fn nexus_name(&self) -> &str { + pub fn nexus_name(&self) -> &str { &self.name } /// Returns the Nexus uuid. - pub(crate) fn uuid(&self) -> Uuid { + pub fn uuid(&self) -> Uuid { self.nexus_uuid } diff --git a/io-engine/src/bdev/nexus/nexus_bdev_children.rs b/io-engine/src/bdev/nexus/nexus_bdev_children.rs index 12dd5afc05..de02030623 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev_children.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev_children.rs @@ -45,9 +45,6 @@ use super::{ PersistOp, }; -#[allow(unused_imports)] -use super::nexus_injection::InjectionOp; - use crate::{ bdev::{dev::device_name, device_create, device_destroy, device_lookup}, bdev_api::BdevError, diff --git a/io-engine/src/bdev/nexus/nexus_bdev_error.rs b/io-engine/src/bdev/nexus/nexus_bdev_error.rs index 8edb80a5d3..4e3ee844af 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev_error.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev_error.rs @@ -2,12 +2,7 @@ use nix::errno::Errno; use snafu::Snafu; use tonic::{Code, Status}; -use super::{ - nexus_injection::InjectionError, - ChildError, - NbdError, - NexusPauseState, -}; +use super::{ChildError, NbdError, NexusPauseState}; use crate::{ bdev_api::BdevError, @@ -218,11 +213,6 @@ pub enum Error { state: NexusPauseState, name: String, }, - #[snafu(display("Nexus '{}': bad fault injection", name))] - BadFaultInjection { - source: InjectionError, - name: String, - }, #[snafu(display("Operation not allowed: {}", reason))] OperationNotAllowed { reason: String }, #[snafu(display("Invalid value for nvme reservation: {}", reservation))] diff --git a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs 
b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs index 83b3502e3f..0c27bb2299 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs @@ -19,8 +19,10 @@ use crate::{ HistoryRecord, RebuildError, RebuildJob, + RebuildJobOptions, RebuildState, RebuildStats, + RebuildVerifyMode, }, }; @@ -151,6 +153,31 @@ impl<'n> Nexus<'n> { src_child_uri: &str, dst_child_uri: &str, ) -> Result<(), Error> { + let verify_mode = match std::env::var("NEXUS_REBUILD_VERIFY") + .unwrap_or_default() + .as_str() + { + "fail" => { + warn!( + "{self:?}: starting rebuild for '{dst_child_uri}' with \ + fail verification mode" + ); + RebuildVerifyMode::Fail + } + "panic" => { + warn!( + "{self:?}: starting rebuild for '{dst_child_uri}' with \ + panic verification mode" + ); + RebuildVerifyMode::Panic + } + _ => RebuildVerifyMode::None, + }; + + let opts = RebuildJobOptions { + verify_mode, + }; + RebuildJob::new( &self.name, src_child_uri, @@ -159,6 +186,7 @@ impl<'n> Nexus<'n> { start: self.data_ent_offset, end: self.num_blocks() + self.data_ent_offset, }, + opts, |nexus, job| { Reactors::current().send_future(async move { Nexus::notify_rebuild(nexus, job).await; diff --git a/io-engine/src/bdev/nexus/nexus_injection.rs b/io-engine/src/bdev/nexus/nexus_injection.rs deleted file mode 100644 index eb7e4791d1..0000000000 --- a/io-engine/src/bdev/nexus/nexus_injection.rs +++ /dev/null @@ -1,470 +0,0 @@ -use snafu::Snafu; -use std::time::Duration; -use url::ParseError; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub(crate)), context(suffix(false)))] -pub enum InjectionError { - #[snafu(display("Injections are disabled"))] - InjectionsDisabled {}, - #[snafu(display("URI is not an injection: '{}'", uri))] - NotInjectionUri { uri: String }, - #[snafu(display("Invalid injection URI: '{}'", uri))] - InvalidUri { source: ParseError, uri: String }, - #[snafu(display("Unknown injection parameter: '{}={}'", name, value))] - UnknownParameter { name: 
String, value: String }, - #[snafu(display("Bad injection parameter value: '{}={}'", name, value))] - BadParameterValue { name: String, value: String }, - #[snafu(display( - "Bad injection '{}' timer durations: {:?}, {:?}", - name, - begin, - end - ))] - BadDurations { - name: String, - begin: Duration, - end: Duration, - }, -} - -/// Information about a injected fault. -pub struct InjectionInfo { - pub device_name: String, - pub is_active: bool, -} - -pub use inj_impl::{injections_enabled, Injection, Injections}; - -/// Operation type. -#[allow(dead_code)] -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum InjectionOp { - Read, - ReadSubmission, - Write, - WriteSubmission, -} - -#[cfg(feature = "nexus-fault-injection")] -mod inj_impl { - use std::{ - fmt::{Debug, Display, Formatter}, - ops::Range, - sync::atomic::{AtomicBool, Ordering}, - time::{Duration, Instant}, - }; - - use snafu::ResultExt; - use url::Url; - - use crate::core::{BlockDevice, VerboseError}; - - use super::{ - super::{nexus_err, Error, Nexus}, - InjectionError, - InjectionInfo, - InjectionOp, - }; - - static INJECTIONS_ENABLED: AtomicBool = AtomicBool::new(false); - - /// Checks if injections are globally enabled. - /// This method is fast and can used in I/O code path to quick check - /// before checking if an injection has to be applied to a particular - /// device. 
- #[inline] - pub fn injections_enabled() -> bool { - INJECTIONS_ENABLED.load(Ordering::SeqCst) - } - - /// TODO - fn parse_op(k: &str, v: &str) -> Result { - let op = match v { - "read" => InjectionOp::Read, - "sread" => InjectionOp::ReadSubmission, - "write" => InjectionOp::Write, - "swrite" => InjectionOp::WriteSubmission, - _ => { - return Err(InjectionError::UnknownParameter { - name: k.to_string(), - value: v.to_string(), - }) - } - }; - Ok(op) - } - - /// TODO - fn parse_timer(k: &str, v: &str) -> Result { - let b = v.parse::().map_err(|_| { - InjectionError::BadParameterValue { - name: k.to_string(), - value: v.to_string(), - } - })?; - - Ok(Duration::from_millis(b)) - } - - /// TODO - fn parse_num(k: &str, v: &str) -> Result { - v.parse::() - .map_err(|_| InjectionError::BadParameterValue { - name: k.to_string(), - value: v.to_string(), - }) - } - - /// Tests if teo ranges overlap. - fn is_overlapping(a: &Range, b: &Range) -> bool { - a.end > b.start && b.end > a.start - } - - /// Injected failures. - #[derive(Debug, Clone)] - pub struct Injection { - name: String, - op: InjectionOp, - started: Option, - begin: Duration, - end: Duration, - range: Range, - } - - impl Display for Injection { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - fn fmt_duration(u: &Duration) -> String { - if *u == Duration::MAX { - "INF".to_string() - } else { - format!("{u:?}") - } - } - - fn fmt_u64(u: u64) -> String { - if u == u64::MAX { - "INF".to_string() - } else { - format!("{u:?}") - } - } - - write!( - f, - "{op:?} Injection '{name}' [{b:?} -> {e} ({t:?})] @ {rs}..{re}", - op = self.op, - name = self.name, - b = self.begin, - e = fmt_duration(&self.end), - t = self.now(), - rs = self.range.start, - re = fmt_u64(self.range.end), - ) - } - } - - impl Injection { - /// Creates new injection. 
- #[allow(dead_code)] - pub fn new( - name: &str, - op: InjectionOp, - begin: Duration, - end: Duration, - range: Range, - ) -> Self { - Self { - name: name.to_owned(), - op, - started: None, - begin, - end, - range, - } - } - - /// Parses an injection URI and creates injection object. - fn from_uri(uri: &str) -> Result { - if !uri.starts_with("inject://") { - return Err(InjectionError::NotInjectionUri { - uri: uri.to_owned(), - }); - } - - let p = - Url::parse(uri).map_err(|e| InjectionError::InvalidUri { - source: e, - uri: uri.to_owned(), - })?; - - let mut r = Self { - name: format!( - "{host}{port}{path}", - host = p.host_str().unwrap_or_default(), - port = if let Some(port) = p.port() { - format!(":{port}") - } else { - "".to_string() - }, - path = p.path() - ), - op: InjectionOp::Read, - started: None, - begin: Duration::ZERO, - end: Duration::MAX, - range: 0 .. u64::MAX, - }; - - for (k, v) in p.query_pairs() { - match k.as_ref() { - "op" => r.op = parse_op(&k, &v)?, - "begin" => r.begin = parse_timer(&k, &v)?, - "end" => r.end = parse_timer(&k, &v)?, - "offset" => r.range.start = parse_num(&k, &v)?, - "num_blk" => r.range.end = parse_num(&k, &v)?, - _ => { - return Err(InjectionError::UnknownParameter { - name: k.to_string(), - value: v.to_string(), - }) - } - }; - } - - r.range.end = r.range.start.saturating_add(r.range.end); - - if r.begin > r.end { - return Err(InjectionError::BadDurations { - name: r.name, - begin: r.begin, - end: r.end, - }); - } - - Ok(r) - } - - /// Returns current time relative to injection start. - fn now(&self) -> Duration { - self.started.map_or(Duration::MAX, |s| { - Instant::now().saturating_duration_since(s) - }) - } - - /// True if the injection is currently active. - fn is_active(&self) -> bool { - let d = self.now(); - d >= self.begin && d < self.end - } - - /// Checks if the injection is applied to the given device. 
- fn is_applied( - &mut self, - dev: &dyn BlockDevice, - op: InjectionOp, - range: Range, - ) -> bool { - if op != self.op || dev.device_name() != self.name { - return false; - } - - if self.started.is_none() { - debug!("{self:?}: starting"); - self.started = Some(Instant::now()); - } - - self.is_active() && is_overlapping(&self.range, &range) - } - } - - /// A list of fault injections. - pub struct Injections { - items: parking_lot::Mutex>, - } - - impl Injections { - pub fn new() -> Self { - Self { - items: parking_lot::Mutex::new(Vec::new()), - } - } - } - - impl From<&Injection> for InjectionInfo { - fn from(src: &Injection) -> Self { - InjectionInfo { - device_name: src.name.clone(), - is_active: src.is_active(), - } - } - } - - impl<'n> Nexus<'n> { - /// Adds an injection. - pub fn inject_add(&self, inj: Injection) { - if INJECTIONS_ENABLED - .compare_exchange( - false, - true, - Ordering::Acquire, - Ordering::Relaxed, - ) - .is_ok() - { - warn!("Enabling nexus fault injections globally"); - } - - self.injections.items.lock().push(inj); - } - - /// Removes all injections matching the name. - pub fn inject_remove(&self, name: &str) { - self.injections.items.lock().retain(|inj| inj.name != name); - } - - /// Creates a injected fault from URI. - pub async fn inject_add_fault(&self, uri: &str) -> Result<(), Error> { - let name = self.name.clone(); - - self.inject_from_uri(uri) - .map_err(|e| { - error!( - "Failed to add injected fault '{uri}': {err}", - err = e.verbose() - ); - e - }) - .context(nexus_err::BadFaultInjection { - name, - }) - } - - /// Creates an injection from URI. - fn inject_from_uri(&self, uri: &str) -> Result<(), InjectionError> { - let inj = Injection::from_uri(uri)?; - info!("{self:?}: add a injected fault: {inj} from URI '{uri}'"); - self.inject_add(inj); - Ok(()) - } - - /// Removes an injected fault by its name from URI. 
- pub async fn inject_remove_fault( - &self, - uri: &str, - ) -> Result<(), Error> { - let t = Injection::from_uri(uri) - .map_err(|e| { - error!( - "Failed to remove injected fault '{uri}': {err}", - err = e.verbose() - ); - e - }) - .context(nexus_err::BadFaultInjection { - name: self.name.clone(), - })?; - - info!( - "{self:?}: removing injected fault(s) for device '{name}'", - name = t.name - ); - - self.inject_remove(&t.name); - - Ok(()) - } - - /// Returns list of injected faults. - pub async fn list_injections( - &self, - ) -> Result, Error> { - Ok(self - .injections - .items - .lock() - .iter() - .map(InjectionInfo::from) - .collect()) - } - - /// Returns a copy of the injection list. - pub fn get_injections(&self) -> Vec { - (*self.injections.items.lock()).clone() - } - - /// Tests if there exists an active injected fault for the device. - pub fn inject_is_faulted( - &self, - dev: &dyn BlockDevice, - op: InjectionOp, - offset: u64, - num_blocks: u64, - ) -> bool { - if !injections_enabled() { - return false; - } - - self.injections.items.lock().iter_mut().any(|inj| { - inj.is_applied(dev, op, offset .. offset + num_blocks) - }) - } - } -} - -#[cfg(not(feature = "nexus-fault-injection"))] -#[allow(dead_code)] -mod inj_impl { - use super::{ - super::{Error, Nexus}, - InjectionError, - InjectionInfo, - }; - - #[inline] - pub fn injections_enabled() -> bool { - false - } - - pub struct Injections(); - - impl Injections { - pub fn new() -> Self { - Self() - } - } - - pub struct Injection(); - - impl<'n> Nexus<'n> { - /// TODO - fn injections_disabled(&self) -> Result<(), Error> { - warn!("{self:?}: injections are disabled"); - - Err(Error::BadFaultInjection { - source: InjectionError::InjectionsDisabled {}, - name: self.name.clone(), - }) - } - - /// Creates a injected fault from URI. - pub async fn inject_add_fault(&self, _uri: &str) -> Result<(), Error> { - self.injections_disabled() - } - - /// Removes an injected fault by its name from URI. 
- pub async fn inject_remove_fault( - &self, - _uri: &str, - ) -> Result<(), Error> { - self.injections_disabled() - } - - /// Returns list of injected faults. - pub async fn list_injections( - &self, - ) -> Result, Error> { - self.injections_disabled().map(|_| Vec::new()) - } - } -} diff --git a/io-engine/src/bdev/nexus/nexus_io.rs b/io-engine/src/bdev/nexus/nexus_io.rs index c94f867de9..59319fd071 100644 --- a/io-engine/src/bdev/nexus/nexus_io.rs +++ b/io-engine/src/bdev/nexus/nexus_io.rs @@ -13,12 +13,6 @@ use spdk_rs::{ use super::{FaultReason, IOLogChannel, Nexus, NexusChannel, NEXUS_PRODUCT_ID}; -#[allow(unused_imports)] -use super::{ - nexus_injection::injections_enabled, - nexus_injection::InjectionOp, -}; - use crate::core::{ BlockDevice, BlockDeviceHandle, @@ -32,6 +26,7 @@ use crate::core::{ LvolFailure, Mthread, NvmeStatus, + ReadOptions, }; #[cfg(feature = "nexus-io-tracing")] @@ -240,7 +235,7 @@ impl<'n> NexusBio<'n> { child: &dyn BlockDevice, status: IoCompletionStatus, ) { - #[cfg(feature = "nexus-fault-injection")] + #[cfg(feature = "fault-injection")] let status = self.inject_completion_error(child, status); debug_assert!(self.ctx().in_flight > 0); @@ -327,14 +322,14 @@ impl<'n> NexusBio<'n> { &self, hdl: &dyn BlockDeviceHandle, ) -> Result<(), CoreError> { - #[cfg(feature = "nexus-fault-injection")] + #[cfg(feature = "fault-injection")] self.inject_submission_error(hdl)?; hdl.readv_blocks( - self.iovs(), - self.iov_count(), + self.iovs_mut(), self.effective_offset(), self.num_blocks(), + ReadOptions::None, Self::child_completion, self.as_ptr().cast(), ) @@ -474,12 +469,11 @@ impl<'n> NexusBio<'n> { name = hdl.get_device().device_name() ); - #[cfg(feature = "nexus-fault-injection")] + #[cfg(feature = "fault-injection")] self.inject_submission_error(hdl)?; hdl.writev_blocks( self.iovs(), - self.iov_count(), self.effective_offset(), self.num_blocks(), Self::child_completion, @@ -515,7 +509,7 @@ impl<'n> NexusBio<'n> { name = 
hdl.get_device().device_name() ); - #[cfg(feature = "nexus-fault-injection")] + #[cfg(feature = "fault-injection")] self.inject_submission_error(hdl)?; hdl.write_zeroes( @@ -744,66 +738,54 @@ impl<'n> NexusBio<'n> { } /// Checks if an error is to be injected upon submission. - #[cfg(feature = "nexus-fault-injection")] + #[cfg(feature = "fault-injection")] #[inline] fn inject_submission_error( &self, hdl: &dyn BlockDeviceHandle, ) -> Result<(), CoreError> { - if !injections_enabled() { - return Ok(()); - } - - let op = match self.io_type() { - IoType::Read => InjectionOp::ReadSubmission, - _ => InjectionOp::WriteSubmission, + use crate::core::fault_injection::{ + inject_submission_error, + FaultDomain::Nexus, + InjectIoCtx, }; - if self.nexus().inject_is_faulted( - hdl.get_device(), - op, - self.offset(), - self.num_blocks(), - ) { - Err(crate::bdev::device::io_type_to_err( + inject_submission_error( + Nexus, + &InjectIoCtx::with_iovs( + hdl.get_device(), self.io_type(), - Errno::ENXIO, self.offset(), self.num_blocks(), - )) - } else { - Ok(()) - } + self.iovs(), + ), + ) } /// Checks if an error is to be injected upon completion. 
- #[cfg(feature = "nexus-fault-injection")] + #[cfg(feature = "fault-injection")] #[inline] fn inject_completion_error( &self, child: &dyn BlockDevice, status: IoCompletionStatus, ) -> IoCompletionStatus { - if !injections_enabled() { - return status; - } - - let op = match self.io_type() { - IoType::Read => InjectionOp::Read, - _ => InjectionOp::Write, + use crate::core::fault_injection::{ + inject_completion_error, + FaultDomain::Nexus, + InjectIoCtx, }; - if self.nexus().inject_is_faulted( - child, - op, - self.offset(), - self.num_blocks(), - ) { - IoCompletionStatus::NvmeError(NvmeStatus::Generic( - GenericStatusCode::DataTransferError, - )) - } else { - status - } + inject_completion_error( + Nexus, + &InjectIoCtx::with_iovs( + child, + self.io_type(), + self.offset(), + self.num_blocks(), + self.iovs(), + ), + status, + ) } } diff --git a/io-engine/src/bdev/nvmx/handle.rs b/io-engine/src/bdev/nvmx/handle.rs index 8af19818fd..464b8eae41 100644 --- a/io-engine/src/bdev/nvmx/handle.rs +++ b/io-engine/src/bdev/nvmx/handle.rs @@ -28,8 +28,10 @@ use spdk_rs::{ nvme_admin_opc, nvme_nvm_opcode, nvme_reservation_register_action, + AsIoVecPtr, DmaBuf, DmaError, + IoVec, MediaErrorStatusCode, NvmeStatus, }; @@ -58,13 +60,21 @@ use crate::{ IoCompletionStatus, IoType, Reactors, - ReadMode, + ReadOptions, SnapshotParams, }, ffihelper::{cb_arg, done_cb, FfiResult}, subsys, }; +#[cfg(feature = "fault-injection")] +use crate::core::fault_injection::{ + inject_completion_error, + inject_submission_error, + FaultDomain, + InjectIoCtx, +}; + use super::NvmeIoChannelInner; /* @@ -82,6 +92,8 @@ struct NvmeIoCtx { op: IoType, num_blocks: u64, channel: *mut spdk_io_channel, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx, } unsafe impl Send for NvmeIoCtx {} @@ -343,6 +355,13 @@ fn complete_nvme_command(ctx: *mut NvmeIoCtx, cpl: *const spdk_nvme_cpl) { IoCompletionStatus::from(NvmeStatus::from(cpl)) }; + #[cfg(feature = "fault-injection")] + let status = 
inject_completion_error( + FaultDomain::BlockDevice, + &io_ctx.inj_op, + status, + ); + (io_ctx.cb)(&*inner.device, status, io_ctx.cb_arg); free_nvme_io_ctx(ctx); @@ -402,15 +421,14 @@ extern "C" fn nvme_flush_completion( fn check_io_args( op: IoType, - iov: *mut iovec, - iovcnt: i32, + iovs: &[IoVec], offset_blocks: u64, num_blocks: u64, ) -> Result<(), CoreError> { // Make sure I/O structures look sane. // As of now, we assume that I/O vector is fully prepared by the caller. - if iovcnt <= 0 { - error!("insufficient number of elements in I/O vector: {}", iovcnt); + if iovs.is_empty() { + error!("empty I/O vector"); return Err(io_type_to_err( op, libc::EINVAL, @@ -418,17 +436,17 @@ fn check_io_args( num_blocks, )); } - unsafe { - if (*iov).iov_base.is_null() { - error!("I/O vector is not initialized"); - return Err(io_type_to_err( - op, - libc::EINVAL, - offset_blocks, - num_blocks, - )); - } + + if !iovs[0].is_initialized() { + error!("I/O vector is not initialized"); + return Err(io_type_to_err( + op, + libc::EINVAL, + offset_blocks, + num_blocks, + )); } + Ok(()) } @@ -549,11 +567,10 @@ impl BlockDeviceHandle for NvmeDeviceHandle { DmaBuf::new(size, self.ns.alignment()) } - async fn read_at_ex( + async fn read_at( &self, offset: u64, buffer: &mut DmaBuf, - mode: Option, ) -> Result { let (valid, offset_blocks, num_blocks) = self.bytes_to_blocks(offset, buffer.len()); @@ -577,13 +594,6 @@ impl BlockDeviceHandle for NvmeDeviceHandle { }); } - let flags = mode.map_or(self.prchk_flags, |m| match m { - ReadMode::Normal => self.prchk_flags, - ReadMode::UnwrittenFail => { - self.prchk_flags | SPDK_NVME_IO_FLAGS_UNWRITTEN_READ_FAIL - } - }); - let inner = NvmeIoChannel::inner_from_channel(self.io_channel.as_ptr()); // Make sure channel allows I/O. 
@@ -595,12 +605,12 @@ impl BlockDeviceHandle for NvmeDeviceHandle { spdk_nvme_ns_cmd_read( self.ns.as_ptr(), inner.qpair_ptr(), - **buffer, + buffer.as_mut_ptr(), offset_blocks, num_blocks as u32, Some(nvme_async_io_completion), cb_arg(s), - flags, + self.prchk_flags, ) }; @@ -630,7 +640,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { len: buffer.len(), }), status => Err(CoreError::ReadFailed { - status, + status: IoCompletionStatus::NvmeError(status), offset, len: buffer.len(), }), @@ -677,7 +687,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { spdk_nvme_ns_cmd_write( self.ns.as_ptr(), inner.qpair_ptr(), - **buffer, + buffer.as_ptr() as *mut _, offset_blocks, num_blocks as u32, Some(nvme_async_io_completion), @@ -706,7 +716,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { Ok(buffer.len()) } status => Err(CoreError::WriteFailed { - status, + status: IoCompletionStatus::NvmeError(status), offset, len: buffer.len(), }), @@ -718,14 +728,22 @@ impl BlockDeviceHandle for NvmeDeviceHandle { // bdev_nvme_get_buf_cb fn readv_blocks( &self, - iov: *mut iovec, - iovcnt: i32, + iovs: &mut [IoVec], offset_blocks: u64, num_blocks: u64, + opts: ReadOptions, cb: IoCompletionCallback, cb_arg: IoCompletionCallbackArg, ) -> Result<(), CoreError> { - check_io_args(IoType::Read, iov, iovcnt, offset_blocks, num_blocks)?; + check_io_args(IoType::Read, iovs, offset_blocks, num_blocks)?; + + // Get read flags. 
+ let flags = match opts { + ReadOptions::None => self.prchk_flags, + ReadOptions::UnwrittenFail => { + self.prchk_flags | SPDK_NVME_IO_FLAGS_UNWRITTEN_READ_FAIL + } + }; let channel = self.io_channel.as_ptr(); let inner = NvmeIoChannel::inner_from_channel(channel); @@ -738,29 +756,42 @@ impl BlockDeviceHandle for NvmeDeviceHandle { NvmeIoCtx { cb, cb_arg, - iov, - iovcnt: iovcnt as u64, + iov: iovs.as_io_vec_mut_ptr(), + iovcnt: iovs.len() as u64, iovpos: 0, iov_offset: 0, channel, op: IoType::Read, num_blocks, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx::with_iovs( + self.get_device(), + IoType::Read, + offset_blocks, + num_blocks, + iovs, + ), }, offset_blocks, num_blocks, )?; - let rc = if iovcnt == 1 { + #[cfg(feature = "fault-injection")] + inject_submission_error(FaultDomain::BlockDevice, unsafe { + &(*bio).inj_op + })?; + + let rc = if iovs.len() == 1 { unsafe { spdk_nvme_ns_cmd_read( self.ns.as_ptr(), inner.qpair_ptr(), - (*iov).iov_base, + iovs[0].as_mut_ptr(), offset_blocks, num_blocks as u32, Some(nvme_io_done), bio as *mut c_void, - self.prchk_flags, + flags, ) } } else { @@ -772,7 +803,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { num_blocks as u32, Some(nvme_io_done), bio as *mut c_void, - self.prchk_flags, + flags, Some(nvme_queued_reset_sgl), Some(nvme_queued_next_sge), ) @@ -793,14 +824,13 @@ impl BlockDeviceHandle for NvmeDeviceHandle { fn writev_blocks( &self, - iov: *mut iovec, - iovcnt: i32, + iovs: &[IoVec], offset_blocks: u64, num_blocks: u64, cb: IoCompletionCallback, cb_arg: IoCompletionCallbackArg, ) -> Result<(), CoreError> { - check_io_args(IoType::Write, iov, iovcnt, offset_blocks, num_blocks)?; + check_io_args(IoType::Write, iovs, offset_blocks, num_blocks)?; let channel = self.io_channel.as_ptr(); let inner = NvmeIoChannel::inner_from_channel(channel); @@ -813,24 +843,37 @@ impl BlockDeviceHandle for NvmeDeviceHandle { NvmeIoCtx { cb, cb_arg, - iov, - iovcnt: iovcnt as u64, + iov: iovs.as_ptr() as *mut _, + 
iovcnt: iovs.len() as u64, iovpos: 0, iov_offset: 0, channel, op: IoType::Write, num_blocks, + #[cfg(feature = "fault-injection")] + inj_op: InjectIoCtx::with_iovs( + self.get_device(), + IoType::Write, + offset_blocks, + num_blocks, + iovs, + ), }, offset_blocks, num_blocks, )?; - let rc = if iovcnt == 1 { + #[cfg(feature = "fault-injection")] + inject_submission_error(FaultDomain::BlockDevice, unsafe { + &(*bio).inj_op + })?; + + let rc = if iovs.len() == 1 { unsafe { spdk_nvme_ns_cmd_write( self.ns.as_ptr(), inner.qpair_ptr(), - (*iov).iov_base, + iovs[0].as_ptr() as *mut _, offset_blocks, num_blocks as u32, Some(nvme_io_done), @@ -916,6 +959,8 @@ impl BlockDeviceHandle for NvmeDeviceHandle { channel, op: IoType::Flush, num_blocks, + #[cfg(feature = "fault-injection")] + inj_op: Default::default(), }, 0, num_blocks, // Flush all device blocks. @@ -977,6 +1022,8 @@ impl BlockDeviceHandle for NvmeDeviceHandle { channel, op: IoType::Unmap, num_blocks, + #[cfg(feature = "fault-injection")] + inj_op: Default::default(), }, offset_blocks, num_blocks, @@ -1074,6 +1121,8 @@ impl BlockDeviceHandle for NvmeDeviceHandle { channel, op: IoType::WriteZeros, num_blocks, + #[cfg(feature = "fault-injection")] + inj_op: Default::default(), }, offset_blocks, num_blocks, @@ -1154,7 +1203,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { } let (ptr, size) = match buffer { - Some(buf) => (**buf, buf.len()), + Some(buf) => (buf.as_mut_ptr(), buf.len()), None => (std::ptr::null_mut(), 0), }; @@ -1343,7 +1392,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { } let (ptr, size) = match buffer { - Some(buf) => (**buf, buf.len()), + Some(buf) => (buf.as_mut_ptr(), buf.len()), None => (std::ptr::null_mut(), 0), }; diff --git a/io-engine/src/bin/casperf.rs b/io-engine/src/bin/casperf.rs index bde3284fc0..4dada06d98 100644 --- a/io-engine/src/bin/casperf.rs +++ b/io-engine/src/bin/casperf.rs @@ -233,7 +233,7 @@ impl Io { if spdk_bdev_read( self.job.as_ref().desc.legacy_as_ptr(), 
self.job.as_ref().ch.as_ref().unwrap().legacy_as_ptr(), - *self.buf, + self.buf.as_mut_ptr(), offset, self.buf.len(), Some(Job::io_completion), @@ -256,7 +256,7 @@ impl Io { if spdk_bdev_write( self.job.as_ref().desc.legacy_as_ptr(), self.job.as_ref().ch.as_ref().unwrap().legacy_as_ptr(), - *self.buf, + self.buf.as_mut_ptr(), offset, self.buf.len(), Some(Job::io_completion), diff --git a/io-engine/src/bin/initiator.rs b/io-engine/src/bin/initiator.rs index 58a4e50de9..328f204561 100644 --- a/io-engine/src/bin/initiator.rs +++ b/io-engine/src/bin/initiator.rs @@ -97,6 +97,7 @@ async fn read(uri: &str, offset: u64, file: &str) -> Result<()> { let bdev = device_create(uri).await?; let h = device_open(&bdev, false).unwrap().into_handle().unwrap(); let mut buf = h.dma_malloc(h.get_device().block_len()).unwrap(); + #[allow(deprecated)] let n = h.read_at(offset, &mut buf).await?; fs::write(file, buf.as_slice())?; info!("{} bytes read", n); @@ -113,6 +114,7 @@ async fn write(uri: &str, offset: u64, file: &str) -> Result<()> { if n < buf.len() as usize { warn!("Writing a buffer which was not fully initialized from a file"); } + #[allow(deprecated)] let written = h.write_at(offset, &buf).await?; info!("{} bytes written", written); Ok(()) diff --git a/io-engine/src/bin/io-engine-client/context.rs b/io-engine/src/bin/io-engine-client/context.rs index df324d0757..5accf47d68 100644 --- a/io-engine/src/bin/io-engine-client/context.rs +++ b/io-engine/src/bin/io-engine-client/context.rs @@ -66,6 +66,7 @@ mod v1 { pub type HostRpcClient = host::HostRpcClient; pub type NexusRpcClient = nexus::NexusRpcClient; pub type SnapshotRpcClient = snapshot::SnapshotRpcClient; + pub type TestingRpcClient = testing::TestingRpcClient; pub struct Context { pub bdev: BdevRpcClient, @@ -75,6 +76,7 @@ mod v1 { pub host: HostRpcClient, pub nexus: NexusRpcClient, pub snapshot: SnapshotRpcClient, + pub testing: TestingRpcClient, } impl Context { @@ -86,6 +88,7 @@ mod v1 { let host = 
HostRpcClient::connect(h.clone()).await.unwrap(); let nexus = NexusRpcClient::connect(h.clone()).await.unwrap(); let snapshot = SnapshotRpcClient::connect(h.clone()).await.unwrap(); + let testing = TestingRpcClient::connect(h.clone()).await.unwrap(); Ok(Self { bdev, @@ -95,6 +98,7 @@ mod v1 { host, nexus, snapshot, + testing, }) } } diff --git a/io-engine/src/bin/io-engine-client/v0/nexus_cli.rs b/io-engine/src/bin/io-engine-client/v0/nexus_cli.rs index 478cdadcb6..48d9e2310c 100644 --- a/io-engine/src/bin/io-engine-client/v0/nexus_cli.rs +++ b/io-engine/src/bin/io-engine-client/v0/nexus_cli.rs @@ -221,35 +221,6 @@ pub fn subcommands<'a, 'b>() -> App<'a, 'b> { .help("uuid of nexus"), ); - let inject = SubCommand::with_name("inject") - .about("manage injected faults") - .arg( - Arg::with_name("uuid") - .required(true) - .index(1) - .help("uuid of nexus"), - ) - .arg( - Arg::with_name("add") - .short("a") - .long("add") - .required(false) - .takes_value(true) - .multiple(true) - .number_of_values(1) - .help("new injection uri"), - ) - .arg( - Arg::with_name("remove") - .short("r") - .long("remove") - .required(false) - .takes_value(true) - .multiple(true) - .number_of_values(1) - .help("injection uri"), - ); - SubCommand::with_name("nexus") .settings(&[ AppSettings::SubcommandRequiredElseHelp, @@ -270,7 +241,6 @@ pub fn subcommands<'a, 'b>() -> App<'a, 'b> { .subcommand(list2) .subcommand(children) .subcommand(children_2) - .subcommand(inject) .subcommand(nexus_child_cli::subcommands()) } @@ -293,7 +263,6 @@ pub async fn handler( ("add", Some(args)) => nexus_add(ctx, args).await, ("remove", Some(args)) => nexus_remove(ctx, args).await, ("child", Some(args)) => nexus_child_cli::handler(ctx, args).await, - ("inject", Some(args)) => injections(ctx, args).await, (cmd, _) => { Err(Status::not_found(format!("command {cmd} does not exist"))) .context(GrpcStatus) @@ -980,78 +949,6 @@ async fn nexus_remove( Ok(()) } -async fn injections( - mut ctx: Context, - matches: 
&ArgMatches<'_>, -) -> crate::Result<()> { - let uuid = matches - .value_of("uuid") - .ok_or_else(|| ClientError::MissingValue { - field: "uuid".to_string(), - })? - .to_string(); - - let inj_add = matches.values_of("add"); - let inj_remove = matches.values_of("remove"); - - if inj_add.is_none() && inj_remove.is_none() { - return list_nexus_injections(ctx, &uuid).await; - } - - if let Some(uris) = inj_add { - for uri in uris { - println!("Injecting fault: {uri}"); - ctx.client - .inject_nexus_fault(v0::InjectNexusFaultRequest { - uuid: uuid.clone(), - uri: uri.to_owned(), - }) - .await - .context(GrpcStatus)?; - } - } - - if let Some(uris) = inj_remove { - for uri in uris { - println!("Removing injected fault: {uri}"); - ctx.client - .remove_injected_nexus_fault( - v0::RemoveInjectedNexusFaultRequest { - uuid: uuid.clone(), - uri: uri.to_owned(), - }, - ) - .await - .context(GrpcStatus)?; - } - } - - Ok(()) -} - -async fn list_nexus_injections( - mut ctx: Context, - uuid: &str, -) -> crate::Result<()> { - let response = ctx - .client - .list_injected_nexus_faults(v0::ListInjectedNexusFaultsRequest { - uuid: uuid.to_owned(), - }) - .await - .context(GrpcStatus)?; - - println!( - "{}", - serde_json::to_string_pretty(response.get_ref()) - .unwrap() - .to_colored_json_auto() - .unwrap() - ); - - Ok(()) -} - fn ana_state_idx_to_str(idx: i32) -> &'static str { match v0::NvmeAnaState::from_i32(idx).unwrap() { v0::NvmeAnaState::NvmeAnaInvalidState => "invalid", diff --git a/io-engine/src/bin/io-engine-client/v1/mod.rs b/io-engine/src/bin/io-engine-client/v1/mod.rs index 67931bacd7..d91a27c6f5 100644 --- a/io-engine/src/bin/io-engine-client/v1/mod.rs +++ b/io-engine/src/bin/io-engine-client/v1/mod.rs @@ -9,6 +9,7 @@ pub mod pool_cli; pub mod rebuild_cli; pub mod replica_cli; pub mod snapshot_cli; +pub mod testing_cli; pub(crate) use super::context; use crate::ContextCreate; @@ -73,6 +74,7 @@ pub(super) async fn main_() -> crate::Result<()> { 
.subcommand(perf_cli::subcommands()) .subcommand(rebuild_cli::subcommands()) .subcommand(snapshot_cli::subcommands()) + .subcommand(testing_cli::subcommands()) .subcommand(jsonrpc_cli::subcommands()) .subcommand(controller_cli::subcommands()) .get_matches(); @@ -90,6 +92,7 @@ pub(super) async fn main_() -> crate::Result<()> { ("replica", Some(args)) => replica_cli::handler(ctx, args).await, ("rebuild", Some(args)) => rebuild_cli::handler(ctx, args).await, ("snapshot", Some(args)) => snapshot_cli::handler(ctx, args).await, + ("testing", Some(args)) => testing_cli::handler(ctx, args).await, ("controller", Some(args)) => controller_cli::handler(ctx, args).await, ("jsonrpc", Some(args)) => jsonrpc_cli::json_rpc_call(ctx, args).await, _ => panic!("Command not found"), diff --git a/io-engine/src/bin/io-engine-client/v1/nexus_cli.rs b/io-engine/src/bin/io-engine-client/v1/nexus_cli.rs index b89a074c26..11ed94c4c0 100644 --- a/io-engine/src/bin/io-engine-client/v1/nexus_cli.rs +++ b/io-engine/src/bin/io-engine-client/v1/nexus_cli.rs @@ -199,35 +199,6 @@ pub fn subcommands<'a, 'b>() -> App<'a, 'b> { .help("uuid of nexus"), ); - let inject = SubCommand::with_name("inject") - .about("manage injected faults") - .arg( - Arg::with_name("uuid") - .required(true) - .index(1) - .help("uuid of nexus"), - ) - .arg( - Arg::with_name("add") - .short("a") - .long("add") - .required(false) - .takes_value(true) - .multiple(true) - .number_of_values(1) - .help("new injection uri"), - ) - .arg( - Arg::with_name("remove") - .short("r") - .long("remove") - .required(false) - .takes_value(true) - .multiple(true) - .number_of_values(1) - .help("injection uri"), - ); - SubCommand::with_name("nexus") .settings(&[ AppSettings::SubcommandRequiredElseHelp, @@ -245,7 +216,6 @@ pub fn subcommands<'a, 'b>() -> App<'a, 'b> { .subcommand(ana_state) .subcommand(list) .subcommand(children) - .subcommand(inject) .subcommand(nexus_child_cli::subcommands()) } @@ -265,7 +235,6 @@ pub async fn handler( ("add", 
Some(args)) => nexus_add(ctx, args).await, ("remove", Some(args)) => nexus_remove(ctx, args).await, ("child", Some(args)) => nexus_child_cli::handler(ctx, args).await, - ("inject", Some(args)) => injections(ctx, args).await, (cmd, _) => { Err(Status::not_found(format!("command {cmd} does not exist"))) .context(GrpcStatus) @@ -854,80 +823,6 @@ async fn nexus_remove( Ok(()) } -async fn injections( - mut ctx: Context, - matches: &ArgMatches<'_>, -) -> crate::Result<()> { - let uuid = matches - .value_of("uuid") - .ok_or_else(|| ClientError::MissingValue { - field: "uuid".to_string(), - })? - .to_string(); - - let inj_add = matches.values_of("add"); - let inj_remove = matches.values_of("remove"); - if inj_add.is_none() && inj_remove.is_none() { - return list_nexus_injections(ctx, uuid.clone().as_str()).await; - } - - if let Some(uris) = inj_add { - for uri in uris { - println!("Injecting fault: {uri}"); - ctx.v1 - .nexus - .inject_nexus_fault(v1::nexus::InjectNexusFaultRequest { - uuid: uuid.clone(), - uri: uri.to_string(), - }) - .await - .context(GrpcStatus)?; - } - } - - if let Some(uris) = inj_remove { - for uri in uris { - println!("Removing injected fault: {uri}"); - ctx.v1 - .nexus - .remove_injected_nexus_fault( - v1::nexus::RemoveInjectedNexusFaultRequest { - uuid: uuid.clone(), - uri: uri.to_owned(), - }, - ) - .await - .context(GrpcStatus)?; - } - } - - Ok(()) -} - -async fn list_nexus_injections( - mut ctx: Context, - uuid: &str, -) -> crate::Result<()> { - let response = ctx - .v1 - .nexus - .list_injected_nexus_faults(v1::nexus::ListInjectedNexusFaultsRequest { - uuid: uuid.to_owned(), - }) - .await - .context(GrpcStatus)?; - - println!( - "{}", - serde_json::to_string_pretty(response.get_ref()) - .unwrap() - .to_colored_json_auto() - .unwrap() - ); - - Ok(()) -} - fn ana_state_idx_to_str(idx: i32) -> &'static str { match v1::nexus::NvmeAnaState::from_i32(idx).unwrap() { v1::nexus::NvmeAnaState::NvmeAnaInvalidState => "invalid", diff --git 
a/io-engine/src/bin/io-engine-client/v1/testing_cli.rs b/io-engine/src/bin/io-engine-client/v1/testing_cli.rs new file mode 100644 index 0000000000..1d2fe7fe02 --- /dev/null +++ b/io-engine/src/bin/io-engine-client/v1/testing_cli.rs @@ -0,0 +1,111 @@ +use crate::{context::Context, GrpcStatus}; +use clap::{App, AppSettings, Arg, ArgMatches, SubCommand}; +use colored_json::ToColoredJson; +use mayastor_api::v1; +use snafu::ResultExt; +use tonic::Status; + +pub fn subcommands<'a, 'b>() -> App<'a, 'b> { + let inject = SubCommand::with_name("inject") + .about("manage fault injections") + .arg( + Arg::with_name("add") + .short("a") + .long("add") + .required(false) + .takes_value(true) + .multiple(true) + .number_of_values(1) + .help("new injection uri"), + ) + .arg( + Arg::with_name("remove") + .short("r") + .long("remove") + .required(false) + .takes_value(true) + .multiple(true) + .number_of_values(1) + .help("injection uri"), + ); + + SubCommand::with_name("testing") + .settings(&[ + AppSettings::SubcommandRequiredElseHelp, + AppSettings::ColoredHelp, + AppSettings::ColorAlways, + ]) + .about("Testing commands") + .subcommand(inject) +} + +pub async fn handler( + ctx: Context, + matches: &ArgMatches<'_>, +) -> crate::Result<()> { + match matches.subcommand() { + ("inject", Some(args)) => injections(ctx, args).await, + (cmd, _) => { + Err(Status::not_found(format!("command {cmd} does not exist"))) + .context(GrpcStatus) + } + } +} + +async fn injections( + mut ctx: Context, + matches: &ArgMatches<'_>, +) -> crate::Result<()> { + let inj_add = matches.values_of("add"); + let inj_remove = matches.values_of("remove"); + if inj_add.is_none() && inj_remove.is_none() { + return list_injections(ctx).await; + } + + if let Some(uris) = inj_add { + for uri in uris { + println!("Injection: '{uri}'"); + ctx.v1 + .testing + .add_injection(v1::testing::AddInjectionRequest { + uri: uri.to_owned(), + }) + .await + .context(GrpcStatus)?; + } + } + + if let Some(uris) = inj_remove { + 
for uri in uris { + println!("Removing injected fault: {uri}"); + ctx.v1 + .testing + .remove_injection(v1::testing::RemoveInjectionRequest { + uri: uri.to_owned(), + }) + .await + .context(GrpcStatus)?; + } + } + + Ok(()) +} + +async fn list_injections(mut ctx: Context) -> crate::Result<()> { + let response = ctx + .v1 + .testing + .list_injections(v1::testing::ListInjectionsRequest {}) + .await + .context(GrpcStatus)?; + + println!( + "{}", + serde_json::to_string_pretty(response.get_ref()) + .unwrap() + .to_colored_json_auto() + .unwrap() + ); + + Ok(()) +} diff --git a/io-engine/src/bin/io-engine.rs b/io-engine/src/bin/io-engine.rs index bd4871af02..d6008a3c45 100644 --- a/io-engine/src/bin/io-engine.rs +++ b/io-engine/src/bin/io-engine.rs @@ -86,7 +86,7 @@ fn start_tokio_runtime(args: &MayastorCliArgs) { print_feature!("Async QPair connection", "spdk-async-qpair-connect"); print_feature!("SPDK subsystem events", "spdk-subsystem-events"); - print_feature!("Nexus-level fault injection", "nexus-fault-injection"); + print_feature!("Fault injection", "fault-injection"); // Initialize Lock manager. let cfg = ResourceLockManagerConfig::default() diff --git a/io-engine/src/core/block_device.rs b/io-engine/src/core/block_device.rs index 83a0750692..0a306a3add 100644 --- a/io-engine/src/core/block_device.rs +++ b/io-engine/src/core/block_device.rs @@ -9,8 +9,10 @@ use super::{ use spdk_rs::{DmaBuf, DmaError, IoVec}; use async_trait::async_trait; +use futures::channel::oneshot; use merge::Merge; use nix::errno::Errno; +use spdk_rs::ffihelper::{cb_arg, done_cb}; use std::os::raw::c_void; use uuid::Uuid; @@ -124,10 +126,10 @@ pub type OpCompletionCallbackArg = *mut c_void; /// TODO pub type OpCompletionCallback = fn(bool, OpCompletionCallbackArg) -> (); -/// Read modes. -pub enum ReadMode { +/// Read options. +pub enum ReadOptions { /// Normal read operation. - Normal, + None, /// Fail when reading an unwritten block of a thin-provisioned device. 
UnwrittenFail, } @@ -136,64 +138,176 @@ pub enum ReadMode { /// TODO: Add text. #[async_trait(?Send)] pub trait BlockDeviceHandle { - // Generic functions. - /// TODO fn get_device(&self) -> &dyn BlockDevice; /// TODO fn dma_malloc(&self, size: u64) -> Result; - // Futures-based I/O functions. - /// TODO + #[deprecated(note = "use read_buf_blocks_async()")] async fn read_at( &self, offset: u64, buffer: &mut DmaBuf, - ) -> Result { - self.read_at_ex(offset, buffer, None).await - } - - /// TODO - async fn read_at_ex( - &self, - offset: u64, - buffer: &mut DmaBuf, - mode: Option, ) -> Result; /// TODO + #[deprecated(note = "use write_buf_blocks_async()")] async fn write_at( &self, offset: u64, buffer: &DmaBuf, ) -> Result; - // Callback-based I/O functions. - - /// TODO + /// Reads the given number of blocks into the list of buffers from the + /// device, starting at the given offset. + /// + /// The caller must ensure that the number of blocks to read does not exceed + /// the total size of `iovs` buffer list. + /// + /// The given completion callback is called when the operation finishes. + /// This method may return error immediately in the case operation dispatch + /// fails. fn readv_blocks( &self, - iov: *mut IoVec, - iovcnt: i32, + iovs: &mut [IoVec], offset_blocks: u64, num_blocks: u64, + opts: ReadOptions, cb: IoCompletionCallback, cb_arg: IoCompletionCallbackArg, ) -> Result<(), CoreError>; - /// TODO + /// Reads the given number of blocks into the list of buffers from the + /// device, starting at the given offset. + /// + /// The caller must ensure that the number of blocks to read does not exceed + /// the total size of `iovs` buffer list. + /// + /// Operation is performed asynchronously; I/O completion status is wrapped + /// into `CoreError::ReadFailed` in the case of failure. 
+ async fn readv_blocks_async( + &self, + iovs: &mut [IoVec], + offset_blocks: u64, + num_blocks: u64, + opts: ReadOptions, + ) -> Result<(), CoreError> { + let (s, r) = oneshot::channel::(); + + self.readv_blocks( + iovs, + offset_blocks, + num_blocks, + opts, + block_device_io_completion, + cb_arg(s), + )?; + + match r.await.expect("Failed awaiting at readv_blocks()") { + IoCompletionStatus::Success => Ok(()), + status => Err(CoreError::ReadFailed { + status, + offset: offset_blocks, + len: num_blocks, + }), + } + } + + /// Reads the given number of blocks into the buffer from the device, + /// starting at the given offset. + /// + /// The caller must ensure that the `buf` buffer has enough space allocated. + /// + /// Operation is performed asynchronously; I/O completion status is wrapped + /// into `CoreError::ReadFailed` in the case of failure. + async fn read_buf_blocks_async( + &self, + buf: &mut DmaBuf, + offset_blocks: u64, + num_blocks: u64, + opts: ReadOptions, + ) -> Result<(), CoreError> { + self.readv_blocks_async( + &mut [buf.to_io_vec()], + offset_blocks, + num_blocks, + opts, + ) + .await + } + + /// Writes the given number of blocks from the list of buffers to the + /// device, starting at the given offset. + /// + /// The caller must ensure that the number of blocks to write does not go + /// beyond the size of `iovs` array. + /// + /// The given completion callback is called when the operation finishes. + /// This method may return error immediately in the case operation dispatch + /// fails. fn writev_blocks( &self, - iov: *mut IoVec, - iovcnt: i32, + iovs: &[IoVec], offset_blocks: u64, num_blocks: u64, cb: IoCompletionCallback, cb_arg: IoCompletionCallbackArg, ) -> Result<(), CoreError>; + /// Writes the given number of blocks from the list of buffers to the + /// device, starting at the given offset. + /// + /// The caller must ensure that the number of blocks to write does not go + /// beyond the size of `iovs` array. 
+ /// + /// Operation is performed asynchronously; I/O completion status is wrapped + /// into `CoreError::WriteFailed` in the case of failure. + async fn writev_blocks_async( + &self, + iovs: &[IoVec], + offset_blocks: u64, + num_blocks: u64, + ) -> Result<(), CoreError> { + let (s, r) = oneshot::channel::(); + + self.writev_blocks( + iovs, + offset_blocks, + num_blocks, + block_device_io_completion, + cb_arg(s), + )?; + + match r.await.expect("Failed awaiting at writev_blocks()") { + IoCompletionStatus::Success => Ok(()), + status => Err(CoreError::WriteFailed { + status, + offset: offset_blocks, + len: num_blocks, + }), + } + } + + /// Writes the given number of blocks from the buffer to the device, + /// starting at the given offset. + /// + /// The caller must ensure that the `buf` buffer is large enough to write + /// `num_blocks`. + /// + /// Operation is performed asynchronously; I/O completion status is wrapped + /// into `CoreError::WriteFailed` in the case of failure. + async fn write_buf_blocks_async( + &self, + buf: &DmaBuf, + offset_blocks: u64, + num_blocks: u64, + ) -> Result<(), CoreError> { + self.writev_blocks_async(&[buf.to_io_vec()], offset_blocks, num_blocks) + .await + } + /// TODO fn reset( &self, @@ -315,6 +429,14 @@ pub trait BlockDeviceHandle { ) -> Result<(), CoreError>; } +fn block_device_io_completion( + _device: &dyn BlockDevice, + status: IoCompletionStatus, + ctx: *mut c_void, +) { + done_cb(ctx, status); +} + /// TODO pub trait LbaRangeController {} diff --git a/io-engine/src/core/fault_injection/injection.rs b/io-engine/src/core/fault_injection/injection.rs new file mode 100644 index 0000000000..876877d263 --- /dev/null +++ b/io-engine/src/core/fault_injection/injection.rs @@ -0,0 +1,350 @@ +#![cfg(feature = "fault-injection")] + +use rand::{rngs::StdRng, RngCore, SeedableRng}; +use std::{ + fmt::{Debug, Display, Formatter}, + ops::Range, + time::{Duration, Instant}, +}; + +use url::Url; + +use crate::core::IoCompletionStatus; + 
+use super::{ + FaultDomain, + FaultIoStage, + FaultIoType, + FaultType, + InjectIoCtx, + InjectionError, +}; + +/// Fault injection. +#[derive(Debug, Clone)] +pub struct Injection { + pub uri: String, + pub domain: FaultDomain, + pub device_name: String, + pub fault_io_type: FaultIoType, + pub fault_io_stage: FaultIoStage, + pub fault_type: FaultType, + pub started: Option, + pub begin: Duration, + pub end: Duration, + pub range: Range, + rng: StdRng, +} + +impl Display for Injection { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt_duration(u: &Duration) -> String { + if *u == Duration::MAX { + "INF".to_string() + } else { + format!("{u:?}") + } + } + + fn fmt_u64(u: u64) -> String { + if u == u64::MAX { + "INF".to_string() + } else { + format!("{u:?}") + } + } + + write!( + f, + "{io}::{stage}::{ft} injection <{d}::{n}> [{b:?} -> \ + {e} ({t:?})] @ {rs}..{re}", + io = self.fault_io_type, + stage = self.fault_io_stage, + ft = self.fault_type, + d = self.domain, + n = self.device_name, + b = self.begin, + e = fmt_duration(&self.end), + t = self.now(), + rs = self.range.start, + re = fmt_u64(self.range.end), + ) + } +} + +fn new_rng() -> StdRng { + let seed = [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + ]; + StdRng::from_seed(seed) +} + +impl Injection { + /// Creates a new injection. 
+ #[allow(dead_code)] + pub fn new( + domain: FaultDomain, + name: &str, + fault_io_type: FaultIoType, + fault_io_stage: FaultIoStage, + fault_type: FaultType, + begin: Duration, + end: Duration, + range: Range, + ) -> Self { + let opts = vec![ + format!("domain={domain}"), + format!("op={fault_io_type}"), + format!("stage={fault_io_stage}"), + format!("type={fault_type}"), + format!("begin={begin:?}"), + format!("end={end:?}"), + format!("offset={}", range.start), + format!("num_blk={}", range.end), + ] + .join("&"); + + let uri = format!("inject://{name}?{opts}"); + + Self { + uri, + domain, + device_name: name.to_owned(), + fault_io_type, + fault_io_stage, + fault_type, + started: None, + begin, + end, + range, + rng: new_rng(), + } + } + + /// Parses an injection URI and creates injection object. + pub fn from_uri(uri: &str) -> Result { + if !uri.starts_with("inject://") { + return Err(InjectionError::NotInjectionUri { + uri: uri.to_owned(), + }); + } + + let p = Url::parse(uri).map_err(|e| InjectionError::InvalidUri { + source: e, + uri: uri.to_owned(), + })?; + + let mut r = Self { + uri: uri.to_owned(), + domain: FaultDomain::None, + device_name: format!( + "{host}{port}{path}", + host = p.host_str().unwrap_or_default(), + port = if let Some(port) = p.port() { + format!(":{port}") + } else { + "".to_string() + }, + path = p.path() + ), + fault_io_type: FaultIoType::Read, + fault_io_stage: FaultIoStage::Completion, + fault_type: FaultType::status_data_transfer_error(), + started: None, + begin: Duration::ZERO, + end: Duration::MAX, + range: 0 .. 
u64::MAX, + rng: new_rng(), + }; + + for (k, v) in p.query_pairs() { + match k.as_ref() { + "domain" => r.domain = parse_domain(&k, &v)?, + "op" => r.fault_io_type = parse_fault_io_type(&k, &v)?, + "stage" => r.fault_io_stage = parse_fault_io_stage(&k, &v)?, + "type" => r.fault_type = parse_fault_type(&k, &v)?, + "begin" => r.begin = parse_timer(&k, &v)?, + "end" => r.end = parse_timer(&k, &v)?, + "offset" => r.range.start = parse_num(&k, &v)?, + "num_blk" => r.range.end = parse_num(&k, &v)?, + _ => { + return Err(InjectionError::UnknownParameter { + name: k.to_string(), + value: v.to_string(), + }) + } + }; + } + + r.range.end = r.range.start.saturating_add(r.range.end); + + if r.begin > r.end { + return Err(InjectionError::BadDurations { + name: r.device_name, + begin: r.begin, + end: r.end, + }); + } + + Ok(r) + } + + /// Returns current time relative to injection start. + fn now(&self) -> Duration { + self.started.map_or(Duration::MAX, |s| { + Instant::now().saturating_duration_since(s) + }) + } + + /// True if the injection is currently active. + pub fn is_active(&self) -> bool { + let d = self.now(); + d >= self.begin && d < self.end + } + + /// Injects an error for the given I/O context. + /// If this injected fault does not apply to this context, returns `None`. + /// Otherwise, returns an operation status to be returned by the calling I/O + /// routine. 
+ #[inline] + pub fn inject( + &mut self, + domain: FaultDomain, + fault_io_type: FaultIoType, + fault_io_stage: FaultIoStage, + ctx: &InjectIoCtx, + ) -> Option { + if domain != self.domain + || fault_io_type != self.fault_io_type + || fault_io_stage != self.fault_io_stage + || ctx.device_name() != self.device_name + { + return None; + } + + if self.started.is_none() { + debug!("{self:?}: starting"); + self.started = Some(Instant::now()); + } + + if !self.is_active() || !is_overlapping(&self.range, &ctx.range) { + return None; + } + + match self.fault_type { + FaultType::Status(status) => Some(status), + FaultType::Data => { + self.inject_data_errors(ctx); + Some(IoCompletionStatus::Success) + } + } + } + + fn inject_data_errors(&mut self, ctx: &InjectIoCtx) { + let Some(iovs) = ctx.iovs_mut() else { + return; + }; + + for iov in iovs { + for i in 0 .. iov.len() { + iov[i] = self.rng.next_u32() as u8; + } + } + } +} + +/// TODO +fn parse_domain(k: &str, v: &str) -> Result { + let r = match v { + "none" => FaultDomain::None, + "nexus" => FaultDomain::Nexus, + "block" | "block_device" => FaultDomain::BlockDevice, + _ => { + return Err(InjectionError::UnknownParameter { + name: k.to_string(), + value: v.to_string(), + }) + } + }; + Ok(r) +} + +/// TODO +fn parse_fault_io_type( + k: &str, + v: &str, +) -> Result { + let res = match v { + "read" | "r" => FaultIoType::Read, + "write" | "w" => FaultIoType::Write, + _ => { + return Err(InjectionError::UnknownParameter { + name: k.to_string(), + value: v.to_string(), + }) + } + }; + Ok(res) +} + +/// TODO +fn parse_fault_io_stage( + k: &str, + v: &str, +) -> Result { + let res = match v { + "submit" | "s" | "submission" => FaultIoStage::Submission, + "compl" | "c" | "completion" => FaultIoStage::Submission, + _ => { + return Err(InjectionError::UnknownParameter { + name: k.to_string(), + value: v.to_string(), + }) + } + }; + Ok(res) +} + +/// TODO +fn parse_fault_type(k: &str, v: &str) -> Result { + let res = match v { + 
// TODO: add more statuses. + "status" => FaultType::status_data_transfer_error(), + // TODO: add data corruption methods. + "data" => FaultType::Data, + _ => { + return Err(InjectionError::UnknownParameter { + name: k.to_string(), + value: v.to_string(), + }) + } + }; + Ok(res) +} + +/// TODO +fn parse_timer(k: &str, v: &str) -> Result { + let b = + v.parse::() + .map_err(|_| InjectionError::BadParameterValue { + name: k.to_string(), + value: v.to_string(), + })?; + + Ok(Duration::from_millis(b)) +} + +/// TODO +fn parse_num(k: &str, v: &str) -> Result { + v.parse::() + .map_err(|_| InjectionError::BadParameterValue { + name: k.to_string(), + value: v.to_string(), + }) +} + +/// Tests if two ranges overlap. +fn is_overlapping(a: &Range, b: &Range) -> bool { + a.end > b.start && b.end > a.start +} diff --git a/io-engine/src/core/fault_injection/injections.rs b/io-engine/src/core/fault_injection/injections.rs new file mode 100644 index 0000000000..cbdf4f823c --- /dev/null +++ b/io-engine/src/core/fault_injection/injections.rs @@ -0,0 +1,167 @@ +#![cfg(feature = "fault-injection")] + +use nix::errno::Errno; +use once_cell::sync::OnceCell; +use std::{ + convert::TryInto, + sync::atomic::{AtomicBool, Ordering}, +}; + +use crate::core::{CoreError, IoCompletionStatus}; + +use super::{FaultDomain, FaultIoStage, FaultIoType, InjectIoCtx, Injection}; + +/// A list of fault injections. +struct Injections { + items: Vec, +} + +static INJECTIONS: OnceCell> = OnceCell::new(); + +impl Injections { + fn new() -> Self { + Self { + items: Vec::new(), + } + } + + #[inline(always)] + fn get() -> parking_lot::MutexGuard<'static, Self> { + INJECTIONS + .get_or_init(|| parking_lot::Mutex::new(Injections::new())) + .lock() + } + + /// Adds an injection. + pub fn add(&mut self, inj: Injection) { + info!("Adding injected fault: '{uri}'", uri = inj.uri); + self.items.push(inj); + } + + /// Removes all injections matching the URI. 
+ pub fn remove(&mut self, uri: &str) { + info!("Removing injected fault: '{uri}'"); + self.items.retain(|inj| inj.uri != uri); + } + + /// Returns a copy of the injection list. + pub fn list(&self) -> Vec { + self.items.clone() + } + + /// TODO + #[inline(always)] + fn inject( + &mut self, + domain: FaultDomain, + fault_io_type: FaultIoType, + fault_io_stage: FaultIoStage, + op: &InjectIoCtx, + ) -> Option { + self.items.iter_mut().find_map(|inj| { + inj.inject(domain, fault_io_type, fault_io_stage, op) + }) + } +} + +static INJECTIONS_ENABLED: AtomicBool = AtomicBool::new(false); + +/// Checks if fault injection is globally enabled. +/// This method is fast and can be used in I/O code path to quickly check +/// before checking if an injection has to be applied to a particular +/// device. +#[inline] +pub fn injections_enabled() -> bool { + INJECTIONS_ENABLED.load(Ordering::SeqCst) +} + +/// Enables fault injections globally. +#[inline] +fn enable_injections() { + if INJECTIONS_ENABLED + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + warn!("Enabling fault injection globally"); + } +} +/// Adds a fault injection. +pub fn add_injection(inj: Injection) { + enable_injections(); + Injections::get().add(inj); +} + +/// Removes all injections matching the URI. +pub fn remove_injection(uri: &str) { + Injections::get().remove(uri); +} + +/// Lists fault injections. A clone of current state of injection is returned. +pub fn list_injections() -> Vec { + Injections::get().list() +} + +/// Finds and injects a fault for the given I/O context, at the submission I/O +/// stage. In the case a fault is injected, returns the corresponding +/// `CoreError`. 
+#[inline] +pub fn inject_submission_error( + domain: FaultDomain, + ctx: &InjectIoCtx, +) -> Result<(), CoreError> { + if !injections_enabled() || !ctx.is_valid() { + return Ok(()); + } + + let Ok(fault_io_type) = ctx.io_type.try_into() else { + return Ok(()); + }; + + match Injections::get().inject( + domain, + fault_io_type, + FaultIoStage::Submission, + ctx, + ) { + None => Ok(()), + Some(IoCompletionStatus::Success) => Ok(()), + Some(_) => Err(crate::bdev::device::io_type_to_err( + ctx.io_type, + Errno::ENXIO, + ctx.range.start, + ctx.range.end - ctx.range.start, + )), + } +} + +/// Finds and injects a fault for the given I/O context, at the completion I/O +/// stage. +/// In the case a fault is injected, returns the corresponding +/// `IoCompletionStatus`. +#[inline] +pub fn inject_completion_error( + domain: FaultDomain, + ctx: &InjectIoCtx, + status: IoCompletionStatus, +) -> IoCompletionStatus { + if !injections_enabled() + || !ctx.is_valid() + || status != IoCompletionStatus::Success + { + return status; + } + + let Ok(fault_io_type) = ctx.io_type.try_into() else { + return status; + }; + + match Injections::get().inject( + domain, + fault_io_type, + FaultIoStage::Completion, + ctx, + ) { + Some(inj) => inj, + None => IoCompletionStatus::Success, + } +} diff --git a/io-engine/src/core/fault_injection/mod.rs b/io-engine/src/core/fault_injection/mod.rs new file mode 100644 index 0000000000..9252e3d5f7 --- /dev/null +++ b/io-engine/src/core/fault_injection/mod.rs @@ -0,0 +1,229 @@ +#![cfg(feature = "fault-injection")] + +use snafu::Snafu; +use std::{ + convert::TryFrom, + fmt::{Display, Formatter}, + ops::Range, + slice::from_raw_parts_mut, + time::Duration, +}; +use url::ParseError; + +mod injection; +mod injections; + +use crate::core::{BlockDevice, IoCompletionStatus}; +pub use injection::Injection; +pub use injections::{ + add_injection, + inject_completion_error, + inject_submission_error, + list_injections, + remove_injection, +}; +use 
spdk_rs::{IoType, IoVec}; + +/// Fault domain. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum FaultDomain { + None, + Nexus, + BlockDevice, +} + +impl Display for FaultDomain { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Self::None => "none", + Self::Nexus => "nexus", + Self::BlockDevice => "block_device", + }) + } +} + +/// Data fault mode. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum DataFaultMode { + Rand, +} + +impl Display for DataFaultMode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Self::Rand => "rand", + }) + } +} + +/// Fault I/O type. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum FaultIoType { + Read, + Write, +} + +impl Display for FaultIoType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Read => f.write_str("read"), + Self::Write => f.write_str("write"), + } + } +} + +impl TryFrom for FaultIoType { + type Error = (); + + fn try_from(value: IoType) -> Result { + match value { + IoType::Read => Ok(Self::Read), + IoType::Write => Ok(Self::Write), + _ => Err(()), + } + } +} + +/// Fault I/O stage. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum FaultIoStage { + Submission, + Completion, +} + +impl Display for FaultIoStage { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Submission => f.write_str("submit"), + Self::Completion => f.write_str("compl"), + } + } +} + +/// Fault type. 
+#[derive(Debug, Clone, Copy, PartialEq)] +pub enum FaultType { + Status(IoCompletionStatus), + Data, +} + +impl Display for FaultType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Status(_) => f.write_str("status"), + Self::Data => f.write_str("data"), + } + } +} + +impl FaultType { + pub fn status_data_transfer_error() -> Self { + use spdk_rs::{GenericStatusCode, NvmeStatus}; + + Self::Status(IoCompletionStatus::NvmeError(NvmeStatus::Generic( + GenericStatusCode::DataTransferError, + ))) + } +} + +/// Injection I/O. +#[derive(Debug, Clone)] +pub struct InjectIoCtx { + dev: Option<*mut dyn BlockDevice>, + io_type: IoType, + range: Range, + iovs: *mut IoVec, + iovs_len: usize, +} + +impl Default for InjectIoCtx { + fn default() -> Self { + Self { + dev: None, + io_type: IoType::Invalid, + range: 0 .. 0, + iovs: std::ptr::null_mut(), + iovs_len: 0, + } + } +} + +impl InjectIoCtx { + /// TODO + #[inline(always)] + pub fn with_iovs( + dev: &dyn BlockDevice, + io_type: IoType, + offset: u64, + num_blocks: u64, + iovs: &[IoVec], + ) -> Self { + Self { + dev: Some(dev as *const _ as *mut dyn BlockDevice), + io_type, + range: offset .. 
offset + num_blocks, + iovs: iovs.as_ptr() as *mut _, + iovs_len: iovs.len(), + } + } + + /// TODO + #[inline(always)] + pub fn is_valid(&self) -> bool { + self.dev.is_some() + } + + /// TODO + #[inline(always)] + pub fn device(&self) -> &dyn BlockDevice { + unsafe { &*self.dev.unwrap() } + } + + /// TODO + #[inline(always)] + pub fn device_name(&self) -> String { + self.device().device_name() + } + + /// TODO + #[inline(always)] + pub fn iovs_mut(&self) -> Option<&mut [IoVec]> { + unsafe { + if self.iovs.is_null() + || !(*self.iovs).is_initialized() + || (*self.iovs).is_empty() + || self.iovs_len == 0 + { + None + } else { + Some(from_raw_parts_mut(self.iovs, self.iovs_len)) + } + } + } +} + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)), context(suffix(false)))] +pub enum InjectionError { + #[snafu(display("Injections are disabled"))] + InjectionsDisabled {}, + #[snafu(display("URI is not an injection: '{}'", uri))] + NotInjectionUri { uri: String }, + #[snafu(display("Invalid injection URI: '{}'", uri))] + InvalidUri { source: ParseError, uri: String }, + #[snafu(display("Unknown injection parameter: '{}={}'", name, value))] + UnknownParameter { name: String, value: String }, + #[snafu(display("Bad injection parameter value: '{}={}'", name, value))] + BadParameterValue { name: String, value: String }, + #[snafu(display( + "Bad injection '{}' timer durations: {:?}, {:?}", + name, + begin, + end + ))] + BadDurations { + name: String, + begin: Duration, + end: Duration, + }, +} diff --git a/io-engine/src/core/handle.rs b/io-engine/src/core/handle.rs index c247875439..6e1e51e503 100644 --- a/io-engine/src/core/handle.rs +++ b/io-engine/src/core/handle.rs @@ -14,13 +14,12 @@ use spdk_rs::{ spdk_bdev_free_io, spdk_bdev_io, spdk_bdev_nvme_admin_passthru_ro, - spdk_bdev_read_with_flags, + spdk_bdev_read, spdk_bdev_reset, spdk_bdev_write, spdk_bdev_write_zeroes, spdk_io_channel, spdk_nvme_cmd, - SPDK_NVME_IO_FLAGS_UNWRITTEN_READ_FAIL, }, nvme_admin_opc, 
BdevOps, @@ -33,7 +32,13 @@ use spdk_rs::{ }; use crate::{ - core::{Bdev, CoreError, DescriptorGuard, ReadMode, SnapshotParams}, + core::{ + Bdev, + CoreError, + DescriptorGuard, + IoCompletionStatus, + SnapshotParams, + }, ffihelper::cb_arg, subsys, }; @@ -140,7 +145,7 @@ impl BdevHandle { spdk_bdev_write( self.desc.legacy_as_ptr(), self.channel.legacy_as_ptr(), - **buffer, + buffer.as_ptr() as *mut _, offset, buffer.len(), Some(Self::io_completion_cb), @@ -159,44 +164,29 @@ impl BdevHandle { match r.await.expect("Failed awaiting write IO") { NvmeStatus::Generic(GenericStatusCode::Success) => Ok(buffer.len()), status => Err(CoreError::WriteFailed { - status, + status: IoCompletionStatus::NvmeError(status), offset, len: buffer.len(), }), } } - /// read at given offset into the ['DmaBuf'] - pub async fn read_at( - &self, - offset: u64, - buffer: &mut DmaBuf, - ) -> Result { - self.read_at_ex(offset, buffer, None).await - } /// read at given offset into the ['DmaBuf'] - pub(crate) async fn read_at_ex( + pub async fn read_at( &self, offset: u64, buffer: &mut DmaBuf, - mode: Option, ) -> Result { - let flags = mode.map_or(0, |m| match m { - ReadMode::Normal => 0, - ReadMode::UnwrittenFail => SPDK_NVME_IO_FLAGS_UNWRITTEN_READ_FAIL, - }); - let (s, r) = oneshot::channel::(); let errno = unsafe { - spdk_bdev_read_with_flags( + spdk_bdev_read( self.desc.legacy_as_ptr(), self.channel.legacy_as_ptr(), - **buffer, + buffer.as_mut_ptr(), offset, buffer.len(), Some(Self::io_completion_cb), cb_arg(s), - flags, ) }; @@ -217,7 +207,7 @@ impl BdevHandle { len: buffer.len(), }), status => Err(CoreError::ReadFailed { - status, + status: IoCompletionStatus::NvmeError(status), offset, len: buffer.len(), }), @@ -332,19 +322,18 @@ impl BdevHandle { let (s, r) = oneshot::channel::(); // Use the spdk-rs variant spdk_bdev_nvme_admin_passthru that // assumes read commands + let (ptr, len) = match buffer { + Some(b) => (b.as_mut_ptr(), b.len()), + None => (std::ptr::null_mut(), 0), + }; + let 
errno = unsafe { spdk_bdev_nvme_admin_passthru_ro( self.desc.legacy_as_ptr(), self.channel.legacy_as_ptr(), nvme_cmd, - match buffer { - Some(ref b) => ***b, - None => std::ptr::null_mut(), - }, - match buffer { - Some(b) => b.len(), - None => 0, - }, + ptr, + len, Some(Self::io_completion_cb), cb_arg(s), ) diff --git a/io-engine/src/core/io_driver.rs b/io-engine/src/core/io_driver.rs index 921dd5ec13..9cd40c3592 100644 --- a/io-engine/src/core/io_driver.rs +++ b/io-engine/src/core/io_driver.rs @@ -81,7 +81,7 @@ impl Io { if spdk_bdev_read( self.job.as_ref().desc.legacy_as_ptr(), self.job.as_ref().ch.as_ref().unwrap().legacy_as_ptr(), - *self.buf, + self.buf.as_mut_ptr(), offset, self.buf.len(), Some(Job::io_completion), @@ -104,7 +104,7 @@ impl Io { if spdk_bdev_write( self.job.as_ref().desc.legacy_as_ptr(), self.job.as_ref().ch.as_ref().unwrap().legacy_as_ptr(), - *self.buf, + self.buf.as_mut_ptr(), offset, self.buf.len(), Some(Job::io_completion), diff --git a/io-engine/src/core/mod.rs b/io-engine/src/core/mod.rs index 7d856d6a3a..f3080dddc8 100644 --- a/io-engine/src/core/mod.rs +++ b/io-engine/src/core/mod.rs @@ -18,7 +18,7 @@ pub use block_device::{ LbaRangeController, OpCompletionCallback, OpCompletionCallbackArg, - ReadMode, + ReadOptions, }; pub use cpu_cores::{Core, Cores}; pub use descriptor::{DescriptorGuard, UntypedDescriptorGuard}; @@ -82,6 +82,7 @@ mod device_events; mod device_monitor; pub mod diagnostics; mod env; +pub mod fault_injection; mod handle; mod io_device; pub mod io_driver; @@ -215,7 +216,7 @@ pub enum CoreError { status ))] WriteFailed { - status: NvmeStatus, + status: IoCompletionStatus, offset: u64, len: u64, }, @@ -226,7 +227,7 @@ pub enum CoreError { status ))] ReadFailed { - status: NvmeStatus, + status: IoCompletionStatus, offset: u64, len: u64, }, diff --git a/io-engine/src/grpc/mod.rs b/io-engine/src/grpc/mod.rs index 6bfcd3a364..1b76317bb4 100644 --- a/io-engine/src/grpc/mod.rs +++ b/io-engine/src/grpc/mod.rs @@ -53,6 +53,7 @@ 
pub mod v1 { pub mod pool; pub mod replica; pub mod snapshot; + pub mod testing; } /// Default timeout for gRPC calls, in seconds. Should be enforced in case diff --git a/io-engine/src/grpc/server.rs b/io-engine/src/grpc/server.rs index c03bca15a6..0267c51c9f 100644 --- a/io-engine/src/grpc/server.rs +++ b/io-engine/src/grpc/server.rs @@ -12,6 +12,7 @@ use super::{ pool::PoolService, replica::ReplicaService, snapshot::SnapshotService, + testing::TestingService, }, }; @@ -92,6 +93,9 @@ impl MayastorGrpcServer { .add_optional_service(enable_v1.map(|_| { v1::snapshot::SnapshotRpcServer::new(SnapshotService::new()) })) + .add_optional_service(enable_v1.map(|_| { + v1::testing::TestingRpcServer::new(TestingService::new()) + })) .add_optional_service(enable_v1.map(|_| { v1::host::HostRpcServer::new(HostService::new( node_name, diff --git a/io-engine/src/grpc/v0/mayastor_grpc.rs b/io-engine/src/grpc/v0/mayastor_grpc.rs index 91b358cbbb..440dbf5be0 100644 --- a/io-engine/src/grpc/v0/mayastor_grpc.rs +++ b/io-engine/src/grpc/v0/mayastor_grpc.rs @@ -1301,101 +1301,6 @@ impl mayastor_server::Mayastor for MayastorSvc { .await } - #[named] - async fn inject_nexus_fault( - &self, - request: Request, - ) -> GrpcResult { - let ctx = GrpcClientContext::new(&request, function_name!()); - let args = request.into_inner(); - - self.serialized(ctx, args.uuid.clone(), false, async move { - let rx = rpc_submit::<_, _, nexus::Error>(async move { - trace!("{:?}", args); - let uuid = args.uuid.clone(); - let uri = args.uri.clone(); - debug!("Injecting fault to nexus '{}': '{}'", uuid, uri); - nexus_lookup(&args.uuid)? - .inject_add_fault(&args.uri) - .await?; - info!("Injected fault to nexus '{}': '{}'", uuid, uri); - Ok(Null {}) - })?; - - rx.await - .map_err(|_| Status::cancelled("cancelled"))? 
- .map_err(Status::from) - .map(Response::new) - }) - .await - } - - #[named] - async fn remove_injected_nexus_fault( - &self, - request: Request, - ) -> GrpcResult { - let ctx = GrpcClientContext::new(&request, function_name!()); - let args = request.into_inner(); - - self.serialized(ctx, args.uuid.clone(), false, async move { - let rx = rpc_submit::<_, _, nexus::Error>(async move { - trace!("{:?}", args); - let uuid = args.uuid.clone(); - let uri = args.uri.clone(); - debug!( - "Removing injected fault to nexus '{}': '{}'", - uuid, uri - ); - nexus_lookup(&args.uuid)? - .inject_remove_fault(&args.uri) - .await?; - info!("Removed injected fault to nexus '{}': '{}'", uuid, uri); - Ok(Null {}) - })?; - - rx.await - .map_err(|_| Status::cancelled("cancelled"))? - .map_err(Status::from) - .map(Response::new) - }) - .await - } - - #[named] - async fn list_injected_nexus_faults( - &self, - request: Request, - ) -> GrpcResult { - let ctx = GrpcClientContext::new(&request, function_name!()); - let args = request.into_inner(); - trace!("{:?}", args); - - self.serialized(ctx, args.uuid.clone(), false, async move { - let rx = rpc_submit::<_, _, nexus::Error>(async move { - let res = nexus_lookup(&args.uuid)? - .list_injections() - .await? - .into_iter() - .map(|inj| InjectedFault { - device_name: inj.device_name, - is_active: inj.is_active, - }) - .collect(); - - Ok(ListInjectedNexusFaultsReply { - injections: res, - }) - })?; - - rx.await - .map_err(|_| Status::cancelled("cancelled"))? 
- .map_err(Status::from) - .map(Response::new) - }) - .await - } - #[named] async fn publish_nexus( &self, diff --git a/io-engine/src/grpc/v1/nexus.rs b/io-engine/src/grpc/v1/nexus.rs index 572d2c5c2a..4d2f264ff5 100644 --- a/io-engine/src/grpc/v1/nexus.rs +++ b/io-engine/src/grpc/v1/nexus.rs @@ -642,108 +642,6 @@ impl NexusRpc for NexusService { .await } - #[named] - async fn inject_nexus_fault( - &self, - request: Request, - ) -> GrpcResult<()> { - let ctx = GrpcClientContext::new(&request, function_name!()); - let args = request.into_inner(); - - self.serialized(ctx, args.uuid.clone(), false, async move { - let rx = rpc_submit::<_, _, nexus::Error>(async move { - trace!("{:?}", args); - - debug!( - "Injecting fault to child {} of nexus {}", - args.uri, args.uuid - ); - nexus_lookup(&args.uuid)? - .inject_add_fault(&args.uri) - .await?; - info!( - "Injected fault to child {} of nexus {}", - args.uri, args.uuid - ); - Ok(()) - })?; - - rx.await - .map_err(|_| Status::cancelled("cancelled"))? - .map_err(Status::from) - .map(Response::new) - }) - .await - } - - #[named] - async fn remove_injected_nexus_fault( - &self, - request: Request, - ) -> GrpcResult<()> { - let ctx = GrpcClientContext::new(&request, function_name!()); - let args = request.into_inner(); - - self.serialized(ctx, args.uuid.clone(), false, async move { - let rx = rpc_submit::<_, _, nexus::Error>(async move { - trace!("{:?}", args); - - debug!( - "Removing injected fault from child {} of nexus {}", - args.uri, args.uuid - ); - nexus_lookup(&args.uuid)? - .inject_remove_fault(&args.uri) - .await?; - info!( - "Removed injected fault from child {} of nexus {}", - args.uri, args.uuid - ); - Ok(()) - })?; - - rx.await - .map_err(|_| Status::cancelled("cancelled"))? 
- .map_err(Status::from) - .map(Response::new) - }) - .await - } - - #[named] - async fn list_injected_nexus_faults( - &self, - request: Request, - ) -> GrpcResult { - let ctx = GrpcClientContext::new(&request, function_name!()); - let args = request.into_inner(); - trace!("{:?}", args); - - self.serialized(ctx, args.uuid.clone(), false, async move { - let rx = rpc_submit::<_, _, nexus::Error>(async move { - let res = nexus_lookup(&args.uuid)? - .list_injections() - .await? - .into_iter() - .map(|inj| InjectedFault { - device_name: inj.device_name, - is_active: inj.is_active, - }) - .collect(); - - Ok(ListInjectedNexusFaultsReply { - injections: res, - }) - })?; - - rx.await - .map_err(|_| Status::cancelled("cancelled"))? - .map_err(Status::from) - .map(Response::new) - }) - .await - } - #[named] async fn publish_nexus( &self, diff --git a/io-engine/src/grpc/v1/testing.rs b/io-engine/src/grpc/v1/testing.rs new file mode 100644 index 0000000000..4c2d84d0bc --- /dev/null +++ b/io-engine/src/grpc/v1/testing.rs @@ -0,0 +1,172 @@ +/// RPC service for testing operations +#[derive(Debug)] +pub struct TestingService {} + +impl TestingService { + pub fn new() -> Self { + Self {} + } +} + +impl Default for TestingService { + fn default() -> Self { + Self::new() + } +} + +#[cfg(feature = "fault-injection")] +mod injections { + use mayastor_api::v1::{self, testing::*}; + use tonic::{Request, Response, Status}; + + use crate::{ + core::fault_injection::{ + add_injection, + list_injections, + remove_injection, + Injection, + InjectionError, + }, + grpc::{rpc_submit, GrpcResult}, + }; + + impl From for tonic::Status { + fn from(e: InjectionError) -> Self { + match e { + e => Status::invalid_argument(e.to_string()), + } + } + } + + impl From for v1::testing::Injection { + fn from(src: Injection) -> Self { + let is_active = src.is_active(); + Self { + uri: src.uri, + device_name: src.device_name, + is_active, + } + } + } + + #[tonic::async_trait] + impl TestingRpc for 
super::TestingService { + #[tracing::instrument(skip(self))] + async fn add_injection( + &self, + request: Request, + ) -> GrpcResult<()> { + let args = request.into_inner(); + trace!("{:?}", args); + + let rx = rpc_submit::<_, _, InjectionError>(async move { + let uri = args.uri.clone(); + let inj = Injection::from_uri(&uri)?; + add_injection(inj); + Ok(()) + })?; + + rx.await + .map_err(|_| Status::cancelled("cancelled"))? + .map_err(Status::from) + .map(Response::new) + } + + #[tracing::instrument(skip(self))] + async fn remove_injection( + &self, + request: Request, + ) -> GrpcResult<()> { + let args = request.into_inner(); + trace!("{:?}", args); + + let rx = rpc_submit::<_, _, InjectionError>(async move { + let uri = args.uri.clone(); + + // Validate injection URI by trying to parse it. + Injection::from_uri(&uri)?; + + remove_injection(&uri); + + Ok(()) + })?; + + rx.await + .map_err(|_| Status::cancelled("cancelled"))? + .map_err(Status::from) + .map(Response::new) + } + + #[tracing::instrument(skip(self))] + async fn list_injections( + &self, + request: Request, + ) -> GrpcResult { + let args = request.into_inner(); + trace!("{:?}", args); + + let rx = rpc_submit::<_, _, InjectionError>(async move { + let injections = list_injections() + .into_iter() + .map(v1::testing::Injection::from) + .collect(); + + Ok(ListInjectionsReply { + injections, + }) + })?; + + rx.await + .map_err(|_| Status::cancelled("cancelled"))? 
+ .map_err(Status::from) + .map(Response::new) + } + } +} + +#[cfg(not(feature = "fault-injection"))] +mod no_injections { + use mayastor_api::v1::testing::*; + use tonic::Request; + + use crate::grpc::GrpcResult; + + #[tonic::async_trait] + impl TestingRpc for super::TestingService { + #[tracing::instrument(skip(self))] + async fn add_injection( + &self, + request: Request, + ) -> GrpcResult<()> { + let args = request.into_inner(); + trace!("{:?}", args); + GrpcResult::Err(tonic::Status::unimplemented( + "Fault injection feature is disabled", + )) + } + + #[tracing::instrument(skip(self))] + async fn remove_injection( + &self, + request: Request, + ) -> GrpcResult<()> { + let args = request.into_inner(); + trace!("{:?}", args); + GrpcResult::Err(tonic::Status::unimplemented( + "Fault injection feature is disabled", + )) + } + + #[tracing::instrument(skip(self))] + async fn list_injections( + &self, + request: Request, + ) -> GrpcResult { + let args = request.into_inner(); + trace!("{:?}", args); + GrpcResult::Err(tonic::Status::unimplemented( + "Fault injection feature is disabled", + )) + } + } +} diff --git a/io-engine/src/rebuild/mod.rs b/io-engine/src/rebuild/mod.rs index 2bae31a768..bfc9a73e78 100644 --- a/io-engine/src/rebuild/mod.rs +++ b/io-engine/src/rebuild/mod.rs @@ -9,8 +9,8 @@ mod rebuild_task; use rebuild_descriptor::RebuildDescriptor; pub(crate) use rebuild_error::RebuildError; -pub use rebuild_job::RebuildJob; use rebuild_job::RebuildOperation; +pub use rebuild_job::{RebuildJob, RebuildJobOptions, RebuildVerifyMode}; use rebuild_job_backend::{ RebuildFBendChan, RebuildJobBackend, diff --git a/io-engine/src/rebuild/rebuild_descriptor.rs b/io-engine/src/rebuild/rebuild_descriptor.rs index a5f1a16525..703b8bdcd9 100644 --- a/io-engine/src/rebuild/rebuild_descriptor.rs +++ b/io-engine/src/rebuild/rebuild_descriptor.rs @@ -1,9 +1,17 @@ +use chrono::{DateTime, Utc}; +use spdk_rs::{DmaBuf, IoVec, MediaErrorStatusCode, NvmeStatus}; use std::sync::Arc; -use 
chrono::{DateTime, Utc}; +use crate::core::{ + BlockDeviceDescriptor, + BlockDeviceHandle, + CoreError, + DescriptorGuard, + IoCompletionStatus, + ReadOptions, +}; -use super::{rebuild_error::RebuildError, RebuildMap}; -use crate::core::{BlockDeviceDescriptor, BlockDeviceHandle, DescriptorGuard}; +use super::{RebuildError, RebuildJobOptions, RebuildMap, RebuildVerifyMode}; /// Contains all descriptors and their associated information which allows the /// tasks to copy/rebuild data from source to destination. @@ -13,13 +21,15 @@ pub(super) struct RebuildDescriptor { pub(super) block_size: u64, /// The range of the entire rebuild. pub(super) range: std::ops::Range, + /// Rebuild job options. + pub(super) options: RebuildJobOptions, /// Segment size in blocks (number of segments divided by device block /// size). pub(super) segment_size_blks: u64, /// Source URI of the healthy child to rebuild from. - pub src_uri: String, + pub(super) src_uri: String, /// Target URI of the out of sync child to rebuild. - pub dst_uri: String, + pub(super) dst_uri: String, /// Pre-opened descriptor for the source block device. #[allow(clippy::non_send_fields_in_send_ty)] pub(super) src_descriptor: Box, @@ -36,6 +46,7 @@ pub(super) struct RebuildDescriptor { impl RebuildDescriptor { /// Return the size of the segment to be copied. + #[inline(always)] pub(super) fn get_segment_size_blks(&self, blk: u64) -> u64 { // Adjust the segments size for the last segment if (blk + self.segment_size_blks) > self.range.end { @@ -45,6 +56,7 @@ impl RebuildDescriptor { } /// Get a `BlockDeviceHandle` for the source. + #[inline(always)] pub(super) async fn src_io_handle( &self, ) -> Result, RebuildError> { @@ -52,6 +64,7 @@ impl RebuildDescriptor { } /// Get a `BlockDeviceHandle` for the destination. + #[inline(always)] pub(super) async fn dst_io_handle( &self, ) -> Result, RebuildError> { @@ -59,6 +72,7 @@ impl RebuildDescriptor { } /// Get a `BlockDeviceHandle` for the given block device descriptor. 
+ #[inline(always)] pub(super) async fn io_handle( descriptor: &dyn BlockDeviceDescriptor, ) -> Result, RebuildError> { @@ -76,6 +90,7 @@ impl RebuildDescriptor { /// Checks if the block has to be transferred. /// If no rebuild map is present, all blocks are considered unsynced. + #[inline(always)] pub(super) fn is_blk_sync(&self, blk: u64) -> bool { self.rebuild_map .lock() @@ -85,9 +100,173 @@ impl RebuildDescriptor { /// Marks the rebuild segment starting from the given logical block as /// already transferred. + #[inline(always)] pub(super) fn blk_synced(&self, blk: u64) { if let Some(map) = self.rebuild_map.lock().as_mut() { map.blk_clean(blk); } } + + /// Reads a rebuild segment at the given offset from the source replica, and + /// returns the buffer to be written to the destination. + /// In the case the segment is not allocated on the source, returns None. + pub(super) async fn read_src_segment( + &self, + offset_blk: u64, + ) -> Result, RebuildError> { + let src = self.src_io_handle().await?; + + let num_blocks = self.get_segment_size_blks(offset_blk); + + let mut buf = + src.dma_malloc(self.block_size * num_blocks) + .map_err(|err| RebuildError::NoCopyBuffer { + source: err, + })?; + + match src + .read_buf_blocks_async( + &mut buf, + offset_blk, + num_blocks, + ReadOptions::UnwrittenFail, + ) + .await + { + // Read is okay, data has to be copied to the destination. + Ok(_) => Ok(Some(buf)), + + // Read from an unallocated block occured, no need to copy it. + Err(CoreError::ReadFailed { + status, .. + }) if matches!( + status, + IoCompletionStatus::NvmeError(NvmeStatus::MediaError( + MediaErrorStatusCode::DeallocatedOrUnwrittenBlock + )) + ) => + { + Ok(None) + } + + // Read error. + Err(err) => Err(RebuildError::ReadIoFailed { + source: err, + bdev: self.src_uri.clone(), + }), + } + } + + /// Writes the given buffer to the destionation replica. 
+ pub(super) async fn write_dst_segment( + &self, + offset_blk: u64, + buf: &DmaBuf, + ) -> Result<(), RebuildError> { + self.dst_io_handle() + .await? + .write_buf_blocks_async( + buf, + offset_blk, + self.get_segment_size_blks(offset_blk), + ) + .await + .map_err(|err| RebuildError::WriteIoFailed { + source: err, + bdev: self.dst_uri.clone(), + }) + } + + /// Verify segment copy operation by reading destination, and comparing with + /// the source. + pub(super) async fn verify_segment( + &self, + offset_blk: u64, + org_buf: &DmaBuf, + ) -> Result<(), RebuildError> { + // Read the source again. + let mut buf = self + .read_src_segment(offset_blk) + .await? + .expect("Buffer must have been read correctly"); + + // Compare the original buffer from the source with the new one from the + // source, to catch read errors. + self.verify_buffers(offset_blk, org_buf, &buf, "source")?; + + // Read the destination, reusing the buffer allocated on the source + // read. + self.dst_io_handle() + .await? + .read_buf_blocks_async( + &mut buf, + offset_blk, + self.get_segment_size_blks(offset_blk), + ReadOptions::None, + ) + .await + .map_err(|err| RebuildError::VerifyIoFailed { + source: err, + bdev: self.dst_uri.clone(), + })?; + + // Compare the original buffer from the source with the one from the + // destination, to catch source read errors. + self.verify_buffers(offset_blk, org_buf, &buf, "destination") + } + + /// Verifies two buffers. 
+ fn verify_buffers( + &self, + offset_blk: u64, + a: &DmaBuf, + b: &DmaBuf, + obj: &str, + ) -> Result<(), RebuildError> { + let Some(idx) = IoVec::compare(a, b) else { + return Ok(()); + }; + + let msg = if a.len() != b.len() { + format!( + "buffers have different lengths: {a} != {b}", + a = a.len(), + b = b.len() + ) + } else { + format!( + "buffers differ at {pos} (block {off} x {bs} + {idx}): \ + 0x{a:x} != 0x{b:x}", + pos = offset_blk * self.block_size + idx, + off = offset_blk, + bs = self.block_size, + a = a[idx], + b = b[idx] + ) + }; + + let msg = format!( + "Rebuild job '{src}' -> '{dst}': {obj} verification failed: {msg}", + src = self.src_uri, + dst = self.dst_uri + ); + + match self.options.verify_mode { + RebuildVerifyMode::None => { + error!("{msg}: ignoring"); + Ok(()) + } + RebuildVerifyMode::Fail => { + error!("{msg}: failing rebuild"); + Err(RebuildError::VerifyCompareFailed { + bdev: self.dst_uri.clone(), + verify_message: msg, + }) + } + RebuildVerifyMode::Panic => { + error!("{msg}: will panic"); + panic!("{}", msg); + } + } + } } diff --git a/io-engine/src/rebuild/rebuild_error.rs b/io-engine/src/rebuild/rebuild_error.rs index b98bbc5434..a8f302ab5a 100644 --- a/io-engine/src/rebuild/rebuild_error.rs +++ b/io-engine/src/rebuild/rebuild_error.rs @@ -25,6 +25,17 @@ pub enum RebuildError { ReadIoFailed { source: CoreError, bdev: String }, #[snafu(display("Write IO failed for bdev {}", bdev))] WriteIoFailed { source: CoreError, bdev: String }, + #[snafu(display("Verify IO failed for bdev {}", bdev))] + VerifyIoFailed { source: CoreError, bdev: String }, + #[snafu(display( + "Verify compare failed for bdev {}: {}", + bdev, + verify_message + ))] + VerifyCompareFailed { + bdev: String, + verify_message: String, + }, #[snafu(display("Failed to find rebuild job {}", job))] JobNotFound { job: String }, #[snafu(display("Missing rebuild destination {}", job))] diff --git a/io-engine/src/rebuild/rebuild_job.rs b/io-engine/src/rebuild/rebuild_job.rs 
index ca9b2403a4..4128a93313 100644 --- a/io-engine/src/rebuild/rebuild_job.rs +++ b/io-engine/src/rebuild/rebuild_job.rs @@ -21,6 +21,23 @@ use super::{ }; use crate::core::{Reactors, VerboseError}; +/// Rebuild I/O verification mode. +#[derive(Debug, Clone)] +pub enum RebuildVerifyMode { + /// Do not verify rebuild I/Os. + None, + /// Fail rebuild job if I/O verification fails. + Fail, + /// Panic if I/O verification fails. + Panic, +} + +/// Rebuild job options. +#[derive(Debug, Clone)] +pub struct RebuildJobOptions { + pub verify_mode: RebuildVerifyMode, +} + /// Operations used to control the state of the job. #[derive(Debug)] pub(super) enum RebuildOperation { @@ -80,6 +97,7 @@ impl RebuildJob { src_uri: &str, dst_uri: &str, range: Range, + options: RebuildJobOptions, notify_fn: fn(String, String) -> (), ) -> Result { // Allocate an instance of the rebuild back-end. @@ -88,6 +106,7 @@ impl RebuildJob { src_uri, dst_uri, range.clone(), + options, notify_fn, ) .await?; diff --git a/io-engine/src/rebuild/rebuild_job_backend.rs b/io-engine/src/rebuild/rebuild_job_backend.rs index 7205a37f3e..9651fa9b45 100644 --- a/io-engine/src/rebuild/rebuild_job_backend.rs +++ b/io-engine/src/rebuild/rebuild_job_backend.rs @@ -16,9 +16,10 @@ use futures::{ use snafu::ResultExt; use super::{ - rebuild_error::{BdevInvalidUri, BdevNotFound, NoCopyBuffer}, + rebuild_error::{BdevInvalidUri, BdevNotFound}, RebuildDescriptor, RebuildError, + RebuildJobOptions, RebuildMap, RebuildState, RebuildStates, @@ -149,6 +150,7 @@ impl RebuildJobBackend { src_uri: &str, dst_uri: &str, range: std::ops::Range, + options: RebuildJobOptions, notify_fn: fn(String, String) -> (), ) -> Result { let src_descriptor = device_open( @@ -201,11 +203,7 @@ impl RebuildJobBackend { }; for _ in 0 .. 
tasks.total { - let copy_buffer = destination_hdl - .dma_malloc(segment_size_blks * block_size) - .context(NoCopyBuffer {})?; tasks.push(RebuildTask { - buffer: copy_buffer, sender: tasks.channel.0.clone(), error: None, }); @@ -236,6 +234,7 @@ impl RebuildJobBackend { src_uri: src_uri.to_string(), dst_uri: dst_uri.to_string(), range, + options, block_size, segment_size_blks, src_descriptor, diff --git a/io-engine/src/rebuild/rebuild_task.rs b/io-engine/src/rebuild/rebuild_task.rs index e014d5565c..371bd62868 100644 --- a/io-engine/src/rebuild/rebuild_task.rs +++ b/io-engine/src/rebuild/rebuild_task.rs @@ -1,19 +1,15 @@ use futures::{channel::mpsc, stream::FusedStream, SinkExt, StreamExt}; use snafu::ResultExt; +use spdk_rs::LbaRange; + +use crate::core::{Reactors, VerboseError}; use super::{ - rebuild_error::{ - NoCopyBuffer, - RangeLockFailed, - RangeUnlockFailed, - ReadIoFailed, - WriteIoFailed, - }, + rebuild_error::{RangeLockFailed, RangeUnlockFailed}, RebuildDescriptor, RebuildError, + RebuildVerifyMode, }; -use crate::core::{CoreError, Reactors, ReadMode, VerboseError}; -use spdk_rs::{DmaBuf, LbaRange}; /// Result returned by each segment task worker. /// Used to communicate with the management task indicating that the @@ -35,8 +31,6 @@ pub(super) struct TaskResult { /// An mpsc channel is used to communicate with the management task. #[derive(Debug)] pub(super) struct RebuildTask { - /// The pre-allocated `DmaBuf` used to read/write. - pub(super) buffer: DmaBuf, /// The channel used to notify when the task completes/fails. pub(super) sender: mpsc::Sender, /// Last error seen by this particular task. @@ -110,58 +104,17 @@ impl RebuildTask { /// Copies one segment worth of data from source into destination. 
async fn copy_one( &mut self, - blk: u64, - descriptor: &RebuildDescriptor, + offset_blk: u64, + desc: &RebuildDescriptor, ) -> Result<(), RebuildError> { - let mut copy_buffer: DmaBuf; - let source_hdl = descriptor.src_io_handle().await?; - let destination_hdl = descriptor.dst_io_handle().await?; - - let copy_buffer = if descriptor.get_segment_size_blks(blk) - == descriptor.segment_size_blks - { - &mut self.buffer - } else { - let segment_size_blks = descriptor.range.end - blk; - - debug!( - "Adjusting last segment size from {} to {}. offset: {}, range: {:?}", - descriptor.segment_size_blks, segment_size_blks, blk, descriptor.range, - ); - - copy_buffer = destination_hdl - .dma_malloc(segment_size_blks * descriptor.block_size) - .context(NoCopyBuffer {})?; + if let Some(buf) = desc.read_src_segment(offset_blk).await? { + desc.write_dst_segment(offset_blk, &buf).await?; - &mut copy_buffer - }; - - let res = source_hdl - .read_at_ex( - blk * descriptor.block_size, - copy_buffer, - Some(ReadMode::UnwrittenFail), - ) - .await; - - if let Err(CoreError::ReadingUnallocatedBlock { - .. 
- }) = res - { - return Ok(()); + if !matches!(desc.options.verify_mode, RebuildVerifyMode::None) { + desc.verify_segment(offset_blk, &buf).await?; + } } - res.context(ReadIoFailed { - bdev: &descriptor.src_uri, - })?; - - destination_hdl - .write_at(blk * descriptor.block_size, copy_buffer) - .await - .context(WriteIoFailed { - bdev: &descriptor.dst_uri, - })?; - Ok(()) } } diff --git a/io-engine/tests/block_device_nvmf.rs b/io-engine/tests/block_device_nvmf.rs index 4560c358db..7a359dd485 100755 --- a/io-engine/tests/block_device_nvmf.rs +++ b/io-engine/tests/block_device_nvmf.rs @@ -1,3 +1,5 @@ +#![allow(deprecated)] + use libc::c_void; use once_cell::sync::{Lazy, OnceCell}; @@ -24,7 +26,6 @@ use io_engine::{ }; use std::{ - alloc::Layout, slice, str, sync::{ @@ -33,12 +34,12 @@ use std::{ }, }; -use spdk_rs::{DmaBuf, IoVec}; +use spdk_rs::{AsIoVecs, DmaBuf}; pub mod common; use io_engine::{ constants::NVME_CONTROLLER_MODEL_ID, - core::{DeviceEventListener, DeviceEventSink}, + core::{DeviceEventListener, DeviceEventSink, ReadOptions}, }; use uuid::Uuid; @@ -385,8 +386,7 @@ async fn nvmf_io_stats() { // Placeholder structure to let all the fields outlive API invocations. struct IoCtx { - iov: IoVec, - dma_buf: DmaBuf, + dma_buf: Vec, handle: Box, } @@ -454,21 +454,21 @@ async fn nvmf_io_stats() { ); let mut ctx = IoCtx { - iov: IoVec::default(), - dma_buf: create_io_buffer(alignment, 6 * BUF_SIZE, IO_PATTERN), + dma_buf: vec![create_io_buffer( + alignment, + 6 * BUF_SIZE, + IO_PATTERN, + )], handle, }; - ctx.iov.iov_base = *ctx.dma_buf; - ctx.iov.iov_len = 6 * BUF_SIZE; - // Schedule asynchronous I/O operations and check stats later. 
ctx.handle .readv_blocks( - &mut ctx.iov, - 1, + ctx.dma_buf.as_io_vecs_mut(), (3 * 1024 * 1024) / block_len, 6 * BUF_SIZE / block_len, + ReadOptions::None, io_completion_callback, MAYASTOR_CTRLR_TITLE.as_ptr() as *mut c_void, ) @@ -476,8 +476,7 @@ async fn nvmf_io_stats() { ctx.handle .writev_blocks( - &mut ctx.iov, - 1, + ctx.dma_buf.as_io_vecs(), (4 * 1024 * 1024) / block_len, 4 * BUF_SIZE / block_len, io_completion_callback, @@ -628,9 +627,7 @@ async fn nvmf_device_readv_test() { // Placeholder structure to let all the fields outlive API invocations. struct IoCtx { - iov: IoVec, - iovcnt: i32, - dma_buf: DmaBuf, + dma_buf: Vec, handle: Box, } @@ -688,23 +685,22 @@ async fn nvmf_device_readv_test() { // Create a buffer with the guard pattern. let mut io_ctx = IoCtx { - iov: IoVec::default(), - iovcnt: 1, - dma_buf: create_io_buffer(alignment, BUF_SIZE, GUARD_PATTERN), + dma_buf: vec![create_io_buffer( + alignment, + BUF_SIZE, + GUARD_PATTERN, + )], handle, }; - io_ctx.iov.iov_base = *io_ctx.dma_buf; - io_ctx.iov.iov_len = BUF_SIZE; - // Initiate a read operation into the buffer. io_ctx .handle .readv_blocks( - &mut io_ctx.iov, - io_ctx.iovcnt, + io_ctx.dma_buf.as_io_vecs_mut(), (3 * 1024 * 1024) / block_len, BUF_SIZE / block_len, + ReadOptions::None, read_completion_callback, // Use a predefined string to check that we receive the // same context pointer as we pass upon @@ -730,7 +726,7 @@ async fn nvmf_device_readv_test() { // with data pattern. We should see all zeroes in the buffer instead of // the guard pattern. let b = buf_ptr.into_inner(); - check_buf_pattern(unsafe { &((*b).dma_buf) }, 0); + check_buf_pattern(unsafe { &((*b).dma_buf[0]) }, 0); // Turn placeholder structure into a box to trigger drop() action // on handle's resources once the box is dropped. @@ -795,8 +791,7 @@ async fn nvmf_device_writev_test() { // Placeholder structure to let all the fields outlive API invocations. 
struct IoCtx { - iov: IoVec, - dma_buf: DmaBuf, + dma_buf: Vec, handle: Box, } @@ -829,21 +824,18 @@ async fn nvmf_device_writev_test() { .unwrap(); assert_eq!(r, BUF_SIZE, "The amount of data written mismatches"); - let mut ctx = IoCtx { - iov: IoVec::default(), - dma_buf: create_io_buffer(alignment, BUF_SIZE, IO_PATTERN), + let ctx = IoCtx { + dma_buf: vec![create_io_buffer( + alignment, BUF_SIZE, IO_PATTERN, + )], handle, }; - ctx.iov.iov_base = *ctx.dma_buf; - ctx.iov.iov_len = BUF_SIZE; - // Write data buffer between guard buffers to catch writes outside // the range. ctx.handle .writev_blocks( - &mut ctx.iov, - 1, + ctx.dma_buf.as_io_vecs(), (OP_OFFSET + BUF_SIZE) / block_len, BUF_SIZE / block_len, write_completion_callback, @@ -912,8 +904,7 @@ async fn nvmf_device_writev_test() { #[tokio::test] async fn nvmf_device_readv_iovs_test() { const OP_OFFSET: u64 = 6 * 1024 * 1024; - const IOVCNT: usize = 5; - const IOVSIZES: [u64; IOVCNT] = [ + const IOVSIZES: [u64; 5] = [ // Sizes of I/O vectors, in kilobytes. 512 * 1024, 128 * 1024, @@ -966,7 +957,6 @@ async fn nvmf_device_readv_iovs_test() { // Placeholder structure to let all the fields outlive API invocations. struct IoCtx { - iovs: *mut IoVec, buffers: Vec, handle: Box, } @@ -987,26 +977,11 @@ async fn nvmf_device_readv_iovs_test() { // Store device name for further checking from I/O callback. DEVICE_NAME.set(device_name.clone()).unwrap(); - let mut buffers = Vec::::with_capacity(IOVCNT); - - // Allocate phsycally continous memory for storing raw I/O vectors. 
- let l = Layout::array::(IOVCNT).unwrap(); - let iovs = unsafe { std::alloc::alloc(l) } as *mut IoVec; - - for (i, s) in IOVSIZES.iter().enumerate().take(IOVCNT) { - let mut iov = IoVec::default(); - let buf = create_io_buffer(alignment, *s, GUARD_PATTERN); - - iov.iov_base = *buf; - iov.iov_len = buf.len(); - - buffers.push(buf); - unsafe { *iovs.add(i) = iov }; - } - - let io_ctx = IoCtx { - iovs, - buffers, + let mut io_ctx = IoCtx { + buffers: IOVSIZES + .iter() + .map(|s| create_io_buffer(alignment, *s, GUARD_PATTERN)) + .collect(), handle, }; @@ -1019,10 +994,10 @@ async fn nvmf_device_readv_iovs_test() { io_ctx .handle .readv_blocks( - io_ctx.iovs, - IOVCNT as i32, + io_ctx.buffers.as_io_vecs_mut(), OP_OFFSET / block_len, iosize / block_len, + ReadOptions::None, read_completion_callback, // Use a predefined string to check that we receive the // same context pointer as we pass upon @@ -1128,7 +1103,6 @@ async fn nvmf_device_writev_iovs_test() { // Placeholder structure to let all the fields outlive API invocations. struct IoCtx { - iovs: *mut IoVec, buffers: Vec, handle: Box, } @@ -1146,25 +1120,14 @@ async fn nvmf_device_writev_iovs_test() { // Store device name for further checking from I/O callback. DEVICE_NAME.set(device_name.clone()).unwrap(); - let mut buffers = Vec::::with_capacity(IOVCNT); - - // Allocate phsycally continous memory for storing raw I/O vectors. 
- let l = Layout::array::(IOVCNT).unwrap(); - let iovs = unsafe { std::alloc::alloc(l) } as *mut IoVec; + let mut buffers = Vec::::with_capacity(IOVSIZES.len()); - for (i, s) in IOVSIZES.iter().enumerate().take(IOVCNT) { - let mut iov = IoVec::default(); + for s in IOVSIZES.iter() { let buf = create_io_buffer(alignment, *s, IO_PATTERN); - - iov.iov_base = *buf; - iov.iov_len = buf.len(); - buffers.push(buf); - unsafe { *iovs.add(i) = iov }; } let io_ctx = IoCtx { - iovs, buffers, handle, }; @@ -1190,8 +1153,7 @@ async fn nvmf_device_writev_iovs_test() { io_ctx .handle .writev_blocks( - io_ctx.iovs, - IOVCNT as i32, + io_ctx.buffers.as_io_vecs(), OP_OFFSET / block_len, iosize / block_len, write_completion_callback, @@ -1581,9 +1543,7 @@ async fn nvmf_reset_abort_io() { // Placeholder structure to let all the fields outlive API invocations. struct IoCtx { - iov: IoVec, - iovcnt: i32, - dma_buf: DmaBuf, + dma_buf: Vec, handle: Box, } @@ -1699,15 +1659,14 @@ async fn nvmf_reset_abort_io() { DEVICE_NAME.set(name.clone()).unwrap(); let mut io_ctx = IoCtx { - iov: IoVec::default(), - iovcnt: 1, - dma_buf: create_io_buffer(alignment, BUF_SIZE, GUARD_PATTERN), + dma_buf: vec![create_io_buffer( + alignment, + BUF_SIZE, + GUARD_PATTERN, + )], handle, }; - io_ctx.iov.iov_base = *io_ctx.dma_buf; - io_ctx.iov.iov_len = BUF_SIZE; - // Initiate a 3 read and 3 write operations into the buffer. // We use the same IOVs as we don't care about the I/O result and // care only about failures which we're gonna trigger. 
@@ -1715,10 +1674,10 @@ async fn nvmf_reset_abort_io() { io_ctx .handle .readv_blocks( - &mut io_ctx.iov, - io_ctx.iovcnt, + io_ctx.dma_buf.as_io_vecs_mut(), (3 * 1024 * 1024) / block_len, BUF_SIZE / block_len, + ReadOptions::None, read_completion_callback, // Use a predefined string to check that we receive the // same context pointer as we pass upon @@ -1731,8 +1690,7 @@ async fn nvmf_reset_abort_io() { io_ctx .handle .writev_blocks( - &mut io_ctx.iov, - io_ctx.iovcnt, + io_ctx.dma_buf.as_io_vecs(), (3 * 1024 * 1024) / block_len, BUF_SIZE / block_len, write_completion_callback, diff --git a/io-engine/tests/nexus_child_online.rs b/io-engine/tests/nexus_child_online.rs index 5b4d87a068..1e4e02b574 100644 --- a/io-engine/tests/nexus_child_online.rs +++ b/io-engine/tests/nexus_child_online.rs @@ -136,25 +136,13 @@ async fn nexus_child_online() { .await .unwrap(); - nex_0.offline_child_replica(&repl_0).await.unwrap(); nex_0 - .wait_replica_state( - &repl_0, - ChildState::Degraded, - Some(ChildStateReason::ByClient), - Duration::from_secs(1), - ) + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); - nex_0.online_child_replica(&repl_0).await.unwrap(); nex_0 - .wait_replica_state( - &repl_0, - ChildState::Online, - None, - Duration::from_secs(1), - ) + .online_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); diff --git a/io-engine/tests/nexus_child_retire.rs b/io-engine/tests/nexus_child_retire.rs index b78883f031..443988281d 100644 --- a/io-engine/tests/nexus_child_retire.rs +++ b/io-engine/tests/nexus_child_retire.rs @@ -1,4 +1,4 @@ -#![cfg(feature = "nexus-fault-injection")] +#![cfg(feature = "fault-injection")] use std::time::Duration; @@ -35,13 +35,24 @@ use io_engine::{ nexus_lookup_mut, ChildState, FaultReason, - Injection, - InjectionOp, NexusStatus, }, NexusInfo, }, - core::{CoreError, MayastorCliArgs, Protocol}, + core::{ + fault_injection::{ + add_injection, + FaultDomain, + FaultIoStage, + FaultIoType, + 
FaultType, + Injection, + }, + CoreError, + IoCompletionStatus, + MayastorCliArgs, + Protocol, + }, lvs::Lvs, persistent_store::PersistentStoreBuilder, pool_backend::PoolArgs, @@ -227,9 +238,9 @@ async fn nexus_child_retire_persist_unresponsive_with_fio() { // Fault replica #0 at block 10. nex_0 - .inject_fault_at_replica( + .add_injection_at_replica( &repl_0, - &format!("op=write&offset={offset}", offset = 10), + &format!("domain=nexus&op=write&offset={offset}", offset = 10), ) .await .unwrap(); @@ -346,9 +357,12 @@ async fn nexus_child_retire_persist_unresponsive_with_bdev_io() { let inj_device = nex.child_at(0).get_device_name().unwrap(); - nex.inject_add(Injection::new( + add_injection(Injection::new( + FaultDomain::Nexus, &inj_device, - InjectionOp::Write, + FaultIoType::Write, + FaultIoStage::Completion, + FaultType::status_data_transfer_error(), Duration::ZERO, Duration::MAX, 0 .. 1, @@ -422,9 +436,12 @@ async fn nexus_child_retire_persist_failure_with_bdev_io() { let inj_device = nex.child_at(0).get_device_name().unwrap(); - nex.inject_add(Injection::new( + add_injection(Injection::new( + FaultDomain::Nexus, &inj_device, - InjectionOp::Write, + FaultIoType::Write, + FaultIoStage::Completion, + FaultType::status_data_transfer_error(), Duration::ZERO, Duration::MAX, 0 .. 1, @@ -448,9 +465,9 @@ async fn nexus_child_retire_persist_failure_with_bdev_io() { assert!(matches!( res, Err(CoreError::WriteFailed { - status: NvmeStatus::Generic( + status: IoCompletionStatus::NvmeError(NvmeStatus::Generic( GenericStatusCode::InternalDeviceError - ), + )), .. 
}) )); diff --git a/io-engine/tests/nexus_fault_injection.rs b/io-engine/tests/nexus_fault_injection.rs index 12dcff6617..d0a7cc60e1 100644 --- a/io-engine/tests/nexus_fault_injection.rs +++ b/io-engine/tests/nexus_fault_injection.rs @@ -1,4 +1,4 @@ -#![cfg(feature = "nexus-fault-injection")] +#![cfg(feature = "fault-injection")] pub mod common; @@ -18,6 +18,7 @@ use common::{ nexus::{test_write_to_nexus, NexusBuilder}, pool::PoolBuilder, replica::ReplicaBuilder, + testing::{add_injection, list_injections}, }; static POOL_SIZE: u64 = 60; @@ -139,10 +140,10 @@ async fn test_injection_uri(inj_part: &str) { let dev_name = children[0].device_name.as_ref().unwrap(); let inj_uri = format!("inject://{dev_name}?{inj_part}"); - nex_0.inject_nexus_fault(&inj_uri).await.unwrap(); + add_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // List injected fault. - let lst = nex_0.list_injected_faults().await.unwrap(); + let lst = list_injections(nex_0.rpc()).await.unwrap(); assert_eq!(lst.len(), 1); assert_eq!(&lst[0].device_name, dev_name); @@ -164,22 +165,22 @@ async fn test_injection_uri(inj_part: &str) { #[tokio::test] async fn nexus_fault_injection_write_submission() { - test_injection_uri("op=swrite&offset=64").await; + test_injection_uri("domain=nexus&op=write&stage=submit&offset=64").await; } #[tokio::test] async fn nexus_fault_injection_write() { - test_injection_uri("op=write&offset=64").await; + test_injection_uri("domain=nexus&op=write&offset=64").await; } #[tokio::test] async fn nexus_fault_injection_read_submission() { - test_injection_uri("op=sread&offset=64").await; + test_injection_uri("domain=nexus&op=read&stage=submit&offset=64").await; } #[tokio::test] async fn nexus_fault_injection_read() { - test_injection_uri("op=read&offset=64").await; + test_injection_uri("domain=nexus&op=read&offset=64").await; } #[tokio::test] @@ -201,12 +202,12 @@ async fn nexus_fault_injection_time_based() { // Create an injection that will start in 1 sec after first I/O // to the 
device, and end after 5s. - let inj_part = "op=write&begin=1000&end=5000"; + let inj_part = "domain=nexus&op=write&begin=1000&end=5000"; let inj_uri = format!("inject://{dev_name}?{inj_part}"); - nex_0.inject_nexus_fault(&inj_uri).await.unwrap(); + add_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // List injected fault. - let lst = nex_0.list_injected_faults().await.unwrap(); + let lst = list_injections(nex_0.rpc()).await.unwrap(); assert_eq!(lst.len(), 1); assert_eq!(&lst[0].device_name, dev_name); @@ -279,12 +280,12 @@ async fn nexus_fault_injection_range_based() { // Create injection that will fail at offset of 128 blocks, for a span // of 16 blocks. - let inj_part = "op=write&offset=128&num_blk=16"; + let inj_part = "domain=nexus&op=write&offset=128&num_blk=16"; let inj_uri = format!("inject://{dev_name}?{inj_part}"); - nex_0.inject_nexus_fault(&inj_uri).await.unwrap(); + add_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // List injected fault. - let lst = nex_0.list_injected_faults().await.unwrap(); + let lst = list_injections(nex_0.rpc()).await.unwrap(); assert_eq!(lst.len(), 1); assert_eq!(&lst[0].device_name, dev_name); diff --git a/io-engine/tests/nexus_rebuild.rs b/io-engine/tests/nexus_rebuild.rs index 3e44b26f65..c4f2d0fe96 100644 --- a/io-engine/tests/nexus_rebuild.rs +++ b/io-engine/tests/nexus_rebuild.rs @@ -108,6 +108,7 @@ async fn nexus_share() -> String { device } +#[allow(deprecated)] async fn wait_for_replica_rebuild(src_replica: &str, new_replica: &str) { let ms = get_ms(); diff --git a/io-engine/tests/nexus_rebuild_partial.rs b/io-engine/tests/nexus_rebuild_partial.rs index d4c01206f9..2fd806c8ba 100644 --- a/io-engine/tests/nexus_rebuild_partial.rs +++ b/io-engine/tests/nexus_rebuild_partial.rs @@ -16,15 +16,18 @@ use common::{ replica::{validate_replicas, ReplicaBuilder}, }; -#[cfg(feature = "nexus-fault-injection")] +#[cfg(feature = "fault-injection")] use io_engine_tests::{ fio::{Fio, FioJob}, nexus::test_fio_to_nexus, }; 
-#[cfg(feature = "nexus-fault-injection")] +#[cfg(feature = "fault-injection")] use common::compose::rpc::v1::nexus::RebuildJobState; +#[cfg(feature = "fault-injection")] +use common::testing::{add_injection, remove_injection}; + use std::time::Duration; /// Pool size. @@ -147,7 +150,7 @@ async fn create_test_storage(test: &ComposeTest) -> StorageBuilder { } #[tokio::test] -#[cfg(feature = "nexus-fault-injection")] +#[cfg(feature = "fault-injection")] // 1. Create a nexus with two replicas. // 2. Create a fault injection on one replica, and write some data. // 3. Online the failed replica and wait until it gets back. @@ -186,10 +189,11 @@ async fn nexus_partial_rebuild_io_fault() { // All write operations starting of segment #7 will fail. let dev_name_1 = children[1].device_name.as_ref().unwrap(); let inj_uri = format!( - "inject://{dev_name_1}?op=write&offset={offset}", + "inject://{dev_name_1}?domain=nexus&op=write&offset={offset}", offset = 7 * SEG_BLK ); - nex_0.inject_nexus_fault(&inj_uri).await.unwrap(); + + add_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // This write must be okay as the injection is not triggered yet. test_write_to_nexus( @@ -253,7 +257,7 @@ async fn nexus_partial_rebuild_io_fault() { .unwrap(); // Remove injection. - nex_0.remove_injected_nexus_fault(&inj_uri).await.unwrap(); + remove_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // Bring the child online. That will trigger partial rebuild. nex_0.online_child_replica(&repl_1).await.unwrap(); @@ -309,16 +313,8 @@ async fn nexus_partial_rebuild_offline_online() { .unwrap(); // Offline the replica. 
- nex_0.offline_child_replica(&repl_0).await.unwrap(); - - // Transition to offline state is not immediate, nex_0 - .wait_replica_state( - &repl_0, - ChildState::Degraded, - None, - Duration::from_secs(1), - ) + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); @@ -360,7 +356,7 @@ async fn nexus_partial_rebuild_offline_online() { } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[cfg(feature = "nexus-fault-injection")] +#[cfg(feature = "fault-injection")] /// I/O failure during rebuild. /// Initiate a (partial) rebuild, and force a replica to fail with I/O error /// while the rebuild job is running. @@ -466,16 +462,11 @@ async fn nexus_partial_rebuild_double_fault() { let child_0_dev_name = children[0].device_name.as_ref().unwrap(); // Offline the replica again. - nex_0.offline_child_replica(&repl_0).await.unwrap(); nex_0 - .wait_replica_state( - &repl_0, - ChildState::Degraded, - None, - Duration::from_secs(1), - ) + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); + let children = nex_0.get_nexus().await.unwrap().children; assert_eq!(children[0].state(), ChildState::Degraded); assert_eq!(children[0].state_reason(), ChildStateReason::ByClient); @@ -496,10 +487,11 @@ async fn nexus_partial_rebuild_double_fault() { // Inject a failure at FAULT_POS. let inj_uri = format!( - "inject://{child_0_dev_name}?op=write&offset={offset}&num_blk=1", + "inject://{child_0_dev_name}?\ + domain=nexus&op=write&offset={offset}&num_blk=1", offset = FAULT_POS * 1024 * 1024 / BLK_SIZE ); - nex_0.inject_nexus_fault(&inj_uri).await.unwrap(); + add_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // Online the replica, triggering the rebuild. let j0 = tokio::spawn({ @@ -548,16 +540,11 @@ async fn nexus_partial_rebuild_double_fault() { .unwrap(); // Offline the replica again. 
- nex_0.offline_child_replica(&repl_0).await.unwrap(); nex_0 - .wait_replica_state( - &repl_0, - ChildState::Degraded, - None, - Duration::from_secs(1), - ) + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); + let children = nex_0.get_nexus().await.unwrap().children; assert_eq!(children[0].state(), ChildState::Degraded); assert_eq!(children[0].state_reason(), ChildStateReason::ByClient); diff --git a/io-engine/tests/nexus_rebuild_verify.rs b/io-engine/tests/nexus_rebuild_verify.rs new file mode 100644 index 0000000000..e11e741e1a --- /dev/null +++ b/io-engine/tests/nexus_rebuild_verify.rs @@ -0,0 +1,148 @@ +#![cfg(feature = "fault-injection")] + +pub mod common; + +use common::{ + compose::{ + rpc::v1::{ + nexus::{ChildState, ChildStateReason, RebuildJobState}, + GrpcConnect, + }, + Binary, + Builder, + }, + nexus::NexusBuilder, + pool::PoolBuilder, + replica::ReplicaBuilder, + testing::add_injection, +}; + +use std::time::Duration; + +#[allow(dead_code)] +struct StorageConfig { + pool_0: PoolBuilder, + pool_1: PoolBuilder, + repl_0: ReplicaBuilder, + repl_1: ReplicaBuilder, + nex_0: NexusBuilder, +} + +const POOL_SIZE: u64 = 80; +const REPL_SIZE: u64 = 60; +const NEXUS_SIZE: u64 = REPL_SIZE; + +#[tokio::test] +async fn nexus_rebuild_verify_local() { + common::composer_init(); + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec!["-l", "1"]), + ) + .add_container_bin( + "ms_1", + Binary::from_dbg("io-engine").with_args(vec!["-l", "2"]), + ) + .add_container_bin( + "ms_nex", + Binary::from_dbg("io-engine") + // Disable partial rebuild to force rebuild I/O. + .with_env("NEXUS_PARTIAL_REBUILD", "0") + // Set rebuild revify mode to fail. 
+ .with_env("NEXUS_REBUILD_VERIFY", "fail") + .with_args(vec!["-l", "3", "-Fcolor,compact"]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + + let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap(); + let ms_1 = conn.grpc_handle_shared("ms_1").await.unwrap(); + let ms_nex = conn.grpc_handle_shared("ms_nex").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(ms_0.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + let mut repl_0 = ReplicaBuilder::new(ms_0.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_thin(false); + + pool_0.create().await.unwrap(); + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let mut pool_1 = PoolBuilder::new(ms_1.clone()) + .with_name("pool1") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + let mut repl_1 = ReplicaBuilder::new(ms_1.clone()) + .with_pool(&pool_1) + .with_name("r1") + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_thin(false); + + pool_1.create().await.unwrap(); + repl_1.create().await.unwrap(); + repl_1.share().await.unwrap(); + + let mut nex_0 = NexusBuilder::new(ms_nex.clone()) + .with_name("nexus0") + .with_new_uuid() + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0) + .with_replica(&repl_1); + + nex_0.create().await.unwrap(); + nex_0.publish().await.unwrap(); + + let children = nex_0.get_nexus().await.unwrap().children; + assert_eq!(children.len(), 2); + let dev_name = children[0].device_name.as_ref().unwrap(); + + // Offline the replica. + nex_0 + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) + .await + .unwrap(); + + // Add an injection as block device level. + let inj_part = "domain=block&op=write&stage=submission&type=data\ + &offset=10240&num_blk=1"; + let inj_uri = format!("inject://{dev_name}?{inj_part}"); + add_injection(nex_0.rpc(), &inj_uri).await.unwrap(); + + // Online the replica. 
Rebuild must fail at some point because of injected + // miscompare. + nex_0.online_child_replica(&repl_0).await.unwrap(); + + // Wait until the rebuild fails. + nex_0 + .wait_replica_state( + &repl_0, + ChildState::Faulted, + Some(ChildStateReason::RebuildFailed), + Duration::from_secs(5), + ) + .await + .unwrap(); + + // Check that the rebuild history has a single failed record. + let hist = nex_0.get_rebuild_history().await.unwrap(); + assert_eq!(hist.len(), 1); + assert_eq!(hist[0].state(), RebuildJobState::Failed); +} diff --git a/io-engine/tests/nvme_device_timeout.rs b/io-engine/tests/nvme_device_timeout.rs index 13c24346f6..3aa213a5f4 100755 --- a/io-engine/tests/nvme_device_timeout.rs +++ b/io-engine/tests/nvme_device_timeout.rs @@ -20,10 +20,11 @@ use io_engine::{ DeviceTimeoutAction, IoCompletionStatus, MayastorCliArgs, + ReadOptions, }, subsys::{Config, NvmeBdevOpts}, }; -use spdk_rs::{DmaBuf, IoVec}; +use spdk_rs::{AsIoVecs, DmaBuf}; pub mod common; @@ -37,9 +38,8 @@ static CALLBACK_FLAG: AtomicCell = AtomicCell::new(false); const BUF_SIZE: u64 = 32768; struct IoOpCtx { - iov: IoVec, device_url: String, - dma_buf: DmaBuf, + dma_buf: Vec, handle: Box, } @@ -181,24 +181,20 @@ async fn test_io_timeout(action_on_timeout: DeviceTimeoutAction) { }; let mut io_ctx = IoOpCtx { - iov: IoVec::default(), device_url: ctx.device_url, - dma_buf: DmaBuf::new(BUF_SIZE, alignment).unwrap(), + dma_buf: vec![DmaBuf::new(BUF_SIZE, alignment).unwrap()], handle: ctx.handle, }; - io_ctx.iov.iov_base = *io_ctx.dma_buf; - io_ctx.iov.iov_len = BUF_SIZE; - CALLBACK_FLAG.store(false); io_ctx .handle .readv_blocks( - &mut io_ctx.iov, - 1, + io_ctx.dma_buf.as_io_vecs_mut(), (3 * 1024 * 1024) / block_len, BUF_SIZE / block_len, + ReadOptions::None, read_completion_callback, TEST_CTX_STRING.as_ptr() as *mut c_void, ) @@ -377,24 +373,20 @@ async fn io_timeout_ignore() { }; let mut io_ctx = IoOpCtx { - iov: IoVec::default(), device_url: ctx.device_url, - dma_buf: DmaBuf::new(BUF_SIZE, 
alignment).unwrap(), + dma_buf: vec![DmaBuf::new(BUF_SIZE, alignment).unwrap()], handle: ctx.handle, }; - io_ctx.iov.iov_base = *io_ctx.dma_buf; - io_ctx.iov.iov_len = BUF_SIZE; - CALLBACK_FLAG.store(false); io_ctx .handle .readv_blocks( - &mut io_ctx.iov, - 1, + io_ctx.dma_buf.as_io_vecs_mut(), (3 * 1024 * 1024) / block_len, BUF_SIZE / block_len, + ReadOptions::None, read_completion_callback, TEST_CTX_STRING.as_ptr() as *mut c_void, ) diff --git a/rpc/mayastor-api b/rpc/mayastor-api index 400c703362..e03b92dea9 160000 --- a/rpc/mayastor-api +++ b/rpc/mayastor-api @@ -1 +1 @@ -Subproject commit 400c7033627f336457982380edae05cabb08c7e5 +Subproject commit e03b92dea984907eeddfe53c8fb60ae56f60564c diff --git a/spdk-rs b/spdk-rs index a457fb208b..26881d3be3 160000 --- a/spdk-rs +++ b/spdk-rs @@ -1 +1 @@ -Subproject commit a457fb208bebd0b18990d34b01e32e7c6f2e6ebf +Subproject commit 26881d3be3d87c5d454ddf70e1768af6d2e08b14