From 0f72fda409707a3f521d88f5075206ce4ef7a83f Mon Sep 17 00:00:00 2001 From: Alexander Bondarev Date: Fri, 12 Jan 2024 14:36:48 +0200 Subject: [PATCH 1/3] [rd-cpp] Support auto-binding for bindable task results and bindable result lifetimes. --- .../src/main/lifetime/LifetimeDefinition.cpp | 4 +- .../src/main/lifetime/LifetimeDefinition.h | 6 +- .../src/main/lifetime/LifetimeImpl.h | 31 +++ .../src/main/protocol/MessageBroker.cpp | 5 + .../src/main/protocol/MessageBroker.h | 2 + .../src/main/task/RdEndpoint.h | 204 ++++++++++++++++-- .../src/main/task/RdTaskResult.h | 40 +++- .../src/main/task/WiredRdTaskImpl.h | 32 ++- .../src/main/util/framework_traits.h | 7 + .../src/test/cases/RdTaskTest.cpp | 109 ++++++++++ .../src/test/util/SimpleWire.h | 2 + 11 files changed, 407 insertions(+), 35 deletions(-) diff --git a/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeDefinition.cpp b/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeDefinition.cpp index 5d8a86426..3cbfba944 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeDefinition.cpp +++ b/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeDefinition.cpp @@ -4,7 +4,7 @@ namespace rd { -LifetimeDefinition::LifetimeDefinition(bool eternaled) : eternaled(eternaled), lifetime(eternaled) +LifetimeDefinition::LifetimeDefinition(bool eternaled) : lifetime(eternaled) { } @@ -18,7 +18,7 @@ bool LifetimeDefinition::is_terminated() const return lifetime->is_terminated(); } -void LifetimeDefinition::terminate() +void LifetimeDefinition::terminate() const { lifetime->terminate(); } diff --git a/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeDefinition.h b/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeDefinition.h index 39442e0e0..8f0748363 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeDefinition.h +++ b/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeDefinition.h @@ -19,9 +19,6 @@ class RD_CORE_API LifetimeDefinition { private: friend class SequentialLifetimes; - - bool eternaled = false; - public: Lifetime lifetime; @@ -39,14 +36,13 @@ class RD_CORE_API LifetimeDefinition virtual ~LifetimeDefinition(); - // static std::shared_ptr eternal; static std::shared_ptr get_shared_eternal(); bool is_terminated() const; bool is_eternal() const; - void terminate(); + void terminate() const; template static auto use(F&& block) -> typename util::result_of_t diff --git a/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h b/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h index d62a1352e..dd08b6244 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h +++ b/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h @@ -75,6 +75,37 @@ class RD_CORE_API LifetimeImpl final actions.erase(i); } + + // Attach pointer to lifetime. It guarantee that pointer will survive at least lifetime duration. + template + std::shared_ptr make_attached(Args... args) + { + auto ptr = std::make_shared(std::forward(args)...); + attach(ptr); + return ptr; + } + + /// \brief Attach pointer to lifetime. Guarantees that pointer will survive at least lifetime duration. + template + counter_t attach(std::shared_ptr pointer) + { + // No-op structure wich acts as an action, but preserves pointer until lifetime terminated + struct holder + { + std::shared_ptr pointer; + + explicit holder(const std::shared_ptr& pointer) : pointer(pointer) + { + } + + void operator()() const + { + } + }; + + return add_action(holder(pointer)); + } + #if __cplusplus >= 201703L static inline counter_t get_id = 0; #else diff --git a/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp b/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp index 840245a5a..5ae3f9f62 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp +++ b/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp @@ -143,4 +143,9 @@ void MessageBroker::advise_on(Lifetime lifetime, RdReactiveBase const* entity) c lifetime->add_action([this, key]() { subscriptions.erase(key); }); } } + +bool MessageBroker::is_subscribed(const RdId id) const +{ + return subscriptions.find(id) != subscriptions.end(); +} } // namespace rd diff --git a/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h b/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h index dbe90b445..52286d4bc 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h @@ -59,6 +59,8 @@ class RD_FRAMEWORK_API MessageBroker final void dispatch(RdId id, Buffer message) const; void advise_on(Lifetime lifetime, RdReactiveBase const* entity) const; + + bool is_subscribed(const RdId id) const; }; } // namespace rd diff --git a/rd-cpp/src/rd_framework_cpp/src/main/task/RdEndpoint.h b/rd-cpp/src/rd_framework_cpp/src/main/task/RdEndpoint.h index 2313ab251..6c6e322bb 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/task/RdEndpoint.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/task/RdEndpoint.h @@ -3,9 +3,92 @@ #include "serialization/Polymorphic.h" #include "RdTask.h" +#include "framework_traits.h" namespace rd { +namespace detail +{ +/// \brief For bindable result +template +class RdEndpointWiredResult; + +template +class RdEndpointWiredResult> final : public RdReactiveBase +{ + friend class EndpointWiredRdTask; + + LifetimeDefinition result_lifetime_def; + Wrapper value; +public: + explicit RdEndpointWiredResult(LifetimeDefinition result_lifetime_def, const Wrapper& value) : result_lifetime_def(std::move(result_lifetime_def)), value(value) + { + } + + RdEndpointWiredResult(const RdEndpointWiredResult&) = delete; // non construction-copyable + RdEndpointWiredResult& operator=(const RdEndpointWiredResult&) = delete; // non copyable + + void init(const Lifetime lifetime) const override + { + RdReactiveBase::init(lifetime); + get_wire()->advise(lifetime, this); + } + + void bind(const Lifetime lf, IRdDynamic const* parent, string_view name) const override + { + RdReactiveBase::bind(lf, parent, name); + value->bind(lf, this, "Value"); + } + + void identify(Identities const& identities, RdId const& id) const override + { + RdReactiveBase::identify(identities, id); + value->identify(identities, id.mix("Value")); + } + + void on_wire_received(Buffer buffer) const override + { + spdlog::get("logReceived")->trace("received cancellation"); + + //nothing just a void value + + get_wire_scheduler()->invoke_or_queue([this] { result_lifetime_def.terminate(); }); + } +}; + +template +class RdEndpointTaskCancellation : public RdReactiveBase +{ + using Task = RdTask; + + Task task; + +public: + explicit RdEndpointTaskCancellation(Task task) : task(task) + { + } + + RdEndpointTaskCancellation(const RdEndpointTaskCancellation&) = delete; // non construction-copyable + RdEndpointTaskCancellation& operator=(const RdEndpointTaskCancellation&) = delete; // non copyable + + void init(const Lifetime lifetime) const override + { + RdReactiveBase::init(lifetime); + get_wire()->advise(lifetime, this); + } + + void on_wire_received(Buffer buffer) const override + { + spdlog::get("logReceived")->trace("received cancellation"); + + //nothing just a void value + + get_wire_scheduler()->invoke_or_queue([this] { task.set_result_if_empty(typename Task::result_type::Cancelled()); }); + } +}; + +} + /** * \brief An API that is exposed to the remote process and can be invoked over the protocol. * @@ -19,11 +102,88 @@ class RdEndpoint : public virtual RdReactiveBase, public ISerializable { using WTReq = value_or_wrapper; using WTRes = value_or_wrapper; + using Task = RdTask; + using TaskResult = typename Task::result_type; - using handler_t = std::function(Lifetime, TReq const&)>; + using handler_t = std::function(Lifetime result_lifetime, TReq const&)>; mutable handler_t local_handler; - mutable tsl::ordered_map, rd::hash> awaiting_tasks; // TO-DO get rid of it + void send_result(const RdId task_id, const TaskResult& result) const + { + auto logger = spdlog::get("logSend"); + if (logger->should_log(spdlog::level::trace)) + logger->trace("endpoint {}::{} response = {}", to_string(get_location()), to_string(get_id()), to_string(result)); + get_wire()->send(task_id, [this, &result](Buffer& inner_buffer) { result.write(get_serialization_context(), inner_buffer); }); + } + + template , bool> = true> + void handle_result(LifetimeDefinition result_lifetime_def, const RdId task_id, const TaskResult& result) const + { + try + { + if (result.is_succeeded()) + { + auto result_lifetime = result_lifetime_def.lifetime; + auto wired_result = result_lifetime->make_attached>(std::move(result_lifetime_def), result.get_value()); + wired_result->identify(*get_protocol()->get_identity(), task_id); + wired_result->bind(result_lifetime, this, "EndpointWiredResult"); + send_result(task_id, result); + } + else + { + send_result(task_id, result); + result_lifetime_def.terminate(); + } + } + catch (std::exception ex) + { + spdlog::get("logSend")->error(ex.what()); + result_lifetime_def.terminate(); + } + } + + template , bool> = true> + void handle_result(LifetimeDefinition result_lifetime_def, const RdId task_id, TaskResult result) const + { + try + { + send_result(task_id, result); + result_lifetime_def.terminate(); + } + catch (std::exception ex) + { + spdlog::get("logSend")->error(ex.what()); + result_lifetime_def.terminate(); + } + } + + void handle_result_async(LifetimeDefinition result_lifetime_def, const RdId task_id, Task task) const + { + struct AsyncCallData + { + const RdEndpoint* endpoint; + LifetimeDefinition call_lifetime_def; + LifetimeDefinition result_lifetime_def; + RdId task_id; + + AsyncCallData(const RdEndpoint* endpoint, LifetimeDefinition call_lifetime_def, LifetimeDefinition result_lifetime_def, const RdId task_id) : + endpoint(endpoint), call_lifetime_def(std::move(call_lifetime_def)), result_lifetime_def(std::move(result_lifetime_def)), task_id(task_id) + { + } + }; + + auto call_lifetime_def = LifetimeDefinition(*bind_lifetime); + auto call_lifetime = call_lifetime_def.lifetime; + auto cancellation = call_lifetime->make_attached>(task); + cancellation->set_id(task_id); + cancellation->bind(call_lifetime, this, "EndpointTaskCancellation"); + + task.advise(call_lifetime, [data = std::make_shared(this, std::move(call_lifetime_def), std::move(result_lifetime_def), task_id)](const auto& result) + { + data->call_lifetime_def.terminate(); + data->endpoint->handle_result(std::move(data->result_lifetime_def), data->task_id, result); + }); + } public: // region ctor/dtor @@ -74,8 +234,21 @@ class RdEndpoint : public virtual RdReactiveBase, public ISerializable */ void set(std::function functor) const { - local_handler = [handler = std::move(functor)](Lifetime _, TReq const& req) -> RdTask - { return RdTask::from_result(handler(req)); }; + local_handler = [handler = std::move(functor)](Lifetime _, TReq const& req) + { + return Task::from_result(handler(req)); + }; + } + + /** + * \brief @see set above + */ + void set(std::function functor) const + { + local_handler = [handler = std::move(functor)](const Lifetime& lifetime, TReq const& req) + { + return Task::from_result(handler(lifetime, req)); + }; } void init(Lifetime lifetime) const override @@ -95,27 +268,18 @@ class RdEndpoint : public virtual RdReactiveBase, public ISerializable throw std::invalid_argument("handler is empty for RdEndPoint"); } - using TaskResult = RdTaskResult; - - auto lifetime = *bind_lifetime; - auto send_result = [this, task_id](TaskResult const& task_result) - { - auto logger = spdlog::get("logSend"); - if (logger->should_log(spdlog::level::trace)) - logger->trace("endpoint {}::{} response = {}", to_string(location), to_string(rdid), to_string(task_result)); - get_wire()->send(task_id, [&](Buffer& inner_buffer) { task_result.write(get_serialization_context(), inner_buffer); }); - }; - + auto result_lifetime_def = LifetimeDefinition(*bind_lifetime); try { - auto task = local_handler(lifetime, wrapper::get(value)); - awaiting_tasks[task_id] = task; - lifetime->add_action([this, task_id] { awaiting_tasks.erase(task_id); }); - task.advise(lifetime, send_result); + auto task = local_handler(result_lifetime_def.lifetime, wrapper::get(value)); + if (!task.has_value()) + handle_result_async(std::move(result_lifetime_def), task_id, task); + else + handle_result(std::move(result_lifetime_def), task_id, task.value_or_throw()); } catch (std::exception const& e) { - send_result(typename TaskResult::Fault(e)); + send_result(task_id, typename TaskResult::Fault(e)); } } diff --git a/rd-cpp/src/rd_framework_cpp/src/main/task/RdTaskResult.h b/rd-cpp/src/rd_framework_cpp/src/main/task/RdTaskResult.h index 77821cb46..042f18477 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/task/RdTaskResult.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/task/RdTaskResult.h @@ -119,14 +119,46 @@ class RdTaskResult final : public ISerializable v); } - T const& unwrap() const + /// \brief Attaches lifetime to result value. Lifetime terminates when result value destructed. + /// \param parent Parent lifetime. + /// \throw mpark::bad_variant_access if result isn't available or not Success + template , bool> = true> + Lifetime attach_nested_lifetime_to_value(const Lifetime& parent) { - return visit(util::make_visitor([](Success const& value) -> T const& { return wrapper::get(value.value); }, - [](Cancelled const&) -> T const& { throw std::invalid_argument("Task finished in Cancelled state"); }, - [](Fault const& value) -> T const& { throw std::runtime_error(to_string(value.reason_message)); }), + auto&& wrapper = get(v).value; + + struct Deleter + { + std::shared_ptr ptr; + LifetimeDefinition lifetime_definition; + + explicit Deleter(LifetimeDefinition&& lifetime_definition, std::shared_ptr ptr) : ptr(std::move(ptr)), lifetime_definition(std::move(lifetime_definition)) { } + + void operator()(T*) const + { + } + }; + + auto deleter = Deleter(LifetimeDefinition(parent), std::move(wrapper)); + auto ptr = deleter.ptr.get(); + auto lifetime = deleter.lifetime_definition.lifetime; + wrapper = std::shared_ptr(ptr, std::move(deleter)); + return lifetime; + } + + WT const& get_value() const + { + return visit(util::make_visitor([](Success const& value) -> WT const& { return value.value; }, + [](Cancelled const&) -> WT const& { throw std::invalid_argument("Task finished in Cancelled state"); }, + [](Fault const& value) -> WT const& { throw std::runtime_error(to_string(value.reason_message)); }), v); } + T const& unwrap() const + { + return wrapper::get(get_value()); + } + bool is_succeeded() const { return v.index() == 0; diff --git a/rd-cpp/src/rd_framework_cpp/src/main/task/WiredRdTaskImpl.h b/rd-cpp/src/rd_framework_cpp/src/main/task/WiredRdTaskImpl.h index 9bc9299d4..0fd12b313 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/task/WiredRdTaskImpl.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/task/WiredRdTaskImpl.h @@ -3,6 +3,7 @@ #include "serialization/Polymorphic.h" #include "RdTaskResult.h" +#include "util/framework_traits.h" namespace rd { @@ -14,26 +15,48 @@ namespace detail template > class WiredRdTaskImpl : public RdReactiveBase { -private: + using Task = RdTask; + using TaskResult = typename Task::result_type; + Lifetime lifetime; RdReactiveBase const* cutpoint{}; IScheduler* scheduler{}; - Property>* result{}; + Property* result{}; LifetimeImpl::counter_t termination_lifetime_id{}; + template , bool> = true> + void bind_result(TaskResult& task_result) const + { + auto result_lifetime = task_result.attach_nested_lifetime_to_value(lifetime); + result_lifetime->add_action([task_id = get_id(), cutpoint = cutpoint] + { + cutpoint->get_wire()->send(task_id, [](auto&) + { + // write nothing, just signal server to release result lifetime + }); + }); + task_result.get_value()->bind(result_lifetime, cutpoint, "CallResult"); + } + + template , bool> = true> + void bind_result(TaskResult&) const + { + // do nothing for non-bindable value + } + public: template friend class ::rd::WiredRdTask; WiredRdTaskImpl( - Lifetime lifetime, RdReactiveBase const& cutpoint, RdId rdid, IScheduler* scheduler, Property>* result) + Lifetime lifetime, RdReactiveBase const& cutpoint, RdId rdid, IScheduler* scheduler, Property* result) : lifetime(lifetime), cutpoint(&cutpoint), scheduler(scheduler), result(result) { this->rdid = std::move(rdid); cutpoint.get_wire()->advise(lifetime, this); termination_lifetime_id = - lifetime->add_action([this]() { this->result->set_if_empty(typename RdTaskResult::Cancelled{}); }); + lifetime->add_action([this]() { this->result->set_if_empty(typename TaskResult::Cancelled{}); }); } virtual ~WiredRdTaskImpl() @@ -55,6 +78,7 @@ class WiredRdTaskImpl : public RdReactiveBase } else { + bind_result(result); this->result->set_if_empty(std::move(result)); } }); diff --git a/rd-cpp/src/rd_framework_cpp/src/main/util/framework_traits.h b/rd-cpp/src/rd_framework_cpp/src/main/util/framework_traits.h index 17d0ef9cd..1470fbca1 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/util/framework_traits.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/util/framework_traits.h @@ -14,6 +14,13 @@ template >>, " "); + +template +using is_bindable = std::is_base_of; + +template +constexpr bool is_bindable_v = is_bindable::value; + } // namespace util } // namespace rd diff --git a/rd-cpp/src/rd_framework_cpp/src/test/cases/RdTaskTest.cpp b/rd-cpp/src/rd_framework_cpp/src/test/cases/RdTaskTest.cpp index d3d9688c6..6b7132052 100644 --- a/rd-cpp/src/rd_framework_cpp/src/test/cases/RdTaskTest.cpp +++ b/rd-cpp/src/rd_framework_cpp/src/test/cases/RdTaskTest.cpp @@ -1,6 +1,8 @@ #include #include "RdFrameworkTestBase.h" +#include "DynamicExt/ConcreteEntity.Generated.h" +#include "DynamicExt/DynamicEntity.Generated.h" #include "task/RdCall.h" #include "task/RdEndpoint.h" #include "task/RdSymmetricCall.h" @@ -95,5 +97,112 @@ TEST_F(RdFrameworkTestBase, testSymmetricCall) EXPECT_EQ(+2, client_entity.sync(L"ab").value_or_throw().unwrap()); EXPECT_EQ(-2, server_entity.sync(L"xy").value_or_throw().unwrap()); + AfterTest(); +} + +TEST_F(RdFrameworkTestBase, testBindableCall) +{ + RdEndpoint server_entity; + RdCall client_entity; + + statics(server_entity, static_entity_id); + statics(client_entity, static_entity_id); + + Wrapper server_result; + bool server_result_lifetime_terminated = false; + + server_entity.set([&](const Lifetime& lifetime, std::wstring const& s) + { + server_result = wrapper::make_wrapper(); + server_result->get_foo().set(static_cast(s.length())); + lifetime->add_action([&] { server_result_lifetime_terminated = true; }); + return server_result; + }); + + bindStatic(serverProtocol.get(), server_entity, static_name); + bindStatic(clientProtocol.get(), client_entity, static_name); + + RdId property_id; + { + auto client_result = client_entity.start(L"xy").value_or_throw().get_value(); + property_id = dynamic_cast*>(&client_result->get_foo())->get_id(); + EXPECT_EQ(2, client_result->get_foo().get()) << "Expected client result to recieve value from server."; + + EXPECT_TRUE(serverWire->is_subscribed(property_id)) << "Expected to auto-bind server result"; + EXPECT_TRUE(clientWire->is_subscribed(property_id)) << "Expected to auto-bind client result"; + + server_result->get_foo().set(42); + EXPECT_EQ(42, client_result->get_foo().get()) << "Expected client result to be auto-binded to server result."; + // client_result leaves scopes here and should release all resources + } + + EXPECT_FALSE(serverWire->is_subscribed(property_id)) << "Expected to auto-unbind server result"; + EXPECT_FALSE(clientWire->is_subscribed(property_id)) << "Expected to auto-unbind client result"; + + EXPECT_TRUE(server_result_lifetime_terminated) << "Expected server lifetime for result to be terminated."; + EXPECT_TRUE(server_result.unique()) << "Expected server_result to be released. Test should hold only reference to server result."; + + AfterTest(); +} + +TEST_F(RdFrameworkTestBase, testAsyncBindableCall) +{ + RdEndpoint server_entity; + RdCall client_entity; + + statics(server_entity, static_entity_id); + statics(client_entity, static_entity_id); + + bindStatic(serverProtocol.get(), server_entity, static_name); + bindStatic(clientProtocol.get(), client_entity, static_name); + + Wrapper server_result; + bool server_result_lifetime_terminated = false; + RdId property_id; + + { + Wrapper client_result; + + { + RdTask server_result_task{}; + std::wstring req; + + server_entity.set( + [&](const Lifetime& lifetime, std::wstring const& s) + { + req = s; + lifetime->add_action([&] { server_result_lifetime_terminated = true; }); + return server_result_task; + }); + + auto client_result_task = client_entity.start(L"xy"); + EXPECT_THROW(client_result_task.value_or_throw(), std::exception); + EXPECT_EQ(req, L"xy"); + + server_result = wrapper::make_wrapper(); + server_result->get_foo().set(static_cast(req.length())); + server_result_task.set(server_result); + + client_result = client_result_task.value_or_throw().get_value(); + // release tasks, but preserve results + } + + property_id = dynamic_cast*>(&client_result->get_foo())->get_id(); + EXPECT_EQ(2, client_result->get_foo().get()) << "Expected client result to recieve value from server."; + + EXPECT_TRUE(serverWire->is_subscribed(property_id)) << "Expected to auto-bind server result"; + EXPECT_TRUE(clientWire->is_subscribed(property_id)) << "Expected to auto-bind client result"; + + server_result->get_foo().set(42); + EXPECT_EQ(42, client_result->get_foo().get()) << "Expected client result to be auto-binded to server result."; + // client_result leaves scopes here and should release all resources + } + + EXPECT_FALSE(serverWire->is_subscribed(property_id)) << "Expected to auto-unbind server result"; + EXPECT_FALSE(clientWire->is_subscribed(property_id)) << "Expected to auto-unbind client result"; + + EXPECT_TRUE(server_result_lifetime_terminated) << "Expected server lifetime for result to be terminated."; + EXPECT_TRUE(server_result.unique()) << "Expected server_result to be released. Test should hold only reference to server result."; + AfterTest(); } \ No newline at end of file diff --git a/rd-cpp/src/rd_framework_cpp/src/test/util/SimpleWire.h b/rd-cpp/src/rd_framework_cpp/src/test/util/SimpleWire.h index 3c3ea3054..8c3c27260 100644 --- a/rd-cpp/src/rd_framework_cpp/src/test/util/SimpleWire.h +++ b/rd-cpp/src/rd_framework_cpp/src/test/util/SimpleWire.h @@ -45,6 +45,8 @@ class SimpleWire : public WireBase void process_one_message() const; void set_auto_flush(bool value); + + bool is_subscribed(const RdId id) { return message_broker.is_subscribed(id); } }; } // namespace test } // namespace rd From 22eecd0448d422c20ffb7bbc820d0297a79573c2 Mon Sep 17 00:00:00 2001 From: Alexander Bondarev Date: Mon, 15 Jan 2024 14:20:34 +0200 Subject: [PATCH 2/3] [rd-cpp] Fixed signals to release resources from lambda closures on lifetime termination. Fixed SequentialLifetimes to not keep lifetime callback after SequentialLifetimes destroyed. --- .../src/main/lifetime/Lifetime.cpp | 13 +++ .../rd_core_cpp/src/main/lifetime/Lifetime.h | 3 + .../src/main/lifetime/LifetimeImpl.h | 2 +- .../src/main/lifetime/SequentialLifetimes.cpp | 22 ++--- .../src/main/lifetime/SequentialLifetimes.h | 8 +- .../src/main/reactive/base/SignalX.h | 84 ++++++++++++------- .../src/test/cases/RdSignalTest.cpp | 53 ++++++++++++ 7 files changed, 140 insertions(+), 45 deletions(-) diff --git a/rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.cpp b/rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.cpp index d63a9bfbc..fa9b9f9fe 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.cpp +++ b/rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.cpp @@ -37,6 +37,19 @@ Lifetime const& Lifetime::Eternal() return ETERNAL; } + +Lifetime const& Lifetime::Terminated() +{ + static Lifetime TERMINATED = [] + { + Lifetime lifetime; + lifetime->terminate(); + return lifetime; + }(); + + return TERMINATED; +} + bool operator==(Lifetime const& lw1, Lifetime const& lw2) { return lw1.ptr == lw2.ptr; diff --git a/rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.h b/rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.h index 4144699eb..6885999cf 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.h +++ b/rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.h @@ -35,7 +35,10 @@ class RD_CORE_API Lifetime final std::shared_ptr ptr; public: + typedef LifetimeImpl::counter_t counter_t; + static Lifetime const& Eternal(); + static Lifetime const& Terminated(); // region ctor/dtor diff --git a/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h b/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h index dd08b6244..c05f46078 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h +++ b/rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h @@ -22,7 +22,7 @@ class RD_CORE_API LifetimeImpl final { public: friend class LifetimeDefinition; - + friend class SequentialLifetimes; friend class Lifetime; using counter_t = int32_t; diff --git a/rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.cpp b/rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.cpp index f57c5c696..40fa00bba 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.cpp +++ b/rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.cpp @@ -2,32 +2,32 @@ namespace rd { -SequentialLifetimes::SequentialLifetimes(Lifetime parent_lifetime) : parent_lifetime(std::move(parent_lifetime)) +SequentialLifetimes::SequentialLifetimes(const Lifetime& parent_lifetime) : parent_lifetime(parent_lifetime) { - this->parent_lifetime->add_action([this] { set_current_lifetime(LifetimeDefinition::get_shared_eternal()); }); } Lifetime SequentialLifetimes::next() { - std::shared_ptr new_def = std::make_shared(parent_lifetime); - set_current_lifetime(new_def); - return current_def->lifetime; + Lifetime new_lifetime = parent_lifetime.create_nested(); + set_current_lifetime(new_lifetime); + return new_lifetime; } void SequentialLifetimes::terminate_current() { - set_current_lifetime(LifetimeDefinition::get_shared_eternal()); + set_current_lifetime(Lifetime::Terminated()); } bool SequentialLifetimes::is_terminated() const { - return current_def->is_eternal() || current_def->is_terminated(); + return current_lifetime->is_terminated(); } -void SequentialLifetimes::set_current_lifetime(std::shared_ptr new_def) +void SequentialLifetimes::set_current_lifetime(const Lifetime& lifetime) { - std::shared_ptr prev = current_def; - current_def = new_def; - prev->terminate(); + const Lifetime prev = current_lifetime; + current_lifetime = lifetime; + if (!prev->is_terminated()) + prev->terminate(); } } // namespace rd diff --git a/rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.h b/rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.h index b909ed9cb..edaeaeb20 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.h +++ b/rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.h @@ -12,9 +12,9 @@ namespace rd { class RD_CORE_API SequentialLifetimes { -private: - std::shared_ptr current_def = LifetimeDefinition::get_shared_eternal(); Lifetime parent_lifetime; + Lifetime current_lifetime = Lifetime::Terminated(); + void set_current_lifetime(const Lifetime& lifetime); public: // region ctor/dtor @@ -28,7 +28,7 @@ class RD_CORE_API SequentialLifetimes SequentialLifetimes& operator=(SequentialLifetimes&&) = delete; - explicit SequentialLifetimes(Lifetime parent_lifetime); + explicit SequentialLifetimes(const Lifetime& parent_lifetime); // endregion Lifetime next(); @@ -36,8 +36,6 @@ class RD_CORE_API SequentialLifetimes void terminate_current(); bool is_terminated() const; - - void set_current_lifetime(std::shared_ptr new_def); }; } // namespace rd diff --git a/rd-cpp/src/rd_core_cpp/src/main/reactive/base/SignalX.h b/rd-cpp/src/rd_core_cpp/src/main/reactive/base/SignalX.h index 598cc6093..03cbb2d79 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/reactive/base/SignalX.h +++ b/rd-cpp/src/rd_core_cpp/src/main/reactive/base/SignalX.h @@ -3,6 +3,7 @@ #include "interfaces.h" #include "SignalCookie.h" +#include "lifetime/LifetimeDefinition.h" #include #include @@ -10,6 +11,7 @@ #include #include #include +#include namespace rd { @@ -22,57 +24,83 @@ class Signal final : public ISignal private: using WT = typename ISignal::WT; - class Event + struct Event { + using F = std::function; private: - std::function action; Lifetime lifetime; + F action; + std::atomic_int8_t state; + constexpr static int8_t ACTIVE = 0; + constexpr static int8_t FIRING = 1; + constexpr static int8_t TERMINATED = 2; public: // region ctor/dtor Event() = delete; - - template - Event(F&& action, Lifetime lifetime) : action(std::forward(action)), lifetime(lifetime) + explicit Event(const Lifetime& lifetime, F&& action) : lifetime(lifetime), action(std::forward(action)), state(ACTIVE) { } Event(Event&&) = default; - // endregion + Event& operator=(Event&& other) = default; - bool is_alive() const + bool operator()(T const& arg) { - return !lifetime->is_terminated(); - } + if (lifetime->is_terminated()) + return false; - void execute_if_alive(T const& value) const - { - if (is_alive()) + auto expected_state = ACTIVE; + // set firing flag to prevent action destruction during action firing + // skip action if it isn't active (lifetime was terminated) + if (!state.compare_exchange_strong(expected_state, FIRING)) + return false; + + expected_state = FIRING; + try { - action(value); + action(arg); + return state.compare_exchange_strong(expected_state, ACTIVE); } + catch (...) + { + if (!state.compare_exchange_strong(expected_state, ACTIVE)) + action = nullptr; + throw; + } + } + + void terminate() + { + const auto old_state = state.exchange(TERMINATED); + // release action immediatelly if it isn't firing right now + if (old_state == ACTIVE) + action = nullptr; + lifetime = Lifetime::Terminated(); } }; - using counter_t = int32_t; - using listeners_t = std::map; + using listeners_t = std::vector>; - mutable counter_t advise_id = 0; mutable listeners_t listeners, priority_listeners; - static void cleanup(listeners_t& queue) - { - util::erase_if(queue, [](Event const& e) -> bool { return !e.is_alive(); }); - } - void fire_impl(T const& value, listeners_t& queue) const { - for (auto const& p : queue) + auto it = queue.begin(); + auto end = queue.end(); + auto alive_it = it; + while (it != end) { - auto const& event = p.second; - event.execute_if_alive(value); + if (it->get()->operator()(value)) + { + if (alive_it != it) + *alive_it = std::move(*it); + ++alive_it; + } + ++it; } - cleanup(queue); + if (alive_it != end) + queue.erase(alive_it, end); } template @@ -80,9 +108,9 @@ class Signal final : public ISignal { if (lifetime->is_terminated()) return; - counter_t id = advise_id /*.load()*/; - queue.emplace(id, Event(std::forward(handler), lifetime)); - ++advise_id; + auto event_ptr = std::make_shared(lifetime, std::forward(handler)); + lifetime->add_action([event_ptr] { event_ptr->terminate(); }); + queue.push_back(std::move(event_ptr)); } public: diff --git a/rd-cpp/src/rd_framework_cpp/src/test/cases/RdSignalTest.cpp b/rd-cpp/src/rd_framework_cpp/src/test/cases/RdSignalTest.cpp index 5e627e569..386f9d725 100644 --- a/rd-cpp/src/rd_framework_cpp/src/test/cases/RdSignalTest.cpp +++ b/rd-cpp/src/rd_framework_cpp/src/test/cases/RdSignalTest.cpp @@ -249,5 +249,58 @@ TEST_F(RdFrameworkTestBase, signal_move) RdSignal signal1; RdSignal signal2(std::move(signal1)); + AfterTest(); +} + +TEST_F(RdFrameworkTestBase, signal_release_resources) +{ + RdSignal signal; + statics(signal, 1); + + bindStatic(serverProtocol.get(), signal, static_name); + + EXPECT_NO_THROW( + auto ptr = std::make_shared(0); + { + const LifetimeDefinition def; + signal.advise(def.lifetime, [ptr](auto const& value) { *ptr = value; }); + } + EXPECT_TRUE(ptr.unique()) << "Signal should release reference to ptr from lambda."; + signal.fire(42); + EXPECT_EQ(*ptr, 0) << "Signal shouldn't impact ptr value after lifetime termination."; + ); + + AfterTest(); +} + +TEST_F(RdFrameworkTestBase, signal_release_resources_from_handler) +{ + RdSignal signal; + statics(signal, 1); + bindStatic(serverProtocol.get(), signal, static_name); + + auto ptr = std::make_shared(0); + { + struct Payload + { + LifetimeDefinition def; + std::shared_ptr ptr; + }; + auto payload = std::make_shared(Payload{LifetimeDefinition(), ptr}); + signal.advise(payload->def.lifetime, [payload](auto const& value) + { + payload->def.terminate(); + *(payload->ptr) = value; + }); + // only lambda keeps payload now, it also keeps def reference preventing it from auto-terminating on out-of-scope. + // instead from callback we terminate payload which then should successfully complete callback and release all resources + // effectively destructing Payload and releasing ptr reference. + } + signal.fire(42); + EXPECT_EQ(*ptr, 42); + EXPECT_TRUE(ptr.unique()) << "Signal should release reference to ptr from lambda."; + signal.fire(24); + EXPECT_EQ(*ptr, 42) << "Signal shouldn't impact ptr value after lifetime termination."; + AfterTest(); } \ No newline at end of file From 3563beef72eb08743158a4003e93fbe08e580b91 Mon Sep 17 00:00:00 2001 From: Alexander Bondarev Date: Mon, 15 Jan 2024 18:25:48 +0200 Subject: [PATCH 3/3] [rd-cpp] Improve error handling for task results. --- .../src/rd_core_cpp/src/main/CMakeLists.txt | 1 + .../rd_core_cpp/src/main/util/lifetime_util.h | 37 +++++++++++++++++++ .../src/main/task/RdEndpoint.h | 23 ++++++------ .../src/main/task/RdTaskResult.h | 27 -------------- .../src/main/task/WiredRdTaskImpl.h | 19 +++++++--- 5 files changed, 62 insertions(+), 45 deletions(-) create mode 100644 rd-cpp/src/rd_core_cpp/src/main/util/lifetime_util.h diff --git a/rd-cpp/src/rd_core_cpp/src/main/CMakeLists.txt b/rd-cpp/src/rd_core_cpp/src/main/CMakeLists.txt index db04f7fc9..6c27f4ad8 100644 --- a/rd-cpp/src/rd_core_cpp/src/main/CMakeLists.txt +++ b/rd-cpp/src/rd_core_cpp/src/main/CMakeLists.txt @@ -37,6 +37,7 @@ set(RD_CORE_CPP_SOURCES #pch ${PCH_CPP_OPT} util/export_api_helper.h + util/lifetime_util.h ) if (RD_STATIC) diff --git a/rd-cpp/src/rd_core_cpp/src/main/util/lifetime_util.h b/rd-cpp/src/rd_core_cpp/src/main/util/lifetime_util.h new file mode 100644 index 000000000..5984b6639 --- /dev/null +++ b/rd-cpp/src/rd_core_cpp/src/main/util/lifetime_util.h @@ -0,0 +1,37 @@ +#ifndef LIFETIME_UTIL_H +#define LIFETIME_UTIL_H + +#include +#include "../lifetime/LifetimeDefinition.h" + +namespace rd +{ +namespace util +{ +/// \brief Attaches lifetime to shared_ptr. Lifetime terminates when shared_ptr destructed. +/// \param original original pointer which will be used to make a new pointer with lifetime. +/// \param lifetime_definition Lifetime definition associated with returned shared_ptr. +/// \return New shared_ptr which owns lifetime_definition and terminates that lifetime when destroyed. +template +static std::shared_ptr attach_lifetime(std::shared_ptr original, LifetimeDefinition lifetime_definition) +{ + struct Deleter + { + std::shared_ptr ptr; + LifetimeDefinition lifetime_definition; + + explicit Deleter(LifetimeDefinition&& lifetime_definition, std::shared_ptr ptr) : ptr(std::move(ptr)), lifetime_definition(std::move(lifetime_definition)) { } + + void operator()(T*) const + { + } + }; + + auto raw_ptr = original.get(); + auto deleter = Deleter(std::move(lifetime_definition), std::move(original)); + return std::shared_ptr(raw_ptr, std::move(deleter)); +} +} +} + +#endif //LIFETIME_UTIL_H diff --git a/rd-cpp/src/rd_framework_cpp/src/main/task/RdEndpoint.h b/rd-cpp/src/rd_framework_cpp/src/main/task/RdEndpoint.h index 6c6e322bb..3e74f9773 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/task/RdEndpoint.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/task/RdEndpoint.h @@ -119,9 +119,9 @@ class RdEndpoint : public virtual RdReactiveBase, public ISerializable template , bool> = true> void handle_result(LifetimeDefinition result_lifetime_def, const RdId task_id, const TaskResult& result) const { - try + if (result.is_succeeded()) { - if (result.is_succeeded()) + try { auto result_lifetime = result_lifetime_def.lifetime; auto wired_result = result_lifetime->make_attached>(std::move(result_lifetime_def), result.get_value()); @@ -129,31 +129,30 @@ class RdEndpoint : public virtual RdReactiveBase, public ISerializable wired_result->bind(result_lifetime, this, "EndpointWiredResult"); send_result(task_id, result); } - else + catch (const std::exception& ex) { - send_result(task_id, result); - result_lifetime_def.terminate(); + spdlog::get("logSend")->error(ex.what()); + send_result(task_id, typename TaskResult::Fault(ex)); } } - catch (std::exception ex) + else { - spdlog::get("logSend")->error(ex.what()); - result_lifetime_def.terminate(); + send_result(task_id, result); } } template , bool> = true> - void handle_result(LifetimeDefinition result_lifetime_def, const RdId task_id, TaskResult result) const + void handle_result(LifetimeDefinition /*should_be_destroyed_on_complete*/, const RdId task_id, TaskResult result) const { try { send_result(task_id, result); - result_lifetime_def.terminate(); } - catch (std::exception ex) + catch (const std::exception& ex) { spdlog::get("logSend")->error(ex.what()); - result_lifetime_def.terminate(); + if (result.is_succeeded()) + send_result(task_id, typename TaskResult::Fault(ex)); } } diff --git a/rd-cpp/src/rd_framework_cpp/src/main/task/RdTaskResult.h b/rd-cpp/src/rd_framework_cpp/src/main/task/RdTaskResult.h index 042f18477..50b431d5a 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/task/RdTaskResult.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/task/RdTaskResult.h @@ -119,33 +119,6 @@ class RdTaskResult final : public ISerializable v); } - /// \brief Attaches lifetime to result value. Lifetime terminates when result value destructed. - /// \param parent Parent lifetime. - /// \throw mpark::bad_variant_access if result isn't available or not Success - template , bool> = true> - Lifetime attach_nested_lifetime_to_value(const Lifetime& parent) - { - auto&& wrapper = get(v).value; - - struct Deleter - { - std::shared_ptr ptr; - LifetimeDefinition lifetime_definition; - - explicit Deleter(LifetimeDefinition&& lifetime_definition, std::shared_ptr ptr) : ptr(std::move(ptr)), lifetime_definition(std::move(lifetime_definition)) { } - - void operator()(T*) const - { - } - }; - - auto deleter = Deleter(LifetimeDefinition(parent), std::move(wrapper)); - auto ptr = deleter.ptr.get(); - auto lifetime = deleter.lifetime_definition.lifetime; - wrapper = std::shared_ptr(ptr, std::move(deleter)); - return lifetime; - } - WT const& get_value() const { return visit(util::make_visitor([](Success const& value) -> WT const& { return value.value; }, diff --git a/rd-cpp/src/rd_framework_cpp/src/main/task/WiredRdTaskImpl.h b/rd-cpp/src/rd_framework_cpp/src/main/task/WiredRdTaskImpl.h index 0fd12b313..259b25d58 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/task/WiredRdTaskImpl.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/task/WiredRdTaskImpl.h @@ -4,6 +4,7 @@ #include "serialization/Polymorphic.h" #include "RdTaskResult.h" #include "util/framework_traits.h" +#include "util/lifetime_util.h" namespace rd { @@ -26,9 +27,14 @@ class WiredRdTaskImpl : public RdReactiveBase LifetimeImpl::counter_t termination_lifetime_id{}; template , bool> = true> - void bind_result(TaskResult& task_result) const + TaskResult bind_result(TaskResult task_result) const { - auto result_lifetime = task_result.attach_nested_lifetime_to_value(lifetime); + if (!task_result.is_succeeded()) + return task_result; + + auto lifetime_defintion = LifetimeDefinition(lifetime); + auto result_lifetime = lifetime_defintion.lifetime; + auto value = util::attach_lifetime(task_result.get_value(), std::move(lifetime_defintion)); result_lifetime->add_action([task_id = get_id(), cutpoint = cutpoint] { cutpoint->get_wire()->send(task_id, [](auto&) @@ -36,13 +42,14 @@ class WiredRdTaskImpl : public RdReactiveBase // write nothing, just signal server to release result lifetime }); }); - task_result.get_value()->bind(result_lifetime, cutpoint, "CallResult"); + value->bind(result_lifetime, cutpoint, "CallResult"); + return typename TaskResult::Success(value); } template , bool> = true> - void bind_result(TaskResult&) const + TaskResult bind_result(TaskResult result) const { - // do nothing for non-bindable value + return result; } public: @@ -78,7 +85,7 @@ class WiredRdTaskImpl : public RdReactiveBase } else { - bind_result(result); + result = bind_result(std::move(result)); this->result->set_if_empty(std::move(result)); } });