scx_rustland_core: restart scheduler on hotplug events
User-space schedulers may still hit some stalls during CPU hotplugging
events.

There is no reason to overcomplicate the code by trying to handle
hotplug events within the scx_rustland_core framework: we can simply
rely on a scheduler restart performed by the scx core.

This makes CPU hotplugging more reliable with scx_rustland_core-based
schedulers.
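
The restart is driven from the scheduler's top-level loop: on a hotplug
event the scx core unregisters the scheduler with a restart request, and
the user-space side simply re-initializes, which reloads the BPF program
and re-reads the CPU topology (making the cpu_hotplug_cnt /
refresh_cache_domains() machinery removed below unnecessary). A minimal
sketch of that loop, assuming an illustrative `Scheduler` wrapper with
`init()`/`run()` methods and the `should_restart()` helper on the exit
info returned by `run()` (as provided by scx_utils' `UserExitInfo`):

```rust
use std::mem::MaybeUninit;

// Sketch only: `Scheduler` stands for a user-space scheduler built on
// scx_rustland_core (wrapping BpfScheduler); init()/run() signatures are
// illustrative, not part of this commit.
fn main() -> anyhow::Result<()> {
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&mut open_object)?;
        // run() returns when the scheduler exits or is unregistered by the
        // scx core; should_restart() is true for hotplug-triggered exits.
        if !sched.run()?.should_restart() {
            break;
        }
        // Loop around: re-running init() reloads the BPF program and
        // re-reads the (possibly changed) CPU topology, so no in-framework
        // hotplug handling is needed.
    }
    Ok(())
}
```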

Signed-off-by: Andrea Righi <[email protected]>
arighi authored and minosfuture committed Oct 19, 2024
1 parent 8e6c3d0 commit c9425e9
Showing 2 changed files with 14 additions and 139 deletions.
37 changes: 14 additions & 23 deletions rust/scx_rustland_core/assets/bpf.rs
@@ -18,6 +18,7 @@ use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Once;

use anyhow::Context;
use anyhow::Result;
@@ -161,7 +162,6 @@ pub struct BpfScheduler<'cb> {
shutdown: Arc<AtomicBool>, // Determine scheduler shutdown
queued: libbpf_rs::RingBuffer<'cb>, // Ring buffer of queued tasks
dispatched: libbpf_rs::UserRingBuffer, // User Ring buffer of dispatched tasks
cpu_hotplug_cnt: u64, // CPU hotplug generation counter
struct_ops: Option<libbpf_rs::Link>, // Low-level BPF methods
}

@@ -190,6 +190,18 @@ fn is_smt_active() -> std::io::Result<bool> {
Ok(smt_active == 1)
}

static SET_HANDLER: Once = Once::new();

fn set_ctrlc_handler(shutdown: Arc<AtomicBool>) -> Result<(), anyhow::Error> {
SET_HANDLER.call_once(|| {
let shutdown_clone = shutdown.clone();
ctrlc::set_handler(move || {
shutdown_clone.store(true, Ordering::Relaxed);
}).expect("Error setting Ctrl-C handler");
});
Ok(())
}

impl<'cb> BpfScheduler<'cb> {
pub fn init(
open_object: &'cb mut MaybeUninit<OpenObject>,
@@ -198,11 +210,7 @@ impl<'cb> BpfScheduler<'cb> {
debug: bool,
) -> Result<Self> {
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
ctrlc::set_handler(move || {
shutdown_clone.store(true, Ordering::Relaxed);
})
.context("Error setting Ctrl-C handler")?;
set_ctrlc_handler(shutdown.clone()).context("Error setting Ctrl-C handler")?;

// Open the BPF prog first for verification.
let mut skel_builder = BpfSkelBuilder::default();
@@ -289,7 +297,6 @@ impl<'cb> BpfScheduler<'cb> {
shutdown,
queued,
dispatched,
cpu_hotplug_cnt: 0,
struct_ops,
}),
err => Err(anyhow::Error::msg(format!(
@@ -383,29 +390,13 @@ impl<'cb> BpfScheduler<'cb> {
})
}

fn refresh_cache_domains(&mut self) {
// Check if we need to refresh the CPU cache information.
if self.cpu_hotplug_cnt == self.skel.maps.bss_data.cpu_hotplug_cnt {
return;
}

// Re-initialize cache domains.
let topo = Topology::new().unwrap();
Self::init_l2_cache_domains(&mut self.skel, &topo).unwrap();
Self::init_l3_cache_domains(&mut self.skel, &topo).unwrap();

// Update CPU hotplug generation counter.
self.cpu_hotplug_cnt = self.skel.maps.bss_data.cpu_hotplug_cnt;
}

// Notify the BPF component that the user-space scheduler has completed its scheduling cycle,
// updating the number of tasks that are still pending.
//
// NOTE: do not set allow(dead_code) for this method, any scheduler must use this method at
// some point, otherwise the BPF component will keep waking up the user-space scheduler in a
// busy loop, causing unnecessarily high CPU consumption.
pub fn notify_complete(&mut self, nr_pending: u64) {
self.refresh_cache_domains();
self.skel.maps.bss_data.nr_scheduled = nr_pending;
std::thread::yield_now();
}
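
To illustrate the contract described in the NOTE above, a hedged sketch of
a scheduling cycle: dequeue_task(), dispatch_task() and DispatchedTask are
the existing scx_rustland_core APIs, while the actual scheduling policy
(which task goes where, with what time slice) is intentionally elided:

```rust
// Hedged sketch of a user-space scheduling cycle built on scx_rustland_core.
fn schedule(bpf: &mut BpfScheduler<'_>) {
    // Drain the tasks queued by the BPF component and dispatch them back.
    while let Ok(Some(task)) = bpf.dequeue_task() {
        let dispatched = DispatchedTask::new(&task);
        if bpf.dispatch_task(&dispatched).is_err() {
            break;
        }
    }

    // Close the cycle: report that no tasks are left pending. Skipping this
    // call keeps the BPF component waking the user-space scheduler in a
    // busy loop.
    bpf.notify_complete(0);
}
```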
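
A note on the Once guard added above: since the scheduler is now torn down
and re-created on hotplug events, BpfScheduler::init() may run more than
once in the same process, and the ctrlc crate returns an error if a handler
is already registered. Routing the registration through std::sync::Once
makes the repeated calls harmless. A self-contained sketch of the same
pattern (the main() driver is illustrative, not part of the commit):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Once};

static SET_HANDLER: Once = Once::new();

// Register the Ctrl-C handler only on the first call; subsequent calls
// (e.g. from a hotplug-triggered re-initialization) become no-ops instead
// of failing because a handler is already set.
fn set_ctrlc_handler(shutdown: Arc<AtomicBool>) -> anyhow::Result<()> {
    SET_HANDLER.call_once(|| {
        let shutdown_clone = shutdown.clone();
        ctrlc::set_handler(move || {
            shutdown_clone.store(true, Ordering::Relaxed);
        })
        .expect("Error setting Ctrl-C handler");
    });
    Ok(())
}

fn main() -> anyhow::Result<()> {
    let shutdown = Arc::new(AtomicBool::new(false));
    // Simulate two scheduler initializations in the same process: the
    // second call does nothing, so init() can safely run again after a
    // restart.
    set_ctrlc_handler(shutdown.clone())?;
    set_ctrlc_handler(shutdown.clone())?;
    Ok(())
}
```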
116 changes: 0 additions & 116 deletions rust/scx_rustland_core/assets/bpf/main.bpf.c
@@ -98,18 +98,6 @@ const volatile bool debug;
*/
const volatile bool smt_enabled = true;

/*
* Mask of offline CPUs, used to properly support CPU hotplugging.
*/
private(BPFLAND) struct bpf_cpumask __kptr *offline_cpumask;

/*
* CPU hotplugging generation counter (used to notify the user-space
* counterpart when a CPU hotplug event happened, allowing it to refresh the
* topology information).
*/
volatile u64 cpu_hotplug_cnt;

/*
* Set the state of a CPU in a cpumask.
*/
@@ -141,28 +129,6 @@ static int calloc_cpumask(struct bpf_cpumask **p_cpumask)
return 0;
}

/*
* Determine when we need to drain tasks dispatched to CPUs that went offline.
*/
static int offline_needed;

/*
* Notify the scheduler that we need to drain and re-enqueue the tasks
* dispatched to the offline CPU DSQs.
*/
static void set_offline_needed(void)
{
__sync_fetch_and_or(&offline_needed, 1);
}

/*
* Check and clear the state of the offline CPUs re-enqueuing.
*/
static bool test_and_clear_offline_needed(void)
{
return __sync_fetch_and_and(&offline_needed, 0) == 1;
}

/*
* Maximum amount of tasks queued between kernel and user-space at a certain
* time.
@@ -948,50 +914,6 @@ static long handle_dispatched_task(struct bpf_dynptr *dynptr, void *context)
return !!scx_bpf_dispatch_nr_slots();
}

/*
* Consume tasks dispatched to CPUs that have gone offline.
*
* These tasks will be consumed on other active CPUs to prevent indefinite
* stalling.
*
* Return true if one task is consumed, false otherwise.
*/
static bool consume_offline_cpus(s32 cpu)
{
u64 nr_cpu_ids = scx_bpf_nr_cpu_ids();
struct bpf_cpumask *offline;
bool ret = false;

if (!test_and_clear_offline_needed())
return false;

offline = offline_cpumask;
if (!offline)
return false;

/*
* Cycle through all the CPUs and evenly consume tasks from the DSQs of
* those that are offline.
*/
bpf_repeat(nr_cpu_ids - 1) {
cpu = (cpu + 1) % nr_cpu_ids;

if (!bpf_cpumask_test_cpu(cpu, cast_mask(offline)))
continue;
/*
* This CPU is offline, if a task has been dispatched there
* consume it immediately on the current CPU.
*/
if (scx_bpf_consume(cpu_to_dsq(cpu))) {
set_offline_needed();
ret = true;
break;
}
}

return ret;
}

/*
* Dispatch tasks that are ready to run.
*
Expand All @@ -1015,13 +937,6 @@ void BPF_STRUCT_OPS(rustland_dispatch, s32 cpu, struct task_struct *prev)
*/
dispatch_user_scheduler();

/*
* Try to steal a task dispatched to CPUs that may have gone offline
* (this allows to prevent indefinite task stalls).
*/
if (consume_offline_cpus(cpu))
return;

/*
* Consume a task from the per-CPU DSQ.
*/
@@ -1136,26 +1051,6 @@ void BPF_STRUCT_OPS(rustland_cpu_release, s32 cpu,
set_usersched_needed();
}

void BPF_STRUCT_OPS(rustland_cpu_online, s32 cpu)
{
/* Set the CPU state to online */
set_cpu_state(offline_cpumask, cpu, false);

__sync_fetch_and_add(&nr_online_cpus, 1);
__sync_fetch_and_add(&cpu_hotplug_cnt, 1);
}

void BPF_STRUCT_OPS(rustland_cpu_offline, s32 cpu)
{
/* Set the CPU state to offline */
set_cpu_state(offline_cpumask, cpu, true);

__sync_fetch_and_sub(&nr_online_cpus, 1);
__sync_fetch_and_add(&cpu_hotplug_cnt, 1);

set_offline_needed();
}

/*
* A new task @p is being created.
*
@@ -1367,20 +1262,11 @@ int enable_sibling_cpu(struct domain_arg *input)
*/
s32 BPF_STRUCT_OPS_SLEEPABLE(rustland_init)
{
struct bpf_cpumask *mask;
int err;

/* Compile-time checks */
BUILD_BUG_ON((MAX_CPUS % 2));

/* Initialize the offline CPU mask */
err = calloc_cpumask(&offline_cpumask);
mask = offline_cpumask;
if (!mask)
err = -ENOMEM;
if (err)
return err;

/* Initialize rustland core */
err = dsq_init();
if (err)
@@ -1412,8 +1298,6 @@ SCX_OPS_DEFINE(rustland,
.update_idle = (void *)rustland_update_idle,
.set_cpumask = (void *)rustland_set_cpumask,
.cpu_release = (void *)rustland_cpu_release,
.cpu_online = (void *)rustland_cpu_online,
.cpu_offline = (void *)rustland_cpu_offline,
.init_task = (void *)rustland_init_task,
.init = (void *)rustland_init,
.exit = (void *)rustland_exit,
