diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index a4cef78f..f7616771 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -46,11 +46,19 @@ impl ClientState { .map_or(false, |c| c.transport == Transport::Ethernet) { let num_cores: u64 = atopology::MACHINE_TOPOLOGY.num_threads() as u64; + let mid = *crate::environment::MACHINE_ID; + let port_base = CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT); + + log::debug!( + "Sending ethernet initialization for client with {:?} cores, mid {:?}, and port {:?}", + num_cores, + mid, + port_base, + ); crate::transport::ethernet::init_ethernet_rpc( smoltcp::wire::IpAddress::v4(172, 31, 0, 11), - CONTROLLER_PORT_BASE - + (*crate::environment::MACHINE_ID as u16 - 1) * MAX_CORES_PER_CLIENT, + port_base, num_cores, false, ) diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index 95ea43ac..e3201b1a 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -45,36 +45,45 @@ pub(crate) fn run() { .get() .map_or(false, |c| c.transport == Transport::Ethernet) { + let port_base = CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT; + + log::debug!( + "Initializing transport with mid {:?} on port {:?}", + mid, + port_base + ); let transport = Box::new( - TCPTransport::new( - None, - CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT, - Arc::clone(&ETHERNET_IFACE), - ) - .expect("Failed to create TCP transport"), + TCPTransport::new(None, port_base, Arc::clone(&ETHERNET_IFACE)) + .expect("Failed to create TCP transport"), ); let mut server = Server::new(transport); register_rpcs(&mut server); servers.push(server); + ClientReadyCount.fetch_add(1, Ordering::SeqCst); - // while !DCMServerReady.load(Ordering::SeqCst) {} + while !DCMServerReady.load(Ordering::SeqCst) 
{} + servers[0] .add_client(&CLIENT_REGISTRAR) .expect("Failed to accept client"); + log::debug!("Initial RPC server initialized. Learning client topology..."); + // wait until controller learns about client topology - while (*rpc_servers_to_register.lock() == 0) { - let start = rawtime::Instant::now(); - while start.elapsed() < Duration::from_secs(1) { - spin_loop(); - } - } + while (*rpc_servers_to_register.lock() == 0) {} + + log::debug!( + "Received client topology, registering subsequent {:?} cores", + *rpc_servers_to_register.lock() + ); - for i in 0..*rpc_servers_to_register.lock() { + // register n-1 servers as we already handled the initial request + // will do nothing if no more servers to register + for i in 0..*rpc_servers_to_register.lock() - 1 { let transport = Box::new( TCPTransport::new( None, - CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT) + i as u16 + 1, + port_base + (i as u16 + 1), Arc::clone(&ETHERNET_IFACE), ) .expect("Failed to create TCP transport"), @@ -82,20 +91,20 @@ pub(crate) fn run() { let mut server = Server::new(transport); register_rpcs(&mut server); servers.push(server); - *rpc_servers_to_register.lock() -= 1; } + *rpc_servers_to_register.lock() = 0; - ClientReadyCount.fetch_add(1, Ordering::SeqCst); - - // Wait for all clients to connect before fulfilling any RPCs. - while !DCMServerReady.load(Ordering::SeqCst) {} + log::debug!("Transports added. 
Adding clients..."); // already registered the first server + // will do nothing if no more servers to register for s_index in 1..servers.len() { servers[s_index] .add_client(&CLIENT_REGISTRAR) .expect("Failed to accept client"); } + + log::debug!("Finished registering RPC servers for client"); } else if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Shmem) diff --git a/kernel/src/arch/x86_64/rackscale/registration.rs b/kernel/src/arch/x86_64/rackscale/registration.rs index 26bdb195..53629cd3 100644 --- a/kernel/src/arch/x86_64/rackscale/registration.rs +++ b/kernel/src/arch/x86_64/rackscale/registration.rs @@ -119,7 +119,7 @@ pub(crate) fn register_client(hdr: &mut RPCHeader, payload: &mut [u8]) -> Result return Ok(()); } - *rpc_servers_to_register.lock() = req.num_cores - 1; + *rpc_servers_to_register.lock() = req.num_cores; // Parse out hw_threads let hw_threads = match unsafe { decode::>(hwthreads_data) } { diff --git a/kernel/src/transport/ethernet.rs b/kernel/src/transport/ethernet.rs index 3b9b3cd0..d6ef51f9 100644 --- a/kernel/src/transport/ethernet.rs +++ b/kernel/src/transport/ethernet.rs @@ -108,9 +108,8 @@ pub(crate) fn init_ethernet_rpc( let mut clients: Vec = Vec::new(); - for i in 0..num_cores { - let offset = i; - let server_port = server_port_base + offset as u16; + for core in 0..num_cores { + let server_port = server_port_base + core as u16; let rpc_transport = Box::new( TCPTransport::new(Some(server_ip), server_port, Arc::clone(&ETHERNET_IFACE)) @@ -119,7 +118,7 @@ pub(crate) fn init_ethernet_rpc( let mut client = Client::new(rpc_transport); // only send client data on first client registration request - let send_client_data = if i == 0 { true } else { false }; + let send_client_data = if core == 0 { true } else { false }; client = initialize_client(client, send_client_data, is_dcm) .expect("Failed to initialize client");