Skip to content

Commit

Permalink
Add ScopedJobThreadPtr class to be used with JobThread
Browse files Browse the repository at this point in the history
b/372515171
  • Loading branch information
alexanderbobrovnik committed Oct 23, 2024
1 parent a337e43 commit 5a37123
Show file tree
Hide file tree
Showing 20 changed files with 91 additions and 64 deletions.
2 changes: 1 addition & 1 deletion starboard/android/shared/audio_renderer_passthrough.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class AudioRendererPassthrough
// ensure the thread completes all tasks before |audio_track_bridge_| is
// invalidated.
std::unique_ptr<AudioTrackBridge> audio_track_bridge_;
std::unique_ptr<JobThread> audio_track_thread_;
starboard::shared::starboard::player::ScopedJobThreadPtr audio_track_thread_;
};

} // namespace shared
Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libaom/aom_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));

// Join the thread to ensure that all callbacks in process are finished.
Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libaom/aom_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
bool error_occurred_;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Decode-to-texture related state.
SbPlayerOutputMode output_mode_;
Expand Down
5 changes: 2 additions & 3 deletions starboard/shared/libdav1d/dav1d_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
// Wait to ensure all tasks are done before decoder_thread_ reset.
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));

// Join the thread to ensure that all callbacks in process are finished.
decoder_thread_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libdav1d/dav1d_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
bool stream_ended_ = false;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Decode-to-texture related state.
const SbPlayerOutputMode output_mode_;
Expand Down
5 changes: 2 additions & 3 deletions starboard/shared/libde265/de265_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
// Wait to ensure all tasks are done before decoder_thread_ reset.
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));

// Join the thread to ensure that all callbacks in process are finished.
decoder_thread_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libde265/de265_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
bool error_occurred_ = false;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Decode-to-texture related state.
SbPlayerOutputMode output_mode_;
Expand Down
5 changes: 2 additions & 3 deletions starboard/shared/libvpx/vpx_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
// Wait to ensure all tasks are done before decoder_thread_ reset.
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));

