Skip to content

Commit

Permalink
Merge #1415
Browse files Browse the repository at this point in the history
1415: fix(nexus/io): set nvmf subsystem as frozen r=tiagolobocastro a=tiagolobocastro

Multiple children may be retired at "the same time". If the first retire "wins" and pauses the subsystem, and we simply leave it paused at the end, the other retires will remain stuck in the pause.
Instead, introduce a new state, Frozen, which allows the other pause calls to execute their paused code; at the end the subsystem simply remains frozen instead of being resumed.

Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Jun 20, 2023
2 parents 5e08424 + 3130f6a commit 122e96f
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 27 deletions.
2 changes: 1 addition & 1 deletion deploy/csi-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ spec:
# the same.
containers:
- name: mayastor-csi
image: mayadata/mayastor:v1.0.7
image: mayadata/mayastor:v1.0.8
imagePullPolicy: IfNotPresent
# we need privileged because we mount filesystems and use mknod
securityContext:
Expand Down
2 changes: 1 addition & 1 deletion deploy/mayastor-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ spec:
command: ['sh', '-c', 'until nc -vz nats 4222; do echo "Waiting for message bus..."; sleep 1; done;']
containers:
- name: mayastor
image: mayadata/mayastor:v1.0.7
image: mayadata/mayastor:v1.0.8
imagePullPolicy: IfNotPresent
env:
- name: RUST_LOG
Expand Down
1 change: 1 addition & 0 deletions mayastor/src/bdev/nexus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use nexus_bdev::{
Error,
Nexus,
NexusNvmeParams,
NexusPauseState,
NexusState,
NexusStatus,
NexusTarget,
Expand Down
46 changes: 36 additions & 10 deletions mayastor/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,26 @@ pub enum NexusPauseState {
Unpaused,
Pausing,
Paused,
Frozen,
Unpausing,
}
impl From<super::nexus_io_subsystem::NexusPauseState> for NexusPauseState {
    /// Translate the io-subsystem's internal pause state into the publicly
    /// exported `NexusPauseState`, variant by variant (1:1 mapping).
    fn from(value: super::nexus_io_subsystem::NexusPauseState) -> Self {
        use super::nexus_io_subsystem::NexusPauseState as Inner;
        match value {
            Inner::Unpaused => Self::Unpaused,
            Inner::Pausing => Self::Pausing,
            Inner::Paused => Self::Paused,
            Inner::Frozen => Self::Frozen,
            Inner::Unpausing => Self::Unpausing,
        }
    }
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum NvmeAnaState {
Expand Down Expand Up @@ -902,11 +920,28 @@ impl<'n> Nexus<'n> {
unsafe { self.get_unchecked_mut().io_subsystem.as_mut().unwrap() }
}

/// Get the pause state of the nexus I/O subsystem, or `None` when the
/// subsystem is not (or no longer) present.
pub fn io_subsystem_state(&self) -> Option<NexusPauseState> {
    let io = self.io_subsystem.as_ref()?;
    Some(io.pause_state().into())
}

/// Resumes I/O to the Bdev.
/// Note: in order to handle concurrent resumes properly, this function must
/// be called only from the master core.
pub async fn resume(self: Pin<&mut Self>) -> Result<(), Error> {
    // If we are faulted then rather than failing all IO back to the
    // initiator we can instead leave the subsystem frozen, and wait
    // for the control-plane to do something about this.
    // Meanwhile the initiator will begin its reconnect loop and won't see
    // a swarm of IO failures which could cause a fs to shutdown.
    let freeze = self.status() == NexusStatus::Faulted;
    if freeze {
        tracing::warn!(?self, "Nexus Faulted: will not resume I/Os");
    }
    self.io_subsystem_mut().resume(freeze).await
}

/// Suspend any incoming IO to the bdev pausing the controller allows us to
Expand Down Expand Up @@ -1030,15 +1065,6 @@ impl<'n> Nexus<'n> {
})))
.await;
}
// If we are faulted then rather than failing all IO back to the
// initiator we can instead leave the subsystem paused, and wait
// for the control-plane to do something about this.
// Meanwhile the initiator will begin it's reconnect loop and won't see
// a swarm of IO failures which could cause a fs to shutdown.
if self.status() == NexusStatus::Faulted {
tracing::warn!(?self, "Nexus Faulted: not resuming subsystem");
return Ok(());
}
debug!(?self, "RESUMING");
self.resume().await
}
Expand Down
46 changes: 33 additions & 13 deletions mayastor/src/bdev/nexus/nexus_io_subsystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub(super) enum NexusPauseState {
Unpaused,
Pausing,
Paused,
Frozen,
Unpausing,
}

