From 7e27a13c05d02fdb5d48ce08961139a3c5a4685f Mon Sep 17 00:00:00 2001 From: Diwakar Sharma Date: Mon, 1 Jul 2024 12:48:23 +0000 Subject: [PATCH] feat: add RDMA listener to Mayastor Nvmf target This adds the capability to listen for RDMA connections to the Mayastor Nvmf target if the rdma feature is enabled during installation. Any Nvmf subsystem facing the host, i.e. the nexus nvmf subsystem, will now be able to support both TCP and RDMA. Signed-off-by: Diwakar Sharma --- io-engine/src/bin/io-engine.rs | 2 +- io-engine/src/core/bdev.rs | 3 +- io-engine/src/core/env.rs | 5 + io-engine/src/subsys/config/opts.rs | 106 +++++++++++++++++++- io-engine/src/subsys/nvmf/subsystem.rs | 32 ++++-- io-engine/src/subsys/nvmf/target.rs | 131 ++++++++++++++++++++----- io-engine/src/subsys/nvmf/transport.rs | 52 +++++++++- io-engine/src/target/nvmf.rs | 2 +- io-engine/tests/nvmf.rs | 2 +- 9 files changed, 299 insertions(+), 36 deletions(-) diff --git a/io-engine/src/bin/io-engine.rs b/io-engine/src/bin/io-engine.rs index a62335108..9aaf54133 100644 --- a/io-engine/src/bin/io-engine.rs +++ b/io-engine/src/bin/io-engine.rs @@ -108,7 +108,7 @@ fn start_tokio_runtime(args: &MayastorCliArgs) { if args.rdma { env::set_var("ENABLE_RDMA", "true"); - warn!("RDMA is enabled for Mayastor NVMEoF target"); + warn!("RDMA is requested to be enabled for Mayastor NVMEoF target"); } unsafe { diff --git a/io-engine/src/core/bdev.rs b/io-engine/src/core/bdev.rs index b3cb81f05..3b8586563 100644 --- a/io-engine/src/core/bdev.rs +++ b/io-engine/src/core/bdev.rs @@ -210,6 +210,7 @@ where ) -> Result { let me = unsafe { self.get_unchecked_mut() }; let props = NvmfShareProps::from(props); + let is_lvol = me.driver() == "lvol"; let ptpl = props.ptpl().as_ref().map(|ptpl| ptpl.path()); @@ -232,7 +233,7 @@ where .await .context(ShareNvmf {})?; - subsystem.start().await.context(ShareNvmf {}) + subsystem.start(!is_lvol).await.context(ShareNvmf {}) } fn create_ptpl(&self) -> Result, Self::Error> { diff --git
a/io-engine/src/core/env.rs b/io-engine/src/core/env.rs index 3554a07d5..7661a138d 100644 --- a/io-engine/src/core/env.rs +++ b/io-engine/src/core/env.rs @@ -834,6 +834,11 @@ impl MayastorEnvironment { .map(|s| s.clone()) } + /// Check if RDMA needs to be enabled for Mayastor target. + pub fn rdma(&self) -> bool { + self.rdma + } + /// Detects IP address for NVMF target by the interface specified in CLI /// arguments. fn detect_nvmf_tgt_iface_ip(iface: &str) -> Result { diff --git a/io-engine/src/subsys/config/opts.rs b/io-engine/src/subsys/config/opts.rs index aee50d53a..05b89c6d1 100644 --- a/io-engine/src/subsys/config/opts.rs +++ b/io-engine/src/subsys/config/opts.rs @@ -28,7 +28,7 @@ use spdk_rs::{ use std::{ convert::TryFrom, - fmt::{Debug, Display}, + fmt::{Debug, Display, Formatter}, mem::zeroed, ptr::null_mut, str::FromStr, @@ -84,6 +84,26 @@ impl GetOpts for NexusOpts { /// Must be equal to the size of `spdk_nvmf_target_opts.crdt`. pub const TARGET_CRDT_LEN: usize = 3; +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub enum NvmfTgtTransport { + Rdma, + #[default] + Tcp, +} + +impl Display for NvmfTgtTransport { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + NvmfTgtTransport::Rdma => "rdma", + NvmfTgtTransport::Tcp => "tcp", + } + ) + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct NvmfTgtConfig { @@ -94,11 +114,13 @@ pub struct NvmfTgtConfig { /// NVMF target Command Retry Delay in x100 ms. pub crdt: [u16; TARGET_CRDT_LEN], /// TCP transport options - pub opts: NvmfTcpTransportOpts, + pub opts_tcp: NvmfTcpTransportOpts, /// NVMF target interface (ip, mac, name or subnet). 
pub interface: Option, /// Enable RDMA for NVMF target or not pub rdma: Option, + /// RDMA transport options + pub opts_rdma: NvmfRdmaTransportOpts, } impl From for Box { @@ -126,9 +148,10 @@ impl Default for NvmfTgtConfig { name: "mayastor_target".to_string(), max_namespaces: 2048, crdt: args.nvmf_tgt_crdt, - opts: NvmfTcpTransportOpts::default(), + opts_tcp: NvmfTcpTransportOpts::default(), interface: None, rdma: None, + opts_rdma: NvmfRdmaTransportOpts::default(), } } } @@ -173,6 +196,36 @@ pub struct NvmfTcpTransportOpts { data_wr_pool_size: u32, } +/// Settings for the RDMA transport +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct NvmfRdmaTransportOpts { + /// max queue depth + max_queue_depth: u16, + /// max qpairs per controller + max_qpairs_per_ctrl: u16, + /// encapsulated data size + in_capsule_data_size: u32, + /// max IO size + max_io_size: u32, + /// IO unit size + io_unit_size: u32, + /// max admin queue depth per admin queue + max_aq_depth: u32, + /// num of shared buffers + num_shared_buf: u32, + /// cache size + buf_cache_size: u32, + /// dif + dif_insert_or_strip: bool, + /// abort execution timeout + abort_timeout_sec: u32, + /// acceptor poll rate, microseconds + acceptor_poll_rate: u32, + /// Use zero-copy operations if the underlying bdev supports them + zcopy: bool, +} + /// try to read an env variable or returns the default when not found pub(crate) fn try_from_env(name: &str, default: T) -> T where @@ -288,6 +341,29 @@ impl Default for NvmfTcpTransportOpts { } } +// todo: Tune the defaults by experiments or recommendations, if required. 
+impl Default for NvmfRdmaTransportOpts { + fn default() -> Self { + Self { + max_queue_depth: try_from_env("NVMF_RDMA_MAX_QUEUE_DEPTH", 128), + in_capsule_data_size: 8192, + max_io_size: 131_072, + io_unit_size: 8192, + max_qpairs_per_ctrl: try_from_env( + "NVMF_RDMA_MAX_QPAIRS_PER_CTRL", + 32, + ), + num_shared_buf: try_from_env("NVMF_RDMA_NUM_SHARED_BUF", 2047), + buf_cache_size: try_from_env("NVMF_RDMA_BUF_CACHE_SIZE", 64), + dif_insert_or_strip: false, + max_aq_depth: 32, + abort_timeout_sec: 1, + acceptor_poll_rate: try_from_env("NVMF_ACCEPTOR_POLL_RATE", 10_000), + zcopy: try_from_env("NVMF_ZCOPY", 1) == 1, + } + } +} + /// we cannot add derives for YAML to these structs directly, so we need to /// copy them. The upside though, is that if the FFI structures change, we will /// know about it during compile time. @@ -319,6 +395,30 @@ impl From for spdk_nvmf_transport_opts { } } +impl From for spdk_nvmf_transport_opts { + fn from(o: NvmfRdmaTransportOpts) -> Self { + Self { + max_queue_depth: o.max_queue_depth, + max_qpairs_per_ctrlr: o.max_qpairs_per_ctrl, + in_capsule_data_size: o.in_capsule_data_size, + max_io_size: o.max_io_size, + io_unit_size: o.io_unit_size, + max_aq_depth: o.max_aq_depth, + num_shared_buffers: o.num_shared_buf, + buf_cache_size: o.buf_cache_size, + dif_insert_or_strip: o.dif_insert_or_strip, + reserved29: Default::default(), + abort_timeout_sec: o.abort_timeout_sec, + association_timeout: 120000, + transport_specific: std::ptr::null(), + opts_size: std::mem::size_of::() as u64, + acceptor_poll_rate: o.acceptor_poll_rate, + zcopy: o.zcopy, + reserved61: Default::default(), + } + } +} + /// generic settings for the NVMe bdev (all our replicas) #[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(default, deny_unknown_fields)] diff --git a/io-engine/src/subsys/nvmf/subsystem.rs b/io-engine/src/subsys/nvmf/subsystem.rs index a80314559..382c054f1 100644 --- a/io-engine/src/subsys/nvmf/subsystem.rs +++ 
b/io-engine/src/subsys/nvmf/subsystem.rs @@ -68,6 +68,7 @@ use crate::{ ffihelper::{cb_arg, done_cb, AsStr, FfiResult, IntoCString}, lvs::Lvol, subsys::{ + config::opts::NvmfTgtTransport, make_subsystem_serial, nvmf::{transport::TransportId, Error, NVMF_TGT}, Config, @@ -798,7 +799,7 @@ impl NvmfSubsystem { } // we currently allow all listeners to the subsystem - async fn add_listener(&self) -> Result<(), Error> { + async fn add_listener(&self, xprt: NvmfTgtTransport) -> Result<(), Error> { extern "C" fn listen_cb(arg: *mut c_void, status: i32) { let s = unsafe { Box::from_raw(arg as *mut oneshot::Sender) }; s.send(status).unwrap(); @@ -807,8 +808,11 @@ impl NvmfSubsystem { let cfg = Config::get(); // dont yet enable both ports, IOW just add one transportID now - - let trid_replica = TransportId::new(cfg.nexus_opts.nvmf_replica_port); + let trid_replica = if xprt == NvmfTgtTransport::Tcp { + TransportId::new(cfg.nexus_opts.nvmf_replica_port) + } else { + TransportId::new_with_rdma(cfg.nexus_opts.nvmf_replica_port) + }; let (s, r) = oneshot::channel::(); unsafe { @@ -907,8 +911,21 @@ impl NvmfSubsystem { /// start the subsystem previously created -- note that we destroy it on /// failure to ensure the state is not in limbo and to avoid leaking /// resources - pub async fn start(self) -> Result { - self.add_listener().await?; + pub async fn start(self, need_rdma: bool) -> Result { + self.add_listener(NvmfTgtTransport::Tcp).await?; + if need_rdma { + let _ = + self.add_listener(NvmfTgtTransport::Rdma) + .await + .map_err(|e| { + warn!( + "NvmfSubsystem RDMA listener add failed {}. 
\ + Subsystem will be accessible over TCP only.\ + {:?}", + e, self + ); + }); + } if let Err(e) = self .change_state("start", |ss, cb, arg| unsafe { @@ -1118,7 +1135,10 @@ impl NvmfSubsystem { pub fn uri_endpoints(&self) -> Option> { if let Some(v) = self.listeners_to_vec() { let nqn = self.get_nqn(); - Some(v.iter().map(|t| format!("{t}/{nqn}")).collect::>()) + let retvec = Some( + v.iter().map(|t| format!("{t}/{nqn}")).collect::>(), + ); + retvec } else { None } diff --git a/io-engine/src/subsys/nvmf/target.rs b/io-engine/src/subsys/nvmf/target.rs index b93d1d47c..32aae5ae1 100644 --- a/io-engine/src/subsys/nvmf/target.rs +++ b/io-engine/src/subsys/nvmf/target.rs @@ -28,14 +28,13 @@ use spdk_rs::libspdk::{ use crate::{ constants::NVME_CONTROLLER_MODEL_ID, - core::{Cores, Mthread, Reactors}, + core::{Cores, MayastorEnvironment, Mthread, Reactor, Reactors}, ffihelper::{AsStr, FfiResult}, subsys::{ nvmf::{ poll_groups::PollGroup, subsystem::NvmfSubsystem, - transport, - transport::{get_ipv4_address, TransportId}, + transport::{self, get_ipv4_address, TransportId}, Error, NVMF_PGS, }, @@ -57,6 +56,8 @@ pub struct Target { poll_group_count: u16, /// The current state of the target next_state: TargetState, + /// Whether the target supports RDMA transport + rdma: bool, } impl Default for Target { @@ -102,6 +103,7 @@ impl Target { tgt: NonNull::dangling(), poll_group_count: 0, next_state: TargetState::Init, + rdma: MayastorEnvironment::global_or_default().rdma(), } } @@ -174,10 +176,23 @@ impl Target { /// add the transport to the target fn add_transport(&self) { - Reactors::master().send_future(async { - let result = transport::add_tcp_transport().await; + Reactors::master().send_future(async move { + let ret_tcp = transport::add_tcp_transport().await; + let rdma = MayastorEnvironment::global_or_default().rdma(); + if rdma { + info!("Adding RDMA transport for Mayastor Nvmf target"); + if transport::add_rdma_transport().await.is_err() { + // XXX: How to reset self.rdma 
= false + // todo: add event mechanism for Target and Nvmfsubsystem + warn!( + "RDMA enablement failed due to rdma transport add failure.\ + The target will however keep running with only tcp(if \ + that succeeds), with performance expectations of tcp" + ); + } + } NVMF_TGT.with(|t| { - if result.is_err() { + if ret_tcp.is_err() { t.borrow_mut().next_state = TargetState::Invalid; } t.borrow_mut().next_state(); @@ -268,19 +283,88 @@ impl Target { }); } info!( - "nvmf target listening on {}:({},{})", + "nvmf target listening(tcp) on {}:({},{})", get_ipv4_address().unwrap(), trid_nexus.trsvcid.as_str(), trid_replica.trsvcid.as_str(), ); + + if self.rdma { + // listen RDMA also. + let _ = self.listen_rdma() + .map_err(|e| { + warn!( + "failed to listen rdma on address. err: {}:\ + The target will however keep running with \ + only tcp listener, with performance expectations of tcp", e + ); + }); + } self.next_state(); Ok(()) } + /// Listen for incoming connections by default we only listen on the replica + /// port + fn listen_rdma(&mut self) -> Result<()> { + let cfg = Config::get(); + let trid_nexus = + TransportId::new_with_rdma(cfg.nexus_opts.nvmf_nexus_port); + let mut opts = spdk_nvmf_listen_opts { + opts_size: 0, + transport_specific: null(), + secure_channel: false, + reserved1: unsafe { zeroed() }, + ana_state: 0, + }; + unsafe { + spdk_nvmf_listen_opts_init( + &mut opts, + std::mem::size_of::() as u64, + ); + } + let rc = unsafe { + spdk_nvmf_tgt_listen_ext( + self.tgt.as_ptr(), + trid_nexus.as_ptr(), + &mut opts, + ) + }; + + if rc != 0 { + return Err(Error::CreateTarget { + msg: "failed to back target".into(), + }); + } + + let trid_replica = + TransportId::new_with_rdma(cfg.nexus_opts.nvmf_replica_port); + let rc = unsafe { + spdk_nvmf_tgt_listen_ext( + self.tgt.as_ptr(), + trid_replica.as_ptr(), + &mut opts, + ) + }; + + if rc != 0 { + return Err(Error::CreateTarget { + msg: "failed to front target".into(), + }); + } + info!( + "nvmf target 
listening(rdma) on {}:({},{})", + get_ipv4_address().unwrap(), + trid_nexus.trsvcid.as_str(), + trid_replica.trsvcid.as_str(), + ); + Ok(()) + } + /// Create the discovery for the target -- note that the discovery system is /// not started. fn create_discovery_subsystem(&self) -> NvmfSubsystem { - debug!("enabling discovery for target"); + debug!("creating discovery subsystem for target"); let discovery = unsafe { NvmfSubsystem::from(spdk_nvmf_subsystem_create( self.tgt.as_ptr(), @@ -302,7 +386,6 @@ impl Target { .unwrap(); discovery.allow_any(true); - discovery } @@ -394,20 +477,24 @@ impl Target { let trid_nexus = TransportId::new(cfg.nexus_opts.nvmf_nexus_port); let trid_replica = TransportId::new(cfg.nexus_opts.nvmf_replica_port); + let mut trid_vec = vec![trid_nexus, trid_replica]; + // todo: handle by fetching current listeners dynamically here. + // Since this is shutdown path we're good this way for + // now. + if self.rdma { + trid_vec.push(TransportId::new_with_rdma( + cfg.nexus_opts.nvmf_nexus_port, + )); + trid_vec.push(TransportId::new_with_rdma( + cfg.nexus_opts.nvmf_replica_port, + )); + } - unsafe { - spdk_nvmf_tgt_stop_listen( - self.tgt.as_ptr(), - trid_replica.as_ptr(), - ) - }; - - unsafe { - spdk_nvmf_tgt_stop_listen( - self.tgt.as_ptr(), - trid_nexus.as_ptr(), - ) - }; + for trid in trid_vec { + unsafe { + spdk_nvmf_tgt_stop_listen(self.tgt.as_ptr(), trid.as_ptr()) + }; + } } unsafe { diff --git a/io-engine/src/subsys/nvmf/transport.rs b/io-engine/src/subsys/nvmf/transport.rs index a47a29832..b91eff23d 100644 --- a/io-engine/src/subsys/nvmf/transport.rs +++ b/io-engine/src/subsys/nvmf/transport.rs @@ -14,6 +14,7 @@ use spdk_rs::{ spdk_nvme_transport_id, spdk_nvmf_tgt_add_transport, spdk_nvmf_transport_create, + SPDK_NVME_TRANSPORT_RDMA, SPDK_NVME_TRANSPORT_TCP, SPDK_NVMF_ADRFAM_IPV4, SPDK_NVMF_TRSVCID_MAX_LEN, @@ -32,9 +33,12 @@ use crate::{ static TCP_TRANSPORT: Lazy = Lazy::new(|| CString::new("TCP").unwrap()); +static RDMA_TRANSPORT: Lazy = 
+ Lazy::new(|| CString::new("RDMA").unwrap()); + pub async fn add_tcp_transport() -> Result<(), Error> { let cfg = Config::get(); - let mut opts = cfg.nvmf_tgt_conf.opts.into(); + let mut opts = cfg.nvmf_tgt_conf.opts_tcp.into(); let transport = unsafe { spdk_nvmf_transport_create(TCP_TRANSPORT.as_ptr(), &mut opts) }; @@ -62,6 +66,36 @@ pub async fn add_tcp_transport() -> Result<(), Error> { Ok(()) } +pub async fn add_rdma_transport() -> Result<(), Error> { + let cfg = Config::get(); + let mut opts = cfg.nvmf_tgt_conf.opts_rdma.into(); + let transport = unsafe { + spdk_nvmf_transport_create(RDMA_TRANSPORT.as_ptr(), &mut opts) + }; + + transport.to_result(|_| Error::Transport { + source: Errno::UnknownErrno, + msg: "failed to create transport".into(), + })?; + + let (s, r) = oneshot::channel::>(); + unsafe { + NVMF_TGT.with(|t| { + spdk_nvmf_tgt_add_transport( + t.borrow().tgt.as_ptr(), + transport, + Some(done_errno_cb), + cb_arg(s), + ); + }) + }; + + let _result = r.await.unwrap(); + + debug!("Added RDMA nvmf transport"); + Ok(()) +} + pub struct TransportId(pub(crate) spdk_nvme_transport_id); impl Deref for TransportId { type Target = spdk_nvme_transport_id; @@ -97,6 +131,22 @@ impl TransportId { Self(trid) } + pub fn new_with_rdma(port: u16) -> Self { + let address = get_ipv4_address().unwrap(); + + let mut trid = spdk_nvme_transport_id { + trtype: SPDK_NVME_TRANSPORT_RDMA, + adrfam: SPDK_NVMF_ADRFAM_IPV4, + ..Default::default() + }; + let port = format!("{port}"); + assert!(port.len() < SPDK_NVMF_TRSVCID_MAX_LEN as usize); + copy_cstr_with_null(&RDMA_TRANSPORT, &mut trid.trstring); + copy_str_with_null(&address, &mut trid.traddr); + copy_str_with_null(&port, &mut trid.trsvcid); + Self(trid) + } + pub fn as_ptr(&self) -> *mut spdk_nvme_transport_id { &self.0 as *const _ as *mut spdk_nvme_transport_id } diff --git a/io-engine/src/target/nvmf.rs b/io-engine/src/target/nvmf.rs index 63be1da07..c92c0331c 100644 --- a/io-engine/src/target/nvmf.rs +++ 
b/io-engine/src/target/nvmf.rs @@ -16,7 +16,7 @@ where }; let ss = NvmfSubsystem::try_from(bdev)?; - ss.start().await?; + ss.start(false).await?; Ok(()) } diff --git a/io-engine/tests/nvmf.rs b/io-engine/tests/nvmf.rs index f3c774c87..b2f31e657 100644 --- a/io-engine/tests/nvmf.rs +++ b/io-engine/tests/nvmf.rs @@ -41,7 +41,7 @@ fn nvmf_target() { let bdev = UntypedBdev::lookup_by_name(&b).unwrap(); let ss = NvmfSubsystem::try_from(&bdev).unwrap(); - ss.start().await.unwrap(); + ss.start(false).await.unwrap(); }); // test we can not create the same one again