diff --git a/mayastor/src/bdev/nexus/nexus_bdev.rs b/mayastor/src/bdev/nexus/nexus_bdev.rs
index 8bd023ef28..4d3b223b47 100644
--- a/mayastor/src/bdev/nexus/nexus_bdev.rs
+++ b/mayastor/src/bdev/nexus/nexus_bdev.rs
@@ -479,9 +479,6 @@ pub struct Nexus<'n> {
     pub state: parking_lot::Mutex<NexusState>,
     /// The offset in blocks where the data partition starts.
     pub(crate) data_ent_offset: u64,
-    /// the handle to be used when sharing the nexus, this allows for the bdev
-    /// to be shared with vbdevs on top
-    pub(crate) share_handle: Option<String>,
     /// enum containing the protocol-specific target used to publish the nexus
     pub nexus_target: Option<NexusTarget>,
     /// Indicates if the Nexus has an I/O device.
@@ -588,7 +585,6 @@ impl<'n> Nexus<'n> {
             state: parking_lot::Mutex::new(NexusState::Init),
             bdev: None,
             data_ent_offset: 0,
-            share_handle: None,
             req_size: size,
             nexus_target: None,
             nvme_params,
@@ -716,6 +712,11 @@ impl<'n> Nexus<'n> {
         unsafe { self.bdev().required_alignment() }
     }
 
+    /// TODO
+    pub fn children_iter(&self) -> std::slice::Iter<NexusChild<'n>> {
+        self.children.iter()
+    }
+
     /// Reconfigures the child event handler.
     pub(crate) async fn reconfigure(&self, event: DrEvent) {
         info!(
@@ -879,7 +880,7 @@ impl<'n> Nexus<'n> {
     pub async fn destroy(mut self: Pin<&mut Self>) -> Result<(), Error> {
         info!("Destroying nexus {}", self.name);
 
-        self.as_mut().destroy_shares().await?;
+        self.as_mut().unshare_nexus().await?;
 
         // wait for all rebuild jobs to be cancelled before proceeding with the
         // destruction of the nexus
@@ -887,19 +888,7 @@ impl<'n> Nexus<'n> {
             self.cancel_child_rebuild_jobs(child.get_name()).await;
         }
 
-        unsafe {
-            for child in self.as_mut().get_unchecked_mut().children.iter_mut() {
-                info!("Destroying child bdev {}", child.get_name());
-                if let Err(e) = child.close().await {
-                    // TODO: should an error be returned here?
-                    error!(
-                        "Failed to close child {} with error {}",
-                        child.get_name(),
-                        e.verbose()
-                    );
-                }
-            }
-        }
+        self.close_children().await;
 
         // Persist the fact that the nexus destruction has completed.
         self.persist(PersistOp::Shutdown).await;
@@ -1287,16 +1276,9 @@ impl<'n> BdevOps for Nexus<'n> {
         Reactor::block_on(async move {
             let self_ref = unsafe { &mut *self_ptr };
 
-            for child in self_ref.children.iter_mut() {
-                if child.state() == ChildState::Open {
-                    if let Err(e) = child.close().await {
-                        error!(
-                            "{}: child {} failed to close with error {}",
-                            self_ref.name,
-                            child.get_name(),
-                            e.verbose()
-                        );
-                    }
+            for child in self_ref.children_iter() {
+                if child.is_healthy() {
+                    child.close().await.ok();
                 }
             }
 
@@ -1473,6 +1455,13 @@ async fn nexus_create_internal(
     children: &[String],
     nexus_info_key: Option<String>,
 ) -> Result<(), Error> {
+    info!(
+        "Creating new nexus '{}' ({} child(ren): {:?})...",
+        name,
+        children.len(),
+        children
+    );
+
     if let Some(nexus) = nexus_lookup_name_uuid(name, nexus_uuid) {
        // FIXME: Instead of error, we return Ok without checking
        // that the children match, which seems wrong.
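The move from `iter_mut()` under `Pin::get_unchecked_mut()` to a plain shared `children_iter()` is possible because child state is kept behind interior mutability, so closing a child (see `NexusChild::close(&self)` later in this patch) no longer needs `&mut`. A minimal sketch of the pattern, using hypothetical `Nexus`/`Child` stand-ins rather than the real mayastor types:

```rust
use crossbeam::atomic::AtomicCell;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum State {
    Open,
    Closed,
}

struct Child {
    // Interior mutability: state changes go through `&self`.
    state: AtomicCell<State>,
}

struct Nexus {
    children: Vec<Child>,
}

impl Nexus {
    // Shared iteration suffices; no pin projection or `unsafe` required.
    fn children_iter(&self) -> std::slice::Iter<'_, Child> {
        self.children.iter()
    }

    async fn close_children(&self) {
        for c in self.children_iter() {
            // Mirrors `child.close().await.ok()` above: the swap needs only `&self`.
            c.state.swap(State::Closed);
        }
    }
}
```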
diff --git a/mayastor/src/bdev/nexus/nexus_bdev_children.rs b/mayastor/src/bdev/nexus/nexus_bdev_children.rs
index 1bc026e2f4..b08ae23838 100644
--- a/mayastor/src/bdev/nexus/nexus_bdev_children.rs
+++ b/mayastor/src/bdev/nexus/nexus_bdev_children.rs
@@ -25,7 +25,6 @@ use std::{cmp::min, pin::Pin};
 
-use futures::future::join_all;
 use snafu::ResultExt;
 
 use super::{
@@ -299,17 +298,12 @@ impl<'n> Nexus<'n> {
             Some(val) => val,
         };
 
-        unsafe {
-            if let Err(e) = self.as_mut().get_unchecked_mut().children[idx]
-                .close()
-                .await
-            {
-                return Err(Error::CloseChild {
-                    name: self.name.clone(),
-                    child: self.children[idx].get_name().to_string(),
-                    source: e,
-                });
-            }
+        if let Err(e) = self.children[idx].close().await {
+            return Err(Error::CloseChild {
+                name: self.name.clone(),
+                child: self.children[idx].get_name().to_string(),
+                source: e,
+            });
         }
 
         let child_state = self.children[idx].state();
@@ -460,18 +454,25 @@ impl<'n> Nexus<'n> {
         }
     }
 
-    /// Close each child that belongs to this nexus.
-    pub(crate) async fn close_children(mut self: Pin<&mut Self>) {
-        let futures = unsafe {
-            self.as_mut()
-                .get_unchecked_mut()
-                .children
-                .iter_mut()
-                .map(|c| c.close())
-        };
-        let results = join_all(futures).await;
-        if results.iter().any(|c| c.is_err()) {
-            error!("{}: Failed to close children", self.name);
+    /// Unconditionally closes all children of this nexus.
+    pub(crate) async fn close_children(&self) {
+        info!(
+            "{:?}: closing {n} children...",
+            self,
+            n = self.children.len()
+        );
+
+        let mut failed = 0;
+        for child in self.children_iter() {
+            if child.close().await.is_err() {
+                failed += 1;
+            }
+        }
+
+        if failed == 0 {
+            info!("{:?}: all children closed", self);
+        } else {
+            error!("{:?}: failed to close some of the children", self);
         }
     }
 
@@ -520,20 +521,9 @@ impl<'n> Nexus<'n> {
         // completed yet so we fail the registration altogether for now.
         if failed {
            // Close any children that WERE successfully opened.
-            unsafe {
-                for child in
-                    self.as_mut().get_unchecked_mut().children.iter_mut()
-                {
-                    if child.state() == ChildState::Open {
-                        if let Err(error) = child.close().await {
-                            error!(
-                                "{}: failed to close child {}: {}",
-                                name,
-                                child.name,
-                                error.verbose()
-                            );
-                        }
-                    }
+            for child in self.children_iter() {
+                if child.is_healthy() {
+                    child.close().await.ok();
                }
             }
 
@@ -563,20 +553,7 @@ impl<'n> Nexus<'n> {
         }
 
         if let Err(error) = write_ex_err {
-            unsafe {
-                for child in
-                    self.as_mut().get_unchecked_mut().children.iter_mut()
-                {
-                    if let Err(error) = child.close().await {
-                        error!(
-                            "{}: child {} failed to close with error {}",
-                            name,
-                            child.name,
-                            error.verbose()
-                        );
-                    }
-                }
-            }
+            self.close_children().await;
             return Err(error);
         }
 
@@ -609,9 +586,9 @@ impl<'n> Nexus<'n> {
     }
 
     /// TODO
-    pub async fn destroy_child(&self, name: &str) -> Result<(), Error> {
+    pub async fn close_child(&self, name: &str) -> Result<(), Error> {
         if let Some(child) = self.lookup_child(name) {
-            child.destroy().await.map_err(|source| Error::DestroyChild {
+            child.close().await.map_err(|source| Error::DestroyChild {
                 source,
                 child: name.to_string(),
                 name: self.name.to_string(),
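Unlike the old `join_all()` version, the rewritten `close_children()` closes children strictly one at a time and only counts failures, so a single bad child cannot abort the loop. A self-contained sketch of that accounting, with a hypothetical `ChildHandle` standing in for `NexusChild`:

```rust
struct ChildHandle;

impl ChildHandle {
    // Stand-in for NexusChild::close(); always succeeds here.
    async fn close(&self) -> Result<(), ()> {
        Ok(())
    }
}

async fn close_all(children: &[ChildHandle]) {
    let mut failed = 0;
    for child in children {
        // Failures are recorded, never early-returned: every child
        // still gets its close attempt.
        if child.close().await.is_err() {
            failed += 1;
        }
    }
    if failed > 0 {
        eprintln!("failed to close {} of {} children", failed, children.len());
    }
}
```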
diff --git a/mayastor/src/bdev/nexus/nexus_channel.rs b/mayastor/src/bdev/nexus/nexus_channel.rs
index e5b2db7304..2c79636b4b 100644
--- a/mayastor/src/bdev/nexus/nexus_channel.rs
+++ b/mayastor/src/bdev/nexus/nexus_channel.rs
@@ -52,7 +52,12 @@ pub(crate) fn fault_nexus_child(nexus: Pin<&mut Nexus>, name: &str) -> bool {
     nexus
         .children
         .iter()
-        .filter(|c| c.state() == ChildState::Open)
+        .filter(|c| {
+            matches!(
+                c.state(),
+                ChildState::Open | ChildState::Faulted(Reason::OutOfSync)
+            )
+        })
         .filter(|c| {
             // If there were previous retires, we do not have a reference
             // to a BlockDevice. We do however, know it can't be the device
@@ -65,11 +70,16 @@ pub(crate) fn fault_nexus_child(nexus: Pin<&mut Nexus>, name: &str) -> bool {
             }
         })
         .any(|c| {
-            Ok(ChildState::Open)
+            Ok(ChildState::Faulted(Reason::OutOfSync))
                 == c.state.compare_exchange(
-                    ChildState::Open,
-                    ChildState::Faulted(Reason::IoError),
+                    ChildState::Faulted(Reason::OutOfSync),
+                    ChildState::Faulted(Reason::RebuildFailed),
                 )
+                || Ok(ChildState::Open)
+                    == c.state.compare_exchange(
+                        ChildState::Open,
+                        ChildState::Faulted(Reason::IoError),
+                    )
         })
 }
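The widened filter plus the second `compare_exchange` arm means a child that faults while rebuilding is recorded as `Faulted(RebuildFailed)` rather than lumped in with plain I/O errors. A sketch of the same two-step CAS using crossbeam's `AtomicCell`, whose `compare_exchange` returns `Result<previous, actual>` (simplified enums, not the real ones):

```rust
use crossbeam::atomic::AtomicCell;

#[derive(Clone, Copy, PartialEq, Eq)]
enum Reason {
    OutOfSync,
    RebuildFailed,
    IoError,
}

#[derive(Clone, Copy, PartialEq, Eq)]
enum ChildState {
    Open,
    Faulted(Reason),
}

fn fault(state: &AtomicCell<ChildState>) -> bool {
    // Try the rebuild-specific transition first, then the generic I/O one;
    // at most one CAS can win, so the recorded fault reason stays accurate.
    state
        .compare_exchange(
            ChildState::Faulted(Reason::OutOfSync),
            ChildState::Faulted(Reason::RebuildFailed),
        )
        .is_ok()
        || state
            .compare_exchange(ChildState::Open, ChildState::Faulted(Reason::IoError))
            .is_ok()
}
```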
diff --git a/mayastor/src/bdev/nexus/nexus_child.rs b/mayastor/src/bdev/nexus/nexus_child.rs
index 95e3e119e0..fcc64ae91f 100644
--- a/mayastor/src/bdev/nexus/nexus_child.rs
+++ b/mayastor/src/bdev/nexus/nexus_child.rs
@@ -4,13 +4,12 @@ use std::{
 };
 
 use crossbeam::atomic::AtomicCell;
-use futures::{channel::mpsc, SinkExt, StreamExt};
 use nix::errno::Errno;
 use serde::Serialize;
 use snafu::{ResultExt, Snafu};
 use url::Url;
 
-use super::{nexus_iter_mut, nexus_lookup_mut, DrEvent, VerboseError};
+use super::{nexus_iter_mut, nexus_lookup_mut, DrEvent};
 
 use crate::{
     bdev::{device_create, device_destroy, device_lookup},
@@ -136,8 +135,6 @@ pub enum ChildState {
     ConfigInvalid,
     /// the child is open for RW
     Open,
-    /// the child is being destroyed
-    Destroying,
     /// the child has been closed by the nexus
     Closed,
     /// the child is faulted
@@ -151,12 +148,29 @@ impl Display for ChildState {
             Self::Init => write!(f, "Init"),
             Self::ConfigInvalid => write!(f, "Config parameters are invalid"),
             Self::Open => write!(f, "Child is open"),
-            Self::Destroying => write!(f, "Child is being destroyed"),
             Self::Closed => write!(f, "Closed"),
         }
     }
 }
 
+/// State of a child device destroy process.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
+pub(crate) enum ChildDestroyState {
+    /// The child is not being destroyed.
+    None,
+    /// The child device is being destroyed.
+    Destroying,
+}
+
+impl Display for ChildDestroyState {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::None => write!(f, "none"),
+            Self::Destroying => write!(f, "destroying"),
+        }
+    }
+}
+
 #[derive(Serialize)]
 pub struct NexusChild<'c> {
     /// name of the parent this child belongs to
     parent: String,
     /// current state of the child
     #[serde(skip_serializing)]
     pub state: AtomicCell<ChildState>,
-    /// previous state of the child
+    /// current state of device destroy process
     #[serde(skip_serializing)]
-    prev_state: AtomicCell<ChildState>,
+    destroy_state: AtomicCell<ChildDestroyState>,
     /// TODO
     #[serde(skip_serializing)]
-    remove_channel: (mpsc::Sender<()>, mpsc::Receiver<()>),
+    remove_channel: (async_channel::Sender<()>, async_channel::Receiver<()>),
     /// Name of the child is the URI used to create it.
     /// Note that block device name can differ from it!
     pub name: String,
@@ -191,17 +205,18 @@ impl Debug for NexusChild<'_> {
 
 impl Display for NexusChild<'_> {
     fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
-        match &self.device {
-            Some(_dev) => writeln!(f, "{}: {:?}", self.name, self.state(),),
-            None => writeln!(f, "{}: state {:?}", self.name, self.state()),
-        }
+        let dest = match self.destroy_state() {
+            ChildDestroyState::None => "",
+            ChildDestroyState::Destroying => " (destroying)",
+        };
+
+        writeln!(f, "{}: {:?}{}", self.name, self.state(), dest)
     }
 }
 
 impl<'c> NexusChild<'c> {
     pub(crate) fn set_state(&self, state: ChildState) {
         let prev_state = self.state.swap(state);
-        self.prev_state.store(prev_state);
         trace!(
             "{}: child {}: state change from {} to {}",
             self.parent,
@@ -243,16 +258,17 @@ impl<'c> NexusChild<'c> {
                 info!("called open on an already opened child");
                 return Ok(self.name.clone());
             }
-            ChildState::Destroying => {
-                error!(
-                    "{}: cannot open child {} being destroyed",
-                    self.parent, self.name
-                );
-                return Err(ChildError::ChildBeingDestroyed {});
-            }
             _ => {}
         }
 
+        if self.is_destroying() {
+            error!(
+                "{}: cannot open child {} being destroyed",
+                self.parent, self.name
+            );
+            return Err(ChildError::ChildBeingDestroyed {});
+        }
+
         let dev = self.device.as_ref().unwrap();
 
         let child_size = dev.size_in_bytes();
@@ -283,6 +299,24 @@ impl<'c> NexusChild<'c> {
         Ok(self.name.clone())
     }
 
+    /// Returns the destroy state of the child.
+    #[inline]
+    pub(crate) fn destroy_state(&self) -> ChildDestroyState {
+        self.destroy_state.load()
+    }
+
+    /// Sets the destroy state of the child.
+    #[inline]
+    pub(crate) fn set_destroy_state(&self, s: ChildDestroyState) {
+        self.destroy_state.store(s)
+    }
+
+    /// Determines if the child is being destroyed.
+    #[inline]
+    pub(crate) fn is_destroying(&self) -> bool {
+        matches!(self.destroy_state.load(), ChildDestroyState::Destroying)
+    }
+
     /// Check if we're healthy.
     pub(crate) fn is_healthy(&self) -> bool {
         self.state() == ChildState::Open
@@ -485,14 +519,7 @@ impl<'c> NexusChild<'c> {
                 self.set_state(ChildState::Faulted(reason));
             }
             _ => {
-                if let Err(e) = self.close().await {
-                    error!(
-                        "{}: child {} failed to close with error {}",
-                        self.parent,
-                        self.name,
-                        e.verbose()
-                    );
-                }
+                self.close().await.ok();
                 self.set_state(ChildState::Faulted(reason));
             }
         }
@@ -500,14 +527,7 @@ impl<'c> NexusChild<'c> {
 
     /// Set the child as temporarily offline
     pub(crate) async fn offline(&mut self) {
-        if let Err(e) = self.close().await {
-            error!(
-                "{}: child {} failed to close with error {}",
-                self.parent,
-                self.name,
-                e.verbose()
-            );
-        }
+        self.close().await.ok();
     }
 
     /// Get full name of this Nexus child.
@@ -579,10 +599,21 @@ impl<'c> NexusChild<'c> {
     }
 
     /// Close the nexus child.
-    pub(crate) async fn close(&mut self) -> Result<(), NexusBdevError> {
-        info!("{}: closing nexus child", self.name);
+    pub(crate) async fn close(&self) -> Result<(), NexusBdevError> {
+        info!("{:?}: closing child...", self);
+
+        if self.destroy_state.compare_exchange(
+            ChildDestroyState::None,
+            ChildDestroyState::Destroying,
+        ) != Ok(ChildDestroyState::None)
+        {
+            warn!("{:?}: already being closed", self);
+            return Ok(());
+        }
+
         if self.device.is_none() {
-            info!("{}: nexus child already closed", self.name);
+            self.set_destroy_state(ChildDestroyState::None);
+            warn!("{:?}: no block device: appears to be already closed", self);
             return Ok(());
         }
 
@@ -592,19 +623,30 @@ impl<'c> NexusChild<'c> {
         }
 
         // Destruction raises a device removal event.
-        let destroyed = self.destroy().await;
+        info!("{:?}: destroying block device...", self);
+        match device_destroy(&self.name).await {
+            Ok(_) => {
+                info!(
+                    "{:?}: block device destroyed, waiting for removal...",
+                    self
+                );
 
-        // Only wait for block device removal if the child has been initialised.
-        // An uninitialized child won't have an underlying devices.
-        // Also check previous state as remove event may not have occurred.
-        if self.state.load() != ChildState::Init
-            && self.prev_state.load() != ChildState::Init
-        {
-            self.remove_channel.1.next().await;
-        }
+                // Only wait for block device removal if the child has been
+                // initialised.
+                if self.state.load() != ChildState::Init {
+                    self.remove_channel.1.recv().await.ok();
+                }
 
-        info!("{}: nexus child closed", self.name);
-        destroyed
+                self.set_destroy_state(ChildDestroyState::None);
+                info!("{:?}: child closed successfully", self);
+                Ok(())
+            }
+            Err(e) => {
+                self.set_destroy_state(ChildDestroyState::None);
+                error!("{:?}: failed to close child: {}", self, e);
+                Err(e)
+            }
+        }
     }
 
     /// Called in response to a device removal event.
@@ -615,35 +657,26 @@ impl<'c> NexusChild<'c> {
     pub(crate) fn remove(&mut self) {
         info!("{}: removing child", self.name);
 
-        let mut state = self.state();
+        let state = self.state();
+        let is_destroying = self.is_destroying();
 
-        let mut destroying = false;
         // Only remove the device if the child is being destroyed instead of
         // a hot remove event.
-        if state == ChildState::Destroying {
+        if is_destroying {
             // Block device is being removed, so ensure we don't use it again.
+            debug!("{:?}: dropping block device", self);
             self.device = None;
-            destroying = true;
-
-            state = self.prev_state.load();
+        } else {
+            debug!("{:?}: hot remove: keeping block device", self);
         }
-        match state {
-            ChildState::Open | ChildState::Faulted(Reason::OutOfSync) => {
-                // Change the state of the child to ensure it is taken out of
-                // the I/O path when the nexus is reconfigured.
-                self.set_state(ChildState::Closed)
-            }
-            // leave the state into whatever we found it as
-            _ => {
-                if destroying {
-                    // Restore the previous state
-                    info!(
-                        "Restoring previous child state {}",
-                        state.to_string()
-                    );
-                    self.set_state(state);
-                }
-            }
+
+        if matches!(
+            state,
+            ChildState::Open | ChildState::Faulted(Reason::OutOfSync)
+        ) {
+            // Change the state of the child to ensure it is taken out of
+            // the I/O path when the nexus is reconfigured.
+            self.set_state(ChildState::Closed);
         }
 
         // Remove the child from the I/O path. If we had an IO error the block
@@ -662,7 +695,7 @@ impl<'c> NexusChild<'c> {
             });
         }
 
-        if destroying {
+        if is_destroying {
             // Dropping the last descriptor results in the device being removed.
             // This must be performed in this function.
             self.device_descriptor.take();
@@ -674,7 +707,7 @@ impl<'c> NexusChild<'c> {
 
     /// Signal that the child removal is complete.
     fn remove_complete(&self) {
-        let mut sender = self.remove_channel.0.clone();
+        let sender = self.remove_channel.0.clone();
         let name = self.name.clone();
         Reactors::current().send_future(async move {
             if let Err(e) = sender.send(()).await {
@@ -703,26 +736,12 @@ impl<'c> NexusChild<'c> {
             parent,
             device_descriptor: None,
             state: AtomicCell::new(ChildState::Init),
-            prev_state: AtomicCell::new(ChildState::Init),
-            remove_channel: mpsc::channel(0),
+            destroy_state: AtomicCell::new(ChildDestroyState::None),
+            remove_channel: async_channel::bounded(1),
             _c: Default::default(),
         }
     }
 
-    /// destroy the child device
-    pub async fn destroy(&self) -> Result<(), NexusBdevError> {
-        if self.device.is_some() {
-            self.set_state(ChildState::Destroying);
-            info!("{}: destroying underlying block device", self.name);
-            device_destroy(&self.name).await?;
-            info!("{}: underlying block device destroyed", self.name);
-        } else {
-            warn!("{}: no underlying block device", self.name);
-        }
-
-        Ok(())
-    }
-
     /// Return reference to child's block device.
     pub fn get_device(&self) -> Result<&dyn BlockDevice, ChildError> {
         if let Some(ref device) = self.device {
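This is the heart of the rework: the transient `Destroying` variant leaves `ChildState` and becomes a separate `ChildDestroyState` flag, which lets `close()` take `&self` and makes concurrent closes idempotent. A sketch of the close/remove handshake under those assumptions; `async_channel::bounded(1)` replaces the old zero-capacity futures `mpsc` because its `Sender` works through `&self`:

```rust
use crossbeam::atomic::AtomicCell;

#[derive(Clone, Copy, PartialEq, Eq)]
enum DestroyState {
    None,
    Destroying,
}

struct Child {
    destroy_state: AtomicCell<DestroyState>,
    remove_channel: (async_channel::Sender<()>, async_channel::Receiver<()>),
}

impl Child {
    fn new() -> Self {
        Self {
            destroy_state: AtomicCell::new(DestroyState::None),
            remove_channel: async_channel::bounded(1),
        }
    }

    async fn close(&self) {
        // First caller wins; a concurrent close() returns immediately.
        if self
            .destroy_state
            .compare_exchange(DestroyState::None, DestroyState::Destroying)
            .is_err()
        {
            return;
        }
        // ... initiate block device destruction here ...
        // Park until the removal event handler confirms completion.
        self.remove_channel.1.recv().await.ok();
        self.destroy_state.store(DestroyState::None);
    }

    // Called from the device-removal event path (cf. remove_complete()).
    fn remove_complete(&self) {
        // try_send() never blocks; capacity 1 covers the single waiter.
        self.remove_channel.0.try_send(()).ok();
    }
}
```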
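`unshare()` dropping its `String` output ripples through every implementor in the rest of this patch (`Bdev`, `Lvol`, the `Share` trait itself). A trimmed-down sketch of the resulting trait shape, assuming the `async_trait` crate as the code already uses:

```rust
use async_trait::async_trait;
use std::pin::Pin;

#[derive(Clone, Copy)]
enum Protocol {
    Off,
    Nvmf,
}

#[async_trait(?Send)]
trait Share {
    type Error;
    type Output;

    async fn share_nvmf(
        self: Pin<&mut Self>,
        cntlid_range: Option<(u16, u16)>,
    ) -> Result<Self::Output, Self::Error>;

    // Previously returned Result<Self::Output, Self::Error>; callers
    // ignored the returned name, so it is now just Result<(), _>.
    async fn unshare(self: Pin<&mut Self>) -> Result<(), Self::Error>;

    fn shared(&self) -> Option<Protocol>;
}
```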
diff --git a/mayastor/src/bdev/nexus/nexus_io_subsystem.rs b/mayastor/src/bdev/nexus/nexus_io_subsystem.rs
index 04f6162417..a6d09f0dde 100644
--- a/mayastor/src/bdev/nexus/nexus_io_subsystem.rs
+++ b/mayastor/src/bdev/nexus/nexus_io_subsystem.rs
@@ -90,7 +90,13 @@ impl<'n> NexusIoSubsystem<'n> {
                     NvmfSubsystem::nqn_lookup(&self.name)
                 {
                     trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "pausing subsystem");
-                    subsystem.pause().await.unwrap();
+                    if let Err(e) = subsystem.pause().await {
+                        panic!(
+                            "Failed to pause subsystem '{}': {}",
+                            subsystem.get_nqn(),
+                            e
+                        );
+                    }
                     trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem paused");
                 }
             }
@@ -183,7 +189,13 @@ impl<'n> NexusIoSubsystem<'n> {
                         self.pause_state
                             .store(NexusPauseState::Unpausing);
                         trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "resuming subsystem");
-                        subsystem.resume().await.unwrap();
+                        if let Err(e) = subsystem.resume().await {
+                            panic!(
+                                "Failed to resume subsystem '{}': {}",
+                                subsystem.get_nqn(),
+                                e
+                            );
+                        }
                         trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem resumed");
                     }
                     self.pause_state.store(NexusPauseState::Unpaused);
diff --git a/mayastor/src/bdev/nexus/nexus_share.rs b/mayastor/src/bdev/nexus/nexus_share.rs
index 4312443c3f..34092053c8 100644
--- a/mayastor/src/bdev/nexus/nexus_share.rs
+++ b/mayastor/src/bdev/nexus/nexus_share.rs
@@ -14,7 +14,6 @@ use super::{
 
 use crate::core::{Protocol, Share};
 
-#[async_trait(?Send)]
 ///
 /// The sharing of the nexus is different compared to regular bdevs
 /// the Impl of ['Share'] handles this accordingly
@@ -23,6 +22,7 @@ use crate::core::{Protocol, Share};
 /// endpoints (not targets) however, we want to avoid too many
 /// protocol specifics and for bdevs the need for different endpoints
 /// is not implemented yet as the need for it has not arrived yet.
+#[async_trait(?Send)]
 impl<'n> Share for Nexus<'n> {
     type Error = Error;
     type Output = String;
@@ -31,8 +31,10 @@ impl<'n> Share for Nexus<'n> {
         mut self: Pin<&mut Self>,
         cntlid_range: Option<(u16, u16)>,
     ) -> Result<Self::Output, Self::Error> {
-        match self.shared() {
+        let uri = match self.shared() {
             Some(Protocol::Off) | None => {
+                info!("{:?}: sharing NVMF target...", self);
+
                 let name = self.name.clone();
                 self.as_mut()
                     .pin_bdev_mut()
@@ -41,20 +43,37 @@ impl<'n> Share for Nexus<'n> {
                     .context(ShareNvmfNexus {
                         name,
                     })?;
+
+                let uri = self.share_uri().unwrap();
+                info!("{:?}: shared NVMF target as '{}'", self, uri);
+                uri
             }
-            Some(Protocol::Nvmf) => {}
-        }
-        Ok(self.share_uri().unwrap())
+            Some(Protocol::Nvmf) => {
+                let uri = self.share_uri().unwrap();
+                info!("{:?}: already shared as '{}'", self, uri);
+                uri
+            }
+        };
+
+        Ok(uri)
     }
 
     /// TODO
-    async fn unshare(
-        self: Pin<&mut Self>,
-    ) -> Result<Self::Output, Self::Error> {
+    async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> {
+        info!("{:?}: unsharing nexus bdev...", self);
+
         let name = self.name.clone();
-        self.pin_bdev_mut().unshare().await.context(UnshareNexus {
-            name,
-        })
+        self.as_mut()
+            .pin_bdev_mut()
+            .unshare()
+            .await
+            .context(UnshareNexus {
+                name,
+            })?;
+
+        info!("{:?}: unshared nexus bdev", self);
+
+        Ok(())
     }
 
     /// TODO
@@ -144,33 +163,21 @@ impl<'n> Nexus<'n> {
 
     /// TODO
     pub async fn unshare_nexus(mut self: Pin<&mut Self>) -> Result<(), Error> {
-        unsafe {
-            match self.as_mut().get_unchecked_mut().nexus_target.take() {
-                Some(NexusTarget::NbdDisk(disk)) => {
-                    disk.destroy();
-                }
-                Some(NexusTarget::NexusNvmfTarget) => {
-                    self.as_mut().unshare().await?;
-                }
-                None => {
-                    warn!("{} was not shared", self.name);
-                }
+        match unsafe { self.as_mut().get_unchecked_mut().nexus_target.take() } {
+            Some(NexusTarget::NbdDisk(disk)) => {
+                info!("{:?}: destroying NBD device target...", self);
+                disk.destroy();
+            }
+            Some(NexusTarget::NexusNvmfTarget) => {
+                info!("{:?}: unsharing NVMF target...", self);
+            }
+            None => {
+                // Try to unshare the nexus bdev anyway, just in case it was
+                // shared via the bdev API. It is a no-op if the bdev was not
+                // shared.
             }
         }
 
-        Ok(())
-    }
-
-    /// Shutdowns all shares.
-    pub(crate) async fn destroy_shares(
-        mut self: Pin<&mut Self>,
-    ) -> Result<(), Error> {
-        let _ = self.as_mut().unshare_nexus().await;
-        assert_eq!(self.share_handle, None);
-
-        // no-op when not shared and will be removed once the old share bits are
-        // gone. Ignore device name provided in case of successful unsharing.
-        self.as_mut().unshare().await.map(|_| ())
+        self.as_mut().unshare().await
     }
 
     /// TODO
diff --git a/mayastor/src/core/bdev.rs b/mayastor/src/core/bdev.rs
index 894801cff2..55ece811ab 100644
--- a/mayastor/src/core/bdev.rs
+++ b/mayastor/src/core/bdev.rs
@@ -199,21 +199,18 @@ where
     }
 
     /// unshare the bdev regardless of current active share
-    async fn unshare(
-        self: Pin<&mut Self>,
-    ) -> Result<Self::Output, Self::Error> {
+    async fn unshare(self: Pin<&mut Self>) -> Result<(), Self::Error> {
         match self.shared() {
             Some(Protocol::Nvmf) => {
-                if let Some(subsystem) = NvmfSubsystem::nqn_lookup(self.name())
-                {
-                    subsystem.stop().await.context(UnshareNvmf {})?;
-                    subsystem.destroy();
+                if let Some(ss) = NvmfSubsystem::nqn_lookup(self.name()) {
+                    ss.stop().await.context(UnshareNvmf {})?;
+                    ss.destroy();
                 }
             }
             Some(Protocol::Off) | None => {}
         }
 
-        Ok(self.name().to_string())
+        Ok(())
     }
 
     /// returns if the bdev is currently shared
diff --git a/mayastor/src/core/mod.rs b/mayastor/src/core/mod.rs
index 2d111db9b5..ea2952b67f 100644
--- a/mayastor/src/core/mod.rs
+++ b/mayastor/src/core/mod.rs
@@ -252,7 +252,7 @@ pub async fn device_monitor() {
                 if let Some(n) =
                     crate::bdev::nexus::nexus_lookup_mut(&nexus)
                 {
-                    if let Err(e) = n.destroy_child(&child).await {
+                    if let Err(e) = n.close_child(&child).await {
                         error!(?e, "destroy child failed");
                     }
                 }
diff --git a/mayastor/src/core/share.rs b/mayastor/src/core/share.rs
index 2262da574a..dc667e13ed 100644
--- a/mayastor/src/core/share.rs
+++ b/mayastor/src/core/share.rs
@@ -49,8 +49,7 @@ pub trait Share: std::fmt::Debug {
     ) -> Result<Self::Output, Self::Error>;
 
     /// TODO
-    async fn unshare(self: Pin<&mut Self>)
-        -> Result<Self::Output, Self::Error>;
+    async fn unshare(self: Pin<&mut Self>) -> Result<(), Self::Error>;
 
     /// TODO
     fn shared(&self) -> Option<Protocol>;
diff --git a/mayastor/src/grpc/nexus_grpc.rs b/mayastor/src/grpc/nexus_grpc.rs
index cf249fa15b..86d2a05baa 100644
--- a/mayastor/src/grpc/nexus_grpc.rs
+++ b/mayastor/src/grpc/nexus_grpc.rs
@@ -29,7 +29,6 @@ impl From<ChildState> for rpc::ChildState {
             ChildState::Init => rpc::ChildState::ChildDegraded,
             ChildState::ConfigInvalid => rpc::ChildState::ChildFaulted,
             ChildState::Open => rpc::ChildState::ChildOnline,
-            ChildState::Destroying => rpc::ChildState::ChildDegraded,
             ChildState::Closed => rpc::ChildState::ChildDegraded,
             ChildState::Faulted(reason) => match reason {
                 Reason::OutOfSync => rpc::ChildState::ChildDegraded,
diff --git a/mayastor/src/grpc/v1/nexus.rs b/mayastor/src/grpc/v1/nexus.rs
index a2e17c3690..fde65edd84 100644
--- a/mayastor/src/grpc/v1/nexus.rs
+++ b/mayastor/src/grpc/v1/nexus.rs
@@ -166,7 +166,6 @@ impl From<nexus::ChildState> for ChildState {
             nexus::ChildState::Init => ChildState::ChildDegraded,
             nexus::ChildState::ConfigInvalid => ChildState::ChildFaulted,
             nexus::ChildState::Open => ChildState::ChildOnline,
-            nexus::ChildState::Destroying => ChildState::ChildDegraded,
             nexus::ChildState::Closed => ChildState::ChildDegraded,
             nexus::ChildState::Faulted(reason) => match reason {
                 Reason::OutOfSync => ChildState::ChildDegraded,
diff --git a/mayastor/src/lvs/lvol.rs b/mayastor/src/lvs/lvol.rs
index 6c35947ef8..3b1ee89d48 100644
--- a/mayastor/src/lvs/lvol.rs
+++ b/mayastor/src/lvs/lvol.rs
@@ -134,20 +134,18 @@ impl Share for Lvol {
     }
 
     /// unshare the nvmf target
-    async fn unshare(
-        mut self: Pin<&mut Self>,
-    ) -> Result<Self::Output, Self::Error> {
-        let share =
-            Pin::new(&mut self.as_bdev()).unshare().await.map_err(|e| {
-                Error::LvolUnShare {
-                    source: e,
-                    name: self.name(),
-                }
-            })?;
+    async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> {
+        Pin::new(&mut self.as_bdev()).unshare().await.map_err(|e| {
+            Error::LvolUnShare {
+                source: e,
+                name: self.name(),
+            }
+        })?;
 
         self.as_mut().set(PropValue::Shared(false)).await?;
+
         info!("unshared {}", self);
-        Ok(share)
+        Ok(())
     }
 
     /// return the protocol this bdev is shared under
diff --git a/mayastor/src/subsys/nvmf/mod.rs b/mayastor/src/subsys/nvmf/mod.rs
index 744f2b3660..278afbd760 100644
--- a/mayastor/src/subsys/nvmf/mod.rs
+++ b/mayastor/src/subsys/nvmf/mod.rs
@@ -63,6 +63,12 @@ pub enum Error {
     PgError { msg: String },
     #[snafu(display("Failed to create transport {}", msg))]
     Transport { source: Errno, msg: String },
+    #[snafu(display(
+        "Failed to {} subsystem '{}': subsystem is busy",
+        op,
+        nqn
+    ))]
+    SubsystemBusy { nqn: String, op: String },
     #[snafu(display("Failed nvmf subsystem operation for {} {} error: {}", source.desc(), nqn, msg))]
     Subsystem {
         source: Errno,
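The new `SubsystemBusy` variant gives EBUSY its own error instead of folding it into the catch-all `Subsystem` variant; `change_state()` in the next file raises it once its retries are exhausted. A minimal, self-contained snafu sketch of the same shape (hypothetical module, not the real one):

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to {} subsystem '{}': subsystem is busy", op, nqn))]
    SubsystemBusy { nqn: String, op: String },
}

// How a caller would raise it after exhausting retries.
fn busy(nqn: &str, op: &str) -> Error {
    Error::SubsystemBusy {
        nqn: nqn.to_owned(),
        op: op.to_owned(),
    }
}
```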
) - }; - - if rc != 0 { - return Err(Error::Subsystem { - source: Errno::from_i32(-rc), - nqn: self.get_nqn(), - msg: format!("subsystem_resume returned: {}", rc), - }); - } - - rc = r.await.unwrap(); - if rc != 0 { - Err(Error::Subsystem { - source: Errno::UnknownErrno, - nqn: self.get_nqn(), - msg: "failed to resume the subsystem".to_string(), - }) - } else { - Ok(()) - } + self.change_state("resume", |ss, cb, arg| unsafe { + spdk_nvmf_subsystem_resume(ss, cb, arg) + }) + .await } /// get ANA state @@ -587,13 +531,20 @@ impl NvmfSubsystem { /// stop all subsystems pub async fn stop_all(tgt: *mut spdk_nvmf_tgt) { - let ss = unsafe { - NvmfSubsystem( - NonNull::new(spdk_nvmf_subsystem_get_first(tgt)).unwrap(), - ) + let subsystem = unsafe { + NonNull::new(spdk_nvmf_subsystem_get_first(tgt)).map(NvmfSubsystem) }; - for s in ss.into_iter() { - s.stop().await.unwrap(); + + if let Some(subsystem) = subsystem { + for s in subsystem.into_iter() { + if let Err(e) = s.stop().await { + error!( + "Failed to stop subsystem '{}': {}", + s.get_nqn(), + e.to_string() + ); + } + } } } diff --git a/mayastor/src/subsys/nvmf/target.rs b/mayastor/src/subsys/nvmf/target.rs index 5f86cc7ac5..af9a43cbfd 100644 --- a/mayastor/src/subsys/nvmf/target.rs +++ b/mayastor/src/subsys/nvmf/target.rs @@ -296,7 +296,10 @@ impl Target { discovery.allow_any(true); Reactor::block_on(async { - let _ = discovery.start().await.unwrap(); + let nqn = discovery.get_nqn(); + if let Err(e) = discovery.start().await { + error!("Error starting subsystem '{}': {}", nqn, e.to_string()); + } }); } diff --git a/scripts/pytest-tests.sh b/scripts/pytest-tests.sh index 2f003cfce3..21dc36fd66 100755 --- a/scripts/pytest-tests.sh +++ b/scripts/pytest-tests.sh @@ -26,7 +26,7 @@ function run_tests() ( set -x base=$(dirname "$name") - python -m pytest --tc-file='test_config.ini' --docker-compose="$base" "$name" + python -m pytest --tc-file='test_config.ini' --docker-compose="$base" "$name" -svv ) fi done diff --git a/test/python/common/hdl.py b/test/python/common/hdl.py index ed193d3459..f401897a25 100644 --- a/test/python/common/hdl.py +++ b/test/python/common/hdl.py @@ -1,4 +1,5 @@ """Common code that represents a mayastor handle.""" +from urllib.parse import urlparse import mayastor_pb2 as pb import grpc import mayastor_pb2_grpc as rpc @@ -132,8 +133,14 @@ def replica_list_v2(self): def nexus_create(self, uuid, size, children): """Create a nexus with the given uuid and size. 
diff --git a/mayastor/src/subsys/nvmf/target.rs b/mayastor/src/subsys/nvmf/target.rs
index 5f86cc7ac5..af9a43cbfd 100644
--- a/mayastor/src/subsys/nvmf/target.rs
+++ b/mayastor/src/subsys/nvmf/target.rs
@@ -296,7 +296,10 @@ impl Target {
             discovery.allow_any(true);
 
             Reactor::block_on(async {
-                let _ = discovery.start().await.unwrap();
+                let nqn = discovery.get_nqn();
+                if let Err(e) = discovery.start().await {
+                    error!("Error starting subsystem '{}': {}", nqn, e.to_string());
+                }
             });
         }
 
diff --git a/scripts/pytest-tests.sh b/scripts/pytest-tests.sh
index 2f003cfce3..21dc36fd66 100755
--- a/scripts/pytest-tests.sh
+++ b/scripts/pytest-tests.sh
@@ -26,7 +26,7 @@ function run_tests()
       (
         set -x
         base=$(dirname "$name")
-        python -m pytest --tc-file='test_config.ini' --docker-compose="$base" "$name"
+        python -m pytest --tc-file='test_config.ini' --docker-compose="$base" "$name" -svv
       )
     fi
   done
diff --git a/test/python/common/hdl.py b/test/python/common/hdl.py
index ed193d3459..f401897a25 100644
--- a/test/python/common/hdl.py
+++ b/test/python/common/hdl.py
@@ -1,4 +1,5 @@
 """Common code that represents a mayastor handle."""
+from urllib.parse import urlparse
 import mayastor_pb2 as pb
 import grpc
 import mayastor_pb2_grpc as rpc
@@ -132,8 +133,14 @@ def replica_list_v2(self):
     def nexus_create(self, uuid, size, children):
         """Create a nexus with the given uuid and size.
         The children should be an array of nvmf URIs."""
+        children_ = []
+        for child in children:
+            u = urlparse(child)
+            host = u.hostname
+            if host != self.ip_v4:
+                children_.append(child)
         return self.ms.CreateNexus(
-            pb.CreateNexusRequest(uuid=str(uuid), size=size, children=children)
+            pb.CreateNexusRequest(uuid=str(uuid), size=size, children=children_)
         )
 
     def nexus_create_v2(
@@ -178,7 +185,7 @@ def nexus_list_v2(self):
         """List all the nexus devices, with separate name and uuid."""
         return self.ms.ListNexusV2(pb.Null()).nexus_list
 
-    def nexus_add_replica(self, uuid, uri, norebuild):
+    def nexus_add_replica(self, uuid, uri, norebuild=False):
         """Add a new replica to the nexus"""
         return self.ms.AddChildNexus(
             pb.AddChildNexusRequest(uuid=uuid, uri=uri, norebuild=norebuild)
diff --git a/test/python/tests/nexus/test_nexus_rebuild.py b/test/python/tests/nexus/test_nexus_rebuild.py
new file mode 100644
index 0000000000..a39f8e255b
--- /dev/null
+++ b/test/python/tests/nexus/test_nexus_rebuild.py
@@ -0,0 +1,124 @@
+from common.hdl import MayastorHandle
+from common.command import run_cmd, run_cmd_async
+from common.nvme import nvme_connect, nvme_disconnect
+from common.fio import Fio
+from common.fio_spdk import FioSpdk
+from common.mayastor import containers, mayastors, create_temp_files, check_size
+import pytest
+import asyncio
+import uuid as guid
+import time
+import subprocess
+import mayastor_pb2 as pb
+
+NEXUS_COUNT = 10
+NEXUS_SIZE = 500 * 1024 * 1024
+REPL_SIZE = NEXUS_SIZE
+POOL_SIZE = REPL_SIZE * NEXUS_COUNT + 100 * 1024 * 1024
+
+
+@pytest.fixture
+def local_files(mayastors):
+    files = []
+    for name, ms in mayastors.items():
+        path = f"/tmp/disk-{name}.img"
+        pool_size_mb = int(POOL_SIZE / 1024 / 1024)
+        subprocess.run(
+            ["sudo", "sh", "-c", f"rm -f '{path}'; truncate -s {pool_size_mb}M '{path}'"],
+            check=True,
+        )
+        files.append(path)
+
+    yield
+    for path in files:
+        subprocess.run(["sudo", "rm", "-f", path], check=True)
+
+
+@pytest.fixture
+def create_replicas_on_all_nodes(local_files, mayastors, create_temp_files):
+    uuids = []
+
+    for name, ms in mayastors.items():
+        ms.pool_create(name, f"aio:///tmp/disk-{name}.img")
+        # verify we have zero replicas
+        assert len(ms.replica_list().replicas) == 0
+
+    for i in range(NEXUS_COUNT):
+        uuid = guid.uuid4()
+        for name, ms in mayastors.items():
+            before = ms.pool_list()
+            ms.replica_create(name, uuid, REPL_SIZE)
+            after = ms.pool_list()
+        uuids.append(uuid)
+
+    yield uuids
+
+
+@pytest.fixture
+def create_nexuses(mayastors, create_replicas_on_all_nodes):
+    nexuses = []
+    nexuses_uris = []
+
+    uris = [
+        [replica.uri for replica in mayastors.get(node).replica_list().replicas]
+        for node in ["ms1", "ms2", "ms3"]
+    ]
+
+    ms = mayastors.get("ms0")
+    for children in zip(*uris):
+        uuid = guid.uuid4()
+        nexus = ms.nexus_create(uuid, NEXUS_SIZE, list(children))
+        nexuses.append(nexus)
+        nexuses_uris.append(ms.nexus_publish(uuid))
+
+    yield nexuses
+
+
+@pytest.mark.parametrize("times", range(10))
+def test_rebuild_failure(containers, mayastors, times, create_nexuses):
+    ms0 = mayastors.get("ms0")
+    ms3 = mayastors.get("ms3")
+
+    # Restart container with replica #3 (ms3).
+    node3 = containers.get("ms3")
+    node3.stop()
+    time.sleep(5)
+    node3.start()
+
+    # Reconnect ms3, and import the existing pool.
+    ms3.reconnect()
+    ms3.pool_create("ms3", "aio:///tmp/disk-ms3.img")
+    time.sleep(1)
+
+    # Add the replicas to the nexuses for rebuild.
+    for (idx, nexus) in enumerate(ms0.nexus_list()):
+        child = list(
+            filter(lambda child: child.state == pb.CHILD_FAULTED, list(nexus.children))
+        )[0]
+        if nexus.state != pb.NEXUS_FAULTED:
+            try:
+                ms0.nexus_remove_replica(nexus.uuid, child.uri)
+                ms0.nexus_add_replica(nexus.uuid, child.uri)
+            except:
+                print(f"Failed to remove child {child.uri} from {nexus}")
+
+    time.sleep(5)
+
+    rebuilds = 0
+    for nexus in ms0.nexus_list():
+        for child in nexus.children:
+            if child.rebuild_progress > -1:
+                rebuilds += 1
+                print(
+                    "nexus", nexus.uuid, "rebuilding", child.uri,
+                    f"{child.rebuild_progress}"
+                )
+
+    assert rebuilds > 0
+
+    # Stop ms3 again. Rebuild jobs in progress must terminate.
+    node3.stop()
+
+    time.sleep(10)
+
+    # All rebuild jobs must have terminated.
+    for nexus in ms0.nexus_list():
+        for child in nexus.children:
+            assert child.rebuild_progress == -1
+        ms0.nexus_destroy(nexus.uuid)