Skip to content

Commit

Permalink
loop shutdown should not call CancelRequest for inflight requests bec…
Browse files Browse the repository at this point in the history
…ause it causes a race with the code that's executing those requests; instead, we should simply wait until those requests complete by themselves (#1193)

* loop shutdown should not call CancelRequest for inflight requests because it causes a race with the code that's executing those requests; instead, we should simply wait until those requests complete by themselves

* CompletionQueue shutdown should actually be async + fixed StopAsync bug in TBootstrap in fs_ut.cpp (Apply on a callback which returns TFuture<void> discards the future returned by the callback!)

* loop shutdown: SuspendAsync ut + some tmp dbg output

* loop shutdown: more tmp dbg output

* loop shutdown: more tmp dbg output

* loop shutdown: outputting dbg stuff under a lock, outputting dbg stuff only when ShouldStop flag is set

* loop shutdown: fixed CompletingCount vs Requests.empty() checks race

* loop shutdown: inflight count and completing count should be checked together atomically
  • Loading branch information
qkrorlqr committed May 23, 2024
1 parent 2cf1caa commit 0d914a4
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 61 deletions.
76 changes: 62 additions & 14 deletions cloud/filestore/libs/vfs_fuse/fs_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,22 @@ struct TBootstrap

// Test-bootstrap helper that stops the loop (if one exists) and then the
// scheduler (if one exists), returning a future for the whole sequence.
// NOTE(review): this listing comes from a commit diff view and appears to
// interleave the pre-change and post-change lines without +/- markers (e.g.
// the two declarations of `f` below) — confirm against the repository.
TFuture<void> StopAsync()
{
TFuture<void> f = MakeFuture();
auto f = MakeFuture();
if (Loop) {
f = f.Apply([&] (auto) {
return Loop->StopAsync();
});
f = Loop->StopAsync();
}

if (Scheduler) {
f.Apply([&] (auto) {
Scheduler->Stop();
});
if (!Scheduler) {
return f;
}
return f;

// `p` is completed only after `f` has finished and Scheduler->Stop() has
// returned, so the future handed back to the caller covers both steps.
auto p = NewPromise<void>();
f.Subscribe([=] (auto f) mutable {
// presumably rethrows if `f` carries an exception, in which case `p` would
// stay unset — TODO confirm
f.GetValue();
Scheduler->Stop();
p.SetValue();
});
return p;
}

void InterruptNextRequest()
Expand Down Expand Up @@ -788,6 +791,44 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
UNIT_ASSERT_EXCEPTION(result.GetValue(), yexception);
}

// Checks that SuspendAsync() issued while a ReadDir request is still
// outstanding stays pending until the ListNodes handler's future is completed,
// and that the outstanding request then finishes without an exception.
Y_UNIT_TEST(ShouldNotFailOnSuspendWithRequestsInFlight)
{
    // How long to wait before concluding that a future is still pending.
    const auto stillPendingAfter = TDuration::Seconds(1);

    NAtomic::TBool sessionDestroyed = false;

    TBootstrap bootstrap;
    bootstrap.Service->DestroySessionHandler =
        [&sessionDestroyed] (auto, auto) {
            sessionDestroyed = true;
            return MakeFuture(NProto::TDestroySessionResponse());
        };

    // The handler hands out a future that the test completes by hand later,
    // which keeps the request in flight for as long as the test needs.
    auto listNodesReply = NewPromise<NProto::TListNodesResponse>();
    bootstrap.Service->ListNodesHandler =
        [&] (auto callContext, auto /*request*/) {
            UNIT_ASSERT_VALUES_EQUAL(FileSystemId, callContext->FileSystemId);
            return listNodesReply.GetFuture();
        };

    bootstrap.Start();

    const ui64 dirNodeId = 123;

    auto openDir = bootstrap.Fuse->SendRequest<TOpenDirRequest>(dirNodeId);
    UNIT_ASSERT(openDir.Wait(WaitTimeout));
    auto dirHandle = openDir.GetValue();

    // The read must not complete while the handler's future is unset.
    auto readDir =
        bootstrap.Fuse->SendRequest<TReadDirRequest>(dirNodeId, dirHandle);
    UNIT_ASSERT(!readDir.Wait(stillPendingAfter));

    // Neither must the suspend.
    auto suspended = bootstrap.Loop->SuspendAsync();
    UNIT_ASSERT(!suspended.Wait(stillPendingAfter));

    // Releasing the handler's future lets both the suspend and the read finish.
    listNodesReply.SetValue(NProto::TListNodesResponse{});
    UNIT_ASSERT(suspended.Wait(WaitTimeout));
    UNIT_ASSERT_NO_EXCEPTION(readDir.GetValueSync());
}

