Skip to content

Commit

Permalink
Only allow a single CAN bus per broker
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterBowman committed Jun 25, 2024
1 parent 0f163f7 commit 0279a50
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 146 deletions.
11 changes: 7 additions & 4 deletions libraries/YarpPlugins/CanBusBroker/CanBusBroker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#ifndef __CAN_BUS_BROKER_HPP__
#define __CAN_BUS_BROKER_HPP__

#include <vector>

#include <yarp/dev/IMultipleWrapper.h>
#include <yarp/dev/ControlBoardInterfaces.h>
#include <yarp/dev/MultipleAnalogSensorsInterfaces.h>
Expand Down Expand Up @@ -416,12 +414,17 @@ class CanBusBroker : public yarp::dev::DeviceDriver,
bool getThreeAxisMagnetometerMeasure(std::size_t sens_index, yarp::sig::Vector & out, double & timestamp) const override;

private:
static SyncPeriodicThread & getSyncThread()
{
static SyncPeriodicThread instance;
return instance;
}

const yarp::dev::PolyDriverDescriptor * tryCreateFakeNode(const yarp::dev::PolyDriverDescriptor * driver);

DeviceMapper deviceMapper;
std::vector<SingleBusBroker *> brokers;
yarp::dev::PolyDriverList fakeNodes;
SyncPeriodicThread * syncThread {nullptr};
SingleBusBroker * broker {nullptr};
};

} // namespace roboticslab
Expand Down
102 changes: 34 additions & 68 deletions libraries/YarpPlugins/CanBusBroker/DeviceDriverImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <yarp/os/LogStream.h>
#include <yarp/os/Value.h>

#include "FutureTask.hpp"
#include "ICanBusSharer.hpp"
#include "LogComponent.hpp"

Expand All @@ -22,49 +21,40 @@ bool CanBusBroker::open(yarp::os::Searchable & config)
options.fromString(config.findGroup("common").toString(), false); // override global options
options.fromString(config.toString(), false); // override common options

const auto * buses = options.find("buses").asList();

if (!buses)
{
yCError(CBB) << R"(Missing key "buses" or not a list)";
return false;
}

for (int i = 0; i < buses->size(); i++)
if (yarp::os::Value * v; options.check("bus", v, "CAN bus") && v->isString())
{
auto bus = buses->get(i).asString();
auto bus = v->asString();

if (!options.check(bus))
if (yarp::os::Value * vv; options.check("nodes", vv, "CAN nodes") && vv->isList())
{
yCError(CBB) << "Missing CAN bus key:" << bus;
return false;
}
const auto * nodes = vv->asList();

if (!options.find(bus).isList())
{
yCError(CBB) << "Key" << bus << "must be a list";
return false;
}
std::vector<std::string> names;

const auto * nodes = options.find(bus).asList();
for (int j = 0; j < nodes->size(); j++)
{
auto node = nodes->get(j).asString();
names.push_back(node);
}

std::vector<std::string> names;
broker = new SingleBusBroker(bus, names);

for (int j = 0; j < nodes->size(); j++)
{
auto node = nodes->get(j).asString();
names.push_back(node);
if (!broker->configure(options))
{
yCError(CBB) << "Unable to configure broker of CAN bus device" << bus;
return false;
}
}

auto * broker = new SingleBusBroker(bus, names);
brokers.push_back(broker);

if (!broker->configure(options))
else
{
yCError(CBB) << "Unable to configure broker of CAN bus device" << bus;
return false;
yCError(CBB) << R"(Missing key "nodes" or not a list)";
}
}
else
{
yCError(CBB) << R"(Missing key "bus" or not a string)";
return false;
}

if (yarp::os::Value * v; options.check("fakeNodes", v, "fake CAN nodes"))
{
Expand All @@ -89,44 +79,26 @@ bool CanBusBroker::open(yarp::os::Searchable & config)
}
}

