From 295ff313ec41530a0838341bb5e9d5073abfffe5 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 10 Jul 2023 20:56:25 +0200 Subject: [PATCH 1/3] fix for the stop command logic in TP Channel Filter --- plugins/TPChannelFilter.cpp | 15 +++++++++------ plugins/TPChannelFilter.hpp | 6 ++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/plugins/TPChannelFilter.cpp b/plugins/TPChannelFilter.cpp index 903f984e..d6ffcacb 100644 --- a/plugins/TPChannelFilter.cpp +++ b/plugins/TPChannelFilter.cpp @@ -18,7 +18,7 @@ namespace dunedaq { namespace trigger { TPChannelFilter::TPChannelFilter(const std::string& name) : DAQModule(name) - , m_thread(std::bind(&TPChannelFilter::do_work, this, std::placeholders::_1)) + , m_thread(std::bind(&TPChannelFilter::do_work, this)) , m_input_queue(nullptr) , m_output_queue(nullptr) , m_queue_timeout(1000) @@ -63,6 +63,7 @@ TPChannelFilter::do_conf(const nlohmann::json& conf_arg) void TPChannelFilter::do_start(const nlohmann::json&) { + m_running_flag.store(true); m_received_count.store(0); m_sent_count.store(0); m_thread.start_working_thread("channelfilter"); @@ -72,6 +73,7 @@ TPChannelFilter::do_start(const nlohmann::json&) void TPChannelFilter::do_stop(const nlohmann::json&) { + m_running_flag.store(false); m_thread.stop_working_thread(); TLOG_DEBUG(2) << get_name() + " successfully stopped."; } @@ -86,7 +88,7 @@ TPChannelFilter::channel_should_be_removed(int channel) const // The plane numbering convention is found in detchannelmaps/plugins/VDColdboxChannelMap.cpp and is: // U (induction) = 0, Y (induction) = 1, Z (induction) = 2, unconnected channel = 9999 uint plane = m_channel_map->get_plane_from_offline_channel(channel); - TLOG_DEBUG(2) << "Checking received TP with channel " << channel << " and plane " << plane; + TLOG_DEBUG(5) << "Checking received TP with channel " << channel << " and plane " << plane; // Check for collection if (plane == 0 || plane == 1) { return !m_conf.keep_induction; @@ -105,16 +107,17 @@ TPChannelFilter::channel_should_be_removed(int channel) const } void -TPChannelFilter::do_work(std::atomic& running_flag) +TPChannelFilter::do_work() { - while (true) { + while (m_running_flag.load()) { + std::optional tpset = m_input_queue->try_receive(m_queue_timeout);; using namespace std::chrono; if (!tpset.has_value()) { // The condition to exit the loop is that we've been stopped and // there's nothing left on the input queue - if (!running_flag.load()) { + if (!m_running_flag.load()) { break; } else { continue; @@ -148,7 +151,7 @@ TPChannelFilter::do_work(std::atomic& running_flag) } } - } // while(true) + } // while TLOG_DEBUG(2) << "Exiting do_work() method"; } diff --git a/plugins/TPChannelFilter.hpp b/plugins/TPChannelFilter.hpp index 214401fd..2d065bc7 100644 --- a/plugins/TPChannelFilter.hpp +++ b/plugins/TPChannelFilter.hpp @@ -47,7 +47,7 @@ class TPChannelFilter : public dunedaq::appfwk::DAQModule void do_start(const nlohmann::json& obj); void do_stop(const nlohmann::json& obj); void do_scrap(const nlohmann::json& obj); - void do_work(std::atomic&); + void do_work(); bool channel_should_be_removed(int channel) const; dunedaq::utilities::WorkerThread m_thread; @@ -56,7 +56,6 @@ class TPChannelFilter : public dunedaq::appfwk::DAQModule std::atomic m_received_count; std::atomic m_sent_count; - using source_t = dunedaq::iomanager::ReceiverConcept; std::shared_ptr m_input_queue; using sink_t = dunedaq::iomanager::SenderConcept; @@ -66,6 +65,9 @@ class TPChannelFilter : public dunedaq::appfwk::DAQModule std::shared_ptr m_channel_map; dunedaq::trigger::tpchannelfilter::Conf m_conf; + + // Are we in the RUNNING state? + std::atomic m_running_flag{ false }; }; } // namespace trigger } // namespace dunedaq From 1c7ba6fc54325bc7c6e21fb6aeac97f03121db59 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Tue, 11 Jul 2023 21:49:35 +0200 Subject: [PATCH 2/3] fixing running flag checks in different trigger modules --- include/trigger/Tee.hxx | 3 +-- plugins/TriggerZipper.hpp | 2 +- src/trigger/TriggerGenericMaker.hpp | 15 ++++++++++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/include/trigger/Tee.hxx b/include/trigger/Tee.hxx index 59d70afc..7e64c2d4 100644 --- a/include/trigger/Tee.hxx +++ b/include/trigger/Tee.hxx @@ -79,7 +79,7 @@ Tee::do_work(std::atomic& running_flag) { size_t n_objects = 0; - while (true) { + while (running_flag.load()) { T object; try { object = m_input_queue->receive(std::chrono::milliseconds(100)); @@ -108,7 +108,6 @@ Tee::do_work(std::atomic& running_flag) ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), "push to output queue 2", timeout_ms)); } } - TLOG() << get_name() << ": Exiting do_work() method after receiving " << n_objects << " objects"; } diff --git a/plugins/TriggerZipper.hpp b/plugins/TriggerZipper.hpp index fcfbc584..fe8f426c 100644 --- a/plugins/TriggerZipper.hpp +++ b/plugins/TriggerZipper.hpp @@ -177,7 +177,7 @@ class TriggerZipper : public dunedaq::appfwk::DAQModule // thread worker void worker() { - while (true) { + while (m_running.load()) { // Once we've received a stop command, keep reading the input // queue until there's nothing left on it if (!proc_one() && !m_running.load()) { diff --git a/src/trigger/TriggerGenericMaker.hpp b/src/trigger/TriggerGenericMaker.hpp index aaad83bd..8d4e6004 100644 --- a/src/trigger/TriggerGenericMaker.hpp +++ b/src/trigger/TriggerGenericMaker.hpp @@ -147,7 +147,10 @@ class TriggerGenericMaker : public dunedaq::appfwk::DAQModule m_thread.start_working_thread(get_name()); } - void do_stop(const nlohmann::json& /*obj*/) { m_thread.stop_working_thread(); } + void do_stop(const nlohmann::json& /*obj*/) + { + m_thread.stop_working_thread(); + } void do_configure(const nlohmann::json& obj) { @@ -162,15 +165,17 @@ class TriggerGenericMaker : public dunedaq::appfwk::DAQModule worker.reconfigure(); } - void do_work(std::atomic& running_flag) + void do_work(std::atomic& m_running_flag) { // Loop until a stop is received - while (running_flag.load()) { + while (m_running_flag.load()) { // While there are items in the input queue, continue draining even if // the running_flag is false, but stop _immediately_ when input is empty IN in; while (receive(in)) { - worker.process(in); + if (m_running_flag.load()) { + worker.process(in); + } } } // P. Rodrigues 2022-06-01. The argument here is whether to drop @@ -543,8 +548,8 @@ class TriggerGenericWorker, OUT, MAKER> ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name)); // out.back() is dropped } - out_vec.pop_back(); } + out_vec.pop_back(); } } } From 150c58946601d4a37ab666237da6289988ab492e Mon Sep 17 00:00:00 2001 From: Kurt Biery Date: Tue, 11 Jul 2023 16:12:34 -0500 Subject: [PATCH 3/3] Updated version to v1.5.8 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f45e7971..0bab3a63 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.12) -project(trigger VERSION 1.5.7) +project(trigger VERSION 1.5.8) find_package(daq-cmake REQUIRED)