Skip to content

Commit

Permalink
feat: add feature to record multi-host system (#262)
Browse files Browse the repository at this point in the history
* add multihost record feature

Signed-off-by: ymski <[email protected]>

* replace trace node identifier from PID to UUID

Signed-off-by: ymski <[email protected]>

* ci(pre-commit): autofix

* fix typo

Signed-off-by: ymski <[email protected]>

* fix test

Signed-off-by: ymski <[email protected]>

* ci(pre-commit): autofix

* publish status when execute callback_start with RECORD

Signed-off-by: ymski <[email protected]>

* ci(pre-commit): autofix

* refactor

Signed-off-by: ymski <[email protected]>

* refactor

Signed-off-by: ymski <[email protected]>

* fix initialization order

Signed-off-by: ymski <[email protected]>

* add uuid to package.xml

Signed-off-by: ymski <[email protected]>

* remove unnecessary standard output

Signed-off-by: ymski <[email protected]>

* rename variable

Signed-off-by: ymski <[email protected]>

* fix test

Signed-off-by: ymski <[email protected]>

* refactor

Signed-off-by: ymski <[email protected]>

---------

Signed-off-by: ymski <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
ymski and pre-commit-ci[bot] committed May 10, 2024
1 parent f298134 commit a0ff9fb
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CARET_trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ ament_target_dependencies(caret
ament_export_targets(export_caret_trace HAS_LIBRARY_TARGET)
ament_export_dependencies(rclcpp LTTngUST)

target_link_libraries(caret CycloneDDS::ddsc fastrtps)
target_link_libraries(caret CycloneDDS::ddsc fastrtps uuid)

add_executable(clock_recorder src/clock_recorder.cpp src/tp.c)
ament_target_dependencies(clock_recorder rclcpp LTTngUST)
Expand Down
2 changes: 2 additions & 0 deletions CARET_trace/include/caret_trace/trace_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class TraceNode : public rclcpp::Node, public TraceNodeInterface
const bool use_log_; // for test

static std::string get_unique_node_name(std::string base_name);
static std::string get_formatted_uuid();
void run_timer();
void stop_timer();
void set_log_level(rclcpp::Logger::Level level);
Expand All @@ -130,6 +131,7 @@ class TraceNode : public rclcpp::Node, public TraceNodeInterface
rclcpp::Publisher<caret_msgs::msg::Status>::SharedPtr status_pub_;
rclcpp::TimerBase::SharedPtr timer_;
std::shared_ptr<DataContainerInterface> data_container_;
std::shared_ptr<LttngSession> lttng_session_;
bool execute_timer_on_run_;

mutable std::shared_mutex mutex_;
Expand Down
1 change: 1 addition & 0 deletions CARET_trace/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<depend>rmw_fastrtps_shared_cpp</depend>
<depend>rmw_implementation</depend>
<depend>std_msgs</depend>
<depend>uuid</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>caret_lint_common</test_depend>
Expand Down
38 changes: 32 additions & 6 deletions CARET_trace/src/trace_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "caret_msgs/msg/status.hpp"

#include <unistd.h>
#include <uuid/uuid.h>

#include <chrono>
#include <memory>
Expand All @@ -43,6 +44,7 @@ TraceNode::TraceNode(
record_block_size_(100),
use_log_(use_log),
data_container_(data_container),
lttng_session_(lttng_session),
execute_timer_on_run_(execute_timer_on_run)
{
set_log_level(level);
Expand Down Expand Up @@ -87,15 +89,29 @@ TraceNode::~TraceNode()

std::string TraceNode::get_unique_node_name(std::string base_name)
{
auto pid = getpid();
char * pid_str = nullptr;
if (asprintf(&pid_str, "%jd", (intmax_t)pid) != -1) {
base_name += "_" + std::string(pid_str);
free(pid_str);
}
std::string uuid_str = get_formatted_uuid();
base_name += "_" + uuid_str;

return base_name;
}

std::string TraceNode::get_formatted_uuid()
{
uuid_t uuid_value;
std::unique_ptr<char[]> uuid_string_ptr = std::make_unique<char[]>(UUID_STR_LEN);
uuid_generate(uuid_value);
uuid_unparse_lower(uuid_value, uuid_string_ptr.get());
auto uuid_string = std::string(uuid_string_ptr.get());

// Avoid violating node naming conventions
auto pos = uuid_string.find('-');
while (pos != std::string::npos) {
uuid_string.replace(pos, 1, "_");
pos = uuid_string.find('-');
}
return uuid_string;
}

void TraceNode::debug(std::string message) const
{
if (use_log_) {
Expand Down Expand Up @@ -181,6 +197,11 @@ void TraceNode::start_callback(caret_msgs::msg::Start::UniquePtr msg)

debug("Received start message.");

if (status_ != TRACE_STATUS::WAIT || !lttng_session_->is_session_running()) {
publish_status(status_);
return;
}

// As long as PREPARE state, data of initialization trace point are stored into pending.
// Before calling the caret_init trace point,
// transition to the prepare state to set is_recording_allowed to False.
Expand Down Expand Up @@ -229,6 +250,11 @@ void TraceNode::end_callback(caret_msgs::msg::End::UniquePtr msg)
{
(void)msg;

if (lttng_session_->is_session_running()) {
publish_status(status_);
return;
}

status_ = TRACE_STATUS::WAIT;

publish_status(status_);
Expand Down
42 changes: 42 additions & 0 deletions CARET_trace/test/test_scenario.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ TEST(ScenarioTest, RepetitiveRecordingCase)
EXPECT_CALL(func, Call(addr_0, 0)).Times(1);
EXPECT_CALL(func, Call(addr_1, 0)).Times(1);

EXPECT_CALL(*lttng, is_session_running()).WillRepeatedly(Return(true));
auto start_msg = std::make_unique<caret_msgs::msg::Start>();
start_msg->recording_frequency = 1; // record nullptr only.
node.start_callback(std::move(start_msg));
Expand All @@ -146,6 +147,7 @@ TEST(ScenarioTest, RepetitiveRecordingCase)
node.timer_callback(); // record rest data.
EXPECT_EQ(node.get_status(), TRACE_STATUS::RECORD);

EXPECT_CALL(*lttng, is_session_running()).WillRepeatedly(Return(false));
auto end_msg = std::make_unique<caret_msgs::msg::End>();
node.end_callback(std::move(end_msg));
EXPECT_EQ(node.get_status(), TRACE_STATUS::WAIT);
Expand All @@ -155,6 +157,7 @@ TEST(ScenarioTest, RepetitiveRecordingCase)
EXPECT_CALL(func, Call(addr_0, 0)).Times(1);
EXPECT_CALL(func, Call(addr_1, 0)).Times(1);

EXPECT_CALL(*lttng, is_session_running()).WillRepeatedly(Return(true));
auto start_msg = std::make_unique<caret_msgs::msg::Start>();
start_msg->recording_frequency = 1; // record nullptr only.
node.start_callback(std::move(start_msg));
Expand All @@ -163,6 +166,7 @@ TEST(ScenarioTest, RepetitiveRecordingCase)
node.timer_callback(); // record rest data.
EXPECT_EQ(node.get_status(), TRACE_STATUS::RECORD);

EXPECT_CALL(*lttng, is_session_running()).WillRepeatedly(Return(false));
auto end_msg = std::make_unique<caret_msgs::msg::End>();
node.end_callback(std::move(end_msg));
EXPECT_EQ(node.get_status(), TRACE_STATUS::WAIT);
Expand Down Expand Up @@ -194,6 +198,7 @@ TEST(ScenarioTest, IsRecordingAllowed)

auto start_msg = std::make_unique<caret_msgs::msg::Start>();
start_msg->recording_frequency = 1;
EXPECT_CALL(*lttng, is_session_running()).WillRepeatedly(Return(true));
node->start_callback(std::move(start_msg));
EXPECT_EQ(node->get_status(), TRACE_STATUS::PREPARE);
EXPECT_FALSE(context.is_recording_allowed());
Expand Down Expand Up @@ -254,6 +259,7 @@ TEST(ScenarioTest, Record)
// case: PREPARE status
auto start_msg = std::make_unique<caret_msgs::msg::Start>();
start_msg->recording_frequency = 1;
EXPECT_CALL(*lttng, is_session_running()).WillRepeatedly(Return(true));
node->start_callback(std::move(start_msg));

EXPECT_EQ(node->get_status(), TRACE_STATUS::PREPARE);
Expand All @@ -271,3 +277,39 @@ TEST(ScenarioTest, Record)
container->store_rcl_init(addr_4, 0);
EXPECT_EQ(keys->size(), (size_t)4);
}

TEST(ScenarioTest, WithoutLttngSession)
{
init_context();

auto container = std::make_shared<DataContainer>();
Context context;

void * addr_0 = reinterpret_cast<void *>(0);
void * addr_1 = reinterpret_cast<void *>(1);
container->store_rcl_init(addr_0, 0);
container->store_rcl_init(addr_1, 0);

auto lttng = std::make_shared<LttngSessionMock>();
EXPECT_CALL(*lttng, started_session_running()).WillRepeatedly(Return(false));

rclcpp::NodeOptions options;
auto node =
std::make_shared<TraceNode>("test", options, lttng, container, Logger::Level::Warn, false);
context.assign_node(node);

EXPECT_EQ(node->get_status(), TRACE_STATUS::WAIT);
EXPECT_FALSE(context.is_recording_allowed());

EXPECT_CALL(*lttng, is_session_running()).WillRepeatedly(Return(false));
auto start_msg = std::make_unique<caret_msgs::msg::Start>();
start_msg->recording_frequency = 1;

node->start_callback(std::move(start_msg));
EXPECT_EQ(node->get_status(), TRACE_STATUS::WAIT);
EXPECT_FALSE(context.is_recording_allowed());

// node->timer_callback(); // time_callback only execute with PREPARE state
// EXPECT_EQ(node->get_status(), TRACE_STATUS::WAIT);
// EXPECT_FALSE(context.is_recording_allowed());
}
97 changes: 86 additions & 11 deletions CARET_trace/test/test_trace_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ class CaretTraceNodeTest : public ::testing::Test
node_ = get_init_node();

// transition to wait status
set_lttng_session_running_return_value(false);
auto end_msg = get_end_msg();
node_->end_callback(std::move(end_msg));
if (node_->get_status() == status && status == TRACE_STATUS::WAIT) {
return;
}

// transition to prepare status
set_lttng_session_running_return_value(true);
auto start_msg = get_start_msg();
node_->start_callback(std::move(start_msg));
if (node_->get_status() == status && status == TRACE_STATUS::PREPARE) {
Expand All @@ -78,6 +80,11 @@ class CaretTraceNodeTest : public ::testing::Test
EXPECT_CALL(*lttng_session_, started_session_running()).WillRepeatedly(Return(return_value));
}

void set_lttng_session_running_return_value(bool return_value)
{
EXPECT_CALL(*lttng_session_, is_session_running()).WillRepeatedly(Return(return_value));
}

std::unique_ptr<caret_msgs::msg::Start> get_start_msg()
{
return std::make_unique<caret_msgs::msg::Start>();
Expand Down Expand Up @@ -137,22 +144,42 @@ TEST_F(CaretTraceNodeTest, TestWaitTransition)
{
init_context();

{ // subscribe start message: WAIT -> PREPARE
{ // subscribe start message without lttng session: WAIT -> WAIT
set_status(TRACE_STATUS::WAIT);

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::WAIT);
set_lttng_session_running_return_value(false);
auto start_msg = get_start_msg();
node_->start_callback(std::move(start_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::WAIT);
}

{ // subscribe start message with lttng session: WAIT -> PREPARE
set_status(TRACE_STATUS::WAIT);

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::WAIT);
set_lttng_session_running_return_value(true);
auto start_msg = get_start_msg();
node_->start_callback(std::move(start_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);
}

{ // subscribe end message: WAIT -> WAIT
{ // subscribe end message without lttng session: WAIT -> WAIT
set_status(TRACE_STATUS::WAIT);
EXPECT_TRUE(node_->get_status() == TRACE_STATUS::WAIT);
set_lttng_session_running_return_value(false);
auto end_msg = get_end_msg();
node_->end_callback(std::move(end_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::WAIT);
}

{ // subscribe end message with lttng session: WAIT -> WAIT
set_status(TRACE_STATUS::WAIT);
EXPECT_TRUE(node_->get_status() == TRACE_STATUS::WAIT);
set_lttng_session_running_return_value(true);
auto end_msg = get_end_msg();
node_->end_callback(std::move(end_msg));

Expand All @@ -164,8 +191,21 @@ TEST_F(CaretTraceNodeTest, TestPrepareTransition)
{
init_context();

{ // subscribe start message: PREPARE -> PREPARE
{ // subscribe start message without lttng session: PREPARE -> PREPARE
set_status(TRACE_STATUS::PREPARE);
set_lttng_session_running_return_value(false);

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);

auto start_msg = get_start_msg();
node_->start_callback(std::move(start_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);
}

{ // subscribe start message with lttng session: PREPARE -> PREPARE
set_status(TRACE_STATUS::PREPARE);
set_lttng_session_running_return_value(true);

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);

Expand All @@ -175,8 +215,9 @@ TEST_F(CaretTraceNodeTest, TestPrepareTransition)
EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);
}

{ // subscribe end message: PREPARE -> WAIT
{ // subscribe end message without lttng session: PREPARE -> WAIT
set_status(TRACE_STATUS::PREPARE);
set_lttng_session_running_return_value(false);

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);

Expand All @@ -186,7 +227,19 @@ TEST_F(CaretTraceNodeTest, TestPrepareTransition)
EXPECT_TRUE(node_->get_status() == TRACE_STATUS::WAIT);
}

{
{ // subscribe end message with lttng session: PREPARE -> PREPARE
set_status(TRACE_STATUS::PREPARE);
set_lttng_session_running_return_value(true);

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);

auto end_msg = get_end_msg();
node_->end_callback(std::move(end_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);
}

{ // transition by timer_callback: PREPARE -> RECORD
set_status(TRACE_STATUS::PREPARE);

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);
Expand All @@ -203,31 +256,53 @@ TEST_F(CaretTraceNodeTest, TestPrepareTransition)
}
}

TEST_F(CaretTraceNodeTest, TestMeasureTransition)
TEST_F(CaretTraceNodeTest, TestRecordTransition)
{
init_context();

{ // subscribe start message: MEASURE -> PREPARE
{ // subscribe start message without lttng session: RECORD -> RECORD
set_status(TRACE_STATUS::RECORD);

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::RECORD);

set_lttng_session_running_return_value(false);
auto start_msg = get_start_msg();
node_->start_callback(std::move(start_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::PREPARE);
EXPECT_TRUE(node_->get_status() == TRACE_STATUS::RECORD);
}

{ // subscribe end message: MEASURE -> WAIT
{ // subscribe start message with lttng session: RECORD -> RECORD
set_status(TRACE_STATUS::RECORD);
EXPECT_TRUE(node_->get_status() == TRACE_STATUS::RECORD);

set_lttng_session_running_return_value(true);
auto start_msg = get_start_msg();
node_->start_callback(std::move(start_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::RECORD);
}

{ // subscribe end message without lttng session: RECORD -> WAIT
set_status(TRACE_STATUS::RECORD);
EXPECT_TRUE(node_->get_status() == TRACE_STATUS::RECORD);

set_lttng_session_running_return_value(false);
auto end_msg = get_end_msg();
node_->end_callback(std::move(end_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::WAIT);
}

{ // subscribe end message with lttng session: RECORD -> RECORD
set_status(TRACE_STATUS::RECORD);
EXPECT_TRUE(node_->get_status() == TRACE_STATUS::RECORD);

set_lttng_session_running_return_value(true);
auto end_msg = get_end_msg();
node_->end_callback(std::move(end_msg));

EXPECT_TRUE(node_->get_status() == TRACE_STATUS::RECORD);
}
}

TEST_F(CaretTraceNodeTest, TestWait)
Expand All @@ -248,7 +323,7 @@ TEST_F(CaretTraceNodeTest, TestPrepare)
EXPECT_TRUE(node_->is_timer_running());
}

TEST_F(CaretTraceNodeTest, TestMeasure)
TEST_F(CaretTraceNodeTest, TestRecord)
{
init_context();

Expand Down

0 comments on commit a0ff9fb

Please sign in to comment.