if (options.check("syncPeriod", "SYNC message period (s)"))
if (yarp::os::Value * v; options.check("syncPeriod", v, "SYNC message period (s)"))
{
auto syncPeriod = options.find("syncPeriod").asFloat64();
auto syncPeriod = v->asFloat64();

if (syncPeriod <= 0.0)
{
yCError(CBB) << "Invalid --syncPeriod:" << syncPeriod;
return false;
}

FutureTaskFactory * taskFactory = nullptr;
auto & syncThread = getSyncThread();

if (brokers.size() > 1)
{
taskFactory = new ParallelTaskFactory(brokers.size());
}
else if (brokers.size() == 1)
{
taskFactory = new SequentialTaskFactory;
}
syncThread.registerBroker(broker);
syncThread.setPeriod(syncPeriod);

if (taskFactory)
if (yarp::os::Value * vv; options.check("syncObserver", vv, "synchronization signal observer") && vv->isBlob())
{
syncThread = new SyncPeriodicThread(brokers, taskFactory); // owns `taskFactory`
syncThread->setPeriod(syncPeriod);

if (!syncThread->openPort("/sync:o"))
{
yCError(CBB) << "Unable to open synchronization port";
return false;
}

if (yarp::os::Value * v; options.check("syncObserver", v, "synchronization signal observer") && v->isBlob())
{
yCDebug(CBB) << "Setting synchronization signal observer";
auto * observer = *reinterpret_cast<TypedStateObserver<double> * const *>(v->asBlob());
syncThread->setObserver(observer);
}
yCDebug(CBB) << "Setting synchronization signal observer";
auto * observer = *reinterpret_cast<TypedStateObserver<double> * const *>(vv->asBlob());
syncThread.setObserver(observer);
}
}
else
Expand All @@ -143,18 +115,12 @@ bool CanBusBroker::close()
{
bool ok = detachAll();

if (syncThread)
{
delete syncThread;
syncThread = nullptr;
}

for (auto * broker : brokers)
if (broker)
{
delete broker;
broker = nullptr;
}

brokers.clear();
return ok;
}

Expand Down
101 changes: 46 additions & 55 deletions libraries/YarpPlugins/CanBusBroker/IMultipleWrapperImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include "CanBusBroker.hpp"

#include <algorithm> // std::any_of, std::find_if, std::for_each
#include <algorithm> // std::any_of, std::find_if
#include <iterator> // std::distance

#include <yarp/os/Bottle.h>
Expand Down Expand Up @@ -98,33 +98,23 @@ const yarp::dev::PolyDriverDescriptor * CanBusBroker::tryCreateFakeNode(const ya

bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers)
{
std::vector<std::string> nodeNames; // flattened broker[i]->getNodeNames()

std::for_each(brokers.begin(), brokers.end(), [&nodeNames](const auto * broker)
{ nodeNames.insert(nodeNames.end(),
broker->getNodeNames().begin(),
broker->getNodeNames().end()); });

std::vector<const yarp::dev::PolyDriverDescriptor *> buses(brokers.size());
const auto & nodeNames = broker->getNodeNames();
const yarp::dev::PolyDriverDescriptor * bus = nullptr;
std::vector<const yarp::dev::PolyDriverDescriptor *> nodes(nodeNames.size());

for (int i = 0; i < drivers.size(); i++)
{
const auto * driver = drivers[i];

if (auto bus = std::find_if(brokers.begin(), brokers.end(), [driver](const auto * broker)
{ return broker->getBusName() == driver->key; });
bus != brokers.end())
if (driver->key == broker->getBusName())
{
int index = std::distance(brokers.begin(), bus);

if (buses[index] != nullptr)
if (bus != nullptr)
{
yCError(CBB) << "Bus device" << driver->key << "already attached";
return false;
}

buses[index] = driver;
bus = driver;
}
else if (auto node = std::find_if(nodeNames.begin(), nodeNames.end(), [driver](const auto & name)
{ return name == driver->key; });
Expand All @@ -147,19 +137,9 @@ bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers)
}
}

