Skip to content

Commit

Permalink
Fixed ethernet rpc connection issues on single core client
Browse files Browse the repository at this point in the history
  • Loading branch information
zmckevitt committed Nov 28, 2023
1 parent 13012e7 commit 4acf557
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 27 deletions.
12 changes: 10 additions & 2 deletions kernel/src/arch/x86_64/rackscale/client_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,19 @@ impl ClientState {
.map_or(false, |c| c.transport == Transport::Ethernet)
{
let num_cores: u64 = atopology::MACHINE_TOPOLOGY.num_threads() as u64;
let mid = *crate::environment::MACHINE_ID;
let port_base = CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT);

log::debug!(
"Sending ethernet initialization for client with {:?} cores, mid {:?}, and port {:?}",
num_cores,
mid,
port_base,
);

crate::transport::ethernet::init_ethernet_rpc(
smoltcp::wire::IpAddress::v4(172, 31, 0, 11),
CONTROLLER_PORT_BASE
+ (*crate::environment::MACHINE_ID as u16 - 1) * MAX_CORES_PER_CLIENT,
port_base,
num_cores,
false,
)
Expand Down
49 changes: 29 additions & 20 deletions kernel/src/arch/x86_64/rackscale/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,57 +45,66 @@ pub(crate) fn run() {
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
{
let port_base = CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT;

log::debug!(
"Initializing transport with mid {:?} on port {:?}",
mid,
port_base
);
let transport = Box::new(
TCPTransport::new(
None,
CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT,
Arc::clone(&ETHERNET_IFACE),
)
.expect("Failed to create TCP transport"),
TCPTransport::new(None, port_base, Arc::clone(&ETHERNET_IFACE))
.expect("Failed to create TCP transport"),
);
let mut server = Server::new(transport);
register_rpcs(&mut server);
servers.push(server);

ClientReadyCount.fetch_add(1, Ordering::SeqCst);
// while !DCMServerReady.load(Ordering::SeqCst) {}
while !DCMServerReady.load(Ordering::SeqCst) {}

servers[0]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");

log::debug!("Initial RPC server initialized. Learning client topology...");

// wait until controller learns about client topology
while (*rpc_servers_to_register.lock() == 0) {
let start = rawtime::Instant::now();
while start.elapsed() < Duration::from_secs(1) {
spin_loop();
}
}
while (*rpc_servers_to_register.lock() == 0) {}

log::debug!(
"Received client topology, registering subsequent {:?} cores",
*rpc_servers_to_register.lock()
);

for i in 0..*rpc_servers_to_register.lock() {
// register n-1 servers as we already handled the initial request
// will do nothing if no more servers to register
for i in 0..*rpc_servers_to_register.lock() - 1 {
let transport = Box::new(
TCPTransport::new(
None,
CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT) + i as u16 + 1,
port_base + (i as u16 + 1),
Arc::clone(&ETHERNET_IFACE),
)
.expect("Failed to create TCP transport"),
);
let mut server = Server::new(transport);
register_rpcs(&mut server);
servers.push(server);
*rpc_servers_to_register.lock() -= 1;
}
*rpc_servers_to_register.lock() = 0;

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

// Wait for all clients to connect before fulfilling any RPCs.
while !DCMServerReady.load(Ordering::SeqCst) {}
log::debug!("Transports added. Adding clients...");

// already registered the first server
// will do nothing if no more servers to register
for s_index in 1..servers.len() {
servers[s_index]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
}

log::debug!("Finished registering RPC servers for client");
} else if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Shmem)
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/arch/x86_64/rackscale/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub(crate) fn register_client(hdr: &mut RPCHeader, payload: &mut [u8]) -> Result
return Ok(());
}

*rpc_servers_to_register.lock() = req.num_cores - 1;
*rpc_servers_to_register.lock() = req.num_cores;

// Parse out hw_threads
let hw_threads = match unsafe { decode::<Vec<CpuThread>>(hwthreads_data) } {
Expand Down
7 changes: 3 additions & 4 deletions kernel/src/transport/ethernet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ pub(crate) fn init_ethernet_rpc(

let mut clients: Vec<rpc::client::Client> = Vec::new();

for i in 0..num_cores {
let offset = i;
let server_port = server_port_base + offset as u16;
for core in 0..num_cores {
let server_port = server_port_base + core as u16;

let rpc_transport = Box::new(
TCPTransport::new(Some(server_ip), server_port, Arc::clone(&ETHERNET_IFACE))
Expand All @@ -119,7 +118,7 @@ pub(crate) fn init_ethernet_rpc(
let mut client = Client::new(rpc_transport);

// only send client data on first client registration request
let send_client_data = if i == 0 { true } else { false };
let send_client_data = if core == 0 { true } else { false };

client = initialize_client(client, send_client_data, is_dcm)
.expect("Failed to initialize client");
Expand Down

0 comments on commit 4acf557

Please sign in to comment.