From d2a18ec0e47d31c05933ef438eca39b9b5261662 Mon Sep 17 00:00:00 2001 From: Daniel Weindl Date: Mon, 11 Dec 2023 13:46:10 +0100 Subject: [PATCH] Replace pthreads by C++ thread/mutex/condition (#363) Also Closes #226 --- CMakeLists.txt | 12 +---- README.md | 1 - .../parpeamici/steadystate/CMakeLists.txt | 1 - examples/parpeloadbalancer/CMakeLists.txt | 1 - examples/parpeloadbalancer/main.cpp | 15 +++--- include/parpeamici/amiciSimulationRunner.h | 6 ++- .../parpeloadbalancer/loadBalancerMaster.h | 30 +++++------ src/parpeamici/amiciSimulationRunner.cpp | 34 ++++++------ src/parpeloadbalancer/CMakeLists.txt | 1 - src/parpeloadbalancer/loadBalancerMaster.cpp | 53 ++++++------------- src/parpeoptimization/CMakeLists.txt | 1 - templates/CMakeLists.template.txt | 7 ++- tests/parpecommon/CMakeLists.txt | 1 - tests/parpeloadbalancer/CMakeLists.txt | 1 - .../loadBalancerMasterTest.cpp | 23 +++++--- 15 files changed, 81 insertions(+), 106 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a4d89cc5a..1bc8190b3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,11 +17,8 @@ include(BuildType) # Ensure CMAKE_BUILD_TYPE is always set include(CTest) set(CMAKE_DEBUG_POSTFIX "-dbg") -# -D_GNU_SOURCE for pthread recursive mutex -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} \ - -std=c99 -Wall -Wno-unused-function -D_GNU_SOURCE") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} \ - -Wall -Wno-unused-function -D_GNU_SOURCE") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99 -Wall -Wno-unused-function") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-unused-function") if(BUILD_PYTHON_MODULE) # Build PIC code to be used for swig/python module @@ -86,11 +83,6 @@ if(${PARPE_ENABLE_DLIB}) CACHE PATH "DLIB base directory") endif(${PARPE_ENABLE_DLIB}) -# PThreads -set(THREADS_PREFER_PTHREAD_FLAG ON) -find_package(Threads REQUIRED) - - # HDF5 #set(HDF5_PREFER_PARALLEL TRUE) find_package(HDF5 COMPONENTS CXX C HL REQUIRED) diff --git a/README.md b/README.md index df616a4a9..c38f71969 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,6 @@ For full functionality, parPE requires the following libraries: * CMAKE (>=3.10) * MPI ([OpenMPI](https://www.open-mpi.org/), [MPICH](https://www.mpich.org/), ...) -* PTHREADS * IPOPT (>= 1.2.7) (requires coinhsl) * CERES (>=1.13) ([requires Eigen](http://ceres-solver.org/installation.html#dependencies)) diff --git a/examples/parpeamici/steadystate/CMakeLists.txt b/examples/parpeamici/steadystate/CMakeLists.txt index 5cf507a3c..03d2f4f39 100644 --- a/examples/parpeamici/steadystate/CMakeLists.txt +++ b/examples/parpeamici/steadystate/CMakeLists.txt @@ -65,7 +65,6 @@ target_link_libraries(${PROJECT_NAME} Upstream::amici parpeoptimization parpeloadbalancer - ${CMAKE_THREAD_LIBS_INIT} ) # /example_steadystate executable diff --git a/examples/parpeloadbalancer/CMakeLists.txt b/examples/parpeloadbalancer/CMakeLists.txt index a75d3ca9c..d236c15e7 100644 --- a/examples/parpeloadbalancer/CMakeLists.txt +++ b/examples/parpeloadbalancer/CMakeLists.txt @@ -8,7 +8,6 @@ set(SRC_LIST_EXE add_executable(${PROJECT_NAME} ${SRC_LIST_EXE}) target_link_libraries(${PROJECT_NAME} - ${CMAKE_THREAD_LIBS_INIT} parpeloadbalancer ) diff --git a/examples/parpeloadbalancer/main.cpp b/examples/parpeloadbalancer/main.cpp index b3d220d8a..430f54540 100644 --- a/examples/parpeloadbalancer/main.cpp +++ b/examples/parpeloadbalancer/main.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include @@ -29,8 +31,8 @@ int master() { parpe::JobData jobdata[numJobs]; // mutex to wait for simulations to finish - pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + std::condition_variable cond; + std::mutex mutex; for (int i = 0; i < numJobs; ++i) { parpe::JobData *job = &jobdata[i]; @@ -43,12 +45,9 @@ int master() { } // wait for simulations to finish - pthread_mutex_lock(&mutex); - while (numJobsFinished < numJobs) - pthread_cond_wait(&cond, &mutex); - pthread_mutex_unlock(&mutex); - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + std::unique_lock lock(mutex); + cond.wait(lock, [&numJobsFinished, &numJobs]{ + return numJobsFinished == numJobs;}); // check results int errors = 0; diff --git a/include/parpeamici/amiciSimulationRunner.h b/include/parpeamici/amiciSimulationRunner.h index 771dab670..1193a5916 100644 --- a/include/parpeamici/amiciSimulationRunner.h +++ b/include/parpeamici/amiciSimulationRunner.h @@ -15,6 +15,8 @@ #include #include +#include +#include #include #include @@ -122,8 +124,8 @@ class AmiciSimulationRunner void queueSimulation(LoadBalancerMaster* loadBalancer, JobData* d, int* jobDone, - pthread_cond_t* jobDoneChangedCondition, - pthread_mutex_t* jobDoneChangedMutex, + std::condition_variable* jobDoneChangedCondition, + std::mutex* jobDoneChangedMutex, int jobIdx, const std::vector& optimizationParameters, amici::SensitivityOrder sensitivityOrder, diff --git a/include/parpeloadbalancer/loadBalancerMaster.h b/include/parpeloadbalancer/loadBalancerMaster.h index 3e5ce9f36..7f40c2486 100644 --- a/include/parpeloadbalancer/loadBalancerMaster.h +++ b/include/parpeloadbalancer/loadBalancerMaster.h @@ -3,7 +3,10 @@ #include -#include +#include +#include +#include +#include #include #include #include @@ -19,8 +22,8 @@ struct JobData { JobData() = default; JobData(int *jobDone, - pthread_cond_t *jobDoneChangedCondition, - pthread_mutex_t *jobDoneChangedMutex) + std::condition_variable *jobDoneChangedCondition, + std::mutex *jobDoneChangedMutex) : jobDone(jobDone), jobDoneChangedCondition(jobDoneChangedCondition), jobDoneChangedMutex(jobDoneChangedMutex) { @@ -39,9 +42,9 @@ struct JobData { int *jobDone = nullptr; /** is signaled after jobDone has been incremented (if set) */ - pthread_cond_t *jobDoneChangedCondition = nullptr; + std::condition_variable *jobDoneChangedCondition = nullptr; /** is locked to signal jobDoneChangedCondition condition (if set) */ - pthread_mutex_t *jobDoneChangedMutex = nullptr; + std::mutex *jobDoneChangedMutex = nullptr; /** callback when job is finished (if set) */ std::function callbackJobFinished = nullptr; @@ -107,13 +110,6 @@ class LoadBalancerMaster { int getNumQueuedJobs() const; private: - /** - * @brief Thread entry point. This is run from run() - * @param `this` - * @return nullptr, always - */ - static void *threadEntryPoint(void *vpLoadBalancerMaster); - /** * @brief Main function of the load balancer thread. * @@ -180,7 +176,7 @@ class LoadBalancerMaster { MPI_Datatype mpiJobDataType = MPI_BYTE; /** Indicates whether we are ready to handle jobs */ - bool isRunning_ = false; + std::atomic_bool isRunning_ = false; /** Number of workers we can send jobs to */ int numWorkers = 0; @@ -208,7 +204,7 @@ class LoadBalancerMaster { std::vector sentJobsData; /** Mutex to protect access to `queue`. */ - pthread_mutex_t mutexQueue = PTHREAD_MUTEX_INITIALIZER; + mutable std::mutex mutexQueue; /** Semaphore to limit queue length and avoid potentially huge memory * allocation for all send and receive buffers. Note that using this might @@ -217,7 +213,11 @@ class LoadBalancerMaster { sem_t semQueue = {}; /** Thread that runs the message dispatcher. */ - pthread_t queueThread = 0; + std::thread queueThread; + + /** Signals whether the queue thread should keep running */ + std::atomic_bool queue_thread_continue_ = true; + /** Value to indicate that there is currently no known free worker. */ constexpr static int NO_FREE_WORKER = -1; diff --git a/src/parpeamici/amiciSimulationRunner.cpp b/src/parpeamici/amiciSimulationRunner.cpp index 1eb3fce55..c7449d5f1 100644 --- a/src/parpeamici/amiciSimulationRunner.cpp +++ b/src/parpeamici/amiciSimulationRunner.cpp @@ -37,8 +37,8 @@ AmiciSimulationRunner::runDistributedMemory(LoadBalancerMaster* loadBalancer, #endif // mutex and condition to wait for simulations to finish - pthread_cond_t simulationsCond = PTHREAD_COND_INITIALIZER; - pthread_mutex_t simulationsMutex = PTHREAD_MUTEX_INITIALIZER; + std::condition_variable simulationsCond; + std::mutex simulationsMutex; // multiple simulations may be grouped into one work package auto numJobsTotal = static_cast( @@ -75,14 +75,10 @@ AmiciSimulationRunner::runDistributedMemory(LoadBalancerMaster* loadBalancer, } // wait for simulations to finish - pthread_mutex_lock(&simulationsMutex); - while (numJobsFinished < numJobsTotal) // TODO don't wait for all to - // complete; stop early if errors - // occurred - pthread_cond_wait(&simulationsCond, &simulationsMutex); - pthread_mutex_unlock(&simulationsMutex); - pthread_mutex_destroy(&simulationsMutex); - pthread_cond_destroy(&simulationsCond); + // TODO don't wait for all to complete; stop early if errors occurred + std::unique_lock lock(simulationsMutex); + simulationsCond.wait(lock, [&numJobsFinished, &numJobsTotal]{ + return numJobsFinished == numJobsTotal;}); // unpack if (aggregate_) @@ -138,15 +134,15 @@ AmiciSimulationRunner::runSharedMemory(const messageHandlerFunc& messageHandler, #ifdef PARPE_ENABLE_MPI void AmiciSimulationRunner::queueSimulation( - LoadBalancerMaster* loadBalancer, - JobData* d, - int* jobDone, - pthread_cond_t* jobDoneChangedCondition, - pthread_mutex_t* jobDoneChangedMutex, - int jobIdx, - std::vector const& optimizationParameters, - amici::SensitivityOrder sensitivityOrder, - std::vector const& conditionIndices) const + LoadBalancerMaster* loadBalancer, + JobData* d, + int* jobDone, + std::condition_variable* jobDoneChangedCondition, + std::mutex* jobDoneChangedMutex, + int jobIdx, + std::vector const& optimizationParameters, + amici::SensitivityOrder sensitivityOrder, + std::vector const& conditionIndices) const { // TODO avoid copy optimizationParameters; reuse;; for const& in work // package need to split into(de)serialize diff --git a/src/parpeloadbalancer/CMakeLists.txt b/src/parpeloadbalancer/CMakeLists.txt index d9b6a9d8b..44ec38611 100644 --- a/src/parpeloadbalancer/CMakeLists.txt +++ b/src/parpeloadbalancer/CMakeLists.txt @@ -11,7 +11,6 @@ target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries(${PROJECT_NAME} PUBLIC parpecommon - PUBLIC ${CMAKE_THREAD_LIBS_INIT} ) if(${PARPE_ENABLE_MPI}) diff --git a/src/parpeloadbalancer/loadBalancerMaster.cpp b/src/parpeloadbalancer/loadBalancerMaster.cpp index 5e66d8ca1..e72e50144 100644 --- a/src/parpeloadbalancer/loadBalancerMaster.cpp +++ b/src/parpeloadbalancer/loadBalancerMaster.cpp @@ -4,7 +4,7 @@ #include #include -#include +#include #include #include @@ -43,11 +43,7 @@ void LoadBalancerMaster::run() { #endif sem_init(&semQueue, 0, queueMaxLength); - pthread_attr_t threadAttr; - pthread_attr_init(&threadAttr); - pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE); - pthread_create(&queueThread, &threadAttr, threadEntryPoint, this); - pthread_attr_destroy(&threadAttr); + queueThread = std::thread(&LoadBalancerMaster::loadBalancerThreadRun, this); isRunning_ = true; } @@ -55,8 +51,6 @@ void LoadBalancerMaster::run() { LoadBalancerMaster::~LoadBalancerMaster() { terminate(); - - pthread_mutex_destroy(&mutexQueue); sem_destroy(&semQueue); } @@ -66,20 +60,15 @@ void LoadBalancerMaster::assertMpiActive() { } #endif -void *LoadBalancerMaster::threadEntryPoint(void *vpLoadBalancerMaster) { - auto master = static_cast(vpLoadBalancerMaster); - master->loadBalancerThreadRun(); - return nullptr; -} void LoadBalancerMaster::loadBalancerThreadRun() { // dispatch queued work packages - while (true) { + while (queue_thread_continue_) { int freeWorkerIndex = NO_FREE_WORKER; // empty send queue while there are free workers - while((freeWorkerIndex = getNextFreeWorkerIndex()) >= 0 + while(queue_thread_continue_ && (freeWorkerIndex = getNextFreeWorkerIndex()) >= 0 && sendQueuedJob(freeWorkerIndex)) {} // check if any job finished @@ -114,10 +103,6 @@ int LoadBalancerMaster::handleFinishedJobs() { // handle all finished jobs, if any while (true) { - // add cancellation point to avoid invalid reads in - // loadBalancer.recvRequests - pthread_testcancel(); - // check for waiting incoming message MPI_Status status; int messageWaiting = 0; @@ -150,7 +135,7 @@ int LoadBalancerMaster::getNextFreeWorkerIndex() { JobData *LoadBalancerMaster::getNextJob() { - pthread_mutex_lock(&mutexQueue); + std::unique_lock lock(mutexQueue); JobData *nextJob = nullptr; if (!queue.empty()) { @@ -158,8 +143,6 @@ JobData *LoadBalancerMaster::getNextJob() { queue.pop(); } - pthread_mutex_unlock(&mutexQueue); - return nextJob; } @@ -188,7 +171,7 @@ void LoadBalancerMaster::queueJob(JobData *data) { sem_wait(&semQueue); - pthread_mutex_lock(&mutexQueue); + std::unique_lock lock(mutexQueue); if (lastJobId == INT_MAX) // Unlikely, but prevent overflow lastJobId = 0; @@ -202,22 +185,16 @@ void LoadBalancerMaster::queueJob(JobData *data) { printf("\x1b[33mQueued job with size %dB. New queue length is %d.\x1b[0m\n", size, queue.size()); #endif - pthread_mutex_unlock(&mutexQueue); } void LoadBalancerMaster::terminate() { - // avoid double termination - pthread_mutex_lock(&mutexQueue); if (!isRunning_) { - pthread_mutex_unlock(&mutexQueue); + // avoid double termination return; } isRunning_ = false; - pthread_mutex_unlock(&mutexQueue); - - pthread_cancel(queueThread); - // wait until canceled - pthread_join(queueThread, nullptr); + queue_thread_continue_ = false; + queueThread.join(); } int LoadBalancerMaster::handleReply(MPI_Status *mpiStatus) { @@ -254,12 +231,15 @@ int LoadBalancerMaster::handleReply(MPI_Status *mpiStatus) { // signal job done - pthread_mutex_lock(data->jobDoneChangedMutex); + std::unique_lock lock; + if(data->jobDoneChangedMutex) { + lock = std::unique_lock(*data->jobDoneChangedMutex); + } if(data->jobDone) ++(*data->jobDone); - pthread_cond_signal(data->jobDoneChangedCondition); - pthread_mutex_unlock(data->jobDoneChangedMutex); - + if (data->jobDoneChangedCondition) { + data->jobDoneChangedCondition->notify_all(); + } return workerIdx; } @@ -301,6 +281,7 @@ bool LoadBalancerMaster::isRunning() const int LoadBalancerMaster::getNumQueuedJobs() const { + std::unique_lock lock(mutexQueue); return queue.size(); } diff --git a/src/parpeoptimization/CMakeLists.txt b/src/parpeoptimization/CMakeLists.txt index f584d07bc..4573f9ffa 100644 --- a/src/parpeoptimization/CMakeLists.txt +++ b/src/parpeoptimization/CMakeLists.txt @@ -104,7 +104,6 @@ target_link_libraries(${PROJECT_NAME} PUBLIC parpecommon PUBLIC ${HDF5_HL_LIBRARIES} PUBLIC ${HDF5_C_LIBRARIES} - PUBLIC ${CMAKE_THREAD_LIBS_INIT} ) install(TARGETS ${PROJECT_NAME} EXPORT ParPETargets ARCHIVE DESTINATION lib) diff --git a/templates/CMakeLists.template.txt b/templates/CMakeLists.template.txt index 677ae3119..b38426e78 100644 --- a/templates/CMakeLists.template.txt +++ b/templates/CMakeLists.template.txt @@ -26,9 +26,12 @@ project(${MODEL_NAME}) # for IDE find_package(ParPE REQUIRED) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-unused-function -fopenmp -D_GNU_SOURCE") # -D_GNU_SOURCE for pthread recursive mutex issues +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-unused-function -fopenmp") -execute_process(COMMAND sh -c "cd ${CMAKE_CURRENT_SOURCE_DIR} && git describe --abbrev=4 --dirty=-dirty --always --tags | tr -d '\n'" OUTPUT_VARIABLE GIT_VERSION) +execute_process( + COMMAND sh -c "cd ${CMAKE_CURRENT_SOURCE_DIR} && git describe --abbrev=4 --dirty=-dirty --always --tags | tr -d '\n'" + OUTPUT_VARIABLE GIT_VERSION +) message(STATUS "Building version ${GIT_VERSION}") add_definitions(-DGIT_VERSION="${GIT_VERSION}") diff --git a/tests/parpecommon/CMakeLists.txt b/tests/parpecommon/CMakeLists.txt index cef64daa8..68f2b03ac 100644 --- a/tests/parpecommon/CMakeLists.txt +++ b/tests/parpecommon/CMakeLists.txt @@ -9,7 +9,6 @@ set(SRC_LIST add_executable(${PROJECT_NAME} ${SRC_LIST}) target_link_libraries(${PROJECT_NAME} - ${CMAKE_THREAD_LIBS_INIT} parpecommon gtest_main ${GCOV_LIBRARY} diff --git a/tests/parpeloadbalancer/CMakeLists.txt b/tests/parpeloadbalancer/CMakeLists.txt index 461f3ec9e..fc724f27d 100644 --- a/tests/parpeloadbalancer/CMakeLists.txt +++ b/tests/parpeloadbalancer/CMakeLists.txt @@ -9,7 +9,6 @@ set(SRC_LIST add_executable(${PROJECT_NAME} ${SRC_LIST}) target_link_libraries(${PROJECT_NAME} - ${CMAKE_THREAD_LIBS_INIT} parpeloadbalancer gmock_main ${GCOV_LIBRARY} diff --git a/tests/parpeloadbalancer/loadBalancerMasterTest.cpp b/tests/parpeloadbalancer/loadBalancerMasterTest.cpp index efbebc3c2..73a75a3d0 100644 --- a/tests/parpeloadbalancer/loadBalancerMasterTest.cpp +++ b/tests/parpeloadbalancer/loadBalancerMasterTest.cpp @@ -22,19 +22,23 @@ int MPI_Comm_size(MPI_Comm comm, int *size) { return MPI_SUCCESS; } -int MPI_Testany(int count, MPI_Request array_of_requests[], int *index, - int *flag, MPI_Status *status) { - sleep(1000); // do nothing and wait to be killed - +int MPI_Testany(int /*count*/, MPI_Request /*array_of_requests*/[], int */*index*/, + int */*flag*/, MPI_Status */*status*/) { + sleep(1); return 0; } -int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag, - MPI_Status *status) { - sleep(1000); +int MPI_Iprobe(int /*source*/, int /*tag*/, MPI_Comm /*comm*/, int */*flag*/, + MPI_Status */*status*/) { return 0; } +int MPI_Isend(const void */*buf*/, int /*count*/, MPI_Datatype /*datatype*/, + int /*dest*/, int /*tag*/, MPI_Comm /*comm*/, + MPI_Request */*request*/) { + sleep(1); + return 0; +} class MockMPI { public: @@ -46,6 +50,11 @@ class MockMPI { MOCK_CONST_METHOD5(MPI_Iprobe, int(int source, int tag, MPI_Comm comm, int *flag, MPI_Status *status)); + MOCK_CONST_METHOD7(MPI_Isend, int(const void *buf, int count, + MPI_Datatype datatype, int dest, + int tag, MPI_Comm comm, + MPI_Request *request)); + MockMPI() { _MPI_Comm_size = [this](MPI_Comm comm, int *size){ return MPI_Comm_size(comm, size); }; }