if (std::any_of(buses.begin(), buses.end(), [](const auto * bus) { return bus == nullptr; }))
if (bus == nullptr)
{
std::vector<std::string> names;

for (int i = 0; i < buses.size(); i++)
{
if (buses[i] == nullptr)
{
names.push_back(brokers[i]->getBusName());
}
}

yCError(CBB) << "Some bus devices are missing:" << names;
yCError(CBB) << "The bus device is missing:" << broker->getBusName();
return false;
}

Expand Down Expand Up @@ -188,18 +168,15 @@ bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers)
}
}

yCInfo(CBB) << "Attached" << buses.size() << "bus device(s) and" << nodes.size() << "node device(s)";
yCInfo(CBB) << "Attached" << bus->key << "bus device and" << nodes.size() << "node device(s):" << nodeNames;

for (int i = 0; i < buses.size(); i++)
if (!broker->registerDevice(bus->poly))
{
if (!brokers[i]->registerDevice(buses[i]->poly))
{
yCError(CBB) << "Unable to register bus device" << buses[i]->key;
return false;
}
yCError(CBB) << "Unable to register bus device" << bus->key;
return false;
}

yCInfo(CBB) << "Registered bus devices";
yCInfo(CBB) << "Registered bus device";

for (int i = 0; i < nodes.size(); i++)
{
Expand All @@ -219,24 +196,16 @@ bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers)
return false;
}

auto it = std::find_if(brokers.begin(), brokers.end(), [node](const auto * broker)
{ const auto & names = broker->getNodeNames();
return std::find(names.begin(), names.end(), node->key) != names.end(); });

auto * broker = *it;
broker->getReader()->registerHandle(iCanBusSharer);
iCanBusSharer->registerSender(broker->getWriter()->getDelegate());
}

yCInfo(CBB) << "Registered node devices";

for (int i = 0; i < buses.size(); i++)
if (!broker->startThreads())
{
if (!brokers[i]->startThreads())
{
yCError(CBB) << "Unable to start CAN threads in" << brokers[i]->getBusName();
return false;
}
yCError(CBB) << "Unable to start CAN threads in" << broker->getBusName();
return false;
}

yCInfo(CBB) << "Started CAN threads";
Expand All @@ -251,15 +220,31 @@ bool CanBusBroker::attachAll(const yarp::dev::PolyDriverList & drivers)
}
}

broker->markInitialized(true);

yCInfo(CBB) << "Initialized node devices";

if (syncThread && !syncThread->isRunning() && !syncThread->start())
if (auto & syncThread = getSyncThread(); !syncThread.getBrokers().empty())
{
yCError(CBB) << "Unable to start synchronization thread";
return false;
}
if (!syncThread.openPort("/sync:o"))
{
yCError(CBB) << "Unable to open synchronization port";
return false;
}

yCInfo(CBB) << "Started synchronization thread";
if (!syncThread.isRunning())
{
if (!syncThread.start())
{
yCError(CBB) << "Unable to start synchronization thread";
return false;
}
else
{
yCInfo(CBB) << "Started synchronization thread";
}
}
}

return true;
}
Expand All @@ -270,9 +255,15 @@ bool CanBusBroker::detachAll()
{
bool ok = true;

if (syncThread && syncThread->isRunning())
if (broker)
{
broker->markInitialized(false);
}

if (auto & syncThread = getSyncThread(); syncThread.isRunning())
{
syncThread->stop();
syncThread.stop();
syncThread.closePort();
}

for (const auto & rawDevice : deviceMapper.getDevices())
Expand All @@ -288,7 +279,7 @@ bool CanBusBroker::detachAll()

deviceMapper.clear();

for (auto * broker : brokers)
if (broker)
{
ok &= broker->stopThreads();
ok &= broker->clearFilters();
Expand Down
Loading

0 comments on commit 0279a50

Please sign in to comment.