Skip to content

Commit

Permalink
Update so pipelines can have child graph tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Levi-Armstrong committed Jul 31, 2024
1 parent b321b52 commit 874df1a
Show file tree
Hide file tree
Showing 13 changed files with 267 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ class TaskComposerGraph : public TaskComposerNode
template <class Archive>
void serialize(Archive& ar, const unsigned int version); // NOLINT

std::unique_ptr<TaskComposerNodeInfo> runImpl(TaskComposerContext& context,
OptionalTaskComposerExecutor executor = std::nullopt) const override;

std::map<boost::uuids::uuid, TaskComposerNode::Ptr> nodes_;
std::vector<boost::uuids::uuid> terminals_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ TESSERACT_COMMON_IGNORE_WARNINGS_PUSH
#include <memory>
#include <vector>
#include <map>
#include <optional>
#include <boost/uuid/uuid.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/export.hpp>
Expand All @@ -50,6 +51,8 @@ class Node;
namespace tesseract_planning
{
class TaskComposerDataStorage;
class TaskComposerContext;
class TaskComposerExecutor;

enum class TaskComposerNodeType
{
Expand All @@ -68,6 +71,9 @@ class TaskComposerNode
using UPtr = std::unique_ptr<TaskComposerNode>;
using ConstUPtr = std::unique_ptr<const TaskComposerNode>;

/** @brief Most task will not require a executor so making it optional */
using OptionalTaskComposerExecutor = std::optional<std::reference_wrapper<TaskComposerExecutor>>;

TaskComposerNode(std::string name = "TaskComposerNode",
TaskComposerNodeType type = TaskComposerNodeType::NODE,
TaskComposerNodePorts ports = TaskComposerNodePorts(),
Expand All @@ -82,6 +88,8 @@ class TaskComposerNode
TaskComposerNode(TaskComposerNode&&) = delete;
TaskComposerNode& operator=(TaskComposerNode&&) = delete;

int run(TaskComposerContext& context, OptionalTaskComposerExecutor executor = std::nullopt) const;

/** @brief Set the name of the node */
void setName(const std::string& name);

Expand Down Expand Up @@ -169,6 +177,9 @@ class TaskComposerNode
template <class Archive>
void serialize(Archive& ar, const unsigned int version); // NOLINT

virtual std::unique_ptr<TaskComposerNodeInfo> runImpl(TaskComposerContext& context,
OptionalTaskComposerExecutor executor = std::nullopt) const = 0;

/** @brief The name of the task */
std::string name_;

Expand Down Expand Up @@ -208,6 +219,9 @@ class TaskComposerNode
/** @brief The nodes ports definition */
TaskComposerNodePorts ports_;

/** @brief Indicate if task triggers abort */
bool trigger_abort_{ false };

/** @brief This will create a UUID string with no hyphens used when creating dot graph */
static std::string toString(const boost::uuids::uuid& u, const std::string& prefix = "");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ class TaskComposerNodeInfo
private:
friend struct tesseract_common::Serialization;
friend class boost::serialization::access;
friend class TaskComposerTask;
friend class TaskComposerPipeline;
friend class TaskComposerNode;

/** @brief Indicate if task was not ran because abort flag was enabled */
bool aborted_{ false };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
TESSERACT_COMMON_IGNORE_WARNINGS_PUSH
#include <string>
#include <memory>
#include <optional>
TESSERACT_COMMON_IGNORE_WARNINGS_POP

#include <tesseract_task_composer/core/task_composer_graph.h>
Expand All @@ -53,8 +52,6 @@ class TaskComposerPipeline : public TaskComposerGraph
using ConstPtr = std::shared_ptr<const TaskComposerPipeline>;
using UPtr = std::unique_ptr<TaskComposerPipeline>;
using ConstUPtr = std::unique_ptr<const TaskComposerPipeline>;
/** @brief Most task will not require a executor so making it optional */
using OptionalTaskComposerExecutor = std::optional<std::reference_wrapper<TaskComposerExecutor>>;

TaskComposerPipeline(std::string name = "TaskComposerPipeline");
TaskComposerPipeline(std::string name, bool conditional);
Expand All @@ -65,8 +62,6 @@ class TaskComposerPipeline : public TaskComposerGraph
TaskComposerPipeline(TaskComposerPipeline&&) = delete;
TaskComposerPipeline& operator=(TaskComposerPipeline&&) = delete;

int run(TaskComposerContext& context, OptionalTaskComposerExecutor executor = std::nullopt) const;

bool operator==(const TaskComposerPipeline& rhs) const;
bool operator!=(const TaskComposerPipeline& rhs) const;

Expand All @@ -77,8 +72,8 @@ class TaskComposerPipeline : public TaskComposerGraph
template <class Archive>
void serialize(Archive& ar, const unsigned int version); // NOLINT

std::unique_ptr<TaskComposerNodeInfo> runImpl(TaskComposerContext& context,
OptionalTaskComposerExecutor executor = std::nullopt) const;
std::unique_ptr<TaskComposerNodeInfo>
runImpl(TaskComposerContext& context, OptionalTaskComposerExecutor executor = std::nullopt) const override final;

void runRecursive(const TaskComposerNode& node,
TaskComposerContext& context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
TESSERACT_COMMON_IGNORE_WARNINGS_PUSH
#include <string>
#include <memory>
#include <optional>
TESSERACT_COMMON_IGNORE_WARNINGS_POP

#include <tesseract_task_composer/core/task_composer_node.h>
Expand All @@ -47,9 +46,6 @@ class TaskComposerTask : public TaskComposerNode
using UPtr = std::unique_ptr<TaskComposerTask>;
using ConstUPtr = std::unique_ptr<const TaskComposerTask>;

/** @brief Most task will not require a executor so making it optional */
using OptionalTaskComposerExecutor = std::optional<std::reference_wrapper<TaskComposerExecutor>>;

TaskComposerTask();
explicit TaskComposerTask(std::string name, TaskComposerNodePorts ports, bool conditional);
explicit TaskComposerTask(std::string name, TaskComposerNodePorts ports, const YAML::Node& config);
Expand All @@ -68,20 +64,12 @@ class TaskComposerTask : public TaskComposerNode
*/
void setTriggerAbort(bool enable);

int run(TaskComposerContext& context, OptionalTaskComposerExecutor executor = std::nullopt) const;

protected:
/** @brief Indicate if task triggers abort */
bool trigger_abort_{ false };

friend struct tesseract_common::Serialization;
friend class boost::serialization::access;

template <class Archive>
void serialize(Archive& ar, const unsigned int version); // NOLINT

virtual std::unique_ptr<TaskComposerNodeInfo> runImpl(TaskComposerContext& context,
OptionalTaskComposerExecutor executor = std::nullopt) const = 0;
};

} // namespace tesseract_planning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ TESSERACT_COMMON_IGNORE_WARNINGS_POP
#include <tesseract_task_composer/core/task_composer_node.h>
#include <tesseract_task_composer/core/task_composer_node_info.h>
#include <tesseract_task_composer/core/task_composer_context.h>
#include <tesseract_task_composer/core/test_suite/test_task.h>
#include <tesseract_task_composer/core/test_suite/task_composer_serialization_utils.hpp>

namespace tesseract_planning::test_suite
Expand All @@ -57,7 +58,7 @@ void runTaskComposerNodeInfoTest()
}

{ // Constructor
TaskComposerNode node;
test_suite::DummyTaskComposerNode node;
T node_info(node);
EXPECT_EQ(node_info.return_value, -1);
EXPECT_EQ(node_info.status_code, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class TaskComposerPluginFactory;

namespace tesseract_planning::test_suite
{
class DummyTaskComposerNode : public TaskComposerNode
{
using TaskComposerNode::TaskComposerNode;

std::unique_ptr<TaskComposerNodeInfo>
runImpl(TaskComposerContext& context, OptionalTaskComposerExecutor /*executor*/ = std::nullopt) const override final;
};

class TestTask : public TaskComposerTask
{
public:
Expand Down
58 changes: 55 additions & 3 deletions tesseract_task_composer/core/src/task_composer_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ TESSERACT_COMMON_IGNORE_WARNINGS_PUSH
#include <tesseract_common/serialization.h>
#include <tesseract_common/plugin_info.h>
#include <tesseract_common/yaml_utils.h>
#include <tesseract_common/timer.h>
TESSERACT_COMMON_IGNORE_WARNINGS_POP

#include <tesseract_task_composer/core/task_composer_context.h>
#include <tesseract_task_composer/core/task_composer_future.h>
#include <tesseract_task_composer/core/task_composer_executor.h>
#include <tesseract_task_composer/core/task_composer_graph.h>
#include <tesseract_task_composer/core/task_composer_task.h>
#include <tesseract_task_composer/core/task_composer_pipeline.h>
Expand Down Expand Up @@ -214,6 +218,55 @@ TaskComposerGraph::TaskComposerGraph(std::string name,
throw std::runtime_error(is_valid.second);
}

std::unique_ptr<TaskComposerNodeInfo> TaskComposerGraph::runImpl(TaskComposerContext& context,
OptionalTaskComposerExecutor executor) const
{
if (terminals_.empty())
throw std::runtime_error("TaskComposerGraph, with name '" + name_ + "' does not have terminals!");

tesseract_common::Timer timer;
timer.start();

TaskComposerFuture::UPtr future = executor.value().get().run(*this, context.data_storage, context.dotgraph);
future->wait();

// Merge child context data into parent context
context.task_infos.mergeInfoMap(std::move(future->context->task_infos));
if (future->context->isAborted())
context.abort(future->context->task_infos.getAbortingNode());

auto info = std::make_unique<TaskComposerNodeInfo>(*this);
auto info_map = context.task_infos.getInfoMap();
if (context.dotgraph)
{
std::stringstream dot_graph;
dot_graph << "subgraph cluster_" << toString(uuid_) << " {\n color=black;\n label = \"" << name_ << "\\n("
<< uuid_str_ << ")\";\n";
dump(dot_graph, this, info_map); // dump the graph including dynamic tasks
dot_graph << "}\n";
info->dotgraph = dot_graph.str();
}

for (std::size_t i = 0; i < terminals_.size(); ++i)
{
auto node_info = context.task_infos.getInfo(terminals_[i]);
if (node_info != nullptr)
{
timer.stop();
info->input_keys = input_keys_;
info->output_keys = output_keys_;
info->return_value = static_cast<int>(i);
info->color = node_info->color;
info->status_code = node_info->status_code;
info->status_message = node_info->status_message;
info->elapsed_time = timer.elapsedSeconds();
return info;
}
}

throw std::runtime_error("TaskComposerGraph, with name '" + name_ + "' has no node info for any of the leaf nodes!");
}

boost::uuids::uuid TaskComposerGraph::addNode(std::unique_ptr<TaskComposerNode> task_node)
{
boost::uuids::uuid uuid = task_node->getUUID();
Expand Down Expand Up @@ -392,10 +445,9 @@ TaskComposerGraph::dump(std::ostream& os,
}
}

if (type_ == TaskComposerNodeType::PIPELINE)
if (type_ == TaskComposerNodeType::GRAPH || type_ == TaskComposerNodeType::PIPELINE)
{
const auto& pipeline = static_cast<const TaskComposerPipeline&>(*this);
if (pipeline.isConditional())
if (conditional_)
{
int return_value = -1;

Expand Down
57 changes: 57 additions & 0 deletions tesseract_task_composer/core/src/task_composer_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ TESSERACT_COMMON_IGNORE_WARNINGS_PUSH
#include <boost/uuid/uuid_serialize.hpp>
#include <yaml-cpp/yaml.h>
#include <tesseract_common/serialization.h>
#include <tesseract_common/timer.h>
TESSERACT_COMMON_IGNORE_WARNINGS_POP

#include <tesseract_task_composer/core/task_composer_context.h>
#include <tesseract_task_composer/core/task_composer_node.h>
#include <tesseract_task_composer/core/task_composer_node_info.h>
#include <tesseract_task_composer/core/task_composer_data_storage.h>
Expand Down Expand Up @@ -132,6 +134,59 @@ TaskComposerNode::TaskComposerNode(std::string name,
validatePorts();
}

int TaskComposerNode::run(TaskComposerContext& context, OptionalTaskComposerExecutor executor) const
{
auto start_time = std::chrono::system_clock::now();
if (context.isAborted())
{
auto info = std::make_unique<TaskComposerNodeInfo>(*this);
info->start_time = start_time;
info->input_keys = input_keys_;
info->output_keys = output_keys_;
info->return_value = 0;
info->color = "white";
info->status_code = 0;
info->status_message = "Aborted";
info->aborted_ = true;
context.task_infos.addInfo(std::move(info));
return 0;
}

tesseract_common::Timer timer;
TaskComposerNodeInfo::UPtr results;
timer.start();
try
{
results = runImpl(context, executor);
}
catch (const std::exception& e)
{
results = std::make_unique<TaskComposerNodeInfo>(*this);
results->color = "red";
results->status_code = -1;
results->status_message = "Exception thrown: " + std::string(e.what());
results->return_value = 0;
}
timer.stop();
results->input_keys = input_keys_;
results->output_keys = output_keys_;
results->start_time = start_time;
results->elapsed_time = timer.elapsedSeconds();

int value = results->return_value;
assert(value >= 0);

// Call abort if required and is a task
if (type_ == TaskComposerNodeType::TASK && trigger_abort_ && !context.isAborted())
{
results->status_message += " (Abort Triggered)";
context.abort(uuid_);
}

context.task_infos.addInfo(std::move(results));
return value;
}

void TaskComposerNode::setName(const std::string& name) { name_ = name; }
const std::string& TaskComposerNode::getName() const { return name_; }

Expand Down Expand Up @@ -424,6 +479,7 @@ bool TaskComposerNode::operator==(const TaskComposerNode& rhs) const
equal &= output_keys_ == rhs.output_keys_;
equal &= conditional_ == rhs.conditional_;
equal &= ports_ == rhs.ports_;
equal &= trigger_abort_ == rhs.trigger_abort_;
return equal;
}
bool TaskComposerNode::operator!=(const TaskComposerNode& rhs) const { return !operator==(rhs); }
Expand All @@ -442,6 +498,7 @@ void TaskComposerNode::serialize(Archive& ar, const unsigned int /*version*/)
ar& boost::serialization::make_nvp("output_keys", output_keys_);
ar& boost::serialization::make_nvp("conditional", conditional_);
ar& boost::serialization::make_nvp("ports", ports_);
ar& boost::serialization::make_nvp("trigger_abort", trigger_abort_);
}

std::string TaskComposerNode::toString(const boost::uuids::uuid& u, const std::string& prefix)
Expand Down
Loading

0 comments on commit 874df1a

Please sign in to comment.