Skip to content

Commit

Permalink
Add RackscaleRunState constructor, port another integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
hunhoffe committed Jul 17, 2023
1 parent 7760fbd commit 622f083
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 151 deletions.
169 changes: 22 additions & 147 deletions kernel/tests/s06_rackscale_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn rackscale_userspace_smoke_test(transport: RackscaleTransport) {
.release()
.build();

fn controller_match_function(
fn client_match_function(
proc: &mut PtySession,
output: &mut String,
_cores_per_client: usize,
Expand All @@ -73,162 +73,37 @@ fn rackscale_userspace_smoke_test(transport: RackscaleTransport) {
Ok(())
}

/// Client-side matcher that intentionally inspects nothing; the client's
/// output is not checked in this test, so it simply reports success.
fn client_match_function(
    _proc: &mut PtySession,
    _output: &mut String,
    _cores_per_client: usize,
    _num_clients: usize,
) -> Result<()> {
    Ok(())
}

// Configure the run via the RackscaleRunState constructor defaults, then
// override only what this test cares about: the client output matcher, the
// transport under test, and wait_for_client so the controller blocks until
// the client signals completion.
// (The old hand-written struct literal duplicated the constructor defaults
// and was immediately shadowed; it is removed here.)
let mut test_run = RackscaleRunState::new("userspace-smp".to_string(), built);
test_run.client_match_function = client_match_function;
test_run.transport = transport;
test_run.wait_for_client = true;

rackscale_runner(test_run);
}

#[cfg(not(feature = "baremetal"))]
#[test]
fn s06_rackscale_phys_alloc_test() {
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

let timeout = 120_000;

setup_network(2);

let (shmem_socket0, shmem_file0) = get_shmem_names(Some(0), false);
let (shmem_socket1, shmem_file1) = get_shmem_names(Some(1), false);
let mut shmem_server0 = spawn_shmem_server(&shmem_socket0, &shmem_file0, SHMEM_SIZE, None)
.expect("Failed to start shmem server 0");
let mut shmem_server1 = spawn_shmem_server(&shmem_socket1, &shmem_file1, SHMEM_SIZE, None)
.expect("Failed to start shmem server 1");

let mut dcm = spawn_dcm(1).expect("Failed to start DCM");

let (tx, rx) = channel();
let all_outputs = Arc::new(Mutex::new(Vec::new()));

let build = Arc::new(
BuildArgs::default()
.module("init")
.user_feature("test-phys-alloc")
.kernel_feature("rackscale")
.release()
.build(),
);

let controller_output_array = all_outputs.clone();
let shmem_sockets = vec![shmem_socket0.clone(), shmem_socket1.clone()];
let build1 = build.clone();
let controller = std::thread::Builder::new()
.name("Controller".to_string())
.spawn(move || {
let cmdline_controller = RunnerArgs::new_with_build("userspace-smp", &build1)
.timeout(timeout)
.mode(RackscaleMode::Controller)
.shmem_size(vec![SHMEM_SIZE as usize; 2])
.shmem_path(shmem_sockets)
.workers(2)
.tap("tap0")
.no_network_setup()
.use_vmxnet3();

let mut output = String::new();
let mut qemu_run = || -> Result<WaitStatus> {
let mut p = spawn_nrk(&cmdline_controller)?;

let _ = wait_for_termination::<()>(&rx);
let ret = p.process.kill(SIGTERM);
output += p.exp_eof()?.as_str();
ret
};
let ret = qemu_run();
controller_output_array
.lock()
.expect("Failed to get mutex to output array")
.push((String::from("Controller"), output));

// This will only find sigterm, that's okay
wait_for_sigterm_or_successful_exit_no_log(
&cmdline_controller,
ret,
String::from("Controller"),
);
})
.expect("Controller thread failed to spawn");

let client_output_array = all_outputs.clone();
let shmem_sockets = vec![shmem_socket0.clone(), shmem_socket1.clone()];
let build2 = build.clone();
let client = std::thread::Builder::new()
.name("Client".to_string())
.spawn(move || {
sleep(Duration::from_millis(CLIENT_BUILD_DELAY));
let cmdline_client = RunnerArgs::new_with_build("userspace-smp", &build2)
.timeout(180_000)
.mode(RackscaleMode::Client)
.shmem_size(vec![SHMEM_SIZE as usize; 2])
.shmem_path(shmem_sockets)
.tap("tap2")
.no_network_setup()
.workers(2)
.nobuild()
.use_vmxnet3();

let mut output = String::new();
let mut qemu_run = || -> Result<WaitStatus> {
let mut p = spawn_nrk(&cmdline_client)?;
output += p.exp_string("phys_alloc_test OK")?.as_str();
output += p.exp_eof()?.as_str();
notify_of_termination(&tx);
p.process.exit()
};
let ret = qemu_run();
client_output_array
.lock()
.expect("Failed to get mutex to output array")
.push((String::from("Client"), output.clone()));
check_for_successful_exit_no_log(&cmdline_client, ret, String::from("Client"));
})
.expect("Client thread failed to spawn");

let client_ret = client.join();
let controller_ret = controller.join();

let _ignore = dcm.process.kill(SIGKILL);
let _ignore = shmem_server0.send_control('c');
let _ignore = shmem_server1.send_control('c');
let built = BuildArgs::default()
.module("init")
.user_feature("test-phys-alloc")
.kernel_feature("rackscale")
.release()
.build();

// If there's been an error, print everything
let outputs = all_outputs
.lock()
.expect("Failed to get mutex to output array");
assert!(outputs.len() == 2);
if controller_ret.is_err() || client_ret.is_err() {
for (name, output) in outputs.iter() {
log_qemu_out_with_name(None, name.to_string(), output.to_string());
}
/// Client matcher for the phys-alloc test: blocks until the client kernel
/// prints the success marker, folding the matched text into the captured
/// output for later logging.
fn client_match_function(
    proc: &mut PtySession,
    output: &mut String,
    _cores_per_client: usize,
    _num_clients: usize,
) -> Result<()> {
    let matched = proc.exp_string("phys_alloc_test OK")?;
    output.push_str(matched.as_str());
    Ok(())
}

// Port to the shared rackscale runner: the client matcher waits for
// "phys_alloc_test OK", and wait_for_client makes the controller hold on
// until the client signals completion.
// The old client_ret/controller_ret unwraps belonged to the removed manual
// thread management and are dropped; the run must actually be executed,
// which the previous version forgot to do.
let mut test_run = RackscaleRunState::new("userspace-smp".to_string(), built);
test_run.client_match_function = client_match_function;
test_run.wait_for_client = true;

rackscale_runner(test_run);
}

#[cfg(not(feature = "baremetal"))]
Expand Down
57 changes: 53 additions & 4 deletions kernel/testutils/src/rackscale_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rexpect::session::PtySession;
use crate::builder::{Built, Machine};
use crate::helpers::{
get_shmem_names, setup_network, spawn_dcm, spawn_nrk, spawn_shmem_server, CLIENT_BUILD_DELAY,
SHMEM_SIZE,
};
use crate::runner_args::{
log_qemu_out_with_name, wait_for_sigterm_or_successful_exit_no_log, RackscaleMode,
Expand Down Expand Up @@ -58,9 +59,41 @@ pub struct RackscaleRunState {
pub cores_per_client: usize,
pub shmem_size: usize,
pub use_affinity: bool,
pub wait_for_client: bool,
pub transport: RackscaleTransport,
}

impl RackscaleRunState {
    /// Builds a run state with sensible defaults for a single-client,
    /// single-core rackscale test over shared memory. Callers override
    /// individual fields (matchers, transport, wait_for_client, ...) before
    /// handing the state to the runner.
    pub fn new(kernel_test: String, built: Built<'static>) -> RackscaleRunState {
        // Default matcher: inspect nothing, always succeed.
        fn noop_match_function(
            _proc: &mut PtySession,
            _output: &mut String,
            _cores_per_client: usize,
            _num_clients: usize,
        ) -> Result<()> {
            Ok(())
        }

        RackscaleRunState {
            kernel_test,
            built,
            // Controller and client share the same timeout/memory defaults.
            controller_timeout: 60_000,
            client_timeout: 60_000,
            controller_memory: 1024,
            client_memory: 1024,
            controller_match_function: noop_match_function,
            client_match_function: noop_match_function,
            // One client with a single core, no NUMA affinity pinning.
            num_clients: 1,
            cores_per_client: 1,
            use_affinity: false,
            shmem_size: SHMEM_SIZE,
            wait_for_client: false,
            transport: RackscaleTransport::Shmem,
        }
    }
}

pub fn rackscale_runner<'a>(run: RackscaleRunState) {
// Do not allow over provisioning
let machine = Machine::determine();
Expand Down Expand Up @@ -91,13 +124,16 @@ pub fn rackscale_runner<'a>(run: RackscaleRunState) {

let (tx, rx) = channel();
let rx_mut = Arc::new(Mutex::new(rx));
let tx_mut = Arc::new(Mutex::new(tx));
let built = Arc::new(run.built);

// Run controller in separate thread
let controller_output_array = all_outputs.clone();
let controller_build = built.clone();
let controller_shmem_sockets = shmem_sockets.clone();
let controller_kernel_test = run.kernel_test.clone();
let controller_rx = rx_mut.clone();
let controller_tx = tx_mut.clone();
let controller = std::thread::Builder::new()
.name("Controller".to_string())
.spawn(move || {
Expand Down Expand Up @@ -126,9 +162,16 @@ pub fn rackscale_runner<'a>(run: RackscaleRunState) {
run.num_clients,
)?;

// Notify each client it's okay to shutdown
for _ in 0..run.num_clients {
notify_of_termination(&tx);
if run.wait_for_client {
// Wait for signal from each client that it is done
let rx = controller_rx.lock().expect("Failed to get rx lock");
let _ = wait_for_termination::<()>(&rx);
} else {
// Notify each client it's okay to shutdown
let tx = controller_tx.lock().expect("Failed to get tx lock");
notify_of_termination(&tx);
}
}

let ret = p.process.kill(SIGTERM)?;
Expand Down Expand Up @@ -157,6 +200,7 @@ pub fn rackscale_runner<'a>(run: RackscaleRunState) {
let client_build = built.clone();
let client_shmem_sockets = shmem_sockets.clone();
let client_rx = rx_mut.clone();
let client_tx = tx_mut.clone();
let client_kernel_test = run.kernel_test.clone();
let client = std::thread::Builder::new()
.name(format!("Client{}", i + 1))
Expand Down Expand Up @@ -189,8 +233,13 @@ pub fn rackscale_runner<'a>(run: RackscaleRunState) {
)?;

// Wait for controller to terminate
let rx = client_rx.lock().expect("Failed to get rx lock");
let _ = wait_for_termination::<()>(&rx);
if run.wait_for_client {
let tx = client_tx.lock().expect("Failed to get rx lock");
notify_of_termination(&tx);
} else {
let rx = client_rx.lock().expect("Failed to get rx lock");
let _ = wait_for_termination::<()>(&rx);
}

let ret = p.process.kill(SIGTERM);
output += p.exp_eof()?.as_str();
Expand Down

0 comments on commit 622f083

Please sign in to comment.