From 123a00420084c2acba0ccee980d09622c2e2d267 Mon Sep 17 00:00:00 2001
From: Dmitry Savitskiy
Date: Tue, 27 Jun 2023 19:51:21 +0300
Subject: [PATCH 1/2] fix(nexus): fixing nexus unshare and nvmf error handling

The NVMF subsystem API and nexus unshare had several issues with error
handling, which could potentially lead to a use-after-free.

Signed-off-by: Dmitry Savitskiy
---
 mayastor/src/bdev/nexus/nexus_bdev.rs         |  13 +-
 mayastor/src/bdev/nexus/nexus_io_subsystem.rs |  16 +-
 mayastor/src/bdev/nexus/nexus_share.rs        |  77 ++---
 mayastor/src/core/bdev.rs                     |  13 +-
 mayastor/src/core/share.rs                    |   3 +-
 mayastor/src/lvs/lvol.rs                      |  20 +-
 mayastor/src/subsys/nvmf/mod.rs               |   6 +
 mayastor/src/subsys/nvmf/subsystem.rs         | 267 +++++++-----------
 mayastor/src/subsys/nvmf/target.rs            |   5 +-
 9 files changed, 198 insertions(+), 222 deletions(-)

diff --git a/mayastor/src/bdev/nexus/nexus_bdev.rs b/mayastor/src/bdev/nexus/nexus_bdev.rs
index 8bd023ef28..743a555044 100644
--- a/mayastor/src/bdev/nexus/nexus_bdev.rs
+++ b/mayastor/src/bdev/nexus/nexus_bdev.rs
@@ -479,9 +479,6 @@ pub struct Nexus<'n> {
     pub state: parking_lot::Mutex<NexusState>,
     /// The offset in blocks where the data partition starts.
     pub(crate) data_ent_offset: u64,
-    /// the handle to be used when sharing the nexus, this allows for the bdev
-    /// to be shared with vbdevs on top
-    pub(crate) share_handle: Option<String>,
     /// enum containing the protocol-specific target used to publish the nexus
     pub nexus_target: Option<NexusTarget>,
     /// Indicates if the Nexus has an I/O device.
@@ -588,7 +585,6 @@ impl<'n> Nexus<'n> {
             state: parking_lot::Mutex::new(NexusState::Init),
             bdev: None,
             data_ent_offset: 0,
-            share_handle: None,
             req_size: size,
             nexus_target: None,
             nvme_params,
@@ -879,7 +875,7 @@ impl<'n> Nexus<'n> {
     pub async fn destroy(mut self: Pin<&mut Self>) -> Result<(), Error> {
         info!("Destroying nexus {}", self.name);

-        self.as_mut().destroy_shares().await?;
+        self.as_mut().unshare_nexus().await?;

         // wait for all rebuild jobs to be cancelled before proceeding with the
         // destruction of the nexus
@@ -1473,6 +1469,13 @@ async fn nexus_create_internal(
     children: &[String],
     nexus_info_key: Option<String>,
 ) -> Result<(), Error> {
+    info!(
+        "Creating new nexus '{}' ({} child(ren): {:?})...",
+        name,
+        children.len(),
+        children
+    );
+
     if let Some(nexus) = nexus_lookup_name_uuid(name, nexus_uuid) {
         // FIXME: Instead of error, we return Ok without checking
         // that the children match, which seems wrong.
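With share_handle gone, the bdev layer becomes the single source of truth for share state, which removes one place a stale copy could outlive the target. A minimal sketch of deriving the URI on demand rather than caching it; current_share_uri is a hypothetical helper, and the Option return type of share_uri() is an assumption based on the unwrap() calls later in this patch:

// Hypothetical helper: derive the share URI each time it is needed instead
// of keeping a cached field that can go stale (the use-after-free risk).
fn current_share_uri(nexus: &Nexus<'_>) -> Option<String> {
    match nexus.shared() {
        // Ask the bdev layer directly; there is no cached copy to invalidate.
        Some(Protocol::Nvmf) => nexus.share_uri(),
        // Protocol::Off or not shared at all.
        _ => None,
    }
}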
diff --git a/mayastor/src/bdev/nexus/nexus_io_subsystem.rs b/mayastor/src/bdev/nexus/nexus_io_subsystem.rs
index 04f6162417..a6d09f0dde 100644
--- a/mayastor/src/bdev/nexus/nexus_io_subsystem.rs
+++ b/mayastor/src/bdev/nexus/nexus_io_subsystem.rs
@@ -90,7 +90,13 @@ impl<'n> NexusIoSubsystem<'n> {
                 NvmfSubsystem::nqn_lookup(&self.name)
             {
                 trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "pausing subsystem");
-                subsystem.pause().await.unwrap();
+                if let Err(e) = subsystem.pause().await {
+                    panic!(
+                        "Failed to pause subsystem '{}': {}",
+                        subsystem.get_nqn(),
+                        e
+                    );
+                }
                 trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem paused");
             }
         }
@@ -183,7 +189,13 @@ impl<'n> NexusIoSubsystem<'n> {
                             self.pause_state
                                 .store(NexusPauseState::Unpausing);
                             trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "resuming subsystem");
-                            subsystem.resume().await.unwrap();
+                            if let Err(e) = subsystem.resume().await {
+                                panic!(
+                                    "Failed to resume subsystem '{}': {}",
+                                    subsystem.get_nqn(),
+                                    e
+                                );
+                            }
                            trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem resumed");
                         }
                         self.pause_state.store(NexusPauseState::Unpaused);
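The pause_state bookkeeping above cycles a nexus through a small state machine. Only Unpausing and Unpaused appear in this hunk; the remaining variants and the transition order below are assumptions, sketched for illustration:

// Sketch of the pause cycle assumed by the pause_state bookkeeping above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NexusPauseState {
    Unpaused,
    Pausing,
    Paused,
    Unpausing,
}

// One full pause/resume round trip. Panics (as the code above now does on
// subsystem failure) have no place in this model; it only documents order.
fn next(s: NexusPauseState) -> NexusPauseState {
    use NexusPauseState::*;
    match s {
        Unpaused => Pausing,   // pause() requested
        Pausing => Paused,     // subsystem.pause() completed
        Paused => Unpausing,   // resume() requested
        Unpausing => Unpaused, // subsystem.resume() completed
    }
}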
diff --git a/mayastor/src/bdev/nexus/nexus_share.rs b/mayastor/src/bdev/nexus/nexus_share.rs
index 4312443c3f..34092053c8 100644
--- a/mayastor/src/bdev/nexus/nexus_share.rs
+++ b/mayastor/src/bdev/nexus/nexus_share.rs
@@ -14,7 +14,6 @@ use super::{

 use crate::core::{Protocol, Share};

-#[async_trait(? Send)]
 ///
 /// The sharing of the nexus is different compared to regular bdevs
 /// the Impl of ['Share'] handles this accordingly
@@ -23,6 +22,7 @@ use crate::core::{Protocol, Share};
 /// endpoints (not targets) however, we want to avoid too many
 /// protocol specifics and for bdevs the need for different endpoints
 /// is not implemented yet as the need for it has not arrived yet.
+#[async_trait(? Send)]
 impl<'n> Share for Nexus<'n> {
     type Error = Error;
     type Output = String;
@@ -31,8 +31,10 @@ impl<'n> Share for Nexus<'n> {
         mut self: Pin<&mut Self>,
         cntlid_range: Option<(u16, u16)>,
     ) -> Result<Self::Output, Self::Error> {
-        match self.shared() {
+        let uri = match self.shared() {
             Some(Protocol::Off) | None => {
+                info!("{:?}: sharing NVMF target...", self);
+
                 let name = self.name.clone();
                 self.as_mut()
                     .pin_bdev_mut()
@@ -41,20 +43,37 @@ impl<'n> Share for Nexus<'n> {
                     .context(ShareNvmfNexus {
                         name,
                     })?;
+
+                let uri = self.share_uri().unwrap();
+                info!("{:?}: shared NVMF target as '{}'", self, uri);
+                uri
             }
-            Some(Protocol::Nvmf) => {}
-        }
-        Ok(self.share_uri().unwrap())
+            Some(Protocol::Nvmf) => {
+                let uri = self.share_uri().unwrap();
+                info!("{:?}: already shared as '{}'", self, uri);
+                uri
+            }
+        };
+
+        Ok(uri)
     }

     /// TODO
-    async fn unshare(
-        self: Pin<&mut Self>,
-    ) -> Result<Self::Output, Self::Error> {
+    async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> {
+        info!("{:?}: unsharing nexus bdev...", self);
+
         let name = self.name.clone();
-        self.pin_bdev_mut().unshare().await.context(UnshareNexus {
-            name,
-        })
+        self.as_mut()
+            .pin_bdev_mut()
+            .unshare()
+            .await
+            .context(UnshareNexus {
+                name,
+            })?;
+
+        info!("{:?}: unshared nexus bdev", self);
+
+        Ok(())
     }

     /// TODO
@@ -144,33 +163,21 @@ impl<'n> Nexus<'n> {

     /// TODO
     pub async fn unshare_nexus(mut self: Pin<&mut Self>) -> Result<(), Error> {
-        unsafe {
-            match self.as_mut().get_unchecked_mut().nexus_target.take() {
-                Some(NexusTarget::NbdDisk(disk)) => {
-                    disk.destroy();
-                }
-                Some(NexusTarget::NexusNvmfTarget) => {
-                    self.as_mut().unshare().await?;
-                }
-                None => {
-                    warn!("{} was not shared", self.name);
-                }
+        match unsafe { self.as_mut().get_unchecked_mut().nexus_target.take() } {
+            Some(NexusTarget::NbdDisk(disk)) => {
+                info!("{:?}: destroying NBD device target...", self);
+                disk.destroy();
+            }
+            Some(NexusTarget::NexusNvmfTarget) => {
+                info!("{:?}: unsharing NVMF target...", self);
+            }
+            None => {
+                // Try to unshare the nexus bdev anyway, in case it was
+                // shared via the bdev API. This is a no-op if the bdev
+                // was not shared.
             }
         }
-        Ok(())
-    }
-
-    /// Shutdowns all shares.
-    pub(crate) async fn destroy_shares(
-        mut self: Pin<&mut Self>,
-    ) -> Result<(), Error> {
-        let _ = self.as_mut().unshare_nexus().await;
-        assert_eq!(self.share_handle, None);
-
-        // no-op when not shared and will be removed once the old share bits are
-        // gone. Ignore device name provided in case of successful unsharing.
-        self.as_mut().unshare().await.map(|_| ())
+
+        self.as_mut().unshare().await
     }

     /// TODO
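Note the new unshare contract: it now returns Result<(), _> rather than the device name, so callers can no longer capture a name that may already be stale by the time they use it. A short usage sketch; teardown is a hypothetical wrapper, not part of the patch:

use std::pin::Pin;

// Hypothetical caller: unshare_nexus() is safe to call unconditionally,
// since the underlying bdev unshare is a no-op for an unshared nexus.
async fn teardown(mut nexus: Pin<&mut Nexus<'_>>) -> Result<(), Error> {
    nexus.as_mut().unshare_nexus().await
}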
diff --git a/mayastor/src/core/bdev.rs b/mayastor/src/core/bdev.rs
index 894801cff2..55ece811ab 100644
--- a/mayastor/src/core/bdev.rs
+++ b/mayastor/src/core/bdev.rs
@@ -199,21 +199,18 @@ where
     }

     /// unshare the bdev regardless of current active share
-    async fn unshare(
-        self: Pin<&mut Self>,
-    ) -> Result<Self::Output, Self::Error> {
+    async fn unshare(self: Pin<&mut Self>) -> Result<(), Self::Error> {
         match self.shared() {
             Some(Protocol::Nvmf) => {
-                if let Some(subsystem) = NvmfSubsystem::nqn_lookup(self.name())
-                {
-                    subsystem.stop().await.context(UnshareNvmf {})?;
-                    subsystem.destroy();
+                if let Some(ss) = NvmfSubsystem::nqn_lookup(self.name()) {
+                    ss.stop().await.context(UnshareNvmf {})?;
+                    ss.destroy();
                 }
             }
             Some(Protocol::Off) | None => {}
         }

-        Ok(self.name().to_string())
+        Ok(())
     }

     /// returns if the bdev is currently shared

diff --git a/mayastor/src/core/share.rs b/mayastor/src/core/share.rs
index 2262da574a..dc667e13ed 100644
--- a/mayastor/src/core/share.rs
+++ b/mayastor/src/core/share.rs
@@ -49,8 +49,7 @@ pub trait Share: std::fmt::Debug {
     ) -> Result<Self::Output, Self::Error>;

     /// TODO
-    async fn unshare(self: Pin<&mut Self>)
-        -> Result<Self::Output, Self::Error>;
+    async fn unshare(self: Pin<&mut Self>) -> Result<(), Self::Error>;

     /// TODO
     fn shared(&self) -> Option<Protocol>;

diff --git a/mayastor/src/lvs/lvol.rs b/mayastor/src/lvs/lvol.rs
index 6c35947ef8..3b1ee89d48 100644
--- a/mayastor/src/lvs/lvol.rs
+++ b/mayastor/src/lvs/lvol.rs
@@ -134,20 +134,18 @@ impl Share for Lvol {
     }

     /// unshare the nvmf target
-    async fn unshare(
-        mut self: Pin<&mut Self>,
-    ) -> Result<Self::Output, Self::Error> {
-        let share =
-            Pin::new(&mut self.as_bdev()).unshare().await.map_err(|e| {
-                Error::LvolUnShare {
-                    source: e,
-                    name: self.name(),
-                }
-            })?;
+    async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> {
+        Pin::new(&mut self.as_bdev()).unshare().await.map_err(|e| {
+            Error::LvolUnShare {
+                source: e,
+                name: self.name(),
+            }
+        })?;

         self.as_mut().set(PropValue::Shared(false)).await?;
+
         info!("unshared {}", self);
-
-        Ok(share)
+        Ok(())
     }

     /// return the protocol this bdev is shared under

diff --git a/mayastor/src/subsys/nvmf/mod.rs b/mayastor/src/subsys/nvmf/mod.rs
index 744f2b3660..278afbd760 100644
--- a/mayastor/src/subsys/nvmf/mod.rs
+++ b/mayastor/src/subsys/nvmf/mod.rs
@@ -63,6 +63,12 @@ pub enum Error {
     PgError { msg: String },
     #[snafu(display("Failed to create transport {}", msg))]
     Transport { source: Errno, msg: String },
+    #[snafu(display(
+        "Failed to {} subsystem '{}': subsystem is busy",
+        op,
+        nqn
+    ))]
+    SubsystemBusy { nqn: String, op: String },
     #[snafu(display("Failed nvmf subsystem operation for {} {} error: {}", source.desc(), nqn, msg))]
     Subsystem {
         source: Errno,
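The new SubsystemBusy variant lets callers distinguish a transient state-change collision from a hard failure. A sketch of caller-side handling; pause_tolerant is a hypothetical wrapper and the policy of tolerating busy is an assumption, not part of the patch:

use crate::subsys::nvmf::{Error, NvmfSubsystem};

// Hypothetical caller: treat SubsystemBusy as transient, all else as fatal.
async fn pause_tolerant(ss: &NvmfSubsystem) -> Result<(), Error> {
    match ss.pause().await {
        Ok(()) => Ok(()),
        Err(Error::SubsystemBusy { nqn, op }) => {
            // Transient: the subsystem is mid state-transition; the caller
            // may back off and retry instead of failing the operation.
            warn!("subsystem '{}' is busy during '{}'", nqn, op);
            Ok(())
        }
        Err(e) => Err(e), // hard failure: propagate
    }
}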
diff --git a/mayastor/src/subsys/nvmf/subsystem.rs b/mayastor/src/subsys/nvmf/subsystem.rs
index 6c7571be9b..9ace34e47b 100644
--- a/mayastor/src/subsys/nvmf/subsystem.rs
+++ b/mayastor/src/subsys/nvmf/subsystem.rs
@@ -34,6 +34,7 @@ use spdk_rs::libspdk::{
     spdk_nvmf_subsystem_set_mn,
     spdk_nvmf_subsystem_set_sn,
     spdk_nvmf_subsystem_start,
+    spdk_nvmf_subsystem_state_change_done,
     spdk_nvmf_subsystem_stop,
     spdk_nvmf_tgt,
     SPDK_NVMF_SUBTYPE_DISCOVERY,
@@ -340,188 +341,131 @@ impl NvmfSubsystem {
         })
     }

-    /// start the subsystem previously created -- note that we destroy it on
-    /// failure to ensure the state is not in limbo and to avoid leaking
-    /// resources
-    pub async fn start(self) -> Result<String, Error> {
-        extern "C" fn start_cb(
-            ss: *mut spdk_nvmf_subsystem,
+    /// TODO
+    async fn change_state(
+        &self,
+        op: &str,
+        f: impl Fn(
+            *mut spdk_nvmf_subsystem,
+            spdk_nvmf_subsystem_state_change_done,
+            *mut c_void,
+        ) -> i32,
+    ) -> Result<(), Error> {
+        extern "C" fn state_change_cb(
+            _ss: *mut spdk_nvmf_subsystem,
             arg: *mut c_void,
             status: i32,
         ) {
             let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender<i32>) };
-            let ss = NvmfSubsystem::from(ss);
-            if status != 0 {
-                error!(
-                    "Failed start subsystem state {} -- destroying it",
-                    ss.get_nqn()
-                );
-                ss.destroy();
-            }
-
             s.send(status).unwrap();
         }

-        self.add_listener().await?;
+        info!(?self, "Subsystem {} in progress...", op);

-        let (s, r) = oneshot::channel::<i32>();
+        let res = {
+            let mut n = 0;

-        unsafe {
-            spdk_nvmf_subsystem_start(
-                self.0.as_ptr(),
-                Some(start_cb),
-                cb_arg(s),
-            )
-        }
-        .to_result(|e| Error::Subsystem {
-            source: Errno::from_i32(e),
-            nqn: self.get_nqn(),
-            msg: "out of memory".to_string(),
-        })?;
+            let (rc, r) = loop {
+                let (s, r) = oneshot::channel::<i32>();

-        r.await.unwrap().to_result(|e| Error::Subsystem {
-            source: Errno::from_i32(e),
-            nqn: self.get_nqn(),
-            msg: "failed to start the subsystem".to_string(),
-        })?;
+                let rc = -f(self.0.as_ptr(), Some(state_change_cb), cb_arg(s));

-        debug!(?self, "shared");
-        Ok(self.get_nqn())
-    }
+                if rc != libc::EBUSY || n >= 3 {
+                    break (rc, r);
+                }

-    /// stop the subsystem
-    pub async fn stop(&self) -> Result<(), Error> {
-        extern "C" fn stop_cb(
-            ss: *mut spdk_nvmf_subsystem,
-            arg: *mut c_void,
-            status: i32,
-        ) {
-            let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender<i32>) };
+                n += 1;

-            let ss = NvmfSubsystem::from(ss);
-            if status != 0 {
-                error!(
-                    "Failed change subsystem state {} -- to STOP",
-                    ss.get_nqn()
+                warn!(
+                    "Failed to {} '{}': subsystem is busy, retrying {}...",
+                    op,
+                    self.get_nqn(),
+                    n
                 );
-            }

-            s.send(status).unwrap();
-        }
+                crate::sleep::mayastor_sleep(std::time::Duration::from_millis(
+                    100,
+                ))
+                .await
+                .unwrap();
+            };

-        let (s, r) = oneshot::channel::<i32>();
-        debug!("stopping {:?}", self);
-        unsafe {
-            spdk_nvmf_subsystem_stop(self.0.as_ptr(), Some(stop_cb), cb_arg(s))
-        }
-        .to_result(|e| Error::Subsystem {
-            source: Errno::from_i32(e),
-            nqn: self.get_nqn(),
-            msg: "out of memory".to_string(),
-        })?;
+            match rc {
+                0 => r.await.unwrap().to_result(|e| Error::Subsystem {
+                    source: Errno::from_i32(e),
+                    nqn: self.get_nqn(),
+                    msg: format!("{} failed", op),
+                }),
+                libc::EBUSY => Err(Error::SubsystemBusy {
+                    nqn: self.get_nqn(),
+                    op: op.to_owned(),
+                }),
+                e => Err(Error::Subsystem {
+                    source: Errno::from_i32(e),
+                    nqn: self.get_nqn(),
+                    msg: format!("failed to initiate {}", op),
+                }),
+            }
+        };

-        r.await.unwrap().to_result(|e| Error::Subsystem {
-            source: Errno::from_i32(e),
-            nqn: self.get_nqn(),
-            msg: "failed to stop the subsystem".to_string(),
-        })?;
+        if let Err(ref e) = res {
+            error!(?self, "Subsystem {} failed: {}", op, e.to_string());
+        } else {
+            info!(?self, "Subsystem {} completed: Ok", op);
+        }

-        debug!("stopped {}", self.get_nqn());
-        Ok(())
+        res
     }
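change_state() centralizes the callback-to-future bridge that start/stop/pause/resume used to duplicate: a boxed oneshot sender crosses the FFI boundary as a raw pointer and must be reclaimed exactly once, and the old start_cb additionally destroyed the subsystem inside the callback while the caller kept using it, which is the kind of path where a use-after-free can occur. A self-contained model of the pattern; the names here are illustrative, not the real helpers (the real code's cb_arg wraps Box::into_raw):

use futures::channel::oneshot;
use std::ffi::c_void;

extern "C" fn done_cb(arg: *mut c_void, status: i32) {
    // Reclaim the box exactly once; sending consumes the sender.
    let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender<i32>) };
    let _ = s.send(status); // the receiver may be gone; that is fine
}

async fn drive(f: impl Fn(extern "C" fn(*mut c_void, i32), *mut c_void) -> i32) -> i32 {
    let (s, r) = oneshot::channel::<i32>();
    let arg = Box::into_raw(Box::new(s)) as *mut c_void;
    if f(done_cb, arg) != 0 {
        // The callback will never fire: free the sender here, not later.
        drop(unsafe { Box::from_raw(arg as *mut oneshot::Sender<i32>) });
        return -1;
    }
    r.await.unwrap_or(-1)
}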
-    /// transition the subsystem to paused state
-    /// intended to be a temporary state while changes are made
-    pub async fn pause(&self) -> Result<(), Error> {
-        extern "C" fn pause_cb(
-            ss: *mut spdk_nvmf_subsystem,
-            arg: *mut c_void,
-            status: i32,
-        ) {
-            let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender<i32>) };
-
-            let ss = NvmfSubsystem::from(ss);
-            if status != 0 {
-                error!(
-                    "Failed change subsystem state {} -- to pause",
-                    ss.get_nqn()
-                );
-            }
+    /// start the subsystem previously created -- note that we destroy it on
+    /// failure to ensure the state is not in limbo and to avoid leaking
+    /// resources
+    pub async fn start(self) -> Result<String, Error> {
+        self.add_listener().await?;

-            s.send(status).unwrap();
-        }
+        if let Err(e) = self
+            .change_state("start", |ss, cb, arg| unsafe {
+                spdk_nvmf_subsystem_start(ss, cb, arg)
+            })
+            .await
+        {
+            error!(
+                "Failed to start subsystem '{}': {}; destroying it",
+                self.get_nqn(),
+                e.to_string(),
+            );

-        let (s, r) = oneshot::channel::<i32>();
+            self.destroy();

-        unsafe {
-            spdk_nvmf_subsystem_pause(
-                self.0.as_ptr(),
-                1,
-                Some(pause_cb),
-                cb_arg(s),
-            )
+            Err(e)
+        } else {
+            Ok(self.get_nqn())
         }
-        .to_result(|e| Error::Subsystem {
-            source: Errno::from_i32(-e),
-            nqn: self.get_nqn(),
-            msg: format!("subsystem_pause returned: {}", e),
-        })?;
+    }

-        r.await.unwrap().to_result(|e| Error::Subsystem {
-            source: Errno::from_i32(e),
-            nqn: self.get_nqn(),
-            msg: "failed to pause the subsystem".to_string(),
+    /// stop the subsystem
+    pub async fn stop(&self) -> Result<(), Error> {
+        self.change_state("stop", |ss, cb, arg| unsafe {
+            spdk_nvmf_subsystem_stop(ss, cb, arg)
         })
+        .await
+    }
+
+    /// transition the subsystem to paused state
+    /// intended to be a temporary state while changes are made
+    pub async fn pause(&self) -> Result<(), Error> {
+        self.change_state("pause", |ss, cb, arg| unsafe {
+            spdk_nvmf_subsystem_pause(ss, 1, cb, arg)
+        })
+        .await
     }

     /// transition the subsystem to active state
     pub async fn resume(&self) -> Result<(), Error> {
-        extern "C" fn resume_cb(
-            ss: *mut spdk_nvmf_subsystem,
-            arg: *mut c_void,
-            status: i32,
-        ) {
-            let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender<i32>) };
-
-            let ss = NvmfSubsystem::from(ss);
-            if status != 0 {
-                error!(
-                    "Failed change subsystem state {} -- to RESUME",
-                    ss.get_nqn()
-                );
-            }
-
-            s.send(status).unwrap();
-        }
-
-        let (s, r) = oneshot::channel::<i32>();
-
-        let mut rc = unsafe {
-            spdk_nvmf_subsystem_resume(
-                self.0.as_ptr(),
-                Some(resume_cb),
-                cb_arg(s),
-            )
-        };
-
-        if rc != 0 {
-            return Err(Error::Subsystem {
-                source: Errno::from_i32(-rc),
-                nqn: self.get_nqn(),
-                msg: format!("subsystem_resume returned: {}", rc),
-            });
-        }
-
-        rc = r.await.unwrap();
-        if rc != 0 {
-            Err(Error::Subsystem {
-                source: Errno::UnknownErrno,
-                nqn: self.get_nqn(),
-                msg: "failed to resume the subsystem".to_string(),
-            })
-        } else {
-            Ok(())
-        }
+        self.change_state("resume", |ss, cb, arg| unsafe {
+            spdk_nvmf_subsystem_resume(ss, cb, arg)
+        })
+        .await
     }

     /// get ANA state
@@ -587,13 +531,20 @@

     /// stop all subsystems
     pub async fn stop_all(tgt: *mut spdk_nvmf_tgt) {
-        let ss = unsafe {
-            NvmfSubsystem(
-                NonNull::new(spdk_nvmf_subsystem_get_first(tgt)).unwrap(),
-            )
+        let subsystem = unsafe {
+            NonNull::new(spdk_nvmf_subsystem_get_first(tgt)).map(NvmfSubsystem)
         };
-        for s in ss.into_iter() {
-            s.stop().await.unwrap();
+
+        if let Some(subsystem) = subsystem {
+            for s in subsystem.into_iter() {
+                if let Err(e) = s.stop().await {
+                    error!(
+                        "Failed to stop subsystem '{}': {}",
+                        s.get_nqn(),
+                        e.to_string()
+                    );
+                }
+            }
         }
     }

diff --git a/mayastor/src/subsys/nvmf/target.rs b/mayastor/src/subsys/nvmf/target.rs
index 5f86cc7ac5..af9a43cbfd 100644
--- a/mayastor/src/subsys/nvmf/target.rs
+++ b/mayastor/src/subsys/nvmf/target.rs
@@ -296,7 +296,10 @@ impl Target {
         discovery.allow_any(true);

         Reactor::block_on(async {
-            let _ = discovery.start().await.unwrap();
+            let nqn = discovery.get_nqn();
+            if let Err(e) = discovery.start().await {
+                error!("Error starting subsystem '{}': {}", nqn, e.to_string());
+            }
         });
     }
From 365919b1d90ba1ffbcda7e1a05e4b10187edfab7 Mon Sep 17 00:00:00 2001
From: Dmitry Savitskiy
Date: Tue, 27 Jun 2023 22:33:28 +0300
Subject: [PATCH 2/2] fix(nexus): fixing child retire during rebuild

A Python test for retire-during-rebuild has been added.

Signed-off-by: Dmitry Savitskiy
---
 mayastor/src/bdev/nexus/nexus_channel.rs      |  18 ++-
 scripts/pytest-tests.sh                       |   2 +-
 test/python/common/hdl.py                     |  11 +-
 test/python/tests/nexus/test_nexus_rebuild.py | 122 ++++++++++++++++++
 4 files changed, 146 insertions(+), 7 deletions(-)
 create mode 100644 test/python/tests/nexus/test_nexus_rebuild.py

diff --git a/mayastor/src/bdev/nexus/nexus_channel.rs b/mayastor/src/bdev/nexus/nexus_channel.rs
index e5b2db7304..2c79636b4b 100644
--- a/mayastor/src/bdev/nexus/nexus_channel.rs
+++ b/mayastor/src/bdev/nexus/nexus_channel.rs
@@ -52,7 +52,12 @@ pub(crate) fn fault_nexus_child(nexus: Pin<&mut Nexus>, name: &str) -> bool {
     nexus
         .children
         .iter()
-        .filter(|c| c.state() == ChildState::Open)
+        .filter(|c| {
+            matches!(
+                c.state(),
+                ChildState::Open | ChildState::Faulted(Reason::OutOfSync)
+            )
+        })
         .filter(|c| {
             // If there were previous retires, we do not have a reference
             // to a BlockDevice. We do however, know it can't be the device
@@ -65,11 +70,16 @@ pub(crate) fn fault_nexus_child(nexus: Pin<&mut Nexus>, name: &str) -> bool {
             }
         })
         .any(|c| {
-            Ok(ChildState::Open)
+            Ok(ChildState::Faulted(Reason::OutOfSync))
                 == c.state.compare_exchange(
-                    ChildState::Open,
-                    ChildState::Faulted(Reason::IoError),
+                    ChildState::Faulted(Reason::OutOfSync),
+                    ChildState::Faulted(Reason::RebuildFailed),
                 )
+                || Ok(ChildState::Open)
+                    == c.state.compare_exchange(
+                        ChildState::Open,
+                        ChildState::Faulted(Reason::IoError),
+                    )
         })
 }
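The core of this change is the two-step compare-and-swap above: a child that is still rebuilding (Faulted(OutOfSync)) is retired as RebuildFailed, so its rebuild job is torn down rather than retried, while an Open child faults with IoError as before. A standalone model of that rule; the AtomicCell type is an assumption based on the Ok(..) == compare_exchange(..) shape, and the variant set is trimmed for illustration:

use crossbeam::atomic::AtomicCell;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Reason { OutOfSync, IoError, RebuildFailed }

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum ChildState { Open, Faulted(Reason) }

// Returns true if this call transitioned the child into a retired state.
fn retire(state: &AtomicCell<ChildState>) -> bool {
    // Rebuilding child: fail the rebuild instead of a plain IO-error retire.
    state
        .compare_exchange(
            ChildState::Faulted(Reason::OutOfSync),
            ChildState::Faulted(Reason::RebuildFailed),
        )
        .is_ok()
        // Healthy child: the classic IO-error retire path.
        || state
            .compare_exchange(ChildState::Open, ChildState::Faulted(Reason::IoError))
            .is_ok()
}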
diff --git a/scripts/pytest-tests.sh b/scripts/pytest-tests.sh
index 2f003cfce3..21dc36fd66 100755
--- a/scripts/pytest-tests.sh
+++ b/scripts/pytest-tests.sh
@@ -26,7 +26,7 @@ function run_tests()
     (
       set -x
      base=$(dirname "$name")
-      python -m pytest --tc-file='test_config.ini' --docker-compose="$base" "$name"
+      python -m pytest --tc-file='test_config.ini' --docker-compose="$base" "$name" -svv
     )
   fi
   done

diff --git a/test/python/common/hdl.py b/test/python/common/hdl.py
index ed193d3459..f401897a25 100644
--- a/test/python/common/hdl.py
+++ b/test/python/common/hdl.py
@@ -1,4 +1,5 @@
 """Common code that represents a mayastor handle."""
+from urllib.parse import urlparse
 import mayastor_pb2 as pb
 import grpc
 import mayastor_pb2_grpc as rpc
@@ -132,8 +133,14 @@ def replica_list_v2(self):
     def nexus_create(self, uuid, size, children):
         """Create a nexus with the given uuid and size.
         The children should be an array of nvmf URIs."""
+        children_ = []
+        for child in children:
+            u = urlparse(child)
+            host = u.hostname
+            if host != self.ip_v4:
+                children_.append(child)
         return self.ms.CreateNexus(
-            pb.CreateNexusRequest(uuid=str(uuid), size=size, children=children)
+            pb.CreateNexusRequest(uuid=str(uuid), size=size, children=children_)
         )

     def nexus_create_v2(
@@ -178,7 +185,7 @@ def nexus_list_v2(self):
         """List all the nexus devices, with separate name and uuid."""
         return self.ms.ListNexusV2(pb.Null()).nexus_list

-    def nexus_add_replica(self, uuid, uri, norebuild):
+    def nexus_add_replica(self, uuid, uri, norebuild=False):
         """Add a new replica to the nexus"""
         return self.ms.AddChildNexus(
             pb.AddChildNexusRequest(uuid=uuid, uri=uri, norebuild=norebuild)

diff --git a/test/python/tests/nexus/test_nexus_rebuild.py b/test/python/tests/nexus/test_nexus_rebuild.py
new file mode 100644
index 0000000000..276f0918e7
--- /dev/null
+++ b/test/python/tests/nexus/test_nexus_rebuild.py
@@ -0,0 +1,122 @@
+from common.hdl import MayastorHandle
+from common.command import run_cmd, run_cmd_async
+from common.nvme import nvme_connect, nvme_disconnect
+from common.fio import Fio
+from common.fio_spdk import FioSpdk
+from common.mayastor import containers, mayastors, create_temp_files, check_size
+import pytest
+import asyncio
+import uuid as guid
+import time
+import subprocess
+import mayastor_pb2 as pb
+
+NEXUS_COUNT = 10
+NEXUS_SIZE = 500 * 1024 * 1024
+REPL_SIZE = NEXUS_SIZE
+POOL_SIZE = REPL_SIZE * NEXUS_COUNT + 100 * 1024 * 1024
+
+
+@pytest.fixture
+def local_files(mayastors):
+    files = []
+    for name, ms in mayastors.items():
+        path = f"/tmp/disk-{name}.img"
+        pool_size_mb = int(POOL_SIZE / 1024 / 1024)
+        subprocess.run(
+            ["sudo", "sh", "-c", f"rm -f '{path}'; truncate -s {pool_size_mb}M '{path}'"],
+            check=True,
+        )
+        files.append(path)
+
+    yield
+    for path in files:
+        subprocess.run(["sudo", "rm", "-f", path], check=True)
+
+
+@pytest.fixture
+def create_replicas_on_all_nodes(local_files, mayastors, create_temp_files):
+    uuids = []
+
+    for name, ms in mayastors.items():
+        ms.pool_create(name, f"aio:///tmp/disk-{name}.img")
+        # verify we have zero replicas
+        assert len(ms.replica_list().replicas) == 0
+
+    for i in range(NEXUS_COUNT):
+        uuid = guid.uuid4()
+        for name, ms in mayastors.items():
+            before = ms.pool_list()
+            ms.replica_create(name, uuid, REPL_SIZE)
+            after = ms.pool_list()
+        uuids.append(uuid)
+
+    yield uuids
+
+
+@pytest.fixture
+def create_nexuses(mayastors, create_replicas_on_all_nodes):
+    nexuses = []
+    nexuses_uris = []
+
+    uris = [
+        [replica.uri for replica in mayastors.get(node).replica_list().replicas]
+        for node in ["ms1", "ms2", "ms3"]
+    ]
+
+    ms = mayastors.get("ms0")
+    for children in zip(*uris):
+        uuid = guid.uuid4()
+        nexus = ms.nexus_create(uuid, NEXUS_SIZE, list(children))
+        nexuses.append(nexus)
+        nexuses_uris.append(ms.nexus_publish(uuid))
+
+    yield nexuses
+
+
+@pytest.mark.parametrize("times", range(10))
+def test_rebuild_failure(containers, mayastors, times, create_nexuses):
+    ms0 = mayastors.get("ms0")
+    ms3 = mayastors.get("ms3")
+
+    # Restart the container holding replica #3 (ms3).
+    node3 = containers.get("ms3")
+    node3.stop()
+    time.sleep(5)
+    node3.start()
+
+    # Reconnect ms3, and import its existing pool.
+    ms3.reconnect()
+    ms3.pool_create("ms3", "aio:///tmp/disk-ms3.img")
+    time.sleep(1)
+
+    # Add the replicas to the nexuses for rebuild.
+    for (idx, nexus) in enumerate(ms0.nexus_list()):
+        child = list(filter(lambda child: child.state == pb.CHILD_FAULTED, list(nexus.children)))[0]
+        if nexus.state != pb.NEXUS_FAULTED:
+            try:
+                ms0.nexus_remove_replica(nexus.uuid, child.uri)
+                ms0.nexus_add_replica(nexus.uuid, child.uri)
+            except Exception:
+                print(f"Failed to remove/re-add child {child.uri} on nexus {nexus.uuid}")
+
+    time.sleep(5)
+
+    rebuilds = 0
+    for nexus in ms0.nexus_list():
+        for child in nexus.children:
+            if child.rebuild_progress > -1:
+                rebuilds += 1
+                print("nexus", nexus.uuid, "rebuilding", child.uri, f"{child.rebuild_progress}")
+
+    assert rebuilds > 0
+
+    # Stop ms3 again. Rebuild jobs in progress must terminate.
+    node3.stop()
+
+    time.sleep(30)
+
+    # All rebuild jobs must have stopped by now.
+    for nexus in ms0.nexus_list():
+        for child in nexus.children:
+            assert child.rebuild_progress == -1