diff --git a/rust/scx_rustland_core/Cargo.toml b/rust/scx_rustland_core/Cargo.toml index ca3e103c6..3acefc22e 100644 --- a/rust/scx_rustland_core/Cargo.toml +++ b/rust/scx_rustland_core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scx_rustland_core" -version = "2.0.1" +version = "2.1.1" edition = "2021" authors = ["Andrea Righi "] license = "GPL-2.0-only" diff --git a/rust/scx_rustland_core/assets/bpf.rs b/rust/scx_rustland_core/assets/bpf.rs index 528e37e1a..b24c72b8d 100644 --- a/rust/scx_rustland_core/assets/bpf.rs +++ b/rust/scx_rustland_core/assets/bpf.rs @@ -14,6 +14,7 @@ use std::ffi::c_ulong; use std::fs::File; use std::io::Read; +use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -40,6 +41,7 @@ use scx_utils::scx_ops_load; use scx_utils::scx_ops_open; use scx_utils::uei_exited; use scx_utils::uei_report; +use scx_utils::Topology; use scx_utils::UserExitInfo; use scx_rustland_core::ALLOCATOR; @@ -78,6 +80,7 @@ pub const RL_CPU_ANY: u64 = bpf_intf::RL_CPU_ANY as u64; pub struct QueuedTask { pub pid: i32, // pid that uniquely identifies a task pub cpu: i32, // CPU where the task is running + pub flags: u64, // task enqueue flags pub sum_exec_runtime: u64, // Total cpu time pub weight: u64, // Task static priority cpumask_cnt: u64, // cpumask generation counter (private) @@ -139,9 +142,10 @@ impl EnqueuedMessage { QueuedTask { pid: self.inner.pid, cpu: self.inner.cpu, - cpumask_cnt: self.inner.cpumask_cnt, + flags: self.inner.flags, sum_exec_runtime: self.inner.sum_exec_runtime, weight: self.inner.weight, + cpumask_cnt: self.inner.cpumask_cnt, } } } @@ -151,6 +155,7 @@ pub struct BpfScheduler<'cb> { shutdown: Arc, // Determine scheduler shutdown queued: libbpf_rs::RingBuffer<'cb>, // Ring buffer of queued tasks dispatched: libbpf_rs::UserRingBuffer, // User Ring buffer of dispatched tasks + cpu_hotplug_cnt: u64, // CPU hotplug generation counter struct_ops: Option, // Low-level BPF methods } @@ -251,6 +256,12 @@ impl<'cb> BpfScheduler<'cb> { // Attach BPF scheduler. let mut skel = scx_ops_load!(skel, rustland, uei)?; + + // Initialize cache domains. + let topo = Topology::new().unwrap(); + Self::init_l2_cache_domains(&mut skel, &topo)?; + Self::init_l3_cache_domains(&mut skel, &topo)?; + let struct_ops = Some(scx_ops_attach!(skel, rustland)?); // Build the ring buffer of queued tasks. @@ -272,6 +283,7 @@ impl<'cb> BpfScheduler<'cb> { shutdown, queued, dispatched, + cpu_hotplug_cnt: 0, struct_ops, }), err => Err(anyhow::Error::msg(format!( @@ -281,6 +293,105 @@ impl<'cb> BpfScheduler<'cb> { } } + fn enable_sibling_cpu( + skel: &mut BpfSkel<'_>, + lvl: usize, + cpu: usize, + sibling_cpu: usize, + ) -> Result<(), u32> { + let prog = &mut skel.progs.enable_sibling_cpu; + let mut args = domain_arg { + lvl_id: lvl as c_int, + cpu_id: cpu as c_int, + sibling_cpu_id: sibling_cpu as c_int, + }; + let input = ProgramInput { + context_in: Some(unsafe { + std::slice::from_raw_parts_mut( + &mut args as *mut _ as *mut u8, + std::mem::size_of_val(&args), + ) + }), + ..Default::default() + }; + let out = prog.test_run(input).unwrap(); + if out.return_value != 0 { + return Err(out.return_value); + } + + Ok(()) + } + + fn init_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + cache_lvl: usize, + enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>, + ) -> Result<(), std::io::Error> { + // Determine the list of CPU IDs associated to each cache node. 
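+ //
+ // Illustrative example (hypothetical 8-CPU system with two L3 complexes),
+ // for cache_lvl = 3:
+ //
+ //   cache_id_map = { 0: [0, 1, 2, 3], 1: [4, 5, 6, 7] }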
+ let mut cache_id_map: HashMap> = HashMap::new(); + for core in topo.cores().into_iter() { + for (cpu_id, cpu) in core.cpus() { + let cache_id = match cache_lvl { + 2 => cpu.l2_id(), + 3 => cpu.l3_id(), + _ => panic!("invalid cache level {}", cache_lvl), + }; + cache_id_map + .entry(cache_id) + .or_insert_with(Vec::new) + .push(*cpu_id); + } + } + + // Update the BPF cpumasks for the cache domains. + for (_cache_id, cpus) in cache_id_map { + for cpu in &cpus { + for sibling_cpu in &cpus { + match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) { + Ok(()) => {} + Err(_) => {} + } + } + } + } + + Ok(()) + } + + fn init_l2_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + ) -> Result<(), std::io::Error> { + Self::init_cache_domains(skel, topo, 2, &|skel, lvl, cpu, sibling_cpu| { + Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu) + }) + } + + fn init_l3_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + ) -> Result<(), std::io::Error> { + Self::init_cache_domains(skel, topo, 3, &|skel, lvl, cpu, sibling_cpu| { + Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu) + }) + } + + fn refresh_cache_domains(&mut self) { + // Check if we need to refresh the CPU cache information. + if self.cpu_hotplug_cnt == self.skel.maps.bss_data.cpu_hotplug_cnt { + return; + } + + // Re-initialize cache domains. + let topo = Topology::new().unwrap(); + Self::init_l2_cache_domains(&mut self.skel, &topo).unwrap(); + Self::init_l3_cache_domains(&mut self.skel, &topo).unwrap(); + + // Update CPU hotplug generation counter. + self.cpu_hotplug_cnt = self.skel.maps.bss_data.cpu_hotplug_cnt; + } + // Notify the BPF component that the user-space scheduler has completed its scheduling cycle, // updating the amount tasks that are still peding. // @@ -288,6 +399,7 @@ impl<'cb> BpfScheduler<'cb> { // some point, otherwise the BPF component will keep waking-up the user-space scheduler in a // busy loop, causing unnecessary high CPU consumption. pub fn notify_complete(&mut self, nr_pending: u64) { + self.refresh_cache_domains(); self.skel.maps.bss_data.nr_scheduled = nr_pending; std::thread::yield_now(); } diff --git a/rust/scx_rustland_core/assets/bpf/intf.h b/rust/scx_rustland_core/assets/bpf/intf.h index db409a625..207eaaafc 100644 --- a/rust/scx_rustland_core/assets/bpf/intf.h +++ b/rust/scx_rustland_core/assets/bpf/intf.h @@ -65,6 +65,15 @@ struct task_cpu_arg { u64 flags; }; +/* + * Specify a sibling CPU relationship for a specific scheduling domain. + */ +struct domain_arg { + s32 lvl_id; + s32 cpu_id; + s32 sibling_cpu_id; +}; + /* * Task sent to the user-space scheduler by the BPF dispatcher. * @@ -73,6 +82,7 @@ struct task_cpu_arg { struct queued_task_ctx { s32 pid; s32 cpu; /* CPU where the task is running */ + u64 flags; /* task enqueue flags */ u64 cpumask_cnt; /* cpumask generation counter */ u64 sum_exec_runtime; /* Total cpu time */ u64 weight; /* Task static priority */ diff --git a/rust/scx_rustland_core/assets/bpf/main.bpf.c b/rust/scx_rustland_core/assets/bpf/main.bpf.c index acdb65f60..0727ce42e 100644 --- a/rust/scx_rustland_core/assets/bpf/main.bpf.c +++ b/rust/scx_rustland_core/assets/bpf/main.bpf.c @@ -97,6 +97,13 @@ const volatile bool smt_enabled = true; */ private(BPFLAND) struct bpf_cpumask __kptr *offline_cpumask; +/* + * CPU hotplugging generation counter (used to notify the user-space + * counterpart when a CPU hotplug event happened, allowing it to refresh the + * topology information). 
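+ *
+ * The counter is only ever incremented; the user-space counterpart caches
+ * the last value it observed and re-initializes the L2/L3 cache domains
+ * whenever the two values differ (see refresh_cache_domains() in bpf.rs).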
+ */ +volatile u64 cpu_hotplug_cnt; + /* * Set the state of a CPU in a cpumask. */ @@ -195,12 +202,42 @@ struct { sizeof(struct dispatched_task_ctx)); } dispatched SEC(".maps"); +/* + * Per-CPU context. + */ +struct cpu_ctx { + struct bpf_cpumask __kptr *l2_cpumask; + struct bpf_cpumask __kptr *l3_cpumask; +}; + +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __type(key, u32); + __type(value, struct cpu_ctx); + __uint(max_entries, 1); +} cpu_ctx_stor SEC(".maps"); + +/* + * Return a CPU context. + */ +struct cpu_ctx *try_lookup_cpu_ctx(s32 cpu) +{ + const u32 idx = 0; + return bpf_map_lookup_percpu_elem(&cpu_ctx_stor, &idx, cpu); +} + /* * Per-task local storage. * * This contain all the per-task information used internally by the BPF code. */ struct task_ctx { + /* + * Temporary cpumask for calculating scheduling domains. + */ + struct bpf_cpumask __kptr *l2_cpumask; + struct bpf_cpumask __kptr *l3_cpumask; + /* * Time slice assigned to the task. */ @@ -385,7 +422,7 @@ static void dispatch_task(struct task_struct *p, u64 dsq_id, * Bounce to the shared DSQ if we can't find a valid task * context. */ - scx_bpf_dispatch_vtime(p, dsq_id, SCX_SLICE_DFL, vtime, 0); + scx_bpf_dispatch_vtime(p, SHARED_DSQ, SCX_SLICE_DFL, vtime, 0); return; } tctx->slice_ns = slice; @@ -499,11 +536,31 @@ static bool dispatch_user_scheduler(void) * to handle these mistakes in favor of a more efficient response and a reduced * scheduling overhead. */ -static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) +static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 enq_flags) { const struct cpumask *online_cpumask, *idle_smtmask, *idle_cpumask; + struct bpf_cpumask *l2_domain, *l3_domain; + struct bpf_cpumask *l2_mask, *l3_mask; + struct task_ctx *tctx; + struct cpu_ctx *cctx; s32 cpu; + tctx = try_lookup_task_ctx(p); + if (!tctx) + return -ENOENT; + cctx = try_lookup_cpu_ctx(prev_cpu); + if (!cctx) + return -ENOENT; + + /* + * If the task isn't allowed to use its previously used CPU it means + * that it's rapidly changing affinity. In this case it's pointless to + * find an optimal idle CPU, just return and let the task being + * dispatched to a global DSQ. + */ + if (!bpf_cpumask_test_cpu(prev_cpu, p->cpus_ptr)) + return -ENOENT; + /* * For tasks that can run only on a single CPU, we can simply verify if * their only allowed CPU is idle. @@ -523,6 +580,83 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) idle_smtmask = scx_bpf_get_idle_smtmask(); idle_cpumask = scx_bpf_get_idle_cpumask(); + /* + * Scheduling domains of the previously used CPU. + */ + l2_domain = cctx->l2_cpumask; + if (!l2_domain) + l2_domain = p->cpus_ptr; + l3_domain = cctx->l3_cpumask; + if (!l3_domain) + l3_domain = p->cpus_ptr; + + /* + * Task's scheduling domains. + */ + l2_mask = tctx->l2_cpumask; + if (!l2_mask) { + scx_bpf_error("l2 cpumask not initialized"); + cpu = prev_cpu; + goto out_put_cpumask; + } + l3_mask = tctx->l3_cpumask; + if (!l3_mask) { + scx_bpf_error("l3 cpumask not initialized"); + cpu = prev_cpu; + goto out_put_cpumask; + } + + /* + * Determine the L2 cache domain as the intersection of the task's + * primary cpumask and the L2 cache domain mask of the previously used + * CPU (ignore if this cpumask completely overlaps with the task's + * cpumask). 
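+ *
+ * Illustrative example (hypothetical topology): if the task is allowed to
+ * run on CPUs 0-7 and the previously used CPU sits in an L2 domain of
+ * {2, 3}, l2_mask ends up containing only CPUs 2 and 3.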
+ */ + bpf_cpumask_and(l2_mask, p->cpus_ptr, cast_mask(l2_domain)); + + /* + * Determine the L3 cache domain as the intersection of the task's + * primary cpumask and the L3 cache domain mask of the previously used + * CPU (ignore if this cpumask completely overlaps with the task's + * cpumask). + */ + bpf_cpumask_and(l3_mask, p->cpus_ptr, cast_mask(l3_domain)); + + /* + * Try to prioritize newly awakened tasks by immediately promoting them + * as interactive. + */ + if (enq_flags & SCX_ENQ_WAKEUP) { + struct task_struct *current = (void *)bpf_get_current_task_btf(); + + /* + * If the CPUs of the waker and the wakee share the same L3 + * cache, we can try to re-use the same CPU, if it's a fully + * idle core. + */ + cpu = bpf_get_smp_processor_id(); + if (bpf_cpumask_test_cpu(cpu, cast_mask(l3_mask)) && + bpf_cpumask_test_cpu(prev_cpu, idle_smtmask) && + scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + goto out_put_cpumask; + } + + /* + * Otherwise try to run the task on the same CPU as the waker, + * provided it's is still alive and the task's cpumask allows + * it. + * + * This allows to improve producer->consumer pipelines. + */ + if (!(current->flags & PF_EXITING)) { + cpu = bpf_get_smp_processor_id(); + if (bpf_cpumask_test_cpu(cpu, p->cpus_ptr) && + scx_bpf_dsq_nr_queued(cpu_to_dsq(cpu)) == 0) + goto out_put_cpumask; + } + } + /* * Find the best idle CPU, prioritizing full idle cores in SMT systems. */ @@ -538,6 +672,24 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) goto out_put_cpumask; } + /* + * Search for any full-idle CPU in the task domain that shares + * the same L2 cache. + */ + cpu = bpf_cpumask_any_and_distribute(cast_mask(l2_mask), idle_smtmask); + if (bpf_cpumask_test_cpu(cpu, online_cpumask) && + scx_bpf_test_and_clear_cpu_idle(cpu)) + goto out_put_cpumask; + + /* + * Search for any full-idle CPU in the task domain that shares + * the same L3 cache. + */ + cpu = bpf_cpumask_any_and_distribute(cast_mask(l3_mask), idle_smtmask); + if (bpf_cpumask_test_cpu(cpu, online_cpumask) && + scx_bpf_test_and_clear_cpu_idle(cpu)) + goto out_put_cpumask; + /* * Otherwise, search for another usable full-idle core. */ @@ -557,6 +709,24 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) goto out_put_cpumask; } + /* + * Search for any idle CPU in the primary domain that shares the same + * L2 cache. + */ + cpu = bpf_cpumask_any_and_distribute(cast_mask(l2_mask), idle_cpumask); + if (bpf_cpumask_test_cpu(cpu, online_cpumask) && + scx_bpf_test_and_clear_cpu_idle(cpu)) + goto out_put_cpumask; + + /* + * Search for any idle CPU in the primary domain that shares the same + * L3 cache. + */ + cpu = bpf_cpumask_any_and_distribute(cast_mask(l3_mask), idle_cpumask); + if (bpf_cpumask_test_cpu(cpu, online_cpumask) && + scx_bpf_test_and_clear_cpu_idle(cpu)) + goto out_put_cpumask; + /* * If all the previous attempts have failed, try to use any idle CPU in * the system. @@ -615,13 +785,14 @@ int rs_select_cpu(struct task_cpu_arg *input) * Fill @task with all the information that need to be sent to the user-space * scheduler. 
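+ *
+ * The enqueue flags (@enq_flags) are forwarded verbatim to user space via
+ * the new 'flags' field of struct queued_task_ctx.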
*/ -static void -get_task_info(struct queued_task_ctx *task, const struct task_struct *p) +static void get_task_info(struct queued_task_ctx *task, + const struct task_struct *p, u64 enq_flags) { struct task_ctx *tctx = try_lookup_task_ctx(p); task->pid = p->pid; task->sum_exec_runtime = p->se.sum_exec_runtime; + task->flags = enq_flags; task->weight = p->scx.weight; task->cpu = scx_bpf_task_cpu(p); task->cpumask_cnt = tctx ? tctx->cpumask_cnt : 0; @@ -683,7 +854,7 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags) __sync_fetch_and_add(&nr_kernel_dispatches, 1); return; } - get_task_info(task, p); + get_task_info(task, p, enq_flags); dbg_msg("enqueue: pid=%d (%s)", p->pid, p->comm); bpf_ringbuf_submit(task, 0); @@ -698,7 +869,7 @@ static long handle_dispatched_task(struct bpf_dynptr *dynptr, void *context) { const struct dispatched_task_ctx *task; struct task_struct *p; - u64 enq_flags = 0, dsq_id; + u64 dsq_id; /* Get a pointer to the dispatched task */ task = bpf_dynptr_data(dynptr, 0, sizeof(*task)); @@ -932,6 +1103,7 @@ void BPF_STRUCT_OPS(rustland_cpu_online, s32 cpu) set_cpu_state(offline_cpumask, cpu, false); __sync_fetch_and_add(&nr_online_cpus, 1); + __sync_fetch_and_add(&cpu_hotplug_cnt, 1); } void BPF_STRUCT_OPS(rustland_cpu_offline, s32 cpu) @@ -940,6 +1112,8 @@ void BPF_STRUCT_OPS(rustland_cpu_offline, s32 cpu) set_cpu_state(offline_cpumask, cpu, true); __sync_fetch_and_sub(&nr_online_cpus, 1); + __sync_fetch_and_add(&cpu_hotplug_cnt, 1); + set_offline_needed(); } @@ -952,15 +1126,35 @@ void BPF_STRUCT_OPS(rustland_cpu_offline, s32 cpu) s32 BPF_STRUCT_OPS(rustland_init_task, struct task_struct *p, struct scx_init_task_args *args) { - struct task_ctx *tctx;; + struct task_ctx *tctx; + struct bpf_cpumask *cpumask; - /* Allocate task's local storage */ tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); if (!tctx) return -ENOMEM; tctx->slice_ns = SCX_SLICE_DFL; + /* + * Create task's L2 cache cpumask. + */ + cpumask = bpf_cpumask_create(); + if (!cpumask) + return -ENOMEM; + cpumask = bpf_kptr_xchg(&tctx->l2_cpumask, cpumask); + if (cpumask) + bpf_cpumask_release(cpumask); + + /* + * Create task's L3 cache cpumask. + */ + cpumask = bpf_cpumask_create(); + if (!cpumask) + return -ENOMEM; + cpumask = bpf_kptr_xchg(&tctx->l3_cpumask, cpumask); + if (cpumask) + bpf_cpumask_release(cpumask); + return 0; } @@ -1071,6 +1265,64 @@ static int dsq_init(void) return 0; } +static int init_cpumask(struct bpf_cpumask **cpumask) +{ + struct bpf_cpumask *mask; + int err = 0; + + /* + * Do nothing if the mask is already initialized. + */ + mask = *cpumask; + if (mask) + return 0; + /* + * Create the CPU mask. + */ + err = calloc_cpumask(cpumask); + if (!err) + mask = *cpumask; + if (!mask) + err = -ENOMEM; + + return err; +} + +SEC("syscall") +int enable_sibling_cpu(struct domain_arg *input) +{ + struct cpu_ctx *cctx; + struct bpf_cpumask *mask, **pmask; + int err = 0; + + cctx = try_lookup_cpu_ctx(input->cpu_id); + if (!cctx) + return -ENOENT; + + /* Make sure the target CPU mask is initialized */ + switch (input->lvl_id) { + case 2: + pmask = &cctx->l2_cpumask; + break; + case 3: + pmask = &cctx->l3_cpumask; + break; + default: + return -EINVAL; + } + err = init_cpumask(pmask); + if (err) + return err; + + bpf_rcu_read_lock(); + mask = *pmask; + if (mask) + bpf_cpumask_set_cpu(input->sibling_cpu_id, mask); + bpf_rcu_read_unlock(); + + return err; +} + /* * Initialize the scheduling class. 
*/ diff --git a/scheds/rust/Cargo.lock b/scheds/rust/Cargo.lock index 754eb677d..4891a8d90 100644 --- a/scheds/rust/Cargo.lock +++ b/scheds/rust/Cargo.lock @@ -1237,7 +1237,7 @@ dependencies = [ [[package]] name = "scx_rustland_core" -version = "2.0.1" +version = "2.1.1" dependencies = [ "anyhow", "libbpf-rs", diff --git a/scheds/rust/scx_rlfifo/Cargo.toml b/scheds/rust/scx_rlfifo/Cargo.toml index 892b50a87..3d9b4020a 100644 --- a/scheds/rust/scx_rlfifo/Cargo.toml +++ b/scheds/rust/scx_rlfifo/Cargo.toml @@ -13,11 +13,11 @@ ctrlc = { version = "3.1", features = ["termination"] } libbpf-rs = "0.24.1" libc = "0.2.137" scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" } -scx_rustland_core = { path = "../../../rust/scx_rustland_core", version = "2.0.1" } +scx_rustland_core = { path = "../../../rust/scx_rustland_core", version = "2.1.1" } [build-dependencies] scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" } -scx_rustland_core = { path = "../../../rust/scx_rustland_core", version = "2.0.1" } +scx_rustland_core = { path = "../../../rust/scx_rustland_core", version = "2.1.1" } [features] enable_backtrace = [] diff --git a/scheds/rust/scx_rlfifo/src/main.rs b/scheds/rust/scx_rlfifo/src/main.rs index 91485c06d..a6c9407a2 100644 --- a/scheds/rust/scx_rlfifo/src/main.rs +++ b/scheds/rust/scx_rlfifo/src/main.rs @@ -2,6 +2,66 @@ // This software may be used and distributed according to the terms of the // GNU General Public License version 2. + +//! # FIFO Linux kernel scheduler that runs in user-space +//! +//! ## Overview +//! +//! This is a fully functional FIFO scheduler for the Linux kernel that operates in user-space and +//! it is 100% implemented in Rust. +//! +//! The scheduler is designed to serve as a simple template for developers looking to implement +//! more advanced scheduling policies. +//! +//! It is based on `scx_rustland_core`, a framework that is specifically designed to simplify the +//! creation of user-space schedulers, leveraging the Linux kernel's `sched_ext` feature (a +//! technology that allows to implement schedulers in BPF). +//! +//! The `scx_rustland_core` crate offers an abstraction layer over `sched_ext`, enabling developers +//! to write schedulers in Rust without needing to interact directly with low-level kernel or BPF +//! internal details. +//! +//! ## scx_rustland_core API +//! +//! ### struct `BpfScheduler` +//! +//! The `BpfScheduler` struct is the core interface for interacting with `sched_ext` via BPF. +//! +//! - **Initialization**: +//! - `BpfScheduler::init()` registers the scheduler and initializes the BPF component. +//! +//! - **Task Management**: +//! - `dequeue_task()`: Consume a task that wants to run, returns a QueuedTask object +//! - `select_cpu(pid: i32, prev_cpu: i32, flags: u64)`: Select an idle CPU for a task +//! - `dispatch_task(task: &DispatchedTask)`: Dispatch a task +//! +//! - **Completion Notification**: +//! - `notify_complete(nr_pending: u64)` Give control to the BPF component and report the number +//! of tasks that are still pending (this function can sleep) +//! +//! Each task received from dequeue_task() contains the following: +//! +//! struct QueuedTask { +//! pub pid: i32, // pid that uniquely identifies a task +//! pub cpu: i32, // CPU previously used by the task +//! pub flags: u64, // task's enqueue flags +//! pub sum_exec_runtime: u64, // Total cpu time in nanoseconds +//! pub weight: u64, // Task priority in the range [1..10000] (default is 100) +//! } +//! +//! 
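+//! The `flags` field carries the task's enqueue flags as reported by the
+//! kernel (e.g. `SCX_ENQ_WAKEUP`); a scheduler will typically just forward it
+//! to `select_cpu()`, for example:
+//!
+//!     let cpu = bpf.select_cpu(task.pid, task.cpu, task.flags);
+//!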
Each task dispatched using dispatch_task() contains the following: +//! +//! struct DispatchedTask { +//! pub pid: i32, // pid that uniquely identifies a task +//! pub cpu: i32, // target CPU selected by the scheduler +//! pub flags: u64, // special dispatch flags: +//! // - RL_CPU_ANY = dispatch on the first CPU available +//! pub slice_ns: u64, // time slice in nanoseconds assigned to the task +//! // (0 = use default) +//! pub vtime: u64, // this value can be used to send the task's vruntime or deadline +//! // directly to the underlying BPF dispatcher +//! } + mod bpf_skel; pub use bpf_skel::*; pub mod bpf_intf; @@ -13,17 +73,16 @@ use scx_utils::UserExitInfo; use libbpf_rs::OpenObject; -use std::collections::VecDeque; use std::mem::MaybeUninit; use std::time::SystemTime; use anyhow::Result; -const SLICE_US: u64 = 5000; +// Maximum time slice (in nanoseconds) that a task can use before it is re-enqueued. +const SLICE_NS: u64 = 5_000_000; struct Scheduler<'a> { - bpf: BpfScheduler<'a>, - task_queue: VecDeque, + bpf: BpfScheduler<'a>, // Connector to the sched_ext BPF backend } impl<'a> Scheduler<'a> { @@ -34,86 +93,45 @@ impl<'a> Scheduler<'a> { false, // partial (false = include all tasks) false, // debug (false = debug mode off) )?; - Ok(Self { - bpf, - task_queue: VecDeque::new(), - }) + Ok(Self { bpf }) } - fn consume_all_tasks(&mut self) { - // Consume all tasks that are ready to run. - // - // Each task contains the following details: - // - // pub struct QueuedTask { - // pub pid: i32, // pid that uniquely identifies a task - // pub cpu: i32, // CPU where the task is running - // pub sum_exec_runtime: u64, // Total cpu time - // pub weight: u64, // Task static priority - // } - // - // Although the FIFO scheduler doesn't use these fields, they can provide valuable data for - // implementing more sophisticated scheduling policies. - while let Ok(Some(task)) = self.bpf.dequeue_task() { - self.task_queue.push_back(task); - } - } + fn dispatch_tasks(&mut self) { + // Get the amount of tasks that are waiting to be scheduled. + let nr_waiting = *self.bpf.nr_queued_mut(); - fn dispatch_next_task(&mut self) { - if let Some(task) = self.task_queue.pop_front() { - // Create a new task to be dispatched, derived from the received enqueued task. - // - // pub struct DispatchedTask { - // pub pid: i32, // pid that uniquely identifies a task - // pub cpu: i32, // target CPU selected by the scheduler - // pub flags: u64, // special dispatch flags - // pub slice_ns: u64, // time slice assigned to the task (0 = default) - // } - // - // The dispatched task's information are pre-populated from the QueuedTask and they can - // be modified before dispatching it via self.bpf.dispatch_task(). + // Start consuming and dispatching tasks, until all the CPUs are busy or there are no more + // tasks to be dispatched. + while let Ok(Some(task)) = self.bpf.dequeue_task() { + // Create a new task to be dispatched from the received enqueued task. let mut dispatched_task = DispatchedTask::new(&task); - // Decide where the task needs to run (target CPU). + // Decide where the task needs to run (pick a target CPU). // // A call to select_cpu() will return the most suitable idle CPU for the task, - // considering its previously used CPU. - let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0); - if cpu >= 0 { - dispatched_task.cpu = cpu; - } else { - dispatched_task.flags |= RL_CPU_ANY; - } + // prioritizing its previously used CPU (task.cpu). 
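+ //
+ // The enqueue flags are forwarded as well, so that the BPF side can
+ // apply its wakeup-related idle-CPU heuristics.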
+ // + // If we can't find any idle CPU, keep the task running on the same CPU. + let cpu = self.bpf.select_cpu(task.pid, task.cpu, task.flags); + dispatched_task.cpu = if cpu < 0 { task.cpu } else { cpu }; - // Decide for how long the task needs to run (time slice); if not specified - // SCX_SLICE_DFL will be used by default. - dispatched_task.slice_ns = SLICE_US; + // Determine the task's time slice: assign value inversely proportional to the number + // of tasks waiting to be scheduled. + dispatched_task.slice_ns = SLICE_NS / (nr_waiting + 1); - // Dispatch the task on the target CPU. + // Dispatch the task. self.bpf.dispatch_task(&dispatched_task).unwrap(); - // Notify the BPF component of the number of pending tasks and immediately give a - // chance to run to the dispatched task. - self.bpf.notify_complete(self.task_queue.len() as u64); - } - } - - fn dispatch_tasks(&mut self) { - loop { - // Consume all tasks before dispatching any. - self.consume_all_tasks(); - - // Dispatch one task from the queue. - self.dispatch_next_task(); - - // If no task is ready to run (or in case of error), stop dispatching tasks and notify - // the BPF component that all tasks have been scheduled / dispatched, with no remaining - // pending tasks. - if self.task_queue.is_empty() { - self.bpf.notify_complete(0); + // Stop dispatching if all the CPUs are busy (select_cpu() couldn't find an idle CPU). + if cpu < 0 { break; } } + + // Notify the BPF component that tasks have been dispatched. + // + // This function will put the scheduler to sleep, until another task needs to run. + self.bpf.notify_complete(0); } fn print_stats(&mut self) { diff --git a/scheds/rust/scx_rustland/Cargo.toml b/scheds/rust/scx_rustland/Cargo.toml index c4c2ab13d..e4caa6b03 100644 --- a/scheds/rust/scx_rustland/Cargo.toml +++ b/scheds/rust/scx_rustland/Cargo.toml @@ -20,12 +20,12 @@ serde = { version = "1.0", features = ["derive"] } scx_stats = { path = "../../../rust/scx_stats", version = "1.0.4" } scx_stats_derive = { path = "../../../rust/scx_stats/scx_stats_derive", version = "1.0.4" } scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" } -scx_rustland_core = { path = "../../../rust/scx_rustland_core", version = "2.0.1" } +scx_rustland_core = { path = "../../../rust/scx_rustland_core", version = "2.1.1" } simplelog = "0.12" [build-dependencies] scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" } -scx_rustland_core = { path = "../../../rust/scx_rustland_core", version = "2.0.1" } +scx_rustland_core = { path = "../../../rust/scx_rustland_core", version = "2.1.1" } [features] enable_backtrace = [] diff --git a/scheds/rust/scx_rustland/src/main.rs b/scheds/rust/scx_rustland/src/main.rs index 2c0bf7099..86421caac 100644 --- a/scheds/rust/scx_rustland/src/main.rs +++ b/scheds/rust/scx_rustland/src/main.rs @@ -202,9 +202,9 @@ impl TaskInfoMap { #[derive(Debug, PartialEq, Eq, PartialOrd, Clone)] struct Task { - qtask: QueuedTask, // queued task - vruntime: u64, // total vruntime (that determines the order how tasks are dispatched) - timestamp: u64, // task enqueue timestamp + qtask: QueuedTask, // queued task + vruntime: u64, // total vruntime (that determines the order how tasks are dispatched) + timestamp: u64, // task enqueue timestamp } // Sort tasks by their interactive status first (interactive tasks are always scheduled before @@ -448,9 +448,9 @@ impl<'a> Scheduler<'a> { dispatched_task.slice_ns = slice_ns; // Try to pick an idle CPU for the task. 
- let cpu = self - .bpf - .select_cpu(dispatched_task.pid, dispatched_task.cpu, 0); + let cpu = + self.bpf + .select_cpu(dispatched_task.pid, dispatched_task.cpu, task.qtask.flags); if cpu >= 0 { dispatched_task.cpu = cpu; } else {