
Replace pthreads by C++ thread/mutex/condition (#363)
Also Closes #226
dweindl committed Dec 11, 2023
1 parent c58f8fb commit d2a18ec
Showing 15 changed files with 81 additions and 106 deletions.
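Throughout, the change follows one recurring pattern: each pthread primitive is swapped for its C++11 counterpart, and the explicit *_init/*_destroy pairs disappear because the C++ types clean up in their destructors. A minimal, self-contained sketch of that pattern, with illustrative names only (not parPE code):

// Sketch of the recurring replacement (illustrative, not parPE code):
// pthread_create/pthread_join -> std::thread; pthread_mutex_t -> std::mutex;
// pthread_cond_t plus wait loop -> std::condition_variable plus predicate wait.
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

int main() {
    std::mutex mutex;
    std::condition_variable cond;
    int numJobsFinished = 0;
    constexpr int numJobs = 4;

    std::vector<std::thread> workers;
    for (int i = 0; i < numJobs; ++i)
        workers.emplace_back([&] {
            // ... do the actual work here ...
            std::lock_guard lock(mutex);  // writer holds the waiter's mutex
            ++numJobsFinished;
            cond.notify_one();
        });

    // wait until all workers have reported in; no *_destroy needed afterwards
    {
        std::unique_lock lock(mutex);
        cond.wait(lock, [&] { return numJobsFinished == numJobs; });
    }

    for (auto &w : workers)
        w.join();  // a joinable std::thread must be joined before destruction
}

The predicate form of wait() is what replaces the manual while/pthread_cond_wait loops in the diffs that follow.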
12 changes: 2 additions & 10 deletions CMakeLists.txt
@@ -17,11 +17,8 @@ include(BuildType) # Ensure CMAKE_BUILD_TYPE is always set
 include(CTest)
 
 set(CMAKE_DEBUG_POSTFIX "-dbg")
-# -D_GNU_SOURCE for pthread recursive mutex
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} \
-    -std=c99 -Wall -Wno-unused-function -D_GNU_SOURCE")
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} \
-    -Wall -Wno-unused-function -D_GNU_SOURCE")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99 -Wall -Wno-unused-function")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-unused-function")
 
 if(BUILD_PYTHON_MODULE)
   # Build PIC code to be used for swig/python module
@@ -86,11 +83,6 @@ if(${PARPE_ENABLE_DLIB})
       CACHE PATH "DLIB base directory")
 endif(${PARPE_ENABLE_DLIB})
 
-# PThreads
-set(THREADS_PREFER_PTHREAD_FLAG ON)
-find_package(Threads REQUIRED)
-
-
 # HDF5
 #set(HDF5_PREFER_PARALLEL TRUE)
 find_package(HDF5 COMPONENTS CXX C HL REQUIRED)
1 change: 0 additions & 1 deletion README.md
@@ -51,7 +51,6 @@ For full functionality, parPE requires the following libraries:
 * CMAKE (>=3.10)
 * MPI ([OpenMPI](https://www.open-mpi.org/),
   [MPICH](https://www.mpich.org/), ...)
-* PTHREADS
 * IPOPT (>= 1.2.7) (requires coinhsl)
 * CERES (>=1.13)
   ([requires Eigen](http://ceres-solver.org/installation.html#dependencies))
1 change: 0 additions & 1 deletion examples/parpeamici/steadystate/CMakeLists.txt
@@ -65,7 +65,6 @@ target_link_libraries(${PROJECT_NAME}
   Upstream::amici
   parpeoptimization
   parpeloadbalancer
-  ${CMAKE_THREAD_LIBS_INIT}
 )
 # /example_steadystate executable
 
1 change: 0 additions & 1 deletion examples/parpeloadbalancer/CMakeLists.txt
@@ -8,7 +8,6 @@ set(SRC_LIST_EXE
 add_executable(${PROJECT_NAME} ${SRC_LIST_EXE})
 
 target_link_libraries(${PROJECT_NAME}
-    ${CMAKE_THREAD_LIBS_INIT}
     parpeloadbalancer
 )
 
15 changes: 7 additions & 8 deletions examples/parpeloadbalancer/main.cpp
@@ -4,6 +4,8 @@
 #include <cstdlib>
 #include <cstdio>
 #include <unistd.h>
+#include <mutex>
+#include <condition_variable>
 
 #include <mpi.h>
 
@@ -29,8 +31,8 @@ int master() {
     parpe::JobData jobdata[numJobs];
 
     // mutex to wait for simulations to finish
-    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+    std::condition_variable cond;
+    std::mutex mutex;
 
     for (int i = 0; i < numJobs; ++i) {
         parpe::JobData *job = &jobdata[i];
@@ -43,12 +45,9 @@ int master() {
     }
 
     // wait for simulations to finish
-    pthread_mutex_lock(&mutex);
-    while (numJobsFinished < numJobs)
-        pthread_cond_wait(&cond, &mutex);
-    pthread_mutex_unlock(&mutex);
-    pthread_mutex_destroy(&mutex);
-    pthread_cond_destroy(&cond);
+    std::unique_lock lock(mutex);
+    cond.wait(lock, [&numJobsFinished, &numJobs]{
+        return numJobsFinished == numJobs;});
 
     // check results
     int errors = 0;
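Note on the new wait: the predicate overload used above is specified by the C++ standard to behave exactly like the loop it replaces, so spurious wakeups stay handled and the predicate is always evaluated with the lock held:

// Specified behavior of condition_variable::wait(lock, pred), the same
// shape as the removed pthread code; wait(lock) releases the lock while
// blocked and reacquires it before the predicate is re-tested:
while (!pred())
    wait(lock);

The pthread_mutex_destroy/pthread_cond_destroy calls have no counterpart because std::mutex and std::condition_variable release their resources in their destructors.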
6 changes: 4 additions & 2 deletions include/parpeamici/amiciSimulationRunner.h
@@ -15,6 +15,8 @@
 
 #include <functional>
 #include <vector>
+#include <mutex>
+#include <condition_variable>
 
 #include <boost/serialization/array.hpp>
 #include <boost/serialization/map.hpp>
@@ -122,8 +124,8 @@ class AmiciSimulationRunner
     void queueSimulation(LoadBalancerMaster* loadBalancer,
                          JobData* d,
                          int* jobDone,
-                         pthread_cond_t* jobDoneChangedCondition,
-                         pthread_mutex_t* jobDoneChangedMutex,
+                         std::condition_variable* jobDoneChangedCondition,
+                         std::mutex* jobDoneChangedMutex,
                          int jobIdx,
                          const std::vector<double>& optimizationParameters,
                          amici::SensitivityOrder sensitivityOrder,
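Design note: std::mutex and std::condition_variable are neither copyable nor movable, so, as with the pthread types before, JobData and queueSimulation() receive them by pointer while the waiting caller retains ownership. A sketch of that ownership shape (illustrative names, not the parPE declarations):

#include <condition_variable>
#include <mutex>

struct Job {
    int *jobDone = nullptr;                          // progress counter
    std::condition_variable *doneChanged = nullptr;  // owned by the waiter
    std::mutex *doneChangedMutex = nullptr;          // owned by the waiter
};

void waitForAll() {
    std::mutex mutex;                  // lives on the waiting thread's stack
    std::condition_variable cond;
    int numDone = 0;
    Job job{&numDone, &cond, &mutex};  // jobs merely borrow these
    // ... enqueue job, then predicate-wait on cond as in the diffs above ...
}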
30 changes: 15 additions & 15 deletions include/parpeloadbalancer/loadBalancerMaster.h
@@ -3,7 +3,10 @@
 
 #include <parpecommon/parpeConfig.h>
 
-#include <pthread.h>
+#include <atomic>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
 #include <queue>
 #include <semaphore.h>
 #include <functional>
@@ -19,8 +22,8 @@ struct JobData {
     JobData() = default;
 
     JobData(int *jobDone,
-            pthread_cond_t *jobDoneChangedCondition,
-            pthread_mutex_t *jobDoneChangedMutex)
+            std::condition_variable *jobDoneChangedCondition,
+            std::mutex *jobDoneChangedMutex)
         : jobDone(jobDone),
           jobDoneChangedCondition(jobDoneChangedCondition),
           jobDoneChangedMutex(jobDoneChangedMutex) {
@@ -39,9 +42,9 @@ struct JobData {
     int *jobDone = nullptr;
 
     /** is signaled after jobDone has been incremented (if set) */
-    pthread_cond_t *jobDoneChangedCondition = nullptr;
+    std::condition_variable *jobDoneChangedCondition = nullptr;
     /** is locked to signal jobDoneChangedCondition condition (if set) */
-    pthread_mutex_t *jobDoneChangedMutex = nullptr;
+    std::mutex *jobDoneChangedMutex = nullptr;
 
     /** callback when job is finished (if set) */
     std::function<void(JobData*)> callbackJobFinished = nullptr;
@@ -107,13 +110,6 @@ class LoadBalancerMaster {
     int getNumQueuedJobs() const;
 
   private:
-    /**
-     * @brief Thread entry point. This is run from run()
-     * @param `this`
-     * @return nullptr, always
-     */
-    static void *threadEntryPoint(void *vpLoadBalancerMaster);
-
     /**
      * @brief Main function of the load balancer thread.
      *
@@ -180,7 +176,7 @@ class LoadBalancerMaster {
     MPI_Datatype mpiJobDataType = MPI_BYTE;
 
     /** Indicates whether we are ready to handle jobs */
-    bool isRunning_ = false;
+    std::atomic_bool isRunning_ = false;
 
     /** Number of workers we can send jobs to */
    int numWorkers = 0;
@@ -208,7 +204,7 @@ class LoadBalancerMaster {
     std::vector<JobData *> sentJobsData;
 
     /** Mutex to protect access to `queue`. */
-    pthread_mutex_t mutexQueue = PTHREAD_MUTEX_INITIALIZER;
+    mutable std::mutex mutexQueue;
 
     /** Semaphore to limit queue length and avoid potentially huge memory
      * allocation for all send and receive buffers. Note that using this might
@@ -217,7 +213,11 @@ class LoadBalancerMaster {
     sem_t semQueue = {};
 
     /** Thread that runs the message dispatcher. */
-    pthread_t queueThread = 0;
+    std::thread queueThread;
 
+    /** Signals whether the queue thread should keep running */
+    std::atomic_bool queue_thread_continue_ = true;
+
+
     /** Value to indicate that there is currently no known free worker. */
     constexpr static int NO_FREE_WORKER = -1;
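The two new members replace pthread_cancel-style termination with cooperative shutdown: the dispatcher loop polls queue_thread_continue_, and terminate() clears the flag before joining (see the .cpp diff below). A minimal sketch of the idiom, with illustrative names, not parPE code:

#include <atomic>
#include <chrono>
#include <thread>

class Dispatcher {
  public:
    void start() {
        // run a member function on the new thread, as run() now does
        worker_ = std::thread(&Dispatcher::loop, this);
    }
    void stop() {
        keepRunning_ = false;    // request shutdown instead of pthread_cancel
        if (worker_.joinable())
            worker_.join();      // wait until the loop observes the flag
    }
  private:
    void loop() {
        while (keepRunning_) {
            // ... dispatch queued work, poll for finished jobs ...
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
    std::atomic_bool keepRunning_{true};
    std::thread worker_;
};

Because cancellation is now a request rather than an asynchronous kill, the explicit pthread_testcancel() cancellation points can be dropped as well.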
34 changes: 15 additions & 19 deletions src/parpeamici/amiciSimulationRunner.cpp
@@ -37,8 +37,8 @@ AmiciSimulationRunner::runDistributedMemory(LoadBalancerMaster* loadBalancer,
 #endif
 
     // mutex and condition to wait for simulations to finish
-    pthread_cond_t simulationsCond = PTHREAD_COND_INITIALIZER;
-    pthread_mutex_t simulationsMutex = PTHREAD_MUTEX_INITIALIZER;
+    std::condition_variable simulationsCond;
+    std::mutex simulationsMutex;
 
     // multiple simulations may be grouped into one work package
     auto numJobsTotal = static_cast<int>(
@@ -75,14 +75,10 @@
     }
 
     // wait for simulations to finish
-    pthread_mutex_lock(&simulationsMutex);
-    while (numJobsFinished < numJobsTotal) // TODO don't wait for all to
-                                           // complete; stop early if errors
-                                           // occurred
-        pthread_cond_wait(&simulationsCond, &simulationsMutex);
-    pthread_mutex_unlock(&simulationsMutex);
-    pthread_mutex_destroy(&simulationsMutex);
-    pthread_cond_destroy(&simulationsCond);
+    // TODO don't wait for all to complete; stop early if errors occurred
+    std::unique_lock lock(simulationsMutex);
+    simulationsCond.wait(lock, [&numJobsFinished, &numJobsTotal]{
+        return numJobsFinished == numJobsTotal;});
 
     // unpack
     if (aggregate_)
@@ -138,15 +134,15 @@ AmiciSimulationRunner::runSharedMemory(const messageHandlerFunc& messageHandler,
 #ifdef PARPE_ENABLE_MPI
 void
 AmiciSimulationRunner::queueSimulation(
-    LoadBalancerMaster* loadBalancer,
-    JobData* d,
-    int* jobDone,
-    pthread_cond_t* jobDoneChangedCondition,
-    pthread_mutex_t* jobDoneChangedMutex,
-    int jobIdx,
-    std::vector<double> const& optimizationParameters,
-    amici::SensitivityOrder sensitivityOrder,
-    std::vector<int> const& conditionIndices) const
+  LoadBalancerMaster* loadBalancer,
+  JobData* d,
+  int* jobDone,
+  std::condition_variable* jobDoneChangedCondition,
+  std::mutex* jobDoneChangedMutex,
+  int jobIdx,
+  std::vector<double> const& optimizationParameters,
+  amici::SensitivityOrder sensitivityOrder,
+  std::vector<int> const& conditionIndices) const
 {
     // TODO avoid copy optimizationParameters; reuse;; for const& in work
     // package need to split into(de)serialize
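One invariant both wait sites rely on: the predicate reads numJobsFinished, so whoever increments the counter must hold the same mutex before notifying; otherwise the final increment can fall between the waiter's predicate check and its block, and the wakeup is lost. A sketch of that writer side (illustrative names; in parPE this path runs through JobData's jobDoneChangedMutex/jobDoneChangedCondition pointers):

#include <condition_variable>
#include <mutex>

void onJobFinished(std::mutex &mutex, std::condition_variable &cond,
                   int &numJobsFinished) {
    {
        std::lock_guard lock(mutex);  // same mutex the waiter uses
        ++numJobsFinished;
    }
    cond.notify_all();  // wake the master; it re-checks the predicate
}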
1 change: 0 additions & 1 deletion src/parpeloadbalancer/CMakeLists.txt
@@ -11,7 +11,6 @@ target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
 
 target_link_libraries(${PROJECT_NAME}
     PUBLIC parpecommon
-    PUBLIC ${CMAKE_THREAD_LIBS_INIT}
 )
 
 if(${PARPE_ENABLE_MPI})
53 changes: 17 additions & 36 deletions src/parpeloadbalancer/loadBalancerMaster.cpp
@@ -4,7 +4,7 @@
 
 #include <cassert>
 #include <climits>
-#include <sched.h>
+#include <chrono>
 
 #include <parpecommon/misc.h>
 #include <parpecommon/parpeException.h>
@@ -43,20 +43,14 @@ void LoadBalancerMaster::run() {
 #endif
     sem_init(&semQueue, 0, queueMaxLength);
 
-    pthread_attr_t threadAttr;
-    pthread_attr_init(&threadAttr);
-    pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
-    pthread_create(&queueThread, &threadAttr, threadEntryPoint, this);
-    pthread_attr_destroy(&threadAttr);
+    queueThread = std::thread(&LoadBalancerMaster::loadBalancerThreadRun, this);
 
     isRunning_ = true;
 }
 
 LoadBalancerMaster::~LoadBalancerMaster()
 {
     terminate();
 
-    pthread_mutex_destroy(&mutexQueue);
     sem_destroy(&semQueue);
 }
 
@@ -66,20 +60,15 @@ void LoadBalancerMaster::assertMpiActive() {
 }
 #endif
 
-void *LoadBalancerMaster::threadEntryPoint(void *vpLoadBalancerMaster) {
-    auto master = static_cast<LoadBalancerMaster *>(vpLoadBalancerMaster);
-    master->loadBalancerThreadRun();
-    return nullptr;
-}
-
 void LoadBalancerMaster::loadBalancerThreadRun() {
 
     // dispatch queued work packages
-    while (true) {
+    while (queue_thread_continue_) {
         int freeWorkerIndex = NO_FREE_WORKER;
 
         // empty send queue while there are free workers
-        while((freeWorkerIndex = getNextFreeWorkerIndex()) >= 0
+        while(queue_thread_continue_ && (freeWorkerIndex = getNextFreeWorkerIndex()) >= 0
               && sendQueuedJob(freeWorkerIndex)) {}
 
         // check if any job finished
@@ -114,10 +103,6 @@ int LoadBalancerMaster::handleFinishedJobs() {
 
     // handle all finished jobs, if any
     while (true) {
-        // add cancellation point to avoid invalid reads in
-        // loadBalancer.recvRequests
-        pthread_testcancel();
-
         // check for waiting incoming message
         MPI_Status status;
         int messageWaiting = 0;
@@ -150,16 +135,14 @@ int LoadBalancerMaster::getNextFreeWorkerIndex() {
 
 JobData *LoadBalancerMaster::getNextJob() {
 
-    pthread_mutex_lock(&mutexQueue);
+    std::unique_lock lock(mutexQueue);
 
     JobData *nextJob = nullptr;
     if (!queue.empty()) {
        nextJob = queue.front();
        queue.pop();
    }
 
-    pthread_mutex_unlock(&mutexQueue);
-
    return nextJob;
 }
 
@@ -188,7 +171,7 @@ void LoadBalancerMaster::queueJob(JobData *data) {
 
     sem_wait(&semQueue);
 
-    pthread_mutex_lock(&mutexQueue);
+    std::unique_lock lock(mutexQueue);
 
     if (lastJobId == INT_MAX) // Unlikely, but prevent overflow
         lastJobId = 0;
@@ -202,22 +185,16 @@ void LoadBalancerMaster::queueJob(JobData *data) {
     printf("\x1b[33mQueued job with size %dB. New queue length is %d.\x1b[0m\n", size, queue.size());
 #endif
 
-    pthread_mutex_unlock(&mutexQueue);
 }
 
 void LoadBalancerMaster::terminate() {
-    // avoid double termination
-    pthread_mutex_lock(&mutexQueue);
     if (!isRunning_) {
-        pthread_mutex_unlock(&mutexQueue);
+        // avoid double termination
         return;
     }
     isRunning_ = false;
-    pthread_mutex_unlock(&mutexQueue);
 
-    pthread_cancel(queueThread);
-    // wait until canceled
-    pthread_join(queueThread, nullptr);
+    queue_thread_continue_ = false;
+    queueThread.join();
 }
 
 int LoadBalancerMaster::handleReply(MPI_Status *mpiStatus) {
@@ -254,12 +231,15 @@ int LoadBalancerMaster::handleReply(MPI_Status *mpiStatus) {
 
 
     // signal job done
-    pthread_mutex_lock(data->jobDoneChangedMutex);
+    std::unique_lock<std::mutex> lock;
+    if(data->jobDoneChangedMutex) {
+        lock = std::unique_lock(*data->jobDoneChangedMutex);
+    }
     if(data->jobDone)
         ++(*data->jobDone);
-    pthread_cond_signal(data->jobDoneChangedCondition);
-    pthread_mutex_unlock(data->jobDoneChangedMutex);
 
+    if (data->jobDoneChangedCondition) {
+        data->jobDoneChangedCondition->notify_all();
+    }
     return workerIdx;
 }
 
@@ -301,6 +281,7 @@ bool LoadBalancerMaster::isRunning() const
 
 int LoadBalancerMaster::getNumQueuedJobs() const
 {
+    std::unique_lock lock(mutexQueue);
    return queue.size();
 }
 
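The handleReply() rewrite uses a handy std::unique_lock property: default-constructed, it owns no mutex, so locking can be made conditional while unlocking stays automatic on every return path. The pattern isolated as a sketch (illustrative names):

#include <mutex>

void signalDone(std::mutex *maybeMutex, int *counter) {
    std::unique_lock<std::mutex> lock;         // owns nothing yet
    if (maybeMutex)
        lock = std::unique_lock(*maybeMutex);  // lock only if one was supplied
    if (counter)
        ++(*counter);
}  // unlocks here if, and only if, a mutex was acquired

std::lock_guard cannot express this, since it is neither movable nor default-constructible; that is why unique_lock appears here and in getNextJob()/queueJob() above.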