Skip to content

Commit

Permalink
Merge pull request #339 from berteauxjb/fix/workerpool
Browse files Browse the repository at this point in the history
Fix spurious wake-ups and worker threads initialization in workerpool
  • Loading branch information
christian-rauch committed Jun 26, 2024
2 parents 64654c0 + c16bba6 commit 188c0e0
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions common/workerpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ either expressed or implied, of the Regents of The University of Michigan.
#define __USE_GNU
#include "common/pthreads_cross.h"
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef _WIN32
Expand All @@ -51,6 +52,7 @@ struct workerpool {

pthread_mutex_t mutex;
pthread_cond_t startcond; // used to signal the availability of work
bool start_predicate; // predicate that prevents spurious wakeups on startcond
pthread_cond_t endcond; // used to signal completion of all work

int end_count; // how many threads are done?
Expand All @@ -70,7 +72,7 @@ void *worker_thread(void *p)
struct task *task;

pthread_mutex_lock(&wp->mutex);
while (wp->taskspos == zarray_size(wp->tasks)) {
while (wp->taskspos == zarray_size(wp->tasks) || !wp->start_predicate) {
wp->end_count++;
pthread_cond_broadcast(&wp->endcond);
pthread_cond_wait(&wp->startcond, &wp->mutex);
Expand Down Expand Up @@ -98,6 +100,7 @@ workerpool_t *workerpool_create(int nthreads)
workerpool_t *wp = calloc(1, sizeof(workerpool_t));
wp->nthreads = nthreads;
wp->tasks = zarray_create(sizeof(struct task));
wp->start_predicate = false;

if (nthreads > 1) {
wp->threads = calloc(wp->nthreads, sizeof(pthread_t));
Expand All @@ -114,6 +117,13 @@ workerpool_t *workerpool_create(int nthreads)
return NULL;
}
}

// Wait for the worker threads to be ready
pthread_mutex_lock(&wp->mutex);
while (wp->end_count < wp->nthreads) {
pthread_cond_wait(&wp->endcond, &wp->mutex);
}
pthread_mutex_unlock(&wp->mutex);
}

return wp;
Expand All @@ -130,6 +140,7 @@ void workerpool_destroy(workerpool_t *wp)
workerpool_add_task(wp, NULL, NULL);

pthread_mutex_lock(&wp->mutex);
wp->start_predicate = true;
pthread_cond_broadcast(&wp->startcond);
pthread_mutex_unlock(&wp->mutex);

Expand Down Expand Up @@ -157,7 +168,13 @@ void workerpool_add_task(workerpool_t *wp, void (*f)(void *p), void *p)
t.f = f;
t.p = p;

zarray_add(wp->tasks, &t);
if (wp->nthreads > 1) {
pthread_mutex_lock(&wp->mutex);
zarray_add(wp->tasks, &t);
pthread_mutex_unlock(&wp->mutex);
} else {
zarray_add(wp->tasks, &t);
}
}

void workerpool_run_single(workerpool_t *wp)
Expand All @@ -175,19 +192,19 @@ void workerpool_run_single(workerpool_t *wp)
void workerpool_run(workerpool_t *wp)
{
if (wp->nthreads > 1) {
wp->end_count = 0;

pthread_mutex_lock(&wp->mutex);
wp->end_count = 0;
wp->start_predicate = true;
pthread_cond_broadcast(&wp->startcond);

while (wp->end_count < wp->nthreads) {
// printf("caught %d\n", wp->end_count);
pthread_cond_wait(&wp->endcond, &wp->mutex);
}

pthread_mutex_unlock(&wp->mutex);

wp->taskspos = 0;
wp->start_predicate = false;
pthread_mutex_unlock(&wp->mutex);

zarray_clear(wp->tasks);

Expand Down

0 comments on commit 188c0e0

Please sign in to comment.