Skip to content

Commit

Permalink
Simple smoltcp is working with msg ids
Browse files Browse the repository at this point in the history
  • Loading branch information
hunhoffe committed Aug 11, 2023
1 parent f90038f commit 52f9c17
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 238 deletions.
6 changes: 2 additions & 4 deletions kernel/src/arch/x86_64/rackscale/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ pub(crate) fn run() {
// Start running the RPC server
log::info!("Starting RPC server for client {:?}!", mid);
loop {
let _handled = server
.try_handle()
.expect("Controller failed to handle RPC");
let _handled = server.handle().expect("Controller failed to handle RPC");
}
}

Expand Down Expand Up @@ -156,7 +154,7 @@ pub(crate) fn poll_interface() {
}
}

let _ = server.try_handle();
let _ = server.handle();
}
}

Expand Down
1 change: 1 addition & 0 deletions kernel/tests/s06_rackscale_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ fn rackscale_userspace_multicore_test(transport: RackscaleTransport) {
let mut test_run = RackscaleRun::new("userspace-smp".to_string(), built);
test_run.client_match_fn = client_match_fn;
test_run.transport = transport;
test_run.memory = 2048;
let machine = Machine::determine();
test_run.cores_per_client = core::cmp::min(4, (machine.max_cores() - 1) / 2);
test_run.wait_for_client = true;
Expand Down
13 changes: 5 additions & 8 deletions lib/rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::rpc::*;
use crate::transport::Transport;

pub struct Client {
transport: Arc<Mutex<Box<dyn Transport + Send + Sync>>>,
transport: Box<dyn Transport + Send + Sync>,
hdrs: ArrayVec<Arc<Mutex<RPCHeader>>, MAX_INFLIGHT_MSGS>,
msg_id: AtomicU8,
}
Expand All @@ -25,7 +25,7 @@ impl Client {
}
Client {
// Always lock transport first, then header
transport: Arc::new(Mutex::new(transport)),
transport,
hdrs,
msg_id: AtomicU8::new(0),
}
Expand All @@ -36,20 +36,18 @@ impl Client {
let data_in_len = data_in.iter().fold(0, |acc, x| acc + x.len());
debug_assert!(data_in_len < MsgLen::MAX as usize);

// Get client locks
let mut transport = self.transport.lock();
// Doesn't matter what header we use -> we are accessing the Client mutably.
let mut hdr = self.hdrs[0].lock();

// Connect
transport.client_connect()?;
self.transport.client_connect()?;

// Assemble header with connection data
hdr.msg_type = RPC_TYPE_CONNECT;
hdr.msg_len = data_in_len as MsgLen;

// Send and receive response
transport.send_and_recv(&mut hdr, data_in, &mut [])
self.transport.send_and_recv(&mut hdr, data_in, &mut [])
}

/// Calls a remote RPC function with ID
Expand All @@ -67,7 +65,6 @@ impl Client {
let msg_id = self.msg_id.fetch_add(1, Ordering::SeqCst);

// Get client locks
let transport = self.transport.lock();
let mut hdr = self.hdrs[msg_id as usize].lock();

// Assemble header
Expand All @@ -76,6 +73,6 @@ impl Client {
hdr.msg_len = data_in_len as MsgLen;

// Send and receive message
transport.send_and_recv(&mut hdr, data_in, data_out)
self.transport.send_and_recv(&mut hdr, data_in, data_out)
}
}
6 changes: 6 additions & 0 deletions lib/rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ pub struct RPCHeader {
pub const HDR_LEN: usize = core::mem::size_of::<RPCHeader>();

impl RPCHeader {
pub fn copy_from(&mut self, from: RPCHeader) {
self.msg_id = from.msg_id;
self.msg_type = from.msg_type;
self.msg_len = from.msg_len;
}

/// # Safety
/// - `self` must be valid RPCHeader
#[inline(always)]
Expand Down
33 changes: 1 addition & 32 deletions lib/rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,6 @@ impl<'t, 'a> Server<'a> {
Ok(())
}

/// Try to handle 1 RPC per client, if data is available (non-blocking if RPCs not available)
pub fn try_handle(&mut self) -> Result<bool, RPCError> {
match self.try_receive()? {
Some(rpc_id) => match self.handlers[rpc_id as usize] {
Some(func) => {
func(&mut self.hdr, &mut self.data)?;
self.reply()?;
Ok(true)
}
None => {
debug!("Invalid RPCType({}), ignoring", rpc_id);
Ok(false)
}
},
None => Ok(false),
}
}

/// Run the RPC server
pub fn run_server(&mut self) -> Result<(), RPCError> {
loop {
Expand All @@ -119,23 +101,10 @@ impl<'t, 'a> Server<'a> {
fn receive(&mut self) -> Result<RPCType, RPCError> {
// Receive request header
self.transport
.recv_msg(&mut self.hdr, &mut [&mut self.data])?;
.recv_msg(&mut self.hdr, None, &mut [&mut self.data])?;
Ok(self.hdr.msg_type)
}

/// receives next RPC call with RPC ID
fn try_receive(&mut self) -> Result<Option<RPCType>, RPCError> {
// Receive request header
if !self
.transport
.try_recv_msg(&mut self.hdr, &mut [&mut self.data])?
{
return Ok(None);
}

Ok(Some(self.hdr.msg_type))
}

/// Replies an RPC call with results
fn reply(&mut self) -> Result<(), RPCError> {
self.transport
Expand Down
15 changes: 5 additions & 10 deletions lib/rpc/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,29 @@ mod smoltcp;
pub use self::smoltcp::TCPTransport;
pub use shmem::transport::ShmemTransport;

use crate::rpc::{RPCError, RPCHeader};
use crate::rpc::{MsgId, RPCError, RPCHeader};

pub trait Transport {
fn max_send(&self) -> usize;

fn max_recv(&self) -> usize;

/// Receive an RPC message from a remote node, blocking
fn recv_msg(&self, hdr: &mut RPCHeader, payload: &mut [&mut [u8]]) -> Result<(), RPCError>;

/// Receive an RPC message from a remote node, non-blocking except to avoid partial receive
fn try_recv_msg(
fn recv_msg(
&self,
hdr: &mut RPCHeader,
recipient_id: Option<MsgId>,
payload: &mut [&mut [u8]],
) -> Result<bool, RPCError>;
) -> Result<(), RPCError>;

/// Send an RPC message to a remote node, blocking
fn send_msg(&self, hdr: &RPCHeader, payload: &[&[u8]]) -> Result<(), RPCError>;

/// Send an RPC message to a remote node, non-blocking except to avoid partial send
fn try_send_msg(&self, hdr: &RPCHeader, payload: &[&[u8]]) -> Result<bool, RPCError>;

/// Client-side method to setup connection to server
fn client_connect(&mut self) -> Result<(), RPCError>;

/// Server-side method to setup connection to client
fn server_accept(&self) -> Result<(), RPCError>;
fn server_accept(&mut self) -> Result<(), RPCError>;

/// Round-trip message passing.
fn send_and_recv(
Expand Down
49 changes: 8 additions & 41 deletions lib/rpc/src/transport/shmem/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,12 @@ impl<'a> Transport for ShmemTransport<'a> {
Ok(())
}

fn try_send_msg(&self, hdr: &RPCHeader, payload: &[&[u8]]) -> Result<bool, RPCError> {
let mut pointers: [&[u8]; 7] = [&[1]; 7];
pointers[0] = unsafe { &hdr.as_bytes()[..] };
let mut index = 1;
for d in payload {
pointers[index] = d;
index += 1;
}
Ok(self.tx.try_send(&pointers[..payload.len() + 1]))
}

fn recv_msg(&self, hdr: &mut RPCHeader, payload: &mut [&mut [u8]]) -> Result<(), RPCError> {
fn recv_msg(
&self,
hdr: &mut RPCHeader,
recipient_id: Option<MsgId>,
payload: &mut [&mut [u8]],
) -> Result<(), RPCError> {
if payload.is_empty() {
self.rx.recv(&mut [unsafe { &mut hdr.as_mut_bytes()[..] }]);
return Ok(());
Expand All @@ -106,38 +100,11 @@ impl<'a> Transport for ShmemTransport<'a> {
Ok(())
}

fn try_recv_msg(
&self,
hdr: &mut RPCHeader,
payload: &mut [&mut [u8]],
) -> Result<bool, RPCError> {
let mut pointers: [&mut [u8]; 7] = [
&mut [1],
&mut [1],
&mut [1],
&mut [1],
&mut [1],
&mut [1],
&mut [1],
];
pointers[0] = unsafe { &mut hdr.as_mut_bytes()[..] };
let mut index = 1;
let num_out = payload.len() + 1;
for p in payload {
pointers[index] = p;
index += 1;
}
match self.rx.try_recv(&mut pointers[..num_out]) {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
}

fn client_connect(&mut self) -> Result<(), RPCError> {
Ok(())
}

fn server_accept(&self) -> Result<(), RPCError> {
fn server_accept(&mut self) -> Result<(), RPCError> {
Ok(())
}

Expand All @@ -148,7 +115,7 @@ impl<'a> Transport for ShmemTransport<'a> {
recv_payload: &mut [&mut [u8]],
) -> Result<(), RPCError> {
self.send_msg(hdr, send_payload)?;
self.recv_msg(hdr, recv_payload)
self.recv_msg(hdr, Some(hdr.msg_id), recv_payload)
}
}

Expand Down
Loading

0 comments on commit 52f9c17

Please sign in to comment.