Expand Down Expand Up @@ -51,6 +52,11 @@ impl<'n> NexusIoSubsystem<'n> {
}
}

/// Get the subsystem pause state.
/// This is a point-in-time snapshot (`load()` of the shared state cell);
/// a concurrent pause/resume on another caller may change it immediately
/// after it is read.
pub(super) fn pause_state(&self) -> NexusPauseState {
    self.pause_state.load()
}

/// Suspend any incoming IO to the bdev pausing the controller allows us to
/// handle internal events and which is a protocol feature.
/// In case concurrent pause requests take place, the other callers
Expand Down Expand Up @@ -99,7 +105,7 @@ impl<'n> NexusIoSubsystem<'n> {
break;
}
// Subsystem is already paused, increment number of paused.
Err(NexusPauseState::Paused) => {
Err(NexusPauseState::Paused | NexusPauseState::Frozen) => {
trace!(nexus=%self.name, "nexus is already paused, incrementing refcount");
self.pause_cnt.fetch_add(1, Ordering::SeqCst);
break;
Expand Down Expand Up @@ -133,13 +139,17 @@ impl<'n> NexusIoSubsystem<'n> {
/// Resume IO to the bdev.
/// Note: in order to handle concurrent resumes properly, this function must
/// be called only from the master core.
pub(super) async fn resume(&mut self) -> Result<(), NexusError> {
pub(super) async fn resume(
&mut self,
freeze: bool,
) -> Result<(), NexusError> {
assert_eq!(Cores::current(), Cores::first());

trace!(?self.name, "resuming nexus I/O");

loop {
match self.pause_state.load() {
let state = self.pause_state.load();
match state {
// Already unpaused, bail out.
NexusPauseState::Unpaused => {
break;
Expand All @@ -154,20 +164,30 @@ impl<'n> NexusIoSubsystem<'n> {
trace!(?self.name, "completed state transition, retrying Resume operation");
}
// Unpause the subsystem, taking into account the overall number
// of pauses.
NexusPauseState::Paused => {
// of pauses, or leave it frozen.
NexusPauseState::Paused | NexusPauseState::Frozen => {
let v = self.pause_cnt.fetch_sub(1, Ordering::SeqCst);
// In case the last pause discarded, resume the subsystem.
if v == 1 {
if let Some(subsystem) =
NvmfSubsystem::nqn_lookup(&self.name)
{
self.pause_state.store(NexusPauseState::Unpausing);
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "resuming subsystem");
subsystem.resume().await.unwrap();
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem resumed");
if state == NexusPauseState::Frozen || freeze {
if let Some(subsystem) =
NvmfSubsystem::nqn_lookup(&self.name)
{
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "not resuming frozen subsystem");
}
self.pause_state.store(NexusPauseState::Frozen);
} else {
if let Some(subsystem) =
NvmfSubsystem::nqn_lookup(&self.name)
{
self.pause_state
.store(NexusPauseState::Unpausing);
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "resuming subsystem");
subsystem.resume().await.unwrap();
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem resumed");
}
self.pause_state.store(NexusPauseState::Unpaused);
}
self.pause_state.store(NexusPauseState::Unpaused);
}
break;
}
Expand Down
185 changes: 184 additions & 1 deletion mayastor/tests/nexus_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ use mayastor::{
bdev::nexus::{
nexus_create,
nexus_create_v2,
nexus_lookup,
nexus_lookup_mut,
NexusNvmeParams,
NexusPauseState,
NexusStatus,
NvmeAnaState,
},
core::{MayastorCliArgs, Protocol},
Expand All @@ -25,13 +28,17 @@ use rpc::mayastor::{
use std::process::{Command, ExitStatus};

pub mod common;
use common::{compose::Builder, MayastorTest};
use common::{
compose::{Builder, ComposeTest},
MayastorTest,
};

extern crate libnvme_rs;

static POOL_NAME: &str = "tpool";
static NXNAME: &str = "nexus0";
static UUID: &str = "cdc2a7db-3ac3-403a-af80-7fadc1581c47";
static UUID2: &str = "cdc2a7db-3ac3-403a-af80-7fadc1581c48";
static HOSTNQN: &str = "nqn.2019-05.io.openebs";
static HOSTID0: &str = "53b35ce9-8e71-49a9-ab9b-cba7c5670fad";
static HOSTID1: &str = "c1affd2d-ef79-4ba4-b5cf-8eb48f9c07d0";
Expand Down Expand Up @@ -610,3 +617,179 @@ async fn nexus_io_write_zeroes() {
})
.await;
}

#[tokio::test]
// Exercises the Frozen pause state end to end: a child retire on a Faulted
// nexus must leave the nvmf subsystem frozen (not resumed), and further
// pause/resume cycles must keep it frozen rather than deadlocking or
// resuming I/O. Covers both orderings: retire-first, and pause-first with
// the final resume performing the transition to Frozen.
async fn nexus_io_freeze() {
    // Enable ANA and NVMe reservations on the nvmf target for this test.
    std::env::set_var("NEXUS_NVMF_ANA_ENABLE", "1");
    std::env::set_var("NEXUS_NVMF_RESV_ENABLE", "1");
    // create a new composeTest
    let test = Builder::new()
        .name("nexus_io_freeze")
        .network("10.1.0.0/16")
        .add_container("ms1")
        .with_clean(true)
        .build()
        .await
        .unwrap();
    create_pool_replicas(&test, 0).await;

    let mayastor = get_ms();
    let hdls = test.grpc_handles().await.unwrap();
    let ip0 = hdls[0].endpoint.ip();
    let nexus_name = format!("nexus-{}", UUID);
    // Both children live on the same remote container, so restarting it
    // retires children and faults the nexus.
    let nexus_children = [
        format!("nvmf://{}:8420/{}:{}", ip0, HOSTNQN, UUID),
        format!("nvmf://{}:8420/{}:{}", ip0, HOSTNQN, UUID2),
    ];

    let name = nexus_name.clone();
    let children = nexus_children.clone();
    mayastor
        .spawn(async move {
            // create nexus on local node with remote replica as child
            nexus_create(&name, 32 * 1024 * 1024, Some(UUID), &children)
                .await
                .unwrap();
            // publish nexus on local node over nvmf
            nexus_lookup_mut(&name)
                .unwrap()
                .share(Protocol::Nvmf, None)
                .await
                .unwrap();
            // Freshly shared nexus starts out unpaused.
            assert_eq!(
                nexus_pause_state(&name),
                Some(NexusPauseState::Unpaused)
            );
        })
        .await;

    // This will lead into a child retire, which means the nexus will be faulted
    // and subsystem frozen!
    test.restart("ms1").await.unwrap();
    wait_nexus_faulted(&nexus_name, std::time::Duration::from_secs(2))
        .await
        .unwrap();

    let name = nexus_name.clone();
    mayastor
        .spawn(async move {
            // Retire path resumed with freeze=true: state must be Frozen.
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Frozen));

            // A pause on a frozen subsystem must not hang; state stays Frozen.
            nexus_lookup_mut(&name).unwrap().pause().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Frozen));

            // A resume on a faulted nexus must also keep it Frozen.
            nexus_lookup_mut(&name).unwrap().resume().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Frozen));

            nexus_lookup_mut(&name).unwrap().destroy().await.unwrap();
        })
        .await;

    // Second scenario: pause BEFORE the fault, so the final explicit resume
    // is the one that must transition the subsystem to Frozen.
    create_pool_replicas(&test, 0).await;

    let name = nexus_name.clone();
    let children = nexus_children.clone();
    mayastor
        .spawn(async move {
            nexus_create(&name, 32 * 1024 * 1024, Some(UUID), &children)
                .await
                .unwrap();
            nexus_lookup_mut(&name)
                .unwrap()
                .share(Protocol::Nvmf, None)
                .await
                .unwrap();

            // Pause, so now WE must be the ones which resume to frozen!
            nexus_lookup_mut(&name).unwrap().pause().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Paused));
        })
        .await;

    test.restart("ms1").await.unwrap();
    wait_nexus_faulted(&nexus_name, std::time::Duration::from_secs(2))
        .await
        .unwrap();

    let name = nexus_name.clone();
    mayastor
        .spawn(async move {
            // Nested pause/resume keeps refcount > 0, so still Paused.
            nexus_lookup_mut(&name).unwrap().pause().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Paused));

            nexus_lookup_mut(&name).unwrap().resume().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Paused));

            // Final resume, transition to Frozen!
            nexus_lookup_mut(&name).unwrap().resume().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Frozen));

            nexus_lookup_mut(&name).unwrap().destroy().await.unwrap();
        })
        .await;
}

