Skip to content

Commit

Permalink
Fix: improve IPC module file structure (#161)
Browse files Browse the repository at this point in the history
# Description
No logic change, just a file restructure with breaking changes:
* Move all the files related to zenoh specifics from `ipc` to
`ipc/zenoh`
* Updated the namespace of the classes moved to zenoh folder to
`ipc::zenoh`
* Renamed publisher subscriber filename to raw_publisher raw_subscriber
to match the class name
* Add a new Subscriber template class to match the Publisher
* This is mainly cosmetics but it is nice to have the type in the
subscriber for readability
* Removed from the `examples` module the ipc command line arguments and
use the one defined in the ipc module
  • Loading branch information
filippobrizzi authored Sep 24, 2024
1 parent 6257f1e commit 683c518
Show file tree
Hide file tree
Showing 54 changed files with 867 additions and 845 deletions.
6 changes: 3 additions & 3 deletions modules/bag/apps/bag_player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include "hephaestus/bag/zenoh_player.h"
#include "hephaestus/cli/program_options.h"
#include "hephaestus/ipc/program_options.h"
#include "hephaestus/ipc/zenoh/program_options.h"
#include "hephaestus/ipc/zenoh/session.h"
#include "hephaestus/utils/exception.h"
#include "hephaestus/utils/signal_handler.h"
Expand All @@ -28,14 +28,14 @@ auto main(int argc, const char* argv[]) -> int {

try {
auto desc = heph::cli::ProgramDescription("Playback a bag to zenoh topics");
heph::ipc::appendIPCProgramOption(desc);
heph::ipc::zenoh::appendProgramOption(desc);
desc.defineOption<std::filesystem::path>("input_bag", 'i', "output file where to write the bag")
.defineFlag("wait_for_readers_to_connect", 'w',
"Wait for readers to connect before starting playback");
const auto args = std::move(desc).parse(argc, argv);
auto input_file = args.getOption<std::filesystem::path>("input_bag");
auto wait_for_readers_to_connect = args.getOption<bool>("wait_for_readers_to_connect");
auto [config, _] = heph::ipc::parseIPCProgramOptions(args);
auto [config, _] = heph::ipc::zenoh::parseProgramOptions(args);

LOG(INFO) << fmt::format("Reading bag file: {}", input_file.string());

Expand Down
6 changes: 3 additions & 3 deletions modules/bag/apps/bag_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
#include "hephaestus/bag/writer.h"
#include "hephaestus/bag/zenoh_recorder.h"
#include "hephaestus/cli/program_options.h"
#include "hephaestus/ipc/program_options.h"
#include "hephaestus/ipc/topic_filter.h"
#include "hephaestus/ipc/zenoh/program_options.h"
#include "hephaestus/ipc/zenoh/session.h"
#include "hephaestus/utils/signal_handler.h"
#include "hephaestus/utils/stack_trace.h"
Expand All @@ -22,10 +22,10 @@ auto main(int argc, const char* argv[]) -> int {

try {
auto desc = heph::cli::ProgramDescription("Record a bag from zenoh topics");
heph::ipc::appendIPCProgramOption(desc);
heph::ipc::zenoh::appendProgramOption(desc);
desc.defineOption<std::string>("output_bag", 'o', "output file where to write the bag");
const auto args = std::move(desc).parse(argc, argv);
auto [config, topic] = heph::ipc::parseIPCProgramOptions(args);
auto [config, topic] = heph::ipc::zenoh::parseProgramOptions(args);
auto output_file = args.getOption<std::string>("output_bag");

heph::bag::ZenohRecorderParams params{
Expand Down
4 changes: 2 additions & 2 deletions modules/bag/include/hephaestus/bag/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <memory>
#include <span>

#include "hephaestus/ipc/common.h"
#include "hephaestus/ipc/zenoh/subscriber.h"
#include "hephaestus/serdes/type_info.h"

namespace heph::bag {
Expand All @@ -19,7 +19,7 @@ class IBagWriter {

virtual void registerSchema(const serdes::TypeInfo& type_info) = 0;
virtual void registerChannel(const std::string& topic, const serdes::TypeInfo& type_info) = 0;
virtual void writeRecord(const ipc::MessageMetadata& metadata, std::span<const std::byte> data) = 0;
virtual void writeRecord(const ipc::zenoh::MessageMetadata& metadata, std::span<const std::byte> data) = 0;
};

struct McapWriterParams {
Expand Down
6 changes: 3 additions & 3 deletions modules/bag/src/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <mcap/types.hpp>
#include <mcap/writer.hpp>

#include "hephaestus/ipc/common.h"
#include "hephaestus/ipc/zenoh/raw_subscriber.h"
#include "hephaestus/serdes/type_info.h"
#include "hephaestus/utils/exception.h"

Expand All @@ -36,7 +36,7 @@ class McapWriter final : public IBagWriter {
explicit McapWriter(McapWriterParams params);
~McapWriter() override = default;

void writeRecord(const ipc::MessageMetadata& metadata, std::span<const std::byte> data) override;
void writeRecord(const ipc::zenoh::MessageMetadata& metadata, std::span<const std::byte> data) override;
void registerSchema(const serdes::TypeInfo& type_info) override;
void registerChannel(const std::string& topic, const serdes::TypeInfo& type_info) override;

Expand Down Expand Up @@ -69,7 +69,7 @@ void McapWriter::registerSchema(const serdes::TypeInfo& type_info) {
schema_db_[type_info.name] = schema;
}

void McapWriter::writeRecord(const ipc::MessageMetadata& metadata, std::span<const std::byte> data) {
void McapWriter::writeRecord(const ipc::zenoh::MessageMetadata& metadata, std::span<const std::byte> data) {
throwExceptionIf<InvalidDataException>(!channel_db_.contains(metadata.topic),
fmt::format("no channel registered for topic {}", metadata.topic));

Expand Down
4 changes: 2 additions & 2 deletions modules/bag/src/zenoh_player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
#include <mcap/reader.hpp>
#include <mcap/types.hpp>

#include "hephaestus/ipc/common.h"
#include "hephaestus/ipc/zenoh/publisher.h"
#include "hephaestus/ipc/topic.h"
#include "hephaestus/ipc/zenoh/raw_publisher.h"
#include "hephaestus/ipc/zenoh/session.h"
#include "hephaestus/serdes/type_info.h"
#include "hephaestus/utils/exception.h"
Expand Down
6 changes: 3 additions & 3 deletions modules/bag/src/zenoh_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
#include <absl/synchronization/mutex.h>

#include "hephaestus/bag/writer.h"
#include "hephaestus/ipc/common.h"
#include "hephaestus/ipc/zenoh/dynamic_subscriber.h"
#include "hephaestus/ipc/zenoh/raw_subscriber.h"
#include "hephaestus/serdes/type_info.h"

namespace heph::bag {

using BagRecord = std::pair<ipc::MessageMetadata, std::vector<std::byte>>;
using BagRecord = std::pair<ipc::zenoh::MessageMetadata, std::vector<std::byte>>;

/// This class does the following:
/// - constantly checks for new topics
Expand Down Expand Up @@ -63,7 +63,7 @@ ZenohRecorder::Impl::Impl(ZenohRecorderParams params)
bag_writer_->registerChannel(topic, type_info);
},
.subscriber_cb =
[this](const ipc::MessageMetadata& metadata, std::span<const std::byte> data,
[this](const ipc::zenoh::MessageMetadata& metadata, std::span<const std::byte> data,
const std::optional<serdes::TypeInfo>& type_info) {
(void)type_info;
const absl::MutexLock lock{ &writer_mutex_ };
Expand Down
4 changes: 2 additions & 2 deletions modules/examples/examples/mcap_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "hephaestus/bag/writer.h"
#include "hephaestus/examples/types/pose.h"
#include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner)
#include "hephaestus/ipc/common.h"
#include "hephaestus/ipc/zenoh/raw_subscriber.h"
#include "hephaestus/serdes/serdes.h"

auto main(int argc, const char* argv[]) -> int {
Expand All @@ -42,7 +42,7 @@ auto main(int argc, const char* argv[]) -> int {
heph::examples::types::Pose pose;
pose.position = Eigen::Vector3d{ static_cast<double>(i), 2, 3 }; // NOLINT
const auto data = heph::serdes::serialize(pose);
const heph::ipc::MessageMetadata metadata{
const heph::ipc::zenoh::MessageMetadata metadata{
.sender_id = "myself", .topic = "pose", .timestamp = frame_time, .sequence_id = i
};

Expand Down
18 changes: 11 additions & 7 deletions modules/examples/examples/zenoh_action_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
#include <absl/log/log.h>
#include <fmt/core.h>

#include "hephaestus/cli/program_options.h"
#include "hephaestus/examples/types/sample.h"
#include "hephaestus/examples/types_protobuf/sample.h" // NOLINT(misc-include-cleaner)
#include "hephaestus/ipc/publisher.h"
#include "hephaestus/ipc/zenoh/action_server/action_server.h"
#include "hephaestus/ipc/zenoh/program_options.h"
#include "hephaestus/ipc/zenoh/publisher.h"
#include "hephaestus/ipc/zenoh/session.h"
#include "hephaestus/utils/signal_handler.h"
#include "hephaestus/utils/stack_trace.h"
Expand All @@ -36,9 +38,10 @@
return heph::ipc::zenoh::action_server::TriggerStatus::SUCCESSFUL;
}

[[nodiscard]] auto execute(const heph::examples::types::SampleRequest& request,
heph::ipc::Publisher<heph::examples::types::SampleReply>& status_update_publisher,
std::atomic_bool& stop_requested) -> heph::examples::types::SampleReply {
[[nodiscard]] auto
execute(const heph::examples::types::SampleRequest& request,
heph::ipc::zenoh::Publisher<heph::examples::types::SampleReply>& status_update_publisher,
std::atomic_bool& stop_requested) -> heph::examples::types::SampleReply {
static constexpr auto WAIT_FOR = std::chrono::milliseconds{ 500 };
LOG(INFO) << fmt::format("Start execution, iterations: {}", request.iterations_count);
std::size_t accumulated = request.initial_value;
Expand Down Expand Up @@ -67,10 +70,11 @@ auto main(int argc, const char* argv[]) -> int {
const heph::utils::StackTrace stack_trace;

try {
auto desc = getProgramDescription("ActionServer example", ExampleType::ACTION_SERVER);
auto desc = heph::cli::ProgramDescription("Action server example");
heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::ACTION_SERVER));
const auto args = std::move(desc).parse(argc, argv);
auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args);

auto [session_config, topic_config] = parseArgs(args);
auto stop_session_config = session_config;
auto session = heph::ipc::zenoh::createSession(std::move(session_config));
auto stop_session = heph::ipc::zenoh::createSession(std::move(stop_session_config));
Expand All @@ -80,7 +84,7 @@ auto main(int argc, const char* argv[]) -> int {
};

auto execute_callback = [](const heph::examples::types::SampleRequest& sample,
heph::ipc::Publisher<heph::examples::types::SampleReply>& publisher,
heph::ipc::zenoh::Publisher<heph::examples::types::SampleReply>& publisher,
std::atomic_bool& stop_requested) {
return execute(sample, publisher, stop_requested);
};
Expand Down
7 changes: 5 additions & 2 deletions modules/examples/examples/zenoh_action_server_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
#include <fmt/core.h>
#include <magic_enum.hpp>

#include "hephaestus/cli/program_options.h"
#include "hephaestus/examples/types/sample.h"
#include "hephaestus/examples/types_protobuf/sample.h" // NOLINT(misc-include-cleaner)
#include "hephaestus/ipc/zenoh/action_server/action_server.h"
#include "hephaestus/ipc/zenoh/program_options.h"
#include "hephaestus/ipc/zenoh/session.h"
#include "hephaestus/utils/signal_handler.h"
#include "hephaestus/utils/stack_trace.h"
Expand All @@ -26,10 +28,11 @@ auto main(int argc, const char* argv[]) -> int {
const heph::utils::StackTrace stack_trace;

try {
auto desc = getProgramDescription("Binary service client example", ExampleType::ACTION_SERVER);
auto desc = heph::cli::ProgramDescription("Action server client example");
heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::ACTION_SERVER));
const auto args = std::move(desc).parse(argc, argv);
auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args);

auto [session_config, topic_config] = parseArgs(args);
auto session = heph::ipc::zenoh::createSession(std::move(session_config));

heph::utils::TerminationBlocker::registerInterruptCallback([session = std::ref(*session), &topic_config] {
Expand Down
71 changes: 9 additions & 62 deletions modules/examples/examples/zenoh_program_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,74 +2,21 @@
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================

#include "hephaestus/cli/program_options.h"
#include "hephaestus/ipc/common.h"
#include "hephaestus/utils/exception.h"
#include <string>

enum class ExampleType : uint8_t { PUBSUB, SERVICE, ACTION_SERVER };

[[nodiscard]] inline auto getProgramDescription(const std::string& description, const ExampleType type)
-> heph::cli::ProgramDescription {
[[nodiscard]] inline auto getDefaultTopic(const ExampleType type) -> std::string {
static constexpr auto DEFAULT_PUBSUB_KEY = "hephaestus/ipc/example/zenoh/put";
static constexpr auto DEFAULT_SERVICE_KEY = "hephaestus/ipc/example/zenoh/service";
static constexpr auto DEFAULT_ACTION_SERVER_KEY = "hephaestus/ipc/example/zenoh/action_server";

const auto* const default_topic = [&type] {
switch (type) {
case ExampleType::PUBSUB:
return DEFAULT_PUBSUB_KEY;
case ExampleType::SERVICE:
return DEFAULT_SERVICE_KEY;
case ExampleType::ACTION_SERVER:
return DEFAULT_ACTION_SERVER_KEY;
}
}();

auto desc = heph::cli::ProgramDescription(description);
desc.defineOption<std::string>("topic", 't', "Key expression", default_topic)
.defineOption<std::size_t>("cache", 'c', "Cache size", 0)
.defineOption<std::string>("mode", 'm', "Running mode: options: peer, client", "peer")
.defineOption<std::string>("router", 'r', "Router endpoint", "")
.defineOption<std::string>("protocol", 'p', "Protocol to use, options: udp, tcp, any", "any")
.defineFlag("shared_memory", 's', "Enable shared memory")
.defineFlag("qos", 'q', "Enable QoS")
.defineFlag("realtime", "Enable real-time communication");

return desc;
}

[[nodiscard]] inline auto parseArgs(const heph::cli::ProgramOptions& args)
-> std::pair<heph::ipc::Config, heph::ipc::TopicConfig> {
heph::ipc::TopicConfig topic_config{ .name = args.getOption<std::string>("topic") };

heph::ipc::Config config;
config.cache_size = args.getOption<std::size_t>("cache");

auto mode = args.getOption<std::string>("mode");
if (mode == "peer") {
config.mode = heph::ipc::Mode::PEER;
} else if (mode == "client") {
config.mode = heph::ipc::Mode::CLIENT;
} else {
heph::throwException<heph::InvalidParameterException>(fmt::format("invalid mode value: {}", mode));
}

auto protocol = args.getOption<std::string>("protocol");
if (protocol == "any") {
config.protocol = heph::ipc::Protocol::ANY;
} else if (protocol == "udp") {
config.protocol = heph::ipc::Protocol::UDP;
} else if (protocol == "tcp") {
config.protocol = heph::ipc::Protocol::TCP;
} else {
heph::throwException<heph::InvalidParameterException>(
fmt::format("invalid value {} for option 'protocol'", protocol));
switch (type) {
case ExampleType::PUBSUB:
return DEFAULT_PUBSUB_KEY;
case ExampleType::SERVICE:
return DEFAULT_SERVICE_KEY;
case ExampleType::ACTION_SERVER:
return DEFAULT_ACTION_SERVER_KEY;
}

config.router = args.getOption<std::string>("router");
config.enable_shared_memory = args.getOption<bool>("shared_memory");
config.qos = args.getOption<bool>("qos");
config.real_time = args.getOption<bool>("realtime");

return { std::move(config), std::move(topic_config) };
}
26 changes: 15 additions & 11 deletions modules/examples/examples/zenoh_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@

#include <fmt/core.h>

#include "hephaestus/cli/program_options.h"
#include "hephaestus/examples/types/pose.h"
#include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner)
#include "hephaestus/ipc/publisher.h"
#include "hephaestus/ipc/zenoh/program_options.h"
#include "hephaestus/ipc/zenoh/publisher.h"
#include "hephaestus/ipc/zenoh/session.h"
#include "hephaestus/utils/exception.h"
#include "hephaestus/utils/signal_handler.h"
Expand All @@ -25,20 +27,22 @@ auto main(int argc, const char* argv[]) -> int {
const heph::utils::StackTrace stack_trace;

try {
auto desc = getProgramDescription("Periodic publisher example", ExampleType::PUBSUB);
auto desc = heph::cli::ProgramDescription("Periodic publisher example");
heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::PUBSUB));
const auto args = std::move(desc).parse(argc, argv);

auto [session_config, topic_config] = parseArgs(args);
auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args);
auto session = heph::ipc::zenoh::createSession(std::move(session_config));

heph::ipc::Publisher<heph::examples::types::Pose> publisher{ session, topic_config,
[](const auto& status) {
if (status.matching) {
fmt::println("Subscriber match");
} else {
fmt::println("NO subscriber matching");
}
} };
heph::ipc::zenoh::Publisher<heph::examples::types::Pose> publisher{ session, topic_config,
[](const auto& status) {
if (status.matching) {
fmt::println("Subscriber match");
} else {
fmt::println(
"NO subscriber matching");
}
} };

fmt::println("Declaring RawPublisher on '{}' with id: '{}'", topic_config.name, publisher.id());

Expand Down
7 changes: 5 additions & 2 deletions modules/examples/examples/zenoh_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
#include <fmt/chrono.h> // NOLINT(misc-include-cleaner)
#include <fmt/core.h>

#include "hephaestus/cli/program_options.h"
#include "hephaestus/examples/types/pose.h"
#include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner)
#include "hephaestus/ipc/zenoh/program_options.h"
#include "hephaestus/ipc/zenoh/service.h"
#include "hephaestus/ipc/zenoh/session.h"
#include "hephaestus/utils/stack_trace.h"
Expand All @@ -26,10 +28,11 @@ auto main(int argc, const char* argv[]) -> int {
const heph::utils::StackTrace stack_trace;

try {
auto desc = getProgramDescription("Binary service client example", ExampleType::SERVICE);
auto desc = heph::cli::ProgramDescription("Binary service client example");
heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::SERVICE));
const auto args = std::move(desc).parse(argc, argv);
auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args);

auto [session_config, topic_config] = parseArgs(args);
auto session = heph::ipc::zenoh::createSession(std::move(session_config));

static constexpr auto K_TIMEOUT = std::chrono::seconds(10);
Expand Down
Loading

0 comments on commit 683c518

Please sign in to comment.