Skip to content

Commit

Permalink
Make memcached naming convention and output csv files consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
hunhoffe committed Nov 20, 2023
1 parent c783254 commit 1ffc1ec
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 112 deletions.
158 changes: 51 additions & 107 deletions kernel/tests/s11_rackscale_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use testutils::rackscale_runner::{RackscaleBench, RackscaleRun};
use testutils::runner_args::RackscaleTransport;

use testutils::memcached::{
parse_memcached_output, rackscale_memcached_checkout, MemcachedShardedConfig,
MEMCACHED_MEM_SIZE_MB, MEMCACHED_NUM_QUERIES,
linux_spawn_memcached, parse_memcached_output, rackscale_memcached_checkout,
MemcachedShardedConfig, MEMCACHED_MEM_SIZE_MB, MEMCACHED_NUM_QUERIES,
RACKSCALE_MEMCACHED_CSV_COLUMNS,
};

#[test]
Expand Down Expand Up @@ -477,12 +478,12 @@ struct MemcachedInternalConfig {

#[test]
#[cfg(not(feature = "baremetal"))]
fn s11_rackscale_memcached_benchmark_internal() {
rackscale_memcached_benchmark(RackscaleTransport::Shmem);
fn s11_rackscale_shmem_memcached_internal_benchmark() {
rackscale_memcached_internal_benchmark(RackscaleTransport::Shmem);
}

#[cfg(not(feature = "baremetal"))]
fn rackscale_memcached_benchmark(transport: RackscaleTransport) {
fn rackscale_memcached_internal_benchmark(transport: RackscaleTransport) {
let is_smoke = cfg!(feature = "smoke");

let file_name = format!(
Expand All @@ -491,6 +492,11 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) {
);
let _ignore = std::fs::remove_file(file_name.clone());

let baseline_file_name = "rackscale_baseline_memcached_benchmark.csv";
if cfg!(feature = "baseline") {
let _ignore = std::fs::remove_file(baseline_file_name.clone());
}

let built = BuildArgs::default()
.module("rkapps")
.user_feature("rkapps:memcached-bench")
Expand Down Expand Up @@ -525,24 +531,32 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) {
.open(file_name)
.expect("Can't open file");
if write_headers {
let row = "git_rev,benchmark,nthreads,mem,queries,time,thpt,num_clients,num_replicas\n";
let r = csv_file.write(row.as_bytes());
let r = csv_file.write(RACKSCALE_MEMCACHED_CSV_COLUMNS.as_bytes());
assert!(r.is_ok());
}

let actual_num_clients = if is_baseline { 0 } else { num_clients };
let os_name = if is_baseline { "nros" } else { "dinos" };
let protocol = if is_baseline {
"internal"
} else if file_name.contains(&RackscaleTransport::Ethernet.to_string()) {
"tcp"
} else {
"shmem"
};

let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());

let out = format!(
"memcached_internal,{},{},{},{},{},{},{}",
"memcached_internal,{},{},{},{},{},{},{},{}",
os_name,
protocol,
num_clients,
ret.b_threads,
ret.b_mem,
ret.b_queries,
ret.b_time,
ret.b_thpt,
actual_num_clients,
num_clients
ret.b_thpt
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());
Expand Down Expand Up @@ -619,18 +633,20 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) {
}
}

let bench = RackscaleBench {
let mut bench = RackscaleBench {
test,
cmd_fn,
baseline_timeout_fn,
rackscale_timeout_fn,
mem_fn,
};

bench.run_bench(false, is_smoke);

if cfg!(feature = "baseline") {
bench.test.file_name = baseline_file_name.to_string();
bench.run_bench(true, is_smoke);
}
bench.run_bench(false, is_smoke);
}

#[ignore]
Expand Down Expand Up @@ -860,9 +876,7 @@ fn rackscale_memcached_dcm(transport: RackscaleTransport, dcm_config: Option<DCM

#[test]
#[cfg(not(feature = "baremetal"))]
fn s11_rackscale_memcached_benchmark_sharded_linux() {
use std::fs::remove_file;

fn s11_linux_memcached_sharded_benchmark() {
use rexpect::process::signal::Signal::SIGKILL;

let machine = Machine::determine();
Expand Down Expand Up @@ -910,86 +924,8 @@ fn s11_rackscale_memcached_benchmark_sharded_linux() {
spawn_command(command, Some(timeout_ms)).expect("failed to spawn memcached")
}

fn spawn_memcached(
id: usize,
config: &MemcachedShardedConfig,
timeout_ms: u64,
) -> Result<PtySession> {
let con_info = if config.protocol == "tcp" {
format!("tcp://localhost:{}", 11212 + id)
} else {
let pathname = config.path.join(format!("memcached{id}.sock"));
if pathname.is_file() {
remove_file(pathname.clone()).expect("Failed to remove path"); // make sure the socket file is removed
}
format!("unix://{}", pathname.display())
};

let mut command = Command::new("bash");

command.args(&[
"scripts/spawn-memcached-process.sh",
id.to_string().as_str(),
con_info.as_str(),
(2 * config.mem_size).to_string().as_str(),
config.num_threads.to_string().as_str(),
]);
command.current_dir(config.path.as_path());

println!("Spawning memcached:\n $ `{:?}`", command);

let mut res = spawn_command(command, Some(timeout_ms))?;
std::thread::sleep(Duration::from_secs(1));

match res.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#) {
Ok((_prev, _matched)) => {
println!(" $ OK.");
Ok(res)
}
Err(e) => {
println!(" $ FAILED. {}", e);
Err(e)
}
}
}
let file_name = "linux_memcached_sharded_benchmark.csv";

fn spawn_loadbalancer(config: &MemcachedShardedConfig, timeout_ms: u64) -> Result<PtySession> {
let mut command = Command::new("./loadbalancer/loadbalancer");
command.args(&["--binary"]);
command.arg(format!("--num-queries={}", config.num_queries).as_str());
command.arg(format!("--num-threads={}", config.num_threads).as_str());
command.arg(format!("--max-memory={}", config.mem_size / 8).as_str());
let mut servers = String::from("--servers=");
for i in 0..config.num_servers {
if i > 0 {
servers.push_str(",");
}
if config.protocol == "tcp" {
if config.is_local_host {
servers.push_str(format!("tcp://localhost:{}", 11212 + i).as_str());
} else {
// +1 because tap0 is reserved for the controller.
let ip = 10 + i + 1;
servers.push_str(format!("tcp://172.31.0.{}:{}", ip, 11211).as_str());
}
} else {
servers.push_str(
format!("unix://{}/memcached{}.sock", config.path.display(), i).as_str(),
);
}
}
command.arg(servers.as_str());
command.current_dir(config.path.as_path());

// give the servers some time to be spawned
std::thread::sleep(Duration::from_secs(5));

println!("Spawning Loadbalancer: \n $ `{:?}`", command);

spawn_command(command, Some(timeout_ms))
}

let file_name = "memcached_benchmark_sharded_linux.csv";
let _r = std::fs::remove_file(file_name);

let mut csv_file = OpenOptions::new()
Expand All @@ -998,8 +934,7 @@ fn s11_rackscale_memcached_benchmark_sharded_linux() {
.open(file_name)
.expect("Can't open file");

let row = "git_rev,benchmark,os,nthreads,protocol,mem,queries,time,thpt\n";
let r = csv_file.write(row.as_bytes());
let r = csv_file.write(RACKSCALE_MEMCACHED_CSV_COLUMNS.as_bytes());
assert!(r.is_ok());

let max_threads_per_node = if is_smoke {
Expand Down Expand Up @@ -1032,9 +967,11 @@ fn s11_rackscale_memcached_benchmark_sharded_linux() {
parse_memcached_output(&mut pty, &mut output).expect("could not parse output!");
let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());

//git_rev,benchmark,os,protocol,npieces,nthreads,mem,queries,time,thpt
let out = format!(
"memcached_sharded,linux,{},{},{},{},{},{}\n",
res.b_threads, "internal", res.b_mem, res.b_queries, res.b_time, res.b_thpt,
"memcached_sharded,linux,{},{},{},{},{},{},{}\n",
"internal", 1, res.b_threads, res.b_mem, res.b_queries, res.b_time, res.b_thpt,
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());
Expand All @@ -1060,22 +997,24 @@ fn s11_rackscale_memcached_benchmark_sharded_linux() {
let mut memcached_ctrls = Vec::new();
for i in 0..num_nodes {
memcached_ctrls.push(
spawn_memcached(i, &config, timeout_ms).expect("could not spawn memcached"),
linux_spawn_memcached(i, &config, timeout_ms)
.expect("could not spawn memcached"),
);
}

let mut pty =
spawn_loadbalancer(&config, timeout_ms).expect("failed to spawn load balancer");
let mut pty = testutils::memcached::spawn_loadbalancer(&config, timeout_ms)
.expect("failed to spawn load balancer");
let mut output = String::new();
use rexpect::errors::ErrorKind::Timeout;
match parse_memcached_output(&mut pty, &mut output) {
Ok(res) => {
let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());
let out = format!(
"memcached_sharded,linux,{},{},{},{},{},{}\n",
res.b_threads,
"memcached_sharded,linux,{},{},{},{},{},{},{}\n",
protocol,
config.num_servers,
res.b_threads,
res.b_mem,
res.b_queries,
res.b_time,
Expand All @@ -1102,8 +1041,12 @@ fn s11_rackscale_memcached_benchmark_sharded_linux() {
let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());
let out = format!(
"memcached_sharded,linux,{},{},failure,failure,failure,failure\n",
config.num_servers, protocol,
"memcached_sharded,linux,{},{},{},{},{},failure,failure\n",
protocol,
config.num_servers,
config.num_threads * config.num_servers,
config.mem_size,
config.num_queries,
);
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());
Expand Down Expand Up @@ -1166,6 +1109,7 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
}
};

// TODO: consolidate code with testutils::memcached::spawn_loadbalancer
fn spawn_loadbalancer(config: &MemcachedShardedConfig, timeout_ms: u64) -> Result<PtySession> {
let mut command = Command::new("./loadbalancer/loadbalancer");
command.args(&["--binary"]);
Expand Down
85 changes: 84 additions & 1 deletion kernel/testutils/src/memcached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
// SPDX-License-Identifier: Apache-2.0 OR MIT

use std::env;
use std::fs::remove_file;
use std::io::Write;
use std::path::PathBuf;
use std::process::Command;
use std::time::Duration;

use rexpect::errors::*;
use rexpect::session::PtySession;
use rexpect::session::{spawn_command, PtySession};

pub const MEMCACHED_MEM_SIZE_MB: usize = 4 * 1024;
pub const MEMCACHED_NUM_QUERIES: usize = 1_000_000;

pub const RACKSCALE_MEMCACHED_CSV_COLUMNS: &str =
"git_rev,benchmark,os,protocol,npieces,nthreads,mem,queries,time,thpt\n";

#[derive(Clone)]
pub struct MemcachedShardedConfig {
pub num_servers: usize,
Expand Down Expand Up @@ -173,3 +178,81 @@ pub fn rackscale_memcached_checkout(tmpdir: &str) {
panic!("BUILD FAILED");
}
}

pub fn linux_spawn_memcached(
id: usize,
config: &MemcachedShardedConfig,
timeout_ms: u64,
) -> Result<PtySession> {
let con_info = if config.protocol == "tcp" {
format!("tcp://localhost:{}", 11212 + id)
} else {
let pathname = config.path.join(format!("memcached{id}.sock"));
if pathname.is_file() {
remove_file(pathname.clone()).expect("Failed to remove path"); // make sure the socket file is removed
}
format!("unix://{}", pathname.display())
};

let mut command = Command::new("bash");

command.args(&[
"scripts/spawn-memcached-process.sh",
id.to_string().as_str(),
con_info.as_str(),
(2 * config.mem_size).to_string().as_str(),
config.num_threads.to_string().as_str(),
]);
command.current_dir(config.path.as_path());

println!("Spawning memcached:\n $ `{:?}`", command);

let mut res = spawn_command(command, Some(timeout_ms))?;
std::thread::sleep(Duration::from_secs(1));

match res.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#) {
Ok((_prev, _matched)) => {
println!(" $ OK.");
Ok(res)
}
Err(e) => {
println!(" $ FAILED. {}", e);
Err(e)
}
}
}

pub fn spawn_loadbalancer(config: &MemcachedShardedConfig, timeout_ms: u64) -> Result<PtySession> {
let mut command = Command::new("./loadbalancer/loadbalancer");
command.args(&["--binary"]);
command.arg(format!("--num-queries={}", config.num_queries).as_str());
command.arg(format!("--num-threads={}", config.num_threads).as_str());
command.arg(format!("--max-memory={}", config.mem_size).as_str());
let mut servers = String::from("--servers=");
for i in 0..config.num_servers {
if i > 0 {
servers.push_str(",");
}
if config.protocol == "tcp" {
if config.is_local_host {
servers.push_str(format!("tcp://localhost:{}", 11212 + i).as_str());
} else {
// +1 because tap0 is reserved for the controller.
let ip = 10 + i + 1;
servers.push_str(format!("tcp://172.31.0.{}:{}", ip, 11211).as_str());
}
} else {
servers
.push_str(format!("unix://{}/memcached{}.sock", config.path.display(), i).as_str());
}
}
command.arg(servers.as_str());
command.current_dir(config.path.as_path());

// give the servers some time to be spawned
std::thread::sleep(Duration::from_secs(5));

println!("Spawning Loadbalancer: \n $ `{:?}`", command);

spawn_command(command, Some(timeout_ms))
}
Loading

0 comments on commit 1ffc1ec

Please sign in to comment.