diff --git a/io-engine/src/bdev/device.rs b/io-engine/src/bdev/device.rs index aa3deb6d3..218b71749 100644 --- a/io-engine/src/bdev/device.rs +++ b/io-engine/src/bdev/device.rs @@ -67,7 +67,7 @@ use crate::core::fault_injection::{ FaultDomain, InjectIoCtx, }; -use crate::replica_backend::bdev_as_replica; +use crate::replica_backend::ReplicaFactory; /// TODO type EventDispatcherMap = HashMap; @@ -584,14 +584,14 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { }) } - // NVMe commands are not applicable for non-NVMe devices. + /// NVMe commands are not applicable for non-NVMe devices. async fn create_snapshot( &self, snapshot: SnapshotParams, ) -> Result { let bdev = self.handle.get_bdev(); - let Some(mut replica) = bdev_as_replica(bdev) else { + let Some(mut replica) = ReplicaFactory::bdev_as_replica(bdev) else { return Err(CoreError::NotSupported { source: Errno::ENXIO, }); diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs index 0ac9526a0..08a4f269d 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev.rs @@ -67,7 +67,7 @@ use crate::{ subsys::NvmfSubsystem, }; -use crate::core::{BlockDeviceIoStats, CoreError, IoCompletionStatus}; +use crate::core::{BdevStater, BdevStats, CoreError, IoCompletionStatus}; use events_api::event::EventAction; use spdk_rs::{ libspdk::spdk_bdev_notify_blockcnt_change, @@ -360,6 +360,21 @@ impl Display for NexusState { } } +#[async_trait::async_trait(?Send)] +impl BdevStater for Nexus<'_> { + type Stats = BdevStats; + + async fn stats(&self) -> Result { + let bdev = unsafe { self.bdev() }; + bdev.stats().await + } + + async fn reset_stats(&self) -> Result<(), CoreError> { + let bdev = unsafe { self.bdev() }; + bdev.reset_bdev_io_stats().await + } +} + impl<'n> Nexus<'n> { /// create a new nexus instance with optionally directly attaching /// children to it. @@ -535,14 +550,6 @@ impl<'n> Nexus<'n> { unsafe { self.bdev().name().to_string() } } - /// Returns io stats for underlying Bdev. - pub(crate) async fn bdev_stats( - &self, - ) -> Result { - let bdev = unsafe { self.bdev() }; - bdev.stats_async().await - } - /// TODO pub fn req_size(&self) -> u64 { self.req_size diff --git a/io-engine/src/core/bdev.rs b/io-engine/src/core/bdev.rs index 32e08abb7..b3cb81f05 100644 --- a/io-engine/src/core/bdev.rs +++ b/io-engine/src/core/bdev.rs @@ -152,8 +152,8 @@ where } /// Gets tick rate of the current io engine instance. - /// NOTE: tick_rate returned in SPDK struct is not accurate. Hence we get it - /// via this method. + /// NOTE: tick_rate returned in SPDK struct is not accurate. Hence, we get + /// it via this method. pub fn get_tick_rate(&self) -> u64 { unsafe { spdk_get_ticks_hz() } } @@ -404,3 +404,59 @@ where Default::default() } } + +/// A Bdev should expose information and IO stats as well as having the ability +/// to reset the cumulative stats. +#[async_trait::async_trait(?Send)] +pub trait BdevStater { + type Stats; + + /// Gets tick rate of the bdev stats. + fn tick_rate(&self) -> u64 { + unsafe { spdk_get_ticks_hz() } + } + + /// Returns IoStats for a particular bdev. + async fn stats(&self) -> Result; + + /// Resets io stats for a given Bdev. + async fn reset_stats(&self) -> Result<(), CoreError>; +} + +/// Bdev IO stats along with its name and uuid. +pub struct BdevStats { + /// Name of the Bdev. + pub name: String, + /// Uuid of the Bdev. + pub uuid: String, + /// Stats of the Bdev. + pub stats: BlockDeviceIoStats, +} +impl BdevStats { + /// Create a new `Self` from the given parts. + pub fn new(name: String, uuid: String, stats: BlockDeviceIoStats) -> Self { + Self { + name, + uuid, + stats, + } + } +} + +#[async_trait::async_trait(?Send)] +impl BdevStater for Bdev { + type Stats = BdevStats; + + async fn stats(&self) -> Result { + let stats = self.stats_async().await?; + Ok(BdevStats::new( + self.name().to_string(), + self.uuid_as_string(), + stats, + )) + } + + async fn reset_stats(&self) -> Result<(), CoreError> { + self.reset_bdev_io_stats().await + } +} diff --git a/io-engine/src/core/mod.rs b/io-engine/src/core/mod.rs index 6e63984b6..cbb9e3a1a 100644 --- a/io-engine/src/core/mod.rs +++ b/io-engine/src/core/mod.rs @@ -8,7 +8,7 @@ use std::{ use nix::errno::Errno; use snafu::Snafu; -pub use bdev::{Bdev, BdevIter, UntypedBdev}; +pub use bdev::{Bdev, BdevIter, BdevStater, BdevStats, UntypedBdev}; pub use block_device::{ BlockDevice, BlockDeviceDescriptor, diff --git a/io-engine/src/grpc/v1/pool.rs b/io-engine/src/grpc/v1/pool.rs index 963648264..accf35dce 100644 --- a/io-engine/src/grpc/v1/pool.rs +++ b/io-engine/src/grpc/v1/pool.rs @@ -1,3 +1,4 @@ +pub use crate::pool_backend::FindPoolArgs as PoolIdProbe; use crate::{ core::{ NvmfShareProps, @@ -16,6 +17,7 @@ use crate::{ lvs::{BsError, LvsError}, pool_backend::{ FindPoolArgs, + IPoolFactory, ListPoolArgs, PoolArgs, PoolBackend, @@ -34,14 +36,12 @@ use io_engine_api::v1::{ use std::{convert::TryFrom, fmt::Debug, ops::Deref, panic::AssertUnwindSafe}; use tonic::{Request, Status}; -pub use crate::pool_backend::FindPoolArgs as PoolIdProbe; - #[derive(Debug)] struct UnixStream(tokio::net::UnixStream); impl From for FindPoolArgs { fn from(value: DestroyPoolRequest) -> Self { - Self::name_uuid(&value.name, &value.uuid) + Self::name_uuid(value.name, &value.uuid) } } impl From<&destroy_replica_request::Pool> for FindPoolArgs { @@ -68,7 +68,7 @@ impl From<&destroy_snapshot_request::Pool> for FindPoolArgs { } impl From for FindPoolArgs { fn from(value: ExportPoolRequest) -> Self { - Self::name_uuid(&value.name, &value.uuid) + Self::name_uuid(value.name, &value.uuid) } } @@ -385,83 +385,51 @@ impl PoolBackend { } /// A pool factory with the various types of specific impls. -pub(crate) struct GrpcPoolFactory { - pool_factory: Box, -} +pub(crate) struct GrpcPoolFactory(PoolFactory); impl GrpcPoolFactory { - fn factories() -> Vec { - vec![PoolBackend::Lvm, PoolBackend::Lvs] + pub(crate) fn factories() -> Vec { + PoolFactory::factories() .into_iter() - .filter_map(|b| Self::new(b).ok()) - .collect() + .map(Self) + .collect::>() } fn new(backend: PoolBackend) -> Result { backend.enabled()?; - let pool_factory = match backend { - PoolBackend::Lvs => { - Box::::default() as _ - } - PoolBackend::Lvm => { - Box::::default() as _ - } - }; - Ok(Self { - pool_factory, - }) + Ok(Self(PoolFactory::new(backend))) } /// Probe backends for the given name and/or uuid and return the right one. pub(crate) async fn finder>( args: I, - ) -> Result { - let args = args.into(); - let mut error = None; - - for factory in Self::factories() { - match factory.find_pool(&args).await { - Ok(Some(pool)) => { - return Ok(pool); - } - Ok(None) => {} - Err(err) => { - error = Some(err); - } - } - } - Err(error.unwrap_or_else(|| { - Status::not_found(format!("Pool {args:?} not found")) - })) - } - async fn find_pool( - &self, - args: &FindPoolArgs, - ) -> Result, tonic::Status> { - let pool = self.as_pool_factory().find(args).await?; - match pool { - Some(pool) => { - let pool_subsystem = ResourceLockManager::get_instance() - .get_subsystem(ProtectedSubsystems::POOL); - let lock_guard = - acquire_subsystem_lock(pool_subsystem, Some(pool.name())) - .await?; - Ok(Some(PoolGrpc::new(pool, lock_guard))) - } - None => Ok(None), - } + ) -> Result { + let pool = PoolFactory::find(args).await?; + let pool_subsystem = ResourceLockManager::get_instance() + .get_subsystem(ProtectedSubsystems::POOL); + let lock_guard = + acquire_subsystem_lock(pool_subsystem, Some(pool.name())).await?; + Ok(PoolGrpc::new(pool, lock_guard)) } async fn list(&self, args: &ListPoolArgs) -> Result, Status> { - let pools = self.as_pool_factory().list(args).await?; + let pools = self.as_factory().list(args).await?; Ok(pools.into_iter().map(Into::into).collect::>()) } + /// Lists all `PoolOps` matching the given arguments. + pub(crate) async fn list_ops( + &self, + args: &ListPoolArgs, + ) -> Result>, Status> { + let pools = self.as_factory().list(args).await?; + Ok(pools) + } fn backend(&self) -> PoolBackend { - self.as_pool_factory().backend() + self.as_factory().backend() } async fn ensure_not_found( &self, args: &FindPoolArgs, backend: PoolBackend, ) -> Result<(), Status> { - if self.as_pool_factory().find(args).await?.is_some() { + if self.as_factory().find(args).await?.is_some() { if self.backend() != backend { return Err(Status::invalid_argument( "Pool Already exists on another backend type", @@ -488,7 +456,7 @@ impl GrpcPoolFactory { // todo: inspect disk contents as well! factory.ensure_not_found(&finder, args.backend).await?; } - let pool = self.as_pool_factory().create(args).await?; + let pool = self.as_factory().create(args).await?; Ok(pool.into()) } async fn import(&self, args: PoolArgs) -> Result { @@ -501,11 +469,11 @@ impl GrpcPoolFactory { for factory in Self::factories() { factory.ensure_not_found(&finder, args.backend).await?; } - let pool = self.as_pool_factory().import(args).await?; + let pool = self.as_factory().import(args).await?; Ok(pool.into()) } - fn as_pool_factory(&self) -> &dyn PoolFactory { - self.pool_factory.deref() + fn as_factory(&self) -> &dyn IPoolFactory { + self.0.as_factory() } } @@ -641,7 +609,7 @@ impl PoolRpc for PoolService { pools.extend(fpools); } Err(error) => { - let backend = factory.pool_factory.backend(); + let backend = factory.0.as_factory().backend(); tracing::error!("Failed to list pools of type {backend:?}, error: {error}"); } } diff --git a/io-engine/src/grpc/v1/replica.rs b/io-engine/src/grpc/v1/replica.rs index 4ffd3ebba..687ca8377 100644 --- a/io-engine/src/grpc/v1/replica.rs +++ b/io-engine/src/grpc/v1/replica.rs @@ -1,10 +1,13 @@ use crate::{ core::{ logical_volume::LvolSpaceUsage, + wiper::{WipeMethod, Wiper}, + Bdev, NvmfShareProps, ProtectedSubsystems, Protocol, ResourceLockManager, + ToErrno, UpdateProps, }, grpc::{ @@ -18,6 +21,7 @@ use crate::{ pool_backend::{FindPoolArgs, PoolBackend}, replica_backend::{ FindReplicaArgs, + IReplicaFactory, ListCloneArgs, ListReplicaArgs, ListSnapshotArgs, @@ -197,42 +201,18 @@ fn filter_replicas_by_replica_type( } /// A replica factory with the various types of specific impls. -pub(crate) struct GrpcReplicaFactory { - repl_factory: Box, -} +pub(crate) struct GrpcReplicaFactory(ReplicaFactory); impl GrpcReplicaFactory { pub(crate) fn factories() -> Vec { - crate::replica_backend::factories() + ReplicaFactory::factories() .into_iter() - .map(|repl_factory| Self { - repl_factory, - }) + .map(Self) .collect::>() } - async fn find_ops( - args: &FindReplicaArgs, - ) -> Result, Status> { - let mut error = None; - - for factory in Self::factories() { - match factory.find_replica(args).await { - Ok(Some(replica)) => { - return Ok(replica); - } - Ok(None) => {} - Err(err) => { - error = Some(err); - } - } - } - Err(error.unwrap_or_else(|| { - Status::not_found(format!("Replica {args:?} not found")) - })) - } pub(crate) async fn finder( args: &FindReplicaArgs, ) -> Result { - let replica = Self::find_ops(args).await?; + let replica = ReplicaFactory::find(args).await?; Ok(ReplicaGrpc::new(replica)) } pub(crate) async fn pool_finder>( @@ -246,26 +226,20 @@ impl GrpcReplicaFactory { } }) } - async fn find_replica( - &self, - args: &FindReplicaArgs, - ) -> Result>, tonic::Status> { - let replica = self.as_factory().find(args).await?; - if let Some(replica) = &replica { - // should this be an error? - if replica.is_snapshot() { - return Ok(None); - } - } - Ok(replica) - } - async fn list( + pub(crate) async fn list( &self, args: &ListReplicaArgs, ) -> Result, Status> { let replicas = self.as_factory().list(args).await?; Ok(replicas.into_iter().map(Into::into).collect::>()) } + pub(crate) async fn list_ops( + &self, + args: &ListReplicaArgs, + ) -> Result>, Status> { + let replicas = self.as_factory().list(args).await?; + Ok(replicas) + } pub(crate) async fn list_snaps( &self, args: &ListSnapshotArgs, @@ -283,8 +257,8 @@ impl GrpcReplicaFactory { pub(crate) fn backend(&self) -> PoolBackend { self.as_factory().backend() } - fn as_factory(&self) -> &dyn ReplicaFactory { - self.repl_factory.deref() + fn as_factory(&self) -> &dyn IReplicaFactory { + self.0.as_factory() } } @@ -292,12 +266,28 @@ impl GrpcReplicaFactory { pub(crate) struct ReplicaGrpc { pub(crate) replica: Box, } + impl ReplicaGrpc { fn new(replica: Box) -> Self { Self { replica, } } + /// Get a wiper for this replica. + pub(crate) fn wiper( + &self, + wipe_method: WipeMethod, + ) -> Result { + let hdl = Bdev::open(&self.replica.try_as_bdev()?, true) + .and_then(|desc| desc.into_handle()) + .map_err(|e| crate::lvs::LvsError::Invalid { + msg: e.to_string(), + source: crate::lvs::BsError::from_errno(e.to_errno()), + })?; + + let wiper = Wiper::new(hdl, wipe_method)?; + Ok(wiper) + } async fn destroy(self) -> Result<(), Status> { self.replica.destroy().await?; Ok(()) @@ -353,7 +343,7 @@ impl ReplicaGrpc { self.replica.set_entity_id(id).await?; Ok(()) } - fn verify_pool(&self, pool: &PoolGrpc) -> Result<(), Status> { + pub(crate) fn verify_pool(&self, pool: &PoolGrpc) -> Result<(), Status> { let pool = pool.as_ops(); let replica = &self.replica; if pool.name() != replica.pool_name() diff --git a/io-engine/src/grpc/v1/snapshot.rs b/io-engine/src/grpc/v1/snapshot.rs index 39e6027da..f8c6f464b 100644 --- a/io-engine/src/grpc/v1/snapshot.rs +++ b/io-engine/src/grpc/v1/snapshot.rs @@ -16,7 +16,6 @@ use crate::{ GrpcResult, RWSerializer, }, - replica_backend, }; use ::function_name::named; use chrono::{DateTime, Utc}; @@ -227,6 +226,7 @@ use crate::{ FindSnapshotArgs, ListCloneArgs, ListSnapshotArgs, + ReplicaFactory, SnapshotOps, }, }; @@ -291,8 +291,8 @@ impl SnapshotGrpc { async fn finder(args: &FindSnapshotArgs) -> Result { let mut error = None; - for factory in replica_backend::factories() { - match factory.find_snap(args).await { + for factory in ReplicaFactory::factories() { + match factory.as_factory().find_snap(args).await { Ok(Some(snapshot)) => { return Ok(Self(snapshot)); } diff --git a/io-engine/src/grpc/v1/stats.rs b/io-engine/src/grpc/v1/stats.rs index db46d12ab..e6670c7e4 100644 --- a/io-engine/src/grpc/v1/stats.rs +++ b/io-engine/src/grpc/v1/stats.rs @@ -8,17 +8,18 @@ use crate::{ RWLock, Serializer, }, - lvs::Lvs, }; use futures::{future::join_all, FutureExt}; use io_engine_api::v1::stats::*; -use std::{convert::TryFrom, fmt::Debug, panic::AssertUnwindSafe}; +use std::{fmt::Debug, panic::AssertUnwindSafe}; use tonic::{Request, Response, Status}; use crate::{ - bdev::{nexus, Nexus}, - core::{BlockDeviceIoStats, CoreError, LogicalVolume, UntypedBdev}, - lvs::{Lvol, LvsLvol, PropName, PropValue}, + bdev::nexus, + core::{BdevStater, BdevStats, CoreError, UntypedBdev}, + grpc::v1::{pool::GrpcPoolFactory, replica::GrpcReplicaFactory}, + pool_backend::ListPoolArgs, + replica_backend::{ListReplicaArgs, ReplicaBdevStats}, }; use ::function_name::named; @@ -131,36 +132,25 @@ impl StatsRpc for StatsService { ) -> GrpcResult { self.shared(self.pool_svc.rw_lock().await, async move { let args = request.into_inner(); - let rx = rpc_submit::<_, _, CoreError>(async move { - let pool_stats_future: Vec<_> = if let Some(name) = args.name { - if let Some(l) = Lvs::lookup(&name) { - vec![get_stats(name, l.uuid(), l.base_bdev())] - } else { - vec![] - } - } else { - Lvs::iter() - .map(|lvs| { - get_stats( - lvs.name().to_string(), - lvs.uuid(), - lvs.base_bdev(), - ) - }) - .collect() - }; + crate::spdk_submit!(async move { + let mut pools = vec![]; + let args = ListPoolArgs::new_named(args.name); + for factory in GrpcPoolFactory::factories() { + pools.extend( + factory.list_ops(&args).await.unwrap_or_default(), + ); + } + let pools_stats_future = pools.iter().map(|r| r.stats()); + let pools_stats = + join_all(pools_stats_future).await.into_iter(); + let stats = pools_stats + .map(|d| d.map(Into::into)) + .collect::, _>>()?; - let pool_stats: Result, _> = - join_all(pool_stats_future).await.into_iter().collect(); - let pool_stats = pool_stats?; Ok(PoolIoStatsResponse { - stats: pool_stats, + stats, }) - })?; - rx.await - .map_err(|_| Status::cancelled("cancelled"))? - .map_err(Status::from) - .map(Response::new) + }) }) .await } @@ -174,43 +164,23 @@ impl StatsRpc for StatsService { GrpcClientContext::new(&request, function_name!()), async move { let args = request.into_inner(); - let rx = rpc_submit::<_, _, CoreError>(async move { - let nexus_stats_future: Vec<_> = - if let Some(name) = args.name { - if let Some(nexus) = nexus::nexus_lookup(&name) { - vec![nexus_stats( - nexus.name.clone(), - nexus.uuid().to_string(), - nexus, - )] - } else { - vec![] - } - } else { - nexus::nexus_iter() - .map(|nexus| { - nexus_stats( - nexus.name.clone(), - nexus.uuid().to_string(), - nexus, - ) - }) - .collect() - }; - let nexus_stats: Result, _> = - join_all(nexus_stats_future) - .await - .into_iter() - .collect(); - let nexus_stats = nexus_stats?; + crate::spdk_submit!(async move { + let nexus_stats_future = if let Some(name) = args.name { + let nexus = nexus::nexus_lookup(&name) + .ok_or(Status::not_found("Nexus not found"))?; + vec![nexus.stats()] + } else { + nexus::nexus_iter().map(|nexus| nexus.stats()).collect() + }; + let nexus_stats = join_all(nexus_stats_future) + .await + .into_iter() + .map(|d| d.map(Into::into)); + let stats = nexus_stats.collect::, _>>()?; Ok(NexusIoStatsResponse { - stats: nexus_stats, + stats, }) - })?; - rx.await - .map_err(|_| Status::cancelled("cancelled"))? - .map_err(Status::from) - .map(Response::new) + }) }, ) .await @@ -221,40 +191,24 @@ impl StatsRpc for StatsService { ) -> GrpcResult { self.shared(self.replica_svc.rw_lock().await, async move { let args = request.into_inner(); - let rx = rpc_submit::<_, _, CoreError>(async move { - let replica_stats_future: Vec<_> = if let Some(name) = args.name - { - UntypedBdev::bdev_first() - .and_then(|bdev| { - bdev.into_iter().find(|b| { - b.driver() == "lvol" && b.name() == name - }) - }) - .and_then(|b| Lvol::try_from(b).ok()) - .map(|lvol| vec![replica_stats(lvol)]) - .unwrap_or_default() - } else { - let mut lvols = Vec::new(); - if let Some(bdev) = UntypedBdev::bdev_first() { - lvols = bdev - .into_iter() - .filter(|b| b.driver() == "lvol") - .map(|b| Lvol::try_from(b).unwrap()) - .collect(); - } - lvols.into_iter().map(replica_stats).collect() - }; - let replica_stats: Result, _> = - join_all(replica_stats_future).await.into_iter().collect(); - let replica_stats = replica_stats?; + crate::spdk_submit!(async move { + let mut replicas = vec![]; + let args = ListReplicaArgs::new_named(args.name); + for factory in GrpcReplicaFactory::factories() { + replicas.extend( + factory.list_ops(&args).await.unwrap_or_default(), + ); + } + let replica_stats_future = replicas.iter().map(|r| r.stats()); + let replica_stats = + join_all(replica_stats_future).await.into_iter(); + let stats = replica_stats + .map(|d| d.map(Into::into)) + .collect::, _>>()?; Ok(ReplicaIoStatsResponse { - stats: replica_stats, + stats, }) - })?; - rx.await - .map_err(|_| Status::cancelled("cancelled"))? - .map_err(Status::from) - .map(Response::new) + }) }) .await } @@ -282,15 +236,13 @@ impl StatsRpc for StatsService { } } -struct ExternalType(T); - /// Conversion fn to get gRPC type IOStat from BlockDeviceIoStats. -impl From> for IoStats { - fn from(value: ExternalType<(String, String, BlockDeviceIoStats)>) -> Self { - let stats = value.0 .2; +impl From for IoStats { + fn from(value: BdevStats) -> Self { + let stats = value.stats; Self { - name: value.0 .0, - uuid: value.0 .1, + name: value.name, + uuid: value.uuid, num_read_ops: stats.num_read_ops, bytes_read: stats.bytes_read, num_write_ops: stats.num_write_ops, @@ -310,40 +262,12 @@ impl From> for IoStats { } } } - -/// Returns IoStats for a given BlockDevice. -async fn get_stats( - name: String, - uuid: String, - bdev: UntypedBdev, -) -> Result { - let stats = bdev.stats_async().await?; - Ok(IoStats::from(ExternalType((name, uuid, stats)))) -} - -/// Returns IoStats for a given Lvol(Replica). -async fn replica_stats(lvol: Lvol) -> Result { - let stats = lvol.as_bdev().stats_async().await?; - let io_stat = - IoStats::from(ExternalType((lvol.name(), lvol.uuid(), stats))); - let replica_stat = ReplicaIoStats { - stats: Some(io_stat), - entity_id: lvol.get(PropName::EntityId).await.ok().and_then(|id| { - match id { - PropValue::EntityId(id) => Some(id), - _ => None, - } - }), - }; - Ok(replica_stat) -} - -/// Returns IoStats for a given Nexus. -async fn nexus_stats( - name: String, - uuid: String, - bdev: &Nexus<'_>, -) -> Result { - let stats = bdev.bdev_stats().await?; - Ok(IoStats::from(ExternalType((name, uuid, stats)))) +/// Conversion fn to get gRPC type IOStat from BlockDeviceIoStats. +impl From for ReplicaIoStats { + fn from(value: ReplicaBdevStats) -> Self { + Self { + entity_id: value.entity_id, + stats: Some(value.stats.into()), + } + } } diff --git a/io-engine/src/grpc/v1/test.rs b/io-engine/src/grpc/v1/test.rs index 9e8b22334..690a83e55 100644 --- a/io-engine/src/grpc/v1/test.rs +++ b/io-engine/src/grpc/v1/test.rs @@ -1,12 +1,15 @@ use crate::{ - bdev_api::BdevError, core::{ wiper::{Error as WipeError, StreamedWiper, WipeStats, Wiper}, - Bdev, VerboseError, }, - grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWSerializer}, - lvs::{BsError, Lvol, Lvs, LvsError, LvsLvol}, + grpc::{ + v1::replica::{GrpcReplicaFactory, ReplicaGrpc}, + GrpcClientContext, + GrpcResult, + RWSerializer, + }, + replica_backend::FindReplicaArgs, }; use ::function_name::named; use io_engine_api::{ @@ -25,13 +28,17 @@ use std::convert::{TryFrom, TryInto}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; +use crate::grpc::v1::pool::PoolGrpc; #[cfg(feature = "fault-injection")] -use crate::core::fault_injection::{ - add_fault_injection, - list_fault_injections, - remove_fault_injection, - FaultInjectionError, - Injection, +use crate::{ + core::fault_injection::{ + add_fault_injection, + list_fault_injections, + remove_fault_injection, + FaultInjectionError, + Injection, + }, + grpc::rpc_submit, }; #[derive(Debug, Clone)] @@ -96,18 +103,21 @@ impl TestRpc for TestService { async move { let args = request.into_inner(); info!("{:?}", args); - let rx = rpc_submit(async move { - let lvol = Bdev::lookup_by_uuid_str(&args.uuid) - .ok_or(LvsError::InvalidBdev { - source: BdevError::BdevNotFound { - name: args.uuid.clone(), - }, - name: args.uuid, - }) - .and_then(Lvol::try_from)?; - validate_pool(&lvol, args.pool)?; - - let wiper = lvol.wiper(options.wipe_method)?; + crate::spdk_submit!(async move { + let pool = match args.pool { + Some(pool) => Some( + GrpcReplicaFactory::pool_finder(pool) + .await?, + ), + None => None, + }; + let repl = GrpcReplicaFactory::finder( + &FindReplicaArgs::new(&args.uuid), + ) + .await?; + validate_pool(&repl, pool).await?; + + let wiper = repl.wiper(options.wipe_method)?; let proto_stream = WiperStream(tx_cln); let wiper = StreamedWiper::new( @@ -118,11 +128,8 @@ impl TestRpc for TestService { )?; let final_stats = wiper.wipe().await?; final_stats.log(); - Result::<(), LvsError>::Ok(()) - })?; - rx.await - .map_err(|_| Status::cancelled("cancelled"))? - .map_err(Status::from) + Ok(()) + }) }, ) .await; @@ -331,43 +338,27 @@ impl From for tonic::Status { } } -/// Validate that the specified pool contains the specified lvol. -fn validate_pool( - lvol: &Lvol, - pool: Option, -) -> Result<(), LvsError> { +impl From for crate::pool_backend::FindPoolArgs { + fn from(value: wipe_replica_request::Pool) -> Self { + match value { + wipe_replica_request::Pool::PoolName(name) => { + Self::name_uuid(name, &None) + } + wipe_replica_request::Pool::PoolUuid(uuid) => Self::uuid(uuid), + } + } +} + +/// Validate that the replica belongs to the specified pool. +async fn validate_pool( + repl: &ReplicaGrpc, + pool: Option, +) -> Result<(), Status> { let Some(pool) = pool else { return Ok(()); }; - let lvs = lookup_pool(pool)?; - if lvol.lvs().uuid() == lvs.uuid() && lvol.lvs().name() == lvs.name() { - return Ok(()); - } - - let msg = format!("Specified {lvs:?} does match the target {lvol:?}!"); - tracing::error!("{msg}"); - Err(LvsError::Invalid { - source: BsError::InvalidArgument {}, - msg, - }) -} - -fn lookup_pool(pool: wipe_replica_request::Pool) -> Result { - match pool { - wipe_replica_request::Pool::PoolUuid(uuid) => { - Lvs::lookup_by_uuid(&uuid).ok_or(LvsError::PoolNotFound { - source: BsError::LvsNotFound {}, - msg: format!("Pool uuid={uuid} is not loaded"), - }) - } - wipe_replica_request::Pool::PoolName(name) => { - Lvs::lookup(&name).ok_or(LvsError::PoolNotFound { - source: BsError::LvsNotFound {}, - msg: format!("Pool name={name} is not loaded"), - }) - } - } + repl.verify_pool(&pool) } struct WiperStream( diff --git a/io-engine/src/lvm/lv_replica.rs b/io-engine/src/lvm/lv_replica.rs index f8ce17cbd..8f64f23a1 100644 --- a/io-engine/src/lvm/lv_replica.rs +++ b/io-engine/src/lvm/lv_replica.rs @@ -152,7 +152,7 @@ pub struct RunLogicalVolume { /// Runtime settings for the LogicalVolume. #[derive(Debug, Default, Clone)] -struct BdevOpts { +pub(crate) struct BdevOpts { /// The share URI of the SPDK Bdev which is created against the lv_path. share_uri: Option, /// The URI of the SPDK Bdev which is created against the lv_path. @@ -178,6 +178,10 @@ impl BdevOpts { self.allowed_hosts = to.allowed_hosts; self.share = to.share; } + /// Get a reference to the original bdev uri. + pub(crate) fn uri(&self) -> &str { + &self.open_uri + } } impl Deref for LogicalVolume { @@ -320,10 +324,11 @@ impl LogicalVolume { /// The bdev is unshared (if shared) and closed, allowing the logical volume /// to be closed and/or destroyed. pub(super) async fn export_bdev(&mut self) -> Result<(), Error> { - let Ok(uri) = self.bdev_uri() else { + let Ok(bdev) = self.bdev_opts() else { // Nothing to do if the bdev was not setup... return Ok(()); }; + let uri = bdev.open_uri.clone(); crate::spdk_run!(async move { if let Ok(mut bdev) = Self::bdev(&uri) { // todo: must we error if we can't unshare? @@ -545,8 +550,8 @@ impl LogicalVolume { let uri = bdev.open_uri.clone(); Ok((bdev, uri)) } - fn bdev_uri(&self) -> Result { - let Some(uri) = self.bdev.as_ref().map(|b| b.open_uri.clone()) else { + pub(crate) fn bdev_opts(&self) -> Result<&BdevOpts, Error> { + let Some(uri) = self.bdev.as_ref() else { // Nothing to do if the bdev was not setup... return Err(Error::BdevMissing {}); }; @@ -647,7 +652,7 @@ impl LogicalVolume { /// we could go the other way and use trampoline from spdk to tokio, but /// then also hit the problem? impl LogicalVolume { - fn bdev(uri: &str) -> Result { + pub(crate) fn bdev(uri: &str) -> Result { UntypedBdev::get_by_name(uri).map_err(|_| Error::BdevMissing {}) } diff --git a/io-engine/src/lvm/mod.rs b/io-engine/src/lvm/mod.rs index ef922dfde..646434832 100644 --- a/io-engine/src/lvm/mod.rs +++ b/io-engine/src/lvm/mod.rs @@ -49,31 +49,36 @@ use crate::{ bdev::PtplFileOps, core::{ snapshot::SnapshotDescriptor, + BdevStater, + BdevStats, CloneParams, + CoreError, NvmfShareProps, Protocol, PtplProps, SnapshotParams, + UntypedBdev, UpdateProps, }, lvm::property::Property, pool_backend::{ FindPoolArgs, + IPoolFactory, IPoolProps, ListPoolArgs, PoolArgs, PoolBackend, - PoolFactory, PoolOps, ReplicaArgs, }, replica_backend::{ FindReplicaArgs, FindSnapshotArgs, + IReplicaFactory, ListCloneArgs, ListReplicaArgs, ListSnapshotArgs, - ReplicaFactory, + ReplicaBdevStats, ReplicaOps, SnapshotOps, }, @@ -160,6 +165,23 @@ impl PoolOps for VolumeGroup { } } +#[async_trait::async_trait(?Send)] +impl BdevStater for VolumeGroup { + type Stats = BdevStats; + + async fn stats(&self) -> Result { + Err(CoreError::NotSupported { + source: nix::errno::Errno::ENOSYS, + }) + } + + async fn reset_stats(&self) -> Result<(), CoreError> { + Err(CoreError::NotSupported { + source: nix::errno::Errno::ENOSYS, + }) + } +} + #[async_trait::async_trait(?Send)] impl ReplicaOps for LogicalVolume { async fn share_nvmf( @@ -234,7 +256,30 @@ impl ReplicaOps for LogicalVolume { ) -> Result, crate::pool_backend::Error> { Err(Error::SnapshotNotSup {}.into()) } + + fn try_as_bdev(&self) -> Result { + let bdev = Self::bdev(self.bdev_opts()?.uri())?; + Ok(bdev) + } } + +#[async_trait::async_trait(?Send)] +impl BdevStater for LogicalVolume { + type Stats = ReplicaBdevStats; + + async fn stats(&self) -> Result { + Err(CoreError::NotSupported { + source: nix::errno::Errno::ENOSYS, + }) + } + + async fn reset_stats(&self) -> Result<(), CoreError> { + Err(CoreError::NotSupported { + source: nix::errno::Errno::ENOSYS, + }) + } +} + #[async_trait::async_trait(?Send)] impl SnapshotOps for LogicalVolume { async fn destroy_snapshot( @@ -304,7 +349,7 @@ impl IPoolProps for VolumeGroup { #[derive(Default)] pub struct PoolLvmFactory {} #[async_trait::async_trait(?Send)] -impl PoolFactory for PoolLvmFactory { +impl IPoolFactory for PoolLvmFactory { async fn create( &self, args: PoolArgs, @@ -369,6 +414,7 @@ impl PoolFactory for PoolLvmFactory { .map(|p| Box::new(p) as _) .collect::>()) } + fn backend(&self) -> PoolBackend { PoolBackend::Lvm } @@ -378,7 +424,7 @@ impl PoolFactory for PoolLvmFactory { #[derive(Default)] pub struct ReplLvmFactory {} #[async_trait::async_trait(?Send)] -impl ReplicaFactory for ReplLvmFactory { +impl IReplicaFactory for ReplLvmFactory { fn bdev_as_replica( &self, _bdev: crate::core::UntypedBdev, diff --git a/io-engine/src/lvs/lvol_iter.rs b/io-engine/src/lvs/lvol_iter.rs new file mode 100644 index 000000000..c6064777c --- /dev/null +++ b/io-engine/src/lvs/lvol_iter.rs @@ -0,0 +1,28 @@ +use super::Lvol; +use crate::core::BdevIter; + +/// Iterator over available Lvs Lvol's. +pub(crate) struct LvolIter(BdevIter<()>); + +impl LvolIter { + /// Returns a new Lvol iterator. + pub(crate) fn new() -> Self { + Self(BdevIter::new()) + } +} + +impl Iterator for LvolIter { + type Item = Lvol; + + fn next(&mut self) -> Option { + // notice we're hiding a potential inner loop here + // only way around this would be to have the iterator return an + // Option> which perhaps is a bit confusing + for bdev in self.0.by_ref() { + if let Some(lvol) = Lvol::ok_from(bdev) { + return Some(lvol); + } + } + None + } +} diff --git a/io-engine/src/lvs/lvs_lvol.rs b/io-engine/src/lvs/lvs_lvol.rs index 9d39fcc7b..5d4a4b634 100644 --- a/io-engine/src/lvs/lvs_lvol.rs +++ b/io-engine/src/lvs/lvs_lvol.rs @@ -40,7 +40,6 @@ use crate::{ bdev::PtplFileOps, core::{ logical_volume::{LogicalVolume, LvolSpaceUsage}, - wiper::{WipeMethod, Wiper}, Bdev, CloneXattrs, LvolSnapshotOps, @@ -49,7 +48,6 @@ use crate::{ PtplProps, Share, SnapshotXattrs, - ToErrno, UntypedBdev, UpdateProps, }, @@ -340,22 +338,6 @@ impl Lvol { Ok(()) } - /// Get a wiper for this replica. - pub(crate) fn wiper( - &self, - wipe_method: WipeMethod, - ) -> Result { - let hdl = Bdev::open(&self.as_bdev(), true) - .and_then(|desc| desc.into_handle()) - .map_err(|e| LvsError::Invalid { - msg: e.to_string(), - source: BsError::from_errno(e.to_errno()), - })?; - - let wiper = Wiper::new(hdl, wipe_method)?; - Ok(wiper) - } - /// generic callback for lvol operations pub(crate) extern "C" fn lvol_cb( sender_ptr: *mut c_void, @@ -371,11 +353,6 @@ impl Lvol { .send(errno_result_from_i32(lvol_ptr, errno)) .expect("Receiver is gone"); } - /// Format snapshot name - /// base_name is the nexus or replica UUID - pub fn format_snapshot_name(base_name: &str, snapshot_time: u64) -> String { - format!("{base_name}-snap-{snapshot_time}") - } /// Get a `PtplFileOps` from `&self`. pub(crate) fn ptpl(&self) -> impl PtplFileOps { LvolPtpl::from(self) @@ -659,7 +636,7 @@ pub trait LvsLvol: LogicalVolume + Share { async fn resize_replica(&mut self, resize_to: u64) -> Result<(), LvsError>; } -/// LogicalVolume implement Generic interface for Lvol. +/// LogicalVolume implement Generic interface for Lvol. impl LogicalVolume for Lvol { /// Returns the name of the Snapshot. fn name(&self) -> String { diff --git a/io-engine/src/lvs/mod.rs b/io-engine/src/lvs/mod.rs index 6b129bf5f..480480ca5 100644 --- a/io-engine/src/lvs/mod.rs +++ b/io-engine/src/lvs/mod.rs @@ -13,20 +13,20 @@ use crate::{ pool_backend::{ Error, FindPoolArgs, + IPoolFactory, IPoolProps, ListPoolArgs, PoolArgs, PoolBackend, - PoolFactory, PoolOps, ReplicaArgs, }, replica_backend::{ FindReplicaArgs, + IReplicaFactory, ListCloneArgs, ListReplicaArgs, ListSnapshotArgs, - ReplicaFactory, ReplicaOps, SnapshotOps, }, @@ -39,6 +39,7 @@ pub use lvs_lvol::{Lvol, LvsLvol, PropName, PropValue}; pub use lvs_store::Lvs; use std::{convert::TryFrom, pin::Pin}; +mod lvol_iter; mod lvol_snapshot; mod lvs_bdev; mod lvs_error; @@ -46,7 +47,10 @@ mod lvs_iter; pub mod lvs_lvol; mod lvs_store; -use crate::replica_backend::FindSnapshotArgs; +use crate::{ + core::{BdevStater, BdevStats, CoreError, UntypedBdev}, + replica_backend::{FindSnapshotArgs, ReplicaBdevStats}, +}; pub use lvol_snapshot::{LvolResult, LvolSnapshotDescriptor, LvolSnapshotOps}; #[async_trait::async_trait(?Send)] @@ -117,6 +121,24 @@ impl ReplicaOps for Lvol { let snapshot = LvolSnapshotOps::create_snapshot(self, params).await?; Ok(Box::new(snapshot)) } + + fn try_as_bdev(&self) -> Result { + Ok(self.as_bdev()) + } +} + +#[async_trait::async_trait(?Send)] +impl BdevStater for Lvol { + type Stats = ReplicaBdevStats; + + async fn stats(&self) -> Result { + let stats = self.as_bdev().stats().await?; + Ok(ReplicaBdevStats::new(stats, self.entity_id())) + } + + async fn reset_stats(&self) -> Result<(), CoreError> { + self.as_bdev().reset_stats().await + } } #[async_trait::async_trait(?Send)] @@ -172,6 +194,20 @@ impl PoolOps for Lvs { } } +#[async_trait::async_trait(?Send)] +impl BdevStater for Lvs { + type Stats = BdevStats; + + async fn stats(&self) -> Result { + let stats = self.base_bdev().stats_async().await?; + Ok(BdevStats::new(self.name().to_string(), self.uuid(), stats)) + } + + async fn reset_stats(&self) -> Result<(), CoreError> { + self.base_bdev().reset_bdev_io_stats().await + } +} + impl IPoolProps for Lvs { fn name(&self) -> &str { self.name() @@ -210,7 +246,7 @@ impl IPoolProps for Lvs { #[derive(Default)] pub struct PoolLvsFactory {} #[async_trait::async_trait(?Send)] -impl PoolFactory for PoolLvsFactory { +impl IPoolFactory for PoolLvsFactory { async fn create( &self, args: PoolArgs, @@ -273,6 +309,7 @@ impl PoolFactory for PoolLvsFactory { .map(|p| Box::new(p) as _) .collect::>()) } + fn backend(&self) -> PoolBackend { PoolBackend::Lvs } @@ -282,7 +319,7 @@ impl PoolFactory for PoolLvsFactory { #[derive(Default)] pub struct ReplLvsFactory {} #[async_trait::async_trait(?Send)] -impl ReplicaFactory for ReplLvsFactory { +impl IReplicaFactory for ReplLvsFactory { fn bdev_as_replica( &self, bdev: crate::core::UntypedBdev, @@ -323,15 +360,11 @@ impl ReplicaFactory for ReplLvsFactory { &self, args: &ListReplicaArgs, ) -> Result>, Error> { - let Some(bdev) = crate::core::UntypedBdev::bdev_first() else { - return Ok(vec![]); - }; let retain = |arg: Option<&String>, val: &String| -> bool { arg.is_none() || arg == Some(val) }; - let lvols = bdev.into_iter().filter_map(Lvol::ok_from); - let lvols = lvols.filter(|lvol| { + let lvols = lvol_iter::LvolIter::new().filter(|lvol| { retain(args.pool_name.as_ref(), &lvol.pool_name()) && retain(args.pool_uuid.as_ref(), &lvol.pool_uuid()) && retain(args.name.as_ref(), &lvol.name()) @@ -340,7 +373,6 @@ impl ReplicaFactory for ReplLvsFactory { Ok(lvols.map(|lvol| Box::new(lvol) as _).collect::>()) } - async fn list_snaps( &self, args: &ListSnapshotArgs, diff --git a/io-engine/src/pool_backend.rs b/io-engine/src/pool_backend.rs index 974014221..738b247da 100644 --- a/io-engine/src/pool_backend.rs +++ b/io-engine/src/pool_backend.rs @@ -1,9 +1,13 @@ -use crate::{core::ToErrno, replica_backend::ReplicaOps}; +use crate::{ + core::{BdevStater, BdevStats, ToErrno}, + replica_backend::ReplicaOps, +}; use nix::errno::Errno; +use std::ops::Deref; /// PoolArgs is used to translate the input for the grpc /// Create/Import requests which contains name, uuid & disks. -/// This help us avoid importing grpc structs in the actual lvs mod +/// This helps us avoid importing grpc structs in the actual lvs mod #[derive(Clone, Debug, Default)] pub struct PoolArgs { pub name: String, @@ -30,6 +34,34 @@ pub struct ReplicaArgs { pub(crate) entity_id: Option, } +/// Generic Errors shared by all backends. +/// todo: most common errors should be moved here. +#[derive(Debug, snafu::Snafu)] +#[snafu(visibility(pub(crate)))] +pub enum GenericError { + #[snafu(display("{message}"))] + NotFound { message: String }, +} +impl From for tonic::Status { + fn from(e: GenericError) -> Self { + match e { + GenericError::NotFound { + message, + } => tonic::Status::not_found(message), + } + } +} +impl ToErrno for GenericError { + fn to_errno(self) -> Errno { + match self { + GenericError::NotFound { + .. + } => Errno::ENODEV, + } + } +} + +/// Aggregated errors for all backends. #[derive(Debug, snafu::Snafu)] #[snafu(visibility(pub(crate)))] pub enum Error { @@ -37,6 +69,8 @@ pub enum Error { Lvs { source: crate::lvs::LvsError }, #[snafu(display("{source}"))] Lvm { source: crate::lvm::Error }, + #[snafu(display("{source}"))] + Gen { source: GenericError }, } impl From for Error { fn from(source: crate::lvs::LvsError) -> Self { @@ -52,6 +86,13 @@ impl From for Error { } } } +impl From for Error { + fn from(source: GenericError) -> Self { + Self::Gen { + source, + } + } +} impl From for tonic::Status { fn from(e: Error) -> Self { match e { @@ -61,6 +102,9 @@ impl From for tonic::Status { Error::Lvm { source, } => source.into(), + Error::Gen { + source, + } => source.into(), } } } @@ -73,6 +117,9 @@ impl ToErrno for Error { Error::Lvm { source, } => source.to_errno(), + Error::Gen { + source, + } => source.to_errno(), } } } @@ -82,7 +129,9 @@ impl ToErrno for Error { /// much as possible, though we can allow for extra pool specific options /// to be passed as parameters. #[async_trait::async_trait(?Send)] -pub trait PoolOps: IPoolProps + std::fmt::Debug { +pub trait PoolOps: + IPoolProps + BdevStater + std::fmt::Debug +{ /// Create a replica on this pool with the given arguments. async fn create_repl( &self, @@ -98,7 +147,7 @@ pub trait PoolOps: IPoolProps + std::fmt::Debug { /// Interface for a pool factory which can be used for various /// pool creation and listings, for a specific backend type. #[async_trait::async_trait(?Send)] -pub trait PoolFactory { +pub trait IPoolFactory { /// Create a pool using the provided arguments. async fn create(&self, args: PoolArgs) -> Result, Error>; /// Import a pool (do not create it!) using the provided arguments. @@ -119,7 +168,7 @@ pub trait PoolFactory { } /// List pools using filters. -#[derive(Debug)] +#[derive(Default, Debug)] pub struct ListPoolArgs { /// Filter using the pool name. pub name: Option, @@ -128,7 +177,16 @@ pub struct ListPoolArgs { /// Filter using the pool uuid. pub uuid: Option, } -/// Probe for pools using this criteria. +impl ListPoolArgs { + /// A new `Self` with only the name specified. + pub fn new_named(name: Option) -> Self { + Self { + name, + ..Default::default() + } + } +} +/// Probe for pools using these criteria. #[derive(Debug, Clone)] pub enum FindPoolArgs { Uuid(String), @@ -145,15 +203,15 @@ impl From<&PoolArgs> for FindPoolArgs { } impl FindPoolArgs { /// Find pools by name and optional uuid. - pub fn name_uuid(name: &str, uuid: &Option) -> Self { + pub fn name_uuid(name: String, uuid: &Option) -> Self { Self::NameUuid { - name: name.to_owned(), + name, uuid: uuid.to_owned(), } } /// Find pools by uuid. - pub fn uuid(uuid: &String) -> Self { - Self::Uuid(uuid.to_string()) + pub fn uuid(uuid: String) -> Self { + Self::Uuid(uuid) } /// Back compat which finds pools by uuid and fallback to name. pub fn uuid_or_name(id: &String) -> Self { @@ -172,3 +230,60 @@ pub trait IPoolProps { fn pool_type(&self) -> PoolBackend; fn cluster_size(&self) -> u32; } + +/// A pool factory helper. +pub struct PoolFactory(Box); +impl PoolFactory { + /// Get all available backends. + pub fn all_backends() -> Vec { + vec![PoolBackend::Lvm, PoolBackend::Lvs] + } + /// Get all **enabled** backends. + pub fn backends() -> Vec { + let backends = Self::all_backends().into_iter(); + backends.filter(|b| b.enabled().is_ok()).collect() + } + /// Get factories for all **enabled** backends. + pub fn factories() -> Vec { + Self::backends().into_iter().map(Self::new).collect() + } + /// Returns the factory for the given backend kind. + pub fn new(backend: PoolBackend) -> Self { + Self(match backend { + PoolBackend::Lvs => { + Box::::default() as _ + } + PoolBackend::Lvm => { + Box::::default() as _ + } + }) + } + /// Probe backends for the given name and/or uuid and return the right one. + pub async fn find>( + args: I, + ) -> Result, Error> { + let args = args.into(); + let mut error = None; + + for factory in Self::factories() { + match factory.0.find(&args).await { + Ok(Some(pool)) => { + return Ok(pool); + } + Ok(None) => {} + Err(err) => { + error = Some(err); + } + } + } + Err(error.unwrap_or_else(|| Error::Gen { + source: GenericError::NotFound { + message: format!("Pool {args:?} not found"), + }, + })) + } + /// Get the inner factory interface. + pub fn as_factory(&self) -> &dyn IPoolFactory { + self.0.deref() + } +} diff --git a/io-engine/src/replica_backend.rs b/io-engine/src/replica_backend.rs index 334368fd7..f0fb3a243 100644 --- a/io-engine/src/replica_backend.rs +++ b/io-engine/src/replica_backend.rs @@ -1,14 +1,17 @@ -use super::pool_backend::PoolBackend; +use super::pool_backend::{Error, GenericError, PoolBackend}; use crate::core::{ snapshot::SnapshotDescriptor, + BdevStater, + BdevStats, CloneParams, LogicalVolume, Protocol, PtplProps, SnapshotParams, + UntypedBdev, UpdateProps, }; -use std::fmt::Debug; +use std::{fmt::Debug, ops::Deref}; /// This interface defines the high level operations which can be done on a /// `Pool` replica. Replica-Specific details should be hidden away in the @@ -16,7 +19,9 @@ use std::fmt::Debug; /// specific options to be passed as parameters. /// A `Replica` is also a `LogicalVolume` and also has `Share` traits. #[async_trait::async_trait(?Send)] -pub trait ReplicaOps: LogicalVolume { +pub trait ReplicaOps: + LogicalVolume + BdevStater +{ fn shared(&self) -> Option; fn create_ptpl( &self, @@ -78,6 +83,9 @@ pub trait ReplicaOps: LogicalVolume { &mut self, params: SnapshotParams, ) -> Result, crate::pool_backend::Error>; + + /// Returns the underlying bdev of the Logical Volume, if open. + fn try_as_bdev(&self) -> Result; } /// Snapshot Operations for snapshots created by `ReplicaOps`. @@ -130,6 +138,15 @@ pub struct ListReplicaArgs { /// Match the given pool uuid. pub pool_uuid: Option, } +impl ListReplicaArgs { + /// A new `Self` with only the name specified. + pub fn new_named(name: Option) -> Self { + Self { + name, + ..Default::default() + } + } +} /// Find replica with filters. #[derive(Debug, Clone)] @@ -149,7 +166,7 @@ impl FindReplicaArgs { /// Interface for a replica factory which can be used for various /// listing operations, for a specific backend type. #[async_trait::async_trait(?Send)] -pub trait ReplicaFactory { +pub trait IReplicaFactory { /// If the bdev is a `ReplicaOps`, move and retrieve it as a `ReplicaOps`. fn bdev_as_replica( &self, @@ -167,7 +184,8 @@ pub trait ReplicaFactory { &self, args: &FindSnapshotArgs, ) -> Result>, crate::pool_backend::Error>; - /// Lists all replicas specified by the arguments. + /// Lists all replicas specified by the arguments, except the replica kinds. + /// It lists all types of replicas. async fn list( &self, args: &ListReplicaArgs, @@ -186,6 +204,21 @@ pub trait ReplicaFactory { fn backend(&self) -> PoolBackend; } +/// Replica IO stats along with its name and uuid. +pub struct ReplicaBdevStats { + pub stats: BdevStats, + pub entity_id: Option, +} +impl ReplicaBdevStats { + /// Create a new `Self` from the given parts. + pub fn new(stats: BdevStats, entity_id: Option) -> Self { + Self { + stats, + entity_id, + } + } +} + /// Find snapshots with filters. #[derive(Debug, Default)] pub struct ListSnapshotArgs { @@ -217,35 +250,64 @@ pub struct ListCloneArgs { pub snapshot_uuid: Option, } -/// Get the `ReplicaFactory` for the given backend type. -pub(crate) fn factory_enabled( - backend: PoolBackend, -) -> Option> { - backend.enabled().ok()?; - Some(factory_unsafe(backend)) -} -/// Get the `ReplicaFactory` for the given backend type. -pub(crate) fn factory_unsafe(backend: PoolBackend) -> Box { - match backend { - PoolBackend::Lvs => Box::new(crate::lvs::ReplLvsFactory {}) as _, - PoolBackend::Lvm => Box::new(crate::lvm::ReplLvmFactory {}) as _, +/// A replica factory helper. +pub struct ReplicaFactory(Box); +impl ReplicaFactory { + /// Get factories for all **enabled** backends. + pub fn factories() -> Vec { + let backends = crate::pool_backend::PoolFactory::backends(); + backends.into_iter().map(Self::new).collect() } -} -/// Get all the enabled `ReplicaFactory`. -pub(crate) fn factories() -> Vec> { - vec![PoolBackend::Lvm, PoolBackend::Lvs] - .into_iter() - .filter_map(factory_enabled) - .collect() -} -/// Get the given bdev as a `ReplicaOps`. -pub(crate) fn bdev_as_replica( - bdev: crate::core::UntypedBdev, -) -> Option> { - for factory in factories() { - if let Some(replica) = factory.bdev_as_replica(bdev) { - return Some(replica); + /// Returns the factory for the given backend kind. + pub fn new(backend: PoolBackend) -> Self { + Self(match backend { + PoolBackend::Lvs => { + Box::::default() as _ + } + PoolBackend::Lvm => { + Box::::default() as _ + } + }) + } + /// Get the given bdev as a `ReplicaOps`. + pub(crate) fn bdev_as_replica( + bdev: crate::core::UntypedBdev, + ) -> Option> { + for factory in Self::factories() { + if let Some(replica) = factory.as_factory().bdev_as_replica(bdev) { + return Some(replica); + } } + None + } + /// Probe backends for the given name and/or uuid and return the right one. + pub async fn find( + args: &FindReplicaArgs, + ) -> Result, Error> { + let mut error = None; + + for factory in Self::factories() { + match factory.0.find(args).await { + Ok(Some(replica)) => { + // should this be an error? + if !replica.is_snapshot() { + return Ok(replica); + } + } + Ok(None) => {} + Err(err) => { + error = Some(err); + } + } + } + Err(error.unwrap_or_else(|| Error::Gen { + source: GenericError::NotFound { + message: format!("Replica {args:?} not found"), + }, + })) + } + /// Get the inner factory interface. + pub fn as_factory(&self) -> &dyn IReplicaFactory { + self.0.deref() } - None } diff --git a/io-engine/src/subsys/mod.rs b/io-engine/src/subsys/mod.rs index ac6b09060..381f1230f 100644 --- a/io-engine/src/subsys/mod.rs +++ b/io-engine/src/subsys/mod.rs @@ -8,7 +8,6 @@ pub use config::{ ConfigSubsystem, }; pub use nvmf::{ - create_snapshot, set_snapshot_time, Error as NvmfError, NvmeCpl, diff --git a/io-engine/src/subsys/nvmf/admin_cmd.rs b/io-engine/src/subsys/nvmf/admin_cmd.rs index 6c2eb9ece..dcda1af51 100644 --- a/io-engine/src/subsys/nvmf/admin_cmd.rs +++ b/io-engine/src/subsys/nvmf/admin_cmd.rs @@ -8,31 +8,20 @@ use std::{ use crate::{ bdev::{nexus, nvmx::NvmeSnapshotMessage}, - core::{ - logical_volume::LogicalVolume, - snapshot::LvolSnapshotOps, - Bdev, - Reactors, - SnapshotParams, - }, - lvs::Lvol, + core::{Bdev, Reactors, SnapshotParams}, }; use crate::{ core::{ToErrno, UntypedBdev}, - replica_backend::bdev_as_replica, + replica_backend::ReplicaFactory, }; -use chrono::Utc; use spdk_rs::{ libspdk::{ nvme_cmd_cdw10_get, - nvme_cmd_cdw10_get_val, nvme_cmd_cdw11_get, - nvme_cmd_cdw11_get_val, nvme_status_get, spdk_bdev, spdk_bdev_desc, - spdk_bdev_io, spdk_io_channel, spdk_nvme_cmd, spdk_nvme_cpl, @@ -49,7 +38,6 @@ use spdk_rs::{ spdk_nvmf_subsystem_get_max_nsid, }, nvme_admin_opc, - Uuid, }; #[warn(unused_variables)] @@ -232,7 +220,7 @@ async fn create_remote_snapshot( params: SnapshotParams, nvmf_req: NvmfReq, ) { - let Some(mut replica_ops) = bdev_as_replica(bdev) else { + let Some(mut replica_ops) = ReplicaFactory::bdev_as_replica(bdev) else { debug!("unsupported bdev driver"); nvmf_req.complete_error(nix::errno::Errno::ENOTSUP as i32); return; @@ -257,37 +245,6 @@ async fn create_remote_snapshot( } } } -pub fn create_snapshot( - lvol: Lvol, - cmd: &spdk_nvme_cmd, - _io: *mut spdk_bdev_io, -) { - let snapshot_time = unsafe { - nvme_cmd_cdw10_get_val(cmd) as u64 - | (nvme_cmd_cdw11_get_val(cmd) as u64) << 32 - }; - let snapshot_name = Lvol::format_snapshot_name(&lvol.name(), snapshot_time); - let snap_param = SnapshotParams::new( - Some(lvol.name()), - Some(lvol.name()), - Some(Uuid::generate().to_string()), - Some(snapshot_name), - Some(Uuid::generate().to_string()), - Some(Utc::now().to_string()), - false, - ); - // Blobfs operations must be on md_thread - Reactors::master().send_future(async move { - match lvol.create_snapshot(snap_param).await { - Ok(lvol) => { - info!("Create Snapshot {:?} Success!", lvol); - } - Err(e) => { - error!("Create Snapshot Failed with Error: {:?}", e); - } - } - }); -} /// Register custom NVMe admin command handler pub fn setup_create_snapshot_hdlr() { diff --git a/io-engine/src/subsys/nvmf/mod.rs b/io-engine/src/subsys/nvmf/mod.rs index 3bdb0884d..290c1f1df 100644 --- a/io-engine/src/subsys/nvmf/mod.rs +++ b/io-engine/src/subsys/nvmf/mod.rs @@ -13,7 +13,7 @@ use std::{cell::RefCell, mem::zeroed}; use nix::errno::Errno; use snafu::Snafu; -pub use admin_cmd::{create_snapshot, set_snapshot_time, NvmeCpl, NvmfReq}; +pub use admin_cmd::{set_snapshot_time, NvmeCpl, NvmfReq}; use poll_groups::PollGroup; use spdk_rs::libspdk::{ spdk_subsystem, diff --git a/io-engine/tests/replica_snapshot.rs b/io-engine/tests/replica_snapshot.rs index b5a1f351e..e73556524 100644 --- a/io-engine/tests/replica_snapshot.rs +++ b/io-engine/tests/replica_snapshot.rs @@ -2,7 +2,7 @@ use io_engine::{ bdev::nexus::nexus_create, constants::NVME_NQN_PREFIX, core::{CoreError, MayastorCliArgs, SnapshotParams, UntypedBdevHandle}, - lvs::{Lvol, Lvs}, + lvs::Lvs, pool_backend::{PoolArgs, PoolBackend}, }; use tracing::info; @@ -169,7 +169,7 @@ async fn create_nexus(t: u64, ip: &std::net::IpAddr) { if t > 0 { children .iter_mut() - .for_each(|c| *c = Lvol::format_snapshot_name(c, t)); + .for_each(|c| *c = format_snapshot_name(c, t)); nexus_name = NXNAME_SNAP; } @@ -177,6 +177,11 @@ async fn create_nexus(t: u64, ip: &std::net::IpAddr) { .await .unwrap(); } +/// Format snapshot name +/// base_name is the nexus or replica UUID +fn format_snapshot_name(base_name: &str, snapshot_time: u64) -> String { + format!("{base_name}-snap-{snapshot_time}") +} async fn create_snapshot() -> Result { // TODO: fill all the fields properly once nexus-level diff --git a/scripts/cargo-test.sh b/scripts/cargo-test.sh index cdfbd4b8c..9d5269cc8 100755 --- a/scripts/cargo-test.sh +++ b/scripts/cargo-test.sh @@ -5,6 +5,7 @@ SCRIPTDIR="$(realpath "$(dirname "$0")")" cleanup_handler() { ERROR=$? "$SCRIPTDIR"/clean-cargo-tests.sh || true + trap '' EXIT if [ $ERROR != 0 ]; then exit $ERROR; fi } @@ -15,8 +16,9 @@ rustc --version cleanup_handler trap cleanup_handler INT QUIT TERM HUP EXIT -set -euxo pipefail export PATH=$PATH:${HOME}/.cargo/bin +set -euxo pipefail + ( cd jsonrpc && cargo test ) # test dependencies cargo build --bins --features=io-engine-testing diff --git a/scripts/clean-cargo-tests.sh b/scripts/clean-cargo-tests.sh index c9575aae6..d259ace32 100755 --- a/scripts/clean-cargo-tests.sh +++ b/scripts/clean-cargo-tests.sh @@ -3,7 +3,7 @@ SCRIPT_DIR="$(dirname "$0")" ROOT_DIR=$(realpath "$SCRIPT_DIR/..") -sudo nvme disconnect-all +nix-sudo nvme disconnect-all # Detach any loop devices created for test purposes for back_file in "/tmp/io-engine-tests"/*; do @@ -14,12 +14,12 @@ for back_file in "/tmp/io-engine-tests"/*; do while IFS= read -r device; do if [ -n "$device" ]; then echo "Detaching loop device: $device" - losetup -d "$device" + sudo losetup -d "$device" fi done <<< "$devices" done # Delete the directory too -rmdir --ignore-fail-on-non-empty "/tmp/io-engine-tests" +nix-sudo rmdir --ignore-fail-on-non-empty "/tmp/io-engine-tests" 2>/dev/null for c in $(docker ps -a --filter "label=io.composer.test.name" --format '{{.ID}}') ; do