diff --git a/kernel/tests/s11_rackscale_benchmarks.rs b/kernel/tests/s11_rackscale_benchmarks.rs index 3517cd0bf..69c5c9820 100644 --- a/kernel/tests/s11_rackscale_benchmarks.rs +++ b/kernel/tests/s11_rackscale_benchmarks.rs @@ -326,9 +326,6 @@ struct LevelDBConfig { val_size: i32, } -// Ignoring this test for now due to synchronization bugs. Seen bugs include -// mutex locking against itself, _lwp_exit returning after a thread has blocked. -/* #[test] #[cfg(not(feature = "baremetal"))] fn s11_rackscale_shmem_leveldb_benchmark() { @@ -454,7 +451,12 @@ fn s11_rackscale_shmem_leveldb_benchmark() { } bench.run_bench(false, is_smoke); } -*/ + +#[derive(Clone)] +struct MemcachedInternalConfig { + pub num_queries: usize, + pub mem_size: usize, +} #[test] #[cfg(not(feature = "baremetal"))] @@ -468,6 +470,10 @@ fn rackscale_memcached_benchmark(is_shmem: bool) { use std::thread::sleep; use std::time::Duration; + // TODO(rackscale): because this test is flaky, always just run smoke test. + // Seen bugs include mutex locking against itself, _lwp_exit returning after a thread has blocked. + let is_smoke = true; // cfg!(feature = "smoke") + let transport_str = if is_shmem { "shmem" } else { "ethernet" }; let file_name = Arc::new(format!( "rackscale_{}_memcached_benchmark.csv", @@ -475,513 +481,170 @@ fn rackscale_memcached_benchmark(is_shmem: bool) { )); let _ignore = std::fs::remove_file(file_name.as_ref()); - let build = Arc::new({ - let mut build = BuildArgs::default().module("init"); - - build = build.user_feature("rkapps:memcached-bench"); - - build = build - .kernel_feature("shmem") - .kernel_feature("ethernet") - .kernel_feature("rackscale") - .release(); - if cfg!(feature = "smoke") { - build = build.user_feature("smoke"); - } - build.build() - }); + let built = BuildArgs::default() + .module("rkapps") + .user_feature("rkapps:memcached-bench") + .release() + .build(); - let build_baseline = Arc::new({ - let mut build = BuildArgs::default().module("init"); + fn controller_match_fn( + proc: &mut PtySession, + output: &mut String, + cores_per_client: usize, + num_clients: usize, + file_name: &str, + is_baseline: bool, + arg: Option, + ) -> Result<()> { + let config = arg.expect("match function expects a memcached config"); - build = build.user_feature("rkapps:memcached-bench"); + // match the title + let (prev, matched) = proc.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#)?; - build = build - .kernel_feature("shmem") - .kernel_feature("ethernet") - .release(); - if cfg!(feature = "smoke") { - build = build.user_feature("smoke"); - } - build.build() - }); + *output += prev.as_str(); + *output += matched.as_str(); - let machine = Machine::determine(); - let max_cores = if cfg!(feature = "smoke") { - 1 - } else { - machine.max_cores() - }; - let baseline_shmem_size = if cfg!(feature = "smoke") || machine.max_cores() <= 32 { - SHMEM_SIZE * 2 - } else { - SHMEM_SIZE * 4 - }; - let shmem_size = SHMEM_SIZE; + // x_benchmark_mem = 10 MB + let (prev, matched) = proc.exp_regex(r#"x_benchmark_mem = (\d+) MB"#)?; + println!("> {}", matched); + let b_mem = matched.replace("x_benchmark_mem = ", "").replace(" MB", ""); - let cores_per_node = machine.max_cores() / machine.max_numa_nodes(); + *output += prev.as_str(); + *output += matched.as_str(); - if cfg!(feature = "baseline") { - // Run the baseline test - setup_network(1); - let mut num_nodes = 1; - for cores in (0..max_cores).step_by(4) { - let mut cores = if cores == 0 { 1 } else { cores }; + // number of threads: 3 + let (prev, matched) = proc.exp_regex(r#"number of threads: (\d+)"#)?; + println!("> {}", matched); + let b_threads = matched.replace("number of threads: ", ""); - // Round up to get the number of clients - let new_num_nodes = (cores + (cores_per_node - 1)) / cores_per_node; + *output += prev.as_str(); + *output += matched.as_str(); - // Make sure cores are divisible by num replicas (nodes) if num replicas changes. - if num_nodes != new_num_nodes { - num_nodes = new_num_nodes; + // number of keys: 131072 + let (prev, matched) = proc.exp_regex(r#"number of keys: (\d+)"#)?; + println!("> {}", matched); - // ensure total cores is divisible by num nodes - cores = cores - (cores % num_nodes); - } + *output += prev.as_str(); + *output += matched.as_str(); - let timeout = 20_000 * (cores) as u64; - eprintln!( - "\tRunning NrOS Memcached baseline with {} core(s) and {} node(s)", - cores, num_nodes - ); + let (prev, matched) = proc.exp_regex(r#"Executing (\d+) queries with (\d+) threads"#)?; + println!("> {}", matched); - let (shmem_socket, shmem_file) = - get_shmem_names(None, cfg!(feature = "affinity-shmem")); - let shmem_affinity = if cfg!(feature = "affinity-shmem") { - Some(0) - } else { - None - }; - let mut shmem_server = spawn_shmem_server( - &shmem_socket, - &shmem_file, - baseline_shmem_size, - shmem_affinity, - ) - .expect("Failed to start shmem server"); - - let baseline_cmdline = format!("initargs={}", cores); - let baseline_file_name = file_name.clone(); - - let vm_cores = vec![cores / num_nodes; num_nodes]; // client vms - let placement_cores = machine.rackscale_core_affinity(vm_cores); - let mut all_placement_cores = Vec::new(); - let placement_offset = placement_cores[0].0; - for placement in placement_cores { - all_placement_cores.extend(placement.1); - } + *output += prev.as_str(); + *output += matched.as_str(); - let mut cmdline_baseline = RunnerArgs::new_with_build("userspace-smp", &build_baseline) - .timeout(timeout) - .shmem_size(vec![baseline_shmem_size as usize]) - .shmem_path(vec![shmem_socket]) - .tap("tap0") - .workers(1) - .cores(cores) - .nodes(num_nodes) - .node_offset(placement_offset) - .setaffinity(all_placement_cores) - .use_vmxnet3() - .cmd(baseline_cmdline.as_str()); - - if cfg!(feature = "smoke") { - cmdline_baseline = cmdline_baseline.memory(10 * 1024); - } else { - cmdline_baseline = cmdline_baseline.memory(48 * 1024); - } + // benchmark took 129 seconds + let (prev, matched) = proc.exp_regex(r#"benchmark took (\d+) ms"#)?; + println!("> {}", matched); + let b_time = matched.replace("benchmark took ", "").replace(" ms", ""); - let mut output = String::new(); - let mut qemu_run = |_baseline_cores| -> Result { - let mut p = spawn_nrk(&cmdline_baseline)?; - - // match the title - let (prev, matched) = p.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#)?; - - output += prev.as_str(); - output += matched.as_str(); - - // x_benchmark_mem = 10 MB - let (prev, matched) = p.exp_regex(r#"x_benchmark_mem = (\d+) MB"#)?; - println!("> {}", matched); - let b_mem = matched.replace("x_benchmark_mem = ", "").replace(" MB", ""); - - output += prev.as_str(); - output += matched.as_str(); - - // number of threads: 3 - let (prev, matched) = p.exp_regex(r#"number of threads: (\d+)"#)?; - println!("> {}", matched); - let b_threads = matched.replace("number of threads: ", ""); - - output += prev.as_str(); - output += matched.as_str(); - - // number of keys: 131072 - let (prev, matched) = p.exp_regex(r#"number of keys: (\d+)"#)?; - println!("> {}", matched); - - output += prev.as_str(); - output += matched.as_str(); - - let (prev, matched) = - p.exp_regex(r#"Executing (\d+) queries with (\d+) threads"#)?; - println!("> {}", matched); - - output += prev.as_str(); - output += matched.as_str(); - - // benchmark took 129 seconds - let (prev, matched) = p.exp_regex(r#"benchmark took (\d+) ms"#)?; - println!("> {}", matched); - let b_time = matched.replace("benchmark took ", "").replace(" ms", ""); - - output += prev.as_str(); - output += matched.as_str(); - - // benchmark took 7937984 queries / second - let (prev, matched) = p.exp_regex(r#"benchmark took (\d+) queries / second"#)?; - println!("> {}", matched); - let b_thpt = matched - .replace("benchmark took ", "") - .replace(" queries / second", ""); - - output += prev.as_str(); - output += matched.as_str(); - - let (prev, matched) = p.exp_regex(r#"benchmark executed (\d+)"#)?; - println!("> {}", matched); - let b_queries = matched - .replace("benchmark executed ", "") - .split(" ") - .next() - .unwrap() - .to_string(); - - output += prev.as_str(); - output += matched.as_str(); - - // Append parsed results to a CSV file - let write_headers = !Path::new(baseline_file_name.as_str()).exists(); - let mut csv_file = OpenOptions::new() - .append(true) - .create(true) - .open(baseline_file_name.as_str()) - .expect("Can't open file"); - if write_headers { - let row = "git_rev,benchmark,nthreads,mem,queries,time,thpt\n"; - let r = csv_file.write(row.as_bytes()); - assert!(r.is_ok()); - } - - let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes()); - assert!(r.is_ok()); - let out = format!( - "memcached,{},{},{},{},{}", - b_threads, b_mem, b_queries, b_time, b_thpt, - ); - let r = csv_file.write(out.as_bytes()); - assert!(r.is_ok()); - let r = csv_file.write("\n".as_bytes()); - assert!(r.is_ok()); + *output += prev.as_str(); + *output += matched.as_str(); - output += p.exp_eof()?.as_str(); - p.process.exit() - }; - check_for_successful_exit(&cmdline_baseline, qemu_run(cores), output); - let _ignore = shmem_server.send_control('c'); - } - } + // benchmark took 7937984 queries / second + let (prev, matched) = proc.exp_regex(r#"benchmark took (\d+) queries / second"#)?; + println!("> {}", matched); + let b_thpt = matched + .replace("benchmark took ", "") + .replace(" queries / second", ""); - // Run the rackscale test - let mut num_clients = 1; - setup_network(num_clients + 1); - for mut total_cores in (0..max_cores).step_by(4) { - if total_cores == 0 { - total_cores = 1; - } + *output += prev.as_str(); + *output += matched.as_str(); - // Round up to get the number of clients - let new_num_clients = (total_cores + (cores_per_node - 1)) / cores_per_node; + let (prev, matched) = proc.exp_regex(r#"benchmark executed (\d+)"#)?; + println!("> {}", matched); + let b_queries = matched + .replace("benchmark executed ", "") + .split(" ") + .next() + .unwrap() + .to_string(); - // Do network setup if number of clients has changed. - if num_clients != new_num_clients { - num_clients = new_num_clients; - setup_network(num_clients + 1); + *output += prev.as_str(); + *output += matched.as_str(); - // ensure total cores is divisible by num clients - total_cores = total_cores - (total_cores % num_clients); + // Append parsed results to a CSV file + let write_headers = !Path::new(file_name).exists(); + let mut csv_file = OpenOptions::new() + .append(true) + .create(true) + .open(file_name) + .expect("Can't open file"); + if write_headers { + let row = "git_rev,benchmark,nthreads,mem,queries,time,thpt,num_clients\n"; + let r = csv_file.write(row.as_bytes()); + assert!(r.is_ok()); } - let cores = total_cores / num_clients; - eprintln!( - "\tRunning Memcached test with {:?} total core(s), {:?} client(s) (cores_per_client={:?})", - total_cores, num_clients, cores - ); - let timeout = 120_000 + 800000 * total_cores as u64; - let all_outputs = Arc::new(Mutex::new(Vec::new())); + let actual_num_clients = if is_baseline { 0 } else { num_clients }; - let mut vm_cores = vec![cores; num_clients + 1]; - vm_cores[0] = 1; // controller vm only has 1 core - let placement_cores = machine.rackscale_core_affinity(vm_cores); + let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes()); + assert!(r.is_ok()); + let out = format!( + "memcached,{},{},{},{},{},{}", + b_threads, b_mem, b_queries, b_time, b_thpt, actual_num_clients + ); + let r = csv_file.write(out.as_bytes()); + assert!(r.is_ok()); + let r = csv_file.write("\n".as_bytes()); + assert!(r.is_ok()); - let (tx, rx) = channel(); - let rx_mut = Arc::new(Mutex::new(rx)); + Ok(()) + } - let mut shmem_sockets = Vec::new(); - let mut shmem_servers = Vec::new(); - for i in 0..(num_clients + 1) { - let shmem_affinity = if cfg!(feature = "affinity-shmem") { - Some(placement_cores[i].0) - } else { - None - }; - let (shmem_socket, shmem_file) = - get_shmem_names(Some(i), cfg!(feature = "affinity-shmem")); - let shmem_server = - spawn_shmem_server(&shmem_socket, &shmem_file, shmem_size, shmem_affinity) - .expect("Failed to start shmem server"); - shmem_sockets.push(shmem_socket); - shmem_servers.push(shmem_server); + let config = if is_smoke { + MemcachedInternalConfig { + num_queries: 100_000, + mem_size: 16, } + } else { + MemcachedInternalConfig { + num_queries: 100_000_000, + mem_size: 32_000, + } + }; - let mut dcm = spawn_dcm(1).expect("Failed to start DCM"); - - let controller_cmdline = format!( - "mode=controller transport={}", - if is_shmem { "shmem" } else { "ethernet" } - ); + let mut test = RackscaleRun::new("userspace-smp".to_string(), built); + test.controller_match_fn = controller_match_fn; + test.transport = RackscaleTransport::Shmem; + test.use_affinity_shmem = cfg!(feature = "affinity-shmem"); + test.file_name = file_name.to_string(); + test.arg = Some(config); + test.client_build_delay *= 2; + test.run_dhcpd_for_baseline = true; - // Create controller - let build1 = build.clone(); - let controller_output_array = all_outputs.clone(); - let controller_file_name = file_name.clone(); - let controller_placement_cores = placement_cores.clone(); - let my_shmem_sockets = shmem_sockets.clone(); - let controller = std::thread::spawn(move || { - let mut cmdline_controller = RunnerArgs::new_with_build("userspace-smp", &build1) - .timeout(timeout) - .cmd(&controller_cmdline) - .shmem_size(vec![shmem_size as usize; num_clients + 1]) - .shmem_path(my_shmem_sockets) - .tap("tap0") - .nodes(1) - .node_offset(controller_placement_cores[0].0) - .no_network_setup() - .workers(num_clients + 1) - .setaffinity(controller_placement_cores[0].1.clone()) - .use_vmxnet3(); - - if cfg!(feature = "smoke") { - cmdline_controller = cmdline_controller.memory(10 * 1024); - } else { - cmdline_controller = cmdline_controller.memory(48 * 1024); - } + fn cmd_fn(num_cores: usize, arg: Option) -> String { + let config = arg.expect("missing leveldb config"); + format!( + r#"init=memcachedbench.bin initargs={} appcmd='--x-benchmark-mem={} --x-benchmark-queries={}'"#, + num_cores, config.mem_size, config.num_queries + ) + } - let mut output = String::new(); - let mut qemu_run = |_controller_clients, _application_cores| -> Result { - let mut p = spawn_nrk(&cmdline_controller)?; - - // match the title - let (prev, matched) = p.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#)?; - - output += prev.as_str(); - output += matched.as_str(); - - // x_benchmark_mem = 10 MB - let (prev, matched) = p.exp_regex(r#"x_benchmark_mem = (\d+) MB"#)?; - println!("> {}", matched); - let b_mem = matched.replace("x_benchmark_mem = ", "").replace(" MB", ""); - - output += prev.as_str(); - output += matched.as_str(); - - // number of threads: 3 - let (prev, matched) = p.exp_regex(r#"number of threads: (\d+)"#)?; - println!("> {}", matched); - let b_threads = matched.replace("number of threads: ", ""); - - output += prev.as_str(); - output += matched.as_str(); - - // number of keys: 131072 - let (prev, matched) = p.exp_regex(r#"number of keys: (\d+)"#)?; - println!("> {}", matched); - - output += prev.as_str(); - output += matched.as_str(); - - let (prev, matched) = - p.exp_regex(r#"Executing (\d+) queries with (\d+) threads"#)?; - println!("> {}", matched); - - output += prev.as_str(); - output += matched.as_str(); - - // benchmark took 129 seconds - let (prev, matched) = p.exp_regex(r#"benchmark took (\d+) ms"#)?; - println!("> {}", matched); - let b_time = matched.replace("benchmark took ", "").replace(" ms", ""); - - output += prev.as_str(); - output += matched.as_str(); - - // benchmark took 7937984 queries / second - let (prev, matched) = p.exp_regex(r#"benchmark took (\d+) queries / second"#)?; - println!("> {}", matched); - let b_thpt = matched - .replace("benchmark took ", "") - .replace(" queries / second", ""); - - output += prev.as_str(); - output += matched.as_str(); - - let (prev, matched) = p.exp_regex(r#"benchmark executed (\d+)"#)?; - println!("> {}", matched); - let b_queries = matched - .replace("benchmark executed ", "") - .split(" ") - .next() - .unwrap() - .to_string(); - - output += prev.as_str(); - output += matched.as_str(); - - // Append parsed results to a CSV file - let write_headers = !Path::new(controller_file_name.as_str()).exists(); - let mut csv_file = OpenOptions::new() - .append(true) - .create(true) - .open(controller_file_name.as_str()) - .expect("Can't open file"); - if write_headers { - let row = "git_rev,benchmark,nthreads,mem,queries,time,thpt\n"; - let r = csv_file.write(row.as_bytes()); - assert!(r.is_ok()); - } - - let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes()); - assert!(r.is_ok()); - let out = format!( - "memcached,{},{},{},{},{}", - b_threads, b_mem, b_queries, b_time, b_thpt, - ); - let r = csv_file.write(out.as_bytes()); - assert!(r.is_ok()); - let r = csv_file.write("\n".as_bytes()); - assert!(r.is_ok()); + fn baseline_timeout_fn(num_cores: usize) -> u64 { + 20_000 * num_cores as u64 + } - for _i in 0..num_clients { - notify_controller_of_termination(&tx); - } - p.process.kill(SIGTERM) - }; - let ret = qemu_run(num_clients, total_cores); - controller_output_array - .lock() - .expect("Failed to get output lock") - .push((String::from("Controller"), output)); - - // This will only find sigterm, that's okay - wait_for_sigterm_or_successful_exit_no_log( - &cmdline_controller, - ret, - String::from("Controller"), - ); - }); - - let mut clients = Vec::new(); - for nclient in 1..(num_clients + 1) { - let kernel_cmdline = format!( - "mode=client transport={} initargs={}", - if is_shmem { "shmem" } else { "ethernet" }, - total_cores, - ); - - let tap = format!("tap{}", 2 * nclient); - let my_rx_mut = rx_mut.clone(); - let my_output_array = all_outputs.clone(); - let my_placement_cores = placement_cores.clone(); - let build2 = build.clone(); - let my_shmem_sockets = shmem_sockets.clone(); - let client = std::thread::spawn(move || { - sleep(Duration::from_millis( - CLIENT_BUILD_DELAY * (nclient as u64 + 1), - )); - let mut cmdline_client = RunnerArgs::new_with_build("userspace-smp", &build2) - .timeout(timeout) - .shmem_size(vec![shmem_size as usize; num_clients + 1]) - .shmem_path(my_shmem_sockets) - .tap(&tap) - .no_network_setup() - .workers(num_clients + 1) - .cores(cores) - .nodes(1) - .node_offset(my_placement_cores[nclient].0) - .setaffinity(my_placement_cores[nclient].1.clone()) - .use_vmxnet3() - .nobuild() - .cmd(kernel_cmdline.as_str()); - - if cfg!(feature = "smoke") { - cmdline_client = cmdline_client.memory(10 * 1024); - } else { - cmdline_client = cmdline_client.memory(48 * 1024); - } - - let mut output = String::new(); - let mut qemu_run = || -> Result { - let mut p = spawn_nrk(&cmdline_client)?; - let rx = my_rx_mut.lock().expect("Failed to get rx lock"); - let _ = wait_for_client_termination::<()>(&rx); - let ret = p.process.kill(SIGTERM); - output += p.exp_eof()?.as_str(); - ret - }; - // Could exit with 'success' or from sigterm, depending on number of clients. - let ret = qemu_run(); - my_output_array - .lock() - .expect("Failed to get output lock") - .push((format!("Client{}", nclient), output)); - wait_for_sigterm_or_successful_exit_no_log( - &cmdline_client, - ret, - format!("Client{}", nclient), - ); - }); - clients.push(client) - } + fn rackscale_timeout_fn(num_cores: usize) -> u64 { + 180_000 + 60_000 * num_cores as u64 + } - let controller_ret = controller.join(); - let mut client_rets = Vec::new(); - for client in clients { - client_rets.push(client.join()); - } - for shmem_server in shmem_servers.iter_mut() { - let _ignore = shmem_server.send_control('c'); - } - let _ignore = dcm.process.kill(SIGKILL); + fn mem_fn(num_cores: usize, is_smoke: bool) -> usize { + 512 * num_cores + if is_smoke { 8192 } else { 40_000 } + } - // If there's been an error, print everything - if controller_ret.is_err() || (&client_rets).into_iter().any(|ret| ret.is_err()) { - let outputs = all_outputs.lock().expect("Failed to get output lock"); - for (name, output) in outputs.iter() { - log_qemu_out_with_name(None, name.to_string(), output.to_string()); - } - if controller_ret.is_err() { - let dcm_log = dcm.exp_eof(); - if dcm_log.is_ok() { - log_qemu_out_with_name(None, "DCM".to_string(), dcm_log.unwrap()); - } else { - eprintln!("Failed to print DCM log."); - } - } - } + let bench = RackscaleBench { + test, + cmd_fn, + baseline_timeout_fn, + rackscale_timeout_fn, + controller_mem_fn: mem_fn, + client_mem_fn: mem_fn, + baseline_mem_fn: mem_fn, + }; - for client_ret in client_rets { - client_ret.unwrap(); - } - controller_ret.unwrap(); + if cfg!(feature = "baseline") { + bench.run_bench(true, is_smoke); } + bench.run_bench(false, is_smoke); }