scx_rlfifo: better documentation and code readability
Simplify the scx_rlfifo code, add detailed documentation of the
scx_rustland_core API and get rid of the additional task queue, since it
just makes the code bigger and slower without providing any real benefit
(considering that tasks are dispatched in FIFO order anyway).

Signed-off-by: Andrea Righi <[email protected]>
arighi committed Sep 6, 2024
1 parent 168fb78 commit 8231f85
156 changes: 87 additions & 69 deletions scheds/rust/scx_rlfifo/src/main.rs
@@ -2,6 +2,66 @@

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

//! # FIFO Linux kernel scheduler that runs in user-space
//!
//! ## Overview
//!
//! This is a fully functional FIFO scheduler for the Linux kernel that operates in user-space and
//! is implemented entirely in Rust.
//!
//! The scheduler is designed to serve as a simple template for developers looking to implement
//! more advanced scheduling policies.
//!
//! It is based on `scx_rustland_core`, a framework specifically designed to simplify the
//! creation of user-space schedulers, leveraging the Linux kernel's `sched_ext` feature (a
//! technology that allows schedulers to be implemented in BPF).
//!
//! The `scx_rustland_core` crate offers an abstraction layer over `sched_ext`, enabling developers
//! to write schedulers in Rust without needing to interact directly with low-level kernel or BPF
//! internal details.
//!
//! ## scx_rustland_core API
//!
//! ### struct `BpfScheduler`
//!
//! The `BpfScheduler` struct is the core interface for interacting with `sched_ext` via BPF.
//!
//! - **Initialization**:
//! - `BpfScheduler::init()` registers the scheduler and initializes the BPF component.
//!
//! - **Task Management**:
//! - `dequeue_task()`: Consume a task that wants to run, returning a `QueuedTask` object
//! - `select_cpu(pid: i32, prev_cpu: i32, flags: u64)`: Select an idle CPU for a task
//! - `dispatch_task(task: &DispatchedTask)`: Dispatch a task
//!
//! - **Completion Notification**:
//! - `notify_complete(nr_pending: u64)`: Give control to the BPF component and report the number
//! of tasks that are still pending (this function can sleep)
//!
//! Each task received from dequeue_task() contains the following:
//!
//!     struct QueuedTask {
//!         pub pid: i32,              // pid that uniquely identifies a task
//!         pub cpu: i32,              // CPU previously used by the task
//!         pub flags: u64,            // task's enqueue flags
//!         pub sum_exec_runtime: u64, // Total cpu time in nanoseconds
//!         pub weight: u64,           // Task priority in the range [1..10000] (default is 100)
//!     }
//!
//! Each task dispatched using dispatch_task() contains the following:
//!
//!     struct DispatchedTask {
//!         pub pid: i32,      // pid that uniquely identifies a task
//!         pub cpu: i32,      // target CPU selected by the scheduler
//!         pub flags: u64,    // special dispatch flags:
//!                            //  - RL_CPU_ANY = dispatch on the first CPU available
//!         pub slice_ns: u64, // time slice in nanoseconds assigned to the task
//!                            //  (0 = use default)
//!         pub vtime: u64,    // this value can be used to send the task's vruntime or deadline
//!                            //  directly to the underlying BPF dispatcher
//!     }
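
For reference, here is a minimal sketch (added for this writeup, not part of the commit) showing how the calls documented above fit together in a single scheduling pass. It assumes the same imports as the file below (`BpfScheduler`, `DispatchedTask`, `RL_CPU_ANY` from scx_rustland_core) and keeps error handling minimal:

    // Illustrative sketch: one scheduling pass built on the API described above.
    fn schedule_once(bpf: &mut BpfScheduler<'_>) {
        // Consume every task that is ready to run.
        while let Ok(Some(task)) = bpf.dequeue_task() {
            // Derive a dispatch request from the queued task.
            let mut dispatched = DispatchedTask::new(&task);

            // Ask the BPF component for an idle CPU, starting from the task's previous CPU.
            let cpu = bpf.select_cpu(task.pid, task.cpu, task.flags);
            if cpu >= 0 {
                dispatched.cpu = cpu;
            } else {
                // No idle CPU available: let the BPF dispatcher pick the first free one.
                dispatched.flags |= RL_CPU_ANY;
            }

            // Hand the task back to the BPF component for execution.
            bpf.dispatch_task(&dispatched).unwrap();
        }

        // Yield control to the BPF component; 0 means no tasks are left pending.
        bpf.notify_complete(0);
    }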

mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
@@ -13,17 +73,16 @@ use scx_utils::UserExitInfo;

use libbpf_rs::OpenObject;

use std::collections::VecDeque;
use std::mem::MaybeUninit;
use std::time::SystemTime;

use anyhow::Result;

const SLICE_US: u64 = 5000;
// Maximum time slice (in nanoseconds) that a task can use before it is re-enqueued.
const SLICE_NS: u64 = 5_000_000;

struct Scheduler<'a> {
bpf: BpfScheduler<'a>,
task_queue: VecDeque<QueuedTask>,
bpf: BpfScheduler<'a>, // Connector to the sched_ext BPF backend
}

impl<'a> Scheduler<'a> {
@@ -34,86 +93,45 @@ impl<'a> Scheduler<'a> {
false, // partial (false = include all tasks)
false, // debug (false = debug mode off)
)?;
Ok(Self {
bpf,
task_queue: VecDeque::new(),
})
Ok(Self { bpf })
}

fn consume_all_tasks(&mut self) {
// Consume all tasks that are ready to run.
//
// Each task contains the following details:
//
// pub struct QueuedTask {
// pub pid: i32, // pid that uniquely identifies a task
// pub cpu: i32, // CPU where the task is running
// pub sum_exec_runtime: u64, // Total cpu time
// pub weight: u64, // Task static priority
// }
//
// Although the FIFO scheduler doesn't use these fields, they can provide valuable data for
// implementing more sophisticated scheduling policies.
while let Ok(Some(task)) = self.bpf.dequeue_task() {
self.task_queue.push_back(task);
}
}
fn dispatch_tasks(&mut self) {
// Get the number of tasks that are waiting to be scheduled.
let nr_waiting = *self.bpf.nr_queued_mut();

fn dispatch_next_task(&mut self) {
if let Some(task) = self.task_queue.pop_front() {
// Create a new task to be dispatched, derived from the received enqueued task.
//
// pub struct DispatchedTask {
// pub pid: i32, // pid that uniquely identifies a task
// pub cpu: i32, // target CPU selected by the scheduler
// pub flags: u64, // special dispatch flags
// pub slice_ns: u64, // time slice assigned to the task (0 = default)
// }
//
// The dispatched task's information are pre-populated from the QueuedTask and they can
// be modified before dispatching it via self.bpf.dispatch_task().
// Start consuming and dispatching tasks, until all the CPUs are busy or there are no more
// tasks to be dispatched.
while let Ok(Some(task)) = self.bpf.dequeue_task() {
// Create a new task to be dispatched from the received enqueued task.
let mut dispatched_task = DispatchedTask::new(&task);

// Decide where the task needs to run (target CPU).
// Decide where the task needs to run (pick a target CPU).
//
// A call to select_cpu() will return the most suitable idle CPU for the task,
// considering its previously used CPU.
// prioritizing its previously used CPU (task.cpu).
//
// If we can't find any idle CPU, keep the task running on the same CPU.
let cpu = self.bpf.select_cpu(task.pid, task.cpu, task.flags);
if cpu >= 0 {
dispatched_task.cpu = cpu;
} else {
dispatched_task.flags |= RL_CPU_ANY;
}
dispatched_task.cpu = if cpu < 0 { task.cpu } else { cpu };

// Decide for how long the task needs to run (time slice); if not specified
// SCX_SLICE_DFL will be used by default.
dispatched_task.slice_ns = SLICE_US;
// Determine the task's time slice: assign a value inversely proportional to the number
// of tasks waiting to be scheduled.
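// For example, with SLICE_NS = 5 ms: no tasks waiting -> 5 ms slice; 4 tasks waiting -> 1 ms
// slice, so scheduling latency stays bounded as the amount of queued work grows.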
dispatched_task.slice_ns = SLICE_NS / (nr_waiting + 1);

// Dispatch the task on the target CPU.
// Dispatch the task.
self.bpf.dispatch_task(&dispatched_task).unwrap();

// Notify the BPF component of the number of pending tasks and immediately give a
// chance to run to the dispatched task.
self.bpf.notify_complete(self.task_queue.len() as u64);
}
}

fn dispatch_tasks(&mut self) {
loop {
// Consume all tasks before dispatching any.
self.consume_all_tasks();

// Dispatch one task from the queue.
self.dispatch_next_task();

// If no task is ready to run (or in case of error), stop dispatching tasks and notify
// the BPF component that all tasks have been scheduled / dispatched, with no remaining
// pending tasks.
if self.task_queue.is_empty() {
self.bpf.notify_complete(0);
// Stop dispatching if all the CPUs are busy (select_cpu() couldn't find an idle CPU).
if cpu < 0 {
break;
}
}

// Notify the BPF component that tasks have been dispatched.
//
// This function will put the scheduler to sleep, until another task needs to run.
self.bpf.notify_complete(0);
}
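
The diff hunk ends here, so the scheduler's outer loop is not shown. A rough sketch of how it would typically drive dispatch_tasks(); the exited() and shutdown_and_report() helpers are assumptions about the scx_rustland_core API, not confirmed by this diff:

    // Hypothetical driver loop (not part of this diff): keep scheduling until the
    // BPF component signals that the scheduler should exit.
    fn run(&mut self) -> Result<UserExitInfo> {
        while !self.bpf.exited() {          // exited() assumed from scx_rustland_core
            // Consume and dispatch all runnable tasks, then sleep in
            // notify_complete() until more work arrives.
            self.dispatch_tasks();
        }
        self.bpf.shutdown_and_report()      // assumed shutdown/reporting helper
    }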

fn print_stats(&mut self) {
