Skip to content

Commit

Permalink
Merge to stable-23-3 (#1822)
Browse files Browse the repository at this point in the history
* issue-1788: fix heap-use-after-free in TServiceEndpointTest.ShouldTimeoutFrozenRequest (#1789)

* fix TSocketEndpointListenerTest.ShouldHandleStartStopEndpoint test (#1800)

* [Blockstore] make 'ReadBlocks transaction was interrupted' errors silent (#1808)
  • Loading branch information
SvartMetal authored Aug 20, 2024
1 parent 08839d9 commit 8bd8aae
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 37 deletions.
44 changes: 33 additions & 11 deletions cloud/blockstore/libs/endpoints/endpoint_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,15 @@ class TEndpointManager final
return Executor->Execute([ \
ctx = std::move(callContext), \
req = std::move(request), \
this] () mutable \
weakSelf = weak_from_this()] () mutable \
-> T##name##Method::TResponse \
{ \
return Do##name(std::move(ctx), std::move(req)); \
auto self = weakSelf.lock(); \
if (!self) { \
return TErrorResponse(E_FAIL, "EndpointManager is destroyed"); \
} \
\
return self->Do##name(std::move(ctx), std::move(req)); \
}); \
} \
\
Expand All @@ -539,11 +545,13 @@ class TEndpointManager final
TFuture<void> RestoreEndpoints() override
{
AtomicSet(RestoringStage, ReadingStorage);
return Executor->Execute([this] () mutable {
auto future = DoRestoreEndpoints();
AtomicSet(RestoringStage, StartingEndpoints);
Executor->WaitFor(future);
AtomicSet(RestoringStage, Completed);
return Executor->Execute([weakSelf = weak_from_this()] () mutable {
if (auto self = weakSelf.lock()) {
auto future = self->DoRestoreEndpoints();
AtomicSet(self->RestoringStage, self->StartingEndpoints);
self->Executor->WaitFor(future);
AtomicSet(self->RestoringStage, self->Completed);
}
});
}

Expand Down Expand Up @@ -721,8 +729,20 @@ TFuture<NProto::TStartEndpointResponse> TEndpointManager::RestoreSingleEndpoint(
TCallContextPtr ctx,
std::shared_ptr<NProto::TStartEndpointRequest> request)
{
return Executor->Execute([this, ctx, request] () mutable {
return StartEndpointImpl(std::move(ctx), std::move(request), true);
return Executor->Execute([
weakSelf = weak_from_this(),
ctx,
request] () mutable -> NProto::TStartEndpointResponse
{
auto self = weakSelf.lock();
if (!self) {
return TErrorResponse(E_FAIL, "EndpointManager is destroyed");
}

return self->StartEndpointImpl(
std::move(ctx),
std::move(request),
true);
});
}

Expand Down Expand Up @@ -1543,8 +1563,10 @@ void TEndpointManager::HandleRestoredEndpoint(
<< ", error:" << FormatError(error));
}

Executor->Execute([socketPath, this] () mutable {
RestoringEndpoints.erase(socketPath);
Executor->Execute([socketPath, weakSelf = weak_from_this()] () mutable {
if (auto self = weakSelf.lock()) {
self->RestoringEndpoints.erase(socketPath);
}
});
}

Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest)

auto endpointManager = CreateEndpointManager(
CreateWallClockTimer(),
CreateSchedulerStub(),
scheduler,
CreateLoggingService("console"),
CreateRequestStatsStub(),
CreateVolumeStatsStub(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ class TTestClientStorage
}
}

size_t GetSessionCount()
{
with_lock (Lock) {
return Sessions.size();
void WaitForSessionCount(ui32 sessionCount) {
while (true) {
with_lock (Lock) {
if (Sessions.size() == sessionCount) {
break;
}
}
Sleep(TDuration::MilliSeconds(100));
}
}
};
Expand Down Expand Up @@ -372,17 +376,15 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
client->Stop();
};

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);
clientStorage->WaitForSessionCount(1);

{
auto future = listener->StopEndpoint(unixSocket.GetPath());
const auto& error = future.GetValue(TDuration::Seconds(5));
UNIT_ASSERT_C(!HasError(error), error);
}

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
clientStorage->WaitForSessionCount(0);
}

Y_UNIT_TEST(ShouldHandleClientDisconnection)
Expand Down Expand Up @@ -420,28 +422,15 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
}
};

const auto waitIters = 10;
for (ui32 i = 0; i < waitIters; ++i) {
if (clientStorage->GetSessionCount()) {
break;
}
Sleep(TDuration::MilliSeconds(100));
}
UNIT_ASSERT_VALUES_EQUAL(1, clientStorage->GetSessionCount());
clientStorage->WaitForSessionCount(1);

clientEndpoint->Stop();
clientEndpoint.reset();

client->Stop();
client.reset();

for (ui32 i = 0; i < waitIters; ++i) {
if (!clientStorage->GetSessionCount()) {
break;
}
Sleep(TDuration::MilliSeconds(100));
}
UNIT_ASSERT_VALUES_EQUAL(0, clientStorage->GetSessionCount());
clientStorage->WaitForSessionCount(0);
}

Y_UNIT_TEST(ShouldNotAcceptClientAfterServerStopped)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1059,10 +1059,15 @@ void TPartitionActor::CompleteReadBlocks(
RemoveTransaction(*args.RequestInfo);

if (args.Interrupted) {
ui32 flags = 0;
SetProtoFlag(flags, NProto::EF_SILENT);
auto error = MakeError(
E_REJECTED,
"ReadBlocks transaction was interrupted",
flags);
auto response = CreateReadBlocksResponse(
args.ReplyLocal,
MakeError(E_REJECTED, "ReadBlocks transaction was interrupted")
);
std::move(error));

LWTRACK(
ResponseSent_Partition,
Expand Down

0 comments on commit 8bd8aae

Please sign in to comment.