From 8bd8aaebe42e4969e283d729e74cebbb2e67678f Mon Sep 17 00:00:00 2001 From: Mikhail Montsev Date: Tue, 20 Aug 2024 12:50:13 +0100 Subject: [PATCH] Merge to stable-23-3 (#1822) * issue-1788: fix heap-use-after-free in TServiceEndpointTest.ShouldTimeoutFrozenRequest (#1789) * fix TSocketEndpointListenerTest.ShouldHandleStartStopEndpoint test (#1800) * [Blockstore] make 'ReadBlocks transaction was interrupted' errors silent (#1808) --- .../libs/endpoints/endpoint_manager.cpp | 44 ++++++++++++++----- .../libs/endpoints/service_endpoint_ut.cpp | 2 +- .../socket_endpoint_listener_ut.cpp | 35 +++++---------- .../partition/part_actor_readblocks.cpp | 9 +++- 4 files changed, 53 insertions(+), 37 deletions(-) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index 8dbdb1143cc..08d61812c0e 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -516,9 +516,15 @@ class TEndpointManager final return Executor->Execute([ \ ctx = std::move(callContext), \ req = std::move(request), \ - this] () mutable \ + weakSelf = weak_from_this()] () mutable \ + -> T##name##Method::TResponse \ { \ - return Do##name(std::move(ctx), std::move(req)); \ + auto self = weakSelf.lock(); \ + if (!self) { \ + return TErrorResponse(E_FAIL, "EndpointManager is destroyed"); \ + } \ + \ + return self->Do##name(std::move(ctx), std::move(req)); \ }); \ } \ \ @@ -539,11 +545,13 @@ class TEndpointManager final TFuture RestoreEndpoints() override { AtomicSet(RestoringStage, ReadingStorage); - return Executor->Execute([this] () mutable { - auto future = DoRestoreEndpoints(); - AtomicSet(RestoringStage, StartingEndpoints); - Executor->WaitFor(future); - AtomicSet(RestoringStage, Completed); + return Executor->Execute([weakSelf = weak_from_this()] () mutable { + if (auto self = weakSelf.lock()) { + auto future = self->DoRestoreEndpoints(); + AtomicSet(self->RestoringStage, self->StartingEndpoints); + self->Executor->WaitFor(future); + AtomicSet(self->RestoringStage, self->Completed); + } }); } @@ -721,8 +729,20 @@ TFuture TEndpointManager::RestoreSingleEndpoint( TCallContextPtr ctx, std::shared_ptr request) { - return Executor->Execute([this, ctx, request] () mutable { - return StartEndpointImpl(std::move(ctx), std::move(request), true); + return Executor->Execute([ + weakSelf = weak_from_this(), + ctx, + request] () mutable -> NProto::TStartEndpointResponse + { + auto self = weakSelf.lock(); + if (!self) { + return TErrorResponse(E_FAIL, "EndpointManager is destroyed"); + } + + return self->StartEndpointImpl( + std::move(ctx), + std::move(request), + true); }); } @@ -1543,8 +1563,10 @@ void TEndpointManager::HandleRestoredEndpoint( << ", error:" << FormatError(error)); } - Executor->Execute([socketPath, this] () mutable { - RestoringEndpoints.erase(socketPath); + Executor->Execute([socketPath, weakSelf = weak_from_this()] () mutable { + if (auto self = weakSelf.lock()) { + self->RestoringEndpoints.erase(socketPath); + } }); } diff --git a/cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp b/cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp index ba41c589ba4..bb92132cc2d 100644 --- a/cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp +++ b/cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp @@ -338,7 +338,7 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) auto endpointManager = CreateEndpointManager( CreateWallClockTimer(), - CreateSchedulerStub(), + scheduler, CreateLoggingService("console"), CreateRequestStatsStub(), CreateVolumeStatsStub(), diff --git a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener_ut.cpp b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener_ut.cpp index aeadb5a0dea..7ea8ad65f3d 100644 --- a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener_ut.cpp +++ b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener_ut.cpp @@ -82,10 +82,14 @@ class TTestClientStorage } } - size_t GetSessionCount() - { - with_lock (Lock) { - return Sessions.size(); + void WaitForSessionCount(ui32 sessionCount) { + while (true) { + with_lock (Lock) { + if (Sessions.size() == sessionCount) { + break; + } + } + Sleep(TDuration::MilliSeconds(100)); } } }; @@ -372,8 +376,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) client->Stop(); }; - Sleep(TDuration::MilliSeconds(100)); - UNIT_ASSERT(clientStorage->GetSessionCount() == 1); + clientStorage->WaitForSessionCount(1); { auto future = listener->StopEndpoint(unixSocket.GetPath()); @@ -381,8 +384,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) UNIT_ASSERT_C(!HasError(error), error); } - Sleep(TDuration::MilliSeconds(100)); - UNIT_ASSERT(clientStorage->GetSessionCount() == 0); + clientStorage->WaitForSessionCount(0); } Y_UNIT_TEST(ShouldHandleClientDisconnection) @@ -420,14 +422,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) } }; - const auto waitIters = 10; - for (ui32 i = 0; i < waitIters; ++i) { - if (clientStorage->GetSessionCount()) { - break; - } - Sleep(TDuration::MilliSeconds(100)); - } - UNIT_ASSERT_VALUES_EQUAL(1, clientStorage->GetSessionCount()); + clientStorage->WaitForSessionCount(1); clientEndpoint->Stop(); clientEndpoint.reset(); @@ -435,13 +430,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) client->Stop(); client.reset(); - for (ui32 i = 0; i < waitIters; ++i) { - if (!clientStorage->GetSessionCount()) { - break; - } - Sleep(TDuration::MilliSeconds(100)); - } - UNIT_ASSERT_VALUES_EQUAL(0, clientStorage->GetSessionCount()); + clientStorage->WaitForSessionCount(0); } Y_UNIT_TEST(ShouldNotAcceptClientAfterServerStopped) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_readblocks.cpp b/cloud/blockstore/libs/storage/partition/part_actor_readblocks.cpp index 2d981bcc0b2..c41b637483f 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_readblocks.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_readblocks.cpp @@ -1059,10 +1059,15 @@ void TPartitionActor::CompleteReadBlocks( RemoveTransaction(*args.RequestInfo); if (args.Interrupted) { + ui32 flags = 0; + SetProtoFlag(flags, NProto::EF_SILENT); + auto error = MakeError( + E_REJECTED, + "ReadBlocks transaction was interrupted", + flags); auto response = CreateReadBlocksResponse( args.ReplyLocal, - MakeError(E_REJECTED, "ReadBlocks transaction was interrupted") - ); + std::move(error)); LWTRACK( ResponseSent_Partition,