diff --git a/io-engine-tests/src/nexus.rs b/io-engine-tests/src/nexus.rs index 31fb5c05be..3251306c56 100644 --- a/io-engine-tests/src/nexus.rs +++ b/io-engine-tests/src/nexus.rs @@ -273,6 +273,16 @@ impl NexusBuilder { self.online_child_bdev(&self.replica_uri(r)).await } + pub async fn online_child_replica_wait( + &self, + r: &ReplicaBuilder, + d: Duration, + ) -> Result<(), Status> { + self.online_child_replica(r).await?; + self.wait_replica_state(r, ChildState::Online, None, d) + .await + } + pub async fn offline_child_bdev( &self, bdev: &str, @@ -297,6 +307,21 @@ impl NexusBuilder { self.offline_child_bdev(&self.replica_uri(r)).await } + pub async fn offline_child_replica_wait( + &self, + r: &ReplicaBuilder, + d: Duration, + ) -> Result<(), Status> { + self.offline_child_replica(r).await?; + self.wait_replica_state( + r, + ChildState::Degraded, + Some(ChildStateReason::ByClient), + d, + ) + .await + } + pub async fn add_injection_at_replica( &self, r: &ReplicaBuilder, diff --git a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs index 83b3502e3f..0c27bb2299 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs @@ -19,8 +19,10 @@ use crate::{ HistoryRecord, RebuildError, RebuildJob, + RebuildJobOptions, RebuildState, RebuildStats, + RebuildVerifyMode, }, }; @@ -151,6 +153,31 @@ impl<'n> Nexus<'n> { src_child_uri: &str, dst_child_uri: &str, ) -> Result<(), Error> { + let verify_mode = match std::env::var("NEXUS_REBUILD_VERIFY") + .unwrap_or_default() + .as_str() + { + "fail" => { + warn!( + "{self:?}: starting rebuild for '{dst_child_uri}' with \ + fail verification mode" + ); + RebuildVerifyMode::Fail + } + "panic" => { + warn!( + "{self:?}: starting rebuild for '{dst_child_uri}' with \ + panic verification mode" + ); + RebuildVerifyMode::Panic + } + _ => RebuildVerifyMode::None, + }; + + let opts = RebuildJobOptions { + verify_mode, + }; + RebuildJob::new( &self.name, src_child_uri, @@ -159,6 +186,7 @@ impl<'n> Nexus<'n> { start: self.data_ent_offset, end: self.num_blocks() + self.data_ent_offset, }, + opts, |nexus, job| { Reactors::current().send_future(async move { Nexus::notify_rebuild(nexus, job).await; diff --git a/io-engine/src/rebuild/mod.rs b/io-engine/src/rebuild/mod.rs index 2bae31a768..bfc9a73e78 100644 --- a/io-engine/src/rebuild/mod.rs +++ b/io-engine/src/rebuild/mod.rs @@ -9,8 +9,8 @@ mod rebuild_task; use rebuild_descriptor::RebuildDescriptor; pub(crate) use rebuild_error::RebuildError; -pub use rebuild_job::RebuildJob; use rebuild_job::RebuildOperation; +pub use rebuild_job::{RebuildJob, RebuildJobOptions, RebuildVerifyMode}; use rebuild_job_backend::{ RebuildFBendChan, RebuildJobBackend, diff --git a/io-engine/src/rebuild/rebuild_descriptor.rs b/io-engine/src/rebuild/rebuild_descriptor.rs index 3148408653..e72aee940a 100644 --- a/io-engine/src/rebuild/rebuild_descriptor.rs +++ b/io-engine/src/rebuild/rebuild_descriptor.rs @@ -11,7 +11,7 @@ use crate::core::{ ReadOptions, }; -use super::{RebuildError, RebuildMap}; +use super::{RebuildError, RebuildJobOptions, RebuildMap, RebuildVerifyMode}; /// Contains all descriptors and their associated information which allows the /// tasks to copy/rebuild data from source to destination. @@ -21,13 +21,15 @@ pub(super) struct RebuildDescriptor { pub(super) block_size: u64, /// The range of the entire rebuild. pub(super) range: std::ops::Range, + /// Rebuild job options. + pub(super) options: RebuildJobOptions, /// Segment size in blocks (number of segments divided by device block /// size). pub(super) segment_size_blks: u64, /// Source URI of the healthy child to rebuild from. - pub src_uri: String, + pub(super) src_uri: String, /// Target URI of the out of sync child to rebuild. - pub dst_uri: String, + pub(super) dst_uri: String, /// Pre-opened descriptor for the source block device. #[allow(clippy::non_send_fields_in_send_ty)] pub(super) src_descriptor: Box, @@ -183,4 +185,120 @@ impl RebuildDescriptor { bdev: self.dst_uri.clone(), }) } + + /// Verify segment copy operation by reading destination, and comparing with + /// the source. + pub(super) async fn verify_segment( + &self, + offset_blk: u64, + copy_iovs: &[IoVec], + verify_iovs: &mut [IoVec], + ) -> Result<(), RebuildError> { + // Read the source again. + self.src_io_handle() + .await? + .readv_blocks_async( + verify_iovs, + offset_blk, + self.get_segment_size_blks(offset_blk), + ReadOptions::None, + ) + .await + .map_err(|err| RebuildError::VerifyIoFailed { + source: err, + bdev: self.dst_uri.clone(), + })?; + + // Compare the original buffer from the source with the new one from the + // source, to catch read errors. + self.verify_iovs(offset_blk, copy_iovs, verify_iovs, "source")?; + + // Read the destination. + self.dst_io_handle() + .await? + .readv_blocks_async( + verify_iovs, + offset_blk, + self.get_segment_size_blks(offset_blk), + ReadOptions::None, + ) + .await + .map_err(|err| RebuildError::VerifyIoFailed { + source: err, + bdev: self.dst_uri.clone(), + })?; + + // Compare destionation with the source. + self.verify_iovs(offset_blk, copy_iovs, verify_iovs, "destination") + } + + /// Verifies two arrays of `IoVec` buffers. + fn verify_iovs( + &self, + offset_blk: u64, + a: &[IoVec], + b: &[IoVec], + obj: &str, + ) -> Result<(), RebuildError> { + assert_eq!(a.len(), b.len()); + for i in 0 .. a.len() { + self.verify_iov(offset_blk, &a[i], &b[i], obj)?; + } + Ok(()) + } + + /// Verifies two `IoVec` buffers. + fn verify_iov( + &self, + offset_blk: u64, + a: &IoVec, + b: &IoVec, + obj: &str, + ) -> Result<(), RebuildError> { + let Some(idx) = IoVec::compare(a, b) else { + return Ok(()); + }; + + let msg = if a.len() != b.len() { + format!( + "buffers have different lengths: {a} != {b}", + a = a.len(), + b = b.len() + ) + } else { + format!( + "buffers differ at {pos} (block {off} x {bs} + {idx}): \ + 0x{a:x} != 0x{b:x}", + pos = offset_blk * self.block_size + idx, + off = offset_blk, + bs = self.block_size, + a = a[idx], + b = b[idx] + ) + }; + + let msg = format!( + "Rebuild job '{src}' -> '{dst}': {obj} verification failed: {msg}", + src = self.src_uri, + dst = self.dst_uri + ); + + match self.options.verify_mode { + RebuildVerifyMode::None => { + error!("{msg}: ignoring"); + Ok(()) + } + RebuildVerifyMode::Fail => { + error!("{msg}: failing rebuild"); + Err(RebuildError::VerifyCompareFailed { + bdev: self.dst_uri.clone(), + verify_message: msg, + }) + } + RebuildVerifyMode::Panic => { + error!("{msg}: will panic"); + panic!("{}", msg); + } + } + } } diff --git a/io-engine/src/rebuild/rebuild_error.rs b/io-engine/src/rebuild/rebuild_error.rs index b98bbc5434..a8f302ab5a 100644 --- a/io-engine/src/rebuild/rebuild_error.rs +++ b/io-engine/src/rebuild/rebuild_error.rs @@ -25,6 +25,17 @@ pub enum RebuildError { ReadIoFailed { source: CoreError, bdev: String }, #[snafu(display("Write IO failed for bdev {}", bdev))] WriteIoFailed { source: CoreError, bdev: String }, + #[snafu(display("Verify IO failed for bdev {}", bdev))] + VerifyIoFailed { source: CoreError, bdev: String }, + #[snafu(display( + "Verify compare failed for bdev {}: {}", + bdev, + verify_message + ))] + VerifyCompareFailed { + bdev: String, + verify_message: String, + }, #[snafu(display("Failed to find rebuild job {}", job))] JobNotFound { job: String }, #[snafu(display("Missing rebuild destination {}", job))] diff --git a/io-engine/src/rebuild/rebuild_job.rs b/io-engine/src/rebuild/rebuild_job.rs index ca9b2403a4..4128a93313 100644 --- a/io-engine/src/rebuild/rebuild_job.rs +++ b/io-engine/src/rebuild/rebuild_job.rs @@ -21,6 +21,23 @@ use super::{ }; use crate::core::{Reactors, VerboseError}; +/// Rebuild I/O verification mode. +#[derive(Debug, Clone)] +pub enum RebuildVerifyMode { + /// Do not verify rebuild I/Os. + None, + /// Fail rebuild job if I/O verification fails. + Fail, + /// Panic if I/O verification fails. + Panic, +} + +/// Rebuild job options. +#[derive(Debug, Clone)] +pub struct RebuildJobOptions { + pub verify_mode: RebuildVerifyMode, +} + /// Operations used to control the state of the job. #[derive(Debug)] pub(super) enum RebuildOperation { @@ -80,6 +97,7 @@ impl RebuildJob { src_uri: &str, dst_uri: &str, range: Range, + options: RebuildJobOptions, notify_fn: fn(String, String) -> (), ) -> Result { // Allocate an instance of the rebuild back-end. @@ -88,6 +106,7 @@ impl RebuildJob { src_uri, dst_uri, range.clone(), + options, notify_fn, ) .await?; diff --git a/io-engine/src/rebuild/rebuild_job_backend.rs b/io-engine/src/rebuild/rebuild_job_backend.rs index f2442e0c64..b3f0c4b009 100644 --- a/io-engine/src/rebuild/rebuild_job_backend.rs +++ b/io-engine/src/rebuild/rebuild_job_backend.rs @@ -19,6 +19,7 @@ use super::{ rebuild_error::{BdevInvalidUri, BdevNotFound, NoCopyBuffer}, RebuildDescriptor, RebuildError, + RebuildJobOptions, RebuildMap, RebuildState, RebuildStates, @@ -34,6 +35,7 @@ use crate::{ bdev::device_open, bdev_api::bdev_get_name, core::{BlockDevice, Reactors, UntypedBdev}, + rebuild::RebuildVerifyMode, }; /// Request between frontend and backend. @@ -149,6 +151,7 @@ impl RebuildJobBackend { src_uri: &str, dst_uri: &str, range: std::ops::Range, + options: RebuildJobOptions, notify_fn: fn(String, String) -> (), ) -> Result { let src_descriptor = device_open( @@ -201,11 +204,27 @@ impl RebuildJobBackend { }; for _ in 0 .. tasks.total { - let buffer = destination_hdl - .dma_malloc(segment_size_blks * block_size) + let buf_size = segment_size_blks * block_size; + let copy_buffer = destination_hdl + .dma_malloc(buf_size) .context(NoCopyBuffer {})?; - tasks.push(RebuildTask::new(buffer, tasks.channel.0.clone())); + let verify_buffer = + if matches!(options.verify_mode, RebuildVerifyMode::None) { + None + } else { + Some( + destination_hdl + .dma_malloc(buf_size) + .context(NoCopyBuffer {})?, + ) + }; + + tasks.push(RebuildTask::new( + copy_buffer, + verify_buffer, + tasks.channel.0.clone(), + )); } let nexus_descriptor = UntypedBdev::open_by_name(nexus_name, false) @@ -233,6 +252,7 @@ impl RebuildJobBackend { src_uri: src_uri.to_string(), dst_uri: dst_uri.to_string(), range, + options, block_size, segment_size_blks, src_descriptor, diff --git a/io-engine/src/rebuild/rebuild_task.rs b/io-engine/src/rebuild/rebuild_task.rs index bf12d7bb7c..a04fb9991f 100644 --- a/io-engine/src/rebuild/rebuild_task.rs +++ b/io-engine/src/rebuild/rebuild_task.rs @@ -29,28 +29,41 @@ pub(super) struct TaskResult { pub(super) error: Option, /// Indicates if the segment was actually transferred (partial rebuild may /// skip segments). - pub(super) is_transferred: bool, + is_transferred: bool, +} + +/// Rebuild task re-allocated buffers. +#[derive(Debug)] +struct Buffers { + /// The pre-allocated `DmaBuf` used to read/write. + copy_buffer: DmaBuf, + /// The pre-allocated `DmaBuf` used to verify. + verify_buffer: Option, } /// Each rebuild task needs a unique buffer to read/write from source to target. /// An mpsc channel is used to communicate with the management task. #[derive(Debug)] pub(super) struct RebuildTask { - /// The pre-allocated `DmaBuf` used to read/write. - pub(super) buffer: Mutex, + /// The pre-allocated buffers. + buffers: Mutex, /// The channel used to notify when the task completes/fails. - pub(super) sender: mpsc::Sender, + sender: mpsc::Sender, /// Last error seen by this particular task. - pub(super) error: Option, + error: Option, } impl RebuildTask { pub(super) fn new( - buffer: DmaBuf, + copy_buffer: DmaBuf, + verify_buffer: Option, sender: mpsc::Sender, ) -> Self { Self { - buffer: Mutex::new(buffer), + buffers: Mutex::new(Buffers { + copy_buffer, + verify_buffer, + }), sender, error: None, } @@ -125,12 +138,19 @@ impl RebuildTask { offset_blk: u64, desc: &RebuildDescriptor, ) -> Result<(), RebuildError> { - let buf_lock = self.buffer.lock().await; - let iov = desc.adjusted_iov(&buf_lock, offset_blk); + let buf_lock = self.buffers.lock().await; + let iov = desc.adjusted_iov(&buf_lock.copy_buffer, offset_blk); let iovs = &mut [iov]; if desc.read_src_segment(offset_blk, iovs).await? { desc.write_dst_segment(offset_blk, iovs).await?; + + if let Some(verify_buffer) = &buf_lock.verify_buffer { + let verify_iov = desc.adjusted_iov(verify_buffer, offset_blk); + let verify_iovs = &mut [verify_iov]; + + desc.verify_segment(offset_blk, iovs, verify_iovs).await?; + } } Ok(()) diff --git a/io-engine/tests/nexus_child_online.rs b/io-engine/tests/nexus_child_online.rs index 5b4d87a068..1e4e02b574 100644 --- a/io-engine/tests/nexus_child_online.rs +++ b/io-engine/tests/nexus_child_online.rs @@ -136,25 +136,13 @@ async fn nexus_child_online() { .await .unwrap(); - nex_0.offline_child_replica(&repl_0).await.unwrap(); nex_0 - .wait_replica_state( - &repl_0, - ChildState::Degraded, - Some(ChildStateReason::ByClient), - Duration::from_secs(1), - ) + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); - nex_0.online_child_replica(&repl_0).await.unwrap(); nex_0 - .wait_replica_state( - &repl_0, - ChildState::Online, - None, - Duration::from_secs(1), - ) + .online_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); diff --git a/io-engine/tests/nexus_rebuild_partial.rs b/io-engine/tests/nexus_rebuild_partial.rs index aa894b2263..2fd806c8ba 100644 --- a/io-engine/tests/nexus_rebuild_partial.rs +++ b/io-engine/tests/nexus_rebuild_partial.rs @@ -313,16 +313,8 @@ async fn nexus_partial_rebuild_offline_online() { .unwrap(); // Offline the replica. - nex_0.offline_child_replica(&repl_0).await.unwrap(); - - // Transition to offline state is not immediate, nex_0 - .wait_replica_state( - &repl_0, - ChildState::Degraded, - None, - Duration::from_secs(1), - ) + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); @@ -470,16 +462,11 @@ async fn nexus_partial_rebuild_double_fault() { let child_0_dev_name = children[0].device_name.as_ref().unwrap(); // Offline the replica again. - nex_0.offline_child_replica(&repl_0).await.unwrap(); nex_0 - .wait_replica_state( - &repl_0, - ChildState::Degraded, - None, - Duration::from_secs(1), - ) + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); + let children = nex_0.get_nexus().await.unwrap().children; assert_eq!(children[0].state(), ChildState::Degraded); assert_eq!(children[0].state_reason(), ChildStateReason::ByClient); @@ -553,16 +540,11 @@ async fn nexus_partial_rebuild_double_fault() { .unwrap(); // Offline the replica again. - nex_0.offline_child_replica(&repl_0).await.unwrap(); nex_0 - .wait_replica_state( - &repl_0, - ChildState::Degraded, - None, - Duration::from_secs(1), - ) + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) .await .unwrap(); + let children = nex_0.get_nexus().await.unwrap().children; assert_eq!(children[0].state(), ChildState::Degraded); assert_eq!(children[0].state_reason(), ChildStateReason::ByClient); diff --git a/io-engine/tests/nexus_rebuild_verify.rs b/io-engine/tests/nexus_rebuild_verify.rs new file mode 100644 index 0000000000..e11e741e1a --- /dev/null +++ b/io-engine/tests/nexus_rebuild_verify.rs @@ -0,0 +1,148 @@ +#![cfg(feature = "fault-injection")] + +pub mod common; + +use common::{ + compose::{ + rpc::v1::{ + nexus::{ChildState, ChildStateReason, RebuildJobState}, + GrpcConnect, + }, + Binary, + Builder, + }, + nexus::NexusBuilder, + pool::PoolBuilder, + replica::ReplicaBuilder, + testing::add_injection, +}; + +use std::time::Duration; + +#[allow(dead_code)] +struct StorageConfig { + pool_0: PoolBuilder, + pool_1: PoolBuilder, + repl_0: ReplicaBuilder, + repl_1: ReplicaBuilder, + nex_0: NexusBuilder, +} + +const POOL_SIZE: u64 = 80; +const REPL_SIZE: u64 = 60; +const NEXUS_SIZE: u64 = REPL_SIZE; + +#[tokio::test] +async fn nexus_rebuild_verify_local() { + common::composer_init(); + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec!["-l", "1"]), + ) + .add_container_bin( + "ms_1", + Binary::from_dbg("io-engine").with_args(vec!["-l", "2"]), + ) + .add_container_bin( + "ms_nex", + Binary::from_dbg("io-engine") + // Disable partial rebuild to force rebuild I/O. + .with_env("NEXUS_PARTIAL_REBUILD", "0") + // Set rebuild revify mode to fail. + .with_env("NEXUS_REBUILD_VERIFY", "fail") + .with_args(vec!["-l", "3", "-Fcolor,compact"]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + + let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap(); + let ms_1 = conn.grpc_handle_shared("ms_1").await.unwrap(); + let ms_nex = conn.grpc_handle_shared("ms_nex").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(ms_0.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + let mut repl_0 = ReplicaBuilder::new(ms_0.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_thin(false); + + pool_0.create().await.unwrap(); + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let mut pool_1 = PoolBuilder::new(ms_1.clone()) + .with_name("pool1") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + let mut repl_1 = ReplicaBuilder::new(ms_1.clone()) + .with_pool(&pool_1) + .with_name("r1") + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_thin(false); + + pool_1.create().await.unwrap(); + repl_1.create().await.unwrap(); + repl_1.share().await.unwrap(); + + let mut nex_0 = NexusBuilder::new(ms_nex.clone()) + .with_name("nexus0") + .with_new_uuid() + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0) + .with_replica(&repl_1); + + nex_0.create().await.unwrap(); + nex_0.publish().await.unwrap(); + + let children = nex_0.get_nexus().await.unwrap().children; + assert_eq!(children.len(), 2); + let dev_name = children[0].device_name.as_ref().unwrap(); + + // Offline the replica. + nex_0 + .offline_child_replica_wait(&repl_0, Duration::from_secs(1)) + .await + .unwrap(); + + // Add an injection as block device level. + let inj_part = "domain=block&op=write&stage=submission&type=data\ + &offset=10240&num_blk=1"; + let inj_uri = format!("inject://{dev_name}?{inj_part}"); + add_injection(nex_0.rpc(), &inj_uri).await.unwrap(); + + // Online the replica. Rebuild must fail at some point because of injected + // miscompare. + nex_0.online_child_replica(&repl_0).await.unwrap(); + + // Wait until the rebuild fails. + nex_0 + .wait_replica_state( + &repl_0, + ChildState::Faulted, + Some(ChildStateReason::RebuildFailed), + Duration::from_secs(5), + ) + .await + .unwrap(); + + // Check that the rebuild history has a single failed record. + let hist = nex_0.get_rebuild_history().await.unwrap(); + assert_eq!(hist.len(), 1); + assert_eq!(hist[0].state(), RebuildJobState::Failed); +}