/// Test helper: look up the nexus by name and report its I/O-subsystem
/// pause state. Panics if the nexus does not exist.
fn nexus_pause_state(name: &str) -> Option<NexusPauseState> {
    let nexus = nexus_lookup(name).unwrap();
    nexus.io_subsystem_state()
}

/// Test helper: on the container at `index`, create the shared test pool
/// and two nvmf-shared replicas (UUID and UUID2) sized to back the nexus.
/// Panics on any gRPC failure — acceptable in test setup code.
async fn create_pool_replicas(test: &ComposeTest, index: usize) {
    let mut hdls = test.grpc_handles().await.unwrap();
    let hdl = &mut hdls[index];

    // create a pool on remote node
    hdl.mayastor
        .create_pool(CreatePoolRequest {
            name: POOL_NAME.to_string(),
            disks: vec!["malloc:///disk0?size_mb=128".into()],
        })
        .await
        .unwrap();

    // create both replicas, shared over nvmf
    // (share: 1 — presumably the nvmf protocol value in the rpc proto;
    // matches the pre-existing usage in this file)
    for uuid in [UUID, UUID2] {
        hdl.mayastor
            .create_replica(CreateReplicaRequest {
                uuid: uuid.to_string(),
                pool: POOL_NAME.to_string(),
                size: 32 * 1024 * 1024,
                thin: false,
                share: 1,
            })
            .await
            .unwrap();
    }
}

/// Test helper: poll the nexus status every 100ms until it reports
/// `Faulted`, or until `timeout` elapses. On timeout, returns the elapsed
/// duration as the error so the caller can report how long it waited.
async fn wait_nexus_faulted(
    name: &str,
    timeout: std::time::Duration,
) -> Result<(), std::time::Duration> {
    let mayastor = get_ms();
    let started = std::time::Instant::now();

    loop {
        if started.elapsed() > timeout {
            return Err(started.elapsed());
        }
        // Status must be read on a reactor core, hence the spawn.
        let nexus_name = name.to_string();
        let is_faulted = mayastor
            .spawn(async move {
                nexus_lookup(&nexus_name).unwrap().status()
                    == NexusStatus::Faulted
            })
            .await;
        if is_faulted {
            return Ok(());
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}
Loading

0 comments on commit 122e96f

Please sign in to comment.