Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IPC/Zenoh] Service and ActionServer add type check #196

Merged
merged 4 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ void ActionServer<RequestT, StatusT, ReplyT>::execute(const RequestT& request) {
auto status_update_publisher =
Publisher<StatusT>{ session_, internal::getStatusPublisherTopic(topic_config_) };

std::atomic_bool stop_requested{ false };
std::atomic_bool stop_requested{ false }; // NOLINT(misc-const-correctness) False positive
auto stop_service = Service<std::string, std::string>(
session_, internal::getStopServiceTopic(topic_config_), [&stop_requested](const std::string&) {
stop_requested = true;
Expand All @@ -195,7 +195,10 @@ void ActionServer<RequestT, StatusT, ReplyT>::execute(const RequestT& request) {
} catch (const std::exception& ex) {
LOG(ERROR) << fmt::format("[ActionServer {}] execute callback failed with exception: {}.",
topic_config_.name, ex.what());
return Response<ReplyT>{};
return Response<ReplyT>{
.value = ReplyT{},
.status = RequestStatus::INVALID,
};
}
}();
const auto response_topic = internal::getResponseServiceTopic(topic_config_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ClientHelper {

private:
[[nodiscard]] auto serviceCallback(const Response<ReplyT>& reply) -> RequestResponse;
void onFailure();

private:
SessionPtr session_;
Expand All @@ -68,7 +69,8 @@ ClientHelper<RequestT, StatusT, ReplyT>::ClientHelper(SessionPtr session, TopicC
}))
, response_service_(std::make_unique<Service<Response<ReplyT>, RequestResponse>>(
session_, internal::getResponseServiceTopic(topic_config_),
[this](const Response<ReplyT>& reply) { return serviceCallback(reply); })) {
[this](const Response<ReplyT>& reply) { return serviceCallback(reply); },
[this]() { onFailure(); })) {
}

template <typename RequestT, typename StatusT, typename ReplyT>
Expand All @@ -83,4 +85,9 @@ auto ClientHelper<RequestT, StatusT, ReplyT>::serviceCallback(const Response<Rep
return { .status = RequestStatus::SUCCESSFUL };
}

template <typename RequestT, typename StatusT, typename ReplyT>
void ClientHelper<RequestT, StatusT, ReplyT>::onFailure() {
reply_promise_.set_value({ .value = {}, .status = RequestStatus::INVALID });
}

} // namespace heph::ipc::zenoh::action_server::internal
137 changes: 97 additions & 40 deletions modules/ipc/include/hephaestus/ipc/zenoh/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,38 @@
#include <zenoh.h>
#include <zenoh.hxx>
#include <zenoh/api/encoding.hxx>
#include <zenoh/api/ext/serialization.hxx>
#include <zenoh/api/queryable.hxx>
#include <zenoh/api/session.hxx>

#include "hephaestus/ipc/topic.h"
#include "hephaestus/ipc/zenoh/conversions.h"
#include "hephaestus/ipc/zenoh/session.h"
#include "hephaestus/serdes/serdes.h"
#include "hephaestus/utils/exception.h"
#include "hephaestus/utils/utils.h"

