Skip to content

Commit

Permalink
Simplify RPC code and clean up controller state
Browse files Browse the repository at this point in the history
  • Loading branch information
hunhoffe committed Jul 21, 2023
1 parent 1988504 commit fce937c
Show file tree
Hide file tree
Showing 35 changed files with 335 additions and 440 deletions.
21 changes: 13 additions & 8 deletions kernel/src/arch/x86_64/rackscale/client_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ use crate::process::MAX_PROCESSES;
/// This is the state the client records about itself
pub(crate) struct ClientState {
/// The RPC client used to communicate with the controller
pub(crate) rpc_client: Arc<Mutex<Box<Client>>>,
pub(crate) rpc_client: Arc<Mutex<Client>>,

/// Used to store shmem affinity base pages
pub(crate) affinity_base_pages: Arc<Mutex<ArrayVec<Box<dyn MemManager + Send>, MAX_MACHINES>>>,
/// TODO(rackscale): is the box necessary?
pub(crate) affinity_base_pages: ArrayVec<Arc<Mutex<Box<dyn MemManager + Send>>>, MAX_MACHINES>,

/// Used to store base pages allocated to a process
pub(crate) per_process_base_pages: Arc<Mutex<ArrayVec<FrameCacheBase, MAX_PROCESSES>>>,
pub(crate) per_process_base_pages: ArrayVec<Arc<Mutex<FrameCacheBase>>, MAX_PROCESSES>,
}

impl ClientState {
Expand Down Expand Up @@ -64,20 +65,24 @@ impl ClientState {
let mut per_process_base_pages = ArrayVec::new();
for _i in 0..MAX_PROCESSES {
// TODO(rackscale): this is a bogus affinity because it should really be "ANY_SHMEM"
per_process_base_pages.push(FrameCacheBase::new(local_shmem_affinity()));
per_process_base_pages.push(Arc::new(Mutex::new(FrameCacheBase::new(
local_shmem_affinity(),
))));
}

let mut affinity_base_pages = ArrayVec::new();
for i in 0..MAX_MACHINES {
affinity_base_pages.push(Box::new(FrameCacheBase::new(mid_to_shmem_affinity(i)))
as Box<dyn MemManager + Send>);
affinity_base_pages.push(Arc::new(Mutex::new(Box::new(FrameCacheBase::new(
mid_to_shmem_affinity(i),
))
as Box<dyn MemManager + Send>)));
}

log::debug!("Finished initializing client state");
ClientState {
rpc_client,
affinity_base_pages: Arc::new(Mutex::new(affinity_base_pages)),
per_process_base_pages: Arc::new(Mutex::new(per_process_base_pages)),
affinity_base_pages,
per_process_base_pages,
}
}
}
Expand Down
57 changes: 15 additions & 42 deletions kernel/src/arch/x86_64/rackscale/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@

use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::vec::Vec;
use arrayvec::ArrayVec;
use core::cell::Cell;
use fallible_collections::FallibleVecGlobal;
use smoltcp::time::Instant;

use rpc::api::RPCServer;
use rpc::rpc::RPCType;
use rpc::server::Server;
use rpc::transport::TCPTransport;

use crate::arch::MAX_MACHINES;
use crate::cmdline::Transport;
use crate::transport::ethernet::ETHERNET_IFACE;
use crate::transport::shmem::create_shmem_transport;
Expand All @@ -23,46 +23,24 @@ pub(crate) const CONTROLLER_PORT_BASE: u16 = 6970;

/// Controller main method
pub(crate) fn run() {
// Create network interface and clock
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) struct Clock(Cell<Instant>);

impl Clock {
fn new() -> Clock {
let rt = rawtime::Instant::now().as_nanos();
let rt_millis = (rt / 1_000_000) as i64;
Clock(Cell::new(Instant::from_millis(rt_millis)))
}

fn elapsed(&self) -> Instant {
self.0.get()
}
}
let clock = Clock::new();

