From 123a00420084c2acba0ccee980d09622c2e2d267 Mon Sep 17 00:00:00 2001 From: Dmitry Savitskiy Date: Tue, 27 Jun 2023 19:51:21 +0300 Subject: [PATCH] fix(nexus): fixing nexus unshare and nvmf error handling NVMF subsystem API and Nexus unshare had some issues with error handling, which could potentially lead to use-after-free. Signed-off-by: Dmitry Savitskiy --- mayastor/src/bdev/nexus/nexus_bdev.rs | 13 +- mayastor/src/bdev/nexus/nexus_io_subsystem.rs | 16 +- mayastor/src/bdev/nexus/nexus_share.rs | 77 ++--- mayastor/src/core/bdev.rs | 13 +- mayastor/src/core/share.rs | 3 +- mayastor/src/lvs/lvol.rs | 20 +- mayastor/src/subsys/nvmf/mod.rs | 6 + mayastor/src/subsys/nvmf/subsystem.rs | 267 +++++++----------- mayastor/src/subsys/nvmf/target.rs | 5 +- 9 files changed, 198 insertions(+), 222 deletions(-) diff --git a/mayastor/src/bdev/nexus/nexus_bdev.rs b/mayastor/src/bdev/nexus/nexus_bdev.rs index 8bd023ef28..743a555044 100644 --- a/mayastor/src/bdev/nexus/nexus_bdev.rs +++ b/mayastor/src/bdev/nexus/nexus_bdev.rs @@ -479,9 +479,6 @@ pub struct Nexus<'n> { pub state: parking_lot::Mutex, /// The offset in blocks where the data partition starts. pub(crate) data_ent_offset: u64, - /// the handle to be used when sharing the nexus, this allows for the bdev - /// to be shared with vbdevs on top - pub(crate) share_handle: Option, /// enum containing the protocol-specific target used to publish the nexus pub nexus_target: Option, /// Indicates if the Nexus has an I/O device. @@ -588,7 +585,6 @@ impl<'n> Nexus<'n> { state: parking_lot::Mutex::new(NexusState::Init), bdev: None, data_ent_offset: 0, - share_handle: None, req_size: size, nexus_target: None, nvme_params, @@ -879,7 +875,7 @@ impl<'n> Nexus<'n> { pub async fn destroy(mut self: Pin<&mut Self>) -> Result<(), Error> { info!("Destroying nexus {}", self.name); - self.as_mut().destroy_shares().await?; + self.as_mut().unshare_nexus().await?; // wait for all rebuild jobs to be cancelled before proceeding with the // destruction of the nexus @@ -1473,6 +1469,13 @@ async fn nexus_create_internal( children: &[String], nexus_info_key: Option, ) -> Result<(), Error> { + info!( + "Creating new nexus '{}' ({} child(ren): {:?})...", + name, + children.len(), + children + ); + if let Some(nexus) = nexus_lookup_name_uuid(name, nexus_uuid) { // FIXME: Instead of error, we return Ok without checking // that the children match, which seems wrong. diff --git a/mayastor/src/bdev/nexus/nexus_io_subsystem.rs b/mayastor/src/bdev/nexus/nexus_io_subsystem.rs index 04f6162417..a6d09f0dde 100644 --- a/mayastor/src/bdev/nexus/nexus_io_subsystem.rs +++ b/mayastor/src/bdev/nexus/nexus_io_subsystem.rs @@ -90,7 +90,13 @@ impl<'n> NexusIoSubsystem<'n> { NvmfSubsystem::nqn_lookup(&self.name) { trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "pausing subsystem"); - subsystem.pause().await.unwrap(); + if let Err(e) = subsystem.pause().await { + panic!( + "Failed to pause subsystem '{}: {}", + subsystem.get_nqn(), + e + ); + } trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem paused"); } } @@ -183,7 +189,13 @@ impl<'n> NexusIoSubsystem<'n> { self.pause_state .store(NexusPauseState::Unpausing); trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "resuming subsystem"); - subsystem.resume().await.unwrap(); + if let Err(e) = subsystem.resume().await { + panic!( + "Failed to resume subsystem '{}: {}", + subsystem.get_nqn(), + e + ); + } trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem resumed"); } self.pause_state.store(NexusPauseState::Unpaused); diff --git a/mayastor/src/bdev/nexus/nexus_share.rs b/mayastor/src/bdev/nexus/nexus_share.rs index 4312443c3f..34092053c8 100644 --- a/mayastor/src/bdev/nexus/nexus_share.rs +++ b/mayastor/src/bdev/nexus/nexus_share.rs @@ -14,7 +14,6 @@ use super::{ use crate::core::{Protocol, Share}; -#[async_trait(? Send)] /// /// The sharing of the nexus is different compared to regular bdevs /// the Impl of ['Share'] handles this accordingly @@ -23,6 +22,7 @@ use crate::core::{Protocol, Share}; /// endpoints (not targets) however, we want to avoid too many /// protocol specifics and for bdevs the need for different endpoints /// is not implemented yet as the need for it has not arrived yet. +#[async_trait(? Send)] impl<'n> Share for Nexus<'n> { type Error = Error; type Output = String; @@ -31,8 +31,10 @@ impl<'n> Share for Nexus<'n> { mut self: Pin<&mut Self>, cntlid_range: Option<(u16, u16)>, ) -> Result { - match self.shared() { + let uri = match self.shared() { Some(Protocol::Off) | None => { + info!("{:?}: sharing NVMF target...", self); + let name = self.name.clone(); self.as_mut() .pin_bdev_mut() @@ -41,20 +43,37 @@ impl<'n> Share for Nexus<'n> { .context(ShareNvmfNexus { name, })?; + + let uri = self.share_uri().unwrap(); + info!("{:?}: shared NVMF target as '{}'", self, uri); + uri } - Some(Protocol::Nvmf) => {} - } - Ok(self.share_uri().unwrap()) + Some(Protocol::Nvmf) => { + let uri = self.share_uri().unwrap(); + info!("{:?}: already shared as '{}'", self, uri); + uri + } + }; + + Ok(uri) } /// TODO - async fn unshare( - self: Pin<&mut Self>, - ) -> Result { + async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> { + info!("{:?}: unsharing nexus bdev...", self); + let name = self.name.clone(); - self.pin_bdev_mut().unshare().await.context(UnshareNexus { - name, - }) + self.as_mut() + .pin_bdev_mut() + .unshare() + .await + .context(UnshareNexus { + name, + })?; + + info!("{:?}: unshared nexus bdev", self); + + Ok(()) } /// TODO @@ -144,33 +163,21 @@ impl<'n> Nexus<'n> { /// TODO pub async fn unshare_nexus(mut self: Pin<&mut Self>) -> Result<(), Error> { - unsafe { - match self.as_mut().get_unchecked_mut().nexus_target.take() { - Some(NexusTarget::NbdDisk(disk)) => { - disk.destroy(); - } - Some(NexusTarget::NexusNvmfTarget) => { - self.as_mut().unshare().await?; - } - None => { - warn!("{} was not shared", self.name); - } + match unsafe { self.as_mut().get_unchecked_mut().nexus_target.take() } { + Some(NexusTarget::NbdDisk(disk)) => { + info!("{:?}: destroying NBD device target...", self); + disk.destroy(); + } + Some(NexusTarget::NexusNvmfTarget) => { + info!("{:?}: unsharing NVMF target...", self); + } + None => { + // Try unshare nexus bdev anyway, just in case it was shared + // via bdev API. It is no-op if bdev was not shared. } } - Ok(()) - } - - /// Shutdowns all shares. - pub(crate) async fn destroy_shares( - mut self: Pin<&mut Self>, - ) -> Result<(), Error> { - let _ = self.as_mut().unshare_nexus().await; - assert_eq!(self.share_handle, None); - - // no-op when not shared and will be removed once the old share bits are - // gone. Ignore device name provided in case of successful unsharing. - self.as_mut().unshare().await.map(|_| ()) + self.as_mut().unshare().await } /// TODO diff --git a/mayastor/src/core/bdev.rs b/mayastor/src/core/bdev.rs index 894801cff2..55ece811ab 100644 --- a/mayastor/src/core/bdev.rs +++ b/mayastor/src/core/bdev.rs @@ -199,21 +199,18 @@ where } /// unshare the bdev regardless of current active share - async fn unshare( - self: Pin<&mut Self>, - ) -> Result { + async fn unshare(self: Pin<&mut Self>) -> Result<(), Self::Error> { match self.shared() { Some(Protocol::Nvmf) => { - if let Some(subsystem) = NvmfSubsystem::nqn_lookup(self.name()) - { - subsystem.stop().await.context(UnshareNvmf {})?; - subsystem.destroy(); + if let Some(ss) = NvmfSubsystem::nqn_lookup(self.name()) { + ss.stop().await.context(UnshareNvmf {})?; + ss.destroy(); } } Some(Protocol::Off) | None => {} } - Ok(self.name().to_string()) + Ok(()) } /// returns if the bdev is currently shared diff --git a/mayastor/src/core/share.rs b/mayastor/src/core/share.rs index 2262da574a..dc667e13ed 100644 --- a/mayastor/src/core/share.rs +++ b/mayastor/src/core/share.rs @@ -49,8 +49,7 @@ pub trait Share: std::fmt::Debug { ) -> Result; /// TODO - async fn unshare(self: Pin<&mut Self>) - -> Result; + async fn unshare(self: Pin<&mut Self>) -> Result<(), Self::Error>; /// TODO fn shared(&self) -> Option; diff --git a/mayastor/src/lvs/lvol.rs b/mayastor/src/lvs/lvol.rs index 6c35947ef8..3b1ee89d48 100644 --- a/mayastor/src/lvs/lvol.rs +++ b/mayastor/src/lvs/lvol.rs @@ -134,20 +134,18 @@ impl Share for Lvol { } /// unshare the nvmf target - async fn unshare( - mut self: Pin<&mut Self>, - ) -> Result { - let share = - Pin::new(&mut self.as_bdev()).unshare().await.map_err(|e| { - Error::LvolUnShare { - source: e, - name: self.name(), - } - })?; + async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> { + Pin::new(&mut self.as_bdev()).unshare().await.map_err(|e| { + Error::LvolUnShare { + source: e, + name: self.name(), + } + })?; self.as_mut().set(PropValue::Shared(false)).await?; + info!("unshared {}", self); - Ok(share) + Ok(()) } /// return the protocol this bdev is shared under diff --git a/mayastor/src/subsys/nvmf/mod.rs b/mayastor/src/subsys/nvmf/mod.rs index 744f2b3660..278afbd760 100644 --- a/mayastor/src/subsys/nvmf/mod.rs +++ b/mayastor/src/subsys/nvmf/mod.rs @@ -63,6 +63,12 @@ pub enum Error { PgError { msg: String }, #[snafu(display("Failed to create transport {}", msg))] Transport { source: Errno, msg: String }, + #[snafu(display( + "Failed to {} subsystem '{}': subsystem is busy", + op, + nqn + ))] + SubsystemBusy { nqn: String, op: String }, #[snafu(display("Failed nvmf subsystem operation for {} {} error: {}", source.desc(), nqn, msg))] Subsystem { source: Errno, diff --git a/mayastor/src/subsys/nvmf/subsystem.rs b/mayastor/src/subsys/nvmf/subsystem.rs index 6c7571be9b..9ace34e47b 100644 --- a/mayastor/src/subsys/nvmf/subsystem.rs +++ b/mayastor/src/subsys/nvmf/subsystem.rs @@ -34,6 +34,7 @@ use spdk_rs::libspdk::{ spdk_nvmf_subsystem_set_mn, spdk_nvmf_subsystem_set_sn, spdk_nvmf_subsystem_start, + spdk_nvmf_subsystem_state_change_done, spdk_nvmf_subsystem_stop, spdk_nvmf_tgt, SPDK_NVMF_SUBTYPE_DISCOVERY, @@ -340,188 +341,131 @@ impl NvmfSubsystem { }) } - /// start the subsystem previously created -- note that we destroy it on - /// failure to ensure the state is not in limbo and to avoid leaking - /// resources - pub async fn start(self) -> Result { - extern "C" fn start_cb( - ss: *mut spdk_nvmf_subsystem, + /// TODO + async fn change_state( + &self, + op: &str, + f: impl Fn( + *mut spdk_nvmf_subsystem, + spdk_nvmf_subsystem_state_change_done, + *mut c_void, + ) -> i32, + ) -> Result<(), Error> { + extern "C" fn state_change_cb( + _ss: *mut spdk_nvmf_subsystem, arg: *mut c_void, status: i32, ) { let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender) }; - let ss = NvmfSubsystem::from(ss); - if status != 0 { - error!( - "Failed start subsystem state {} -- destroying it", - ss.get_nqn() - ); - ss.destroy(); - } - s.send(status).unwrap(); } - self.add_listener().await?; + info!(?self, "Subsystem {} in progress...", op); - let (s, r) = oneshot::channel::(); + let res = { + let mut n = 0; - unsafe { - spdk_nvmf_subsystem_start( - self.0.as_ptr(), - Some(start_cb), - cb_arg(s), - ) - } - .to_result(|e| Error::Subsystem { - source: Errno::from_i32(e), - nqn: self.get_nqn(), - msg: "out of memory".to_string(), - })?; + let (rc, r) = loop { + let (s, r) = oneshot::channel::(); - r.await.unwrap().to_result(|e| Error::Subsystem { - source: Errno::from_i32(e), - nqn: self.get_nqn(), - msg: "failed to start the subsystem".to_string(), - })?; + let rc = -f(self.0.as_ptr(), Some(state_change_cb), cb_arg(s)); - debug!(?self, "shared"); - Ok(self.get_nqn()) - } + if rc != libc::EBUSY || n >= 3 { + break (rc, r); + } - /// stop the subsystem - pub async fn stop(&self) -> Result<(), Error> { - extern "C" fn stop_cb( - ss: *mut spdk_nvmf_subsystem, - arg: *mut c_void, - status: i32, - ) { - let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender) }; + n += 1; - let ss = NvmfSubsystem::from(ss); - if status != 0 { - error!( - "Failed change subsystem state {} -- to STOP", - ss.get_nqn() + warn!( + "Failed to {} '{}': subsystem is busy, retrying {}...", + op, + self.get_nqn(), + n ); - } - s.send(status).unwrap(); - } + crate::sleep::mayastor_sleep(std::time::Duration::from_millis( + 100, + )) + .await + .unwrap(); + }; - let (s, r) = oneshot::channel::(); - debug!("stopping {:?}", self); - unsafe { - spdk_nvmf_subsystem_stop(self.0.as_ptr(), Some(stop_cb), cb_arg(s)) - } - .to_result(|e| Error::Subsystem { - source: Errno::from_i32(e), - nqn: self.get_nqn(), - msg: "out of memory".to_string(), - })?; + match rc { + 0 => r.await.unwrap().to_result(|e| Error::Subsystem { + source: Errno::from_i32(e), + nqn: self.get_nqn(), + msg: format!("{} failed", op), + }), + libc::EBUSY => Err(Error::SubsystemBusy { + nqn: self.get_nqn(), + op: op.to_owned(), + }), + e => Err(Error::Subsystem { + source: Errno::from_i32(e), + nqn: self.get_nqn(), + msg: format!("failed to initiate {}", op), + }), + } + }; - r.await.unwrap().to_result(|e| Error::Subsystem { - source: Errno::from_i32(e), - nqn: self.get_nqn(), - msg: "failed to stop the subsystem".to_string(), - })?; + if let Err(ref e) = res { + error!(?self, "Subsystem {} failed: {}", op, e.to_string()); + } else { + info!(?self, "Subsystem {} completed: Ok", op); + } - debug!("stopped {}", self.get_nqn()); - Ok(()) + res } - /// transition the subsystem to paused state - /// intended to be a temporary state while changes are made - pub async fn pause(&self) -> Result<(), Error> { - extern "C" fn pause_cb( - ss: *mut spdk_nvmf_subsystem, - arg: *mut c_void, - status: i32, - ) { - let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender) }; - - let ss = NvmfSubsystem::from(ss); - if status != 0 { - error!( - "Failed change subsystem state {} -- to pause", - ss.get_nqn() - ); - } + /// start the subsystem previously created -- note that we destroy it on + /// failure to ensure the state is not in limbo and to avoid leaking + /// resources + pub async fn start(self) -> Result { + self.add_listener().await?; - s.send(status).unwrap(); - } + if let Err(e) = self + .change_state("start", |ss, cb, arg| unsafe { + spdk_nvmf_subsystem_start(ss, cb, arg) + }) + .await + { + error!( + "Failed to start subsystem '{}': {}; destroying it", + self.get_nqn(), + e.to_string(), + ); - let (s, r) = oneshot::channel::(); + self.destroy(); - unsafe { - spdk_nvmf_subsystem_pause( - self.0.as_ptr(), - 1, - Some(pause_cb), - cb_arg(s), - ) + Err(e) + } else { + Ok(self.get_nqn()) } - .to_result(|e| Error::Subsystem { - source: Errno::from_i32(-e), - nqn: self.get_nqn(), - msg: format!("subsystem_pause returned: {}", e), - })?; + } - r.await.unwrap().to_result(|e| Error::Subsystem { - source: Errno::from_i32(e), - nqn: self.get_nqn(), - msg: "failed to pause the subsystem".to_string(), + /// stop the subsystem + pub async fn stop(&self) -> Result<(), Error> { + self.change_state("stop", |ss, cb, arg| unsafe { + spdk_nvmf_subsystem_stop(ss, cb, arg) }) + .await + } + + /// transition the subsystem to paused state + /// intended to be a temporary state while changes are made + pub async fn pause(&self) -> Result<(), Error> { + self.change_state("pause", |ss, cb, arg| unsafe { + spdk_nvmf_subsystem_pause(ss, 1, cb, arg) + }) + .await } /// transition the subsystem to active state pub async fn resume(&self) -> Result<(), Error> { - extern "C" fn resume_cb( - ss: *mut spdk_nvmf_subsystem, - arg: *mut c_void, - status: i32, - ) { - let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender) }; - - let ss = NvmfSubsystem::from(ss); - if status != 0 { - error!( - "Failed change subsystem state {} -- to RESUME", - ss.get_nqn() - ); - } - - s.send(status).unwrap(); - } - - let (s, r) = oneshot::channel::(); - - let mut rc = unsafe { - spdk_nvmf_subsystem_resume( - self.0.as_ptr(), - Some(resume_cb), - cb_arg(s), - ) - }; - - if rc != 0 { - return Err(Error::Subsystem { - source: Errno::from_i32(-rc), - nqn: self.get_nqn(), - msg: format!("subsystem_resume returned: {}", rc), - }); - } - - rc = r.await.unwrap(); - if rc != 0 { - Err(Error::Subsystem { - source: Errno::UnknownErrno, - nqn: self.get_nqn(), - msg: "failed to resume the subsystem".to_string(), - }) - } else { - Ok(()) - } + self.change_state("resume", |ss, cb, arg| unsafe { + spdk_nvmf_subsystem_resume(ss, cb, arg) + }) + .await } /// get ANA state @@ -587,13 +531,20 @@ impl NvmfSubsystem { /// stop all subsystems pub async fn stop_all(tgt: *mut spdk_nvmf_tgt) { - let ss = unsafe { - NvmfSubsystem( - NonNull::new(spdk_nvmf_subsystem_get_first(tgt)).unwrap(), - ) + let subsystem = unsafe { + NonNull::new(spdk_nvmf_subsystem_get_first(tgt)).map(NvmfSubsystem) }; - for s in ss.into_iter() { - s.stop().await.unwrap(); + + if let Some(subsystem) = subsystem { + for s in subsystem.into_iter() { + if let Err(e) = s.stop().await { + error!( + "Failed to stop subsystem '{}': {}", + s.get_nqn(), + e.to_string() + ); + } + } } } diff --git a/mayastor/src/subsys/nvmf/target.rs b/mayastor/src/subsys/nvmf/target.rs index 5f86cc7ac5..af9a43cbfd 100644 --- a/mayastor/src/subsys/nvmf/target.rs +++ b/mayastor/src/subsys/nvmf/target.rs @@ -296,7 +296,10 @@ impl Target { discovery.allow_any(true); Reactor::block_on(async { - let _ = discovery.start().await.unwrap(); + let nqn = discovery.get_nqn(); + if let Err(e) = discovery.start().await { + error!("Error starting subsystem '{}': {}", nqn, e.to_string()); + } }); }