Skip to content

Commit

Permalink
Merge #1700
Browse files Browse the repository at this point in the history
1700: Refactor more explicit uses of Lvol with the backend interface r=tiagolobocastro a=tiagolobocastro

    ci: use sudo to remove test files
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    refactor(wipe): use new backend traits
    
    Use traits rather than explicitly use Lvol.
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    refactor(backends): reuse common factory code
    
    Move common code to the factories allowing us to reuse code between
    core components and grpc service.
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    refactor(snapshot): remove unused create-snapshot
    
    Removes unused function still making use of Lvol.
    Probably not previously detected because it was being exposed as pub.
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    refactor(stats): use new ops interface
    
    Use new ops interfaces rather than hardcoded lvs/lvol.
    This will help support stats for other pool backends.
    
    Signed-off-by: Tiago Castro <[email protected]>

Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Aug 9, 2024
2 parents ce40263 + 42682ad commit e7383a4
Show file tree
Hide file tree
Showing 22 changed files with 635 additions and 471 deletions.
6 changes: 3 additions & 3 deletions io-engine/src/bdev/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::core::fault_injection::{
FaultDomain,
InjectIoCtx,
};
use crate::replica_backend::bdev_as_replica;
use crate::replica_backend::ReplicaFactory;

/// TODO
type EventDispatcherMap = HashMap<String, DeviceEventDispatcher>;
Expand Down Expand Up @@ -584,14 +584,14 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle {
})
}

