Skip to content

Commit

Permalink
Merge pull request #224 from DUNE-DAQ/mrigansusx/trigger_not_stopping…
Browse files Browse the repository at this point in the history
…_fix

fix for the stop command logic in TP Channel Filter
  • Loading branch information
bieryAtFnal authored Jul 11, 2023
2 parents f0eb788 + 150c589 commit d666a00
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 17 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.12)
project(trigger VERSION 1.5.7)
project(trigger VERSION 1.5.8)

find_package(daq-cmake REQUIRED)

Expand Down
3 changes: 1 addition & 2 deletions include/trigger/Tee.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Tee<T>::do_work(std::atomic<bool>& running_flag)
{
size_t n_objects = 0;

while (true) {
while (running_flag.load()) {
T object;
try {
object = m_input_queue->receive(std::chrono::milliseconds(100));
Expand Down Expand Up @@ -108,7 +108,6 @@ Tee<T>::do_work(std::atomic<bool>& running_flag)
ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), "push to output queue 2", timeout_ms));
}
}

TLOG() << get_name() << ": Exiting do_work() method after receiving " << n_objects << " objects";
}

Expand Down
15 changes: 9 additions & 6 deletions plugins/TPChannelFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace dunedaq {
namespace trigger {
TPChannelFilter::TPChannelFilter(const std::string& name)
: DAQModule(name)
, m_thread(std::bind(&TPChannelFilter::do_work, this, std::placeholders::_1))
, m_thread(std::bind(&TPChannelFilter::do_work, this))
, m_input_queue(nullptr)
, m_output_queue(nullptr)
, m_queue_timeout(1000)
Expand Down Expand Up @@ -63,6 +63,7 @@ TPChannelFilter::do_conf(const nlohmann::json& conf_arg)
void
TPChannelFilter::do_start(const nlohmann::json&)
{
m_running_flag.store(true);
m_received_count.store(0);
m_sent_count.store(0);
m_thread.start_working_thread("channelfilter");
Expand All @@ -72,6 +73,7 @@ TPChannelFilter::do_start(const nlohmann::json&)
void
TPChannelFilter::do_stop(const nlohmann::json&)
{
m_running_flag.store(false);
m_thread.stop_working_thread();
TLOG_DEBUG(2) << get_name() + " successfully stopped.";
}
Expand All @@ -86,7 +88,7 @@ TPChannelFilter::channel_should_be_removed(int channel) const
// The plane numbering convention is found in detchannelmaps/plugins/VDColdboxChannelMap.cpp and is:
// U (induction) = 0, Y (induction) = 1, Z (induction) = 2, unconnected channel = 9999
uint plane = m_channel_map->get_plane_from_offline_channel(channel);
TLOG_DEBUG(2) << "Checking received TP with channel " << channel << " and plane " << plane;
TLOG_DEBUG(5) << "Checking received TP with channel " << channel << " and plane " << plane;
// Check for collection
if (plane == 0 || plane == 1) {
return !m_conf.keep_induction;
Expand All @@ -105,16 +107,17 @@ TPChannelFilter::channel_should_be_removed(int channel) const
}

void
TPChannelFilter::do_work(std::atomic<bool>& running_flag)
TPChannelFilter::do_work()
{
while (true) {
while (m_running_flag.load()) {

std::optional<TPSet> tpset = m_input_queue->try_receive(m_queue_timeout);;
using namespace std::chrono;

if (!tpset.has_value()) {
// The condition to exit the loop is that we've been stopped and
// there's nothing left on the input queue
if (!running_flag.load()) {
if (!m_running_flag.load()) {
break;
} else {
continue;
Expand Down Expand Up @@ -148,7 +151,7 @@ TPChannelFilter::do_work(std::atomic<bool>& running_flag)
}
}

} // while(true)
} // while
TLOG_DEBUG(2) << "Exiting do_work() method";
}

Expand Down
6 changes: 4 additions & 2 deletions plugins/TPChannelFilter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TPChannelFilter : public dunedaq::appfwk::DAQModule
void do_start(const nlohmann::json& obj);
void do_stop(const nlohmann::json& obj);
void do_scrap(const nlohmann::json& obj);
void do_work(std::atomic<bool>&);
void do_work();

bool channel_should_be_removed(int channel) const;
dunedaq::utilities::WorkerThread m_thread;
Expand All @@ -56,7 +56,6 @@ class TPChannelFilter : public dunedaq::appfwk::DAQModule
std::atomic<metric_counter_type> m_received_count;
std::atomic<metric_counter_type> m_sent_count;


using source_t = dunedaq::iomanager::ReceiverConcept<TPSet>;
std::shared_ptr<source_t> m_input_queue;
using sink_t = dunedaq::iomanager::SenderConcept<TPSet>;
Expand All @@ -66,6 +65,9 @@ class TPChannelFilter : public dunedaq::appfwk::DAQModule
std::shared_ptr<detchannelmaps::TPCChannelMap> m_channel_map;

dunedaq::trigger::tpchannelfilter::Conf m_conf;

// Are we in the RUNNING state?
std::atomic<bool> m_running_flag{ false };
};
} // namespace trigger
} // namespace dunedaq
Expand Down
2 changes: 1 addition & 1 deletion plugins/TriggerZipper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class TriggerZipper : public dunedaq::appfwk::DAQModule
// thread worker
void worker()
{
while (true) {
while (m_running.load()) {
// Once we've received a stop command, keep reading the input
// queue until there's nothing left on it
if (!proc_one() && !m_running.load()) {
Expand Down
15 changes: 10 additions & 5 deletions src/trigger/TriggerGenericMaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ class TriggerGenericMaker : public dunedaq::appfwk::DAQModule
m_thread.start_working_thread(get_name());
}

void do_stop(const nlohmann::json& /*obj*/) { m_thread.stop_working_thread(); }
void do_stop(const nlohmann::json& /*obj*/)
{
m_thread.stop_working_thread();
}

void do_configure(const nlohmann::json& obj)
{
Expand All @@ -162,15 +165,17 @@ class TriggerGenericMaker : public dunedaq::appfwk::DAQModule
worker.reconfigure();
}

void do_work(std::atomic<bool>& running_flag)
void do_work(std::atomic<bool>& m_running_flag)
{
// Loop until a stop is received
while (running_flag.load()) {
while (m_running_flag.load()) {
// While there are items in the input queue, continue draining even if
// the running_flag is false, but stop _immediately_ when input is empty
IN in;
while (receive(in)) {
worker.process(in);
if (m_running_flag.load()) {
worker.process(in);
}
}
}
// P. Rodrigues 2022-06-01. The argument here is whether to drop
Expand Down Expand Up @@ -543,8 +548,8 @@ class TriggerGenericWorker<Set<A>, OUT, MAKER>
ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
// out.back() is dropped
}
out_vec.pop_back();
}
out_vec.pop_back();
}
}
}
Expand Down

0 comments on commit d666a00

Please sign in to comment.