Skip to content

Commit

Permalink
Refs #21703: Apply rev suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: eProsima <[email protected]>
  • Loading branch information
JesusPoderoso committed Oct 10, 2024
1 parent e6a335f commit 119538d
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ class OrchestratorNode
/**
* @brief Used to retrieve the associated OrchestratorNodeHandle.
*/
inline OrchestratorNodeHandle& get_handler()
inline OrchestratorNodeHandle* get_handler() const
{
return *handler_;
return handler_;
}

/**
Expand All @@ -188,10 +188,16 @@ class OrchestratorNode
*/
void spin();

/**
* @brief Remove all Fast DDS entities and clean up the OrchestratorNode and OrchestratorNodeHandle.
*
*/
void destroy();

/**
* @brief Stops the execution of the node.
*/
static void terminate();
void terminate();

protected:

Expand Down Expand Up @@ -256,8 +262,8 @@ class OrchestratorNode

};

static std::condition_variable spin_cv_;
static std::atomic_bool terminate_;
std::condition_variable spin_cv_;
std::atomic_bool terminate_;

std::unique_ptr<OrchestratorParticipantListener> participant_listener_;

Expand Down
4 changes: 2 additions & 2 deletions sustainml_cpp/src/cpp/orchestrator/ModuleNodeProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ ModuleNodeProxy::~ModuleNodeProxy()
void ModuleNodeProxy::notify_status_change()
{
std::lock_guard<std::mutex> lock(orchestrator_->get_mutex());
OrchestratorNodeHandle* handler_ptr = &(orchestrator_->get_handler());
OrchestratorNodeHandle* handler_ptr = orchestrator_->get_handler();
if (handler_ptr != nullptr)
{
handler_ptr->on_node_status_change(node_id_, status_);
Expand All @@ -208,7 +208,7 @@ void ModuleNodeProxy::notify_new_node_ouput()
store_data_in_db();
void* untyped_data = get_tmp_untyped_data();
std::lock_guard<std::mutex> lock(orchestrator_->get_mutex());
OrchestratorNodeHandle* handler_ptr = &(orchestrator_->get_handler());
OrchestratorNodeHandle* handler_ptr = orchestrator_->get_handler();
if (handler_ptr != nullptr)
{
handler_ptr->on_new_node_output(node_id_, untyped_data);
Expand Down
58 changes: 33 additions & 25 deletions sustainml_cpp/src/cpp/orchestrator/OrchestratorNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ void OrchestratorNode::OrchestratorParticipantListener::on_participant_discovery
}
}

std::atomic<bool> OrchestratorNode::terminate_(false);
std::condition_variable OrchestratorNode::spin_cv_;

OrchestratorNode::OrchestratorNode(
OrchestratorNodeHandle& handle,
uint32_t domain)
Expand All @@ -119,34 +116,45 @@ OrchestratorNode::OrchestratorNode(

OrchestratorNode::~OrchestratorNode()
{
std::lock_guard<std::mutex> lock(mtx_);
for (size_t i = 0; i < (size_t)NodeID::MAX; i++)
destroy();
}

void OrchestratorNode::destroy()
{
if (initialized_.load())
{
std::lock_guard<std::mutex> lock(proxies_mtx_);
if (node_proxies_[i] != nullptr)
std::lock_guard<std::mutex> lock(mtx_);
for (size_t i = 0; i < (size_t)NodeID::MAX; i++)
{
delete node_proxies_[i];
std::lock_guard<std::mutex> lock(proxies_mtx_);
if (node_proxies_[i] != nullptr)
{
delete node_proxies_[i];
}
}
}

if (sub_ != nullptr)
{
sub_->delete_contained_entities();
}
if (sub_ != nullptr)
{
sub_->delete_contained_entities();
}

if (pub_ != nullptr)
{
pub_->delete_contained_entities();
}
if (pub_ != nullptr)
{
pub_->delete_contained_entities();
}

if (participant_ != nullptr)
{
participant_->delete_contained_entities();
}
if (participant_ != nullptr)
{
participant_->delete_contained_entities();
}

DomainParticipantFactory::get_instance()->delete_participant(participant_);

DomainParticipantFactory::get_instance()->delete_participant(participant_);
delete task_man_;

delete task_man_;
handler_ = nullptr;
initialized_.store(false);
}
}

void OrchestratorNode::print_db()
Expand Down Expand Up @@ -470,7 +478,6 @@ void OrchestratorNode::send_control_command(
void OrchestratorNode::spin()
{
EPROSIMA_LOG_INFO(ORCHESTRATOR, "Spinning Orchestrator... ");

std::unique_lock<std::mutex> lock(mtx_);
spin_cv_.wait(lock, [&]
{
Expand All @@ -481,9 +488,10 @@ void OrchestratorNode::spin()
void OrchestratorNode::terminate()
{
terminate_.store(true);
destroy();
spin_cv_.notify_all();
}

} // namespace orchestrator
} // namespace orchestrator
} // namespace sustainml

Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ static TestOrchestratorNodeHandle::DataCollection nodes_ready_expected_data =

TEST(OrchestratorNode, OrchestratorInitializesProperlyWhenNodesAreALive)
{
TestOrchestratorNodeHandle* tonh = new TestOrchestratorNodeHandle();
std::shared_ptr<TestOrchestratorNodeHandle> tonh = std::make_shared<TestOrchestratorNodeHandle>();

tonh->prepare_expected_data(nodes_ready_expected_data);

orchestrator::OrchestratorNode orchestrator(*tonh);
orchestrator::OrchestratorNode orchestrator(*(tonh.get()));

MLModelMetadataManagedNode ml_met_node;
MLModelManagedNode ml_node;
Expand All @@ -111,11 +111,12 @@ TEST(OrchestratorNode, OrchestratorInitializesProperlyWhenNodesAreALive)
hw_cons_node.start();

ASSERT_TRUE(tonh->wait_for_data(std::chrono::seconds(5)));
orchestrator.destroy();
}

TEST(OrchestratorNode, AlateJoinerOrchestratorInitializesProperly)
{
TestOrchestratorNodeHandle* tonh = new TestOrchestratorNodeHandle();
std::shared_ptr<TestOrchestratorNodeHandle> tonh = std::make_shared<TestOrchestratorNodeHandle>();

TestOrchestratorNodeHandle::DataCollection expected_data =
{
Expand Down Expand Up @@ -145,16 +146,17 @@ TEST(OrchestratorNode, AlateJoinerOrchestratorInitializesProperly)

std::this_thread::sleep_for(std::chrono::seconds(2));

orchestrator::OrchestratorNode orchestrator(*tonh);
orchestrator::OrchestratorNode orchestrator(*(tonh.get()));

ASSERT_TRUE(tonh->wait_for_data(std::chrono::seconds(5)));
orchestrator.destroy();
}

TEST(OrchestratorNode, OrchestratorReceivesNodeOutputs)
{
TestOrchestratorNodeHandle* tonh = new TestOrchestratorNodeHandle();
std::shared_ptr<TestOrchestratorNodeHandle> tonh = std::make_shared<TestOrchestratorNodeHandle>();

orchestrator::OrchestratorNode orchestrator(*tonh);
orchestrator::OrchestratorNode orchestrator(*(tonh.get()));

MLModelMetadataManagedNode te_node;
MLModelManagedNode ml_node;
Expand Down Expand Up @@ -196,13 +198,14 @@ TEST(OrchestratorNode, OrchestratorReceivesNodeOutputs)
orchestrator.start_task(task.first, task.second);

ASSERT_TRUE(tonh->wait_for_data(std::chrono::seconds(5)));
orchestrator.destroy();
}

TEST(OrchestratorNode, OrchestratorGetTaskData)
{
TestOrchestratorNodeHandle* tonh = new TestOrchestratorNodeHandle();
std::shared_ptr<TestOrchestratorNodeHandle> tonh = std::make_shared<TestOrchestratorNodeHandle>();

orchestrator::OrchestratorNode orchestrator(*tonh);
orchestrator::OrchestratorNode orchestrator(*(tonh.get()));

MLModelMetadataManagedNode te_node;
MLModelManagedNode ml_node;
Expand Down Expand Up @@ -261,13 +264,14 @@ TEST(OrchestratorNode, OrchestratorGetTaskData)
ASSERT_EQ(((types::MLModelMetadata*)enc_task)->task_id().problem_id(), 2);
ASSERT_EQ(orchestrator.get_task_data({2, 1}, NodeID::ID_HW_RESOURCES, hw), RetCode_t::RETCODE_OK);
ASSERT_EQ(((types::HWResource*)hw)->task_id().problem_id(), 2);
orchestrator.destroy();
}

TEST(OrchestratorNode, OrchestratorGetNodeStatus)
{
TestOrchestratorNodeHandle* tonh = new TestOrchestratorNodeHandle();
std::shared_ptr<TestOrchestratorNodeHandle> tonh = std::make_shared<TestOrchestratorNodeHandle>();

orchestrator::OrchestratorNode orchestrator(*tonh);
orchestrator::OrchestratorNode orchestrator(*(tonh.get()));

MLModelMetadataManagedNode te_node;
MLModelManagedNode ml_node;
Expand Down Expand Up @@ -322,13 +326,14 @@ TEST(OrchestratorNode, OrchestratorGetNodeStatus)
ASSERT_EQ(status->node_status(), Status::NODE_IDLE);
orchestrator.get_node_status(NodeID::ID_APP_REQUIREMENTS, status);
ASSERT_EQ(status->node_status(), Status::NODE_IDLE);
orchestrator.destroy();
}

TEST(OrchestratorNode, OrchestratorTaskIteration)
{
TestOrchestratorNodeHandle* tonh = new TestOrchestratorNodeHandle();
std::shared_ptr<TestOrchestratorNodeHandle> tonh = std::make_shared<TestOrchestratorNodeHandle>();

orchestrator::OrchestratorNode orchestrator(*tonh);
orchestrator::OrchestratorNode orchestrator(*(tonh.get()));

MLModelMetadataManagedNode te_node;
MLModelManagedNode ml_node;
Expand Down Expand Up @@ -420,13 +425,14 @@ TEST(OrchestratorNode, OrchestratorTaskIteration)
ASSERT_EQ(1, carbon_iterated_data->task_id().problem_id());
ASSERT_EQ(2, carbon_iterated_data->task_id().iteration_id());
ASSERT_GT(carbon_iterated_data->energy_consumption(), 300);
orchestrator.destroy();
}

TEST(OrchestratorNode, OrchestratorGetTaskDataDoesNotAccumulate)
{
TestOrchestratorNodeHandle* tonh = new TestOrchestratorNodeHandle();
std::shared_ptr<TestOrchestratorNodeHandle> tonh = std::make_shared<TestOrchestratorNodeHandle>();

orchestrator::OrchestratorNode orchestrator(*tonh);
orchestrator::OrchestratorNode orchestrator(*(tonh.get()));

MLModelMetadataManagedNode te_node;
MLModelManagedNode ml_node;
Expand Down Expand Up @@ -497,4 +503,5 @@ TEST(OrchestratorNode, OrchestratorGetTaskDataDoesNotAccumulate)
orchestrator.start_task(task.first, task.second);

ASSERT_TRUE(tonh->wait_for_data(std::chrono::seconds(10)));
orchestrator.destroy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ int main(
int argc,
char** argv)
{
SimpleOrchestratorNodeHandle* sonh = new SimpleOrchestratorNodeHandle();
std::shared_ptr<SimpleOrchestratorNodeHandle> sonh = std::make_shared<SimpleOrchestratorNodeHandle>();

// NodeID, <Node Status, Output counter>
SimpleOrchestratorNodeHandle::DataCollection expected_data_nodes_ready =
Expand All @@ -107,7 +107,7 @@ int main(

sonh->prepare_expected_data(expected_data_nodes_ready);

sustainml::orchestrator::OrchestratorNode orchestrator(*sonh);
sustainml::orchestrator::OrchestratorNode orchestrator(*(sonh.get()));

// First, wait for all nodes to be ready
sonh->wait_for_data(std::chrono::hours(24));
Expand All @@ -123,6 +123,7 @@ int main(

// Now all nodes should have publish one output i.e the one for {1,1}
sonh->wait_for_data(std::chrono::hours(24));
orchestrator.destroy();

return 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ def spin(self):
self.node_.spin()

# Proxy method to manually terminate
def terminate():
def terminate(self):

cpp_OrchestratorNode.terminate()
self.node_.terminate()
self.node_.destroy()

# Call main in program execution
if __name__ == '__main__':
Expand All @@ -129,3 +130,4 @@ def terminate():
ui.problem_definition("Classify cars in a video sequence.")
node.node_.start_task(task_id, ui)
node.handler_.wait_for_data()
node.terminate()
5 changes: 3 additions & 2 deletions sustainml_py/sustainml_py/nodes/OrchestratorNode.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def spin(self):
self.node_.spin()

# Proxy method to manually terminate
def terminate():
def terminate(self):

cpp_OrchestratorNode.terminate()
self.node_.terminate()
self.node_.destroy()

0 comments on commit 119538d

Please sign in to comment.