Skip to content

Commit

Permalink
Merge #1755
Browse files Browse the repository at this point in the history
1755: Reuse Rebuild IO handles r=tiagolobocastro a=tiagolobocastro

    fix(rebuild): reuse rebuild IO handles
    
    Reuses the rebuild IO handles, rather than attempting to allocate
    them per rebuild task.
    The main issue with handle allocation on the fly is that the target
    may not have cleaned up a previous IO qpair connection, and so the
    connect may fail. We started seeing this more on CI because we forgot
    to cherry-pick a commit increasing the retry delay.
    However, after inspecting a bunch of user support bundles I see that
    we still have occasional connect errors. Rather than increasing the
    timeout, we attempt here to reuse the handles, thus avoiding the
    problem almost entirely.
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    refactor(rebuild): rebuild completion is not an error
    
    When the rebuild has completed, waiting for it fails because
    the channels are no longer available.
    Instead, simply return the rebuild state, since this is what we want anyway.
    
    Signed-off-by: Tiago Castro <[email protected]>

Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Oct 17, 2024
2 parents e279153 + b59bc00 commit c7ec2b1
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[submodule "spdk-rs"]
path = spdk-rs
url = https://github.com/openebs/spdk-rs
url = ../spdk-rs.git
branch = develop
[submodule "utils/dependencies"]
path = utils/dependencies
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions io-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ libc = "0.2.149"
log = "0.4.20"
md5 = "0.7.0"
merge = "0.1.0"
nix = { version = "0.27.1", default-features = false, features = [ "hostname", "net", "socket", "ioctl" ] }
nix = { version = "0.27.1", default-features = false, features = ["hostname", "net", "socket", "ioctl"] }
once_cell = "1.18.0"
parking_lot = "0.12.1"
pin-utils = "0.1.0"
Expand Down Expand Up @@ -102,9 +102,10 @@ async-process = { version = "1.8.1" }
rstack = { version = "0.3.3" }
tokio-stream = "0.1.14"
rustls = "0.21.12"
either = "1.9.0"

devinfo = { path = "../utils/dependencies/devinfo" }
jsonrpc = { path = "../jsonrpc"}
jsonrpc = { path = "../jsonrpc" }
io-engine-api = { path = "../utils/dependencies/apis/io-engine" }
spdk-rs = { path = "../spdk-rs" }
sysfs = { path = "../sysfs" }
Expand Down
9 changes: 5 additions & 4 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,10 +1152,11 @@ impl<'n> Nexus<'n> {
// Cancel rebuild job for this child, if any.
if let Some(job) = child.rebuild_job() {
debug!("{self:?}: retire: stopping rebuild job...");
let terminated = job.force_fail();
Reactors::master().send_future(async move {
terminated.await.ok();
});
if let either::Either::Left(terminated) = job.force_fail() {
Reactors::master().send_future(async move {
terminated.await.ok();
});
}
}

debug!("{child:?}: retire: enqueuing device '{dev}' to retire");
Expand Down
22 changes: 14 additions & 8 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ impl<'n> Nexus<'n> {
async fn terminate_rebuild(&self, child_uri: &str) {
// If a rebuild job is not found that's ok
// as we were just going to remove it anyway.
if let Ok(rj) = self.rebuild_job_mut(child_uri) {
let ch = rj.force_stop();
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
let Ok(rj) = self.rebuild_job_mut(child_uri) else {
return;
};
let either::Either::Left(ch) = rj.force_stop() else {
return;
};
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
to terminate with error {}",
e.verbose()
);
}
e.verbose()
);
}
}