// Initialize one server per client
let num_clients = *crate::environment::NUM_MACHINES - 1;
let mut servers: Vec<Box<dyn RPCServer<ControllerState>>> =
Vec::try_with_capacity(num_clients as usize)
.expect("Failed to allocate vector for RPC server");
let mut servers: ArrayVec<Server, MAX_MACHINES> = ArrayVec::new();

if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
{
for mid in 0..num_clients {
let transport = Box::try_new(
let transport = Box::new(
TCPTransport::new(
None,
CONTROLLER_PORT_BASE + mid as u16,
Arc::clone(&ETHERNET_IFACE),
)
.expect("Failed to create TCP transport"),
)
.expect("Out of memory during init");
let mut server: Box<dyn RPCServer<ControllerState>> =
Box::try_new(Server::new(transport)).expect("Out of memory during init");
);
let mut server = Server::new(transport);
register_rpcs(&mut server);
servers.push(server);
}
Expand All @@ -71,26 +49,22 @@ pub(crate) fn run() {
.map_or(false, |c| c.transport == Transport::Shmem)
{
for mid in 1..=num_clients {
let transport = Box::try_new(
let transport = Box::new(
create_shmem_transport(mid.try_into().unwrap())
.expect("Failed to create shmem transport"),
)
.expect("Out of memory during init");
);

let mut server: Box<dyn RPCServer<ControllerState>> =
Box::try_new(Server::new(transport)).expect("Out of memory during init");
let mut server = Server::new(transport);
register_rpcs(&mut server);
servers.push(server);
}
} else {
unreachable!("No supported transport layer specified in kernel argument");
}

let mut controller_state = ControllerState::new(num_clients as usize);

for server in servers.iter_mut() {
controller_state = server
.add_client(&CLIENT_REGISTRAR, controller_state)
server
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to connect to remote server");
}

Expand Down Expand Up @@ -136,16 +110,15 @@ pub(crate) fn run() {
}

// Try to handle an RPC request
for server in servers.iter() {
let (mut new_state, _handled) = server
.try_handle(controller_state)
for server in servers.iter_mut() {
let _handled = server
.try_handle()
.expect("Controller failed to handle RPC");
controller_state = new_state;
}
}
}

fn register_rpcs(server: &mut Box<dyn RPCServer<ControllerState>>) {
fn register_rpcs(server: &mut Server) {
// Register all of the RPC functions supported
server
.register(KernelRpc::Close as RPCType, &CLOSE_HANDLER)
Expand Down
173 changes: 107 additions & 66 deletions kernel/src/arch/x86_64/rackscale/controller_state.rs
Original file line number Diff line number Diff line change
@@ -1,116 +1,157 @@
// Copyright © 2022 University of Colorado. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::vec::Vec;

use arrayvec::ArrayVec;
use atopology::NodeId;
use lazy_static::lazy_static;
use spin::Mutex;

use kpi::system::{CpuThread, MachineId};
use kpi::system::{new_gtid, CpuThread, GlobalThreadId, MachineId, MachineThreadId};

use crate::arch::rackscale::FrameCacheBase;
use crate::arch::MAX_MACHINES;
use crate::arch::{MAX_CORES, MAX_MACHINES};
use crate::memory::backends::MemManager;
use crate::memory::mcache::MCache;
use crate::memory::shmem_affinity::{local_shmem_affinity, mid_to_shmem_affinity};
use crate::memory::{mcache::MCache, LARGE_PAGE_SIZE};
use crate::memory::vspace::{CoreBitMap, CoreBitMapIter};
use crate::transport::shmem::get_affinity_shmem;

/// TODO(rackscale): think about how we should constrain this?
/// Global state about the local rackscale client
/// Caches of memory for use by the controller. The controller cache includes all shmem belonging to the controller,
/// because DCM does not allocate controller shmem.
lazy_static! {
pub(crate) static ref CONTROLLER_SHMEM_CACHES: Arc<Mutex<ArrayVec<Box<dyn MemManager + Send>, MAX_MACHINES>>> = {
pub(crate) static ref CONTROLLER_SHMEM_CACHES: ArrayVec<Arc<Mutex<Box<dyn MemManager + Send>>>, MAX_MACHINES> = {
let mut shmem_caches = ArrayVec::new();
shmem_caches.push(Box::new(MCache::<2048, 2048>::new_with_frame::<2048, 2048>(
// TODO(rackscale): think about how we should constrain the mcache?
shmem_caches.push(Arc::new(Mutex::new(Box::new(MCache::<2048, 2048>::new_with_frame::<2048, 2048>(
local_shmem_affinity(),
get_affinity_shmem(),
)) as Box<dyn MemManager + Send>);
)) as Box<dyn MemManager + Send>)));
for i in 1..MAX_MACHINES {
shmem_caches.push(Box::new(FrameCacheBase::new(mid_to_shmem_affinity(i)))
as Box<dyn MemManager + Send>);
shmem_caches.push(Arc::new(Mutex::new(Box::new(FrameCacheBase::new(mid_to_shmem_affinity(i)))
as Box<dyn MemManager + Send>)));
}

Arc::new(Mutex::new(shmem_caches))
shmem_caches
};
}

/// TODO(rackscale): think about how we should constrain this?
/// TODO(rackscale): want to lock around individual allocators?
/// Global state about the local rackscale client
/// Caches of memslices allocated by the DCM scheduler
lazy_static! {
pub(crate) static ref SHMEM_MEMSLICE_ALLOCATORS: Arc<Mutex<ArrayVec<MCache<0, 2048>, MAX_MACHINES>>> = {
pub(crate) static ref SHMEM_MEMSLICE_ALLOCATORS: ArrayVec<Arc<Mutex<MCache<0, 2048>>>, MAX_MACHINES> = {
// TODO(rackscale): think about how we should constrain the mcache?
let mut shmem_allocators = ArrayVec::new();
for i in 0..MAX_MACHINES {
shmem_allocators.push(MCache::<0, 2048>::new(mid_to_shmem_affinity(i + 1)));
for i in 1..(MAX_MACHINES + 1) {
shmem_allocators.push(Arc::new(Mutex::new(MCache::<0, 2048>::new(mid_to_shmem_affinity(i)))));
}
Arc::new(Mutex::new(shmem_allocators))
shmem_allocators
};
}

/// This is the state the controller records about each client
pub(crate) struct PerClientState {
/// The client believes it has this ID
pub(crate) mid: MachineId,

/// A list of the hardware threads belonging to this client and whether the thread is scheduler or not
/// TODO(rackscale, performance): make this a core map??
pub(crate) hw_threads: Vec<(CpuThread, bool)>,
struct ThreadMap {
pub num_threads: usize,
pub map: CoreBitMap,
}

impl PerClientState {
pub(crate) fn new(mid: MachineId, hw_threads: Vec<(CpuThread, bool)>) -> PerClientState {
PerClientState { mid, hw_threads }
impl ThreadMap {
fn new() -> ThreadMap {
let map = CoreBitMap { low: 0, high: 0 };
ThreadMap {
num_threads: 0,
map,
}
}
}

/// This is the state of the controller, including all per-client state
pub(crate) struct ControllerState {
/// State related to each client.
per_client_state: ArrayVec<Arc<Mutex<PerClientState>>, MAX_MACHINES>,
}
fn init(&mut self, num_threads: usize) {
// make sure smaller than max size of CoreBitMap
debug_assert!(num_threads <= (u128::BITS as usize) * 2);

impl ControllerState {
pub(crate) fn new(max_clients: usize) -> ControllerState {
let mut per_client_state = ArrayVec::new();
for i in 0..max_clients {
per_client_state.push(Arc::new(Mutex::new(PerClientState::new(i, Vec::new()))));
self.num_threads = num_threads;
for i in 0..num_threads {
self.mark_thread_free(i);
}
ControllerState { per_client_state }
}

pub(crate) fn add_client(&mut self, mid: MachineId, threads: &Vec<CpuThread>) {
let mut client_state = self.per_client_state[mid - 1].lock();
assert!(client_state.hw_threads.len() == 0);
fn mark_thread_free(&mut self, mtid: MachineThreadId) {
debug_assert!(mtid < self.num_threads);
self.map.set_bit(mtid, true);
}

client_state
.hw_threads
.try_reserve_exact(threads.len())
.expect("Failed to reserve room in hw threads vector");
for thread in threads {
client_state.hw_threads.push((*thread, false));
fn claim_first_free_thread(&mut self) -> Option<MachineThreadId> {
let mut iter = CoreBitMapIter(self.map);
if let Some(mtid) = iter.next() {
if mtid < self.num_threads {
self.map.set_bit(mtid, false);
return Some(mtid);
}
}
None
}
}

pub(crate) fn get_client_state(&self, mid: MachineId) -> &Arc<Mutex<PerClientState>> {
&self.per_client_state[mid - 1]
/// This is the state the controller records about each client
pub(crate) struct ControllerState {
/// A composite list of all hardware threads
hw_threads_all: Arc<Mutex<ArrayVec<CpuThread, { MAX_MACHINES * MAX_CORES }>>>,
/// Bit maps to keep track of free/busy hw threads. Index is machine_id - 1
thread_maps: ArrayVec<Arc<Mutex<ThreadMap>>, MAX_MACHINES>,
/// The NodeId of each thread, organized by client. Index is machine_id - 1
affinities_per_client: ArrayVec<Arc<Mutex<ArrayVec<NodeId, MAX_CORES>>>, MAX_MACHINES>,
}

impl ControllerState {
pub(crate) fn init_client_state(&self, mid: MachineId, threads: &Vec<CpuThread>) {
{
// We assume that threads are ordered by gtid within the threads list.
let mut hw_threads = self.hw_threads_all.lock();
let mut affinities = self.affinities_per_client[mid - 1].lock();
for thread in threads {
affinities.push(thread.node_id);
hw_threads.push(*thread);
}
}
let mut thread_map = self.thread_maps[mid - 1].lock();
thread_map.init(threads.len());
}

// TODO(rackscale, efficiency): allocates memory on the fly & also has nested loop
// should be called sparingly or rewritten
pub(crate) fn get_hardware_threads(&self) -> Vec<CpuThread> {
let mut hw_threads = Vec::new();
for client_state in &self.per_client_state {
let state = client_state.lock();
hw_threads
.try_reserve_exact(state.hw_threads.len())
.expect("Failed to reserve room in hw threads vector");
for j in 0..state.hw_threads.len() {
// ignore the thread state and just save the information
hw_threads.push(state.hw_threads[j].0);
}
let hw_threads = self.hw_threads_all.lock();
// TODO(rackscale, performance): copy is relatiely expensive here
hw_threads.to_vec()
}

// Chooses sequentially for cores on the machine.
// TODO(rackscale, performance): it should choose in a NUMA-aware fashion for the remote node.
pub(crate) fn claim_hardware_thread(&self, mid: MachineId) -> Option<(GlobalThreadId, NodeId)> {
let mut thread_map = self.thread_maps[mid - 1].lock();
if let Some(mtid) = thread_map.claim_first_free_thread() {
let affinity = {
let thread_affinities = self.affinities_per_client[mid - 1].lock();
thread_affinities[mtid]
};
Some((kpi::system::new_gtid(mtid, mid), affinity))
} else {
// No threads are free
None
}
hw_threads
}
}

/// State the controller maintains about each client.
lazy_static! {
pub(crate) static ref CONTROLLER_STATE: ControllerState = {
let mut affinities_per_client = ArrayVec::new();
let mut thread_maps = ArrayVec::new();
for i in 0..MAX_MACHINES {
affinities_per_client.push(Arc::new(Mutex::new(ArrayVec::new())));
thread_maps.push(Arc::new(Mutex::new(ThreadMap::new())));
}
ControllerState {
hw_threads_all: Arc::new(Mutex::new(ArrayVec::new())),
thread_maps,
affinities_per_client,
}
};
}
Loading

0 comments on commit fce937c

Please sign in to comment.