Y_UNIT_TEST(ShouldNotFailOnStopWithRequestsInFlight)
{
NAtomic::TBool sessionDestroyed = false;
Expand All @@ -798,7 +839,7 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
return MakeFuture(NProto::TDestroySessionResponse());
};

NThreading::TPromise<NProto::TListNodesResponse> response = NewPromise<NProto::TListNodesResponse>();
auto response = NewPromise<NProto::TListNodesResponse>();
bootstrap.Service->ListNodesHandler = [&] (auto callContext, auto request) {
Y_UNUSED(request);
UNIT_ASSERT_VALUES_EQUAL(FileSystemId, callContext->FileSystemId);
Expand All @@ -814,16 +855,23 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
UNIT_ASSERT(handle.Wait(WaitTimeout));
auto handleId = handle.GetValue();

auto read = bootstrap.Fuse->SendRequest<TReadDirRequest>(nodeId, handleId);
UNIT_ASSERT(!read.Wait(WaitTimeout));
auto read =
bootstrap.Fuse->SendRequest<TReadDirRequest>(nodeId, handleId);
UNIT_ASSERT(!read.Wait(TDuration::Seconds(1)));

auto stop = bootstrap.StopAsync();
UNIT_ASSERT(stop.Wait(WaitTimeout));
UNIT_ASSERT(!stop.Wait(TDuration::Seconds(1)));

auto read2 =
bootstrap.Fuse->SendRequest<TReadDirRequest>(nodeId, handleId);
UNIT_ASSERT_EXCEPTION_CONTAINS(
read.GetValueSync(),
read2.GetValueSync(),
yexception,
"Unknown error -4");

response.SetValue(NProto::TListNodesResponse{});
UNIT_ASSERT(stop.Wait(WaitTimeout));
UNIT_ASSERT_NO_EXCEPTION(read.GetValueSync());
}

Y_UNIT_TEST(ShouldNotAbortOnInvalidServerLookup)
Expand Down
141 changes: 94 additions & 47 deletions cloud/filestore/libs/vfs_fuse/loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ class TCompletionQueue final
enum fuse_cancelation_code CancelCode;
NAtomic::TBool ShouldStop = false;
TAtomicCounter CompletingCount = {0};
TManualEvent Stopped;

TMutex RequestsLock;
THashMap<fuse_req_t, TCallContextPtr> Requests;

TPromise<void> StopPromise = NewPromise<void>();

public:
TCompletionQueue(
IRequestStatsPtr stats,
Expand Down Expand Up @@ -123,34 +124,45 @@ class TCompletionQueue final
if (!Requests.erase(req)) {
return 0;
}
CompletingCount.Add(1);
CompletingCount.Inc();
}

int ret = cb(req);
if (CompletingCount.Dec() == 0 && ShouldStop) {
Stopped.Signal();
bool noCompleting = CompletingCount.Dec() == 0;

if (ShouldStop && noCompleting) {
bool noInflight = false;
with_lock (RequestsLock) {
noInflight = Requests.empty();
// double-checking needed because inflight count and completing
// count should be checked together atomically
noCompleting = CompletingCount.Val() == 0;
}

if (noInflight && noCompleting) {
StopPromise.TrySetValue();
}
}

return ret;
}

