diff --git a/Cargo.lock b/Cargo.lock index 84f6fc762..11b6b8369 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1700,6 +1700,23 @@ dependencies = [ "simplelog", ] +[[package]] +name = "scx_fair" +version = "1.0.4" +dependencies = [ + "anyhow", + "clap", + "crossbeam", + "ctrlc", + "libbpf-rs", + "log", + "scx_stats", + "scx_stats_derive", + "scx_utils", + "serde", + "simplelog", +] + [[package]] name = "scx_lavd" version = "1.0.5" diff --git a/Cargo.toml b/Cargo.toml index 577a32646..11560284e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = ["rust/scx_stats", "rust/scx_loader", "scheds/rust/scx_lavd", "scheds/rust/scx_bpfland", + "scheds/rust/scx_fair", "scheds/rust/scx_rustland", "scheds/rust/scx_rlfifo", "scheds/rust/scx_rusty", diff --git a/meson-scripts/stress_tests.ini b/meson-scripts/stress_tests.ini index e15717c1e..2f809fcfc 100644 --- a/meson-scripts/stress_tests.ini +++ b/meson-scripts/stress_tests.ini @@ -22,6 +22,12 @@ sched_args: -v stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc` timeout_sec: 15 +[scx_fair] +sched: scx_fair +sched_args: +stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc` +timeout_sec: 15 + [scx_layered] sched: scx_layered sched_args: --run-example -v --stats 1 diff --git a/meson.build b/meson.build index dd29e5206..39b4c425e 100644 --- a/meson.build +++ b/meson.build @@ -327,7 +327,7 @@ if enable_rust run_target('fetch', command: [cargo_fetch, cargo], env: cargo_env) rust_scheds = ['scx_lavd', 'scx_bpfland', 'scx_rustland', 'scx_rlfifo', - 'scx_rusty', + 'scx_fair', 'scx_rusty', 'scx_layered', 'scx_mitosis'] rust_misc = ['scx_stats', 'scx_stats_derive', 'scx_utils', 'scx_rustland_core', diff --git a/scheds/rust/README.md b/scheds/rust/README.md index d6319a5a6..548e8bf82 100644 --- a/scheds/rust/README.md +++ b/scheds/rust/README.md @@ -18,3 +18,4 @@ main.rs or \*.bpf.c files. - [scx_rlfifo](scx_rlfifo/README.md) - [scx_lavd](scx_lavd/README.md) - [scx_bpfland](scx_bpfland/README.md) +- [scx_fair](scx_fair/README.md) diff --git a/scheds/rust/scx_fair/Cargo.toml b/scheds/rust/scx_fair/Cargo.toml new file mode 100644 index 000000000..2d5b0bfdc --- /dev/null +++ b/scheds/rust/scx_fair/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "scx_fair" +version = "1.0.4" +authors = ["Andrea Righi "] +edition = "2021" +description = "A scheduler designed for multimedia and real-time audio processing workloads. 
https://github.com/sched-ext/scx/tree/main"
+license = "GPL-2.0-only"
+
+[dependencies]
+anyhow = "1.0.65"
+ctrlc = { version = "3.1", features = ["termination"] }
+clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] }
+crossbeam = "0.8.4"
+libbpf-rs = "0.24.1"
+log = "0.4.17"
+scx_stats = { path = "../../../rust/scx_stats", version = "1.0.4" }
+scx_stats_derive = { path = "../../../rust/scx_stats/scx_stats_derive", version = "1.0.4" }
+scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" }
+serde = { version = "1.0", features = ["derive"] }
+simplelog = "0.12"
+
+[build-dependencies]
+scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" }
+
+[features]
+enable_backtrace = []
diff --git a/scheds/rust/scx_fair/LICENSE b/scheds/rust/scx_fair/LICENSE
new file mode 120000
index 000000000..5853aaea5
--- /dev/null
+++ b/scheds/rust/scx_fair/LICENSE
@@ -0,0 +1 @@
+../../../LICENSE
\ No newline at end of file
diff --git a/scheds/rust/scx_fair/README.md b/scheds/rust/scx_fair/README.md
new file mode 100644
index 000000000..7e1b6e764
--- /dev/null
+++ b/scheds/rust/scx_fair/README.md
@@ -0,0 +1,26 @@
+# scx_fair
+
+This is a single user-defined scheduler used within [sched_ext](https://github.com/sched-ext/scx/tree/main), which is a Linux kernel feature which enables implementing kernel thread schedulers in BPF and dynamically loading them. [Read more about sched_ext](https://github.com/sched-ext/scx/tree/main).
+
+## Overview
+
+A scheduler that focuses on ensuring fairness among tasks and performance
+predictability.
+
+It operates using a vruntime-based policy, where each task is assigned a
+"latency" weight. This weight is dynamically adjusted based on how often a task
+releases the CPU before its full time slice is used. Tasks that release the CPU
+early are given a higher latency weight, prioritizing them over tasks that
+fully consume their time slice.
+
+## Typical Use Case
+
+The combination of dynamic latency weights and vruntime-based scheduling
+ensures responsive and consistent performance, even in overcommitted systems.
+
+This makes the scheduler particularly well-suited for workloads that require
+multimedia or real-time audio processing.
+
+## Production Ready?
+
+Yes.
diff --git a/scheds/rust/scx_fair/build.rs b/scheds/rust/scx_fair/build.rs
new file mode 100644
index 000000000..c15be3ff5
--- /dev/null
+++ b/scheds/rust/scx_fair/build.rs
@@ -0,0 +1,13 @@
+// Copyright (c) Andrea Righi
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2.
+
+fn main() {
+    scx_utils::BpfBuilder::new()
+        .unwrap()
+        .enable_intf("src/bpf/intf.h", "bpf_intf.rs")
+        .enable_skel("src/bpf/main.bpf.c", "bpf")
+        .build()
+        .unwrap();
+}
diff --git a/scheds/rust/scx_fair/rustfmt.toml b/scheds/rust/scx_fair/rustfmt.toml
new file mode 100644
index 000000000..b7258ed0a
--- /dev/null
+++ b/scheds/rust/scx_fair/rustfmt.toml
@@ -0,0 +1,8 @@
+# Get help on options with `rustfmt --help=config`
+# Please keep these in alphabetical order.
+edition = "2021" +group_imports = "StdExternalCrate" +imports_granularity = "Item" +merge_derives = false +use_field_init_shorthand = true +version = "Two" diff --git a/scheds/rust/scx_fair/src/bpf/intf.h b/scheds/rust/scx_fair/src/bpf/intf.h new file mode 100644 index 000000000..f05a2d678 --- /dev/null +++ b/scheds/rust/scx_fair/src/bpf/intf.h @@ -0,0 +1,42 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Copyright (c) 2024 Andrea Righi + * + * This software may be used and distributed according to the terms of the GNU + * General Public License version 2. + */ +#ifndef __INTF_H +#define __INTF_H + +#include + +#define MAX(x, y) ((x) > (y) ? (x) : (y)) +#define MIN(x, y) ((x) < (y) ? (x) : (y)) +#define CLAMP(val, lo, hi) MIN(MAX(val, lo), hi) + +enum consts { + NSEC_PER_USEC = 1000ULL, + NSEC_PER_MSEC = (1000ULL * NSEC_PER_USEC), + NSEC_PER_SEC = (1000ULL * NSEC_PER_MSEC), +}; + +#ifndef __VMLINUX_H__ +typedef unsigned char u8; +typedef unsigned short u16; +typedef unsigned int u32; +typedef unsigned long u64; + +typedef signed char s8; +typedef signed short s16; +typedef signed int s32; +typedef signed long s64; + +typedef int pid_t; +#endif /* __VMLINUX_H__ */ + +struct domain_arg { + s32 cpu_id; + s32 sibling_cpu_id; +}; + +#endif /* __INTF_H */ diff --git a/scheds/rust/scx_fair/src/bpf/main.bpf.c b/scheds/rust/scx_fair/src/bpf/main.bpf.c new file mode 100644 index 000000000..0ef86cf53 --- /dev/null +++ b/scheds/rust/scx_fair/src/bpf/main.bpf.c @@ -0,0 +1,807 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Copyright (c) 2024 Andrea Righi + */ +#include +#include "intf.h" + +char _license[] SEC("license") = "GPL"; + +/* + * Global DSQ used to dispatch tasks. + */ +#define SHARED_DSQ 0 + +/* + * Maximum multiplier for the dynamic task priority. + */ +#define MAX_LATENCY_WEIGHT 1000 + +/* + * Task time slice range. + */ +const volatile u64 slice_max = 20ULL * NSEC_PER_MSEC; +const volatile u64 slice_min = 1ULL * NSEC_PER_MSEC; +const volatile u64 slice_lag = 20ULL * NSEC_PER_MSEC; + +/* + * When enabled always dispatch all kthreads directly. + * + * This allows to prioritize critical kernel threads that may potentially slow + * down the entire system if they are blocked for too long, but it may also + * introduce interactivity issues or unfairness in scenarios with high kthread + * activity, such as heavy I/O or network traffic. + */ +const volatile bool local_kthreads; + +/* + * Scheduling statistics. + */ +volatile u64 nr_kthread_dispatches, nr_direct_dispatches, + nr_shared_dispatches, nr_migrate_dispatches; + +/* + * Exit information. + */ +UEI_DEFINE(uei); + +/* + * Amount of possible CPUs in the system. + */ +static u64 nr_possible_cpus; + +/* + * CPUs in the system have SMT is enabled. + */ +const volatile bool smt_enabled = true; + +/* + * Current global vruntime. + */ +static u64 vtime_now; + +/* + * Per-CPU context. + */ +struct cpu_ctx { + u64 tot_runtime; + u64 prev_runtime; + u64 last_running; + struct bpf_cpumask __kptr *llc_mask; +}; + +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __type(key, u32); + __type(value, struct cpu_ctx); + __uint(max_entries, 1); +} cpu_ctx_stor SEC(".maps"); + +/* + * Return a CPU context. + */ +struct cpu_ctx *try_lookup_cpu_ctx(s32 cpu) +{ + const u32 idx = 0; + return bpf_map_lookup_percpu_elem(&cpu_ctx_stor, &idx, cpu); +} + +/* + * Per-task local storage. + * + * This contain all the per-task information used internally by the BPF code. + */ +struct task_ctx { + /* + * Temporary cpumask for calculating scheduling domains. 
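+         *
+         * It is refreshed by sched_domain() with the intersection of the
+         * task's affinity mask and the LLC domain of its previously used CPU.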
+         */
+        struct bpf_cpumask __kptr *llc_mask;
+
+        /*
+         * Total execution time of the task.
+         */
+        u64 sum_exec_runtime;
+
+        /*
+         * Voluntary context switch metrics.
+         */
+        u64 nvcsw;
+        u64 nvcsw_ts;
+
+        /*
+         * Task's dynamic priority multiplier.
+         */
+        u64 lat_weight;
+};
+
+/* Map that contains task-local storage. */
+struct {
+        __uint(type, BPF_MAP_TYPE_TASK_STORAGE);
+        __uint(map_flags, BPF_F_NO_PREALLOC);
+        __type(key, int);
+        __type(value, struct task_ctx);
+} task_ctx_stor SEC(".maps");
+
+/*
+ * Return a local task context from a generic task.
+ */
+struct task_ctx *try_lookup_task_ctx(const struct task_struct *p)
+{
+        return bpf_task_storage_get(&task_ctx_stor,
+                                    (struct task_struct *)p, 0, 0);
+}
+
+/*
+ * Allocate/re-allocate a new cpumask.
+ */
+static int calloc_cpumask(struct bpf_cpumask **p_cpumask)
+{
+        struct bpf_cpumask *cpumask;
+
+        cpumask = bpf_cpumask_create();
+        if (!cpumask)
+                return -ENOMEM;
+
+        cpumask = bpf_kptr_xchg(p_cpumask, cpumask);
+        if (cpumask)
+                bpf_cpumask_release(cpumask);
+
+        return 0;
+}
+
+/*
+ * Exponential weighted moving average (EWMA).
+ *
+ * Copied from scx_lavd. Returns the new average as:
+ *
+ *        new_avg := (old_avg * .75) + (new_val * .25);
+ */
+static u64 calc_avg(u64 old_val, u64 new_val)
+{
+        return (old_val - (old_val >> 2)) + (new_val >> 2);
+}
+
+/*
+ * Compare two vruntime values, returns true if the first value is less than
+ * the second one.
+ *
+ * Copied from scx_simple.
+ */
+static inline bool vtime_before(u64 a, u64 b)
+{
+        return (s64)(a - b) < 0;
+}
+
+/*
+ * Return true if the target task @p is a kernel thread, false otherwise.
+ */
+static inline bool is_kthread(const struct task_struct *p)
+{
+        return p->flags & PF_KTHREAD;
+}
+
+/*
+ * Return the dynamic priority multiplier.
+ *
+ * The multiplier is evaluated as a function of the task's average rate of
+ * voluntary context switches per second.
+ */
+static u64 task_dyn_prio(struct task_struct *p)
+{
+        struct task_ctx *tctx;
+
+        tctx = try_lookup_task_ctx(p);
+        if (!tctx)
+                return 1;
+        return MAX(tctx->lat_weight, 1);
+}
+
+/*
+ * Return task's dynamic priority.
+ */
+static u64 task_prio(struct task_struct *p)
+{
+        return p->scx.weight * task_dyn_prio(p);
+}
+
+/*
+ * Return the task's allowed lag: used to determine how much earlier than the
+ * current global vruntime its deadline can be.
+ */
+static u64 task_lag(struct task_struct *p)
+{
+        return MIN(slice_lag * task_prio(p) / 100, NSEC_PER_SEC);
+}
+
+/*
+ * Return a value inversely proportional to the task's priority.
+ */
+static inline u64 scale_inverse_fair(struct task_struct *p, u64 value)
+{
+        return value * 100 / task_prio(p);
+}
+
+/*
+ * Return task's evaluated deadline.
+ */
+static inline u64 task_vtime(struct task_struct *p)
+{
+        u64 min_vruntime = vtime_now - task_lag(p);
+        struct task_ctx *tctx;
+
+        tctx = try_lookup_task_ctx(p);
+        if (!tctx)
+                return min_vruntime;
+
+        /*
+         * Limit the vruntime to vtime_now minus the task's maximum lag to
+         * avoid excessively penalizing tasks.
+         */
+        if (vtime_before(p->scx.dsq_vtime, min_vruntime))
+                p->scx.dsq_vtime = min_vruntime;
+
+        return p->scx.dsq_vtime;
+}
+
+static inline u64 nr_waiting_tasks(void)
+{
+        return scx_bpf_dsq_nr_queued(SHARED_DSQ);
+}
+
+/*
+ * Evaluate the task's time slice as a function of the total number of tasks
+ * that are waiting to be dispatched.
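+ *
+ * For example, with slice_max = 20ms and 3 tasks waiting in the shared DSQ,
+ * each task is assigned a 5ms time slice (the result is always clamped to
+ * the [slice_min, slice_max] range).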
+ */ +static inline void task_refill_slice(struct task_struct *p) +{ + u64 slice, nr_waiting = nr_waiting_tasks(); + + slice = slice_max / (nr_waiting + 1); + p->scx.slice = CLAMP(slice, slice_min, slice_max); +} + +/* + * Scheduling domain types used for idle CPU selection. + */ +static const struct cpumask *sched_domain(const struct task_struct *p, s32 cpu) +{ + struct bpf_cpumask *llc_dom, *task_dom; + struct task_ctx *tctx; + struct cpu_ctx *cctx; + + /* + * Determine the scheduling domain only if the task is allowed to run + * on all CPUs. + * + * This is done primarily for efficiency, as it avoids the overhead of + * updating a cpumask every time we need to select an idle CPU (which + * can be costly in large SMP systems), but it also aligns logically: + * if a task's scheduling domain is restricted by user-space (through + * CPU affinity), the task will simply use the flat scheduling domain + * defined by user-space. + */ + if (p->nr_cpus_allowed < nr_possible_cpus) + return NULL; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return NULL; + task_dom = tctx->llc_mask; + if (!task_dom) { + scx_bpf_error("task cpumask not initialized"); + return NULL; + } + + cctx = try_lookup_cpu_ctx(cpu); + if (!cctx) + return NULL; + llc_dom = tctx->llc_mask; + if (!llc_dom) { + scx_bpf_error("LLC cpumask not initialized"); + return NULL; + } + + /* + * Determine the task's LLC domain as the intersection of the task's + * domain and the LLC domain of the previously used CPU. + */ + bpf_cpumask_and(task_dom, p->cpus_ptr, cast_mask(llc_dom)); + + return cast_mask(task_dom); +} + +/* + * Find an idle CPU in the system. + * + * NOTE: the idle CPU selection doesn't need to be formally perfect, it is + * totally fine to accept racy conditions and potentially make mistakes, by + * picking CPUs that are not idle or even offline, the logic has been designed + * to handle these mistakes in favor of a more efficient response and a reduced + * scheduling overhead. + */ +static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) +{ + const struct cpumask *idle_smtmask, *idle_cpumask; + const struct cpumask *llc_cpus; + s32 cpu; + + /* + * Acquire the CPU masks to determine the online and idle CPUs in the + * system. + */ + idle_smtmask = scx_bpf_get_idle_smtmask(); + idle_cpumask = scx_bpf_get_idle_cpumask(); + + /* + * Determine the scheduling domain of the task. + */ + llc_cpus = sched_domain(p, prev_cpu); + if (!llc_cpus) + llc_cpus = p->cpus_ptr; + + /* + * If the current task is waking up another task and releasing the CPU + * (WAKE_SYNC), attempt to migrate the wakee on the same CPU as the + * waker. + */ + if (wake_flags & SCX_WAKE_SYNC) { + struct task_struct *current = (void *)bpf_get_current_task_btf(); + const struct cpumask *curr_llc_domain; + struct cpu_ctx *cctx; + bool share_llc, has_idle; + + /* + * Determine waker CPU scheduling domain. + */ + cpu = bpf_get_smp_processor_id(); + cctx = try_lookup_cpu_ctx(cpu); + if (!cctx) { + cpu = -EINVAL; + goto out_put_cpumask; + } + + curr_llc_domain = cast_mask(cctx->llc_mask); + if (!curr_llc_domain) + curr_llc_domain = p->cpus_ptr; + + /* + * If both the waker and wakee share the same L3 cache keep + * using the same CPU if possible. 
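+                 * Reusing the waker's LLC keeps the wakee close to any
+                 * cache-hot data the waker may have just produced for it.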
+ */ + share_llc = bpf_cpumask_test_cpu(prev_cpu, curr_llc_domain); + if (share_llc && scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + goto out_put_cpumask; + } + + /* + * If the waker's L3 domain is not saturated attempt to migrate + * the wakee on the same CPU as the waker (since it's going to + * block and release the current CPU). + */ + has_idle = bpf_cpumask_intersects(curr_llc_domain, idle_cpumask); + if (has_idle && + bpf_cpumask_test_cpu(cpu, p->cpus_ptr) && + !(current->flags & PF_EXITING) && + scx_bpf_dsq_nr_queued(SCX_DSQ_LOCAL_ON | cpu) == 0) + goto out_put_cpumask; + } + + /* + * Find the best idle CPU, prioritizing full idle cores in SMT systems. + */ + if (smt_enabled) { + /* + * If the task can still run on the previously used CPU and + * it's a full-idle core, keep using it. + */ + if (bpf_cpumask_test_cpu(prev_cpu, idle_smtmask) && + scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + goto out_put_cpumask; + } + + /* + * Search for any full-idle CPU in the same LLC domain. + */ + cpu = scx_bpf_pick_idle_cpu(llc_cpus, SCX_PICK_IDLE_CORE); + if (cpu >= 0) + goto out_put_cpumask; + + /* + * Search for any other full-idle core in the primary domain. + */ + cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, SCX_PICK_IDLE_CORE); + if (cpu >= 0) + goto out_put_cpumask; + } + + /* + * If a full-idle core can't be found (or if this is not an SMT system) + * try to re-use the same CPU, even if it's not in a full-idle core. + */ + if (scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + goto out_put_cpumask; + } + + /* + * Search for any idle CPU in the same LLC domain. + */ + cpu = scx_bpf_pick_idle_cpu(llc_cpus, 0); + if (cpu >= 0) + goto out_put_cpumask; + + /* + * Search for any idle CPU in the scheduling domain. + */ + cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, 0); + if (cpu >= 0) + goto out_put_cpumask; + + /* + * We couldn't find any idle CPU, so simply dispatch the task to the + * first CPU that will become available. + */ + cpu = -ENOENT; + +out_put_cpumask: + scx_bpf_put_cpumask(idle_cpumask); + scx_bpf_put_cpumask(idle_smtmask); + + return cpu; +} + +/* + * Pick a target CPU for a task which is being woken up. + * + * If a task is dispatched here, ops.enqueue() will be skipped: task will be + * dispatched directly to the CPU returned by this callback. + */ +s32 BPF_STRUCT_OPS(fair_select_cpu, struct task_struct *p, + s32 prev_cpu, u64 wake_flags) +{ + s32 cpu; + + cpu = pick_idle_cpu(p, prev_cpu, wake_flags); + if (cpu >= 0) { + scx_bpf_dispatch(p, SCX_DSQ_LOCAL, SCX_SLICE_DFL, 0); + __sync_fetch_and_add(&nr_direct_dispatches, 1); + return cpu; + } + + /* + * If we couldn't find any idle CPU, return any CPU in the task's + * domain. + * + * There is not guarantee that the task will be dispatched on this CPU, + * but it can help to wake-up in advance a CPU suitable for the task. + */ + cpu = scx_bpf_pick_any_cpu(p->cpus_ptr, 0); + + return cpu >= 0 ? cpu : prev_cpu; +} + +/* + * Wake up an idle CPU for task @p. + */ +static void kick_task_cpu(struct task_struct *p) +{ + s32 cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, 0); + + if (cpu >= 0) + scx_bpf_kick_cpu(cpu, 0); +} + +/* + * Dispatch all the other tasks that were not dispatched directly in + * select_cpu(). + */ +void BPF_STRUCT_OPS(fair_enqueue, struct task_struct *p, u64 enq_flags) +{ + s32 cpu; + + /* + * Per-CPU kthreads can be critical for system responsiveness, when + * local_kthreads is specified they are always dispatched directly + * before any other task. 
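+         *
+         * Typical examples are per-CPU workers that handle I/O completions
+         * or network RX, which other tasks may be waiting on.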
+ */ + if (local_kthreads && is_kthread(p) && p->nr_cpus_allowed == 1) { + scx_bpf_dispatch(p, SCX_DSQ_LOCAL, SCX_SLICE_DFL, + enq_flags); + __sync_fetch_and_add(&nr_kthread_dispatches, 1); + return; + } + + /* + * Enqueue the task to the global DSQ. The task will be dispatched on + * the first CPU that becomes available. + */ + scx_bpf_dispatch_vtime(p, SHARED_DSQ, SCX_SLICE_DFL, + task_vtime(p), enq_flags); + __sync_fetch_and_add(&nr_shared_dispatches, 1); + + /* + * If there is an idle CPU available for the task, wake it up so it can + * consume the task immediately. + */ + kick_task_cpu(p); +} + +void BPF_STRUCT_OPS(fair_dispatch, s32 cpu, struct task_struct *prev) +{ + if (scx_bpf_consume(SHARED_DSQ)) + return; + + /* + * If the current task expired its time slice and no other task wants + * to run, simply replenish its time slice and let it run for another + * round on the same CPU. + */ + if (prev && (prev->scx.flags & SCX_TASK_QUEUED)) + task_refill_slice(prev); +} + +/* + * Scale target CPU frequency based on the performance level selected + * from user-space and the CPU utilization. + */ +static void update_cpuperf_target(struct task_struct *p) +{ + u64 now = bpf_ktime_get_ns(); + s32 cpu = scx_bpf_task_cpu(p); + u64 perf_lvl, delta_runtime, delta_t; + struct cpu_ctx *cctx; + + /* + * For non-interactive tasks determine their cpufreq scaling factor as + * a function of their CPU utilization. + */ + cctx = try_lookup_cpu_ctx(cpu); + if (!cctx) + return; + + /* + * Evaluate dynamic cpuperf scaling factor using the average CPU + * utilization, normalized in the range [0 .. SCX_CPUPERF_ONE]. + */ + delta_t = now - cctx->last_running; + delta_runtime = cctx->tot_runtime - cctx->prev_runtime; + perf_lvl = MIN(delta_runtime * SCX_CPUPERF_ONE / delta_t, SCX_CPUPERF_ONE); + + /* + * Apply the dynamic cpuperf scaling factor. + */ + scx_bpf_cpuperf_set(cpu, perf_lvl); + + cctx->last_running = bpf_ktime_get_ns(); + cctx->prev_runtime = cctx->tot_runtime; +} + +void BPF_STRUCT_OPS(fair_running, struct task_struct *p) +{ + /* + * Refresh task's time slice immediately before it starts to run on its + * assigned CPU. + */ + task_refill_slice(p); + + /* + * Adjust target CPU frequency before the task starts to run. + */ + update_cpuperf_target(p); +} + +/* + * Update task statistics when the task is releasing the CPU (either + * voluntarily or because it expires its assigned time slice). + */ +void BPF_STRUCT_OPS(fair_stopping, struct task_struct *p, bool runnable) +{ + u64 now = bpf_ktime_get_ns(), slice, delta_t; + s32 cpu = scx_bpf_task_cpu(p); + struct cpu_ctx *cctx; + struct task_ctx *tctx; + + cctx = try_lookup_cpu_ctx(cpu); + if (cctx) + cctx->tot_runtime += now - cctx->last_running; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + /* + * If the time slice is not fully depleted, it means that the task + * voluntarily relased the CPU, therefore update the voluntary context + * switch counter. + * + * NOTE: the sched_ext core implements sched_yield() by setting the + * time slice to 0, so we won't boost the priority of tasks that are + * explicitly calling sched_yield(). + * + * This is actually a good thing, because we want to prioritize tasks + * that are releasing the CPU, because they're doing I/O, waiting for + * input or sending output to other tasks. + * + * Tasks that are using sched_yield() don't really need the priority + * boost and when they get the chance to run again they will be + * naturally prioritized by the vruntime-based scheduling policy. 
+ */ + if (p->scx.slice > 0) + tctx->nvcsw++; + + /* + * Evaluate task's used time slice. + */ + slice = CLAMP(p->se.sum_exec_runtime - tctx->sum_exec_runtime, + slice_min, slice_max); + tctx->sum_exec_runtime = p->se.sum_exec_runtime; + slice = scale_inverse_fair(p, slice); + + /* + * Update task's vruntime by adding the used time slice, scaled by its + * priority. + */ + p->scx.dsq_vtime += slice; + + /* + * Update global system vruntime. + */ + vtime_now += slice; + + /* + * Update task's average rate of voluntary context switches per second. + */ + delta_t = (s64)(now - tctx->nvcsw_ts); + if (delta_t > NSEC_PER_SEC) { + /* + * Evaluate the task's latency weight as the task's average + * rate of voluntary context switches per second. + */ + u64 avg_nvcsw = tctx->nvcsw * NSEC_PER_SEC / delta_t; + u64 lat_weight = MIN(avg_nvcsw, MAX_LATENCY_WEIGHT); + + tctx->nvcsw = 0; + tctx->nvcsw_ts = now; + tctx->lat_weight = calc_avg(tctx->lat_weight, lat_weight); + } +} + +void BPF_STRUCT_OPS(fair_enable, struct task_struct *p) +{ + struct task_ctx *tctx; + + p->scx.dsq_vtime = vtime_now; + + tctx = try_lookup_task_ctx(p); + if (!tctx) { + scx_bpf_error("incorrectly initialized task: %d (%s)", + p->pid, p->comm); + return; + } + tctx->sum_exec_runtime = p->se.sum_exec_runtime; + tctx->nvcsw_ts = bpf_ktime_get_ns(); +} + +s32 BPF_STRUCT_OPS(fair_init_task, struct task_struct *p, + struct scx_init_task_args *args) +{ + struct task_ctx *tctx; + struct bpf_cpumask *cpumask; + + tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, + BPF_LOCAL_STORAGE_GET_F_CREATE); + if (!tctx) + return -ENOMEM; + /* + * Create task's LLC cpumask. + */ + cpumask = bpf_cpumask_create(); + if (!cpumask) + return -ENOMEM; + cpumask = bpf_kptr_xchg(&tctx->llc_mask, cpumask); + if (cpumask) + bpf_cpumask_release(cpumask); + + return 0; +} + +static int init_cpumask(struct bpf_cpumask **cpumask) +{ + struct bpf_cpumask *mask; + int err = 0; + + /* + * Do nothing if the mask is already initialized. + */ + mask = *cpumask; + if (mask) + return 0; + /* + * Create the CPU mask. + */ + err = calloc_cpumask(cpumask); + if (!err) + mask = *cpumask; + if (!mask) + err = -ENOMEM; + + return err; +} + +SEC("syscall") +int enable_sibling_cpu(struct domain_arg *input) +{ + struct cpu_ctx *cctx; + struct bpf_cpumask *mask, **pmask; + int err = 0; + + cctx = try_lookup_cpu_ctx(input->cpu_id); + if (!cctx) + return -ENOENT; + + /* Make sure the target CPU mask is initialized */ + pmask = &cctx->llc_mask; + err = init_cpumask(pmask); + if (err) + return err; + + bpf_rcu_read_lock(); + mask = *pmask; + if (mask) + bpf_cpumask_set_cpu(input->sibling_cpu_id, mask); + bpf_rcu_read_unlock(); + + return err; +} + +static s32 get_nr_possible_cpus(void) +{ + const struct cpumask *possible_cpumask; + s32 cnt; + + possible_cpumask = scx_bpf_get_possible_cpumask(); + cnt = bpf_cpumask_weight(possible_cpumask); + scx_bpf_put_cpumask(possible_cpumask); + + return cnt; +} + +s32 BPF_STRUCT_OPS_SLEEPABLE(fair_init) +{ + int err; + + nr_possible_cpus = get_nr_possible_cpus(); + + /* + * Create the shared DSQ. + * + * Allocate the new DSQ id to not clash with any valid CPU id. 
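+         *
+         * All CPUs pull from this single queue in fair_dispatch(), consuming
+         * tasks in vruntime (deadline) order.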
+ */ + err = scx_bpf_create_dsq(SHARED_DSQ, -1); + if (err) { + scx_bpf_error("failed to create shared DSQ: %d", err); + return err; + } + + return 0; +} + +void BPF_STRUCT_OPS(fair_exit, struct scx_exit_info *ei) +{ + UEI_RECORD(uei, ei); +} + +SCX_OPS_DEFINE(fair_ops, + .select_cpu = (void *)fair_select_cpu, + .enqueue = (void *)fair_enqueue, + .dispatch = (void *)fair_dispatch, + .running = (void *)fair_running, + .stopping = (void *)fair_stopping, + .enable = (void *)fair_enable, + .init_task = (void *)fair_init_task, + .init = (void *)fair_init, + .exit = (void *)fair_exit, + .timeout_ms = 5000, + .name = "fair"); diff --git a/scheds/rust/scx_fair/src/bpf_intf.rs b/scheds/rust/scx_fair/src/bpf_intf.rs new file mode 100644 index 000000000..30808ac75 --- /dev/null +++ b/scheds/rust/scx_fair/src/bpf_intf.rs @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(dead_code)] + +include!(concat!(env!("OUT_DIR"), "/bpf_intf.rs")); diff --git a/scheds/rust/scx_fair/src/bpf_skel.rs b/scheds/rust/scx_fair/src/bpf_skel.rs new file mode 100644 index 000000000..9491741eb --- /dev/null +++ b/scheds/rust/scx_fair/src/bpf_skel.rs @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +include!(concat!(env!("OUT_DIR"), "/bpf_skel.rs")); diff --git a/scheds/rust/scx_fair/src/main.rs b/scheds/rust/scx_fair/src/main.rs new file mode 100644 index 000000000..cc94d801e --- /dev/null +++ b/scheds/rust/scx_fair/src/main.rs @@ -0,0 +1,338 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +mod bpf_skel; +pub use bpf_skel::*; +pub mod bpf_intf; +pub use bpf_intf::*; + +mod stats; +use std::collections::HashMap; +use std::ffi::c_int; +use std::fs::File; +use std::io::Read; +use std::mem::MaybeUninit; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::bail; +use anyhow::Context; +use anyhow::Result; +use clap::Parser; +use crossbeam::channel::RecvTimeoutError; +use libbpf_rs::skel::OpenSkel; +use libbpf_rs::skel::Skel; +use libbpf_rs::skel::SkelBuilder; +use libbpf_rs::OpenObject; +use libbpf_rs::ProgramInput; +use log::info; +use log::warn; +use scx_stats::prelude::*; +use scx_utils::build_id; +use scx_utils::scx_ops_attach; +use scx_utils::scx_ops_load; +use scx_utils::scx_ops_open; +use scx_utils::set_rlimit_infinity; +use scx_utils::uei_exited; +use scx_utils::uei_report; +use scx_utils::Topology; +use scx_utils::UserExitInfo; +use stats::Metrics; + +const SCHEDULER_NAME: &'static str = "scx_fair"; + +#[derive(Debug, Parser)] +struct Opts { + /// Exit debug dump buffer length. 0 indicates default. + #[clap(long, default_value = "0")] + exit_dump_len: u32, + + /// Maximum scheduling slice duration in microseconds. + #[clap(short = 's', long, default_value = "20000")] + slice_us_max: u64, + + /// Mimimum scheduling slice duration in microseconds. + #[clap(short = 'S', long, default_value = "1000")] + slice_us_min: u64, + + /// Maximum time slice lag in microseconds. 
+ /// + /// Increasing this value can help to enhance the responsiveness of interactive tasks, but it + /// can also make performance more "spikey". + #[clap(short = 'l', long, default_value = "20000")] + slice_us_lag: u64, + + /// Enable kthreads prioritization. + /// + /// Enabling this can improve system performance, but it may also introduce interactivity + /// issues or unfairness in scenarios with high kthread activity, such as heavy I/O or network + /// traffic. + #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)] + local_kthreads: bool, + + /// Enable stats monitoring with the specified interval. + #[clap(long)] + stats: Option, + + /// Run in stats monitoring mode with the specified interval. Scheduler + /// is not launched. + #[clap(long)] + monitor: Option, + + /// Enable verbose output, including libbpf details. + #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)] + verbose: bool, + + /// Print scheduler version and exit. + #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)] + version: bool, + + /// Show descriptions for statistics. + #[clap(long)] + help_stats: bool, +} + +fn is_smt_active() -> std::io::Result { + let mut file = File::open("/sys/devices/system/cpu/smt/active")?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + let smt_active: i32 = contents.trim().parse().unwrap_or(0); + + Ok(smt_active) +} + +struct Scheduler<'a> { + skel: BpfSkel<'a>, + struct_ops: Option, + stats_server: StatsServer<(), Metrics>, +} + +impl<'a> Scheduler<'a> { + fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit) -> Result { + set_rlimit_infinity(); + + // Check host topology to determine if we need to enable SMT capabilities. + let smt_enabled = match is_smt_active() { + Ok(value) => value == 1, + Err(e) => bail!("Failed to read SMT status: {}", e), + }; + info!( + "{} {} {}", + SCHEDULER_NAME, + *build_id::SCX_FULL_VERSION, + if smt_enabled { "SMT on" } else { "SMT off" } + ); + + // Initialize BPF connector. + let mut skel_builder = BpfSkelBuilder::default(); + skel_builder.obj_builder.debug(opts.verbose); + let mut skel = scx_ops_open!(skel_builder, open_object, fair_ops)?; + + skel.struct_ops.fair_ops_mut().exit_dump_len = opts.exit_dump_len; + + // Override default BPF scheduling parameters. + skel.maps.rodata_data.slice_max = opts.slice_us_max * 1000; + skel.maps.rodata_data.slice_min = opts.slice_us_min * 1000; + skel.maps.rodata_data.slice_lag = opts.slice_us_lag * 1000; + skel.maps.rodata_data.local_kthreads = opts.local_kthreads; + + skel.maps.rodata_data.smt_enabled = smt_enabled; + + // Load the BPF program for validation. + let mut skel = scx_ops_load!(skel, fair_ops, uei)?; + + // Initialize CPU topology. + let topo = Topology::new().unwrap(); + + // Initialize LLC domain. + Self::init_l3_cache_domains(&mut skel, &topo)?; + + // Attach the scheduler. 
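+        // Once registered, the BPF struct_ops takes over scheduling
+        // decisions; it is detached when the handle is dropped (see run()).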
+ let struct_ops = Some(scx_ops_attach!(skel, fair_ops)?); + let stats_server = StatsServer::new(stats::server_data()).launch()?; + + Ok(Self { + skel, + struct_ops, + stats_server, + }) + } + + fn enable_sibling_cpu( + skel: &mut BpfSkel<'_>, + cpu: usize, + sibling_cpu: usize, + ) -> Result<(), u32> { + let prog = &mut skel.progs.enable_sibling_cpu; + let mut args = domain_arg { + cpu_id: cpu as c_int, + sibling_cpu_id: sibling_cpu as c_int, + }; + let input = ProgramInput { + context_in: Some(unsafe { + std::slice::from_raw_parts_mut( + &mut args as *mut _ as *mut u8, + std::mem::size_of_val(&args), + ) + }), + ..Default::default() + }; + let out = prog.test_run(input).unwrap(); + if out.return_value != 0 { + return Err(out.return_value); + } + + Ok(()) + } + + fn init_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + cache_lvl: usize, + enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>, + ) -> Result<(), std::io::Error> { + // Determine the list of CPU IDs associated to each cache node. + let mut cache_id_map: HashMap> = HashMap::new(); + for core in topo.cores().into_iter() { + for (cpu_id, cpu) in core.cpus() { + let cache_id = match cache_lvl { + 2 => cpu.l2_id(), + 3 => cpu.l3_id(), + _ => panic!("invalid cache level {}", cache_lvl), + }; + cache_id_map + .entry(cache_id) + .or_insert_with(Vec::new) + .push(*cpu_id); + } + } + + // Update the BPF cpumasks for the cache domains. + for (cache_id, cpus) in cache_id_map { + info!( + "L{} cache ID {}: sibling CPUs: {:?}", + cache_lvl, cache_id, cpus + ); + for cpu in &cpus { + for sibling_cpu in &cpus { + match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) { + Ok(()) => {} + Err(_) => { + warn!( + "L{} cache ID {}: failed to set CPU {} sibling {}", + cache_lvl, cache_id, *cpu, *sibling_cpu + ); + } + } + } + } + } + + Ok(()) + } + + fn init_l3_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + ) -> Result<(), std::io::Error> { + Self::init_cache_domains(skel, topo, 3, &|skel, _lvl, cpu, sibling_cpu| { + Self::enable_sibling_cpu(skel, cpu, sibling_cpu) + }) + } + + fn get_metrics(&self) -> Metrics { + Metrics { + nr_kthread_dispatches: self.skel.maps.bss_data.nr_kthread_dispatches, + nr_direct_dispatches: self.skel.maps.bss_data.nr_direct_dispatches, + nr_shared_dispatches: self.skel.maps.bss_data.nr_shared_dispatches, + nr_migrate_dispatches: self.skel.maps.bss_data.nr_migrate_dispatches, + } + } + + pub fn exited(&mut self) -> bool { + uei_exited!(&self.skel, uei) + } + + fn run(&mut self, shutdown: Arc) -> Result { + let (res_ch, req_ch) = self.stats_server.channels(); + while !shutdown.load(Ordering::Relaxed) && !self.exited() { + match req_ch.recv_timeout(Duration::from_secs(1)) { + Ok(()) => res_ch.send(self.get_metrics())?, + Err(RecvTimeoutError::Timeout) => {} + Err(e) => Err(e)?, + } + } + + self.struct_ops.take(); + uei_report!(&self.skel, uei) + } +} + +impl<'a> Drop for Scheduler<'a> { + fn drop(&mut self) { + info!("Unregister {} scheduler", SCHEDULER_NAME); + } +} + +fn main() -> Result<()> { + let opts = Opts::parse(); + + if opts.version { + println!("{} {}", SCHEDULER_NAME, *build_id::SCX_FULL_VERSION); + return Ok(()); + } + + if opts.help_stats { + stats::server_data().describe_meta(&mut std::io::stdout(), None)?; + return Ok(()); + } + + let loglevel = simplelog::LevelFilter::Info; + + let mut lcfg = simplelog::ConfigBuilder::new(); + lcfg.set_time_level(simplelog::LevelFilter::Error) + .set_location_level(simplelog::LevelFilter::Off) + 
.set_target_level(simplelog::LevelFilter::Off) + .set_thread_level(simplelog::LevelFilter::Off); + simplelog::TermLogger::init( + loglevel, + lcfg.build(), + simplelog::TerminalMode::Stderr, + simplelog::ColorChoice::Auto, + )?; + + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_clone = shutdown.clone(); + ctrlc::set_handler(move || { + shutdown_clone.store(true, Ordering::Relaxed); + }) + .context("Error setting Ctrl-C handler")?; + + if let Some(intv) = opts.monitor.or(opts.stats) { + let shutdown_copy = shutdown.clone(); + let jh = std::thread::spawn(move || { + stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap() + }); + if opts.monitor.is_some() { + let _ = jh.join(); + return Ok(()); + } + } + + let mut open_object = MaybeUninit::uninit(); + loop { + let mut sched = Scheduler::init(&opts, &mut open_object)?; + if !sched.run(shutdown.clone())?.should_restart() { + break; + } + } + + Ok(()) +} diff --git a/scheds/rust/scx_fair/src/stats.rs b/scheds/rust/scx_fair/src/stats.rs new file mode 100644 index 000000000..543b14c4b --- /dev/null +++ b/scheds/rust/scx_fair/src/stats.rs @@ -0,0 +1,79 @@ +use std::io::Write; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use scx_stats::prelude::*; +use scx_stats_derive::Stats; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)] +#[stat(top)] +pub struct Metrics { + #[stat(desc = "Number of kthread direct dispatches")] + pub nr_kthread_dispatches: u64, + #[stat(desc = "Number of task direct dispatches")] + pub nr_direct_dispatches: u64, + #[stat(desc = "Number of task global dispatches")] + pub nr_shared_dispatches: u64, + #[stat(desc = "Number of migrated task dispatches")] + pub nr_migrate_dispatches: u64, +} + +impl Metrics { + fn format(&self, w: &mut W) -> Result<()> { + writeln!( + w, + "[{}] dispatch -> kthread: {:<5} direct: {:<5} shared: {:<5} migrated: {:<5}", + crate::SCHEDULER_NAME, + self.nr_kthread_dispatches, + self.nr_direct_dispatches, + self.nr_shared_dispatches, + self.nr_migrate_dispatches + )?; + Ok(()) + } + + fn delta(&self, rhs: &Self) -> Self { + Self { + nr_kthread_dispatches: self.nr_kthread_dispatches - rhs.nr_kthread_dispatches, + nr_direct_dispatches: self.nr_direct_dispatches - rhs.nr_direct_dispatches, + nr_shared_dispatches: self.nr_shared_dispatches - rhs.nr_shared_dispatches, + nr_migrate_dispatches: self.nr_migrate_dispatches - rhs.nr_migrate_dispatches, + ..self.clone() + } + } +} + +pub fn server_data() -> StatsServerData<(), Metrics> { + let open: Box> = Box::new(move |(req_ch, res_ch)| { + req_ch.send(())?; + let mut prev = res_ch.recv()?; + + let read: Box> = Box::new(move |_args, (req_ch, res_ch)| { + req_ch.send(())?; + let cur = res_ch.recv()?; + let delta = cur.delta(&prev); + prev = cur; + delta.to_json() + }); + + Ok(read) + }); + + StatsServerData::new() + .add_meta(Metrics::meta()) + .add_ops("top", StatsOps { open, close: None }) +} + +pub fn monitor(intv: Duration, shutdown: Arc) -> Result<()> { + scx_utils::monitor_stats::( + &vec![], + intv, + || shutdown.load(Ordering::Relaxed), + |metrics| metrics.format(&mut std::io::stdout()), + ) +} diff --git a/services/scx b/services/scx index c8b1b76b8..599d9da62 100644 --- a/services/scx +++ b/services/scx @@ -1,4 +1,4 @@ -# List of scx_schedulers: scx_bpfland scx_central scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland 
+# List of scx_schedulers: scx_bpfland scx_central scx_fair scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland
 SCX_SCHEDULER=scx_bpfland
 
 # Set custom flags for each scheduler, below is an example of how to use