From 0279a500f44112875ffa8f04a2546531270bb610 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20=C5=81ukawski?= Date: Mon, 24 Jun 2024 16:41:37 +0200 Subject: [PATCH] Only allow a single CAN bus per broker --- .../YarpPlugins/CanBusBroker/CanBusBroker.hpp | 11 +- .../CanBusBroker/DeviceDriverImpl.cpp | 102 ++++++------------ .../CanBusBroker/IMultipleWrapperImpl.cpp | 101 ++++++++--------- .../CanBusBroker/SingleBusBroker.hpp | 10 ++ .../CanBusBroker/SyncPeriodicThread.cpp | 69 ++++++++++-- .../CanBusBroker/SyncPeriodicThread.hpp | 27 +++-- 6 files changed, 174 insertions(+), 146 deletions(-) diff --git a/libraries/YarpPlugins/CanBusBroker/CanBusBroker.hpp b/libraries/YarpPlugins/CanBusBroker/CanBusBroker.hpp index 051d92f56..105a2e9f2 100644 --- a/libraries/YarpPlugins/CanBusBroker/CanBusBroker.hpp +++ b/libraries/YarpPlugins/CanBusBroker/CanBusBroker.hpp @@ -3,8 +3,6 @@ #ifndef __CAN_BUS_BROKER_HPP__ #define __CAN_BUS_BROKER_HPP__ -#include - #include #include #include @@ -416,12 +414,17 @@ class CanBusBroker : public yarp::dev::DeviceDriver, bool getThreeAxisMagnetometerMeasure(std::size_t sens_index, yarp::sig::Vector & out, double & timestamp) const override; private: + static SyncPeriodicThread & getSyncThread() + { + static SyncPeriodicThread instance; + return instance; + } + const yarp::dev::PolyDriverDescriptor * tryCreateFakeNode(const yarp::dev::PolyDriverDescriptor * driver); DeviceMapper deviceMapper; - std::vector brokers; yarp::dev::PolyDriverList fakeNodes; - SyncPeriodicThread * syncThread {nullptr}; + SingleBusBroker * broker {nullptr}; }; } // namespace roboticslab diff --git a/libraries/YarpPlugins/CanBusBroker/DeviceDriverImpl.cpp b/libraries/YarpPlugins/CanBusBroker/DeviceDriverImpl.cpp index bd4b881ea..d6582de24 100644 --- a/libraries/YarpPlugins/CanBusBroker/DeviceDriverImpl.cpp +++ b/libraries/YarpPlugins/CanBusBroker/DeviceDriverImpl.cpp @@ -7,7 +7,6 @@ #include #include -#include "FutureTask.hpp" #include "ICanBusSharer.hpp" #include "LogComponent.hpp" @@ -22,49 +21,40 @@ bool CanBusBroker::open(yarp::os::Searchable & config) options.fromString(config.findGroup("common").toString(), false); // override global options options.fromString(config.toString(), false); // override common options - const auto * buses = options.find("buses").asList(); - - if (!buses) - { - yCError(CBB) << R"(Missing key "buses" or not a list)"; - return false; - } - - for (int i = 0; i < buses->size(); i++) + if (yarp::os::Value * v; options.check("bus", v, "CAN bus") && v->isString()) { - auto bus = buses->get(i).asString(); + auto bus = v->asString(); - if (!options.check(bus)) + if (yarp::os::Value * vv; options.check("nodes", vv, "CAN nodes") && vv->isList()) { - yCError(CBB) << "Missing CAN bus key:" << bus; - return false; - } + const auto * nodes = vv->asList(); - if (!options.find(bus).isList()) - { - yCError(CBB) << "Key" << bus << "must be a list"; - return false; - } + std::vector names; - const auto * nodes = options.find(bus).asList(); + for (int j = 0; j < nodes->size(); j++) + { + auto node = nodes->get(j).asString(); + names.push_back(node); + } - std::vector names; + broker = new SingleBusBroker(bus, names); - for (int j = 0; j < nodes->size(); j++) - { - auto node = nodes->get(j).asString(); - names.push_back(node); + if (!broker->configure(options)) + { + yCError(CBB) << "Unable to configure broker of CAN bus device" << bus; + return false; + } } - - auto * broker = new SingleBusBroker(bus, names); - brokers.push_back(broker); - - if (!broker->configure(options)) + else { - yCError(CBB) << "Unable to configure broker of CAN bus device" << bus; - return false; + yCError(CBB) << R"(Missing key "nodes" or not a list)"; } } + else + { + yCError(CBB) << R"(Missing key "bus" or not a string)"; + return false; + } if (yarp::os::Value * v; options.check("fakeNodes", v, "fake CAN nodes")) { @@ -89,9 +79,9 @@ bool CanBusBroker::open(yarp::os::Searchable & config) } } - if (options.check("syncPeriod", "SYNC message period (s)")) + if (yarp::os::Value * v; options.check("syncPeriod", v, "SYNC message period (s)")) { - auto syncPeriod = options.find("syncPeriod").asFloat64(); + auto syncPeriod = v->asFloat64(); if (syncPeriod <= 0.0) { @@ -99,34 +89,16 @@ bool CanBusBroker::open(yarp::os::Searchable & config) return false; } - FutureTaskFactory * taskFactory = nullptr; + auto & syncThread = getSyncThread(); - if (brokers.size() > 1) - { - taskFactory = new ParallelTaskFactory(brokers.size()); - } - else if (brokers.size() == 1) - { - taskFactory = new SequentialTaskFactory; - } + syncThread.registerBroker(broker); + syncThread.setPeriod(syncPeriod); - if (taskFactory) + if (yarp::os::Value * vv; options.check("syncObserver", vv, "synchronization signal observer") && vv->isBlob()) { - syncThread = new SyncPeriodicThread(brokers, taskFactory); // owns `taskFactory` - syncThread->setPeriod(syncPeriod); - - if (!syncThread->openPort("/sync:o")) - { - yCError(CBB) << "Unable to open synchronization port"; - return false; - } - - if (yarp::os::Value * v; options.check("syncObserver", v, "synchronization signal observer") && v->isBlob()) - { - yCDebug(CBB) << "Setting synchronization signal observer"; - auto * observer = *reinterpret_cast * const *>(v->asBlob()); - syncThread->setObserver(observer); - } + yCDebug(CBB) << "Setting synchronization signal observer"; + auto * observer = *reinterpret_cast * const *>(vv->asBlob()); + syncThread.setObserver(observer); } } else @@ -143,18 +115,12 @@ bool CanBusBroker::close() { bool ok = detachAll(); - if (syncThread) - { - delete syncThread; - syncThread = nullptr; - } - - for (auto * broker : brokers) + if (broker) { delete broker; + broker = nullptr; } - brokers.clear(); return ok; } diff --git a/libraries/YarpPlugins/CanBusBroker/IMultipleWrapperImpl.cpp b/libraries/YarpPlugins/CanBusBroker/IMultipleWrapperImpl.cpp index 6295b19fa..c356cd1d1 100644 --- a/libraries/YarpPlugins/CanBusBroker/IMultipleWrapperImpl.cpp +++ b/libraries/YarpPlugins/CanBusBroker/IMultipleWrapperImpl.cpp @@ -2,7 +2,7 @@ #include "CanBusBroker.hpp" -#include // std::any_of, std::find_if, std::for_each +#include // std::any_of, std::find_if #include // std::distance #include @@ -98,33 +98,23 @@ const yarp::dev::PolyDriverDescriptor * CanBusBroker::tryCreateFakeNode(const ya bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers) { - std::vector nodeNames; // flattened broker[i]->getNodeNames() - - std::for_each(brokers.begin(), brokers.end(), [&nodeNames](const auto * broker) - { nodeNames.insert(nodeNames.end(), - broker->getNodeNames().begin(), - broker->getNodeNames().end()); }); - - std::vector buses(brokers.size()); + const auto & nodeNames = broker->getNodeNames(); + const yarp::dev::PolyDriverDescriptor * bus = nullptr; std::vector nodes(nodeNames.size()); for (int i = 0; i < drivers.size(); i++) { const auto * driver = drivers[i]; - if (auto bus = std::find_if(brokers.begin(), brokers.end(), [driver](const auto * broker) - { return broker->getBusName() == driver->key; }); - bus != brokers.end()) + if (driver->key == broker->getBusName()) { - int index = std::distance(brokers.begin(), bus); - - if (buses[index] != nullptr) + if (bus != nullptr) { yCError(CBB) << "Bus device" << driver->key << "already attached"; return false; } - buses[index] = driver; + bus = driver; } else if (auto node = std::find_if(nodeNames.begin(), nodeNames.end(), [driver](const auto & name) { return name == driver->key; }); @@ -147,19 +137,9 @@ bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers) } } - if (std::any_of(buses.begin(), buses.end(), [](const auto * bus) { return bus == nullptr; })) + if (bus == nullptr) { - std::vector names; - - for (int i = 0; i < buses.size(); i++) - { - if (buses[i] == nullptr) - { - names.push_back(brokers[i]->getBusName()); - } - } - - yCError(CBB) << "Some bus devices are missing:" << names; + yCError(CBB) << "The bus device is missing:" << broker->getBusName(); return false; } @@ -188,18 +168,15 @@ bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers) } } - yCInfo(CBB) << "Attached" << buses.size() << "bus device(s) and" << nodes.size() << "node device(s)"; + yCInfo(CBB) << "Attached" << bus->key << "bus device and" << nodes.size() << "node device(s):" << nodeNames; - for (int i = 0; i < buses.size(); i++) + if (!broker->registerDevice(bus->poly)) { - if (!brokers[i]->registerDevice(buses[i]->poly)) - { - yCError(CBB) << "Unable to register bus device" << buses[i]->key; - return false; - } + yCError(CBB) << "Unable to register bus device" << bus->key; + return false; } - yCInfo(CBB) << "Registered bus devices"; + yCInfo(CBB) << "Registered bus device"; for (int i = 0; i < nodes.size(); i++) { @@ -219,24 +196,16 @@ bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers) return false; } - auto it = std::find_if(brokers.begin(), brokers.end(), [node](const auto * broker) - { const auto & names = broker->getNodeNames(); - return std::find(names.begin(), names.end(), node->key) != names.end(); }); - - auto * broker = *it; broker->getReader()->registerHandle(iCanBusSharer); iCanBusSharer->registerSender(broker->getWriter()->getDelegate()); } yCInfo(CBB) << "Registered node devices"; - for (int i = 0; i < buses.size(); i++) + if (!broker->startThreads()) { - if (!brokers[i]->startThreads()) - { - yCError(CBB) << "Unable to start CAN threads in" << brokers[i]->getBusName(); - return false; - } + yCError(CBB) << "Unable to start CAN threads in" << broker->getBusName(); + return false; } yCInfo(CBB) << "Started CAN threads"; @@ -251,15 +220,31 @@ bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers) } } + broker->markInitialized(true); + yCInfo(CBB) << "Initialized node devices"; - if (syncThread && !syncThread->isRunning() && !syncThread->start()) + if (auto & syncThread = getSyncThread(); !syncThread.getBrokers().empty()) { - yCError(CBB) << "Unable to start synchronization thread"; - return false; - } + if (!syncThread.openPort("/sync:o")) + { + yCError(CBB) << "Unable to open synchronization port"; + return false; + } - yCInfo(CBB) << "Started synchronization thread"; + if (!syncThread.isRunning()) + { + if (!syncThread.start()) + { + yCError(CBB) << "Unable to start synchronization thread"; + return false; + } + else + { + yCInfo(CBB) << "Started synchronization thread"; + } + } + } return true; } @@ -270,9 +255,15 @@ bool CanBusBroker::detachAll() { bool ok = true; - if (syncThread && syncThread->isRunning()) + if (broker) + { + broker->markInitialized(false); + } + + if (auto & syncThread = getSyncThread(); syncThread.isRunning()) { - syncThread->stop(); + syncThread.stop(); + syncThread.closePort(); } for (const auto & rawDevice : deviceMapper.getDevices()) @@ -288,7 +279,7 @@ bool CanBusBroker::detachAll() deviceMapper.clear(); - for (auto * broker : brokers) + if (broker) { ok &= broker->stopThreads(); ok &= broker->clearFilters(); diff --git a/libraries/YarpPlugins/CanBusBroker/SingleBusBroker.hpp b/libraries/YarpPlugins/CanBusBroker/SingleBusBroker.hpp index 5349f4528..62bc35bfd 100644 --- a/libraries/YarpPlugins/CanBusBroker/SingleBusBroker.hpp +++ b/libraries/YarpPlugins/CanBusBroker/SingleBusBroker.hpp @@ -3,6 +3,7 @@ #ifndef __SINGLE_BUS_BROKER_HPP__ #define __SINGLE_BUS_BROKER_HPP__ +#include #include #include #include @@ -82,6 +83,14 @@ class SingleBusBroker final : public yarp::os::TypedReaderCallbackinitialized = initialized; } + + //! Check if this instance is fully initialized. + bool isInitialized() const + { return initialized; } + private: //! Open remote CAN interface ports. bool createPorts(const std::string & prefix); @@ -109,6 +118,7 @@ class SingleBusBroker final : public yarp::os::TypedReaderCallback & _brokers, FutureTaskFactory * _taskFactory) - : yarp::os::PeriodicThread(1.0, yarp::os::ShouldUseSystemClock::Yes, yarp::os::PeriodicThreadClock::Absolute), - brokers(_brokers), - taskFactory(_taskFactory), - syncObserver(nullptr) +SyncPeriodicThread::SyncPeriodicThread() + : yarp::os::PeriodicThread(1.0, + yarp::os::ShouldUseSystemClock::Yes, + yarp::os::PeriodicThreadClock::Absolute) {} // ----------------------------------------------------------------------------- SyncPeriodicThread::~SyncPeriodicThread() +{ + delete taskFactory; +} + +// ----------------------------------------------------------------------------- + +void SyncPeriodicThread::registerBroker(SingleBusBroker * broker) +{ + brokers.push_back(broker); +} + +// ----------------------------------------------------------------------------- + +bool SyncPeriodicThread::openPort(const std::string & name) +{ + if (!syncPort.isOpen()) + { + syncPort.setWriteOnly(); + syncWriter.attach(syncPort); + return syncPort.open(name); + } + + return true; +} + +// ----------------------------------------------------------------------------- + +void SyncPeriodicThread::setObserver(TypedStateObserver * syncObserver) +{ + this->syncObserver = syncObserver; +} + +// ----------------------------------------------------------------------------- + +void SyncPeriodicThread::closePort() { if (syncPort.isOpen()) { syncPort.interrupt(); syncPort.close(); } +} - delete taskFactory; +// ----------------------------------------------------------------------------- + +void SyncPeriodicThread::beforeStart() +{ + if (brokers.size() > 1) + { + taskFactory = new ParallelTaskFactory(brokers.size()); + } + else if (brokers.size() == 1) + { + taskFactory = new SequentialTaskFactory; + } } // ----------------------------------------------------------------------------- -bool SyncPeriodicThread::openPort(const std::string & name) +bool SyncPeriodicThread::threadInit() { - syncPort.setWriteOnly(); - syncWriter.attach(syncPort); - return syncPort.open(name); + return taskFactory != nullptr; } // ----------------------------------------------------------------------------- @@ -48,7 +92,9 @@ void SyncPeriodicThread::run() for (auto * broker : brokers) { - task->add([broker, now] + if (broker->isInitialized()) + { + task->add([broker, now] { for (auto * handle : broker->getReader()->getHandles()) { @@ -59,6 +105,7 @@ void SyncPeriodicThread::run() broker->getWriter()->flush(); return true; }); + } } task->dispatch(); diff --git a/libraries/YarpPlugins/CanBusBroker/SyncPeriodicThread.hpp b/libraries/YarpPlugins/CanBusBroker/SyncPeriodicThread.hpp index f80b80a8c..94264cd16 100644 --- a/libraries/YarpPlugins/CanBusBroker/SyncPeriodicThread.hpp +++ b/libraries/YarpPlugins/CanBusBroker/SyncPeriodicThread.hpp @@ -28,26 +28,37 @@ namespace roboticslab class SyncPeriodicThread final : public yarp::os::PeriodicThread { public: - //! Constructor, manages the lifetime of @ref taskFactory. - SyncPeriodicThread(std::vector & brokers, FutureTaskFactory * taskFactory); + //! Constructor. + SyncPeriodicThread(); //! Destructor. ~SyncPeriodicThread() override; + //! Register broker. + void registerBroker(SingleBusBroker * broker); + + //! Retrieve registered brokers. + const auto & getBrokers() const + { return brokers; } + //! Open synchronization port. bool openPort(const std::string & name); + //! Close synchronization port. + void closePort(); + //! Set synchronization observer. - void setObserver(TypedStateObserver * syncObserver) - { this->syncObserver = syncObserver; } + void setObserver(TypedStateObserver * syncObserver); - //! Periodic task. +protected: + void beforeStart() override; + bool threadInit() override; void run() override; private: - std::vector & brokers; - FutureTaskFactory * taskFactory; - TypedStateObserver * syncObserver; + std::vector brokers; + FutureTaskFactory * taskFactory {nullptr}; + TypedStateObserver * syncObserver {nullptr}; yarp::os::Port syncPort; yarp::os::PortWriterBuffer syncWriter; };