// Records the cancelation code, raises ShouldStop and returns StopPromise.
// NOTE(review): this listing comes from a commit diff view and appears to
// interleave the pre-change `Stop` body (CancelRequest loop, Stopped.Wait())
// with the post-change `StopAsync` body without +/- markers — confirm against
// the repository.
void Stop(enum fuse_cancelation_code code) {
TFuture<void> StopAsync(enum fuse_cancelation_code code)
{
CancelCode = code;
ShouldStop = true;

TGuard g{RequestsLock};
for (auto&& [req, context] : Requests) {
CancelRequest(
Log,
*RequestStats,
*context,
req,
CancelCode);
bool canStop = false;

// Both conditions are read while RequestsLock is held, so "nothing inflight"
// and "nothing completing" are evaluated together.
with_lock (RequestsLock) {
canStop = Requests.empty() && CompletingCount.Val() == 0;
}
Requests.clear();

while (CompletingCount.Val() != 0) {
Stopped.Wait();
// TrySetValue rather than SetValue: the request-completion path shown above
// also calls StopPromise.TrySetValue().
if (canStop) {
StopPromise.TrySetValue();
}

return StopPromise;
}

void Accept(IIncompleteRequestCollector& collector) override
Expand Down Expand Up @@ -535,32 +547,53 @@ class TFileSystemLoop final
return MakeFuture();
}

// Remainder of a StopAsync() body; its signature lies above this diff hunk.
// NOTE(review): this listing comes from a commit diff view and appears to
// interleave pre-change and post-change lines without +/- markers — confirm
// against the repository.
CompletionQueue->Stop(FUSE_ERROR);
SessionThread->Unmount();
SessionThread = nullptr;
// `s` is the promise returned at the end of this function (`return s;`); it
// is completed from inside the callbacks below.
auto w = weak_from_this();
auto s = NewPromise<void>();
CompletionQueue->StopAsync(FUSE_ERROR).Subscribe(
[w = std::move(w), s] (const auto& f) mutable {
f.GetValue();

auto callContext = MakeIntrusive<TCallContext>(
Config->GetFileSystemId(),
CreateRequestId());
callContext->RequestType = EFileStoreRequest::DestroySession;
RequestStats->RequestStarted(Log, *callContext);
// NOTE(review): this early return leaves `s` unset, whereas the `!p` branch
// of the DestroySession callback below and the `!p` branch in SuspendAsync
// both call s.SetValue() before returning. If the loop object is already gone
// when the completion queue finishes stopping, the future returned to the
// caller would presumably never complete — confirm, and consider calling
// s.SetValue() here as well.
auto p = w.lock();
if (!p) {
return;
}

auto weakPtr = weak_from_this();
return Session->DestroySession()
.Apply([=] (const auto& future) {
if (auto p = weakPtr.lock()) {
const auto& response = future.GetValue();
p->RequestStats->RequestCompleted(
p->Log,
*callContext,
response.GetError());
p->SessionThread->Unmount();
p->SessionThread = nullptr;

auto callContext = MakeIntrusive<TCallContext>(
p->Config->GetFileSystemId(),
CreateRequestId());
callContext->RequestType = EFileStoreRequest::DestroySession;
p->RequestStats->RequestStarted(p->Log, *callContext);

// On DestroySession completion: report the request result, unregister from
// the stats registry, then complete `s`.
p->Session->DestroySession()
.Subscribe([
w = std::move(w),
s = std::move(s),
callContext = std::move(callContext)
] (const auto& f) mutable {
auto p = w.lock();
if (!p) {
s.SetValue();
return;
}

p->StatsRegistry->Unregister(
p->Config->GetFileSystemId(),
p->Config->GetClientId());
}
return;
const auto& response = f.GetValue();
p->RequestStats->RequestCompleted(
p->Log,
*callContext,
response.GetError());

p->StatsRegistry->Unregister(
p->Config->GetFileSystemId(),
p->Config->GetClientId());

s.SetValue();
});
});

return s;
}

TFuture<void> SuspendAsync() override
Expand All @@ -569,17 +602,31 @@ class TFileSystemLoop final
return MakeFuture();
}

CompletionQueue->Stop(FUSE_SUSPEND);
// just stop loop, leave connection
SessionThread->StopThread();
SessionThread->Suspend();
SessionThread = nullptr;
auto w = weak_from_this();
auto s = NewPromise<void>();
CompletionQueue->StopAsync(FUSE_SUSPEND).Subscribe(
[w = std::move(w), s] (const auto& f) mutable {
f.GetValue();

StatsRegistry->Unregister(
Config->GetFileSystemId(),
Config->GetClientId());
auto p = w.lock();
if (!p) {
s.SetValue();
return;
}

// just stop loop, leave connection
p->SessionThread->StopThread();
p->SessionThread->Suspend();
p->SessionThread = nullptr;

p->StatsRegistry->Unregister(
p->Config->GetFileSystemId(),
p->Config->GetClientId());

s.SetValue();
});

return MakeFuture();
return s;
}

TFuture<NProto::TError> AlterAsync(
Expand Down

0 comments on commit 0d914a4

Please sign in to comment.