Commit
Merge pull request #622 from sched-ext/rustland-core-topology-awareness
scx_rustland_core: introduce topology awareness
arighi authored Sep 6, 2024
2 parents 0a8dc01 + 8231f85 commit 9301d7d
Showing 9 changed files with 483 additions and 91 deletions.
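In practice, "topology awareness" here means grouping CPUs into L2/L3 cache domains and telling the BPF component which CPUs are cache siblings, so dispatch can prefer an idle CPU that shares cache with the target task. As an illustrative sketch (not part of the commit), the domain discovery performed by init_cache_domains() below boils down to:

use std::collections::HashMap;
use scx_utils::Topology;

// Group CPU IDs by the L3 cache domain they belong to, mirroring the
// scx_utils::Topology accessors used in the patch below.
fn l3_domains() -> HashMap<usize, Vec<usize>> {
    let topo = Topology::new().unwrap();
    let mut domains: HashMap<usize, Vec<usize>> = HashMap::new();
    for core in topo.cores().into_iter() {
        for (cpu_id, cpu) in core.cpus() {
            domains.entry(cpu.l3_id()).or_insert_with(Vec::new).push(*cpu_id);
        }
    }
    domains
}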
2 changes: 1 addition & 1 deletion rust/scx_rustland_core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "scx_rustland_core"
version = "2.0.1"
version = "2.1.1"
edition = "2021"
authors = ["Andrea Righi <[email protected]>"]
license = "GPL-2.0-only"
114 changes: 113 additions & 1 deletion rust/scx_rustland_core/assets/bpf.rs
@@ -14,6 +14,7 @@ use std::ffi::c_ulong;
use std::fs::File;
use std::io::Read;

use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -40,6 +41,7 @@ use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;

use scx_rustland_core::ALLOCATOR;
@@ -78,6 +80,7 @@ pub const RL_CPU_ANY: u64 = bpf_intf::RL_CPU_ANY as u64;
pub struct QueuedTask {
pub pid: i32, // pid that uniquely identifies a task
pub cpu: i32, // CPU where the task is running
pub flags: u64, // task enqueue flags
pub sum_exec_runtime: u64, // Total cpu time
pub weight: u64, // Task static priority
cpumask_cnt: u64, // cpumask generation counter (private)
@@ -139,9 +142,10 @@ impl EnqueuedMessage {
QueuedTask {
pid: self.inner.pid,
cpu: self.inner.cpu,
-cpumask_cnt: self.inner.cpumask_cnt,
+flags: self.inner.flags,
sum_exec_runtime: self.inner.sum_exec_runtime,
weight: self.inner.weight,
+cpumask_cnt: self.inner.cpumask_cnt,
}
}
}
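With the new field plumbed through, a user-space scheduler sees the enqueue flags on every task it receives. A rough consumer sketch (dequeue_task() is assumed here as the accessor that drains the queued ring buffer; it is part of scx_rustland_core but does not appear in this diff):

// Hypothetical consumer of QueuedTask, including the new `flags` field.
while let Ok(Some(task)) = bpf.dequeue_task() {
    println!(
        "pid={} cpu={} flags={:#x} runtime={} weight={}",
        task.pid, task.cpu, task.flags, task.sum_exec_runtime, task.weight
    );
}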
@@ -151,6 +155,7 @@ pub struct BpfScheduler<'cb> {
shutdown: Arc<AtomicBool>, // Determine scheduler shutdown
queued: libbpf_rs::RingBuffer<'cb>, // Ring buffer of queued tasks
dispatched: libbpf_rs::UserRingBuffer, // User Ring buffer of dispatched tasks
cpu_hotplug_cnt: u64, // CPU hotplug generation counter
struct_ops: Option<libbpf_rs::Link>, // Low-level BPF methods
}

@@ -251,6 +256,12 @@ impl<'cb> BpfScheduler<'cb> {

// Attach BPF scheduler.
let mut skel = scx_ops_load!(skel, rustland, uei)?;

// Initialize cache domains.
let topo = Topology::new().unwrap();
Self::init_l2_cache_domains(&mut skel, &topo)?;
Self::init_l3_cache_domains(&mut skel, &topo)?;

let struct_ops = Some(scx_ops_attach!(skel, rustland)?);

// Build the ring buffer of queued tasks.
@@ -272,6 +283,7 @@ impl<'cb> BpfScheduler<'cb> {
shutdown,
queued,
dispatched,
cpu_hotplug_cnt: 0,
struct_ops,
}),
err => Err(anyhow::Error::msg(format!(
@@ -281,13 +293,113 @@
}
}

fn enable_sibling_cpu(
skel: &mut BpfSkel<'_>,
lvl: usize,
cpu: usize,
sibling_cpu: usize,
) -> Result<(), u32> {
let prog = &mut skel.progs.enable_sibling_cpu;
let mut args = domain_arg {
lvl_id: lvl as c_int,
cpu_id: cpu as c_int,
sibling_cpu_id: sibling_cpu as c_int,
};
let input = ProgramInput {
// Pass `args` to the BPF syscall program as its raw input buffer.
context_in: Some(unsafe {
std::slice::from_raw_parts_mut(
&mut args as *mut _ as *mut u8,
std::mem::size_of_val(&args),
)
}),
..Default::default()
};
let out = prog.test_run(input).unwrap();
if out.return_value != 0 {
return Err(out.return_value);
}

Ok(())
}
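// enable_sibling_cpu() drives a BPF syscall program of the same name via
// test_run(), handing it a `domain_arg` as the raw context buffer; a non-zero
// return value from the BPF side becomes the Err value. A hypothetical call
// site (illustrative only), registering CPU 1 as an L3-cache sibling of
// CPU 0:
//
//     BpfScheduler::enable_sibling_cpu(&mut skel, 3, 0, 1)
//         .expect("failed to enable sibling CPU");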

fn init_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
cache_lvl: usize,
enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>,
) -> Result<(), std::io::Error> {
// Determine the list of CPU IDs associated with each cache node.
let mut cache_id_map: HashMap<usize, Vec<usize>> = HashMap::new();
for core in topo.cores().into_iter() {
for (cpu_id, cpu) in core.cpus() {
let cache_id = match cache_lvl {
2 => cpu.l2_id(),
3 => cpu.l3_id(),
_ => panic!("invalid cache level {}", cache_lvl),
};
cache_id_map
.entry(cache_id)
.or_insert_with(Vec::new)
.push(*cpu_id);
}
}

// Update the BPF cpumasks for the cache domains.
for (_cache_id, cpus) in cache_id_map {
for cpu in &cpus {
for sibling_cpu in &cpus {
// Best-effort: a failure to enable one (cpu, sibling) pair is
// ignored rather than treated as fatal.
match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) {
Ok(()) => {}
Err(_) => {}
}
}
}
}

Ok(())
}
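// To make the pass concrete, assume a hypothetical 8-CPU machine with two
// 4-CPU L3 complexes. The discovery step above would build:
//
//     cache_id_map == { 0: [0, 1, 2, 3], 1: [4, 5, 6, 7] }
//
// and the nested loop would then issue every (cpu, sibling) pair within each
// complex, reflexive pairs included:
//
//     enable_sibling_cpu_fn(skel, 3, 0, 0)
//     enable_sibling_cpu_fn(skel, 3, 0, 1)
//     ...
//     enable_sibling_cpu_fn(skel, 3, 3, 3)
//
// and likewise for cache_id 1 with CPUs 4..=7.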

fn init_l2_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, topo, 2, &|skel, lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
})
}

fn init_l3_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, topo, 3, &|skel, lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
})
}

fn refresh_cache_domains(&mut self) {
// Check if we need to refresh the CPU cache information.
if self.cpu_hotplug_cnt == self.skel.maps.bss_data.cpu_hotplug_cnt {
return;
}

// Re-initialize cache domains.
let topo = Topology::new().unwrap();
Self::init_l2_cache_domains(&mut self.skel, &topo).unwrap();
Self::init_l3_cache_domains(&mut self.skel, &topo).unwrap();

// Update CPU hotplug generation counter.
self.cpu_hotplug_cnt = self.skel.maps.bss_data.cpu_hotplug_cnt;
}

// Notify the BPF component that the user-space scheduler has completed its scheduling cycle,
// updating the number of tasks that are still pending.
//
// NOTE: do not set allow(dead_code) for this method: every scheduler must call it at some
// point, otherwise the BPF component will keep waking up the user-space scheduler in a busy
// loop, causing unnecessarily high CPU consumption.
pub fn notify_complete(&mut self, nr_pending: u64) {
self.refresh_cache_domains();
self.skel.maps.bss_data.nr_scheduled = nr_pending;
std::thread::yield_now();
}
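Since refresh_cache_domains() runs at the top of notify_complete(), any scheduler that drives its loop through this method picks up CPU hotplug events automatically. The overall shape of such a loop, as a hedged sketch (exited() is the crate's shutdown check; schedule_pending_tasks() is an assumed helper standing in for the scheduler's dequeue/dispatch logic):

// Sketch of a scheduler main loop built on BpfScheduler.
while !bpf.exited() {
    let nr_pending = schedule_pending_tasks(&mut bpf); // assumed helper
    // Re-syncs cache domains after hotplug, then publishes nr_scheduled.
    bpf.notify_complete(nr_pending);
}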
10 changes: 10 additions & 0 deletions rust/scx_rustland_core/assets/bpf/intf.h
@@ -65,6 +65,15 @@ struct task_cpu_arg {
u64 flags;
};

/*
* Specify a sibling CPU relationship for a specific scheduling domain.
*/
struct domain_arg {
s32 lvl_id;
s32 cpu_id;
s32 sibling_cpu_id;
};

/*
* Task sent to the user-space scheduler by the BPF dispatcher.
*
@@ -73,6 +82,7 @@ struct task_cpu_arg {
struct queued_task_ctx {
s32 pid;
s32 cpu; /* CPU where the task is running */
u64 flags; /* task enqueue flags */
u64 cpumask_cnt; /* cpumask generation counter */
u64 sum_exec_runtime; /* Total cpu time */
u64 weight; /* Task static priority */
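On the Rust side, struct domain_arg is consumed through its bindgen-generated binding (the domain_arg constructed in enable_sibling_cpu() above). The generated type is equivalent to the following #[repr(C)] definition, shown here only for orientation:

// Rough shape of the binding bindgen derives from struct domain_arg;
// the real definition lives in the generated bpf_intf module.
#[repr(C)]
pub struct domain_arg {
    pub lvl_id: std::os::raw::c_int,
    pub cpu_id: std::os::raw::c_int,
    pub sibling_cpu_id: std::os::raw::c_int,
}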
