diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index 91e8b1a69..6d6e15726 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -114,9 +114,7 @@ pub(crate) fn run() { // Start running the RPC server log::info!("Starting RPC server for client {:?}!", mid); loop { - let _handled = server - .try_handle() - .expect("Controller failed to handle RPC"); + let _handled = server.handle().expect("Controller failed to handle RPC"); } } @@ -156,7 +154,7 @@ pub(crate) fn poll_interface() { } } - let _ = server.try_handle(); + let _ = server.handle(); } } diff --git a/kernel/tests/s06_rackscale_tests.rs b/kernel/tests/s06_rackscale_tests.rs index b0a4652ad..b044efdf2 100644 --- a/kernel/tests/s06_rackscale_tests.rs +++ b/kernel/tests/s06_rackscale_tests.rs @@ -241,6 +241,7 @@ fn rackscale_userspace_multicore_test(transport: RackscaleTransport) { let mut test_run = RackscaleRun::new("userspace-smp".to_string(), built); test_run.client_match_fn = client_match_fn; test_run.transport = transport; + test_run.memory = 2048; let machine = Machine::determine(); test_run.cores_per_client = core::cmp::min(4, (machine.max_cores() - 1) / 2); test_run.wait_for_client = true; diff --git a/lib/rpc/src/client.rs b/lib/rpc/src/client.rs index 679156150..469f6f8d6 100644 --- a/lib/rpc/src/client.rs +++ b/lib/rpc/src/client.rs @@ -12,7 +12,7 @@ use crate::rpc::*; use crate::transport::Transport; pub struct Client { - transport: Arc>>, + transport: Box, hdrs: ArrayVec>, MAX_INFLIGHT_MSGS>, msg_id: AtomicU8, } @@ -25,7 +25,7 @@ impl Client { } Client { // Always lock transport first, then header - transport: Arc::new(Mutex::new(transport)), + transport, hdrs, msg_id: AtomicU8::new(0), } @@ -36,20 +36,18 @@ impl Client { let data_in_len = data_in.iter().fold(0, |acc, x| acc + x.len()); debug_assert!(data_in_len < MsgLen::MAX as usize); - // Get client locks - let mut transport = self.transport.lock(); // Doesn't matter what header we use -> we are accessing the Client mutably. let mut hdr = self.hdrs[0].lock(); // Connect - transport.client_connect()?; + self.transport.client_connect()?; // Assemble header with connection data hdr.msg_type = RPC_TYPE_CONNECT; hdr.msg_len = data_in_len as MsgLen; // Send and receive response - transport.send_and_recv(&mut hdr, data_in, &mut []) + self.transport.send_and_recv(&mut hdr, data_in, &mut []) } /// Calls a remote RPC function with ID @@ -67,7 +65,6 @@ impl Client { let msg_id = self.msg_id.fetch_add(1, Ordering::SeqCst); // Get client locks - let transport = self.transport.lock(); let mut hdr = self.hdrs[msg_id as usize].lock(); // Assemble header @@ -76,6 +73,6 @@ impl Client { hdr.msg_len = data_in_len as MsgLen; // Send and receive message - transport.send_and_recv(&mut hdr, data_in, data_out) + self.transport.send_and_recv(&mut hdr, data_in, data_out) } } diff --git a/lib/rpc/src/rpc.rs b/lib/rpc/src/rpc.rs index 144a71d5d..6aa55be30 100644 --- a/lib/rpc/src/rpc.rs +++ b/lib/rpc/src/rpc.rs @@ -40,6 +40,12 @@ pub struct RPCHeader { pub const HDR_LEN: usize = core::mem::size_of::(); impl RPCHeader { + pub fn copy_from(&mut self, from: RPCHeader) { + self.msg_id = from.msg_id; + self.msg_type = from.msg_type; + self.msg_len = from.msg_len; + } + /// # Safety /// - `self` must be valid RPCHeader #[inline(always)] diff --git a/lib/rpc/src/server.rs b/lib/rpc/src/server.rs index 945841577..9682bf894 100644 --- a/lib/rpc/src/server.rs +++ b/lib/rpc/src/server.rs @@ -90,24 +90,6 @@ impl<'t, 'a> Server<'a> { Ok(()) } - /// Try to handle 1 RPC per client, if data is available (non-blocking if RPCs not available) - pub fn try_handle(&mut self) -> Result { - match self.try_receive()? { - Some(rpc_id) => match self.handlers[rpc_id as usize] { - Some(func) => { - func(&mut self.hdr, &mut self.data)?; - self.reply()?; - Ok(true) - } - None => { - debug!("Invalid RPCType({}), ignoring", rpc_id); - Ok(false) - } - }, - None => Ok(false), - } - } - /// Run the RPC server pub fn run_server(&mut self) -> Result<(), RPCError> { loop { @@ -119,23 +101,10 @@ impl<'t, 'a> Server<'a> { fn receive(&mut self) -> Result { // Receive request header self.transport - .recv_msg(&mut self.hdr, &mut [&mut self.data])?; + .recv_msg(&mut self.hdr, None, &mut [&mut self.data])?; Ok(self.hdr.msg_type) } - /// receives next RPC call with RPC ID - fn try_receive(&mut self) -> Result, RPCError> { - // Receive request header - if !self - .transport - .try_recv_msg(&mut self.hdr, &mut [&mut self.data])? - { - return Ok(None); - } - - Ok(Some(self.hdr.msg_type)) - } - /// Replies an RPC call with results fn reply(&mut self) -> Result<(), RPCError> { self.transport diff --git a/lib/rpc/src/transport/mod.rs b/lib/rpc/src/transport/mod.rs index 4bca400f6..bf08ff83e 100644 --- a/lib/rpc/src/transport/mod.rs +++ b/lib/rpc/src/transport/mod.rs @@ -4,7 +4,7 @@ mod smoltcp; pub use self::smoltcp::TCPTransport; pub use shmem::transport::ShmemTransport; -use crate::rpc::{RPCError, RPCHeader}; +use crate::rpc::{MsgId, RPCError, RPCHeader}; pub trait Transport { fn max_send(&self) -> usize; @@ -12,26 +12,21 @@ pub trait Transport { fn max_recv(&self) -> usize; /// Receive an RPC message from a remote node, blocking - fn recv_msg(&self, hdr: &mut RPCHeader, payload: &mut [&mut [u8]]) -> Result<(), RPCError>; - - /// Receive an RPC message from a remote node, non-blocking except to avoid partial receive - fn try_recv_msg( + fn recv_msg( &self, hdr: &mut RPCHeader, + recipient_id: Option, payload: &mut [&mut [u8]], - ) -> Result; + ) -> Result<(), RPCError>; /// Send an RPC message to a remote node, blocking fn send_msg(&self, hdr: &RPCHeader, payload: &[&[u8]]) -> Result<(), RPCError>; - /// Send an RPC message to a remote node, non-blocking except to avoid partial send - fn try_send_msg(&self, hdr: &RPCHeader, payload: &[&[u8]]) -> Result; - /// Client-side method to setup connection to server fn client_connect(&mut self) -> Result<(), RPCError>; /// Server-side method to setup connection to client - fn server_accept(&self) -> Result<(), RPCError>; + fn server_accept(&mut self) -> Result<(), RPCError>; /// Round-trip message passing. fn send_and_recv( diff --git a/lib/rpc/src/transport/shmem/transport.rs b/lib/rpc/src/transport/shmem/transport.rs index 277fdeb54..9d143c434 100644 --- a/lib/rpc/src/transport/shmem/transport.rs +++ b/lib/rpc/src/transport/shmem/transport.rs @@ -70,18 +70,12 @@ impl<'a> Transport for ShmemTransport<'a> { Ok(()) } - fn try_send_msg(&self, hdr: &RPCHeader, payload: &[&[u8]]) -> Result { - let mut pointers: [&[u8]; 7] = [&[1]; 7]; - pointers[0] = unsafe { &hdr.as_bytes()[..] }; - let mut index = 1; - for d in payload { - pointers[index] = d; - index += 1; - } - Ok(self.tx.try_send(&pointers[..payload.len() + 1])) - } - - fn recv_msg(&self, hdr: &mut RPCHeader, payload: &mut [&mut [u8]]) -> Result<(), RPCError> { + fn recv_msg( + &self, + hdr: &mut RPCHeader, + recipient_id: Option, + payload: &mut [&mut [u8]], + ) -> Result<(), RPCError> { if payload.is_empty() { self.rx.recv(&mut [unsafe { &mut hdr.as_mut_bytes()[..] }]); return Ok(()); @@ -106,38 +100,11 @@ impl<'a> Transport for ShmemTransport<'a> { Ok(()) } - fn try_recv_msg( - &self, - hdr: &mut RPCHeader, - payload: &mut [&mut [u8]], - ) -> Result { - let mut pointers: [&mut [u8]; 7] = [ - &mut [1], - &mut [1], - &mut [1], - &mut [1], - &mut [1], - &mut [1], - &mut [1], - ]; - pointers[0] = unsafe { &mut hdr.as_mut_bytes()[..] }; - let mut index = 1; - let num_out = payload.len() + 1; - for p in payload { - pointers[index] = p; - index += 1; - } - match self.rx.try_recv(&mut pointers[..num_out]) { - Ok(_) => Ok(true), - Err(_) => Ok(false), - } - } - fn client_connect(&mut self) -> Result<(), RPCError> { Ok(()) } - fn server_accept(&self) -> Result<(), RPCError> { + fn server_accept(&mut self) -> Result<(), RPCError> { Ok(()) } @@ -148,7 +115,7 @@ impl<'a> Transport for ShmemTransport<'a> { recv_payload: &mut [&mut [u8]], ) -> Result<(), RPCError> { self.send_msg(hdr, send_payload)?; - self.recv_msg(hdr, recv_payload) + self.recv_msg(hdr, Some(hdr.msg_id), recv_payload) } } diff --git a/lib/rpc/src/transport/smoltcp.rs b/lib/rpc/src/transport/smoltcp.rs index 34647a728..10c2537f2 100644 --- a/lib/rpc/src/transport/smoltcp.rs +++ b/lib/rpc/src/transport/smoltcp.rs @@ -3,6 +3,7 @@ use alloc::sync::Arc; use alloc::vec::Vec; +use core::sync::atomic::{AtomicBool, Ordering}; use log::{debug, trace, warn}; use spin::Mutex; @@ -25,6 +26,8 @@ pub struct TCPTransport<'a> { server_ip: Option, server_port: u16, client_port: u16, + recv_hdr: Arc>>, + send_lock: AtomicBool, } impl TCPTransport<'_> { @@ -52,7 +55,7 @@ impl TCPTransport<'_> { // Create the TCP socket let socket_tx_buffer = TcpSocketBuffer::new(sock_vec); let mut tcp_socket = TcpSocket::new(socket_rx_buffer, socket_tx_buffer); - tcp_socket.set_ack_delay(None); + //tcp_socket.set_ack_delay(None); // Add socket to interface and record socket handle let server_handle = iface.lock().add_socket(tcp_socket); @@ -63,65 +66,47 @@ impl TCPTransport<'_> { server_ip, server_port, client_port: 10110, + recv_hdr: Arc::new(Mutex::new(None)), + send_lock: AtomicBool::new(false), }) } - fn send(&self, send_buf: &[u8], is_try: bool) -> Result { - trace!("send {:?} bytes, try={:?}", send_buf.len(), is_try); + fn send(&self, send_buf: &[u8]) -> Result<(), RPCError> { + trace!("send {:?} bytes", send_buf.len()); let mut offset = 0; if send_buf.is_empty() { - return Ok(true); - } - - { - let mut iface = self.iface.lock(); - let socket = iface.get_socket::(self.server_handle); - - // Attempt to write from first buffer into the socket send buffer - if socket.can_send() { - if let Ok(bytes_sent) = socket.send_slice(send_buf) { - trace!("send [{:?}-{:?}]", 0, bytes_sent); - offset = bytes_sent; - } - } - } - - // Can't send now - if is_try && offset == 0 { - return Ok(false); - - // All sent - } else if offset == send_buf.len() { - return Ok(true); + return Ok(()); } - // Send rest of the data + // Send the data loop { - let mut iface = self.iface.lock(); - let socket = iface.get_socket::(self.server_handle); - // Send until socket state is bad (shouldn't happen), send buffer is full, all data is sent, - // or no progress is being made (e.g., send_slice starts returning 0) - let bytes_sent = 1; - while socket.can_send() && bytes_sent != 0 { - // Attempt to send until end of data array - if let Ok(bytes_sent) = socket.send_slice(&send_buf[offset..]) { - // Try to send remaining in current send_buf - trace!("sent [{:?}-{:?}]", offset, offset + bytes_sent); - - // Update index if reached end of send_buf - offset += bytes_sent; - if offset == send_buf.len() { - return Ok(true); + { + let mut iface = self.iface.lock(); + let socket = iface.get_socket::(self.server_handle); + // Send until socket state is bad (shouldn't happen), send buffer is full, all data is sent, + // or no progress is being made (e.g., send_slice starts returning 0) + let bytes_sent = 1; + while socket.can_send() && bytes_sent != 0 { + // Attempt to send until end of data array + if let Ok(bytes_sent) = socket.send_slice(&send_buf[offset..]) { + // Try to send remaining in current send_buf + trace!("sent [{:?}-{:?}]", offset, offset + bytes_sent); + + // Update index if reached end of send_buf + offset += bytes_sent; + if offset == send_buf.len() { + return Ok(()); + } + } else { + trace!("send_slice failed... trying again?"); } - } else { - trace!("send_slice failed... trying again?"); } } // Poll the interface only if we must in order to have space in the send buffer { - match iface.poll(Instant::from_millis( + match self.iface.lock().poll(Instant::from_millis( rawtime::duration_since_boot().as_millis() as i64, )) { Ok(_) => {} @@ -130,6 +115,9 @@ impl TCPTransport<'_> { } } } + for _ in 0..5 { + core::hint::spin_loop(); + } } } @@ -145,29 +133,31 @@ impl TCPTransport<'_> { loop { // Recv until socket state is bad (shouldn't happen), all data is received, // or no progress is being made (e.g., recv_slice starts returning 0) - let mut iface = self.iface.lock(); - let socket = iface.get_socket::(self.server_handle); + { + let mut iface = self.iface.lock(); + let socket = iface.get_socket::(self.server_handle); - let bytes_recv = 1; - while socket.can_recv() && bytes_recv != 0 { - // Attempt to recv until end of data array - if let Ok(bytes_recv) = socket.recv_slice(&mut recv_buf[offset..]) { - // Try to recv remaining in current recv_buf - trace!("recv [{:?}-{:?}]", offset, offset + bytes_recv); - - // Update index if reached end of recv_buf - offset += bytes_recv; - if offset == recv_buf.len() { - return Ok(()); + let bytes_recv = 1; + while socket.can_recv() && bytes_recv != 0 { + // Attempt to recv until end of data array + if let Ok(bytes_recv) = socket.recv_slice(&mut recv_buf[offset..]) { + // Try to recv remaining in current recv_buf + trace!("recv [{:?}-{:?}]", offset, offset + bytes_recv); + + // Update index if reached end of recv_buf + offset += bytes_recv; + if offset == recv_buf.len() { + return Ok(()); + } + } else { + debug!("recv_slice failed... trying again?"); } - } else { - debug!("recv_slice failed... trying again?"); } } - // Poll the interface only if we must in order to have space in the send buffer + // Poll the interface only if we must in order to have space in the recv buffer { - match iface.poll(Instant::from_millis( + match self.iface.lock().poll(Instant::from_millis( rawtime::duration_since_boot().as_millis() as i64, )) { Ok(_) => {} @@ -176,73 +166,110 @@ impl TCPTransport<'_> { } } } + for _ in 0..5 { + core::hint::spin_loop(); + } } } fn internal_recv_msg( &self, hdr: &mut RPCHeader, + recipient_id: Option, payload: &mut [&mut [u8]], - is_try: bool, - ) -> Result { - // Try to receive the header, bail if try and data was not received here. Otherwise, proceed - // to finish receiving - trace!("recv_msg, try = {:?}", is_try); - let mut offset = 0; - { - let mut iface = self.iface.lock(); - let socket = iface.get_socket::(self.server_handle); - if socket.can_recv() { - let hdr_slice = unsafe { hdr.as_mut_bytes() }; - if let Ok(bytes_recv) = socket.recv_slice(hdr_slice) { - trace!("recv_msg [{:?}-{:?}]", 0, bytes_recv); - if is_try && bytes_recv == 0 { - return Ok(false); - } - offset = bytes_recv; - } - } else if is_try { - return Ok(false); - } - } + ) -> Result<(), RPCError> { + debug!("internal_recv_msg, recipient_id = {:?}", recipient_id,); - // Finish receiving header if necessary - let hdr_slice = unsafe { hdr.as_mut_bytes() }; - self.recv(&mut hdr_slice[offset..])?; - - // At this point, if try failed, we've already bailed. We've also received all of the header - // So we are ready to read in all payload data. First, do a bit of validation before entering loop - let expected_data = hdr.msg_len as usize; - let max_recv_data = payload.iter().fold(0, |acc, x| acc + x.len()); - if expected_data > max_recv_data { - // Not enough space to store all message data - log::error!( - "Found {:?} payload data, but only have room for {:?}", - expected_data, - max_recv_data + loop { + // Try to grab lock + let mut recv_hdr = self.recv_hdr.lock(); + trace!("internal_recv_msg acquired receive lock"); + + let received_hdr = if let Some(r_hdr) = *recv_hdr { + // already been received + trace!( + "internal_recv_msg acquired receive lock already has header: {:?}", + r_hdr + ); + r_hdr + } else { + // We have not already received a header. + trace!("internal_recv_msg acquired receive lock has NO header"); + let mut new_hdr = RPCHeader::default(); + let hdr_slice = unsafe { new_hdr.as_mut_bytes() }; + self.recv(&mut hdr_slice[..])?; + trace!("internal_recv_msg received new header: {:?}", new_hdr); + *recv_hdr = Some(new_hdr); + new_hdr + }; + + let is_match = match recipient_id { + None => true, + Some(id) => id == received_hdr.msg_id, + }; + debug!( + "internal_recv_msg is_match={:?}, recv_header={:?}, recipient_id={:?}", + is_match, recv_hdr, recipient_id ); - Err(RPCError::InternalError) - } else if expected_data == 0 { - Ok(true) - } else { - // Receive until expected data is fully received - let mut recv_count = 0; - for p in payload.iter_mut() { - if recv_count + p.len() > expected_data { - trace!( - "recv_msg recv payload buf[{:?}-{:?}]", - 0, - expected_data - recv_count + + if is_match { + // Copy received header into our given header + hdr.copy_from(received_hdr); + + // Remove the received header from partial-received status + *recv_hdr = None; + + // We've also received all of the header so we are ready to read in all payload data. + // First, do a bit of validation before entering loop + let expected_data = hdr.msg_len as usize; + let max_recv_data = payload.iter().fold(0, |acc, x| acc + x.len()); + return if expected_data > max_recv_data { + // Not enough space to store all message data + log::error!( + "Found {:?} payload data, but only have room for {:?}", + expected_data, + max_recv_data ); - self.recv(&mut p[..(expected_data - recv_count)])?; - return Ok(true); + Err(RPCError::InternalError) + } else if expected_data == 0 { + trace!("internal_recv_msg done - nothing else to receive"); + Ok(()) } else { - trace!("recv_msg recv payload buf[{:?}-{:?}]", 0, p.len()); - recv_count += p.len(); - self.recv(p)?; + // Receive until expected data is fully received + let mut recv_count = 0; + trace!( + "internal_recv_msg - about to receive {:?} into buffers", + expected_data + ); + for p in payload.iter_mut() { + if recv_count + p.len() > expected_data { + trace!( + "recv_msg recv payload buf[{:?}-{:?}]", + 0, + expected_data - recv_count + ); + self.recv(&mut p[..(expected_data - recv_count)])?; + return Ok(()); + } else { + trace!("recv_msg recv payload buf[{:?}-{:?}]", 0, p.len()); + recv_count += p.len(); + self.recv(p)?; + } + } + trace!( + "internal_recv_msg - finished receiving {:?} into buffers", + expected_data + ); + Ok(()) + }; + } else { + drop(recv_hdr); + trace!("internal_recv_msg no match -> dropped receive header lock"); + // TODO: this isn't a great solution + for _ in 0..5 { + core::hint::spin_loop(); } } - Ok(true) } } } @@ -257,42 +284,52 @@ impl Transport for TCPTransport<'_> { } fn send_msg(&self, hdr: &RPCHeader, payload: &[&[u8]]) -> Result<(), RPCError> { - trace!("send_msg - sending header"); - self.send(&unsafe { hdr.as_bytes() }[..], false)?; - for p in payload { - trace!("send_msg - sending payload"); - self.send(p, false)?; - } - Ok(()) - } + debug!("send_msg"); - fn try_send_msg(&self, hdr: &RPCHeader, payload: &[&[u8]]) -> Result { - trace!("try_send_msg - sending header"); - match self.send(&unsafe { hdr.as_bytes() }[..], true)? { - true => { - for p in payload { - trace!("send_msg - sending payload"); - self.send(p, false)?; + // Set the send lock + loop { + if self + .send_lock + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + break; + } else { + // TODO: this isn't a great solution + for _ in 0..5 { + core::hint::spin_loop(); } - Ok(true) } - false => Ok(false), } - } + trace!("send_msg - acquired lock"); - fn recv_msg(&self, hdr: &mut RPCHeader, payload: &mut [&mut [u8]]) -> Result<(), RPCError> { - trace!("recv_msg"); - self.internal_recv_msg(hdr, payload, false)?; + if let Err(e) = self.send(&unsafe { hdr.as_bytes() }[..]) { + self.send_lock.store(false, Ordering::SeqCst); + trace!("send_msg - released lock on failure"); + return Err(e); + } + for p in payload { + trace!("send_msg - sending payload"); + if let Err(e) = self.send(p) { + self.send_lock.store(false, Ordering::SeqCst); + trace!("send_msg - released lock on failure"); + return Err(e); + } + } + self.send_lock.store(false, Ordering::SeqCst); + debug!("send_msg - done"); Ok(()) } - fn try_recv_msg( + fn recv_msg( &self, hdr: &mut RPCHeader, + recipient_id: Option, payload: &mut [&mut [u8]], - ) -> Result { - trace!("try_recv_msg"); - self.internal_recv_msg(hdr, payload, true) + ) -> Result<(), RPCError> { + trace!("recv_msg"); + self.internal_recv_msg(hdr, recipient_id, payload)?; + Ok(()) } fn send_and_recv( @@ -302,7 +339,7 @@ impl Transport for TCPTransport<'_> { recv_payload: &mut [&mut [u8]], ) -> Result<(), RPCError> { self.send_msg(hdr, send_payload)?; - self.recv_msg(hdr, recv_payload) + self.recv_msg(hdr, Some(hdr.msg_id), recv_payload) } fn client_connect(&mut self) -> Result<(), RPCError> { @@ -346,7 +383,7 @@ impl Transport for TCPTransport<'_> { Ok(()) } - fn server_accept(&self) -> Result<(), RPCError> { + fn server_accept(&mut self) -> Result<(), RPCError> { // Listen { let mut iface = (*self.iface).lock();