diff --git a/io-engine-tests/src/compose/mod.rs b/io-engine-tests/src/compose/mod.rs
index cae17aa94..6daa7cc89 100644
--- a/io-engine-tests/src/compose/mod.rs
+++ b/io-engine-tests/src/compose/mod.rs
@@ -5,13 +5,21 @@ use std::future::Future;
 use tokio::sync::oneshot::channel;
 
 use crate::mayastor_test_init_ex;
-use io_engine::core::{
-    mayastor_env_stop,
-    MayastorCliArgs,
-    MayastorEnvironment,
-    Reactor,
-    Reactors,
-    GLOBAL_RC,
+use io_engine::{
+    core::{
+        device_monitor_loop,
+        mayastor_env_stop,
+        runtime,
+        MayastorCliArgs,
+        MayastorEnvironment,
+        ProtectedSubsystems,
+        Reactor,
+        Reactors,
+        ResourceLockManager,
+        ResourceLockManagerConfig,
+        GLOBAL_RC,
+    },
+    grpc,
 };
 
 use std::time::Duration;
@@ -99,6 +107,34 @@ impl<'a> MayastorTest<'a> {
             tokio::time::sleep(Duration::from_millis(500)).await;
         }
     }
+
+    /// Starts the device monitor loop, which is required to fully
+    /// remove devices when they are not in use.
+    pub fn start_device_monitor(&self) {
+        runtime::spawn(device_monitor_loop());
+    }
+
+    /// Starts the gRPC server, which can be useful when debugging tests.
+    pub fn start_grpc(&self) {
+        let cfg = ResourceLockManagerConfig::default()
+            .with_subsystem(ProtectedSubsystems::POOL, 32)
+            .with_subsystem(ProtectedSubsystems::NEXUS, 512)
+            .with_subsystem(ProtectedSubsystems::REPLICA, 1024);
+        ResourceLockManager::initialize(cfg);
+
+        let env = MayastorEnvironment::global_or_default();
+        runtime::spawn(async {
+            grpc::MayastorGrpcServer::run(
+                &env.node_name,
+                &env.node_nqn,
+                env.grpc_endpoint.unwrap(),
+                env.rpc_addr,
+                env.api_versions,
+            )
+            .await
+            .ok();
+        });
+    }
 }
 
 impl<'a> Drop for MayastorTest<'a> {
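
Note: a minimal usage sketch for the two test helpers added above. It assumes the
MayastorTest::new(MayastorCliArgs::default()) construction used elsewhere in the
io-engine test suite; the exact setup of a given test may differ.

    // Hypothetical test setup inside an async test body; the constructor call
    // shown here is an assumption, not part of this change.
    let ms = MayastorTest::new(MayastorCliArgs::default());
    // Required so that detached devices are fully removed once unused.
    ms.start_device_monitor();
    // Optional: runs the gRPC server, which can help when debugging a test.
    ms.start_grpc();
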
diff --git a/io-engine/src/bdev/nexus/nexus_bdev_children.rs b/io-engine/src/bdev/nexus/nexus_bdev_children.rs
index ecbc2b2b7..d14c9dd05 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev_children.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev_children.rs
@@ -41,6 +41,7 @@ use super::{
     Nexus,
     NexusChild,
     NexusOperation,
+    NexusPauseState,
     NexusState,
     NexusStatus,
     PersistOp,
@@ -787,6 +788,13 @@ impl<'n> DeviceEventListener for Nexus<'n> {
                     false,
                 );
             }
+            DeviceEventType::AdminQNoticeCtrlFailed => {
+                Reactors::master().send_future(Nexus::disconnect_failed_child(
+                    self.name.clone(),
+                    dev_name.to_owned(),
+                ));
+            }
+
             _ => {
                 warn!(
                     "{:?}: ignoring event '{:?}' for device '{}'",
@@ -917,6 +925,28 @@ impl<'n> Nexus<'n> {
         }
     }
 
+    /// Disconnects a failed child device from the given nexus.
+    async fn disconnect_failed_child(nexus_name: String, dev: String) {
+        let Some(nex) = nexus_lookup_mut(&nexus_name) else {
+            warn!(
+                "Nexus '{nexus_name}': retiring failed device '{dev}': \
+                nexus already gone"
+            );
+            return;
+        };
+
+        info!("Nexus '{nexus_name}': disconnecting handlers for device '{dev}' with failed controller");
+
+        if nex.io_subsystem_state() == Some(NexusPauseState::Pausing) {
+            nex.traverse_io_channels_async((), |channel, _| {
+                channel.disconnect_detached_devices(|h| {
+                    h.get_device().device_name() == dev && h.is_ctrlr_failed()
+                });
+            })
+            .await;
+        }
+    }
+
     /// Retires a child device for the given nexus.
     async fn child_retire_routine(
         nexus_name: String,
@@ -981,12 +1011,12 @@ impl<'n> Nexus<'n> {
         // channels, and all I/Os failing due to this device will eventually
         // resubmit and succeeded (if any healthy children are left).
         //
-        // Device disconnection is done in two steps (detach, than disconnect)
+        // Device disconnection is done in two steps (detach, then disconnect)
         // in order to prevent an I/O race when retiring a device.
         self.detach_device(&dev).await;
 
         // Disconnect the devices with failed controllers _before_ pause,
-        // otherwise pause would stuck. Keep all controoled that are _not_
+        // otherwise pause would get stuck. Keep all controllers which are _not_
         // failed (e.g., in the case I/O failed due to ENOSPC).
         self.traverse_io_channels_async((), |channel, _| {
             channel.disconnect_detached_devices(|h| h.is_ctrlr_failed());
diff --git a/io-engine/src/bdev/nexus/nexus_channel.rs b/io-engine/src/bdev/nexus/nexus_channel.rs
index 831d9c969..2cc82626f 100644
--- a/io-engine/src/bdev/nexus/nexus_channel.rs
+++ b/io-engine/src/bdev/nexus/nexus_channel.rs
@@ -32,13 +32,14 @@ impl<'n> Debug for NexusChannel<'n> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
+            "{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c}]",
             io = if self.is_io_chan { "I/O" } else { "Aux" },
             nex = self.nexus.nexus_name(),
             core = self.core,
             cur = Cores::current(),
             r = self.readers.len(),
             w = self.writers.len(),
+            d = self.detached.len(),
             l = self.io_logs.len(),
             c = self.nexus.child_count(),
         )
diff --git a/io-engine/src/bdev/nvmx/controller.rs b/io-engine/src/bdev/nvmx/controller.rs
index b98b97c13..8cdfcce16 100644
--- a/io-engine/src/bdev/nvmx/controller.rs
+++ b/io-engine/src/bdev/nvmx/controller.rs
@@ -843,13 +843,14 @@ pub extern "C" fn nvme_poll_adminq(ctx: *mut c_void) -> i32 {
     if result < 0 {
         if context.start_device_destroy() {
             error!(
-                "process adminq: {}: {}",
+                "process adminq: {}: ctrl failed: {}, error: {}",
                 context.name,
+                context.is_failed(),
                 Errno::from_i32(result.abs())
             );
             info!("dispatching nexus fault and retire: {}", context.name);
-            let dev_name = context.name.to_string();
-            let carc = NVME_CONTROLLERS.lookup_by_name(&dev_name).unwrap();
+            let dev_name = context.name.as_str();
+            let carc = NVME_CONTROLLERS.lookup_by_name(dev_name).unwrap();
             debug!(
                 ?dev_name,
                 "notifying listeners of admin command completion failure"
@@ -863,6 +864,11 @@
                 ?num_listeners,
                 "listeners notified of admin command completion failure"
             );
+        } else if context.report_failed() {
+            if let Some(carc) = NVME_CONTROLLERS.lookup_by_name(&context.name) {
+                carc.lock()
+                    .notify_listeners(DeviceEventType::AdminQNoticeCtrlFailed);
+            }
         }
         return 1;
     }
diff --git a/io-engine/src/bdev/nvmx/controller_inner.rs b/io-engine/src/bdev/nvmx/controller_inner.rs
index 35d3b8ec9..cff4b69f7 100644
--- a/io-engine/src/bdev/nvmx/controller_inner.rs
+++ b/io-engine/src/bdev/nvmx/controller_inner.rs
@@ -74,6 +74,7 @@ pub(crate) struct TimeoutConfig {
     reset_attempts: u32,
     next_reset_time: Instant,
     destroy_in_progress: AtomicCell<bool>,
+    report_failed: AtomicCell<bool>,
 }
 
 impl Drop for TimeoutConfig {
@@ -94,6 +95,7 @@ impl TimeoutConfig {
             reset_attempts: MAX_RESET_ATTEMPTS,
             next_reset_time: Instant::now(),
             destroy_in_progress: AtomicCell::new(false),
+            report_failed: AtomicCell::new(true),
         }
     }
 
@@ -116,6 +118,19 @@ impl TimeoutConfig {
         }
     }
 
+    /// Checks whether the underlying SPDK NVMe controller has failed.
+    pub fn is_failed(&self) -> bool {
+        self.ctrlr.is_failed
+    }
+    /// Checks whether we need to report the controller failure.
+    /// The failure is reported only once.
+    pub fn report_failed(&mut self) -> bool {
+        if !self.is_failed() {
+            return false;
+        }
+        self.report_failed.compare_exchange(true, false).is_ok()
+    }
+
     fn reset_cb(success: bool, ctx: *mut c_void) {
         let timeout_ctx = TimeoutConfig::from_ptr(ctx as *mut TimeoutConfig);
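
Note: a standalone sketch of the once-only reporting behaviour that report_failed()
implements above, using crossbeam's AtomicCell just as the patch does; the variable
name here is illustrative only.

    use crossbeam::atomic::AtomicCell;

    fn main() {
        // The flag starts out `true`; the first compare_exchange flips it to
        // `false`, so only the first caller gets Ok and reports the failure.
        let report_failed = AtomicCell::new(true);
        assert!(report_failed.compare_exchange(true, false).is_ok());
        assert!(report_failed.compare_exchange(true, false).is_err());
    }
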
diff --git a/io-engine/src/core/device_events.rs b/io-engine/src/core/device_events.rs
index 7f3eaeaee..196a04b35 100644
--- a/io-engine/src/core/device_events.rs
+++ b/io-engine/src/core/device_events.rs
@@ -19,8 +19,14 @@ pub enum DeviceEventType {
     DeviceResized,
     /// TODO
     MediaManagement,
-    /// TODO
+    /// Sent when admin queue polling fails for the first time.
     AdminCommandCompletionFailed,
+    /// When the admin queue poll fails for the first time, the controller may
+    /// not yet be failed.
+    /// The next time the admin queue poll fails, if the controller is now
+    /// seen as failed for the first time, this event is sent, allowing
+    /// further clean-up to be performed.
+    AdminQNoticeCtrlFailed,
 }
 
 /// TODO
diff --git a/io-engine/src/core/env.rs b/io-engine/src/core/env.rs
index e214b2238..e7e7e3f2c 100644
--- a/io-engine/src/core/env.rs
+++ b/io-engine/src/core/env.rs
@@ -382,7 +382,7 @@ type Result = std::result::Result;
 #[allow(dead_code)]
 pub struct MayastorEnvironment {
     pub node_name: String,
-    node_nqn: Option<String>,
+    pub node_nqn: Option<String>,
     pub grpc_endpoint: Option<std::net::SocketAddr>,
     pub registration_endpoint: Option<Url>,
     ps_endpoint: Option<String>,
@@ -421,7 +421,7 @@
     nvmf_tgt_interface: Option<String>,
     /// NVMF target Command Retry Delay in x100 ms.
     pub nvmf_tgt_crdt: [u16; TARGET_CRDT_LEN],
-    api_versions: Vec<ApiVersion>,
+    pub api_versions: Vec<ApiVersion>,
     skip_sig_handler: bool,
     enable_io_all_thrd_nexus_channels: bool,
     developer_delay: bool,
diff --git a/io-engine/src/rebuild/rebuild_job.rs b/io-engine/src/rebuild/rebuild_job.rs
index 491427dfa..c068ed0b7 100644
--- a/io-engine/src/rebuild/rebuild_job.rs
+++ b/io-engine/src/rebuild/rebuild_job.rs
@@ -320,7 +320,7 @@ impl RebuildJob {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 struct RebuildFBendChan {
     sender: async_channel::Sender<RebuildJobRequest>,
 }
diff --git a/io-engine/src/rebuild/rebuild_job_backend.rs b/io-engine/src/rebuild/rebuild_job_backend.rs
index 8aae76f79..360599d3d 100644
--- a/io-engine/src/rebuild/rebuild_job_backend.rs
+++ b/io-engine/src/rebuild/rebuild_job_backend.rs
@@ -336,7 +336,7 @@ impl RebuildJobBackendManager {
         }
     }
 
-    /// Reply back to the requester with the generic rebuild stats.
+    /// Reply to the requester with the generic rebuild stats.
     async fn reply_stats(
         &mut self,
         requester: oneshot::Sender<RebuildStats>,
@@ -488,10 +488,27 @@
     }
 }
 impl Drop for RebuildJobBackendManager {
+    /// Closes and drains the comms channel, allowing a sender to see the
+    /// cancellation error should it attempt to communicate.
+    /// This is required because, it seems, a message that has already been
+    /// sent is not dropped until both the receivers and the senders have
+    /// been dropped.
     fn drop(&mut self) {
+        // Set final stats now so that failed stats requesters can still get stats.
         let stats = self.stats();
         info!("{self}: backend dropped; final stats: {stats:?}");
-        self.states.write().set_final_stats(stats);
+        self.states.write().set_final_stats(stats.clone());
+
+        // We close before draining, ensuring that no new messages can be sent.
+        self.info_chan.receiver.close();
+        // Now we can drain; we could simply ignore the messages, but let's
+        // try to reply to any stats requests.
+        while let Ok(message) = self.info_chan.receiver.try_recv() {
+            if let RebuildJobRequest::GetStats(reply) = message {
+                reply.send(stats.clone()).ok();
+            }
+        }
+
         for sender in self.complete_chan.lock().drain(..) {
             sender.send(self.state()).ok();
         }
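
Note: a minimal, self-contained sketch of the close-then-drain pattern used by the
Drop implementation above, based on the async-channel crate; the u32 payload is a
stand-in for the real RebuildJobRequest messages.

    fn main() {
        let (tx, rx) = async_channel::unbounded::<u32>();
        tx.try_send(1).unwrap();

        // Close first, so that senders can no longer queue new messages...
        rx.close();
        assert!(tx.try_send(2).is_err());

        // ...then drain whatever was already queued and reply or ignore.
        while let Ok(msg) = rx.try_recv() {
            println!("draining message {msg}");
        }
    }
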
diff --git a/io-engine/src/rebuild/rebuild_state.rs b/io-engine/src/rebuild/rebuild_state.rs
index 7dde6c9de..0a417ba4c 100644
--- a/io-engine/src/rebuild/rebuild_state.rs
+++ b/io-engine/src/rebuild/rebuild_state.rs
@@ -74,8 +74,10 @@ impl RebuildStates {
     }
     /// Set the final rebuild statistics.
     pub(super) fn set_final_stats(&mut self, mut stats: RebuildStats) {
-        stats.end_time = Some(Utc::now());
-        self.final_stats = Some(stats);
+        if self.final_stats.is_none() {
+            stats.end_time = Some(Utc::now());
+            self.final_stats = Some(stats);
+        }
     }
 
     /// Set's the next pending state
diff --git a/io-engine/src/subsys/config/opts.rs b/io-engine/src/subsys/config/opts.rs
index fb9ad70be..8c2a05f69 100644
--- a/io-engine/src/subsys/config/opts.rs
+++ b/io-engine/src/subsys/config/opts.rs
@@ -392,7 +392,7 @@ impl Default for NvmeBdevOpts {
             nvme_adminq_poll_period_us: time_try_from_env(
                 "NVME_ADMINQ_POLL_PERIOD",
                 1_000,
-                TimeUnit::MilliSeconds,
+                TimeUnit::MicroSeconds,
             ),
             nvme_ioq_poll_period_us: time_try_from_env(
                 "NVME_IOQ_POLL_PERIOD",