struct TaskThreadData: Replace mutex
Replaces the locking in `TaskThreadData` with Rust equivalents. `TaskThreadData` can now be accessed through a shared (read-only) reference, with interior mutability used where mutation is needed.
rinon committed Feb 1, 2024
1 parent 5cd45cc commit 2a45df6
Showing 7 changed files with 248 additions and 233 deletions.
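The shape of the translation, as a minimal sketch with stand-in types (not rav1d's actual structs): the `pthread_mutex_t`/`pthread_cond_t` pair becomes `std::sync::Mutex`/`Condvar`, explicit unlock calls become guard drops, and `Condvar::wait` consumes and returns the `MutexGuard` inside the usual predicate loop. This is also what lets `TaskThreadData` be shared as `&self`: the mutability lives inside the `Mutex` and the atomics.

    use std::sync::{Condvar, Mutex};

    // Stand-in types for the sketch; not rav1d's real structs.
    struct Shared {
        value: Mutex<i32>, // the data the old pthread_mutex_t protected
        cond: Condvar,
    }

    fn wait_until_zero(s: &Shared) {
        // Locking returns a guard; unlocking happens when the guard drops.
        let mut guard = s.value.lock().unwrap();
        while *guard != 0 {
            // `wait` atomically unlocks, blocks, and re-locks on wakeup,
            // handing the guard back -- the same loop shape as the C code.
            guard = s.cond.wait(guard).unwrap();
        }
    } // implicit unlock: the pthread_mutex_unlock call disappears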
48 changes: 21 additions & 27 deletions src/decode.rs
@@ -220,10 +220,6 @@ use crate::src::warpmv::rav1d_get_shear_params;
use crate::src::warpmv::rav1d_set_affine_mv2d;
use libc::free;
use libc::malloc;
-use libc::pthread_cond_signal;
-use libc::pthread_cond_wait;
-use libc::pthread_mutex_lock;
-use libc::pthread_mutex_unlock;
use libc::ptrdiff_t;
use libc::uintptr_t;
use std::array;
@@ -237,6 +233,7 @@ use std::ptr;
use std::ptr::addr_of_mut;
use std::slice;
use std::sync::atomic::AtomicI32;
+use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

#[cfg(feature = "bitdepth_8")]
@@ -4902,19 +4899,19 @@ pub(crate) unsafe fn rav1d_decode_frame(f: &mut Rav1dFrameContext) -> Rav1dResult {
if res.is_ok() {
if (*f.c).n_tc > 1 {
res = rav1d_task_create_tile_sbrow(f, 0, 1);
-pthread_mutex_lock(&mut (*f.task_thread.ttd).lock);
-pthread_cond_signal(&mut (*f.task_thread.ttd).cond);
+let mut task_thread_lock = (*f.task_thread.ttd).delayed_fg.lock().unwrap();
+(*f.task_thread.ttd).cond.notify_one();
if res.is_ok() {
while f.task_thread.done[0].load(Ordering::Relaxed) == 0
// TODO(kkysen) Make `.task_counter` an `AtomicI32`, but that requires recursively removing `impl Copy`s.
|| (*(addr_of_mut!(f.task_thread.task_counter) as *mut AtomicI32))
.load(Ordering::SeqCst)
> 0
{
-pthread_cond_wait(&mut f.task_thread.cond, &mut (*f.task_thread.ttd).lock);
+task_thread_lock = f.task_thread.cond.wait(task_thread_lock).unwrap();
}
}
-pthread_mutex_unlock(&mut (*f.task_thread.ttd).lock);
+drop(task_thread_lock);
res = f.task_thread.retval;
} else {
res = rav1d_decode_frame_main(f);
@@ -4940,8 +4937,8 @@ fn get_upscale_x0(in_w: c_int, out_w: c_int, step: c_int) -> c_int {

pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
// wait for c->out_delayed[next] and move into c->out if visible
-let (f, out) = if c.n_fc > 1 {
-pthread_mutex_lock(&mut c.task_thread.lock);
+let (f, out, _task_thread_lock) = if c.n_fc > 1 {
+let mut task_thread_lock = c.task_thread.delayed_fg.lock().unwrap();
let next = c.frame_thread.next;
c.frame_thread.next += 1;
if c.frame_thread.next == c.n_fc {
@@ -4950,7 +4947,7 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {

let f = &mut *c.fc.offset(next as isize);
while !f.tiles.is_empty() {
-pthread_cond_wait(&mut f.task_thread.cond, &mut c.task_thread.lock);
+task_thread_lock = f.task_thread.cond.wait(task_thread_lock).unwrap();
}
let out_delayed = &mut *c.frame_thread.out_delayed.offset(next as isize);
if !out_delayed.p.data.data[0].is_null() || f.task_thread.error.load(Ordering::SeqCst) != 0
@@ -4967,8 +4964,10 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
Ordering::SeqCst,
Ordering::SeqCst,
);
-if c.task_thread.cur != 0 && c.task_thread.cur < c.n_fc {
-c.task_thread.cur -= 1;
+// `cur` is not actually mutated from multiple threads concurrently
+let cur = c.task_thread.cur.load(Ordering::Relaxed);
+if cur != 0 && cur < c.n_fc {
+c.task_thread.cur = AtomicU32::new(cur - 1);
}
}
let error = f.task_thread.retval;
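Because `cur` is only written by one thread at a time (as the new comment in this hunk notes), the conversion can use `Relaxed` loads and, since exclusive access is available at this call site, even replace the whole atomic. A reduced sketch of that idiom, with hypothetical names:

    use std::sync::atomic::{AtomicU32, Ordering};

    struct TaskThreadState {
        cur: AtomicU32, // mirrors `c.task_thread.cur` above
    }

    fn retreat_cur(tt: &mut TaskThreadState, n_fc: u32) {
        // Relaxed suffices: no other thread writes `cur` concurrently here.
        let cur = tt.cur.load(Ordering::Relaxed);
        if cur != 0 && cur < n_fc {
            // With exclusive (&mut) access, overwriting the atomic wholesale
            // is equivalent to `tt.cur.store(cur - 1, Ordering::Relaxed)`.
            tt.cur = AtomicU32::new(cur - 1);
        }
    }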
@@ -4985,9 +4984,9 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
}
rav1d_thread_picture_unref(out_delayed);
}
-(f, out_delayed as *mut _)
+(f, out_delayed as *mut _, Some(task_thread_lock))
} else {
-(&mut *c.fc, &mut c.out as *mut _)
+(&mut *c.fc, &mut c.out as *mut _, None)
};

f.seq_hdr = c.seq_hdr.clone();
@@ -4997,11 +4996,7 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {

let bpc = 8 + 2 * seq_hdr.hbd;

-unsafe fn on_error(
-f: &mut Rav1dFrameContext,
-c: &mut Rav1dContext,
-out: *mut Rav1dThreadPicture,
-) {
+unsafe fn on_error(f: &mut Rav1dFrameContext, c: &Rav1dContext, out: *mut Rav1dThreadPicture) {
f.task_thread.error = AtomicI32::new(1);
rav1d_cdf_thread_unref(&mut f.in_cdf);
if f.frame_hdr.as_ref().unwrap().refresh_context != 0 {
@@ -5019,13 +5014,9 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
rav1d_ref_dec(&mut f.mvs_ref);
let _ = mem::take(&mut f.seq_hdr);
let _ = mem::take(&mut f.frame_hdr);
-*c.cached_error_props.get_mut().unwrap() = c.in_0.m.clone();
+*c.cached_error_props.lock().unwrap() = c.in_0.m.clone();

f.tiles.clear();

-if c.n_fc > 1 {
-pthread_mutex_unlock(&mut c.task_thread.lock);
-}
}

// TODO(kkysen) Rather than lazy initializing this,
@@ -5149,7 +5140,11 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
mem::swap(&mut f.tiles, &mut c.tiles);

// allocate frame
-let res = rav1d_thread_picture_alloc(c, f, bpc);
+
+// We must take itut_t35 out of the context before the call so borrowck can
+// see we mutably borrow `c.itut_t35` disjointly from the task thread lock.
+let itut_t35 = c.itut_t35.take();
+let res = rav1d_thread_picture_alloc(c, f, bpc, itut_t35);
if res.is_err() {
on_error(f, c, out);
return res;
@@ -5363,7 +5358,6 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
}
} else {
rav1d_task_frame_init(f);
-pthread_mutex_unlock(&mut c.task_thread.lock);
}

Ok(())
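Note how `rav1d_submit_frame` now threads the `MutexGuard` out of the `if`/`else` as a third tuple element: holding it in a binding keeps the critical section open for the rest of the function, and it is released on every exit path, which is why the old `if c.n_fc > 1 { pthread_mutex_unlock(...) }` in `on_error` could simply be deleted. A reduced sketch of that pattern, with invented names:

    use std::sync::{Mutex, MutexGuard};

    struct Context {
        n_fc: u32,
        task_lock: Mutex<()>, // stand-in for the task-thread lock
    }

    fn submit_frame(c: &Context) -> Result<(), ()> {
        // Multi-frame-context path takes the lock and keeps the guard alive;
        // single-context path takes no lock at all.
        let _task_thread_lock: Option<MutexGuard<'_, ()>> = if c.n_fc > 1 {
            Some(c.task_lock.lock().unwrap())
        } else {
            None
        };

        // ... frame setup; an early `return Err(())` also unlocks ...

        Ok(())
    } // the guard (if any) drops here on every path -- no manual unlock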
21 changes: 13 additions & 8 deletions src/internal.rs
@@ -73,7 +73,6 @@ use crate::src::refmvs::refmvs_tile;
use crate::src::refmvs::Rav1dRefmvsDSPContext;
use crate::src::thread_data::thread_data;
use atomig::Atomic;
-use libc::pthread_cond_t;
use libc::pthread_mutex_t;
use libc::ptrdiff_t;
use std::ffi::c_int;
@@ -82,6 +81,7 @@ use std::ptr;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
+use std::sync::Condvar;
use std::sync::Mutex;

#[repr(C)]
@@ -158,27 +158,32 @@ impl BitDepthDependentType for Grain {
#[repr(C)]
pub(crate) struct TaskThreadData_delayed_fg {
pub exec: c_int,
-pub cond: pthread_cond_t,
pub in_0: *const Rav1dPicture,
pub out: *mut Rav1dPicture,
pub type_0: TaskType,
-pub progress: [AtomicI32; 2], /* [0]=started, [1]=completed */
pub grain: BitDepthUnion<Grain>,
}

#[repr(C)]
pub(crate) struct TaskThreadData {
-pub lock: pthread_mutex_t,
-pub cond: pthread_cond_t,
+pub cond: Condvar,
pub first: AtomicU32,
-pub cur: c_uint,
+pub cur: AtomicU32,
/// This is used for delayed reset of the task cur pointer when
/// such operation is needed but the thread doesn't enter a critical
/// section (typically when executing the next sbrow task locklessly).
/// See [`crate::src::thread_task::reset_task_cur`].
pub reset_task_cur: AtomicU32,
pub cond_signaled: AtomicI32,
-pub delayed_fg: TaskThreadData_delayed_fg,
+pub delayed_fg_progress: [AtomicI32; 2], /* [0]=started, [1]=completed */
+pub delayed_fg_cond: Condvar,
+/// This lock has a dual purpose - protecting the delayed_fg structure, as
+/// well as synchronizing tasks across threads. Many cases do not use the
+/// inner data when holding the lock but instead use it to sequence
+/// operations. Rather than disentangle these related uses in the original C
+/// code, we have kept a single mutex and put the delayed_fg structure into
+/// it.
+pub delayed_fg: Mutex<TaskThreadData_delayed_fg>,
pub inited: c_int,
}
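The doc comment on `delayed_fg` is the key design note: the mutex nominally owns the delayed film-grain state, but many call sites lock it only to order themselves against other threads and never touch the inner data. A simplified sketch of that second use (field set reduced for illustration):

    use std::sync::{Condvar, Mutex};

    struct DelayedFg {
        exec: i32, // remaining fields elided
    }

    struct Ttd {
        cond: Condvar,            // task-scheduling wakeups
        delayed_fg_cond: Condvar, // film-grain wakeups
        delayed_fg: Mutex<DelayedFg>,
    }

    fn signal_new_work(ttd: &Ttd) {
        // The guard is never dereferenced; the lock is held only so this
        // notify is ordered against a worker's lock-then-wait sequence.
        let _task_thread_lock = ttd.delayed_fg.lock().unwrap();
        ttd.cond.notify_one();
    }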

@@ -426,7 +431,7 @@ impl Default for Rav1dFrameContext_task_thread_pending_tasks {
#[repr(C)]
pub(crate) struct Rav1dFrameContext_task_thread {
pub lock: pthread_mutex_t,
-pub cond: pthread_cond_t,
+pub cond: Condvar,
pub ttd: *mut TaskThreadData,
pub tasks: *mut Rav1dTask,
pub tile_tasks: [*mut Rav1dTask; 2],