From b088f322bf9cf74e33f3c8c042d2012d08946792 Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Fri, 2 Aug 2024 15:25:43 +0100
Subject: [PATCH 1/5] feat: allow custom fabrics connect timeout

Allows passing this via env NVMF_FABRICS_CONNECT_TIMEOUT.
Also defaults it to 1s for now, rather than 500ms.

Signed-off-by: Tiago Castro
---
 io-engine/src/bdev/nvmx/controller.rs | 7 +++++--
 io-engine/src/bdev/nvmx/uri.rs        | 6 ++++++
 io-engine/src/subsys/config/opts.rs   | 2 +-
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/io-engine/src/bdev/nvmx/controller.rs b/io-engine/src/bdev/nvmx/controller.rs
index 1751a91f6..b98b97c13 100644
--- a/io-engine/src/bdev/nvmx/controller.rs
+++ b/io-engine/src/bdev/nvmx/controller.rs
@@ -1071,8 +1071,11 @@ pub(crate) mod options {
         self.admin_timeout_ms = Some(timeout);
         self
     }
-    pub fn with_fabrics_connect_timeout_us(mut self, timeout: u64) -> Self {
-        self.fabrics_connect_timeout_us = Some(timeout);
+    pub fn with_fabrics_connect_timeout_us<T: Into<Option<u64>>>(
+        mut self,
+        timeout: T,
+    ) -> Self {
+        self.fabrics_connect_timeout_us = timeout.into();
         self
     }
 
diff --git a/io-engine/src/bdev/nvmx/uri.rs b/io-engine/src/bdev/nvmx/uri.rs
index 29b9d4b44..856b60655 100644
--- a/io-engine/src/bdev/nvmx/uri.rs
+++ b/io-engine/src/bdev/nvmx/uri.rs
@@ -227,6 +227,12 @@ impl<'probe> NvmeControllerContext<'probe> {
             )
             .with_transport_retry_count(
                 Config::get().nvme_bdev_opts.transport_retry_count as u8,
+            )
+            .with_fabrics_connect_timeout_us(
+                crate::subsys::config::opts::try_from_env(
+                    "NVMF_FABRICS_CONNECT_TIMEOUT",
+                    1_000_000,
+                ),
             );
 
         let hostnqn = template.hostnqn.clone().or_else(|| {
diff --git a/io-engine/src/subsys/config/opts.rs b/io-engine/src/subsys/config/opts.rs
index aece4fcda..fb9ad70be 100644
--- a/io-engine/src/subsys/config/opts.rs
+++ b/io-engine/src/subsys/config/opts.rs
@@ -156,7 +156,7 @@ pub struct NvmfTcpTransportOpts {
 }
 
 /// try to read an env variable or returns the default when not found
-fn try_from_env<T>(name: &str, default: T) -> T
+pub(crate) fn try_from_env<T>(name: &str, default: T) -> T
 where
     T: FromStr + Display + Copy,
     <T as FromStr>::Err: Debug + Display,

From ec023391e460ee1c9c549c259b7a1b10d4514986 Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Tue, 6 Aug 2024 09:45:54 +0100
Subject: [PATCH 2/5] fix(nvmx/qpair): return errno with absolute value

Otherwise a returned negative value translates into an unknown Errno.

Signed-off-by: Tiago Castro
---
 io-engine/src/bdev/nvmx/qpair.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/io-engine/src/bdev/nvmx/qpair.rs b/io-engine/src/bdev/nvmx/qpair.rs
index 141364ce9..ecdf7bbca 100644
--- a/io-engine/src/bdev/nvmx/qpair.rs
+++ b/io-engine/src/bdev/nvmx/qpair.rs
@@ -467,9 +467,9 @@ impl<'a> Connection<'a> {
             0 => Ok(false),
             // Connection is still in progress, keep polling.
             1 => Ok(true),
-            // Error occured during polling.
+            // Error occurred during polling.
             e => {
-                let e = Errno::from_i32(-e);
+                let e = Errno::from_i32(e.abs());
                 error!(?self, "I/O qpair async connection polling error: {e}");
                 Err(e)
             }

From ad5c31a6fdbf9d069f5d322dc0818afeff821ce8 Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Tue, 6 Aug 2024 09:49:47 +0100
Subject: [PATCH 3/5] fix(nexus-child/unplug): remove usage of block_on

Initially this block_on was added because the remove callback was running
in a blocking fashion, but this has since changed and unplug is actually
called from an async context. As such, we don't need the block_on and can
simply call the async code directly.

Also, simplify the completion notification, as we can simply close the sender.

Signed-off-by: Tiago Castro
---
 .../src/bdev/nexus/nexus_bdev_children.rs |  2 +-
 io-engine/src/bdev/nexus/nexus_child.rs   | 34 +++++++------------
 2 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/io-engine/src/bdev/nexus/nexus_bdev_children.rs b/io-engine/src/bdev/nexus/nexus_bdev_children.rs
index 2e5eca333..ecbc2b2b7 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev_children.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev_children.rs
@@ -900,7 +900,7 @@ impl<'n> Nexus<'n> {
                     nexus_name,
                     child_device, "Unplugging nexus child device",
                 );
-                child.unplug();
+                child.unplug().await;
             }
             None => {
                 warn!(
diff --git a/io-engine/src/bdev/nexus/nexus_child.rs b/io-engine/src/bdev/nexus/nexus_child.rs
index d7dab1f31..c696d3cbd 100644
--- a/io-engine/src/bdev/nexus/nexus_child.rs
+++ b/io-engine/src/bdev/nexus/nexus_child.rs
@@ -24,8 +24,6 @@ use crate::{
         BlockDeviceHandle,
         CoreError,
         DeviceEventSink,
-        Reactor,
-        Reactors,
         VerboseError,
     },
     eventing::replica_events::state_change_event_meta,
@@ -1109,7 +1107,7 @@ impl<'c> NexusChild<'c> {
     /// underlying device is removed.
     ///
     /// Note: The descriptor *must* be dropped for the unplug to complete.
-    pub(crate) fn unplug(&mut self) {
+    pub(crate) async fn unplug(&mut self) {
         info!("{self:?}: unplugging child...");
 
         let state = self.state();
@@ -1139,12 +1137,10 @@ impl<'c> NexusChild<'c> {
         // device-related events directly.
        if state != ChildState::Faulted(FaultReason::IoError) {
             let nexus_name = self.parent.clone();
-            Reactor::block_on(async move {
-                match nexus_lookup_mut(&nexus_name) {
-                    Some(n) => n.reconfigure(DrEvent::ChildUnplug).await,
-                    None => error!("Nexus '{nexus_name}' not found"),
-                }
-            });
+            match nexus_lookup_mut(&nexus_name) {
+                Some(n) => n.reconfigure(DrEvent::ChildUnplug).await,
+                None => error!("Nexus '{nexus_name}' not found"),
+            }
         }
 
         if is_destroying {
@@ -1153,22 +1149,16 @@ impl<'c> NexusChild<'c> {
             self.device_descriptor.take();
         }
 
-        self.unplug_complete();
-        info!("{self:?}: child successfully unplugged");
+        self.unplug_complete().await;
     }
 
     /// Signal that the child unplug is complete.
-    fn unplug_complete(&self) {
-        let sender = self.remove_channel.0.clone();
-        let name = self.name.clone();
-        Reactors::current().send_future(async move {
-            if let Err(e) = sender.send(()).await {
-                error!(
-                    "Failed to send unplug complete for child '{}': {}",
-                    name, e
-                );
-            }
-        });
+    async fn unplug_complete(&self) {
+        if let Err(error) = self.remove_channel.0.send(()).await {
+            info!("{self:?}: failed to send unplug complete: {error}");
+        } else {
+            info!("{self:?}: child successfully unplugged");
+        }
     }
 
     /// create a new nexus child

From d5b5d44da28aa25eb216e9ea61fd2dd5a1ce40e1 Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Tue, 6 Aug 2024 09:54:42 +0100
Subject: [PATCH 4/5] fix(nvmf/target): remove usage of block_on

Split creating from starting the subsystem. This way we can start the
subsystem in the master reactor, and then move on to the next spdk
subsystem.

Signed-off-by: Tiago Castro
---
 io-engine/src/subsys/nvmf/target.rs | 34 ++++++++++++++++++------------------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/io-engine/src/subsys/nvmf/target.rs b/io-engine/src/subsys/nvmf/target.rs
index 56c23f281..833d39d2d 100644
--- a/io-engine/src/subsys/nvmf/target.rs
+++ b/io-engine/src/subsys/nvmf/target.rs
@@ -27,7 +27,7 @@ use spdk_rs::libspdk::{
 
 use crate::{
     constants::NVME_CONTROLLER_MODEL_ID,
-    core::{Cores, Mthread, Reactor, Reactors},
+    core::{Cores, Mthread, Reactors},
     ffihelper::{AsStr, FfiResult},
     subsys::{
         nvmf::{
@@ -270,9 +270,9 @@ impl Target {
         Ok(())
     }
 
-    /// enable discovery for the target -- note that the discovery system is not
-    /// started
-    fn enable_discovery(&self) {
+    /// Create the discovery for the target -- note that the discovery system is
+    /// not started.
+    fn create_discovery_subsystem(&self) -> NvmfSubsystem {
         debug!("enabling discovery for target");
         let discovery = unsafe {
             NvmfSubsystem::from(spdk_nvmf_subsystem_create(
@@ -296,12 +296,7 @@ impl Target {
 
         discovery.allow_any(true);
 
-        Reactor::block_on(async {
-            let nqn = discovery.get_nqn();
-            if let Err(e) = discovery.start().await {
-                error!("Error starting subsystem '{}': {}", nqn, e.to_string());
-            }
-        });
+        discovery
     }
 
     /// stop all subsystems on this target we are borrowed here
@@ -355,13 +350,20 @@ impl Target {
 
     /// Final state for the target during init.
     pub fn running(&mut self) {
-        self.enable_discovery();
-        info!(
-            "nvmf target accepting new connections and is ready to roll..{}",
-            '\u{1F483}'
-        );
+        let discovery = self.create_discovery_subsystem();
 
-        unsafe { spdk_subsystem_init_next(0) }
+        Reactors::master().send_future(async move {
+            let nqn = discovery.get_nqn();
+            if let Err(error) = discovery.start().await {
+                error!("Error starting subsystem '{nqn}': {error}");
+            }
+
+            info!(
+                "nvmf target accepting new connections and is ready to roll..{}",
+                '\u{1F483}'
+            );
+            unsafe { spdk_subsystem_init_next(0) }
+        })
     }
 
     /// Shutdown procedure.

From d9e19b31a2b68448c30274934dab844efd761073 Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Fri, 9 Aug 2024 10:40:32 +0100
Subject: [PATCH 5/5] chore: add warning to block_on

Should this become an unsafe function?

Signed-off-by: Tiago Castro
---
 io-engine/src/bdev/nexus/nexus_bdev.rs | 48 +++++++++++++++-----------
 io-engine/src/core/reactor.rs          |  9 ++++-
 io-engine/src/grpc/mod.rs              | 17 ---------
 3 files changed, 36 insertions(+), 38 deletions(-)

diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs
index 0ac9526a0..a85baac81 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev.rs
@@ -1373,30 +1373,38 @@ impl<'n> BdevOps for Nexus<'n> {
             return;
         }
 
-        let self_ptr = unsafe { unsafe_static_ptr(&self) };
-
-        Reactor::block_on(async move {
-            let self_ref = unsafe { &mut *self_ptr };
-
-            // TODO: double-check interaction with rebuild job logic
-            // TODO: cancel rebuild jobs?
-            let n = self_ref.children.iter().filter(|c| c.is_opened()).count();
-
-            if n > 0 {
-                warn!(
-                    "{:?}: {} open children remain(s), closing...",
-                    self_ref, n
-                );
+        let open_children =
+            self.children.iter().filter(|c| c.is_opened()).count();
+        // TODO: This doesn't seem possible to happen at this stage, but seems
+        //       we should still try to handle this in separate future since
+        //       we're handling it here anyway as a block_on is not safe to
+        //       use for running production code.
+        if open_children > 0 {
+            let self_ptr = unsafe { unsafe_static_ptr(&self) };
+            Reactor::block_on(async move {
+                let self_ref = unsafe { &mut *self_ptr };
+
+                // TODO: double-check interaction with rebuild job logic
+                // TODO: cancel rebuild jobs?
+                let n =
+                    self_ref.children.iter().filter(|c| c.is_opened()).count();
+
+                if n > 0 {
+                    warn!(
+                        "{:?}: {} open children remain(s), closing...",
+                        self_ref, n
+                    );
 
-            for child in self_ref.children.iter() {
-                if child.is_opened() {
-                    child.close().await.ok();
+                    for child in self_ref.children.iter() {
+                        if child.is_opened() {
+                            child.close().await.ok();
+                        }
                     }
                 }
-            }
 
-            self_ref.children.clear();
-        });
+                self_ref.children.clear();
+            });
+        }
 
         self.as_mut().unregister_io_device();
         unsafe {
diff --git a/io-engine/src/core/reactor.rs b/io-engine/src/core/reactor.rs
index f2758bc99..e93fac017 100644
--- a/io-engine/src/core/reactor.rs
+++ b/io-engine/src/core/reactor.rs
@@ -362,8 +362,15 @@ impl Reactor {
         task
     }
 
-    /// spawn a future locally on the current core block until the future is
+    /// Spawns a future locally on the current core block until the future is
     /// completed. The master core is used.
+    /// # Warning
+    /// This code should only be used for testing and not running production!
+    /// This is because when calling block_on from a thread_poll callback, we
+    /// may be leaving messages behind, which can lead to timeouts etc...
+    /// A work-around to make this safe could be to potentially "pull" the
+    /// messages which haven't been polled, and poll them here before
+    /// proceeding to re-poll via thread_poll again.
     pub fn block_on<F, R>(future: F) -> Option<R>
     where
         F: Future<Output = R> + 'static,
diff --git a/io-engine/src/grpc/mod.rs b/io-engine/src/grpc/mod.rs
index b1c978646..70531a836 100644
--- a/io-engine/src/grpc/mod.rs
+++ b/io-engine/src/grpc/mod.rs
@@ -2,7 +2,6 @@ use futures::channel::oneshot::Receiver;
 use nix::errno::Errno;
 pub use server::MayastorGrpcServer;
 use std::{
-    error::Error,
     fmt::{Debug, Display},
     future::Future,
     time::Duration,
@@ -158,22 +157,6 @@ macro_rules! spdk_submit {
 
 pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;
 
-/// call the given future within the context of the reactor on the first core
-/// on the init thread, while the future is waiting to be completed the reactor
-/// is continuously polled so that forward progress can be made
-pub fn rpc_call<G, I, L, A>(future: G) -> Result<Response<A>, tonic::Status>
-where
-    G: Future<Output = Result<I, L>> + 'static,
-    I: 'static,
-    L: Into<Status> + Error + 'static,
-    A: 'static + From<I>,
-{
-    Reactor::block_on(future)
-        .unwrap()
-        .map(|r| Response::new(A::from(r)))
-        .map_err(|e| e.into())
-}
-
 /// Submit rpc code to the primary reactor.
 pub fn rpc_submit<F, R, E>(
     future: F,