// NVMe commands are not applicable for non-NVMe devices.
/// NVMe commands are not applicable for non-NVMe devices.
async fn create_snapshot(
&self,
snapshot: SnapshotParams,
) -> Result<u64, CoreError> {
let bdev = self.handle.get_bdev();

let Some(mut replica) = bdev_as_replica(bdev) else {
let Some(mut replica) = ReplicaFactory::bdev_as_replica(bdev) else {
return Err(CoreError::NotSupported {
source: Errno::ENXIO,
});
Expand Down
25 changes: 16 additions & 9 deletions io-engine/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::{
subsys::NvmfSubsystem,
};

use crate::core::{BlockDeviceIoStats, CoreError, IoCompletionStatus};
use crate::core::{BdevStater, BdevStats, CoreError, IoCompletionStatus};
use events_api::event::EventAction;
use spdk_rs::{
libspdk::spdk_bdev_notify_blockcnt_change,
Expand Down Expand Up @@ -360,6 +360,21 @@ impl Display for NexusState {
}
}

#[async_trait::async_trait(?Send)]
impl BdevStater for Nexus<'_> {
type Stats = BdevStats;

async fn stats(&self) -> Result<BdevStats, CoreError> {
let bdev = unsafe { self.bdev() };
bdev.stats().await
}

async fn reset_stats(&self) -> Result<(), CoreError> {
let bdev = unsafe { self.bdev() };
bdev.reset_bdev_io_stats().await
}
}

impl<'n> Nexus<'n> {
/// create a new nexus instance with optionally directly attaching
/// children to it.
Expand Down Expand Up @@ -535,14 +550,6 @@ impl<'n> Nexus<'n> {
unsafe { self.bdev().name().to_string() }
}

/// Returns io stats for underlying Bdev.
pub(crate) async fn bdev_stats(
&self,
) -> Result<BlockDeviceIoStats, CoreError> {
let bdev = unsafe { self.bdev() };
bdev.stats_async().await
}

/// TODO
pub fn req_size(&self) -> u64 {
self.req_size
Expand Down
60 changes: 58 additions & 2 deletions io-engine/src/core/bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ where
}

/// Gets tick rate of the current io engine instance.
/// NOTE: tick_rate returned in SPDK struct is not accurate. Hence we get it
/// via this method.
/// NOTE: tick_rate returned in SPDK struct is not accurate. Hence, we get
/// it via this method.
pub fn get_tick_rate(&self) -> u64 {
unsafe { spdk_get_ticks_hz() }
}
Expand Down Expand Up @@ -404,3 +404,59 @@ where
Default::default()
}
}

/// A Bdev should expose information and IO stats as well as having the ability
/// to reset the cumulative stats.
#[async_trait::async_trait(?Send)]
pub trait BdevStater {
type Stats;

/// Gets tick rate of the bdev stats.
fn tick_rate(&self) -> u64 {
unsafe { spdk_get_ticks_hz() }
}

/// Returns IoStats for a particular bdev.
async fn stats(&self) -> Result<Self::Stats, CoreError>;

/// Resets io stats for a given Bdev.
async fn reset_stats(&self) -> Result<(), CoreError>;
}

/// Bdev IO stats along with its name and uuid.
pub struct BdevStats {
/// Name of the Bdev.
pub name: String,
/// Uuid of the Bdev.
pub uuid: String,
/// Stats of the Bdev.
pub stats: BlockDeviceIoStats,
}
impl BdevStats {
/// Create a new `Self` from the given parts.
pub fn new(name: String, uuid: String, stats: BlockDeviceIoStats) -> Self {
Self {
name,
uuid,
stats,
}
}
}

#[async_trait::async_trait(?Send)]
impl<T: spdk_rs::BdevOps> BdevStater for Bdev<T> {
type Stats = BdevStats;

async fn stats(&self) -> Result<BdevStats, CoreError> {
let stats = self.stats_async().await?;
Ok(BdevStats::new(
self.name().to_string(),
self.uuid_as_string(),
stats,
))
}

async fn reset_stats(&self) -> Result<(), CoreError> {
self.reset_bdev_io_stats().await
}
}
2 changes: 1 addition & 1 deletion io-engine/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use nix::errno::Errno;
use snafu::Snafu;

pub use bdev::{Bdev, BdevIter, UntypedBdev};
pub use bdev::{Bdev, BdevIter, BdevStater, BdevStats, UntypedBdev};
pub use block_device::{
BlockDevice,
BlockDeviceDescriptor,
Expand Down
98 changes: 33 additions & 65 deletions io-engine/src/grpc/v1/pool.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub use crate::pool_backend::FindPoolArgs as PoolIdProbe;
use crate::{
core::{
NvmfShareProps,
Expand All @@ -16,6 +17,7 @@ use crate::{
lvs::{BsError, LvsError},
pool_backend::{
FindPoolArgs,
IPoolFactory,
ListPoolArgs,
PoolArgs,
PoolBackend,
Expand All @@ -34,14 +36,12 @@ use io_engine_api::v1::{
use std::{convert::TryFrom, fmt::Debug, ops::Deref, panic::AssertUnwindSafe};
use tonic::{Request, Status};

pub use crate::pool_backend::FindPoolArgs as PoolIdProbe;

#[derive(Debug)]
struct UnixStream(tokio::net::UnixStream);

impl From<DestroyPoolRequest> for FindPoolArgs {
fn from(value: DestroyPoolRequest) -> Self {
Self::name_uuid(&value.name, &value.uuid)
Self::name_uuid(value.name, &value.uuid)
}
}
impl From<&destroy_replica_request::Pool> for FindPoolArgs {
Expand All @@ -68,7 +68,7 @@ impl From<&destroy_snapshot_request::Pool> for FindPoolArgs {
}
impl From<ExportPoolRequest> for FindPoolArgs {
fn from(value: ExportPoolRequest) -> Self {
Self::name_uuid(&value.name, &value.uuid)
Self::name_uuid(value.name, &value.uuid)
}
}

Expand Down Expand Up @@ -385,83 +385,51 @@ impl PoolBackend {
}

/// A pool factory with the various types of specific impls.
pub(crate) struct GrpcPoolFactory {
pool_factory: Box<dyn PoolFactory>,
}
pub(crate) struct GrpcPoolFactory(PoolFactory);
impl GrpcPoolFactory {
fn factories() -> Vec<Self> {
vec![PoolBackend::Lvm, PoolBackend::Lvs]
pub(crate) fn factories() -> Vec<Self> {
PoolFactory::factories()
.into_iter()
.filter_map(|b| Self::new(b).ok())
.collect()
.map(Self)
.collect::<Vec<_>>()
}
fn new(backend: PoolBackend) -> Result<Self, Status> {
backend.enabled()?;
let pool_factory = match backend {
PoolBackend::Lvs => {
Box::<crate::lvs::PoolLvsFactory>::default() as _
}
PoolBackend::Lvm => {
Box::<crate::lvm::PoolLvmFactory>::default() as _
}
};
Ok(Self {
pool_factory,
})
Ok(Self(PoolFactory::new(backend)))
}

/// Probe backends for the given name and/or uuid and return the right one.
pub(crate) async fn finder<I: Into<FindPoolArgs>>(
args: I,
) -> Result<PoolGrpc, tonic::Status> {
let args = args.into();
let mut error = None;

for factory in Self::factories() {
match factory.find_pool(&args).await {
Ok(Some(pool)) => {
return Ok(pool);
}
Ok(None) => {}
Err(err) => {
error = Some(err);
}
}
}
Err(error.unwrap_or_else(|| {
Status::not_found(format!("Pool {args:?} not found"))
}))
}
async fn find_pool(
&self,
args: &FindPoolArgs,
) -> Result<Option<PoolGrpc>, tonic::Status> {
let pool = self.as_pool_factory().find(args).await?;
match pool {
Some(pool) => {
let pool_subsystem = ResourceLockManager::get_instance()
.get_subsystem(ProtectedSubsystems::POOL);
let lock_guard =
acquire_subsystem_lock(pool_subsystem, Some(pool.name()))
.await?;
Ok(Some(PoolGrpc::new(pool, lock_guard)))
}
None => Ok(None),
}
) -> Result<PoolGrpc, Status> {
let pool = PoolFactory::find(args).await?;
let pool_subsystem = ResourceLockManager::get_instance()
.get_subsystem(ProtectedSubsystems::POOL);
let lock_guard =
acquire_subsystem_lock(pool_subsystem, Some(pool.name())).await?;
Ok(PoolGrpc::new(pool, lock_guard))
}
async fn list(&self, args: &ListPoolArgs) -> Result<Vec<Pool>, Status> {
let pools = self.as_pool_factory().list(args).await?;
let pools = self.as_factory().list(args).await?;
Ok(pools.into_iter().map(Into::into).collect::<Vec<_>>())
}
/// Lists all `PoolOps` matching the given arguments.
pub(crate) async fn list_ops(
&self,
args: &ListPoolArgs,
) -> Result<Vec<Box<dyn PoolOps>>, Status> {
let pools = self.as_factory().list(args).await?;
Ok(pools)
}
fn backend(&self) -> PoolBackend {
self.as_pool_factory().backend()
self.as_factory().backend()
}
async fn ensure_not_found(
&self,
args: &FindPoolArgs,
backend: PoolBackend,
) -> Result<(), Status> {
if self.as_pool_factory().find(args).await?.is_some() {
if self.as_factory().find(args).await?.is_some() {
if self.backend() != backend {
return Err(Status::invalid_argument(
"Pool Already exists on another backend type",
Expand All @@ -488,7 +456,7 @@ impl GrpcPoolFactory {
// todo: inspect disk contents as well!
factory.ensure_not_found(&finder, args.backend).await?;
}
let pool = self.as_pool_factory().create(args).await?;
let pool = self.as_factory().create(args).await?;
Ok(pool.into())
}
async fn import(&self, args: PoolArgs) -> Result<Pool, Status> {
Expand All @@ -501,11 +469,11 @@ impl GrpcPoolFactory {
for factory in Self::factories() {
factory.ensure_not_found(&finder, args.backend).await?;
}
let pool = self.as_pool_factory().import(args).await?;
let pool = self.as_factory().import(args).await?;
Ok(pool.into())
}
fn as_pool_factory(&self) -> &dyn PoolFactory {
self.pool_factory.deref()
fn as_factory(&self) -> &dyn IPoolFactory {
self.0.as_factory()
}
}

Expand Down Expand Up @@ -641,7 +609,7 @@ impl PoolRpc for PoolService {
pools.extend(fpools);
}
Err(error) => {
let backend = factory.pool_factory.backend();
let backend = factory.0.as_factory().backend();
tracing::error!("Failed to list pools of type {backend:?}, error: {error}");
}
}
Expand Down
Loading

0 comments on commit e7383a4

Please sign in to comment.