Skip to content

Commit

Permalink
[Filestore] avoid fsync on every write for local service (#1990) (#2123)
Browse files Browse the repository at this point in the history
* NBSNEBIUS-371: avoid fsync on every write for local service
  • Loading branch information
budevg authored Sep 25, 2024
1 parent f299828 commit 6939558
Show file tree
Hide file tree
Showing 25 changed files with 494 additions and 37 deletions.
1 change: 1 addition & 0 deletions cloud/filestore/libs/client/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class TSession
// FILESTORE_IMPLEMENT_METHOD

FILESTORE_DATA_METHODS(FILESTORE_IMPLEMENT_METHOD)
FILESTORE_LOCAL_DATA_METHODS(FILESTORE_IMPLEMENT_METHOD)

#undef FILESTORE_IMPLEMENT_METHOD

Expand Down
23 changes: 23 additions & 0 deletions cloud/filestore/libs/diagnostics/profile_log_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,27 @@ void InitProfileLogRequestInfo(
nodeInfo->SetNodeName(request.GetCheckpointId());
}

template <>
void InitProfileLogRequestInfo(
NProto::TProfileLogRequestInfo& profileLogRequest,
const NProto::TFsyncRequest& request)
{
auto* nodeInfo = profileLogRequest.MutableNodeInfo();
nodeInfo->SetNodeId(request.GetNodeId());
nodeInfo->SetHandle(request.GetHandle());
nodeInfo->SetFlags(request.GetDataSync());
}

template <>
void InitProfileLogRequestInfo(
NProto::TProfileLogRequestInfo& profileLogRequest,
const NProto::TFsyncDirRequest& request)
{
auto* nodeInfo = profileLogRequest.MutableNodeInfo();
nodeInfo->SetNodeId(request.GetNodeId());
nodeInfo->SetFlags(request.GetDataSync());
}

////////////////////////////////////////////////////////////////////////////////

#define IMPLEMENT_DEFAULT_METHOD(name, ns) \
Expand Down Expand Up @@ -474,6 +495,8 @@ void InitProfileLogRequestInfo(
IMPLEMENT_DEFAULT_METHOD(DescribeData, NProtoPrivate)
IMPLEMENT_DEFAULT_METHOD(GenerateBlobIds, NProtoPrivate)
IMPLEMENT_DEFAULT_METHOD(AddData, NProtoPrivate)
IMPLEMENT_DEFAULT_METHOD(Fsync, NProto)
IMPLEMENT_DEFAULT_METHOD(FsyncDir, NProto)

#undef IMPLEMENT_DEFAULT_METHOD

Expand Down
1 change: 1 addition & 0 deletions cloud/filestore/libs/diagnostics/profile_log_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace NFuse {
#define FILESTORE_FUSE_REQUESTS(xxx, ...) \
xxx(Flush, __VA_ARGS__) \
xxx(Fsync, __VA_ARGS__) \
xxx(FsyncDir, __VA_ARGS__) \
// FILESTORE_FUSE_REQUESTS

#define FILESTORE_MATERIALIZE_REQUEST(name, ...) name,
Expand Down
2 changes: 2 additions & 0 deletions cloud/filestore/libs/service/auth_scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ TPermissionList GetRequestPermissions(EFileStoreRequest requestType)
case EFileStoreRequest::GenerateBlobIds:
case EFileStoreRequest::WriteBlob:
case EFileStoreRequest::AddData:
case EFileStoreRequest::Fsync:
case EFileStoreRequest::FsyncDir:
return CreatePermissionList({EPermission::Write});

case EFileStoreRequest::AddClusterNode:
Expand Down
16 changes: 16 additions & 0 deletions cloud/filestore/libs/service/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,14 @@ namespace NCloud::NFileStore {
xxx(AllocateData, __VA_ARGS__) \
// FILESTORE_DATA_METHODS

#define FILESTORE_LOCAL_DATA_METHODS(xxx, ...) \
xxx(Fsync, __VA_ARGS__) \
xxx(FsyncDir, __VA_ARGS__) \
// FILESTORE_LOCAL_DATA_METHODS

#define FILESTORE_DATA_SERVICE(xxx, ...) \
FILESTORE_DATA_METHODS(xxx, __VA_ARGS__) \
FILESTORE_LOCAL_DATA_METHODS(xxx, __VA_ARGS__) \
// FILESTORE_DATA_SERVICE

#define FILESTORE_CONTROL_SERVICE(xxx, ...) \
Expand All @@ -87,6 +93,14 @@ namespace NCloud::NFileStore {
xxx(PingSession, __VA_ARGS__) \
FILESTORE_SERVICE_METHODS(xxx, __VA_ARGS__) \
FILESTORE_DATA_METHODS(xxx, __VA_ARGS__) \
FILESTORE_LOCAL_DATA_METHODS(xxx, __VA_ARGS__) \
// FILESTORE_SERVICE

#define FILESTORE_REMOTE_SERVICE(xxx, ...) \
xxx(Ping, __VA_ARGS__) \
xxx(PingSession, __VA_ARGS__) \
FILESTORE_SERVICE_METHODS(xxx, __VA_ARGS__) \
FILESTORE_DATA_METHODS(xxx, __VA_ARGS__) \
// FILESTORE_SERVICE

#define FILESTORE_ENDPOINT_METHODS(xxx, ...) \
Expand All @@ -108,6 +122,7 @@ namespace NCloud::NFileStore {
xxx(PingSession, __VA_ARGS__) \
FILESTORE_SERVICE_METHODS(xxx, __VA_ARGS__) \
FILESTORE_DATA_METHODS(xxx, __VA_ARGS__) \
FILESTORE_LOCAL_DATA_METHODS(xxx, __VA_ARGS__) \
xxx(GetSessionEventsStream, __VA_ARGS__) \
FILESTORE_ENDPOINT_METHODS(xxx, __VA_ARGS__) \
// FILESTORE_REQUESTS
Expand All @@ -119,6 +134,7 @@ namespace NCloud::NFileStore {
xxx(PingSession, __VA_ARGS__) \
FILESTORE_SERVICE_METHODS(xxx, __VA_ARGS__) \
FILESTORE_DATA_METHODS(xxx, __VA_ARGS__) \
FILESTORE_LOCAL_DATA_METHODS(xxx, __VA_ARGS__) \
FILESTORE_ENDPOINT_METHODS(xxx, __VA_ARGS__) \
// FILESTORE_PROTO_REQUESTS

Expand Down
42 changes: 41 additions & 1 deletion cloud/filestore/libs/service_kikimr/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,21 @@ namespace {
}; \
// FILESTORE_DECLARE_METHOD

FILESTORE_SERVICE(FILESTORE_DECLARE_METHOD)
FILESTORE_REMOTE_SERVICE(FILESTORE_DECLARE_METHOD)

#undef FILESTORE_DECLARE_METHOD

#define FILESTORE_DECLARE_METHOD(name, ...) \
struct T##name##Method \
{ \
static constexpr auto RequestName = TStringBuf(#name); \
\
using TRequest = NProto::T##name##Request; \
using TResponse = NProto::T##name##Response; \
}; \
// FILESTORE_DECLARE_METHOD

FILESTORE_LOCAL_DATA_METHODS(FILESTORE_DECLARE_METHOD)

#undef FILESTORE_DECLARE_METHOD

Expand Down Expand Up @@ -291,6 +305,32 @@ class TKikimrFileStore final
std::move(response)));
}

template<>
void ExecuteRequest<TFsyncMethod>(
TCallContextPtr callContext,
std::shared_ptr<TFsyncMethod::TRequest> request,
TPromise<TFsyncMethod::TResponse> response)
{
Y_UNUSED(callContext);
Y_UNUSED(request);
Y_UNUSED(TFsyncMethod::RequestName);

response.SetValue(TFsyncMethod::TResponse());
}

template<>
void ExecuteRequest<TFsyncDirMethod>(
TCallContextPtr callContext,
std::shared_ptr<TFsyncDirMethod::TRequest> request,
TPromise<TFsyncDirMethod::TResponse> response)
{
Y_UNUSED(callContext);
Y_UNUSED(request);
Y_UNUSED(TFsyncDirMethod::RequestName);

response.SetValue(TFsyncDirMethod::TResponse());
}

template <typename T>
void ExecuteStreamRequest(
TCallContextPtr callContext,
Expand Down
53 changes: 50 additions & 3 deletions cloud/filestore/libs/service_kikimr/service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ struct TTestServiceActor final
} \
// FILESTORE_IMPLEMENT_METHOD

FILESTORE_SERVICE(FILESTORE_IMPLEMENT_METHOD, TEvService)
FILESTORE_REMOTE_SERVICE(FILESTORE_IMPLEMENT_METHOD, TEvService)

#undef FILESTORE_IMPLEMENT_METHOD

STFUNC(StateWork)
{
switch (ev->GetTypeRewrite()) {
FILESTORE_SERVICE(FILESTORE_HANDLE_REQUEST, TEvService)
FILESTORE_REMOTE_SERVICE(FILESTORE_HANDLE_REQUEST, TEvService)
}
}
};
Expand All @@ -69,7 +69,7 @@ struct TTestServiceActor final

////////////////////////////////////////////////////////////////////////////////

Y_UNIT_TEST_SUITE(TIndexStoreTest)
Y_UNIT_TEST_SUITE(TKikimrFileStore)
{
Y_UNIT_TEST(ShouldHandleRequests)
{
Expand Down Expand Up @@ -101,6 +101,53 @@ Y_UNIT_TEST_SUITE(TIndexStoreTest)
service->Stop();
}

Y_UNIT_TEST(ShouldHandleFsyncRequestsOutsideActorSystem)
{
auto serviceActor = std::make_unique<TTestServiceActor>();

auto actorSystem = MakeIntrusive<TTestActorSystem>();
actorSystem->RegisterTestService(std::move(serviceActor));

auto service = CreateKikimrFileStore(actorSystem);
service->Start();

{
auto context = MakeIntrusive<TCallContext>();
auto request = std::make_shared<NProto::TFsyncRequest>();

auto future = service->Fsync(
std::move(context),
std::move(request));

actorSystem->DispatchEvents(WaitTimeout);

const auto& response = future.GetValue(WaitTimeout);
UNIT_ASSERT_VALUES_EQUAL_C(
S_OK,
response.GetError().GetCode(),
response.GetError().GetMessage());
}

{
auto context = MakeIntrusive<TCallContext>();
auto request = std::make_shared<NProto::TFsyncDirRequest>();

auto future = service->FsyncDir(
std::move(context),
std::move(request));

actorSystem->DispatchEvents(WaitTimeout);

const auto& response = future.GetValue(WaitTimeout);
UNIT_ASSERT_VALUES_EQUAL_C(
S_OK,
response.GetError().GetCode(),
response.GetError().GetMessage());
}

service->Stop();
}

Y_UNIT_TEST(ShouldGetSessionEventsStream)
{
TActorId eventHandler;
Expand Down
3 changes: 3 additions & 0 deletions cloud/filestore/libs/service_local/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ namespace NCloud::NFileStore {
xxx(TestLock, __VA_ARGS__) \
\
xxx(AllocateData, __VA_ARGS__) \
\
xxx(Fsync, __VA_ARGS__) \
xxx(FsyncDir, __VA_ARGS__) \
// FILESTORE_DATA_METHODS_LOCAL_SYNC

#define FILESTORE_DATA_METHODS_LOCAL_ASYNC(xxx, ...) \
Expand Down
50 changes: 42 additions & 8 deletions cloud/filestore/libs/service_local/fs_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,14 @@ TFuture<NProto::TWriteDataResponse> TLocalFileSystem::WriteDataAsync(
TErrorResponse(ErrorInvalidHandle(request.GetHandle())));
}

const FHANDLE fd = *handle;
auto b = std::move(*request.MutableBuffer());
TArrayRef<char> data(b.begin(), b.vend());
auto promise = NewPromise<NProto::TWriteDataResponse>();
FileIOService->AsyncWrite(*handle, request.GetOffset(), data).Subscribe(
[b = std::move(b), promise, fd] (const TFuture<ui32>& f) mutable {
[b = std::move(b), promise] (const TFuture<ui32>& f) mutable {
NProto::TWriteDataResponse response;
try {
f.GetValue();
TFileHandle h(fd);
const bool flushed = h.Flush();
h.Release();
if (!flushed) {
throw yexception() << "failed to flush " << fd;
}
} catch (...) {
*response.MutableError() =
MakeError(E_IO, CurrentExceptionMessage());
Expand Down Expand Up @@ -161,4 +154,45 @@ NProto::TAllocateDataResponse TLocalFileSystem::AllocateData(
return {};
}

NProto::TFsyncResponse TLocalFileSystem::Fsync(
const NProto::TFsyncRequest& request)
{
STORAGE_TRACE("Fsync " << DumpMessage(request));

auto session = GetSession(request);
auto* handle = session->LookupHandle(request.GetHandle());
if (!handle || !handle->IsOpen()) {
return TErrorResponse(ErrorInvalidHandle(request.GetHandle()));
}

const bool flushed =
request.GetDataSync() ? handle->FlushData() : handle->Flush();
if (!flushed) {
return TErrorResponse(E_IO, "flush failed");
}

return {};
}

NProto::TFsyncDirResponse TLocalFileSystem::FsyncDir(
const NProto::TFsyncDirRequest& request)
{
STORAGE_TRACE("FsyncDir " << DumpMessage(request));

auto session = GetSession(request);
auto node = session->LookupNode(request.GetNodeId());
if (!node) {
return TErrorResponse(ErrorInvalidTarget(request.GetNodeId()));
}

auto handle = node->OpenHandle(O_RDONLY|O_DIRECTORY);
const bool flushed =
request.GetDataSync() ? handle.FlushData() : handle.Flush();
if (!flushed) {
return TErrorResponse(E_IO, "flush failed");
}

return {};
}

} // namespace NCloud::NFileStore
34 changes: 34 additions & 0 deletions cloud/filestore/libs/service_local/service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,23 @@ struct TTestBootstrap
return request;
}

auto CreateFsyncRequest(ui64 node, ui64 handle, bool dataSync)
{
auto request = CreateRequest<NProto::TFsyncRequest>();
request->SetNodeId(node);
request->SetHandle(handle);
request->SetDataSync(dataSync);
return request;
}

auto CreateFsyncDirRequest(ui64 node, bool dataSync)
{
auto request = CreateRequest<NProto::TFsyncDirRequest>();
request->SetNodeId(node);
request->SetDataSync(dataSync);
return request;
}

#define FILESTORE_DECLARE_METHOD(name, ns) \
template <typename... Args> \
NProto::T##name##Response name(Args&&... args) \
Expand Down Expand Up @@ -1580,6 +1597,23 @@ Y_UNIT_TEST_SUITE(LocalFileStore)
UNIT_ASSERT_VALUES_EQUAL(names2[0], "d");
}
}

Y_UNIT_TEST(ShouldFsyncFileAndDir)
{
TTestBootstrap bootstrap("fs");

auto fileNodeId = CreateFile(bootstrap, RootNodeId, "file1");
auto dirNodeId = CreateDirectory(bootstrap, RootNodeId, "dir1");

auto fileHandle =
bootstrap.CreateHandle(fileNodeId, "", TCreateHandleArgs::RDWR)
.GetHandle();

for (auto dataSync: {true, false}) {
bootstrap.Fsync(fileNodeId, fileHandle, dataSync);
bootstrap.FsyncDir(dirNodeId, dataSync);
}
}
};

} // namespace NCloud::NFileStore
2 changes: 1 addition & 1 deletion cloud/filestore/libs/storage/api/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ struct TEvService
static_assert(EvEnd < (int)TFileStoreEvents::SERVICE_END,
"EvEnd expected to be < TFileStoreEvents::SERVICE_END");

FILESTORE_SERVICE(FILESTORE_DECLARE_PROTO_EVENTS, NProto)
FILESTORE_REMOTE_SERVICE(FILESTORE_DECLARE_PROTO_EVENTS, NProto)

using TEvRegisterLocalFileStoreRequest = TRequestEvent<
TRegisterLocalFileStore,
Expand Down
2 changes: 1 addition & 1 deletion cloud/filestore/libs/storage/service/service_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ bool TStorageServiceActor::HandleRequests(STFUNC_SIG)
FILESTORE_HANDLE_REQUEST(name, ns) \
FILESTORE_HANDLE_RESPONSE(name, ns) \

FILESTORE_SERVICE(FILESTORE_HANDLE_REQUEST_RESPONSE, TEvService)
FILESTORE_REMOTE_SERVICE(FILESTORE_HANDLE_REQUEST_RESPONSE, TEvService)
FILESTORE_SERVICE_REQUESTS_PRIVATE(FILESTORE_HANDLE_REQUEST_RESPONSE, TEvServicePrivate)
#undef FILESTORE_HANDLE_REQUEST_RESPONSE

Expand Down
2 changes: 1 addition & 1 deletion cloud/filestore/libs/storage/service/service_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class TStorageServiceActor final
const ns::TEv##name##Response::TPtr& ev, \
const NActors::TActorContext& ctx); \

FILESTORE_SERVICE(FILESTORE_DECLARE_REQUEST_RESPONSE, TEvService)
FILESTORE_REMOTE_SERVICE(FILESTORE_DECLARE_REQUEST_RESPONSE, TEvService)
FILESTORE_SERVICE_REQUESTS_PRIVATE(FILESTORE_DECLARE_REQUEST_RESPONSE, TEvServicePrivate)
#undef FILESTORE_DECLARE_REQUEST_RESPONSE

Expand Down
Loading

0 comments on commit 6939558

Please sign in to comment.