Skip to content

Commit

Permalink
Spawn Rust threads instead of pthreads (#723)
Browse files Browse the repository at this point in the history
This change spawns threads as Rust threads rather than C pthread
threads. Each thread owns its own `Rav1dTaskContext` and communicates
with the main thread via a shared, reference-counted
`Rav1dTaskContext_task_thread` structure.

Closes #696.
  • Loading branch information
rinon authored Feb 8, 2024
2 parents 338f783 + c15557b commit 3028044
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 112 deletions.
1 change: 1 addition & 0 deletions src/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ macro_rules! impl_ArrayDefault {
impl_ArrayDefault!(u8);
impl_ArrayDefault!(i8);
impl_ArrayDefault!(i16);
impl_ArrayDefault!(i32);

macro_rules! def_align {
($align:literal, $name:ident) => {
Expand Down
12 changes: 7 additions & 5 deletions src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ use crate::src::error::Rav1dResult;
use crate::src::filmgrain::Rav1dFilmGrainDSPContext;
use crate::src::internal::CodedBlockInfo;
use crate::src::internal::Rav1dContext;
use crate::src::internal::Rav1dContextTaskType;
use crate::src::internal::Rav1dFrameContext;
use crate::src::internal::Rav1dFrameContext_bd_fn;
use crate::src::internal::Rav1dTaskContext;
Expand Down Expand Up @@ -4780,9 +4781,10 @@ pub(crate) unsafe fn rav1d_decode_frame_init_cdf(
unsafe fn rav1d_decode_frame_main(c: &Rav1dContext, f: &mut Rav1dFrameContext) -> Rav1dResult {
assert!(c.n_tc == 1);

let t = &mut *c.tc.offset((f as *mut Rav1dFrameContext).offset_from(c.fc));
t.f = f;
t.frame_thread.pass = 0;
let Rav1dContextTaskType::Single(t) = &c.tc[0].task else {
panic!("Expected a single-threaded context");
};
let mut t = t.lock().unwrap();

let frame_hdr = &***f.frame_hdr.as_ref().unwrap();

Expand Down Expand Up @@ -4826,14 +4828,14 @@ unsafe fn rav1d_decode_frame_main(c: &Rav1dContext, f: &mut Rav1dFrameContext) -
}
for tile in &mut ts[..] {
t.ts = tile;
rav1d_decode_tile_sbrow(c, t).map_err(|()| EINVAL)?;
rav1d_decode_tile_sbrow(c, &mut t).map_err(|()| EINVAL)?;
}
if frame_hdr.frame_type.is_inter_or_switch() {
rav1d_refmvs_save_tmvs(&c.refmvs_dsp, &mut t.rt, 0, f.bw >> 1, t.by >> 1, by_end);
}

// loopfilter + cdef + restoration
(f.bd_fn.filter_sbrow)(c, f, t, sby);
(f.bd_fn.filter_sbrow)(c, f, &mut t, sby);
}
}

Expand Down
73 changes: 59 additions & 14 deletions src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ use crate::src::refmvs::refmvs_temporal_block;
use crate::src::refmvs::refmvs_tile;
use crate::src::refmvs::Rav1dRefmvsDSPContext;
use atomig::Atomic;
use libc::pthread_t;
use libc::ptrdiff_t;
use std::ffi::c_int;
use std::ffi::c_uint;
use std::mem;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI32;
Expand All @@ -84,6 +84,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::thread::JoinHandle;

#[repr(C)]
pub(crate) struct Rav1dDSPContext {
Expand Down Expand Up @@ -165,6 +166,9 @@ pub(crate) struct TaskThreadData_delayed_fg {
pub grain: BitDepthUnion<Grain>,
}

// TODO(SJC): Remove when TaskThreadData_delayed_fg is thread-safe
unsafe impl Send for TaskThreadData_delayed_fg {}

#[repr(C)]
pub(crate) struct TaskThreadData {
pub cond: Condvar,
Expand Down Expand Up @@ -204,11 +208,24 @@ pub struct Rav1dContext_intra_edge {
pub tip_sb64: [EdgeTip; 64],
}

pub(crate) enum Rav1dContextTaskType {
/// Worker thread in a multi-threaded context.
Worker(JoinHandle<()>),
/// Main thread in a single-threaded context. There are no worker threads so
/// we need to store a Rav1dTaskContext for work that requires it.
// This Rav1dTaskContext is heap-allocated because we don't want to bloat
// the size of Rav1dContext, especially when it isn't used when we have
// worker threads. This is wrapped in a mutex so we can have inner
// mutability in rav1d_decode_frame_main where we need a mutable reference
// to this task context along with an immutable reference to Rav1dContext.
Single(Mutex<Box<Rav1dTaskContext>>),
}

pub(crate) struct Rav1dContextTaskThread {
/// Thread join handle, if this task is on a worker thread. The main thread
/// task does not contain a handle.
pub handle: Option<pthread_t>,
/// Data shared between the main thread and a worker thread.
/// Type of the task thread, along with either the thread join handle for
/// worker threads or the single-threaded task context.
pub task: Rav1dContextTaskType,
/// Thread specific data shared between the main thread and a worker thread.
pub thread_data: Arc<Rav1dTaskContext_task_thread>,
}

Expand All @@ -223,9 +240,12 @@ pub struct Rav1dContext {
pub(crate) fc: *mut Rav1dFrameContext,
pub(crate) n_fc: c_uint,

pub(crate) tc: *mut Rav1dTaskContext,
pub(crate) tc_shared: Vec<Rav1dContextTaskThread>,
/// Worker thread join handles and communication, or single thread task
/// context if n_tc == 1
pub(crate) tc: Box<[Rav1dContextTaskThread]>,
/// Number of worker threads
pub(crate) n_tc: c_uint,

/// Cache of OBUs that make up a single frame before we submit them
/// to a frame worker to be decoded.
pub(crate) tiles: Vec<Rav1dTileGroup>,
Expand Down Expand Up @@ -281,6 +301,11 @@ pub struct Rav1dContext {
pub(crate) picture_pool: *mut Rav1dMemPool,
}

// TODO(SJC): Remove when Rav1dContext is thread-safe
unsafe impl Send for Rav1dContext {}
// TODO(SJC): Remove when Rav1dContext is thread-safe
unsafe impl Sync for Rav1dContext {}

#[derive(Clone)]
#[repr(C)]
pub struct Rav1dTask {
Expand Down Expand Up @@ -718,11 +743,31 @@ pub(crate) struct Rav1dTaskContext {
pub task_thread: Arc<Rav1dTaskContext_task_thread>,
}

// TODO(SJC): This is a temporary struct to pass a single pointer that holds
// both a Rav1dContext and Rav1dTaskContext to the start routine in
// pthread_create. We need to pass the Rav1dTaskContext into the thread by value
// and remove it from the context structure.
pub(crate) struct Rav1dTaskContext_borrow<'c> {
pub c: &'c Rav1dContext,
pub tc: &'c mut Rav1dTaskContext,
impl Rav1dTaskContext {
pub(crate) unsafe fn new(
f: *mut Rav1dFrameContext,
task_thread: Arc<Rav1dTaskContext_task_thread>,
) -> Self {
Self {
f,
ts: ptr::null_mut(),
bx: 0,
by: 0,
l: mem::zeroed(),
a: ptr::null_mut(),
rt: mem::zeroed(),
cf: Default::default(),
al_pal: Default::default(),
pal_sz_uv: Default::default(),
txtp_map: [0u8; 1024],
scratch: mem::zeroed(),
warpmv: mem::zeroed(),
lf_mask: ptr::null_mut(),
top_pre_cdef_toggle: 0,
cur_sb_cdef_idx_ptr: ptr::null_mut(),
tl_4x4_filter: mem::zeroed(),
frame_thread: Rav1dTaskContext_frame_thread { pass: 0 },
task_thread,
}
}
}
119 changes: 39 additions & 80 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::include::common::bitdepth::BitDepth;
use crate::include::common::bitdepth::BitDepth16;
use crate::include::common::bitdepth::BitDepth8;
use crate::include::common::bitdepth::DynCoef;
Expand All @@ -20,7 +19,6 @@ use crate::include::dav1d::headers::Rav1dFilmGrainData;
use crate::include::dav1d::headers::Rav1dSequenceHeader;
use crate::include::dav1d::picture::Dav1dPicture;
use crate::include::dav1d::picture::Rav1dPicture;
use crate::src::align::Align64;
use crate::src::cdf::rav1d_cdf_thread_unref;
use crate::src::cpu::rav1d_init_cpu;
use crate::src::cpu::rav1d_num_logical_processors;
Expand All @@ -36,10 +34,10 @@ use crate::src::fg_apply;
use crate::src::internal::CodedBlockInfo;
use crate::src::internal::Rav1dContext;
use crate::src::internal::Rav1dContextTaskThread;
use crate::src::internal::Rav1dContextTaskType;
use crate::src::internal::Rav1dFrameContext;
use crate::src::internal::Rav1dTask;
use crate::src::internal::Rav1dTaskContext;
use crate::src::internal::Rav1dTaskContext_borrow;
use crate::src::internal::Rav1dTaskContext_task_thread;
use crate::src::internal::TaskThreadData;
use crate::src::intra_edge::rav1d_init_mode_tree;
Expand Down Expand Up @@ -79,8 +77,6 @@ use libc::pthread_attr_destroy;
use libc::pthread_attr_init;
use libc::pthread_attr_setstacksize;
use libc::pthread_attr_t;
use libc::pthread_join;
use libc::pthread_t;
use std::cmp;
use std::ffi::c_char;
use std::ffi::c_int;
Expand All @@ -100,6 +96,7 @@ use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::Once;
use std::thread;
use to_method::To as _;

#[cfg(target_os = "linux")]
Expand All @@ -108,15 +105,6 @@ use libc::dlsym;
#[cfg(target_os = "linux")]
use libc::sysconf;

extern "C" {
fn pthread_create(
__newthread: *mut pthread_t,
__attr: *const pthread_attr_t,
__start_routine: Option<unsafe extern "C" fn(*mut c_void) -> *mut c_void>,
__arg: *mut c_void,
) -> c_int;
}

#[cold]
fn init_internal() {
rav1d_init_cpu();
Expand Down Expand Up @@ -323,18 +311,6 @@ pub(crate) unsafe fn rav1d_open(c_out: &mut *mut Rav1dContext, s: &Rav1dSettings
0 as c_int,
::core::mem::size_of::<Rav1dFrameContext>().wrapping_mul((*c).n_fc as usize),
);
(*c).tc = rav1d_alloc_aligned(
::core::mem::size_of::<Rav1dTaskContext>().wrapping_mul((*c).n_tc as usize),
64 as c_int as usize,
) as *mut Rav1dTaskContext;
if ((*c).tc).is_null() {
return error(c, c_out, &mut thread_attr);
}
memset(
(*c).tc as *mut c_void,
0 as c_int,
::core::mem::size_of::<Rav1dTaskContext>().wrapping_mul((*c).n_tc as usize),
);
let ttd = TaskThreadData {
cond: Condvar::new(),
first: AtomicU32::new(0),
Expand Down Expand Up @@ -366,41 +342,30 @@ pub(crate) unsafe fn rav1d_open(c_out: &mut *mut Rav1dContext, s: &Rav1dSettings
rav1d_refmvs_init(&mut (*f).rf);
n = n.wrapping_add(1);
}
let mut m: c_uint = 0 as c_int as c_uint;
while m < (*c).n_tc {
let t: *mut Rav1dTaskContext = &mut *((*c).tc).offset(m as isize) as *mut Rav1dTaskContext;
(*t).f = &mut *((*c).fc).offset(0) as *mut Rav1dFrameContext;
let thread_data = Arc::new(Rav1dTaskContext_task_thread::new(Arc::clone(
&(*c).task_thread,
)));
(&mut (*t).task_thread as *mut Arc<Rav1dTaskContext_task_thread>)
.write(Arc::clone(&thread_data));
*BitDepth16::select_mut(&mut (*t).cf) = Align64([0; 32 * 32]);
let handle = if (*c).n_tc > 1 as c_uint {
let thread_args = Box::new(Rav1dTaskContext_borrow {
c: &*c,
tc: &mut *t,
});
let mut thread = 0;
if pthread_create(
&mut thread,
&mut thread_attr,
Some(rav1d_worker_task),
Box::into_raw(thread_args).cast(),
) != 0
{
return error(c, c_out, &mut thread_attr);
(*c).tc = (0..(*c).n_tc)
.map(|_| {
let thread_data = Arc::new(Rav1dTaskContext_task_thread::new(Arc::clone(
&(*c).task_thread,
)));
if (*c).n_tc > 1 {
// TODO(SJC): can be removed when c is not a raw pointer
let context_borrow = &*c;
let thread_data_copy = Arc::clone(&thread_data);
let handle = thread::spawn(|| rav1d_worker_task(context_borrow, thread_data_copy));
Rav1dContextTaskThread {
task: Rav1dContextTaskType::Worker(handle),
thread_data,
}
} else {
Rav1dContextTaskThread {
task: Rav1dContextTaskType::Single(Mutex::new(Box::new(
Rav1dTaskContext::new(&mut *((*c).fc).offset(0), Arc::clone(&thread_data)),
))),
thread_data,
}
}
Some(thread)
} else {
None
};
(*c).tc_shared.push(Rav1dContextTaskThread {
handle,
thread_data,
});
m = m.wrapping_add(1);
}
})
.collect();
rav1d_refmvs_dsp_init(&mut (*c).refmvs_dsp);
(*c).intra_edge.root[BL_128X128 as c_int as usize] =
&mut (*((*c).intra_edge.branch_sb128).as_mut_ptr().offset(0)).node;
Expand Down Expand Up @@ -832,14 +797,10 @@ pub(crate) unsafe fn rav1d_flush(c: *mut Rav1dContext) {
(*c).flush.store(1, Ordering::SeqCst);
if (*c).n_tc > 1 as c_uint {
let mut task_thread_lock = (*c).task_thread.delayed_fg.lock().unwrap();
let mut i_0: c_uint = 0 as c_int as c_uint;
while i_0 < (*c).n_tc {
let tc: *mut Rav1dTaskContext =
&mut *((*c).tc).offset(i_0 as isize) as *mut Rav1dTaskContext;
while !(*tc).task_thread.flushed.load(Ordering::Relaxed) {
task_thread_lock = (*tc).task_thread.cond.wait(task_thread_lock).unwrap();
for tc in (*c).tc.iter() {
while !tc.flushed() {
task_thread_lock = tc.thread_data.cond.wait(task_thread_lock).unwrap();
}
i_0 = i_0.wrapping_add(1);
}
let mut i_1: c_uint = 0 as c_int as c_uint;
while i_1 < (*c).n_fc {
Expand Down Expand Up @@ -927,22 +888,20 @@ impl Drop for Rav1dContext {
// remove all pointers from the structure. We can't make the drop
// function unsafe because the Drop trait requires a safe function.
unsafe {
if !(self.tc).is_null() {
if self.n_tc > 1 {
let ttd: &TaskThreadData = &*self.task_thread;
if self.n_tc > 1 {
let task_thread_lock = ttd.delayed_fg.lock().unwrap();
for tc in &self.tc_shared {
tc.thread_data.die.store(true, Ordering::Relaxed);
}
ttd.cond.notify_all();
drop(task_thread_lock);
for tc in &self.tc_shared {
if let Some(handle) = tc.handle {
pthread_join(handle, 0 as *mut *mut c_void);
}
let task_thread_lock = ttd.delayed_fg.lock().unwrap();
for tc in self.tc.iter() {
tc.thread_data.die.store(true, Ordering::Relaxed);
}
ttd.cond.notify_all();
drop(task_thread_lock);
let tc = mem::take(&mut self.tc);
for task_thread in tc.into_vec() {
if let Rav1dContextTaskType::Worker(handle) = task_thread.task {
handle.join().expect("Could not join task thread");
}
}
rav1d_free_aligned(self.tc as *mut c_void);
}
let mut n_1: c_uint = 0 as c_int as c_uint;
while !(self.fc).is_null() && n_1 < self.n_fc {
Expand Down
Loading

0 comments on commit 3028044

Please sign in to comment.