diff --git a/modules/bag/apps/bag_player.cpp b/modules/bag/apps/bag_player.cpp index d8fe0334..91ef7f8f 100644 --- a/modules/bag/apps/bag_player.cpp +++ b/modules/bag/apps/bag_player.cpp @@ -17,7 +17,7 @@ #include "hephaestus/bag/zenoh_player.h" #include "hephaestus/cli/program_options.h" -#include "hephaestus/ipc/program_options.h" +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/exception.h" #include "hephaestus/utils/signal_handler.h" @@ -28,14 +28,14 @@ auto main(int argc, const char* argv[]) -> int { try { auto desc = heph::cli::ProgramDescription("Playback a bag to zenoh topics"); - heph::ipc::appendIPCProgramOption(desc); + heph::ipc::zenoh::appendProgramOption(desc); desc.defineOption("input_bag", 'i', "output file where to write the bag") .defineFlag("wait_for_readers_to_connect", 'w', "Wait for readers to connect before starting playback"); const auto args = std::move(desc).parse(argc, argv); auto input_file = args.getOption("input_bag"); auto wait_for_readers_to_connect = args.getOption("wait_for_readers_to_connect"); - auto [config, _] = heph::ipc::parseIPCProgramOptions(args); + auto [config, _] = heph::ipc::zenoh::parseProgramOptions(args); LOG(INFO) << fmt::format("Reading bag file: {}", input_file.string()); diff --git a/modules/bag/apps/bag_recorder.cpp b/modules/bag/apps/bag_recorder.cpp index dd0aacf5..ae93a9b1 100644 --- a/modules/bag/apps/bag_recorder.cpp +++ b/modules/bag/apps/bag_recorder.cpp @@ -11,8 +11,8 @@ #include "hephaestus/bag/writer.h" #include "hephaestus/bag/zenoh_recorder.h" #include "hephaestus/cli/program_options.h" -#include "hephaestus/ipc/program_options.h" #include "hephaestus/ipc/topic_filter.h" +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/signal_handler.h" #include "hephaestus/utils/stack_trace.h" @@ -22,10 +22,10 @@ auto main(int argc, const char* argv[]) -> int { try { auto desc = heph::cli::ProgramDescription("Record a bag from zenoh topics"); - heph::ipc::appendIPCProgramOption(desc); + heph::ipc::zenoh::appendProgramOption(desc); desc.defineOption("output_bag", 'o', "output file where to write the bag"); const auto args = std::move(desc).parse(argc, argv); - auto [config, topic] = heph::ipc::parseIPCProgramOptions(args); + auto [config, topic] = heph::ipc::zenoh::parseProgramOptions(args); auto output_file = args.getOption("output_bag"); heph::bag::ZenohRecorderParams params{ diff --git a/modules/bag/include/hephaestus/bag/writer.h b/modules/bag/include/hephaestus/bag/writer.h index 72ec63b2..1f5100c0 100644 --- a/modules/bag/include/hephaestus/bag/writer.h +++ b/modules/bag/include/hephaestus/bag/writer.h @@ -8,7 +8,7 @@ #include #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/zenoh/subscriber.h" #include "hephaestus/serdes/type_info.h" namespace heph::bag { @@ -19,7 +19,7 @@ class IBagWriter { virtual void registerSchema(const serdes::TypeInfo& type_info) = 0; virtual void registerChannel(const std::string& topic, const serdes::TypeInfo& type_info) = 0; - virtual void writeRecord(const ipc::MessageMetadata& metadata, std::span data) = 0; + virtual void writeRecord(const ipc::zenoh::MessageMetadata& metadata, std::span data) = 0; }; struct McapWriterParams { diff --git a/modules/bag/src/writer.cpp b/modules/bag/src/writer.cpp index 1c6086f8..7916873d 100644 --- a/modules/bag/src/writer.cpp +++ b/modules/bag/src/writer.cpp @@ -18,7 +18,7 @@ #include #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/serdes/type_info.h" #include "hephaestus/utils/exception.h" @@ -36,7 +36,7 @@ class McapWriter final : public IBagWriter { explicit McapWriter(McapWriterParams params); ~McapWriter() override = default; - void writeRecord(const ipc::MessageMetadata& metadata, std::span data) override; + void writeRecord(const ipc::zenoh::MessageMetadata& metadata, std::span data) override; void registerSchema(const serdes::TypeInfo& type_info) override; void registerChannel(const std::string& topic, const serdes::TypeInfo& type_info) override; @@ -69,7 +69,7 @@ void McapWriter::registerSchema(const serdes::TypeInfo& type_info) { schema_db_[type_info.name] = schema; } -void McapWriter::writeRecord(const ipc::MessageMetadata& metadata, std::span data) { +void McapWriter::writeRecord(const ipc::zenoh::MessageMetadata& metadata, std::span data) { throwExceptionIf(!channel_db_.contains(metadata.topic), fmt::format("no channel registered for topic {}", metadata.topic)); diff --git a/modules/bag/src/zenoh_player.cpp b/modules/bag/src/zenoh_player.cpp index 247f4795..800e66d5 100644 --- a/modules/bag/src/zenoh_player.cpp +++ b/modules/bag/src/zenoh_player.cpp @@ -22,8 +22,8 @@ #include #include -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/zenoh/publisher.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/raw_publisher.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/serdes/type_info.h" #include "hephaestus/utils/exception.h" diff --git a/modules/bag/src/zenoh_recorder.cpp b/modules/bag/src/zenoh_recorder.cpp index 64140af8..911ae3fc 100644 --- a/modules/bag/src/zenoh_recorder.cpp +++ b/modules/bag/src/zenoh_recorder.cpp @@ -17,13 +17,13 @@ #include #include "hephaestus/bag/writer.h" -#include "hephaestus/ipc/common.h" #include "hephaestus/ipc/zenoh/dynamic_subscriber.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/serdes/type_info.h" namespace heph::bag { -using BagRecord = std::pair>; +using BagRecord = std::pair>; /// This class does the following: /// - constantly checks for new topics @@ -63,7 +63,7 @@ ZenohRecorder::Impl::Impl(ZenohRecorderParams params) bag_writer_->registerChannel(topic, type_info); }, .subscriber_cb = - [this](const ipc::MessageMetadata& metadata, std::span data, + [this](const ipc::zenoh::MessageMetadata& metadata, std::span data, const std::optional& type_info) { (void)type_info; const absl::MutexLock lock{ &writer_mutex_ }; diff --git a/modules/examples/examples/mcap_writer.cpp b/modules/examples/examples/mcap_writer.cpp index 9a3b33a0..0071e2ed 100644 --- a/modules/examples/examples/mcap_writer.cpp +++ b/modules/examples/examples/mcap_writer.cpp @@ -15,7 +15,7 @@ #include "hephaestus/bag/writer.h" #include "hephaestus/examples/types/pose.h" #include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner) -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/serdes/serdes.h" auto main(int argc, const char* argv[]) -> int { @@ -42,7 +42,7 @@ auto main(int argc, const char* argv[]) -> int { heph::examples::types::Pose pose; pose.position = Eigen::Vector3d{ static_cast(i), 2, 3 }; // NOLINT const auto data = heph::serdes::serialize(pose); - const heph::ipc::MessageMetadata metadata{ + const heph::ipc::zenoh::MessageMetadata metadata{ .sender_id = "myself", .topic = "pose", .timestamp = frame_time, .sequence_id = i }; diff --git a/modules/examples/examples/zenoh_action_server.cpp b/modules/examples/examples/zenoh_action_server.cpp index 5da926fa..4ff474e1 100644 --- a/modules/examples/examples/zenoh_action_server.cpp +++ b/modules/examples/examples/zenoh_action_server.cpp @@ -16,10 +16,12 @@ #include #include +#include "hephaestus/cli/program_options.h" #include "hephaestus/examples/types/sample.h" #include "hephaestus/examples/types_protobuf/sample.h" // NOLINT(misc-include-cleaner) -#include "hephaestus/ipc/publisher.h" #include "hephaestus/ipc/zenoh/action_server/action_server.h" +#include "hephaestus/ipc/zenoh/program_options.h" +#include "hephaestus/ipc/zenoh/publisher.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/signal_handler.h" #include "hephaestus/utils/stack_trace.h" @@ -36,9 +38,10 @@ return heph::ipc::zenoh::action_server::TriggerStatus::SUCCESSFUL; } -[[nodiscard]] auto execute(const heph::examples::types::SampleRequest& request, - heph::ipc::Publisher& status_update_publisher, - std::atomic_bool& stop_requested) -> heph::examples::types::SampleReply { +[[nodiscard]] auto +execute(const heph::examples::types::SampleRequest& request, + heph::ipc::zenoh::Publisher& status_update_publisher, + std::atomic_bool& stop_requested) -> heph::examples::types::SampleReply { static constexpr auto WAIT_FOR = std::chrono::milliseconds{ 500 }; LOG(INFO) << fmt::format("Start execution, iterations: {}", request.iterations_count); std::size_t accumulated = request.initial_value; @@ -67,10 +70,11 @@ auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { - auto desc = getProgramDescription("ActionServer example", ExampleType::ACTION_SERVER); + auto desc = heph::cli::ProgramDescription("Action server example"); + heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::ACTION_SERVER)); const auto args = std::move(desc).parse(argc, argv); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); - auto [session_config, topic_config] = parseArgs(args); auto stop_session_config = session_config; auto session = heph::ipc::zenoh::createSession(std::move(session_config)); auto stop_session = heph::ipc::zenoh::createSession(std::move(stop_session_config)); @@ -80,7 +84,7 @@ auto main(int argc, const char* argv[]) -> int { }; auto execute_callback = [](const heph::examples::types::SampleRequest& sample, - heph::ipc::Publisher& publisher, + heph::ipc::zenoh::Publisher& publisher, std::atomic_bool& stop_requested) { return execute(sample, publisher, stop_requested); }; diff --git a/modules/examples/examples/zenoh_action_server_client.cpp b/modules/examples/examples/zenoh_action_server_client.cpp index c1de01f8..d0bff794 100644 --- a/modules/examples/examples/zenoh_action_server_client.cpp +++ b/modules/examples/examples/zenoh_action_server_client.cpp @@ -14,9 +14,11 @@ #include #include +#include "hephaestus/cli/program_options.h" #include "hephaestus/examples/types/sample.h" #include "hephaestus/examples/types_protobuf/sample.h" // NOLINT(misc-include-cleaner) #include "hephaestus/ipc/zenoh/action_server/action_server.h" +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/signal_handler.h" #include "hephaestus/utils/stack_trace.h" @@ -26,10 +28,11 @@ auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { - auto desc = getProgramDescription("Binary service client example", ExampleType::ACTION_SERVER); + auto desc = heph::cli::ProgramDescription("Action server client example"); + heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::ACTION_SERVER)); const auto args = std::move(desc).parse(argc, argv); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); - auto [session_config, topic_config] = parseArgs(args); auto session = heph::ipc::zenoh::createSession(std::move(session_config)); heph::utils::TerminationBlocker::registerInterruptCallback([session = std::ref(*session), &topic_config] { diff --git a/modules/examples/examples/zenoh_program_options.h b/modules/examples/examples/zenoh_program_options.h index cb5589c5..d8b5a33f 100644 --- a/modules/examples/examples/zenoh_program_options.h +++ b/modules/examples/examples/zenoh_program_options.h @@ -2,74 +2,21 @@ // Copyright (C) 2023-2024 HEPHAESTUS Contributors //================================================================================================= -#include "hephaestus/cli/program_options.h" -#include "hephaestus/ipc/common.h" -#include "hephaestus/utils/exception.h" +#include enum class ExampleType : uint8_t { PUBSUB, SERVICE, ACTION_SERVER }; -[[nodiscard]] inline auto getProgramDescription(const std::string& description, const ExampleType type) - -> heph::cli::ProgramDescription { +[[nodiscard]] inline auto getDefaultTopic(const ExampleType type) -> std::string { static constexpr auto DEFAULT_PUBSUB_KEY = "hephaestus/ipc/example/zenoh/put"; static constexpr auto DEFAULT_SERVICE_KEY = "hephaestus/ipc/example/zenoh/service"; static constexpr auto DEFAULT_ACTION_SERVER_KEY = "hephaestus/ipc/example/zenoh/action_server"; - const auto* const default_topic = [&type] { - switch (type) { - case ExampleType::PUBSUB: - return DEFAULT_PUBSUB_KEY; - case ExampleType::SERVICE: - return DEFAULT_SERVICE_KEY; - case ExampleType::ACTION_SERVER: - return DEFAULT_ACTION_SERVER_KEY; - } - }(); - - auto desc = heph::cli::ProgramDescription(description); - desc.defineOption("topic", 't', "Key expression", default_topic) - .defineOption("cache", 'c', "Cache size", 0) - .defineOption("mode", 'm', "Running mode: options: peer, client", "peer") - .defineOption("router", 'r', "Router endpoint", "") - .defineOption("protocol", 'p', "Protocol to use, options: udp, tcp, any", "any") - .defineFlag("shared_memory", 's', "Enable shared memory") - .defineFlag("qos", 'q', "Enable QoS") - .defineFlag("realtime", "Enable real-time communication"); - - return desc; -} - -[[nodiscard]] inline auto parseArgs(const heph::cli::ProgramOptions& args) - -> std::pair { - heph::ipc::TopicConfig topic_config{ .name = args.getOption("topic") }; - - heph::ipc::Config config; - config.cache_size = args.getOption("cache"); - - auto mode = args.getOption("mode"); - if (mode == "peer") { - config.mode = heph::ipc::Mode::PEER; - } else if (mode == "client") { - config.mode = heph::ipc::Mode::CLIENT; - } else { - heph::throwException(fmt::format("invalid mode value: {}", mode)); - } - - auto protocol = args.getOption("protocol"); - if (protocol == "any") { - config.protocol = heph::ipc::Protocol::ANY; - } else if (protocol == "udp") { - config.protocol = heph::ipc::Protocol::UDP; - } else if (protocol == "tcp") { - config.protocol = heph::ipc::Protocol::TCP; - } else { - heph::throwException( - fmt::format("invalid value {} for option 'protocol'", protocol)); + switch (type) { + case ExampleType::PUBSUB: + return DEFAULT_PUBSUB_KEY; + case ExampleType::SERVICE: + return DEFAULT_SERVICE_KEY; + case ExampleType::ACTION_SERVER: + return DEFAULT_ACTION_SERVER_KEY; } - - config.router = args.getOption("router"); - config.enable_shared_memory = args.getOption("shared_memory"); - config.qos = args.getOption("qos"); - config.real_time = args.getOption("realtime"); - - return { std::move(config), std::move(topic_config) }; } diff --git a/modules/examples/examples/zenoh_pub.cpp b/modules/examples/examples/zenoh_pub.cpp index 764292d3..b01c4746 100644 --- a/modules/examples/examples/zenoh_pub.cpp +++ b/modules/examples/examples/zenoh_pub.cpp @@ -12,9 +12,11 @@ #include +#include "hephaestus/cli/program_options.h" #include "hephaestus/examples/types/pose.h" #include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner) -#include "hephaestus/ipc/publisher.h" +#include "hephaestus/ipc/zenoh/program_options.h" +#include "hephaestus/ipc/zenoh/publisher.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/exception.h" #include "hephaestus/utils/signal_handler.h" @@ -25,20 +27,22 @@ auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { - auto desc = getProgramDescription("Periodic publisher example", ExampleType::PUBSUB); + auto desc = heph::cli::ProgramDescription("Periodic publisher example"); + heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::PUBSUB)); const auto args = std::move(desc).parse(argc, argv); - auto [session_config, topic_config] = parseArgs(args); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); auto session = heph::ipc::zenoh::createSession(std::move(session_config)); - heph::ipc::Publisher publisher{ session, topic_config, - [](const auto& status) { - if (status.matching) { - fmt::println("Subscriber match"); - } else { - fmt::println("NO subscriber matching"); - } - } }; + heph::ipc::zenoh::Publisher publisher{ session, topic_config, + [](const auto& status) { + if (status.matching) { + fmt::println("Subscriber match"); + } else { + fmt::println( + "NO subscriber matching"); + } + } }; fmt::println("Declaring RawPublisher on '{}' with id: '{}'", topic_config.name, publisher.id()); diff --git a/modules/examples/examples/zenoh_service_client.cpp b/modules/examples/examples/zenoh_service_client.cpp index 5adb79bf..43f7d19c 100644 --- a/modules/examples/examples/zenoh_service_client.cpp +++ b/modules/examples/examples/zenoh_service_client.cpp @@ -15,8 +15,10 @@ #include // NOLINT(misc-include-cleaner) #include +#include "hephaestus/cli/program_options.h" #include "hephaestus/examples/types/pose.h" #include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner) +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/stack_trace.h" @@ -26,10 +28,11 @@ auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { - auto desc = getProgramDescription("Binary service client example", ExampleType::SERVICE); + auto desc = heph::cli::ProgramDescription("Binary service client example"); + heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::SERVICE)); const auto args = std::move(desc).parse(argc, argv); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); - auto [session_config, topic_config] = parseArgs(args); auto session = heph::ipc::zenoh::createSession(std::move(session_config)); static constexpr auto K_TIMEOUT = std::chrono::seconds(10); diff --git a/modules/examples/examples/zenoh_service_server.cpp b/modules/examples/examples/zenoh_service_server.cpp index 33b0ddeb..a8d2da51 100644 --- a/modules/examples/examples/zenoh_service_server.cpp +++ b/modules/examples/examples/zenoh_service_server.cpp @@ -11,8 +11,10 @@ #include #include +#include "hephaestus/cli/program_options.h" #include "hephaestus/examples/types/pose.h" #include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner) +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/signal_handler.h" @@ -23,10 +25,10 @@ auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { - auto desc = getProgramDescription("Binary service server example", ExampleType::SERVICE); + auto desc = heph::cli::ProgramDescription("Binary service example"); + heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::SERVICE)); const auto args = std::move(desc).parse(argc, argv); - - auto [session_config, topic_config] = parseArgs(args); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); auto session = heph::ipc::zenoh::createSession(std::move(session_config)); auto callback = [](const heph::examples::types::Pose& sample) { diff --git a/modules/examples/examples/zenoh_string_service_client.cpp b/modules/examples/examples/zenoh_string_service_client.cpp index e386eb16..6bf64caf 100644 --- a/modules/examples/examples/zenoh_string_service_client.cpp +++ b/modules/examples/examples/zenoh_string_service_client.cpp @@ -15,7 +15,9 @@ #include // NOLINT(misc-include-cleaner) #include +#include "hephaestus/cli/program_options.h" #include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner) +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/stack_trace.h" @@ -25,10 +27,11 @@ auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { - auto desc = getProgramDescription("String service client example", ExampleType::SERVICE); + auto desc = heph::cli::ProgramDescription("String service client example"); + heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::SERVICE)); const auto args = std::move(desc).parse(argc, argv); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); - auto [session_config, topic_config] = parseArgs(args); auto session = heph::ipc::zenoh::createSession(std::move(session_config)); static constexpr auto K_TIMEOUT = std::chrono::seconds(10); diff --git a/modules/examples/examples/zenoh_string_service_server.cpp b/modules/examples/examples/zenoh_string_service_server.cpp index 555372f9..b3bec24c 100644 --- a/modules/examples/examples/zenoh_string_service_server.cpp +++ b/modules/examples/examples/zenoh_string_service_server.cpp @@ -12,6 +12,8 @@ #include #include +#include "hephaestus/cli/program_options.h" +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/signal_handler.h" @@ -22,10 +24,11 @@ auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { - auto desc = getProgramDescription("String service server example", ExampleType::SERVICE); + auto desc = heph::cli::ProgramDescription("String service server example"); + heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::SERVICE)); const auto args = std::move(desc).parse(argc, argv); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); - auto [session_config, topic_config] = parseArgs(args); auto session = heph::ipc::zenoh::createSession(std::move(session_config)); auto callback = [](const std::string& sample) { diff --git a/modules/examples/examples/zenoh_sub.cpp b/modules/examples/examples/zenoh_sub.cpp index 62dbdbb1..3c9408cd 100644 --- a/modules/examples/examples/zenoh_sub.cpp +++ b/modules/examples/examples/zenoh_sub.cpp @@ -14,10 +14,11 @@ #include //NOLINT(misc-include-cleaner) #include +#include "hephaestus/cli/program_options.h" #include "hephaestus/examples/types/pose.h" #include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner) -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/subscriber.h" +#include "hephaestus/ipc/zenoh/program_options.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/ipc/zenoh/subscriber.h" #include "hephaestus/utils/signal_handler.h" @@ -28,24 +29,24 @@ auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { - auto desc = getProgramDescription("Periodic publisher example", ExampleType::PUBSUB); + auto desc = heph::cli::ProgramDescription("Subscriber example"); + heph::ipc::zenoh::appendProgramOption(desc, getDefaultTopic(ExampleType::PUBSUB)); const auto args = std::move(desc).parse(argc, argv); - - auto [session_config, topic_config] = parseArgs(args); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); LOG(INFO) << "Opening session..."; LOG(INFO) << fmt::format("Declaring Subscriber on '{}'", topic_config.name); auto session = heph::ipc::zenoh::createSession(std::move(session_config)); - auto cb = [topic = topic_config.name](const heph::ipc::MessageMetadata& metadata, + auto cb = [topic = topic_config.name](const heph::ipc::zenoh::MessageMetadata& metadata, const std::shared_ptr& pose) { fmt::println(">> Time: {}. Topic {}. From: {}. Counter: {}. Received {}", std::chrono::system_clock::time_point{ std::chrono::duration_cast(metadata.timestamp) }, metadata.topic, metadata.sender_id, metadata.sequence_id, *pose); }; - auto subscriber = heph::ipc::subscribe( + auto subscriber = heph::ipc::zenoh::createSubscriber( session, std::move(topic_config), std::move(cb), true); (void)subscriber; diff --git a/modules/examples/tests/zenoh_tests.cpp b/modules/examples/tests/zenoh_tests.cpp index 2d59d6f0..79c43e10 100644 --- a/modules/examples/tests/zenoh_tests.cpp +++ b/modules/examples/tests/zenoh_tests.cpp @@ -12,9 +12,9 @@ #include "hephaestus/examples/types/pose.h" #include "hephaestus/examples/types_protobuf/geometry.h" // NOLINT(misc-include-cleaner) #include "hephaestus/examples/types_protobuf/pose.h" // NOLINT(misc-include-cleaner) -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/publisher.h" -#include "hephaestus/ipc/subscriber.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/publisher.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/ipc/zenoh/subscriber.h" @@ -28,7 +28,7 @@ constexpr auto kSeed = 42; // NOLINT(readability-identifier-naming) void checkMessageExchange(bool subscriber_dedicated_callback_thread) { auto mt = random::createRNG(); - ipc::Config config{}; + ipc::zenoh::Config config{}; auto session = ipc::zenoh::createSession(std::move(config)); const auto topic = ipc::TopicConfig("test_topic"); @@ -39,11 +39,11 @@ void checkMessageExchange(bool subscriber_dedicated_callback_thread) { std::atomic_flag stop_flag = ATOMIC_FLAG_INIT; // Create publisher and subscriber - ipc::Publisher publisher(session, topic); + ipc::zenoh::Publisher publisher(session, topic); - auto subscriber = ipc::subscribe( + auto subscriber = ipc::zenoh::createSubscriber( session, topic, - [&received_message, &stop_flag]([[maybe_unused]] const ipc::MessageMetadata& metadata, + [&received_message, &stop_flag]([[maybe_unused]] const ipc::zenoh::MessageMetadata& metadata, const std::shared_ptr& message) { received_message = *message; stop_flag.test_and_set(); @@ -62,7 +62,7 @@ void checkMessageExchange(bool subscriber_dedicated_callback_thread) { TEST(ZenohTests, WrongSubsriberTypeLargeIntoSmall) { auto mt = random::createRNG(); - ipc::Config config{}; + ipc::zenoh::Config config{}; auto session = ipc::zenoh::createSession(std::move(config)); const auto topic = ipc::TopicConfig("test_topic"); @@ -71,10 +71,10 @@ TEST(ZenohTests, WrongSubsriberTypeLargeIntoSmall) { std::atomic_flag stop_flag = ATOMIC_FLAG_INIT; - ipc::Publisher publisher(session, topic); - auto subscriber = ipc::subscribe( + ipc::zenoh::Publisher publisher(session, topic); + auto subscriber = ipc::zenoh::createSubscriber( session, topic, - [&received_message, &stop_flag]([[maybe_unused]] const ipc::MessageMetadata& metadata, + [&received_message, &stop_flag]([[maybe_unused]] const ipc::zenoh::MessageMetadata& metadata, const std::shared_ptr& message) { received_message = *message; stop_flag.test_and_set(); @@ -86,7 +86,7 @@ TEST(ZenohTests, WrongSubsriberTypeLargeIntoSmall) { TEST(ZenohTests, WrongSubsriberTypeSmallIntoLarge) { auto mt = random::createRNG(); - ipc::Config config{}; + ipc::zenoh::Config config{}; auto session = ipc::zenoh::createSession(std::move(config)); const auto topic = ipc::TopicConfig("test_topic"); @@ -95,10 +95,10 @@ TEST(ZenohTests, WrongSubsriberTypeSmallIntoLarge) { std::atomic_flag stop_flag = ATOMIC_FLAG_INIT; - ipc::Publisher publisher(session, topic); - auto subscriber = ipc::subscribe( + ipc::zenoh::Publisher publisher(session, topic); + auto subscriber = ipc::zenoh::createSubscriber( session, topic, - [&received_message, &stop_flag]([[maybe_unused]] const ipc::MessageMetadata& metadata, + [&received_message, &stop_flag]([[maybe_unused]] const ipc::zenoh::MessageMetadata& metadata, const std::shared_ptr& message) { received_message = *message; stop_flag.test_and_set(); @@ -122,12 +122,12 @@ TEST(ZenohTests, ServiceCallExchange) { const auto service_topic = ipc::TopicConfig("test_service"); - ipc::Config server_config{}; + ipc::zenoh::Config server_config{}; auto server_session = ipc::zenoh::createSession(std::move(server_config)); auto service_server = ipc::zenoh::Service(server_session, service_topic, [](const Pose& request) { return request; }); - ipc::Config client_config{}; + ipc::zenoh::Config client_config{}; auto client_session = ipc::zenoh::createSession(std::move(client_config)); const auto reply = ipc::zenoh::callService(*client_session, service_topic, request_message, std::chrono::milliseconds(10)); diff --git a/modules/ipc/CMakeLists.txt b/modules/ipc/CMakeLists.txt index b071c082..c7c38cf2 100644 --- a/modules/ipc/CMakeLists.txt +++ b/modules/ipc/CMakeLists.txt @@ -19,29 +19,34 @@ add_subdirectory(proto) # library sources set(SOURCES - src/common.cpp - src/program_options.cpp + src/topic.cpp src/topic_database.cpp src/topic_filter.cpp + src/zenoh/conversions.cpp src/zenoh/dynamic_subscriber.cpp src/zenoh/liveliness.cpp src/zenoh/publisher.cpp - src/zenoh/service.cpp + src/zenoh/program_options.cpp + src/zenoh/raw_publisher.cpp + src/zenoh/raw_subscriber.cpp src/zenoh/scout.cpp + src/zenoh/service.cpp + src/zenoh/session.cpp src/zenoh/subscriber.cpp - src/zenoh/utils.cpp src/zenoh/action_server/action_server.cpp src/zenoh/action_server/client_helper.cpp src/zenoh/action_server/types.cpp src/zenoh/action_server/types_protobuf.cpp README.md - include/hephaestus/ipc/common.h - include/hephaestus/ipc/program_options.h + include/hephaestus/ipc/topic.h include/hephaestus/ipc/topic_database.h include/hephaestus/ipc/topic_filter.h + include/hephaestus/ipc/zenoh/conversions.h include/hephaestus/ipc/zenoh/dynamic_subscriber.h - include/hephaestus/ipc/zenoh/utils.h + include/hephaestus/ipc/zenoh/program_options.h include/hephaestus/ipc/zenoh/publisher.h + include/hephaestus/ipc/zenoh/raw_publisher.h + include/hephaestus/ipc/zenoh/raw_subscriber.h include/hephaestus/ipc/zenoh/service.h include/hephaestus/ipc/zenoh/scout.h include/hephaestus/ipc/zenoh/subscriber.h diff --git a/modules/ipc/apps/zenoh_string_service.cpp b/modules/ipc/apps/zenoh_string_service.cpp index 0af3900b..07934f55 100644 --- a/modules/ipc/apps/zenoh_string_service.cpp +++ b/modules/ipc/apps/zenoh_string_service.cpp @@ -14,10 +14,10 @@ #include #include "hephaestus/cli/program_options.h" -#include "hephaestus/ipc/program_options.h" +#include "hephaestus/ipc/zenoh/conversions.h" +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/utils.h" #include "hephaestus/utils/stack_trace.h" auto main(int argc, const char* argv[]) -> int { @@ -27,12 +27,12 @@ auto main(int argc, const char* argv[]) -> int { auto desc = heph::cli::ProgramDescription("Simple service for std::string types for both Request and Reply. " "Don't use for services with different types."); - heph::ipc::appendIPCProgramOption(desc); + heph::ipc::zenoh::appendProgramOption(desc); desc.defineOption("value", 'v', "the value to pass the query", ""); const auto args = std::move(desc).parse(argc, argv); const auto value = args.getOption("value"); - auto [config, topic_config] = heph::ipc::parseIPCProgramOptions(args); + auto [config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); auto session = heph::ipc::zenoh::createSession(std::move(config)); LOG(INFO) << fmt::format("Opening session: {}", heph::ipc::zenoh::toString(session->zenoh_session.get_zid())); diff --git a/modules/ipc/apps/zenoh_topic_echo.cpp b/modules/ipc/apps/zenoh_topic_echo.cpp index 58f2b582..fd097fdb 100644 --- a/modules/ipc/apps/zenoh_topic_echo.cpp +++ b/modules/ipc/apps/zenoh_topic_echo.cpp @@ -20,9 +20,10 @@ #include #include "hephaestus/cli/program_options.h" -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/program_options.h" +#include "hephaestus/ipc/topic.h" #include "hephaestus/ipc/zenoh/dynamic_subscriber.h" +#include "hephaestus/ipc/zenoh/program_options.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/serdes/dynamic_deserializer.h" #include "hephaestus/serdes/type_info.h" @@ -55,26 +56,26 @@ void truncateLongItems(std::string& msg_json, bool noarr, size_t max_length) { } } // namespace -namespace heph::ipc::apps { +namespace heph::ipc::zenoh::apps { class TopicEcho { public: - TopicEcho(zenoh::SessionPtr session, TopicConfig topic_config, bool noarr, std::size_t max_array_length) + TopicEcho(SessionPtr session, TopicConfig topic_config, bool noarr, std::size_t max_array_length) : noarr_(noarr), max_array_length_(max_array_length), topic_config_(std::move(topic_config)) { - zenoh::DynamicSubscriberParams params{ - .session = std::move(session), - .topics_filter_params = { .include_topics_only = {}, - .prefix = topic_config_.name, - .exclude_topics = {} }, - .init_subscriber_cb = - [this](const auto& topic, const auto& type_info) { - (void)topic; - dynamic_deserializer_.registerSchema(type_info); - }, - .subscriber_cb = [this](const auto& metadata, auto data, - const auto& topic_info) { subscribeCallback(metadata, data, topic_info); } - }; - - dynamic_subscriber_ = std::make_unique(std::move(params)); + DynamicSubscriberParams params{ .session = std::move(session), + .topics_filter_params = { .include_topics_only = {}, + .prefix = topic_config_.name, + .exclude_topics = {} }, + .init_subscriber_cb = + [this](const auto& topic, const auto& type_info) { + (void)topic; + dynamic_deserializer_.registerSchema(type_info); + }, + .subscriber_cb = + [this](const auto& metadata, auto data, const auto& topic_info) { + subscribeCallback(metadata, data, topic_info); + } }; + + dynamic_subscriber_ = std::make_unique(std::move(params)); } [[nodiscard]] auto start() -> std::future { @@ -103,17 +104,17 @@ class TopicEcho { bool noarr_; std::size_t max_array_length_; TopicConfig topic_config_; - heph::serdes::DynamicDeserializer dynamic_deserializer_; - std::unique_ptr dynamic_subscriber_; + serdes::DynamicDeserializer dynamic_deserializer_; + std::unique_ptr dynamic_subscriber_; }; -} // namespace heph::ipc::apps +} // namespace heph::ipc::zenoh::apps auto main(int argc, const char* argv[]) -> int { const heph::utils::StackTrace stack_trace; try { auto desc = heph::cli::ProgramDescription("Echo the data from a topic to the console."); - heph::ipc::appendIPCProgramOption(desc); + heph::ipc::zenoh::appendProgramOption(desc); desc.defineFlag("noarr", "Truncate print of long arrays"); desc.defineOption( @@ -123,7 +124,7 @@ auto main(int argc, const char* argv[]) -> int { DEFAULT_MAX_ARRAY_LENGTH); const auto args = std::move(desc).parse(argc, argv); - auto [session_config, topic_config] = heph::ipc::parseIPCProgramOptions(args); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); const auto noarr = args.getOption("noarr"); const auto max_array_length = args.getOption("noarr-max-size"); @@ -131,7 +132,7 @@ auto main(int argc, const char* argv[]) -> int { auto session = heph::ipc::zenoh::createSession(std::move(session_config)); - heph::ipc::apps::TopicEcho topic_echo{ std::move(session), topic_config, noarr, max_array_length }; + heph::ipc::zenoh::apps::TopicEcho topic_echo{ std::move(session), topic_config, noarr, max_array_length }; topic_echo.start().wait(); heph::utils::TerminationBlocker::waitForInterrupt(); diff --git a/modules/ipc/apps/zenoh_topic_list.cpp b/modules/ipc/apps/zenoh_topic_list.cpp index 924f6609..bda6b15d 100644 --- a/modules/ipc/apps/zenoh_topic_list.cpp +++ b/modules/ipc/apps/zenoh_topic_list.cpp @@ -13,9 +13,9 @@ #include #include "hephaestus/cli/program_options.h" -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/program_options.h" +#include "hephaestus/ipc/topic.h" #include "hephaestus/ipc/zenoh/liveliness.h" +#include "hephaestus/ipc/zenoh/program_options.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/signal_handler.h" #include "hephaestus/utils/stack_trace.h" @@ -40,11 +40,11 @@ auto main(int argc, const char* argv[]) -> int { try { auto desc = heph::cli::ProgramDescription("List all the publishers of a topic."); - heph::ipc::appendIPCProgramOption(desc); + heph::ipc::zenoh::appendProgramOption(desc); desc.defineFlag("live", 'l', "if set the app will keep running waiting for new publisher to advertise"); const auto args = std::move(desc).parse(argc, argv); - auto [session_config, topic_config] = heph::ipc::parseIPCProgramOptions(args); + auto [session_config, topic_config] = heph::ipc::zenoh::parseProgramOptions(args); fmt::println("Opening session..."); auto session = heph::ipc::zenoh::createSession(std::move(session_config)); diff --git a/modules/ipc/include/hephaestus/ipc/common.h b/modules/ipc/include/hephaestus/ipc/common.h deleted file mode 100644 index 0e566211..00000000 --- a/modules/ipc/include/hephaestus/ipc/common.h +++ /dev/null @@ -1,48 +0,0 @@ -//================================================================================================ -// Copyright (C) 2023-2024 HEPHAESTUS Contributors -//================================================================================================= - -#pragma once - -#include -#include - -#include - -namespace heph::ipc { - -enum class Mode : uint8_t { PEER = 0, CLIENT, ROUTER }; -enum class Protocol : uint8_t { ANY = 0, UDP, TCP }; - -struct Config { - bool enable_shared_memory = false; //! NOTE: With shared-memory enabled, the publisher still uses the - //! network transport layer to notify subscribers of the shared-memory - //! segment to read. Therefore, for very small messages, shared - - //! memory transport could be less efficient than using the default - //! network transport to directly carry the payload. - Mode mode = Mode::PEER; - // NOLINTNEXTLINE(readability-redundant-string-init) otherwise we need to specify in constructor - std::string router = ""; //! If specified connect to the given router endpoint. - std::size_t cache_size = 0; - bool qos = false; - bool real_time = false; - Protocol protocol{ Protocol::ANY }; -}; - -struct TopicConfig { - std::string name; -}; - -struct MessageMetadata { - // TODO: convert this to a uuid - std::string sender_id; - std::string topic; - std::chrono::nanoseconds timestamp{}; - std::size_t sequence_id{}; -}; - -[[nodiscard]] static inline auto getTypeInfoServiceTopic(const std::string& topic) -> std::string { - return fmt::format("type_info/{}", topic); -} - -} // namespace heph::ipc diff --git a/modules/ipc/include/hephaestus/ipc/program_options.h b/modules/ipc/include/hephaestus/ipc/program_options.h deleted file mode 100644 index 501d5b73..00000000 --- a/modules/ipc/include/hephaestus/ipc/program_options.h +++ /dev/null @@ -1,16 +0,0 @@ -//================================================================================================= -// Copyright (C) 2023-2024 HEPHAESTUS Contributors -//================================================================================================= -#pragma once - -#include "hephaestus/cli/program_options.h" -#include "hephaestus/ipc/common.h" - -namespace heph::ipc { - -void appendIPCProgramOption(cli::ProgramDescription& program_description); - -[[nodiscard]] auto parseIPCProgramOptions(const heph::cli::ProgramOptions& args) - -> std::pair; - -} // namespace heph::ipc diff --git a/modules/ipc/include/hephaestus/ipc/publisher.h b/modules/ipc/include/hephaestus/ipc/publisher.h deleted file mode 100644 index 2f8e8bbe..00000000 --- a/modules/ipc/include/hephaestus/ipc/publisher.h +++ /dev/null @@ -1,36 +0,0 @@ -//================================================================================================= -// Copyright (C) 2023-2024 HEPHAESTUS Contributors -//================================================================================================= - -#pragma once - -#include -#include - -#include "hephaestus/ipc/zenoh/publisher.h" -#include "hephaestus/serdes/serdes.h" - -namespace heph::ipc { - -template -class Publisher { -public: - Publisher(zenoh::SessionPtr session, TopicConfig topic_config, - zenoh::RawPublisher::MatchCallback&& match_cb = nullptr) - : publisher_(std::move(session), std::move(topic_config), serdes::getSerializedTypeInfo(), - std::move(match_cb)) { - } - - [[nodiscard]] auto publish(const T& data) -> bool { - auto buffer = serdes::serialize(data); - return publisher_.publish(buffer); - } - - [[nodiscard]] auto id() const -> std::string { - return publisher_.id(); - } - -private: - zenoh::RawPublisher publisher_; -}; -} // namespace heph::ipc diff --git a/modules/ipc/include/hephaestus/ipc/subscriber.h b/modules/ipc/include/hephaestus/ipc/subscriber.h deleted file mode 100644 index 12acaa87..00000000 --- a/modules/ipc/include/hephaestus/ipc/subscriber.h +++ /dev/null @@ -1,36 +0,0 @@ -//================================================================================================= -// Copyright (C) 2023-2024 HEPHAESTUS Contributors -//================================================================================================= - -#pragma once - -#include - -#include -#include - -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/serdes/serdes.h" - -namespace heph::ipc { - -template -using DataCallback = std::function&)>; - -template -[[nodiscard]] auto subscribe(zenoh::SessionPtr session, TopicConfig topic_config, - DataCallback&& callback, - bool dedicated_callback_thread = false) -> std::unique_ptr { - auto cb = [callback = std::move(callback)](const MessageMetadata& metadata, - std::span buffer) mutable { - auto data = std::make_shared(); - heph::serdes::deserialize(buffer, *data); - callback(metadata, std::move(data)); - }; - - return std::make_unique(std::move(session), std::move(topic_config), std::move(cb), - dedicated_callback_thread); -} - -} // namespace heph::ipc diff --git a/modules/ipc/include/hephaestus/ipc/topic.h b/modules/ipc/include/hephaestus/ipc/topic.h new file mode 100644 index 00000000..a01881ba --- /dev/null +++ b/modules/ipc/include/hephaestus/ipc/topic.h @@ -0,0 +1,15 @@ +//================================================================================================ +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#pragma once + +#include + +namespace heph::ipc { + +struct TopicConfig { + std::string name; +}; + +} // namespace heph::ipc diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/action_server/action_server.h b/modules/ipc/include/hephaestus/ipc/zenoh/action_server/action_server.h index bfd1bd8f..3b9e34e2 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/action_server/action_server.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/action_server/action_server.h @@ -12,11 +12,11 @@ #include #include "hephaestus/concurrency/message_queue_consumer.h" -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/publisher.h" +#include "hephaestus/ipc/topic.h" #include "hephaestus/ipc/zenoh/action_server/client_helper.h" #include "hephaestus/ipc/zenoh/action_server/types.h" #include "hephaestus/ipc/zenoh/action_server/types_protobuf.h" // NOLINT(misc-include-cleaner) +#include "hephaestus/ipc/zenoh/publisher.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/action_server/client_helper.h b/modules/ipc/include/hephaestus/ipc/zenoh/action_server/client_helper.h index 714ab7bb..b4b57936 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/action_server/client_helper.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/action_server/client_helper.h @@ -8,10 +8,9 @@ #include -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/subscriber.h" +#include "hephaestus/ipc/topic.h" #include "hephaestus/ipc/zenoh/action_server/types.h" -#include "hephaestus/ipc/zenoh/action_server/types_protobuf.h" +#include "hephaestus/ipc/zenoh/action_server/types_protobuf.h" // NOLINT(misc-include-cleaner) #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/ipc/zenoh/subscriber.h" @@ -50,7 +49,7 @@ class ClientHelper { SessionPtr session_; TopicConfig topic_config_; - std::unique_ptr status_subscriber_; + std::unique_ptr> status_subscriber_; std::unique_ptr, RequestResponse>> response_service_; std::promise> reply_promise_; @@ -61,7 +60,7 @@ ClientHelper::ClientHelper(SessionPtr session, TopicC StatusUpdateCallback&& status_update_cb) : session_(std::move(session)) , topic_config_(std::move(topic_config)) - , status_subscriber_(subscribe( + , status_subscriber_(createSubscriber( session_, internal::getStatusPublisherTopic(topic_config_), [status_update_cb = std::move(status_update_cb)](const MessageMetadata&, const std::shared_ptr& status) mutable { diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/conversions.h b/modules/ipc/include/hephaestus/ipc/zenoh/conversions.h new file mode 100644 index 00000000..51abce94 --- /dev/null +++ b/modules/ipc/include/hephaestus/ipc/zenoh/conversions.h @@ -0,0 +1,57 @@ +//================================================================================================ +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#pragma once + +#include +#include +#include + +#include +#include + +#include "hephaestus/ipc/zenoh/session.h" + +namespace heph::ipc::zenoh { + +static constexpr auto TEXT_PLAIN_ENCODING = "text/plain"; +/// We use single char key to reduce the overhead of the attachment. +static constexpr auto PUBLISHER_ATTACHMENT_MESSAGE_COUNTER_KEY = "0"; +static constexpr auto PUBLISHER_ATTACHMENT_MESSAGE_SESSION_ID_KEY = "1"; + +[[nodiscard]] auto toByteVector(const ::zenoh::Bytes& bytes) -> std::vector; + +[[nodiscard]] auto toZenohBytes(std::span buffer) -> ::zenoh::Bytes; + +[[nodiscard]] auto toString(const ::zenoh::Id& id) -> std::string; + +[[nodiscard]] auto toString(const std::vector& vec) -> std::string; + +[[nodiscard]] auto toChrono(const ::zenoh::Timestamp& timestamp) -> std::chrono::nanoseconds; + +[[nodiscard]] constexpr auto toString(const ::zenoh::WhatAmI& me) -> std::string_view { + switch (me) { + case ::zenoh::WhatAmI::Z_WHATAMI_ROUTER: + return "Router"; + case ::zenoh::WhatAmI::Z_WHATAMI_PEER: + return "Peer"; + case ::zenoh::WhatAmI::Z_WHATAMI_CLIENT: + return "Client"; + } +} + +[[nodiscard]] constexpr auto toMode(const ::zenoh::WhatAmI& me) -> Mode { + switch (me) { + case ::zenoh::WhatAmI::Z_WHATAMI_ROUTER: + return Mode::ROUTER; + case ::zenoh::WhatAmI::Z_WHATAMI_PEER: + return Mode::PEER; + case ::zenoh::WhatAmI::Z_WHATAMI_CLIENT: + return Mode::CLIENT; + } + + __builtin_unreachable(); // TODO(C++23): replace with std::unreachable. +} + +} // namespace heph::ipc::zenoh diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/dynamic_subscriber.h b/modules/ipc/include/hephaestus/ipc/zenoh/dynamic_subscriber.h index 42171b37..1871f10e 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/dynamic_subscriber.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/dynamic_subscriber.h @@ -9,8 +9,8 @@ #include "hephaestus/ipc/topic_database.h" #include "hephaestus/ipc/topic_filter.h" #include "hephaestus/ipc/zenoh/liveliness.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/subscriber.h" #include "hephaestus/serdes/type_info.h" namespace heph::ipc::zenoh { @@ -54,7 +54,7 @@ class DynamicSubscriber { SessionPtr topic_info_query_session_; // Session used to query topic service. std::unique_ptr topic_db_; - std::unordered_map> subscribers_; + std::unordered_map> subscribers_; TopicWithTypeInfoCallback init_subscriber_cb_; SubscriberWithTypeCallback subscriber_cb_; }; diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/liveliness.h b/modules/ipc/include/hephaestus/ipc/zenoh/liveliness.h index 0db08bb6..42f97bb0 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/liveliness.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/liveliness.h @@ -10,7 +10,7 @@ #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/topic.h" #include "hephaestus/ipc/zenoh/session.h" namespace heph::ipc::zenoh { @@ -21,8 +21,8 @@ struct PublisherInfo { PublisherStatus status; }; -[[nodiscard]] auto getListOfPublishers(const Session& session, - std::string_view topic = "**") -> std::vector; +[[nodiscard]] auto getListOfPublishers(const Session& session, std::string_view topic = "**") + -> std::vector; void printPublisherInfo(const PublisherInfo& info); diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/program_options.h b/modules/ipc/include/hephaestus/ipc/zenoh/program_options.h new file mode 100644 index 00000000..5baf278a --- /dev/null +++ b/modules/ipc/include/hephaestus/ipc/zenoh/program_options.h @@ -0,0 +1,19 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= +#pragma once + +#include "hephaestus/cli/program_options.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/session.h" + +namespace heph::ipc::zenoh { + +static constexpr auto DEFAULT_TOPIC = "**"; +void appendProgramOption(cli::ProgramDescription& program_description, + const std::string& default_topic = DEFAULT_TOPIC); + +[[nodiscard]] auto parseProgramOptions(const heph::cli::ProgramOptions& args) + -> std::pair; + +} // namespace heph::ipc::zenoh diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/publisher.h b/modules/ipc/include/hephaestus/ipc/zenoh/publisher.h index a5398f2c..d113983b 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/publisher.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/publisher.h @@ -3,73 +3,35 @@ //================================================================================================= #pragma once +#include +#include -#include - -#include -#include - -#include "hephaestus/ipc/zenoh/service.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/raw_publisher.h" #include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/utils.h" -#include "hephaestus/serdes/type_info.h" +#include "hephaestus/serdes/serdes.h" namespace heph::ipc::zenoh { -struct MatchingStatus { - bool matching{}; //! If true publisher is connect to at least one subscriber. -}; - -/// - Create a Zenoh publisher on the topic specified in `config`. -/// - Create a service that provides the schema used to serialize the data. -/// - the service is published on the topic created via `getTypeInfoServiceTopic(topic)` -/// - e.g. for topic `hephaestus/pose` it create a service on `type_info/hephaestus/pose` -/// - the service returns the Json representation of the type info, that can be converted using -/// serdes::TypeInfo::fromJson(str); -/// - If `match_cb` is passed, it is called when the first subscriber matches and when the last one unmatch. -class RawPublisher { +template +class Publisher { public: - using MatchCallback = std::function; - /// - RawPublisher(SessionPtr session, TopicConfig topic_config, serdes::TypeInfo type_info, - MatchCallback&& match_cb = nullptr); - ~RawPublisher(); - RawPublisher(const RawPublisher&) = delete; - RawPublisher(RawPublisher&&) = delete; - auto operator=(const RawPublisher&) -> RawPublisher& = delete; - auto operator=(RawPublisher&&) -> RawPublisher& = delete; + Publisher(SessionPtr session, TopicConfig topic_config, RawPublisher::MatchCallback&& match_cb = nullptr) + : publisher_(std::move(session), std::move(topic_config), serdes::getSerializedTypeInfo(), + std::move(match_cb)) { + } - [[nodiscard]] auto publish(std::span data) -> bool; + [[nodiscard]] auto publish(const T& data) -> bool { + auto buffer = serdes::serialize(data); + return publisher_.publish(buffer); + } [[nodiscard]] auto id() const -> std::string { - return toString(session_->zenoh_session.get_zid()); + return publisher_.id(); } private: - void enableCache(); - [[nodiscard]] auto createPublisherOptions() -> ::zenoh::Publisher::PutOptions; - void enableMatchingListener(); - void createTypeInfoService(); - -private: - SessionPtr session_; - TopicConfig topic_config_; - std::unique_ptr<::zenoh::Publisher> publisher_; - - serdes::TypeInfo type_info_; - std::unique_ptr> type_service_; - - std::unique_ptr<::zenoh::LivelinessToken> liveliness_token_; - - bool enable_cache_ = false; - ze_owned_publication_cache_t cache_publisher_{}; - z_owned_session_t zenoh_cache_session_{}; - - std::size_t pub_msg_count_ = 0; - std::unordered_map attachment_; - - MatchCallback match_cb_{ nullptr }; - zc_owned_matching_listener_t subscriers_listener_{}; + RawPublisher publisher_; }; } // namespace heph::ipc::zenoh diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/raw_publisher.h b/modules/ipc/include/hephaestus/ipc/zenoh/raw_publisher.h new file mode 100644 index 00000000..0596aff5 --- /dev/null +++ b/modules/ipc/include/hephaestus/ipc/zenoh/raw_publisher.h @@ -0,0 +1,79 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#pragma once + +#include + +#include +#include + +#include "hephaestus/ipc/zenoh/conversions.h" +#include "hephaestus/ipc/zenoh/service.h" +#include "hephaestus/ipc/zenoh/session.h" +#include "hephaestus/serdes/type_info.h" + +namespace heph::ipc::zenoh { + +struct MatchingStatus { + bool matching{}; //! If true publisher is connect to at least one subscriber. +}; + +/// - Create a Zenoh publisher on the topic specified in `config`. +/// - Create a service that provides the schema used to serialize the data. +/// - the service is published on the topic created via `getTypeInfoServiceTopic(topic)` +/// - e.g. for topic `hephaestus/pose` it create a service on `type_info/hephaestus/pose` +/// - the service returns the Json representation of the type info, that can be converted using +/// serdes::TypeInfo::fromJson(str); +/// - If `match_cb` is passed, it is called when the first subscriber matches and when the last one unmatch. +class RawPublisher { +public: + using MatchCallback = std::function; + /// + RawPublisher(SessionPtr session, TopicConfig topic_config, serdes::TypeInfo type_info, + MatchCallback&& match_cb = nullptr); + ~RawPublisher(); + RawPublisher(const RawPublisher&) = delete; + RawPublisher(RawPublisher&&) = delete; + auto operator=(const RawPublisher&) -> RawPublisher& = delete; + auto operator=(RawPublisher&&) -> RawPublisher& = delete; + + [[nodiscard]] auto publish(std::span data) -> bool; + + [[nodiscard]] auto id() const -> std::string { + return toString(session_->zenoh_session.get_zid()); + } + +private: + void enableCache(); + [[nodiscard]] auto createPublisherOptions() -> ::zenoh::Publisher::PutOptions; + void enableMatchingListener(); + void createTypeInfoService(); + +private: + SessionPtr session_; + TopicConfig topic_config_; + std::unique_ptr<::zenoh::Publisher> publisher_; + + serdes::TypeInfo type_info_; + std::unique_ptr> type_service_; + + std::unique_ptr<::zenoh::LivelinessToken> liveliness_token_; + + bool enable_cache_ = false; + ze_owned_publication_cache_t cache_publisher_{}; + z_owned_session_t zenoh_cache_session_{}; + + std::size_t pub_msg_count_ = 0; + std::unordered_map attachment_; + + MatchCallback match_cb_{ nullptr }; + zc_owned_matching_listener_t subscriers_listener_{}; +}; + +[[nodiscard]] static inline auto getTypeInfoServiceTopic(const std::string& topic) -> std::string { + return fmt::format("type_info/{}", topic); +} + +} // namespace heph::ipc::zenoh diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/raw_subscriber.h b/modules/ipc/include/hephaestus/ipc/zenoh/raw_subscriber.h new file mode 100644 index 00000000..0ce7ad2b --- /dev/null +++ b/modules/ipc/include/hephaestus/ipc/zenoh/raw_subscriber.h @@ -0,0 +1,63 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#pragma once + +#include + +#include +#include + +#include "hephaestus/concurrency/message_queue_consumer.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/session.h" + +namespace heph::ipc::zenoh { + +struct MessageMetadata { + // TODO: convert this to a uuid + std::string sender_id; + std::string topic; + std::chrono::nanoseconds timestamp{}; + std::size_t sequence_id{}; +}; + +class RawSubscriber { +public: + using DataCallback = std::function)>; + + /// Note: setting dedicated_callback_thread to true will consume the messages in a dedicated thread. + /// While this avoid blocking the Zenoh session thread to process other messages, + /// it also introduce an overhead due to the message data being copied. + RawSubscriber(SessionPtr session, TopicConfig topic_config, DataCallback&& callback, + bool dedicated_callback_thread = false); + ~RawSubscriber(); + RawSubscriber(const RawSubscriber&) = delete; + RawSubscriber(RawSubscriber&&) = delete; + auto operator=(const RawSubscriber&) -> RawSubscriber& = delete; + auto operator=(RawSubscriber&&) -> RawSubscriber& = delete; + +private: + void callback(const ::zenoh::Sample& sample); + +private: + using Message = std::pair>; + + SessionPtr session_; + TopicConfig topic_config_; + + DataCallback callback_; + + std::unique_ptr<::zenoh::Subscriber> subscriber_; + + bool enable_cache_ = false; + ze_owned_querying_subscriber_t cache_subscriber_{}; + z_owned_session_t zenoh_cache_session_{}; + + bool dedicated_callback_thread_; + static constexpr std::size_t DEFAULT_CACHE_RESERVES = 100; + std::unique_ptr> callback_messages_consumer_; +}; + +} // namespace heph::ipc::zenoh diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/scout.h b/modules/ipc/include/hephaestus/ipc/zenoh/scout.h index 13284d78..6459d885 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/scout.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/scout.h @@ -5,7 +5,7 @@ #pragma once #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/zenoh/session.h" namespace heph::ipc::zenoh { diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/service.h b/modules/ipc/include/hephaestus/ipc/zenoh/service.h index 5397ad98..8664304f 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/service.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/service.h @@ -25,9 +25,9 @@ #include #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/conversions.h" #include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/utils.h" #include "hephaestus/serdes/serdes.h" #include "hephaestus/utils/exception.h" diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/session.h b/modules/ipc/include/hephaestus/ipc/zenoh/session.h index bfa268d1..d96a3916 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/session.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/session.h @@ -7,20 +7,32 @@ #include #include -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/zenoh/utils.h" - namespace heph::ipc::zenoh { +enum class Mode : uint8_t { PEER = 0, CLIENT, ROUTER }; +enum class Protocol : uint8_t { ANY = 0, UDP, TCP }; + +struct Config { + bool enable_shared_memory = false; //! NOTE: With shared-memory enabled, the publisher still uses the + //! network transport layer to notify subscribers of the shared-memory + //! segment to read. Therefore, for very small messages, shared - + //! memory transport could be less efficient than using the default + //! network transport to directly carry the payload. + Mode mode = Mode::PEER; + // NOLINTNEXTLINE(readability-redundant-string-init) otherwise we need to specify in constructor + std::string router = ""; //! If specified connect to the given router endpoint. + std::size_t cache_size = 0; + bool qos = false; + bool real_time = false; + Protocol protocol{ Protocol::ANY }; +}; + struct Session { ::zenoh::Session zenoh_session; Config config; }; using SessionPtr = std::shared_ptr; +[[nodiscard]] auto createSession(Config config) -> SessionPtr; -[[nodiscard]] inline auto createSession(Config config) -> SessionPtr { - auto zconfig = createZenohConfig(config); - return std::make_shared(::zenoh::Session::open(std::move(zconfig)), std::move(config)); -} } // namespace heph::ipc::zenoh diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/subscriber.h b/modules/ipc/include/hephaestus/ipc/zenoh/subscriber.h index 1577a19d..2190e63b 100644 --- a/modules/ipc/include/hephaestus/ipc/zenoh/subscriber.h +++ b/modules/ipc/include/hephaestus/ipc/zenoh/subscriber.h @@ -9,47 +9,40 @@ #include #include -#include "hephaestus/concurrency/message_queue_consumer.h" -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/ipc/zenoh/session.h" +#include "hephaestus/serdes/serdes.h" namespace heph::ipc::zenoh { +template class Subscriber { public: - using DataCallback = std::function)>; - - /// Note: setting dedicated_callback_thread to true will consume the messages in a dedicated thread. - /// While this avoid blocking the Zenoh session thread to process other messages, - /// it also introduce an overhead due to the message data being copied. - Subscriber(SessionPtr session, TopicConfig topic_config, DataCallback&& callback, - bool dedicated_callback_thread = false); - ~Subscriber(); - Subscriber(const Subscriber&) = delete; - Subscriber(Subscriber&&) = delete; - auto operator=(const Subscriber&) -> Subscriber& = delete; - auto operator=(Subscriber&&) -> Subscriber& = delete; + using DataCallback = std::function&)>; + Subscriber(zenoh::SessionPtr session, TopicConfig topic_config, DataCallback&& callback, + bool dedicated_callback_thread = false) + : subscriber_( + std::move(session), std::move(topic_config), + [callback = std::move(callback)](const MessageMetadata& metadata, + std::span buffer) mutable { + auto data = std::make_shared(); + heph::serdes::deserialize(buffer, *data); + callback(metadata, std::move(data)); + }, + dedicated_callback_thread) { + } private: - void callback(const ::zenoh::Sample& sample); - -private: - using Message = std::pair>; - - SessionPtr session_; - TopicConfig topic_config_; - - DataCallback callback_; - - std::unique_ptr<::zenoh::Subscriber> subscriber_; - - bool enable_cache_ = false; - ze_owned_querying_subscriber_t cache_subscriber_{}; - z_owned_session_t zenoh_cache_session_{}; - - bool dedicated_callback_thread_; - static constexpr std::size_t DEFAULT_CACHE_RESERVES = 100; - std::unique_ptr> callback_messages_consumer_; + RawSubscriber subscriber_; }; +template +[[nodiscard]] auto createSubscriber(zenoh::SessionPtr session, TopicConfig topic_config, + typename Subscriber::DataCallback&& callback, + bool dedicated_callback_thread = false) + -> std::unique_ptr> { + return std::make_unique>(std::move(session), std::move(topic_config), std::move(callback), + dedicated_callback_thread); +} + } // namespace heph::ipc::zenoh diff --git a/modules/ipc/include/hephaestus/ipc/zenoh/utils.h b/modules/ipc/include/hephaestus/ipc/zenoh/utils.h deleted file mode 100644 index 3c548bb7..00000000 --- a/modules/ipc/include/hephaestus/ipc/zenoh/utils.h +++ /dev/null @@ -1,108 +0,0 @@ -//================================================================================================ -// Copyright (C) 2023-2024 HEPHAESTUS Contributors -//================================================================================================= - -#pragma once - -#include -#include -#include - -#include -#include - -#include "hephaestus/ipc/common.h" - -namespace heph::ipc::zenoh { - -static constexpr auto TEXT_PLAIN_ENCODING = "text/plain"; -/// We use single char key to reduce the overhead of the attachment. -static constexpr auto PUBLISHER_ATTACHMENT_MESSAGE_COUNTER_KEY = "0"; -static constexpr auto PUBLISHER_ATTACHMENT_MESSAGE_SESSION_ID_KEY = "1"; - -[[nodiscard]] static inline auto toByteVector(const ::zenoh::Bytes& bytes) -> std::vector { - auto reader = bytes.reader(); - std::vector vec(bytes.size()); - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-type-const-cast) - reader.read(reinterpret_cast(vec.data()), vec.size()); - return vec; -} - -[[nodiscard]] static inline auto toZenohBytes(std::span buffer) -> ::zenoh::Bytes { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - const std::string_view data_view{ reinterpret_cast(buffer.data()), buffer.size() }; - return ::zenoh::Bytes{ data_view }; -} - -inline auto toString(const ::zenoh::Id& id) -> std::string { - return std::accumulate(std::begin(id.bytes()), std::end(id.bytes()), std::string(), - [](const std::string& s, uint8_t v) { return fmt::format("{:02x}", v) + s; }); -} - -constexpr auto toString(const ::zenoh::WhatAmI& me) -> std::string_view { - switch (me) { - case ::zenoh::WhatAmI::Z_WHATAMI_ROUTER: - return "Router"; - case ::zenoh::WhatAmI::Z_WHATAMI_PEER: - return "Peer"; - case ::zenoh::WhatAmI::Z_WHATAMI_CLIENT: - return "Client"; - } -} - -constexpr auto toString(const Mode& mode) -> std::string_view { - switch (mode) { - case Mode::ROUTER: - return "Router"; - case Mode::PEER: - return "Peer"; - case Mode::CLIENT: - return "Client"; - } - - __builtin_unreachable(); // TODO(C++23): replace with std::unreachable. -} - -constexpr auto toMode(const ::zenoh::WhatAmI& me) -> Mode { - switch (me) { - case ::zenoh::WhatAmI::Z_WHATAMI_ROUTER: - return Mode::ROUTER; - case ::zenoh::WhatAmI::Z_WHATAMI_PEER: - return Mode::PEER; - case ::zenoh::WhatAmI::Z_WHATAMI_CLIENT: - return Mode::CLIENT; - } - - __builtin_unreachable(); // TODO(C++23): replace with std::unreachable. -} - -inline auto toString(const std::vector& vec) -> std::string { - std::string str = "["; - for (const auto& value : vec) { - str += fmt::format("\"{:s}\", ", value); - } - - str += "]"; - return str; -} - -inline auto toChrono(uint64_t timestamp) -> std::chrono::nanoseconds { - // For details see https://zenoh.io/docs/manual/abstractions/#timestamp - const auto seconds = std::chrono::seconds{ static_cast(timestamp >> 32U) }; - static constexpr auto FRACTION_MASK = 0xFFFFFFF0; - auto fraction = static_cast(timestamp & FRACTION_MASK); // - // Convert fraction to nanoseconds - // The fraction is in units of 2^-32 seconds, so we multiply by 10^9 / 2^32 - auto nanoseconds = - std::chrono::nanoseconds{ static_cast(fraction) * 1'000'000'000 / 0x100000000 }; // NOLINT - - return seconds + nanoseconds; -} - -inline auto toChrono(const ::zenoh::Timestamp& timestamp) -> std::chrono::nanoseconds { - return toChrono(timestamp.get_time()); -} - -[[nodiscard]] auto createZenohConfig(const Config& config) -> ::zenoh::Config; - -} // namespace heph::ipc::zenoh diff --git a/modules/ipc/src/common.cpp b/modules/ipc/src/topic.cpp similarity index 78% rename from modules/ipc/src/common.cpp rename to modules/ipc/src/topic.cpp index c0702f0a..d171965a 100644 --- a/modules/ipc/src/common.cpp +++ b/modules/ipc/src/topic.cpp @@ -2,4 +2,4 @@ // Copyright (C) 2023-2024 HEPHAESTUS Contributors //================================================================================================= -#include "hephaestus/ipc/common.h" // NOLINT(misc-include-cleaner) +#include "hephaestus/ipc/topic.h" // NOLINT(misc-include-cleaner) diff --git a/modules/ipc/src/topic_database.cpp b/modules/ipc/src/topic_database.cpp index ea7e00d7..e8f3c347 100644 --- a/modules/ipc/src/topic_database.cpp +++ b/modules/ipc/src/topic_database.cpp @@ -14,7 +14,8 @@ #include #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/raw_publisher.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/serdes/type_info.h" @@ -46,7 +47,7 @@ auto ZenohTopicDatabase::getTypeInfo(const std::string& topic) -> const serdes:: } } // Unlock while querying the service. - auto query_topic = getTypeInfoServiceTopic(topic); + auto query_topic = zenoh::getTypeInfoServiceTopic(topic); static constexpr auto TIMEOUT = std::chrono::milliseconds{ 5000 }; const auto response = zenoh::callService( diff --git a/modules/ipc/src/zenoh/action_server/action_server.cpp b/modules/ipc/src/zenoh/action_server/action_server.cpp index b231c3fa..d8b7f0c8 100644 --- a/modules/ipc/src/zenoh/action_server/action_server.cpp +++ b/modules/ipc/src/zenoh/action_server/action_server.cpp @@ -10,7 +10,7 @@ #include #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/topic.h" #include "hephaestus/ipc/zenoh/action_server/client_helper.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" diff --git a/modules/ipc/src/zenoh/action_server/client_helper.cpp b/modules/ipc/src/zenoh/action_server/client_helper.cpp index e4e7f84b..54a9130a 100644 --- a/modules/ipc/src/zenoh/action_server/client_helper.cpp +++ b/modules/ipc/src/zenoh/action_server/client_helper.cpp @@ -6,7 +6,7 @@ #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/topic.h" namespace heph::ipc::zenoh::action_server::internal { auto getStatusPublisherTopic(const TopicConfig& server_topic) -> TopicConfig { diff --git a/modules/ipc/src/zenoh/conversions.cpp b/modules/ipc/src/zenoh/conversions.cpp new file mode 100644 index 00000000..da9900a7 --- /dev/null +++ b/modules/ipc/src/zenoh/conversions.cpp @@ -0,0 +1,70 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#include "hephaestus/ipc/zenoh/conversions.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace heph::ipc::zenoh { +namespace { +[[nodiscard]] auto toChrono(uint64_t timestamp) -> std::chrono::nanoseconds { + // For details see https://zenoh.io/docs/manual/abstractions/#timestamp + const auto seconds = std::chrono::seconds{ static_cast(timestamp >> 32U) }; + static constexpr auto FRACTION_MASK = 0xFFFFFFF0; + auto fraction = static_cast(timestamp & FRACTION_MASK); // + // Convert fraction to nanoseconds + // The fraction is in units of 2^-32 seconds, so we multiply by 10^9 / 2^32 + auto nanoseconds = + std::chrono::nanoseconds{ static_cast(fraction) * 1'000'000'000 / 0x100000000 }; // NOLINT + + return seconds + nanoseconds; +} +} // namespace + +auto toByteVector(const ::zenoh::Bytes& bytes) -> std::vector { + auto reader = bytes.reader(); + std::vector vec(bytes.size()); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-type-const-cast) + reader.read(reinterpret_cast(vec.data()), vec.size()); + return vec; +} + +auto toZenohBytes(std::span buffer) -> ::zenoh::Bytes { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + const std::string_view data_view{ reinterpret_cast(buffer.data()), buffer.size() }; + return ::zenoh::Bytes{ data_view }; +} + +auto toString(const ::zenoh::Id& id) -> std::string { + return std::accumulate(std::begin(id.bytes()), std::end(id.bytes()), std::string(), + [](const std::string& s, uint8_t v) { return fmt::format("{:02x}", v) + s; }); +} + +auto toString(const std::vector& vec) -> std::string { + std::string str = "["; + for (const auto& value : vec) { + str += fmt::format("\"{:s}\", ", value); + } + + str += "]"; + return str; +} + +auto toChrono(const ::zenoh::Timestamp& timestamp) -> std::chrono::nanoseconds { + return toChrono(timestamp.get_time()); +} + +} // namespace heph::ipc::zenoh diff --git a/modules/ipc/src/zenoh/dynamic_subscriber.cpp b/modules/ipc/src/zenoh/dynamic_subscriber.cpp index 35e78cbf..748d70d5 100644 --- a/modules/ipc/src/zenoh/dynamic_subscriber.cpp +++ b/modules/ipc/src/zenoh/dynamic_subscriber.cpp @@ -14,12 +14,12 @@ #include #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/topic.h" #include "hephaestus/ipc/topic_database.h" #include "hephaestus/ipc/topic_filter.h" #include "hephaestus/ipc/zenoh/liveliness.h" +#include "hephaestus/ipc/zenoh/raw_subscriber.h" #include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/subscriber.h" #include "hephaestus/serdes/type_info.h" #include "hephaestus/utils/exception.h" @@ -82,7 +82,7 @@ void DynamicSubscriber::onPublisherAdded(const PublisherInfo& info) { fmt::format("Adding subscriber for topic: {}, but one already exists", info.topic)); LOG(INFO) << fmt::format("Create subscriber for topic: {}", info.topic); - subscribers_[info.topic] = std::make_unique( + subscribers_[info.topic] = std::make_unique( session_, ipc::TopicConfig{ .name = info.topic }, [this, optional_type_info = std::move(optional_type_info)](const MessageMetadata& metadata, std::span data) { diff --git a/modules/ipc/src/zenoh/liveliness.cpp b/modules/ipc/src/zenoh/liveliness.cpp index da4262c0..cb524e98 100644 --- a/modules/ipc/src/zenoh/liveliness.cpp +++ b/modules/ipc/src/zenoh/liveliness.cpp @@ -22,7 +22,7 @@ #include #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/topic.h" #include "hephaestus/ipc/zenoh/session.h" namespace heph::ipc::zenoh { diff --git a/modules/ipc/src/program_options.cpp b/modules/ipc/src/zenoh/program_options.cpp similarity index 81% rename from modules/ipc/src/program_options.cpp rename to modules/ipc/src/zenoh/program_options.cpp index 10030500..6e5da1dd 100644 --- a/modules/ipc/src/program_options.cpp +++ b/modules/ipc/src/zenoh/program_options.cpp @@ -2,7 +2,7 @@ // Copyright (C) 2023-2024 HEPHAESTUS Contributors //================================================================================================= -#include "hephaestus/ipc/program_options.h" +#include "hephaestus/ipc/zenoh/program_options.h" #include #include @@ -10,16 +10,16 @@ #include -#include "hephaestus//ipc/common.h" #include "hephaestus/cli/program_options.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/session.h" #include "hephaestus/utils/exception.h" -namespace heph::ipc { +namespace heph::ipc::zenoh { -void appendIPCProgramOption(cli::ProgramDescription& program_description) { - static constexpr auto DEFAULT_KEY = "**"; - - program_description.defineOption("topic", 't', "Key expression", DEFAULT_KEY) +void appendProgramOption(cli::ProgramDescription& program_description, + const std::string& default_topic /*= DEFAULT_TOPIC*/) { + program_description.defineOption("topic", 't', "Key expression", default_topic) .defineOption("cache", 'c', "Cache size", 0) .defineOption("mode", 'm', "Running mode: options: peer, client", "peer") .defineOption("router", 'r', "Router endpoint", "") @@ -29,7 +29,7 @@ void appendIPCProgramOption(cli::ProgramDescription& program_description) { .defineFlag("realtime", "Enable real-time communication"); } -auto parseIPCProgramOptions(const heph::cli::ProgramOptions& args) -> std::pair { +auto parseProgramOptions(const heph::cli::ProgramOptions& args) -> std::pair { TopicConfig topic_config{ .name = args.getOption("topic") }; Config config; @@ -64,4 +64,4 @@ auto parseIPCProgramOptions(const heph::cli::ProgramOptions& args) -> std::pair< return { std::move(config), std::move(topic_config) }; } -} // namespace heph::ipc +} // namespace heph::ipc::zenoh diff --git a/modules/ipc/src/zenoh/publisher.cpp b/modules/ipc/src/zenoh/publisher.cpp index 6643c66a..a782b824 100644 --- a/modules/ipc/src/zenoh/publisher.cpp +++ b/modules/ipc/src/zenoh/publisher.cpp @@ -1,157 +1,5 @@ -//================================================================================================= +//================================================================================================ // Copyright (C) 2023-2024 HEPHAESTUS Contributors //================================================================================================= -#include "hephaestus/ipc/zenoh/publisher.h" - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/zenoh/service.h" -#include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/utils.h" -#include "hephaestus/serdes/type_info.h" -#include "hephaestus/utils/exception.h" - -namespace heph::ipc::zenoh { -namespace { -extern "C" { -inline void zenohOnMatchingStatus(const ::zc_matching_status_t* matching_status, void* context) { - ::zenoh::detail::closures::IClosure::call_from_context(context, - matching_status); -} -} - -template -[[nodiscard]] auto createZenohcClosureMatchingStatus(C&& on_matching_status, D&& on_drop) - -> zc_owned_closure_matching_status_t { - zc_owned_closure_matching_status_t c_closure; - using Cval = std::remove_reference_t; - using Dval = std::remove_reference_t; - using ClosureType = - typename ::zenoh::detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_matching_status), std::forward(on_drop)); - ::z_closure(&c_closure, zenohOnMatchingStatus, ::zenoh::detail::closures::_zenoh_on_drop, closure); - return c_closure; -} -} // namespace - -RawPublisher::RawPublisher(SessionPtr session, TopicConfig topic_config, serdes::TypeInfo type_info, - MatchCallback&& match_cb) - : session_(std::move(session)) - , topic_config_(std::move(topic_config)) - , type_info_(std::move(type_info)) - , enable_cache_(session_->config.cache_size > 0) - , match_cb_(std ::move(match_cb)) { - // Enable publishing of a liveliness token. - const ::zenoh::KeyExpr keyexpr{ topic_config_.name }; - ::zenoh::ZResult result{}; - liveliness_token_ = - std::make_unique<::zenoh::LivelinessToken>(session_->zenoh_session.liveliness_declare_token( - keyexpr, ::zenoh::Session::LivelinessDeclarationOptions::create_default(), &result)); - throwExceptionIf( - result != Z_OK, - fmt::format("[Publisher {}] failed to create livelines token, result {}", topic_config_.name, result)); - - if (enable_cache_) { - enableCache(); - } - - auto pub_options = ::zenoh::Session::PublisherOptions::create_default(); - if (session_->config.real_time) { - pub_options.priority = ::zenoh::Priority::Z_PRIORITY_REAL_TIME; - } - - publisher_ = std::make_unique<::zenoh::Publisher>( - session_->zenoh_session.declare_publisher(keyexpr, std::move(pub_options))); - - if (match_cb_ != nullptr) { - enableMatchingListener(); - } - - createTypeInfoService(); -} - -RawPublisher::~RawPublisher() { - if (enable_cache_) { - z_drop(z_move(cache_publisher_)); - z_drop(z_move(zenoh_cache_session_)); - } -} - -auto RawPublisher::publish(std::span data) -> bool { - ::zenoh::ZResult result{}; - auto bytes = toZenohBytes(data); - - auto options = createPublisherOptions(); - publisher_->put(std::move(bytes), std::move(options), &result); - return result == Z_OK; -} - -void RawPublisher::enableCache() { - ze_publication_cache_options_t cache_publisher_opts; - ze_publication_cache_options_default(&cache_publisher_opts); - cache_publisher_opts.history = session_->config.cache_size; - - z_view_keyexpr_t ke; - z_view_keyexpr_from_str(&ke, topic_config_.name.data()); - - // TODO replace with loan - zenoh_cache_session_ = std::move(session_->zenoh_session.clone()).take(); - const auto result = ze_declare_publication_cache(&cache_publisher_, z_loan(zenoh_cache_session_), - z_loan(ke), &cache_publisher_opts); - throwExceptionIf( - result != Z_OK, - fmt::format("[Publisher {}] failed to enable cache, result {}", topic_config_.name, result)); -} - -auto RawPublisher::createPublisherOptions() -> ::zenoh::Publisher::PutOptions { - auto put_options = ::zenoh::Publisher::PutOptions::create_default(); - put_options.encoding = ::zenoh::Encoding{ TEXT_PLAIN_ENCODING }; - attachment_[PUBLISHER_ATTACHMENT_MESSAGE_COUNTER_KEY] = std::to_string(pub_msg_count_++); - attachment_[PUBLISHER_ATTACHMENT_MESSAGE_SESSION_ID_KEY] = toString(session_->zenoh_session.get_zid()); - put_options.attachment = ::zenoh::Bytes::serialize(attachment_); - - return put_options; -} - -void RawPublisher::enableMatchingListener() { - auto closure = createZenohcClosureMatchingStatus( - [this](const zc_matching_status_t* matching_status) { - const MatchingStatus status{ .matching = matching_status->matching }; - this->match_cb_(status); - }, - []() {}); - - zc_publisher_matching_listener_declare(&subscriers_listener_, publisher_->loan(), z_move(closure)); -} - -void RawPublisher::createTypeInfoService() { - auto type_info_json = this->type_info_.toJson(); - auto type_info_callback = [type_info_json](const auto& request) { - (void)request; - return type_info_json; - }; - auto type_service_topic = TopicConfig{ .name = getTypeInfoServiceTopic(topic_config_.name) }; - type_service_ = std::make_unique>(session_, type_service_topic, - std::move(type_info_callback)); -} -} // namespace heph::ipc::zenoh +#include "hephaestus/ipc/zenoh/publisher.h" // NOLINT(misc-include-cleaner) diff --git a/modules/ipc/src/zenoh/raw_publisher.cpp b/modules/ipc/src/zenoh/raw_publisher.cpp new file mode 100644 index 00000000..25c9ac1a --- /dev/null +++ b/modules/ipc/src/zenoh/raw_publisher.cpp @@ -0,0 +1,157 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#include "hephaestus/ipc/zenoh/raw_publisher.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/conversions.h" +#include "hephaestus/ipc/zenoh/service.h" +#include "hephaestus/ipc/zenoh/session.h" +#include "hephaestus/serdes/type_info.h" +#include "hephaestus/utils/exception.h" + +namespace heph::ipc::zenoh { +namespace { +extern "C" { +inline void zenohOnMatchingStatus(const ::zc_matching_status_t* matching_status, void* context) { + ::zenoh::detail::closures::IClosure::call_from_context(context, + matching_status); +} +} + +template +[[nodiscard]] auto createZenohcClosureMatchingStatus(C&& on_matching_status, D&& on_drop) + -> zc_owned_closure_matching_status_t { + zc_owned_closure_matching_status_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = + typename ::zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_matching_status), std::forward(on_drop)); + ::z_closure(&c_closure, zenohOnMatchingStatus, ::zenoh::detail::closures::_zenoh_on_drop, closure); + return c_closure; +} +} // namespace + +RawPublisher::RawPublisher(SessionPtr session, TopicConfig topic_config, serdes::TypeInfo type_info, + MatchCallback&& match_cb) + : session_(std::move(session)) + , topic_config_(std::move(topic_config)) + , type_info_(std::move(type_info)) + , enable_cache_(session_->config.cache_size > 0) + , match_cb_(std ::move(match_cb)) { + // Enable publishing of a liveliness token. + const ::zenoh::KeyExpr keyexpr{ topic_config_.name }; + ::zenoh::ZResult result{}; + liveliness_token_ = + std::make_unique<::zenoh::LivelinessToken>(session_->zenoh_session.liveliness_declare_token( + keyexpr, ::zenoh::Session::LivelinessDeclarationOptions::create_default(), &result)); + throwExceptionIf( + result != Z_OK, + fmt::format("[Publisher {}] failed to create livelines token, result {}", topic_config_.name, result)); + + if (enable_cache_) { + enableCache(); + } + + auto pub_options = ::zenoh::Session::PublisherOptions::create_default(); + if (session_->config.real_time) { + pub_options.priority = ::zenoh::Priority::Z_PRIORITY_REAL_TIME; + } + + publisher_ = std::make_unique<::zenoh::Publisher>( + session_->zenoh_session.declare_publisher(keyexpr, std::move(pub_options))); + + if (match_cb_ != nullptr) { + enableMatchingListener(); + } + + createTypeInfoService(); +} + +RawPublisher::~RawPublisher() { + if (enable_cache_) { + z_drop(z_move(cache_publisher_)); + z_drop(z_move(zenoh_cache_session_)); + } +} + +auto RawPublisher::publish(std::span data) -> bool { + ::zenoh::ZResult result{}; + auto bytes = toZenohBytes(data); + + auto options = createPublisherOptions(); + publisher_->put(std::move(bytes), std::move(options), &result); + return result == Z_OK; +} + +void RawPublisher::enableCache() { + ze_publication_cache_options_t cache_publisher_opts; + ze_publication_cache_options_default(&cache_publisher_opts); + cache_publisher_opts.history = session_->config.cache_size; + + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, topic_config_.name.data()); + + // TODO replace with loan + zenoh_cache_session_ = std::move(session_->zenoh_session.clone()).take(); + const auto result = ze_declare_publication_cache(&cache_publisher_, z_loan(zenoh_cache_session_), + z_loan(ke), &cache_publisher_opts); + throwExceptionIf( + result != Z_OK, + fmt::format("[Publisher {}] failed to enable cache, result {}", topic_config_.name, result)); +} + +auto RawPublisher::createPublisherOptions() -> ::zenoh::Publisher::PutOptions { + auto put_options = ::zenoh::Publisher::PutOptions::create_default(); + put_options.encoding = ::zenoh::Encoding{ TEXT_PLAIN_ENCODING }; + attachment_[PUBLISHER_ATTACHMENT_MESSAGE_COUNTER_KEY] = std::to_string(pub_msg_count_++); + attachment_[PUBLISHER_ATTACHMENT_MESSAGE_SESSION_ID_KEY] = toString(session_->zenoh_session.get_zid()); + put_options.attachment = ::zenoh::Bytes::serialize(attachment_); + + return put_options; +} + +void RawPublisher::enableMatchingListener() { + auto closure = createZenohcClosureMatchingStatus( + [this](const zc_matching_status_t* matching_status) { + const MatchingStatus status{ .matching = matching_status->matching }; + this->match_cb_(status); + }, + []() {}); + + zc_publisher_matching_listener_declare(&subscriers_listener_, publisher_->loan(), z_move(closure)); +} + +void RawPublisher::createTypeInfoService() { + auto type_info_json = this->type_info_.toJson(); + auto type_info_callback = [type_info_json](const auto& request) { + (void)request; + return type_info_json; + }; + auto type_service_topic = TopicConfig{ .name = getTypeInfoServiceTopic(topic_config_.name) }; + type_service_ = std::make_unique>(session_, type_service_topic, + std::move(type_info_callback)); +} +} // namespace heph::ipc::zenoh diff --git a/modules/ipc/src/zenoh/raw_subscriber.cpp b/modules/ipc/src/zenoh/raw_subscriber.cpp new file mode 100644 index 00000000..a23c04da --- /dev/null +++ b/modules/ipc/src/zenoh/raw_subscriber.cpp @@ -0,0 +1,136 @@ +//================================================================================================ +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#include "hephaestus/ipc/zenoh/raw_subscriber.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "hephaestus/concurrency/message_queue_consumer.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/conversions.h" +#include "hephaestus/ipc/zenoh/session.h" +#include "hephaestus/utils/exception.h" + +namespace heph::ipc::zenoh { +namespace { +// This code comes from `zenoh/api/session.hxx` +template +[[nodiscard]] auto createZenohcClosure(C&& on_sample, D&& on_drop) -> z_owned_closure_sample_t { + z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename ::zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, ::zenoh::detail::closures::_zenoh_on_sample_call, + ::zenoh::detail::closures::_zenoh_on_drop, closure); + return c_closure; +} +} // namespace + +RawSubscriber::RawSubscriber(SessionPtr session, TopicConfig topic_config, DataCallback&& callback, + bool dedicated_callback_thread /*= false*/) + : session_(std::move(session)) + , topic_config_(std::move(topic_config)) + , callback_(std::move(callback)) + , enable_cache_(session_->config.cache_size > 0) + , dedicated_callback_thread_(dedicated_callback_thread) { + auto cb = [this](const ::zenoh::Sample& sample) { this->callback(sample); }; + if (!enable_cache_) { + ::zenoh::ZResult result{}; + const ::zenoh::KeyExpr keyexpr{ topic_config_.name }; + subscriber_ = std::make_unique<::zenoh::Subscriber>(session_->zenoh_session.declare_subscriber( + keyexpr, std::move(cb), ::zenoh::closures::none, + ::zenoh::Session::SubscriberOptions::create_default(), &result)); + heph::throwExceptionIf( + result != Z_OK, + fmt::format("[Subscriber {}] failed to create zenoh subscriber, err {}", topic_config_.name, result)); + } else { + // zenohcxx still doesn't support cache querying subscribers, so we have to use the C API. + ze_querying_subscriber_options_t sub_opts; + ze_querying_subscriber_options_default(&sub_opts); + + z_view_keyexpr_t keyexpr; + z_view_keyexpr_from_str(&keyexpr, topic_config_.name.c_str()); + + auto c_closure = createZenohcClosure(cb, ::zenoh::closures::none); + + zenoh_cache_session_ = std::move(session_->zenoh_session.clone()).take(); + const auto result = ze_declare_querying_subscriber(&cache_subscriber_, z_loan(zenoh_cache_session_), + z_loan(keyexpr), z_move(c_closure), &sub_opts); + + heph::throwExceptionIf( + result != Z_OK, + fmt::format("[Subscriber {}] failed to create zenoh subscriber, err {}", topic_config_.name, result)); + } + + if (dedicated_callback_thread_) { + callback_messages_consumer_ = std::make_unique>( + [this](const Message& message) { + const auto& [metadata, buffer] = message; + callback_(metadata, std::span(buffer.begin(), buffer.end())); + }, + DEFAULT_CACHE_RESERVES); + callback_messages_consumer_->start(); + } +} + +RawSubscriber::~RawSubscriber() { + if (callback_messages_consumer_ != nullptr) { + callback_messages_consumer_->stop(); + } + + if (enable_cache_) { + z_drop(z_move(cache_subscriber_)); + z_drop(z_move(zenoh_cache_session_)); + } +} + +void RawSubscriber::callback(const ::zenoh::Sample& sample) { + MessageMetadata metadata; + if (const auto attachment = sample.get_attachment(); attachment.has_value()) { + auto attachment_data = attachment->get().deserialize>(); + + auto res = + absl::SimpleAtoi(attachment_data[PUBLISHER_ATTACHMENT_MESSAGE_COUNTER_KEY], &metadata.sequence_id); + LOG_IF(ERROR, !res) << fmt::format("[Subscriber {}] failed to read message counter from attachment", + topic_config_.name); + metadata.sender_id = attachment_data[PUBLISHER_ATTACHMENT_MESSAGE_SESSION_ID_KEY]; + } + + if (const auto timestamp = sample.get_timestamp(); timestamp.has_value()) { + metadata.timestamp = toChrono(timestamp.value()); + } + + metadata.topic = sample.get_keyexpr().as_string_view(); + + auto payload = toByteVector(sample.get_payload()); + + if (dedicated_callback_thread_) { + callback_messages_consumer_->queue().forceEmplace(metadata, std::move(payload)); + } else { + callback_(metadata, { payload.data(), payload.size() }); + } +} + +} // namespace heph::ipc::zenoh diff --git a/modules/ipc/src/zenoh/scout.cpp b/modules/ipc/src/zenoh/scout.cpp index c81f4e3f..ed40f1b9 100644 --- a/modules/ipc/src/zenoh/scout.cpp +++ b/modules/ipc/src/zenoh/scout.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -22,10 +23,10 @@ #include #include -#include "hephaestus/ipc/common.h" +#include "hephaestus/ipc/topic.h" +#include "hephaestus/ipc/zenoh/conversions.h" #include "hephaestus/ipc/zenoh/service.h" #include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/utils.h" #include "hephaestus/utils/exception.h" namespace heph::ipc::zenoh { @@ -65,7 +66,7 @@ class ScoutDataManager { }; [[nodiscard]] auto getRouterInfoJson(const std::string& router_id) -> std::string { - heph::ipc::Config config; + heph::ipc::zenoh::Config config; auto session = heph::ipc::zenoh::createSession(std::move(config)); static constexpr auto ROUTER_TOPIC = "@/router/{}"; @@ -118,6 +119,7 @@ auto getListOfNodes() -> std::vector { } auto toString(const NodeInfo& info) -> std::string { - return fmt::format("[{}] ID: {}. Locators: {}", toString(info.mode), info.id, toString(info.locators)); + return fmt::format("[{}] ID: {}. Locators: {}", magic_enum::enum_name(info.mode), info.id, + toString(info.locators)); } } // namespace heph::ipc::zenoh diff --git a/modules/ipc/src/zenoh/utils.cpp b/modules/ipc/src/zenoh/session.cpp similarity index 74% rename from modules/ipc/src/zenoh/utils.cpp rename to modules/ipc/src/zenoh/session.cpp index 3ae41aad..aa73f32e 100644 --- a/modules/ipc/src/zenoh/utils.cpp +++ b/modules/ipc/src/zenoh/session.cpp @@ -1,18 +1,19 @@ //================================================================================================= // Copyright (C) 2023-2024 HEPHAESTUS Contributors //================================================================================================= +#include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/utils.h" +#include +#include #include -#include #include +#include -#include "hephaestus/ipc/common.h" #include "hephaestus/utils/exception.h" namespace heph::ipc::zenoh { - +namespace { // Default config https://github.com/eclipse-zenoh/zenoh/blob/master/DEFAULT_CONFIG.json5 auto createZenohConfig(const Config& config) -> ::zenoh::Config { throwExceptionIf(config.qos && config.real_time, @@ -22,7 +23,7 @@ auto createZenohConfig(const Config& config) -> ::zenoh::Config { auto zconfig = ::zenoh::Config::create_default(); // A timestamp is add to every published message. - zconfig.insert_json(Z_CONFIG_ADD_TIMESTAMP_KEY, "true"); + zconfig.insert_json(Z_CONFIG_ADD_TIMESTAMP_KEY, "true"); // NOLINT(misc-include-cleaner) // Enable shared memory support. if (config.enable_shared_memory) { @@ -31,21 +32,21 @@ auto createZenohConfig(const Config& config) -> ::zenoh::Config { // Set node in client mode. if (config.mode == Mode::CLIENT) { - zconfig.insert_json(Z_CONFIG_MODE_KEY, R"("client")"); + zconfig.insert_json(Z_CONFIG_MODE_KEY, R"("client")"); // NOLINT(misc-include-cleaner) } // Set the transport to UDP, but I am not sure it is the right way. // zconfig.insert_json(Z_CONFIG_LISTEN_KEY, R"(["udp/localhost:7447"])"); if (config.protocol == Protocol::UDP) { - zconfig.insert_json(Z_CONFIG_CONNECT_KEY, R"(["udp/0.0.0.0:0"])"); + zconfig.insert_json(Z_CONFIG_CONNECT_KEY, R"(["udp/0.0.0.0:0"])"); // NOLINT(misc-include-cleaner) } else if (config.protocol == Protocol::TCP) { - zconfig.insert_json(Z_CONFIG_CONNECT_KEY, R"(["tcp/0.0.0.0:0"])"); + zconfig.insert_json(Z_CONFIG_CONNECT_KEY, R"(["tcp/0.0.0.0:0"])"); // NOLINT(misc-include-cleaner) } // Add router endpoint. if (!config.router.empty()) { const auto router_endpoint = fmt::format(R"(["tcp/{}"])", config.router); - zconfig.insert_json(Z_CONFIG_CONNECT_KEY, router_endpoint); + zconfig.insert_json(Z_CONFIG_CONNECT_KEY, router_endpoint); // NOLINT(misc-include-cleaner) } { zconfig.insert_json("transport/unicast/qos/enabled", config.qos ? "true" : "false"); } if (config.real_time) { @@ -55,4 +56,11 @@ auto createZenohConfig(const Config& config) -> ::zenoh::Config { return zconfig; } +} // namespace + +auto createSession(Config config) -> SessionPtr { + auto zconfig = createZenohConfig(config); + return std::make_shared(::zenoh::Session::open(std::move(zconfig)), std::move(config)); +} + } // namespace heph::ipc::zenoh diff --git a/modules/ipc/src/zenoh/subscriber.cpp b/modules/ipc/src/zenoh/subscriber.cpp index 42344203..11e6e72c 100644 --- a/modules/ipc/src/zenoh/subscriber.cpp +++ b/modules/ipc/src/zenoh/subscriber.cpp @@ -1,136 +1,5 @@ -//================================================================================================ +//================================================================================================= // Copyright (C) 2023-2024 HEPHAESTUS Contributors //================================================================================================= -#include "hephaestus/ipc/zenoh/subscriber.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "hephaestus/concurrency/message_queue_consumer.h" -#include "hephaestus/ipc/common.h" -#include "hephaestus/ipc/zenoh/session.h" -#include "hephaestus/ipc/zenoh/utils.h" -#include "hephaestus/utils/exception.h" - -namespace heph::ipc::zenoh { -namespace { -// This code comes from `zenoh/api/session.hxx` -template -[[nodiscard]] auto createZenohcClosure(C&& on_sample, D&& on_drop) -> z_owned_closure_sample_t { - z_owned_closure_sample_t c_closure; - using Cval = std::remove_reference_t; - using Dval = std::remove_reference_t; - using ClosureType = typename ::zenoh::detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); - ::z_closure(&c_closure, ::zenoh::detail::closures::_zenoh_on_sample_call, - ::zenoh::detail::closures::_zenoh_on_drop, closure); - return c_closure; -} -} // namespace - -Subscriber::Subscriber(SessionPtr session, TopicConfig topic_config, DataCallback&& callback, - bool dedicated_callback_thread /*= false*/) - : session_(std::move(session)) - , topic_config_(std::move(topic_config)) - , callback_(std::move(callback)) - , enable_cache_(session_->config.cache_size > 0) - , dedicated_callback_thread_(dedicated_callback_thread) { - auto cb = [this](const ::zenoh::Sample& sample) { this->callback(sample); }; - if (!enable_cache_) { - ::zenoh::ZResult result{}; - const ::zenoh::KeyExpr keyexpr{ topic_config_.name }; - subscriber_ = std::make_unique<::zenoh::Subscriber>(session_->zenoh_session.declare_subscriber( - keyexpr, std::move(cb), ::zenoh::closures::none, - ::zenoh::Session::SubscriberOptions::create_default(), &result)); - heph::throwExceptionIf( - result != Z_OK, - fmt::format("[Subscriber {}] failed to create zenoh subscriber, err {}", topic_config_.name, result)); - } else { - // zenohcxx still doesn't support cache querying subscribers, so we have to use the C API. - ze_querying_subscriber_options_t sub_opts; - ze_querying_subscriber_options_default(&sub_opts); - - z_view_keyexpr_t keyexpr; - z_view_keyexpr_from_str(&keyexpr, topic_config_.name.c_str()); - - auto c_closure = createZenohcClosure(cb, ::zenoh::closures::none); - - zenoh_cache_session_ = std::move(session_->zenoh_session.clone()).take(); - const auto result = ze_declare_querying_subscriber(&cache_subscriber_, z_loan(zenoh_cache_session_), - z_loan(keyexpr), z_move(c_closure), &sub_opts); - - heph::throwExceptionIf( - result != Z_OK, - fmt::format("[Subscriber {}] failed to create zenoh subscriber, err {}", topic_config_.name, result)); - } - - if (dedicated_callback_thread_) { - callback_messages_consumer_ = std::make_unique>( - [this](const Message& message) { - const auto& [metadata, buffer] = message; - callback_(metadata, std::span(buffer.begin(), buffer.end())); - }, - DEFAULT_CACHE_RESERVES); - callback_messages_consumer_->start(); - } -} - -Subscriber::~Subscriber() { - if (callback_messages_consumer_ != nullptr) { - callback_messages_consumer_->stop(); - } - - if (enable_cache_) { - z_drop(z_move(cache_subscriber_)); - z_drop(z_move(zenoh_cache_session_)); - } -} - -void Subscriber::callback(const ::zenoh::Sample& sample) { - MessageMetadata metadata; - if (const auto attachment = sample.get_attachment(); attachment.has_value()) { - auto attachment_data = attachment->get().deserialize>(); - - auto res = - absl::SimpleAtoi(attachment_data[PUBLISHER_ATTACHMENT_MESSAGE_COUNTER_KEY], &metadata.sequence_id); - LOG_IF(ERROR, !res) << fmt::format("[Subscriber {}] failed to read message counter from attachment", - topic_config_.name); - metadata.sender_id = attachment_data[PUBLISHER_ATTACHMENT_MESSAGE_SESSION_ID_KEY]; - } - - if (const auto timestamp = sample.get_timestamp(); timestamp.has_value()) { - metadata.timestamp = toChrono(timestamp.value()); - } - - metadata.topic = sample.get_keyexpr().as_string_view(); - - auto payload = toByteVector(sample.get_payload()); - - if (dedicated_callback_thread_) { - callback_messages_consumer_->queue().forceEmplace(metadata, std::move(payload)); - } else { - callback_(metadata, { payload.data(), payload.size() }); - } -} - -} // namespace heph::ipc::zenoh +#include "hephaestus/ipc/zenoh/subscriber.h" // NOLINT(misc-include-cleaner)