namespace heph::ipc::zenoh {

template <typename RequestT, typename ReplyT>
class Service {
public:
using Callback = std::function<ReplyT(const RequestT&)>;
Service(SessionPtr session, TopicConfig topic_config, Callback&& callback);
using FailureCallback = std::function<void()>;
Service(
SessionPtr session, TopicConfig topic_config, Callback&& callback,
FailureCallback&& failure_callback = []() {});

private:
void onQuery(const ::zenoh::Query& query);

private:
SessionPtr session_;
std::unique_ptr<::zenoh::Queryable<void>> queryable_;

TopicConfig topic_config_;
Callback callback_;
FailureCallback failure_callback_;
};

template <typename ReplyT>
Expand All @@ -62,6 +72,8 @@ auto callService(Session& session, const TopicConfig& topic_config, const Reques
// --------- Implementation ----------

namespace internal {
static constexpr auto SERVICE_ATTACHMENT_REQUEST_TYPE_INFO = "0";
static constexpr auto SERVICE_ATTACHMENT_REPLY_TYPE_INFO = "1";

template <typename RequestT, typename ReplyT>
constexpr void checkTemplatedTypes() {
Expand All @@ -71,6 +83,25 @@ constexpr void checkTemplatedTypes() {
"Reply needs to be serializable or std::string.");
}

template <typename RequestT, typename ReplyT>
[[nodiscard]] auto checkQueryTypeInfo(const ::zenoh::Query& query) -> bool {
const auto attachment = query.get_attachment();
if (!attachment.has_value()) {
LOG(WARNING) << fmt::format("[Service {}] Query is missing attachments",
query.get_keyexpr().as_string_view());
return false;
}

auto attachment_data =
::zenoh::ext::deserialize<std::unordered_map<std::string, std::string>>(attachment->get());

const auto request_type_info = attachment_data[SERVICE_ATTACHMENT_REQUEST_TYPE_INFO];
const auto reply_type_info = attachment_data[SERVICE_ATTACHMENT_REPLY_TYPE_INFO];

return request_type_info == utils::getTypeName<RequestT>() &&
reply_type_info == utils::getTypeName<ReplyT>();
}

template <class RequestT>
auto deserializeRequest(const ::zenoh::Query& query) -> RequestT {
const auto& keyexpr = query.get_keyexpr().as_string_view();
Expand Down Expand Up @@ -131,35 +162,45 @@ auto onReply(const ::zenoh::Sample& sample) -> ServiceResponse<ReplyT> {
return ServiceResponse<ReplyT>{ .topic = server_topic, .value = std::move(reply) };
}
}

template <typename RequestT, typename ReplyT>
[[nodiscard]] auto createZenohGetOptions(const RequestT& request,
const std::optional<std::chrono::milliseconds>& timeout)
-> ::zenoh::Session::GetOptions {
::zenoh::Session::GetOptions options{};
if (timeout.has_value()) {
options.timeout_ms = static_cast<uint64_t>(timeout->count());
}

if constexpr (std::is_same_v<RequestT, std::string>) {
options.encoding = ::zenoh::Encoding::Predefined::zenoh_string();
options.payload = ::zenoh::ext::serialize(request);
} else {
options.encoding = ::zenoh::Encoding::Predefined::zenoh_bytes();
options.payload = toZenohBytes(serdes::serialize(request));
}

std::unordered_map<std::string, std::string> attachments;
attachments[SERVICE_ATTACHMENT_REQUEST_TYPE_INFO] = utils::getTypeName<RequestT>();
attachments[SERVICE_ATTACHMENT_REPLY_TYPE_INFO] = utils::getTypeName<ReplyT>();
options.attachment = ::zenoh::ext::serialize(attachments);

return options;
}

} // namespace internal

template <typename RequestT, typename ReplyT>
Service<RequestT, ReplyT>::Service(SessionPtr session, TopicConfig topic_config, Callback&& callback)
: session_(std::move(session)), topic_config_(std::move(topic_config)), callback_(std::move(callback)) {
Service<RequestT, ReplyT>::Service(SessionPtr session, TopicConfig topic_config, Callback&& callback,
FailureCallback&& failure_callback)
: session_(std::move(session))
, topic_config_(std::move(topic_config))
, callback_(std::move(callback))
, failure_callback_(std::move(failure_callback)) {
internal::checkTemplatedTypes<RequestT, ReplyT>();
LOG(INFO) << fmt::format("[Service {}] Started service", topic_config_.name);

auto on_query_cb = [this](const ::zenoh::Query& query) mutable {
LOG(INFO) << fmt::format("[Service {}] received query from '{}'", topic_config_.name,
query.get_keyexpr().as_string_view());

auto reply = this->callback_(internal::deserializeRequest<RequestT>(query));
::zenoh::Query::ReplyOptions options;
::zenoh::ZResult result{};
if constexpr (std::is_same_v<ReplyT, std::string>) {
options.encoding = ::zenoh::Encoding::Predefined::zenoh_string();
query.reply(this->topic_config_.name, reply, std::move(options), &result);
} else {
options.encoding = ::zenoh::Encoding::Predefined::zenoh_bytes();
auto buffer = serdes::serialize(reply);
DLOG(INFO) << fmt::format("[Service {}] reply payload size: {}", topic_config_.name, buffer.size());
query.reply(this->topic_config_.name, toZenohBytes(buffer), std::move(options), &result);
}

throwExceptionIf<FailedZenohOperation>(
result != Z_OK,
fmt::format("[Service {}] failed to reply to query, err {}", topic_config_.name, result));
};
auto on_query_cb = [this](const ::zenoh::Query& query) mutable { onQuery(query); };

::zenoh::ZResult result{};
const ::zenoh::KeyExpr keyexpr{ topic_config_.name };
Expand All @@ -172,33 +213,46 @@ Service<RequestT, ReplyT>::Service(SessionPtr session, TopicConfig topic_config,
}

template <typename RequestT, typename ReplyT>
auto callService(Session& session, const TopicConfig& topic_config, const RequestT& request,
const std::optional<std::chrono::milliseconds>& timeout /*= std::nullopt*/)
-> std::vector<ServiceResponse<ReplyT>> {
internal::checkTemplatedTypes<RequestT, ReplyT>();
void Service<RequestT, ReplyT>::onQuery(const ::zenoh::Query& query) {
LOG(INFO) << fmt::format("[Service {}] received query from '{}'", topic_config_.name,
query.get_keyexpr().as_string_view());

LOG(INFO) << fmt::format("Calling service on '{}'", topic_config.name);
::zenoh::ZResult result{};

::zenoh::Session::GetOptions options{};
if (timeout.has_value()) {
options.timeout_ms = static_cast<uint64_t>(timeout->count());
if (!internal::checkQueryTypeInfo<RequestT, ReplyT>(query)) {
LOG(ERROR) << fmt::format("[Service {}] failed to process query: type mismatch for request and reply",
query.get_keyexpr().as_string_view());
failure_callback_();
query.reply_err(::zenoh::ext::serialize("Type mismatch for request and reply"),
::zenoh::Query::ReplyErrOptions::create_default(), &result);
return;
}

if constexpr (std::is_same_v<RequestT, std::string>) {
auto reply = this->callback_(internal::deserializeRequest<RequestT>(query));
::zenoh::Query::ReplyOptions options;
if constexpr (std::is_same_v<ReplyT, std::string>) {
options.encoding = ::zenoh::Encoding::Predefined::zenoh_string();
options.payload = ::zenoh::ext::serialize(request);
query.reply(this->topic_config_.name, reply, std::move(options), &result);
} else {
options.encoding = ::zenoh::Encoding::Predefined::zenoh_bytes();
auto buffer = serdes::serialize(request);
auto buffer = serdes::serialize(reply);
query.reply(this->topic_config_.name, toZenohBytes(buffer), std::move(options), &result);
}

DLOG(INFO) << fmt::format("Request: payload size: {}", buffer.size());
fmt::println("Buffer: {}", fmt::join(buffer, ", "));
LOG_IF(ERROR, result != Z_OK) << fmt::format("[Service {}] failed to reply to query, err {}",
topic_config_.name, result);
}

auto value = toZenohBytes(std::move(buffer));
options.payload = std::move(value);
}
template <typename RequestT, typename ReplyT>
auto callService(Session& session, const TopicConfig& topic_config, const RequestT& request,
const std::optional<std::chrono::milliseconds>& timeout /*= std::nullopt*/)
-> std::vector<ServiceResponse<ReplyT>> {
internal::checkTemplatedTypes<RequestT, ReplyT>();

LOG(INFO) << fmt::format("Calling service on '{}'", topic_config.name);

const ::zenoh::KeyExpr keyexpr(topic_config.name);
auto options = internal::createZenohGetOptions<RequestT, ReplyT>(request, timeout);

::zenoh::ZResult result{};
static constexpr auto FIFO_QUEUE_SIZE = 100;
Expand All @@ -211,8 +265,11 @@ auto callService(Session& session, const TopicConfig& topic_config, const Reques
for (auto res = replies.recv(); std::holds_alternative<::zenoh::Reply>(res); res = replies.recv()) {
const auto& reply = std::get<::zenoh::Reply>(res);
if (!reply.is_ok()) {
LOG(ERROR) << fmt::format("Failed to call service on '{}': {}", topic_config.name,
reply.get_err().get_payload().as_string());
continue;
}

auto message = internal::onReply<ReplyT>(reply.get_ok());
reply_messages.emplace_back(std::move(message));
}
Expand Down
3 changes: 3 additions & 0 deletions modules/ipc/include/hephaestus/ipc/zenoh/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class Subscriber {

private:
void checkTypeInfo(const MessageMetadata& metadata) {
LOG_IF(ERROR, metadata.type_info != utils::getTypeName<T>())
<< fmt::format("Topic '{}' is of type '{}', but subscriber expect type '{}'", metadata.topic,
metadata.type_info, utils::getTypeName<T>());
throwExceptionIf<FailedZenohOperation>(
metadata.type_info != utils::getTypeName<T>(),
fmt::format("Topic '{}' is of type '{}', but subscriber expect type '{}'", metadata.topic,
Expand Down
58 changes: 54 additions & 4 deletions modules/ipc/tests/action_server_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//=================================================================================================
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================

#include <atomic>
#include <chrono>
#include <random>
#include <string>
#include <thread>
Expand Down Expand Up @@ -44,8 +44,8 @@ struct ActionServerData {
auto service_topic = ipc::TopicConfig(
fmt::format("test_action_server/{}", random::random<std::string>(mt, TOPIC_LENGTH, false, true)));

ipc::zenoh::Config server_config{};
auto server_session = ipc::zenoh::createSession(std::move(server_config));
Config server_config{};
auto server_session = createSession(std::move(server_config));

return {
.topic_config = service_topic,
Expand All @@ -67,7 +67,7 @@ TEST(ActionServer, RejectedCall) {
auto request = types::DummyType::random(mt);
const auto reply = callActionServer<types::DummyType, types::DummyPrimitivesType, types::DummyType>(
action_server_data.session, action_server_data.topic_config, request,
[](const types::DummyPrimitivesType& status) { (void)status; }, SERVICE_CALL_TIMEOUT)
[](const types::DummyPrimitivesType&) {}, SERVICE_CALL_TIMEOUT)
.get();

EXPECT_EQ(reply.status, RequestStatus::REJECTED_USER);
Expand Down Expand Up @@ -167,5 +167,55 @@ TEST(ActionServer, ActionServerRejectedAlreadyRunning) {
stop.notify_all();
reply_future.get();
}

TEST(ActionServer, TypesMismatch) {
auto mt = random::createRNG();

auto action_server_data = createDummyActionServer(
mt, [](const types::DummyType&) { return TriggerStatus::SUCCESSFUL; },
[](const types::DummyType& request, Publisher<types::DummyPrimitivesType>& status_publisher,
std::atomic_bool&) {
auto success = status_publisher.publish({});
EXPECT_TRUE(success);
return request;
return request;
});

// Invalid Request
{
auto request = types::DummyPrimitivesType::random(mt);
const auto reply =
callActionServer<types::DummyPrimitivesType, types::DummyPrimitivesType, types::DummyPrimitivesType>(
action_server_data.session, action_server_data.topic_config, request,
[](const types::DummyPrimitivesType&) {}, SERVICE_CALL_TIMEOUT)
.get();

EXPECT_EQ(reply.status, RequestStatus::INVALID);
}

// Invalid Reply
{
auto request = types::DummyType::random(mt);
const auto reply =
callActionServer<types::DummyType, types::DummyPrimitivesType, types::DummyPrimitivesType>(
action_server_data.session, action_server_data.topic_config, request,
[](const types::DummyPrimitivesType&) {}, SERVICE_CALL_TIMEOUT)
.get();

EXPECT_EQ(reply.status, RequestStatus::INVALID);
}

// Invalid Status
{
auto request = types::DummyType::random(mt);
const auto reply = callActionServer<types::DummyType, types::DummyType, types::DummyType>(
action_server_data.session, action_server_data.topic_config, request,
[](const types::DummyType&) {}, SERVICE_CALL_TIMEOUT)
.get();

EXPECT_EQ(reply.status, RequestStatus::INVALID);
}
}

} // namespace
} // namespace heph::ipc::zenoh::action_server::tests
Loading