diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index daf91843a..466fc2ca8 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -25,11 +25,12 @@ use crate::error::{KError, KResult}; use crate::memory::backends::MemManager; use crate::memory::shmem_affinity::{local_shmem_affinity, mid_to_shmem_affinity}; use crate::process::MAX_PROCESSES; +use crate::transport::shmem::NUM_SHMEM_TRANSPORTS; /// This is the state the client records about itself pub(crate) struct ClientState { /// The RPC client used to communicate with the controller - pub(crate) rpc_client: Arc>, + pub(crate) rpc_clients: Arc, { NUM_SHMEM_TRANSPORTS as usize }>>, /// Used to store shmem affinity base pages pub(crate) affinity_base_pages: Arc>, MAX_MACHINES>>, @@ -41,7 +42,9 @@ pub(crate) struct ClientState { impl ClientState { pub(crate) fn new() -> ClientState { // Create network stack and instantiate RPC Client - let rpc_client = if crate::CMDLINE + // TODO(rackscale, hack): only allow shmem for now + /* + let rpc_clients = if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Ethernet) { @@ -60,6 +63,13 @@ impl ClientState { .expect("Failed to initialize shmem RPC"), )) }; + */ + let clients = + crate::transport::shmem::init_shmem_rpc(true).expect("Failed to initialize shmem RPC"); + let mut rpc_clients = ArrayVec::new(); + for client in clients.into_iter() { + rpc_clients.push(Mutex::new(client)); + } let mut per_process_base_pages = ArrayVec::new(); for _i in 0..MAX_PROCESSES { @@ -76,7 +86,7 @@ impl ClientState { log::debug!("Finished initializing client state"); ClientState { - rpc_client, + rpc_clients: Arc::new(rpc_clients), affinity_base_pages: Arc::new(affinity_base_pages), per_process_base_pages: Arc::new(per_process_base_pages), } diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index 23339f0ef..d14c9d837 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -18,7 +18,7 @@ use crate::arch::rackscale::dcm::{ use crate::arch::MAX_MACHINES; use crate::cmdline::Transport; use crate::transport::ethernet::ETHERNET_IFACE; -use crate::transport::shmem::create_shmem_transport; +use crate::transport::shmem::{create_shmem_transport, NUM_SHMEM_TRANSPORTS}; use super::*; @@ -32,6 +32,8 @@ pub(crate) fn run() { let mid = *crate::environment::CORE_ID; // Initialize one server per controller thread + // TODO(rackscale, hack): only support shmem for now + /* let mut server = if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Ethernet) @@ -51,26 +53,34 @@ pub(crate) fn run() { .get() .map_or(false, |c| c.transport == Transport::Shmem) { - let transport = Box::new( - create_shmem_transport(mid.try_into().unwrap()) - .expect("Failed to create shmem transport"), - ); + */ + let transports = + create_shmem_transport(mid.try_into().unwrap()).expect("Failed to create shmem transport"); - let mut server = Server::new(transport); + let mut servers: ArrayVec, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new(); + for transport in transports.into_iter() { + let mut server = Server::new(Box::new(transport)); register_rpcs(&mut server); - server + servers.push(server); + } + + /* } else { unreachable!("No supported transport layer specified in kernel argument"); }; + */ ClientReadyCount.fetch_add(1, Ordering::SeqCst); // Wait for all clients to connect before fulfilling any RPCs. while !DCMServerReady.load(Ordering::SeqCst) {} - server + // TODO(rackscale, hack): only register core 0 + //for s_index in 0..servers.len() { + servers[0] .add_client(&CLIENT_REGISTRAR) .expect("Failed to accept client"); + //} ClientReadyCount.fetch_add(1, Ordering::SeqCst); @@ -114,9 +124,11 @@ pub(crate) fn run() { // Start running the RPC server log::info!("Starting RPC server for client {:?}!", mid); loop { - let _handled = server - .try_handle() - .expect("Controller failed to handle RPC"); + for s_index in 0..servers.len() { + let _handled = servers[s_index] + .try_handle() + .expect("Controller failed to handle RPC"); + } } } diff --git a/kernel/src/arch/x86_64/rackscale/fileops/close.rs b/kernel/src/arch/x86_64/rackscale/fileops/close.rs index 76a8424e4..75d9cfcfa 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/close.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/close.rs @@ -34,11 +34,13 @@ pub(crate) fn rpc_close(pid: usize, fd: FileDescriptor) -> KResult<(u64, u64)> { let mut res_data = [0u8; core::mem::size_of::>()]; // Call Close() RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Close as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Close as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/delete.rs b/kernel/src/arch/x86_64/rackscale/fileops/delete.rs index 4b4157f65..cf5fa52f5 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/delete.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/delete.rs @@ -36,11 +36,13 @@ pub(crate) fn rpc_delete(pid: usize, pathname: String) -> KResult<(u64, u64)> { let mut res_data = [0u8; core::mem::size_of::>()]; // Call RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Delete as RPCType, - &[&req_data, &pathname.as_bytes()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Delete as RPCType, + &[&req_data, &pathname.as_bytes()], + &mut [&mut res_data], + )?; // Decode result - return result if decoding successful if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs b/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs index 89ff5451c..33a83864d 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs @@ -34,11 +34,13 @@ pub(crate) fn rpc_getinfo + Debug>(pid: usize, name: P) -> KResul // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::GetInfo as RPCType, - &[&req_data, name.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::GetInfo as RPCType, + &[&req_data, name.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs b/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs index a64802c18..c8879a3c2 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs @@ -42,11 +42,13 @@ pub(crate) fn rpc_mkdir + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::MkDir as RPCType, - &[&req_data, pathname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::MkDir as RPCType, + &[&req_data, pathname.as_ref()], + &mut [&mut res_data], + )?; // Parse and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/open.rs b/kernel/src/arch/x86_64/rackscale/fileops/open.rs index f263acf22..cb21c281c 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/open.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/open.rs @@ -54,11 +54,13 @@ fn rpc_open_create + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call the RPC - CLIENT_STATE.rpc_client.lock().call( - rpc_type, - &[&req_data, pathname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + rpc_type, + &[&req_data, pathname.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/rename.rs b/kernel/src/arch/x86_64/rackscale/fileops/rename.rs index 55cf81faf..ccfbc81f9 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/rename.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/rename.rs @@ -44,11 +44,13 @@ pub(crate) fn rpc_rename + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call the RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::FileRename as RPCType, - &[&req_data, oldname.as_ref(), newname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::FileRename as RPCType, + &[&req_data, oldname.as_ref(), newname.as_ref()], + &mut [&mut res_data], + )?; // Parse and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/rw.rs b/kernel/src/arch/x86_64/rackscale/fileops/rw.rs index a45ac4a97..81eb3a62b 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/rw.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/rw.rs @@ -69,8 +69,7 @@ pub(crate) fn rpc_writeat( } else { KernelRpc::WriteAt as RPCType }; - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call(rpc_type, &[&req_data, &data], &mut [&mut res_data])?; @@ -129,11 +128,13 @@ pub(crate) fn rpc_readat( KernelRpc::ReadAt as RPCType }; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReadAt as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReadAt as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, if successful, return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs b/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs index 205d9d803..9dae7c098 100644 --- a/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs +++ b/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs @@ -40,7 +40,11 @@ unsafe_abomonate!(ShmemRegion: base, affinity); // This isn't truly a syscall pub(crate) fn rpc_get_shmem_frames(pid: Option, num_frames: usize) -> KResult> { assert!(num_frames > 0); - log::debug!("GetShmemFrames({:?})", num_frames); + log::debug!( + "GetShmemFrames({:?}) core={:?}", + num_frames, + kpi::system::mtid_from_gtid(*crate::environment::CORE_ID) + ); let mid = if pid.is_none() { Some(*crate::environment::MACHINE_ID) @@ -66,8 +70,7 @@ pub(crate) fn rpc_get_shmem_frames(pid: Option, num_frames: usize) -> KResu for i in 0..max_res_size { res_data.push(0u8); } - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call( KernelRpc::GetShmemFrames as RPCType, diff --git a/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs b/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs index 3c2a5f997..3a4cccf63 100644 --- a/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs +++ b/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs @@ -63,8 +63,7 @@ pub(crate) fn rpc_get_shmem_structure( // Make buffer max size of MAX_PROCESS (for NrProcLogs), 1 (for NrLog) let mut res_data = [0u8; core::mem::size_of::<[u64; MAX_PROCESSES]>()]; - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call( KernelRpc::GetShmemStructure as RPCType, diff --git a/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs b/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs index 6c2dce75e..eb4b5f2cb 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs @@ -45,11 +45,13 @@ pub(crate) fn rpc_allocate_physical(pid: Pid, size: u64, affinity: u64) -> KResu // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::AllocatePhysical as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::AllocatePhysical as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/print.rs b/kernel/src/arch/x86_64/rackscale/processops/print.rs index 283aacf9b..3ceafab85 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/print.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/print.rs @@ -35,11 +35,13 @@ pub(crate) fn rpc_log(msg: String) -> KResult<(u64, u64)> { // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Log as RPCType, - &[&req_data, print_str.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Log as RPCType, + &[&req_data, print_str.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/release_core.rs b/kernel/src/arch/x86_64/rackscale/processops/release_core.rs index e912d0540..586f7b52a 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/release_core.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/release_core.rs @@ -38,11 +38,13 @@ pub(crate) fn rpc_release_core(pid: Pid, gtid: ThreadId) -> KResult<(u64, u64)> // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReleaseCore as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReleaseCore as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs b/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs index 1bc76e7d4..b067125d9 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs @@ -54,11 +54,13 @@ pub(crate) fn rpc_release_physical(pid: Pid, frame_id: u64) -> KResult<(u64, u64 // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReleasePhysical as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReleasePhysical as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/request_core.rs b/kernel/src/arch/x86_64/rackscale/processops/request_core.rs index bda7182ef..8c83465b1 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/request_core.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/request_core.rs @@ -38,11 +38,13 @@ pub(crate) fn rpc_request_core(pid: Pid, new_pid: bool, entry_point: u64) -> KRe // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::RequestCore as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::RequestCore as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs b/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs index 8cbcb3aec..6bc5577b2 100644 --- a/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs +++ b/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs @@ -31,11 +31,13 @@ pub(crate) fn rpc_get_hardware_threads( let mut res_data = [0u8; core::mem::size_of::>() + 5 * 4096]; // Call GetHardwareThreads() RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::GetHardwareThreads as RPCType, - &[&[]], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::GetHardwareThreads as RPCType, + &[&[]], + &mut [&mut res_data], + )?; // Decode and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/transport/shmem.rs b/kernel/src/transport/shmem.rs index b518a4551..f2b5c6126 100644 --- a/kernel/src/transport/shmem.rs +++ b/kernel/src/transport/shmem.rs @@ -307,13 +307,20 @@ const SHMEM_QUEUE_SIZE: usize = 32; // The total size of two queues(sender and reciever) should be less than the transport size. #[cfg(feature = "rpc")] -const_assert!(2 * SHMEM_QUEUE_SIZE * QUEUE_ENTRY_SIZE <= SHMEM_TRANSPORT_SIZE as usize); +const_assert!(2 * SHMEM_QUEUE_SIZE * QUEUE_ENTRY_SIZE <= SINGLE_SHMEM_TRANSPORT_SIZE as usize); #[cfg(feature = "rpc")] -pub(crate) const SHMEM_TRANSPORT_SIZE: u64 = 2 * 1024 * 1024; +pub(crate) const SINGLE_SHMEM_TRANSPORT_SIZE: u64 = 2 * 1024 * 1024; +// TODO(rackscale, hack): max cores at 24 for now #[cfg(feature = "rpc")] -pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult> { +pub(crate) const NUM_SHMEM_TRANSPORTS: u64 = 24; + +#[cfg(feature = "rpc")] +pub(crate) const SHMEM_TRANSPORT_SIZE: u64 = SINGLE_SHMEM_TRANSPORT_SIZE * NUM_SHMEM_TRANSPORTS; + +#[cfg(feature = "rpc")] +pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult>> { use rpc::transport::shmem::allocator::ShmemAllocator; use rpc::transport::shmem::Queue; use rpc::transport::shmem::{Receiver, Sender}; @@ -326,57 +333,77 @@ pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult= SHMEM_TRANSPORT_SIZE); - let allocator = ShmemAllocator::new(base_addr.as_u64(), SHMEM_TRANSPORT_SIZE); - match crate::CMDLINE.get().map_or(Mode::Native, |c| c.mode) { - Mode::Controller => { - let server_to_client_queue = - Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_to_server_queue = - Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let server_sender = Sender::with_shared_queue(server_to_client_queue.clone()); - let server_receiver = Receiver::with_shared_queue(client_to_server_queue.clone()); - log::info!( - "Controller: Created shared-memory transport for machine {}! size={:?}, base={:?}", + let mut transports = Vec::try_with_capacity(NUM_SHMEM_TRANSPORTS as usize)?; + + for transport_offset in 0..NUM_SHMEM_TRANSPORTS { + let allocator = ShmemAllocator::new( + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE, + SINGLE_SHMEM_TRANSPORT_SIZE, + ); + match crate::CMDLINE.get().map_or(Mode::Native, |c| c.mode) { + Mode::Controller => { + let server_to_client_queue = + Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_to_server_queue = + Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let server_sender = Sender::with_shared_queue(server_to_client_queue.clone()); + let server_receiver = Receiver::with_shared_queue(client_to_server_queue.clone()); + log::info!( + "Controller: Created shared-memory transport for machine {}! size={:?}, base={:x}", mid, - SHMEM_TRANSPORT_SIZE, - base_addr - ); - Ok(ShmemTransport::new(server_receiver, server_sender)) - } - Mode::Client => { - let server_to_client_queue = - Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_to_server_queue = - Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_receiver = Receiver::with_shared_queue(server_to_client_queue.clone()); - let client_sender = Sender::with_shared_queue(client_to_server_queue.clone()); - log::info!( - "Client: Created shared-memory transport! size={:?}, base={:?}", - SHMEM_TRANSPORT_SIZE, - base_addr + SINGLE_SHMEM_TRANSPORT_SIZE, + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE ); - Ok(ShmemTransport::new(client_receiver, client_sender)) - } - Mode::Native => { - log::error!("Native mode not supported for shmem"); - Err(KError::InvalidNativeMode) + transports.push(ShmemTransport::new(server_receiver, server_sender)); + } + Mode::Client => { + let server_to_client_queue = + Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_to_server_queue = + Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_receiver = Receiver::with_shared_queue(server_to_client_queue.clone()); + let client_sender = Sender::with_shared_queue(client_to_server_queue.clone()); + log::info!( + "Client: Created shared-memory transport! size={:?}, base={:x}", + SINGLE_SHMEM_TRANSPORT_SIZE, + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE + ); + transports.push(ShmemTransport::new(client_receiver, client_sender)); + } + Mode::Native => { + log::error!("Native mode not supported for shmem"); + return Err(KError::InvalidNativeMode); + } } } + Ok(transports) } #[cfg(feature = "rpc")] pub(crate) fn init_shmem_rpc( send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest -) -> KResult { +) -> KResult> { use crate::arch::rackscale::registration::initialize_client; use rpc::client::Client; // Set up the transport - let transport = Box::try_new(create_shmem_transport(*crate::environment::MACHINE_ID)?)?; - + let transports = create_shmem_transport(*crate::environment::MACHINE_ID)?; + let mut clients = Vec::try_with_capacity(transports.len())?; + let mut first = true; + for transport in transports.into_iter() { + let client = Client::new(Box::new(transport)); + + let client = if first { + first = false; + initialize_client(client, send_client_data).expect("Failed to initialize client") + } else { + //initialize_client(client, false).expect("Failed to initialize client") + client + }; + clients.push(client); + } // Create the client - let client = Client::new(transport); - initialize_client(client, send_client_data) + Ok(clients) } #[cfg(feature = "rackscale")] diff --git a/kernel/tests/s06_rackscale_tests.rs b/kernel/tests/s06_rackscale_tests.rs index 0954febdb..8274a6914 100644 --- a/kernel/tests/s06_rackscale_tests.rs +++ b/kernel/tests/s06_rackscale_tests.rs @@ -22,6 +22,7 @@ fn s06_rackscale_shmem_userspace_smoke_test() { rackscale_userspace_smoke_test(RackscaleTransport::Shmem); } +#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_smoke_test() { @@ -132,6 +133,7 @@ fn s06_rackscale_shmem_fs_test() { rackscale_fs_test(RackscaleTransport::Shmem); } +#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_fs_test() { @@ -237,6 +239,7 @@ fn s06_rackscale_shmem_userspace_multicore_test() { rackscale_userspace_multicore_test(RackscaleTransport::Shmem); } +#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_multicore_test() { @@ -278,6 +281,7 @@ fn rackscale_userspace_multicore_test(transport: RackscaleTransport) { test_run.run_rackscale(); } +#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_multicore_multiclient() { diff --git a/kernel/tests/s11_rackscale_benchmarks.rs b/kernel/tests/s11_rackscale_benchmarks.rs index 525822e9f..975dfb440 100644 --- a/kernel/tests/s11_rackscale_benchmarks.rs +++ b/kernel/tests/s11_rackscale_benchmarks.rs @@ -26,6 +26,7 @@ fn s11_rackscale_shmem_fxmark_benchmark() { rackscale_fxmark_benchmark(RackscaleTransport::Shmem); } +#[ignore] #[test] #[cfg(not(feature = "baremetal"))] fn s11_rackscale_ethernet_fxmark_benchmark() {