Expand Down Expand Up @@ -355,6 +358,9 @@ impl<'n> Nexus<'n> {

// wait for the jobs to complete terminating
for job in terminated_jobs {
let either::Either::Left(job) = job else {
continue;
};
if let Err(e) = job.await {
error!(
"{:?}: error when waiting for the rebuild job \
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/grpc/v1/snapshot_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ impl SnapshotRebuildRpc for SnapshotRebuildService {
let Ok(job) = SnapshotRebuildJob::lookup(&args.uuid) else {
return Err(tonic::Status::not_found(""));
};
let rx = job.force_stop().await.ok();
let rx = match job.force_stop() {
either::Either::Left(chan) => chan.await,
either::Either::Right(stopped) => Ok(stopped),
};
info!("Snapshot Rebuild stopped: {rx:?}");
job.destroy();
Ok(())
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ impl WithinRange<u64> for std::ops::Range<u64> {
/// Shutdown all pending snapshot rebuilds.
pub(crate) async fn shutdown_snapshot_rebuilds() {
let jobs = SnapshotRebuildJob::list().into_iter();
for recv in jobs.map(|job| job.force_stop()).collect::<Vec<_>>() {
for recv in jobs
.flat_map(|job| job.force_stop().left())
.collect::<Vec<_>>()
{
recv.await.ok();
}
}
Expand Down
29 changes: 12 additions & 17 deletions io-engine/src/rebuild/rebuild_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ pub(super) struct RebuildDescriptor {
/// Pre-opened descriptor for the source block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) src_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) src_handle: Box<dyn BlockDeviceHandle>,
/// Pre-opened descriptor for destination block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) dst_handle: Box<dyn BlockDeviceHandle>,
/// Start time of this rebuild.
pub(super) start_time: DateTime<Utc>,
}
Expand Down Expand Up @@ -90,9 +92,8 @@ impl RebuildDescriptor {
});
}

let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let destination_hdl =
RebuildDescriptor::io_handle(&*dst_descriptor).await?;
let src_handle = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let dst_handle = RebuildDescriptor::io_handle(&*dst_descriptor).await?;

let range = match range {
None => {
Expand All @@ -105,8 +106,8 @@ impl RebuildDescriptor {
};

if !Self::validate(
source_hdl.get_device(),
destination_hdl.get_device(),
src_handle.get_device(),
dst_handle.get_device(),
&range,
) {
return Err(RebuildError::InvalidSrcDstRange {});
Expand All @@ -123,7 +124,9 @@ impl RebuildDescriptor {
block_size,
segment_size_blks,
src_descriptor,
src_handle,
dst_descriptor,
dst_handle,
start_time: Utc::now(),
})
}
Expand Down Expand Up @@ -173,18 +176,14 @@ impl RebuildDescriptor {

/// Get a `BlockDeviceHandle` for the source.
#[inline(always)]
pub(super) async fn src_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.src_descriptor).await
pub(super) fn src_io_handle(&self) -> &dyn BlockDeviceHandle {
self.src_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the destination.
#[inline(always)]
pub(super) async fn dst_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.dst_descriptor).await
pub(super) fn dst_io_handle(&self) -> &dyn BlockDeviceHandle {
self.dst_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the given block device descriptor.
Expand Down Expand Up @@ -231,7 +230,6 @@ impl RebuildDescriptor {
) -> Result<bool, RebuildError> {
match self
.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand Down Expand Up @@ -269,7 +267,6 @@ impl RebuildDescriptor {
iovs: &[IoVec],
) -> Result<(), RebuildError> {
self.dst_io_handle()
.await?
.writev_blocks_async(
iovs,
offset_blk,
Expand All @@ -291,7 +288,6 @@ impl RebuildDescriptor {
) -> Result<(), RebuildError> {
// Read the source again.
self.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand All @@ -306,7 +302,6 @@ impl RebuildDescriptor {

match self
.dst_io_handle()
.await?
.comparev_blocks_async(
iovs,
offset_blk,
Expand Down
17 changes: 12 additions & 5 deletions io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,17 @@ impl RebuildJob {

/// Forcefully stops the job, overriding any pending client operation.
/// Returns either an async channel which can be used to await termination,
/// or the job's current state when no completion listener can be added.
pub(crate) fn force_stop(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_stop(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Stop)
}

/// Forcefully fails the job, overriding any pending client operation.
/// Returns either an async channel which can be used to await termination,
/// or the job's current state when no completion listener can be added.
pub(crate) fn force_fail(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_fail(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Fail)
}

Expand All @@ -179,10 +183,13 @@ impl RebuildJob {
fn force_terminate(
&self,
op: RebuildOperation,
) -> oneshot::Receiver<RebuildState> {
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.exec_internal_op(op).ok();
self.add_completion_listener()
.unwrap_or_else(|_| oneshot::channel().1)

match self.add_completion_listener() {
Ok(chan) => either::Either::Left(chan),
Err(_) => either::Either::Right(self.state()),
}
}

/// Get the rebuild stats.
Expand Down
2 changes: 1 addition & 1 deletion spdk-rs

0 comments on commit c7ec2b1

Please sign in to comment.