// Join the thread to ensure that all callbacks in process are finished.
decoder_thread_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libvpx/vpx_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
bool error_occurred_;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Decode-to-texture related state.
SbPlayerOutputMode output_mode_;
Expand Down
4 changes: 2 additions & 2 deletions starboard/shared/openh264/openh264_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
// Wait to ensure all tasks are done before decoder_thread_ reset.
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));
// Join the thread to ensure that all callbacks in process are finished.
decoder_thread_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/openh264/openh264_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
Mutex decode_target_mutex_;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Openh264 decode handler.
ISVCDecoder* decoder_ = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StubAudioDecoder : public AudioDecoder, private JobQueue::JobOwner {
OutputCB output_cb_;
ErrorCB error_cb_;

std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;
Mutex decoded_audios_mutex_;
std::queue<scoped_refptr<DecodedAudio> > decoded_audios_;
scoped_refptr<InputBuffer> last_input_buffer_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class StubVideoDecoder : public VideoDecoder, private JobQueue::JobOwner {
DecoderStatusCB decoder_status_cb_;
media::VideoStreamInfo video_stream_info_;

std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;
// std::set<> keeps frame timestamps sorted in ascending order.
std::set<int64_t> output_frame_timestamps_;
// Used to determine when to send kBufferFull in DecodeOneBuffer().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const int kJobThreadStackSize = 0;
std::unique_ptr<VideoDmpReader> s_video_dmp_reader;
std::unique_ptr<PlayerComponents> s_player_components;
int s_audio_sample_index;
std::unique_ptr<JobThread> s_job_thread;
starboard::shared::starboard::player::ScopedJobThreadPtr s_job_thread;
int64_t s_duration;

static void DeallocateSampleFunc(SbPlayer player,
Expand Down
34 changes: 32 additions & 2 deletions starboard/shared/starboard/player/job_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class JobThread {
explicit JobThread(const char* thread_name,
int64_t stack_size = 0,
SbThreadPriority priority = kSbThreadPriorityNormal);
~JobThread();

JobQueue* job_queue() { return job_queue_.get(); }
const JobQueue* job_queue() const { return job_queue_.get(); }
Expand Down Expand Up @@ -83,13 +82,44 @@ class JobThread {

return job_queue_->RemoveJobByToken(job_token);
}

private:
~JobThread();
static void* ThreadEntryPoint(void* context);
void RunLoop();

pthread_t thread_;
std::unique_ptr<JobQueue> job_queue_;

friend class ScopedJobThreadPtr;
};

// The ScopedJobThreadPtr class guarantees that the pointer to JobThread object
// is valid during JobThread destructor. This prevents issues of accessing nullified
// JobThread pointer, as per b/372515171
class ScopedJobThreadPtr {
public:
explicit ScopedJobThreadPtr(JobThread* p = nullptr): job_thread_(p) {
}

~ScopedJobThreadPtr() {
delete job_thread_;
}

void reset(JobThread* p = nullptr) {
delete job_thread_;
job_thread_ = p;
}

JobThread* operator->() const {
return job_thread_;
}

explicit operator bool() const {
return job_thread_ != nullptr;
}

private:
JobThread* job_thread_;
};

} // namespace player
Expand Down
64 changes: 32 additions & 32 deletions starboard/shared/starboard/player/job_thread_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,35 @@ using ::testing::ElementsAre;
// Require at least millisecond-level precision.
constexpr int64_t kPrecisionUsec = 1000;

void ExecutePendingJobs(JobThread* job_thread) {
void ExecutePendingJobs(ScopedJobThreadPtr& job_thread) {
job_thread->ScheduleAndWait([]() {});
}

TEST(JobThreadTest, ScheduledJobsAreExecutedInOrder) {
std::vector<int> values;
JobThread job_thread{"JobThreadTests"};
job_thread.Schedule([&]() { values.push_back(1); });
job_thread.Schedule([&]() { values.push_back(2); });
job_thread.Schedule([&]() { values.push_back(3); });
job_thread.Schedule([&]() { values.push_back(4); }, 1 * kPrecisionUsec);
job_thread.Schedule([&]() { values.push_back(5); }, 1 * kPrecisionUsec);
job_thread.Schedule([&]() { values.push_back(6); }, 1 * kPrecisionUsec);
job_thread.Schedule([&]() { values.push_back(7); }, 2 * kPrecisionUsec);
job_thread.Schedule([&]() { values.push_back(8); }, 3 * kPrecisionUsec);
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
job_thread->Schedule([&]() { values.push_back(1); });
job_thread->Schedule([&]() { values.push_back(2); });
job_thread->Schedule([&]() { values.push_back(3); });
job_thread->Schedule([&]() { values.push_back(4); }, 1 * kPrecisionUsec);
job_thread->Schedule([&]() { values.push_back(5); }, 1 * kPrecisionUsec);
job_thread->Schedule([&]() { values.push_back(6); }, 1 * kPrecisionUsec);
job_thread->Schedule([&]() { values.push_back(7); }, 2 * kPrecisionUsec);
job_thread->Schedule([&]() { values.push_back(8); }, 3 * kPrecisionUsec);

// Sleep past the last scheduled job.
usleep(4 * kPrecisionUsec);

ExecutePendingJobs(&job_thread);
ExecutePendingJobs(job_thread);

EXPECT_THAT(values, ElementsAre(1, 2, 3, 4, 5, 6, 7, 8));
}

TEST(JobThreadTest, ScheduleAndWaitWaits) {
int64_t start = CurrentMonotonicTime();
std::atomic_bool job_1 = {false};
JobThread job_thread{"JobThreadTests"};
job_thread.ScheduleAndWait([&]() {
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
job_thread->ScheduleAndWait([&]() {
usleep(1 * kPrecisionUsec);
job_1 = true;
});
Expand All @@ -77,16 +77,16 @@ TEST(JobThreadTest, ScheduleAndWaitWaits) {
TEST(JobThreadTest, ScheduledJobsShouldNotExecuteAfterGoingOutOfScope) {
std::atomic_int counter = {0};
{
JobThread job_thread{"JobThreadTests"};
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
std::function<void()> job = [&]() {
counter++;
job_thread.Schedule(job, 2 * kPrecisionUsec);
job_thread->Schedule(job, 2 * kPrecisionUsec);
};
job_thread.Schedule(job);
job_thread->Schedule(job);

// Wait for the job to run at least once and reschedule itself.
usleep(1 * kPrecisionUsec);
ExecutePendingJobs(&job_thread);
ExecutePendingJobs(job_thread);
}
int end_value = counter;
EXPECT_GE(counter, 1);
Expand All @@ -100,31 +100,31 @@ TEST(JobThreadTest, CanceledJobsAreCanceled) {
std::atomic_int counter_1 = {0}, counter_2 = {0};
JobQueue::JobToken job_token_1, job_token_2;

JobThread job_thread{"JobThreadTests"};
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
std::function<void()> job_1 = [&]() {
counter_1++;
job_token_1 = job_thread.Schedule(job_1);
job_token_1 = job_thread->Schedule(job_1);
};
std::function<void()> job_2 = [&]() {
counter_2++;
job_token_2 = job_thread.Schedule(job_2);
job_token_2 = job_thread->Schedule(job_2);
};

job_token_1 = job_thread.Schedule(job_1);
job_token_2 = job_thread.Schedule(job_2);
job_token_1 = job_thread->Schedule(job_1);
job_token_2 = job_thread->Schedule(job_2);

// Wait for the scheduled jobs to at least run once.
ExecutePendingJobs(&job_thread);
ExecutePendingJobs(job_thread);

// Cancel job 1 and grab the current counter values.
job_thread.ScheduleAndWait(
[&]() { job_thread.RemoveJobByToken(job_token_1); });
job_thread->ScheduleAndWait(
[&]() { job_thread->RemoveJobByToken(job_token_1); });
int checkpoint_1 = counter_1;
int checkpoint_2 = counter_2;

// Sleep and wait for pending jobs to run.
usleep(1 * kPrecisionUsec);
ExecutePendingJobs(&job_thread);
ExecutePendingJobs(job_thread);

// Job 1 should not have run again.
EXPECT_EQ(counter_1, checkpoint_1);
Expand All @@ -133,20 +133,20 @@ TEST(JobThreadTest, CanceledJobsAreCanceled) {
EXPECT_GT(counter_2, checkpoint_2);

// Cancel job 2 to avoid it scheduling itself during destruction.
job_thread.ScheduleAndWait(
[&]() { job_thread.RemoveJobByToken(job_token_2); });
job_thread->ScheduleAndWait(
[&]() { job_thread->RemoveJobByToken(job_token_2); });
}

TEST(JobThreadTest, QueueBelongsToCorrectThread) {
JobThread job_thread{"JobThreadTests"};
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
JobQueue job_queue;

bool belongs_to_job_thread = false;
bool belongs_to_main_thread = false;

// Schedule in JobQueue owned by job thread.
job_thread.ScheduleAndWait([&]() {
belongs_to_job_thread = job_thread.BelongsToCurrentThread();
job_thread->ScheduleAndWait([&]() {
belongs_to_job_thread = job_thread->BelongsToCurrentThread();
belongs_to_main_thread = job_queue.BelongsToCurrentThread();
});

Expand All @@ -157,7 +157,7 @@ TEST(JobThreadTest, QueueBelongsToCorrectThread) {

// Schedule in JobQueue owned by main thread.
job_queue.Schedule([&]() {
belongs_to_job_thread = job_thread.BelongsToCurrentThread();
belongs_to_job_thread = job_thread->BelongsToCurrentThread();
belongs_to_main_thread = job_queue.BelongsToCurrentThread();
});

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/uwp/wasapi_audio_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class WASAPIAudioSink {
Mutex output_frames_mutex_;
std::queue<scoped_refptr<DecodedAudio>> pending_decoded_audios_;

std::unique_ptr<JobThread> job_thread_;
starboard::player::ScopedJobThreadPtr job_thread_;

starboard::ThreadChecker thread_checker_;
};
Expand Down
Loading

0 comments on commit 5a37123

Please sign in to comment.