Skip to content

Commit

Permalink
feat(rebuild): adding rebuild I/O verification
Browse files Browse the repository at this point in the history
At this moment rebuild verification is controlled via NEXUS_REBUILD_VERIFY
shell var:
- "fail": rebuild job fails if verification fails
- "panic": I/O engine panics if verification fails
- any other value or not set: do no verify

Signed-off-by: Dmitry Savitskiy <[email protected]>
  • Loading branch information
dsavitskiy committed Jul 30, 2023
1 parent 4854d40 commit 3298bda
Show file tree
Hide file tree
Showing 11 changed files with 412 additions and 53 deletions.
25 changes: 25 additions & 0 deletions io-engine-tests/src/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,16 @@ impl NexusBuilder {
self.online_child_bdev(&self.replica_uri(r)).await
}

pub async fn online_child_replica_wait(
&self,
r: &ReplicaBuilder,
d: Duration,
) -> Result<(), Status> {
self.online_child_replica(r).await?;
self.wait_replica_state(r, ChildState::Online, None, d)
.await
}

pub async fn offline_child_bdev(
&self,
bdev: &str,
Expand All @@ -297,6 +307,21 @@ impl NexusBuilder {
self.offline_child_bdev(&self.replica_uri(r)).await
}

pub async fn offline_child_replica_wait(
&self,
r: &ReplicaBuilder,
d: Duration,
) -> Result<(), Status> {
self.offline_child_replica(r).await?;
self.wait_replica_state(
r,
ChildState::Degraded,
Some(ChildStateReason::ByClient),
d,
)
.await
}

pub async fn add_injection_at_replica(
&self,
r: &ReplicaBuilder,
Expand Down
28 changes: 28 additions & 0 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ use crate::{
HistoryRecord,
RebuildError,
RebuildJob,
RebuildJobOptions,
RebuildState,
RebuildStats,
RebuildVerifyMode,
},
};

Expand Down Expand Up @@ -151,6 +153,31 @@ impl<'n> Nexus<'n> {
src_child_uri: &str,
dst_child_uri: &str,
) -> Result<(), Error> {
let verify_mode = match std::env::var("NEXUS_REBUILD_VERIFY")
.unwrap_or_default()
.as_str()
{
"fail" => {
warn!(
"{self:?}: starting rebuild for '{dst_child_uri}' with \
fail verification mode"
);
RebuildVerifyMode::Fail
}
"panic" => {
warn!(
"{self:?}: starting rebuild for '{dst_child_uri}' with \
panic verification mode"
);
RebuildVerifyMode::Panic
}
_ => RebuildVerifyMode::None,
};

let opts = RebuildJobOptions {
verify_mode,
};

RebuildJob::new(
&self.name,
src_child_uri,
Expand All @@ -159,6 +186,7 @@ impl<'n> Nexus<'n> {
start: self.data_ent_offset,
end: self.num_blocks() + self.data_ent_offset,
},
opts,
|nexus, job| {
Reactors::current().send_future(async move {
Nexus::notify_rebuild(nexus, job).await;
Expand Down
2 changes: 1 addition & 1 deletion io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ mod rebuild_task;

use rebuild_descriptor::RebuildDescriptor;
pub(crate) use rebuild_error::RebuildError;
pub use rebuild_job::RebuildJob;
use rebuild_job::RebuildOperation;
pub use rebuild_job::{RebuildJob, RebuildJobOptions, RebuildVerifyMode};
use rebuild_job_backend::{
RebuildFBendChan,
RebuildJobBackend,
Expand Down
124 changes: 121 additions & 3 deletions io-engine/src/rebuild/rebuild_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::core::{
ReadOptions,
};

use super::{RebuildError, RebuildMap};
use super::{RebuildError, RebuildJobOptions, RebuildMap, RebuildVerifyMode};

/// Contains all descriptors and their associated information which allows the
/// tasks to copy/rebuild data from source to destination.
Expand All @@ -21,13 +21,15 @@ pub(super) struct RebuildDescriptor {
pub(super) block_size: u64,
/// The range of the entire rebuild.
pub(super) range: std::ops::Range<u64>,
/// Rebuild job options.
pub(super) options: RebuildJobOptions,
/// Segment size in blocks (number of segments divided by device block
/// size).
pub(super) segment_size_blks: u64,
/// Source URI of the healthy child to rebuild from.
pub src_uri: String,
pub(super) src_uri: String,
/// Target URI of the out of sync child to rebuild.
pub dst_uri: String,
pub(super) dst_uri: String,
/// Pre-opened descriptor for the source block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) src_descriptor: Box<dyn BlockDeviceDescriptor>,
Expand Down Expand Up @@ -183,4 +185,120 @@ impl RebuildDescriptor {
bdev: self.dst_uri.clone(),
})
}

/// Verify segment copy operation by reading destination, and comparing with
/// the source.
pub(super) async fn verify_segment(
&self,
offset_blk: u64,
copy_iovs: &[IoVec],
verify_iovs: &mut [IoVec],
) -> Result<(), RebuildError> {
// Read the source again.
self.src_io_handle()
.await?
.readv_blocks_async(
verify_iovs,
offset_blk,
self.get_segment_size_blks(offset_blk),
ReadOptions::None,
)
.await
.map_err(|err| RebuildError::VerifyIoFailed {
source: err,
bdev: self.dst_uri.clone(),
})?;

// Compare the original buffer from the source with the new one from the
// source, to catch read errors.
self.verify_iovs(offset_blk, copy_iovs, verify_iovs, "source")?;

// Read the destination.
self.dst_io_handle()
.await?
.readv_blocks_async(
verify_iovs,
offset_blk,
self.get_segment_size_blks(offset_blk),
ReadOptions::None,
)
.await
.map_err(|err| RebuildError::VerifyIoFailed {
source: err,
bdev: self.dst_uri.clone(),
})?;

// Compare destionation with the source.
self.verify_iovs(offset_blk, copy_iovs, verify_iovs, "destination")
}

/// Verifies two arrays of `IoVec` buffers.
fn verify_iovs(
&self,
offset_blk: u64,
a: &[IoVec],
b: &[IoVec],
obj: &str,
) -> Result<(), RebuildError> {
assert_eq!(a.len(), b.len());
for i in 0 .. a.len() {
self.verify_iov(offset_blk, &a[i], &b[i], obj)?;
}
Ok(())
}

/// Verifies two `IoVec` buffers.
fn verify_iov(
&self,
offset_blk: u64,
a: &IoVec,
b: &IoVec,
obj: &str,
) -> Result<(), RebuildError> {
let Some(idx) = IoVec::compare(a, b) else {
return Ok(());
};

let msg = if a.len() != b.len() {
format!(
"buffers have different lengths: {a} != {b}",
a = a.len(),
b = b.len()
)
} else {
format!(
"buffers differ at {pos} (block {off} x {bs} + {idx}): \
0x{a:x} != 0x{b:x}",
pos = offset_blk * self.block_size + idx,
off = offset_blk,
bs = self.block_size,
a = a[idx],
b = b[idx]
)
};

let msg = format!(
"Rebuild job '{src}' -> '{dst}': {obj} verification failed: {msg}",
src = self.src_uri,
dst = self.dst_uri
);

match self.options.verify_mode {
RebuildVerifyMode::None => {
error!("{msg}: ignoring");
Ok(())
}
RebuildVerifyMode::Fail => {
error!("{msg}: failing rebuild");
Err(RebuildError::VerifyCompareFailed {
bdev: self.dst_uri.clone(),
verify_message: msg,
})
}
RebuildVerifyMode::Panic => {
error!("{msg}: will panic");
panic!("{}", msg);
}
}
}
}
11 changes: 11 additions & 0 deletions io-engine/src/rebuild/rebuild_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ pub enum RebuildError {
ReadIoFailed { source: CoreError, bdev: String },
#[snafu(display("Write IO failed for bdev {}", bdev))]
WriteIoFailed { source: CoreError, bdev: String },
#[snafu(display("Verify IO failed for bdev {}", bdev))]
VerifyIoFailed { source: CoreError, bdev: String },
#[snafu(display(
"Verify compare failed for bdev {}: {}",
bdev,
verify_message
))]
VerifyCompareFailed {
bdev: String,
verify_message: String,
},
#[snafu(display("Failed to find rebuild job {}", job))]
JobNotFound { job: String },
#[snafu(display("Missing rebuild destination {}", job))]
Expand Down
19 changes: 19 additions & 0 deletions io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@ use super::{
};
use crate::core::{Reactors, VerboseError};

/// Rebuild I/O verification mode.
#[derive(Debug, Clone)]
pub enum RebuildVerifyMode {
/// Do not verify rebuild I/Os.
None,
/// Fail rebuild job if I/O verification fails.
Fail,
/// Panic if I/O verification fails.
Panic,
}

/// Rebuild job options.
#[derive(Debug, Clone)]
pub struct RebuildJobOptions {
pub verify_mode: RebuildVerifyMode,
}

/// Operations used to control the state of the job.
#[derive(Debug)]
pub(super) enum RebuildOperation {
Expand Down Expand Up @@ -80,6 +97,7 @@ impl RebuildJob {
src_uri: &str,
dst_uri: &str,
range: Range<u64>,
options: RebuildJobOptions,
notify_fn: fn(String, String) -> (),
) -> Result<Self, RebuildError> {
// Allocate an instance of the rebuild back-end.
Expand All @@ -88,6 +106,7 @@ impl RebuildJob {
src_uri,
dst_uri,
range.clone(),
options,
notify_fn,
)
.await?;
Expand Down
26 changes: 23 additions & 3 deletions io-engine/src/rebuild/rebuild_job_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::{
rebuild_error::{BdevInvalidUri, BdevNotFound, NoCopyBuffer},
RebuildDescriptor,
RebuildError,
RebuildJobOptions,
RebuildMap,
RebuildState,
RebuildStates,
Expand All @@ -34,6 +35,7 @@ use crate::{
bdev::device_open,
bdev_api::bdev_get_name,
core::{BlockDevice, Reactors, UntypedBdev},
rebuild::RebuildVerifyMode,
};

/// Request between frontend and backend.
Expand Down Expand Up @@ -149,6 +151,7 @@ impl RebuildJobBackend {
src_uri: &str,
dst_uri: &str,
range: std::ops::Range<u64>,
options: RebuildJobOptions,
notify_fn: fn(String, String) -> (),
) -> Result<Self, RebuildError> {
let src_descriptor = device_open(
Expand Down Expand Up @@ -201,11 +204,27 @@ impl RebuildJobBackend {
};

for _ in 0 .. tasks.total {
let buffer = destination_hdl
.dma_malloc(segment_size_blks * block_size)
let buf_size = segment_size_blks * block_size;
let copy_buffer = destination_hdl
.dma_malloc(buf_size)
.context(NoCopyBuffer {})?;

tasks.push(RebuildTask::new(buffer, tasks.channel.0.clone()));
let verify_buffer =
if matches!(options.verify_mode, RebuildVerifyMode::None) {
None
} else {
Some(
destination_hdl
.dma_malloc(buf_size)
.context(NoCopyBuffer {})?,
)
};

tasks.push(RebuildTask::new(
copy_buffer,
verify_buffer,
tasks.channel.0.clone(),
));
}

let nexus_descriptor = UntypedBdev::open_by_name(nexus_name, false)
Expand Down Expand Up @@ -233,6 +252,7 @@ impl RebuildJobBackend {
src_uri: src_uri.to_string(),
dst_uri: dst_uri.to_string(),
range,
options,
block_size,
segment_size_blks,
src_descriptor,
Expand Down
Loading

0 comments on commit 3298bda

Please sign in to comment.