diff --git a/cloud/filestore/config/storage.proto b/cloud/filestore/config/storage.proto index d2cd98256c4..c478a00b18a 100644 --- a/cloud/filestore/config/storage.proto +++ b/cloud/filestore/config/storage.proto @@ -231,4 +231,16 @@ message TStorageConfig optional uint32 CleanupThresholdAverage = 342; // Enables the aforementioned threshold. optional bool NewCleanupEnabled = 343; + + // Enables GenerateBlobIds + WriteBlob + AddData instead of WriteBlob + // for writing. + optional bool ThreeStageWriteEnabled = 344; + + // When issuing blob ids, the tablet acquires a collect barrier. In order + // to release it in case of a client disconnect, this timeout is used. + optional uint32 GenerateBlobIdsReleaseCollectBarrierTimeout = 345; + + // If ThreeStageWriteEnabled is true, writes that exceed this threshold + // will use the three-stage write path. Similar to WriteBlobThreshold + optional uint32 ThreeStageWriteThreshold = 346; } diff --git a/cloud/filestore/libs/storage/api/service.h b/cloud/filestore/libs/storage/api/service.h index 9c23e787342..99c8ce28826 100644 --- a/cloud/filestore/libs/storage/api/service.h +++ b/cloud/filestore/libs/storage/api/service.h @@ -55,11 +55,11 @@ namespace NCloud::NFileStore::NStorage { xxx(ReleaseLock, __VA_ARGS__) \ xxx(TestLock, __VA_ARGS__) \ \ - xxx(WriteData, __VA_ARGS__) \ xxx(AllocateData, __VA_ARGS__) \ // FILESTORE_SERVICE_REQUESTS_HANDLE #define FILESTORE_SERVICE_REQUESTS_NO_HANDLE(xxx, ...) 
\ + xxx(WriteData, __VA_ARGS__) \ xxx(ReadData, __VA_ARGS__) \ // FILESTORE_SERVICE_REQUESTS_NO_HANDLE diff --git a/cloud/filestore/libs/storage/api/tablet.h b/cloud/filestore/libs/storage/api/tablet.h index 5c0d4c34698..86de2d64322 100644 --- a/cloud/filestore/libs/storage/api/tablet.h +++ b/cloud/filestore/libs/storage/api/tablet.h @@ -24,6 +24,8 @@ namespace NCloud::NFileStore::NStorage { xxx(ChangeStorageConfig, __VA_ARGS__) \ xxx(DescribeData, __VA_ARGS__) \ xxx(DescribeSessions, __VA_ARGS__) \ + xxx(GenerateBlobIds, __VA_ARGS__) \ + xxx(AddData, __VA_ARGS__) \ // FILESTORE_TABLET_REQUESTS //////////////////////////////////////////////////////////////////////////////// @@ -65,6 +67,12 @@ struct TEvIndexTablet EvDescribeSessionsRequest = EvBegin + 17, EvDescribeSessionsResponse, + EvGenerateBlobIdsRequest = EvBegin + 19, + EvGenerateBlobIdsResponse, + + EvAddDataRequest = EvBegin + 21, + EvAddDataResponse, + EvEnd }; diff --git a/cloud/filestore/libs/storage/core/config.cpp b/cloud/filestore/libs/storage/core/config.cpp index 93128dfdf14..42cb543e9a4 100644 --- a/cloud/filestore/libs/storage/core/config.cpp +++ b/cloud/filestore/libs/storage/core/config.cpp @@ -139,11 +139,17 @@ namespace { NCloud::NProto::AUTHORIZATION_IGNORE )\ \ xxx(TwoStageReadEnabled, bool, false )\ + xxx(ThreeStageWriteEnabled, bool, false )\ + xxx(ThreeStageWriteThreshold, ui32, 64_KB )\ xxx(EntryTimeout, TDuration, TDuration::Zero() )\ xxx(NegativeEntryTimeout, TDuration, TDuration::Zero() )\ xxx(AttrTimeout, TDuration, TDuration::Zero() )\ xxx(MaxOutOfOrderCompactionMapLoadRequestsInQueue, ui32, 5 )\ xxx(MaxBackpressureErrorsBeforeSuicide, ui32, 1000 )\ + \ + xxx(GenerateBlobIdsReleaseCollectBarrierTimeout, \ + TDuration, \ + TDuration::Seconds(10) )\ // FILESTORE_STORAGE_CONFIG #define FILESTORE_DECLARE_CONFIG(name, type, value) \ diff --git a/cloud/filestore/libs/storage/core/config.h b/cloud/filestore/libs/storage/core/config.h index d2dfae3902c..e6fb1312bf3 100644 --- 
a/cloud/filestore/libs/storage/core/config.h +++ b/cloud/filestore/libs/storage/core/config.h @@ -181,6 +181,8 @@ class TStorageConfig NCloud::NProto::EAuthorizationMode GetAuthorizationMode() const; bool GetTwoStageReadEnabled() const; + bool GetThreeStageWriteEnabled() const; + ui32 GetThreeStageWriteThreshold() const; TDuration GetEntryTimeout() const; TDuration GetNegativeEntryTimeout() const; TDuration GetAttrTimeout() const; @@ -191,6 +193,8 @@ class TStorageConfig ui32 GetMaxBackpressureErrorsBeforeSuicide() const; + TDuration GetGenerateBlobIdsReleaseCollectBarrierTimeout() const; + void Dump(IOutputStream& out) const; void DumpHtml(IOutputStream& out) const; void DumpOverridesHtml(IOutputStream& out) const; diff --git a/cloud/filestore/libs/storage/service/CMakeLists.darwin-x86_64.txt b/cloud/filestore/libs/storage/service/CMakeLists.darwin-x86_64.txt index f7c1f0ba1ac..a3c502a2032 100644 --- a/cloud/filestore/libs/storage/service/CMakeLists.darwin-x86_64.txt +++ b/cloud/filestore/libs/storage/service/CMakeLists.darwin-x86_64.txt @@ -54,5 +54,6 @@ target_sources(filestore-libs-storage-service PRIVATE ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_pingsession.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_statfs.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_update_stats.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_writedata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_state.cpp ) diff --git a/cloud/filestore/libs/storage/service/CMakeLists.linux-aarch64.txt b/cloud/filestore/libs/storage/service/CMakeLists.linux-aarch64.txt index 0cbff68a133..6e2e15a7ddd 100644 --- a/cloud/filestore/libs/storage/service/CMakeLists.linux-aarch64.txt +++ b/cloud/filestore/libs/storage/service/CMakeLists.linux-aarch64.txt @@ -55,5 +55,6 @@ target_sources(filestore-libs-storage-service PRIVATE 
${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_pingsession.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_statfs.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_update_stats.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_writedata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_state.cpp ) diff --git a/cloud/filestore/libs/storage/service/CMakeLists.linux-x86_64.txt b/cloud/filestore/libs/storage/service/CMakeLists.linux-x86_64.txt index 22090c40682..5eda5bbf8d4 100644 --- a/cloud/filestore/libs/storage/service/CMakeLists.linux-x86_64.txt +++ b/cloud/filestore/libs/storage/service/CMakeLists.linux-x86_64.txt @@ -56,5 +56,6 @@ target_sources(filestore-libs-storage-service PRIVATE ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_readdata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_statfs.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_update_stats.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_writedata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_state.cpp ) diff --git a/cloud/filestore/libs/storage/service/CMakeLists.windows-x86_64.txt b/cloud/filestore/libs/storage/service/CMakeLists.windows-x86_64.txt index f7c1f0ba1ac..a3c502a2032 100644 --- a/cloud/filestore/libs/storage/service/CMakeLists.windows-x86_64.txt +++ b/cloud/filestore/libs/storage/service/CMakeLists.windows-x86_64.txt @@ -54,5 +54,6 @@ target_sources(filestore-libs-storage-service PRIVATE ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_pingsession.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_statfs.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_update_stats.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_actor_writedata.cpp 
${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/service/service_state.cpp )
diff --git a/cloud/filestore/libs/storage/service/service_actor_writedata.cpp b/cloud/filestore/libs/storage/service/service_actor_writedata.cpp
new file mode 100644
index 00000000000..becc69f3d97
--- /dev/null
+++ b/cloud/filestore/libs/storage/service/service_actor_writedata.cpp
@@ -0,0 +1,391 @@
+#include "service_actor.h"
+
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+
+#include
+
+namespace NCloud::NFileStore::NStorage {
+
+using namespace NActors;
+
+using namespace NKikimr;
+
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/**
+ * @brief Per-request actor that performs a three-stage write on behalf of the
+ * service: GenerateBlobIds (tablet) -> TEvPut for every issued blob id (blob
+ * storage) -> AddData (tablet).
+ *
+ * If any stage reports an error, the original request is sent to the tablet
+ * as a regular WriteData request and the tablet's response is relayed to the
+ * client. The fallback is started at most once per actor.
+ */
+class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
+{
+private:
+    // Original request
+    NProto::TWriteDataRequest WriteRequest;
+    const TRequestInfoPtr RequestInfo;
+
+    // generated blob id and associated data
+    NProtoPrivate::TGenerateBlobIdsResponse GenerateBlobIdsResponse;
+
+    // WriteData state
+    ui32 RemainingBlobsToWrite = 0;
+
+    // Set once the fallback WriteData request has been sent. WriteRequest is
+    // moved-from after that, so the fallback must not be started twice.
+    bool FallbackStarted = false;
+
+public:
+    TWriteDataActor(
+            NProto::TWriteDataRequest request,
+            TRequestInfoPtr requestInfo)
+        : WriteRequest(std::move(request))
+        , RequestInfo(std::move(requestInfo))
+    {}
+
+    void Bootstrap(const TActorContext& ctx)
+    {
+        auto request =
+            std::make_unique<TEvIndexTablet::TEvGenerateBlobIdsRequest>();
+
+        request->Record.MutableHeaders()->CopyFrom(WriteRequest.GetHeaders());
+        request->Record.SetFileSystemId(WriteRequest.GetFileSystemId());
+        request->Record.SetNodeId(WriteRequest.GetNodeId());
+        request->Record.SetHandle(WriteRequest.GetHandle());
+        request->Record.SetOffset(WriteRequest.GetOffset());
+        request->Record.SetLength(WriteRequest.GetBuffer().size());
+
+        LOG_DEBUG(
+            ctx,
+            TFileStoreComponents::SERVICE,
+            "WriteDataActor started, data size: %lu, offset: %lu",
+            WriteRequest.GetBuffer().size(),
+            WriteRequest.GetOffset());
+
+        ctx.Send(MakeIndexTabletProxyServiceId(), request.release());
+
+        Become(&TThis::StateWork);
+    }
+
+private:
+    STFUNC(StateWork)
+    {
+        switch (ev->GetTypeRewrite()) {
+            HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
+
+            HFunc(
+                TEvIndexTablet::TEvGenerateBlobIdsResponse,
+                HandleGenerateBlobIdsResponse);
+
+            HFunc(TEvBlobStorage::TEvPutResult, HandleWriteBlobResponse);
+
+            HFunc(TEvIndexTablet::TEvAddDataResponse, HandleAddDataResponse);
+
+            HFunc(TEvService::TEvWriteDataResponse, HandleWriteDataResponse);
+
+            default:
+                HandleUnexpectedEvent(ev, TFileStoreComponents::SERVICE_WORKER);
+                break;
+        }
+    }
+
+    void HandleGenerateBlobIdsResponse(
+        const TEvIndexTablet::TEvGenerateBlobIdsResponse::TPtr& ev,
+        const TActorContext& ctx)
+    {
+        const auto* msg = ev->Get();
+
+        if (HasError(msg->GetError())) {
+            WriteData(ctx, msg->GetError());
+            return;
+        }
+
+        GenerateBlobIdsResponse.CopyFrom(msg->Record);
+
+        LOG_DEBUG(
+            ctx,
+            TFileStoreComponents::SERVICE,
+            "GenerateBlobIds response received: %s",
+            GenerateBlobIdsResponse.DebugString().Quote().c_str());
+
+        WriteBlobs(ctx);
+    }
+
+    // Sends one TEvPut per issued blob id; when there are several blobs, each
+    // one takes the next blobId.BlobSize() bytes of the request buffer
+    void WriteBlobs(const TActorContext& ctx)
+    {
+        RemainingBlobsToWrite = GenerateBlobIdsResponse.BlobsSize();
+        if (RemainingBlobsToWrite == 0) {
+            // No TEvPutResult would ever arrive, so the actor would never
+            // reply; use the regular write path instead
+            WriteData(ctx, MakeError(E_REJECTED, "no blob ids generated"));
+            return;
+        }
+
+        ui64 offset = 0;
+
+        for (const auto& blob: GenerateBlobIdsResponse.GetBlobs()) {
+            NKikimr::TLogoBlobID blobId =
+                LogoBlobIDFromLogoBlobID(blob.GetBlobId());
+            std::unique_ptr<TEvBlobStorage::TEvPut> request;
+            if (GenerateBlobIdsResponse.BlobsSize() == 1) {
+                // do not copy the buffer if there is only one blob
+                request = std::make_unique<TEvBlobStorage::TEvPut>(
+                    blobId,
+                    WriteRequest.GetBuffer(),
+                    TInstant::Max());
+            } else {
+                request = std::make_unique<TEvBlobStorage::TEvPut>(
+                    blobId,
+                    TString(
+                        WriteRequest.GetBuffer().Data() + offset,
+                        blobId.BlobSize()),
+                    TInstant::Max());
+            }
+            NKikimr::TActorId proxy =
+                MakeBlobStorageProxyID(blob.GetBSGroupId());
+            LOG_DEBUG(
+                ctx,
+                TFileStoreComponents::SERVICE,
+                "Sending TEvPut request to blob storage, blobId: %s, proxy: %s",
+                blobId.ToString().c_str(),
+                proxy.ToString().c_str());
+            SendToBSProxy(ctx, proxy, request.release(), blobId.Cookie());
+            offset += blobId.BlobSize();
+        }
+    }
+
+    void HandleWriteBlobResponse(
+        const TEvBlobStorage::TEvPutResult::TPtr& ev,
+        const TActorContext& ctx)
+    {
+        if (FallbackStarted) {
+            // The fallback has already been started (an earlier TEvPut
+            // failed); the remaining TEvPutResult responses are ignored
+            return;
+        }
+
+        const auto* msg = ev->Get();
+
+        if (msg->Status != NKikimrProto::OK) {
+            const auto error =
+                MakeError(MAKE_KIKIMR_ERROR(msg->Status), msg->ErrorReason);
+            LOG_WARN(
+                ctx,
+                TFileStoreComponents::SERVICE,
+                "WriteData error: %s",
+                msg->ErrorReason.Quote().c_str());
+            return WriteData(ctx, error);
+        }
+
+        LOG_DEBUG(
+            ctx,
+            TFileStoreComponents::SERVICE,
+            "TEvPutResult response received: %s",
+            msg->ToString().c_str());
+
+        --RemainingBlobsToWrite;
+        if (RemainingBlobsToWrite == 0) {
+            AddData(ctx);
+        }
+    }
+
+    // Hands the written blob ids and the commit id over to the tablet
+    void AddData(const TActorContext& ctx)
+    {
+        auto request = std::make_unique<TEvIndexTablet::TEvAddDataRequest>();
+
+        request->Record.MutableHeaders()->CopyFrom(WriteRequest.GetHeaders());
+        request->Record.SetFileSystemId(WriteRequest.GetFileSystemId());
+        request->Record.SetNodeId(WriteRequest.GetNodeId());
+        request->Record.SetHandle(WriteRequest.GetHandle());
+        request->Record.SetOffset(WriteRequest.GetOffset());
+        request->Record.SetLength(WriteRequest.GetBuffer().size());
+        for (auto& blob: *GenerateBlobIdsResponse.MutableBlobs()) {
+            request->Record.AddBlobIds()->Swap(blob.MutableBlobId());
+        }
+        request->Record.SetCommitId(GenerateBlobIdsResponse.GetCommitId());
+
+        LOG_DEBUG(
+            ctx,
+            TFileStoreComponents::SERVICE,
+            "Sending AddData request to tablet: %s",
+            request->Record.DebugString().Quote().c_str());
+
+        ctx.Send(MakeIndexTabletProxyServiceId(), request.release());
+    }
+
+    void HandleAddDataResponse(
+        const TEvIndexTablet::TEvAddDataResponse::TPtr& ev,
+        const TActorContext& ctx)
+    {
+        auto* msg = ev->Get();
+
+        if (HasError(msg->GetError())) {
+            return WriteData(ctx, msg->GetError());
+        }
+
+        ReplyAndDie(ctx);
+    }
+
+    /**
+     * @brief Fallback to regular write if three-stage write fails for any
+     * reason
+     */
+    void WriteData(const TActorContext& ctx, const NProto::TError& error)
+    {
+        FallbackStarted = true;
+
+        LOG_WARN(
+            ctx,
+            TFileStoreComponents::SERVICE,
+            "Falling back to WriteData for %lu, %lu, %lu (%lu bytes). Message: "
+            "%s",
+            WriteRequest.GetNodeId(),
+            WriteRequest.GetHandle(),
+            WriteRequest.GetOffset(),
+            WriteRequest.GetBuffer().size(),
+            FormatError(error).Quote().c_str());
+
+        auto request = std::make_unique<TEvService::TEvWriteDataRequest>();
+        request->Record = std::move(WriteRequest);
+
+        // forward request through tablet proxy
+        ctx.Send(MakeIndexTabletProxyServiceId(), request.release());
+    }
+
+    void HandleWriteDataResponse(
+        const TEvService::TEvWriteDataResponse::TPtr& ev,
+        const TActorContext& ctx)
+    {
+        auto* msg = ev->Get();
+
+        if (HasError(msg->GetError())) {
+            HandleError(ctx, msg->GetError());
+            return;
+        }
+
+        LOG_DEBUG(ctx, TFileStoreComponents::SERVICE, "WriteData succeeded");
+
+        auto response = std::make_unique<TEvService::TEvWriteDataResponse>();
+        response->Record = std::move(msg->Record);
+        NCloud::Reply(ctx, *RequestInfo, std::move(response));
+
+        Die(ctx);
+    }
+
+    void ReplyAndDie(const TActorContext& ctx)
+    {
+        auto response = std::make_unique<TEvService::TEvWriteDataResponse>();
+        NCloud::Reply(ctx, *RequestInfo, std::move(response));
+
+        Die(ctx);
+    }
+
+    void HandleError(const TActorContext& ctx, const NProto::TError& error)
+    {
+        auto response =
+            std::make_unique<TEvService::TEvWriteDataResponse>(error);
+        NCloud::Reply(ctx, *RequestInfo, std::move(response));
+        Die(ctx);
+    }
+
+    void HandlePoisonPill(
+        const TEvents::TEvPoisonPill::TPtr& ev,
+        const TActorContext& ctx)
+    {
+        Y_UNUSED(ev);
+        HandleError(ctx, MakeError(E_REJECTED, "request cancelled"));
+    }
+};
+
+}   // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Routes WriteData either to TWriteDataActor (three-stage write; requires the
+// feature to be enabled and a block-aligned request of at least
+// ThreeStageWriteThreshold bytes) or to the tablet via ForwardRequest.
+void TStorageServiceActor::HandleWriteData(
+    const TEvService::TEvWriteDataRequest::TPtr& ev,
+    const TActorContext& ctx)
+{
+    auto* msg = ev->Get();
+
+    const auto& clientId = GetClientId(msg->Record);
+    const auto& sessionId = GetSessionId(msg->Record);
+    const ui64 seqNo = GetSessionSeqNo(msg->Record);
+
+    auto* session = State->FindSession(sessionId, seqNo);
+    if (!session || session->ClientId != clientId || !session->SessionActor) {
+        auto response = std::make_unique<TEvService::TEvWriteDataResponse>(
+            ErrorInvalidSession(clientId, sessionId, seqNo));
+        return NCloud::Reply(ctx, *ev, std::move(response));
+    }
+    const NProto::TFileStore& filestore = session->FileStore;
+
+    if (!filestore.GetFeatures().GetThreeStageWriteEnabled()) {
+        // If three-stage write is disabled, forward the request to the tablet
+        // in the same way as all other requests.
+        ForwardRequest(ctx, ev);
+        return;
+    }
+
+    const ui32 blockSize = filestore.GetBlockSize();
+    const auto bufferSize = msg->Record.GetBuffer().Size();
+
+    // TODO(debnatkh): Consider supporting unaligned writes
+    const bool useThreeStageWrite =
+        msg->Record.GetOffset() % blockSize == 0 &&
+        bufferSize % blockSize == 0 &&
+        bufferSize >= filestore.GetFeatures().GetThreeStageWriteThreshold();
+
+    if (!useThreeStageWrite) {
+        LOG_DEBUG(
+            ctx,
+            TFileStoreComponents::SERVICE,
+            "Forwarding WriteData request to tablet");
+        return ForwardRequest(ctx, ev);
+    }
+
+    LOG_DEBUG(
+        ctx,
+        TFileStoreComponents::SERVICE,
+        "Using three-stage write for request, size: %lu",
+        bufferSize);
+
+    // Register the in-flight request only on the three-stage path: the
+    // forwarded event does not carry the cookie returned here, so an entry
+    // created before forwarding would be left dangling.
+    auto [cookie, inflight] = CreateInFlightRequest(
+        TRequestInfo(ev->Sender, ev->Cookie, msg->CallContext),
+        session->MediaKind,
+        session->RequestStats,
+        ctx.Now());
+
+    InitProfileLogRequestInfo(inflight->ProfileLogRequest, msg->Record);
+
+    auto requestInfo =
+        CreateRequestInfo(SelfId(), cookie, msg->CallContext);
+
+    auto actor = std::make_unique<TWriteDataActor>(
+        std::move(msg->Record),
+        std::move(requestInfo));
+    NCloud::Register(ctx, std::move(actor));
+}
+
+}   // namespace NCloud::NFileStore::NStorage
diff --git a/cloud/filestore/libs/storage/service/service_ut.cpp b/cloud/filestore/libs/storage/service/service_ut.cpp
index 5fdbe969fe5..29bce5b55db 100644
--- a/cloud/filestore/libs/storage/service/service_ut.cpp
+++
b/cloud/filestore/libs/storage/service/service_ut.cpp
@@ -2038,6 +2038,328 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
         UNIT_ASSERT_VALUES_EQUAL(1, reassignedChannels[0]);
         UNIT_ASSERT_VALUES_EQUAL(4, reassignedChannels[1]);
     }
+
+    // Builds a deterministic buffer of the given size: the bytes cycle through
+    // 'A'..'Z'
+    TString GenerateValidateData(ui32 size)
+    {
+        TString data(size, 0);
+        for (ui32 i = 0; i < size; ++i) {
+            data[i] = 'A' + (i % ('Z' - 'A' + 1));
+        }
+        return data;
+    }
+
+    // Block-aligned writes above the threshold go through GenerateBlobIds ->
+    // TEvPut -> AddData; the number of TEvPut requests sent by the worker to
+    // the blob storage proxy is checked for every write.
+    Y_UNIT_TEST(ShouldPerformThreeStageWrites)
+    {
+        TTestEnv env;
+        env.CreateSubDomain("nfs");
+
+        ui32 nodeIdx = env.CreateNode("nfs");
+
+        TServiceClient service(env.GetRuntime(), nodeIdx);
+        const TString fs = "test";
+        service.CreateFileStore(fs, 1000);
+
+        {
+            NProto::TStorageConfig newConfig;
+            newConfig.SetThreeStageWriteEnabled(true);
+            newConfig.SetThreeStageWriteThreshold(1);
+            const auto response =
+                ExecuteChangeStorageConfig(std::move(newConfig), service);
+            UNIT_ASSERT_VALUES_EQUAL(
+                response.GetStorageConfig().GetThreeStageWriteEnabled(),
+                true);
+            UNIT_ASSERT_VALUES_EQUAL(
+                response.GetStorageConfig().GetThreeStageWriteThreshold(),
+                1);
+            TDispatchOptions options;
+            env.GetRuntime().DispatchEvents(options, TDuration::Seconds(1));
+        }
+
+        auto headers = service.InitSession(fs, "client");
+        ui64 nodeId = service
+            .CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
+            ->Record.GetNode()
+            .GetId();
+        ui64 handle = service
+            .CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
+            ->Record.GetHandle();
+
+        ui32 putRequestCount = 0;
+        TActorId worker;
+        env.GetRuntime().SetEventFilter(
+            [&](auto& runtime, auto& event)
+            {
+                Y_UNUSED(runtime);
+                switch (event->GetTypeRewrite()) {
+                    case TEvIndexTablet::EvGenerateBlobIdsRequest: {
+                        if (!worker) {
+                            worker = event->Sender;
+                        }
+                        break;
+                    }
+                    case TEvBlobStorage::EvPut: {
+                        if (event->Sender == worker &&
+                            event->Recipient.IsService() &&
+                            event->Recipient.ServiceId().StartsWith("bsproxy"))
+                        {
+                            ++putRequestCount;
+                        }
+                        break;
+                    }
+                }
+                return false;
+            });
+
+        auto& runtime = env.GetRuntime();
+
+        auto validateWriteData =
+            [&](ui64 offset, ui64 size, ui32 expectedPutCount)
+        {
+            auto data = GenerateValidateData(size);
+
+            service.WriteData(headers, fs, nodeId, handle, offset, data);
+            auto readDataResult =
+                service
+                    .ReadData(headers, fs, nodeId, handle, offset, data.Size());
+            // clang-format off
+            UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data);
+            UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsRequest));
+            UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvAddDataRequest));
+            UNIT_ASSERT_VALUES_EQUAL(1, runtime.GetCounter(TEvIndexTabletPrivate::EvAddBlobRequest));
+            UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTabletPrivate::EvWriteBlobRequest));
+            UNIT_ASSERT_VALUES_EQUAL(1, runtime.GetCounter(TEvService::EvWriteDataResponse));
+            UNIT_ASSERT_VALUES_EQUAL(expectedPutCount, putRequestCount);
+            // clang-format on
+            runtime.ClearCounters();
+            putRequestCount = 0;
+            worker = TActorId();
+        };
+
+        validateWriteData(0, DefaultBlockSize, 1);
+        validateWriteData(DefaultBlockSize, DefaultBlockSize, 1);
+        validateWriteData(0, DefaultBlockSize * BlockGroupSize, 1);
+        validateWriteData(0, DefaultBlockSize * BlockGroupSize * 2, 2);
+        validateWriteData(
+            DefaultBlockSize,
+            DefaultBlockSize * BlockGroupSize * 10,
+            11);
+        validateWriteData(0, DefaultBlockSize * BlockGroupSize * 3, 3);
+        // Currently the data is written from 0th to (1 + BlockGroupSize * 10) = 641th block
+        // Therefore, the next write should fail
+
+        auto data =
+            GenerateValidateData(DefaultBlockSize * 360);
+
+        auto response =
+            service.AssertWriteDataFailed(headers, fs, nodeId, handle, DefaultBlockSize * 641, data);
+        auto error = STATUS_FROM_CODE(response->GetError().GetCode());
+        UNIT_ASSERT_VALUES_EQUAL((ui32)NProto::E_FS_NOSPC, error);
+    }
+
+    // With the default threshold, 4_KB writes and an unaligned 128_KB write
+    // must not produce GenerateBlobIds/AddData requests.
+    Y_UNIT_TEST(ShouldNotUseThreeStageWriteForSmallOrUnalignedRequests)
+    {
+        TTestEnv env;
+        env.CreateSubDomain("nfs");
+
+        ui32 nodeIdx = env.CreateNode("nfs");
+
+        TServiceClient service(env.GetRuntime(), nodeIdx);
+        const TString fs = "test";
+        service.CreateFileStore(fs, 1000);
+
+        {
+            NProto::TStorageConfig newConfig;
+            newConfig.SetThreeStageWriteEnabled(true);
+            const auto response =
+                ExecuteChangeStorageConfig(std::move(newConfig), service);
+            UNIT_ASSERT_VALUES_EQUAL(
+                response.GetStorageConfig().GetThreeStageWriteEnabled(),
+                true);
+            TDispatchOptions options;
+            env.GetRuntime().DispatchEvents(options, TDuration::Seconds(1));
+        }
+
+        auto headers = service.InitSession(fs, "client");
+        ui64 nodeId = service
+            .CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
+            ->Record.GetNode()
+            .GetId();
+        ui64 handle = service
+            .CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
+            ->Record.GetHandle();
+
+        auto& runtime = env.GetRuntime();
+
+        auto validateWriteData = [&](ui64 offset, ui64 size)
+        {
+            auto data = GenerateValidateData(size);
+
+            service.WriteData(headers, fs, nodeId, handle, offset, data);
+            auto readDataResult =
+                service
+                    .ReadData(headers, fs, nodeId, handle, offset, data.Size());
+            // clang-format off
+            UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data);
+            UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsRequest));
+            UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTablet::EvAddDataRequest));
+            UNIT_ASSERT_VALUES_EQUAL(3, runtime.GetCounter(TEvService::EvWriteDataRequest));
+            // clang-format on
+            runtime.ClearCounters();
+        };
+
+        validateWriteData(0, 4_KB);
+        validateWriteData(4_KB, 4_KB);
+        validateWriteData(1, 128_KB);
+    }
+
+    // Injects an error into each stage in turn (GenerateBlobIds response,
+    // AddData response, first TEvPutResult) and checks that the data is still
+    // written and readable.
+    Y_UNIT_TEST(ShouldFallbackThreeStageWriteToSimpleWrite)
+    {
+        TTestEnv env;
+        env.CreateSubDomain("nfs");
+
+        ui32 nodeIdx = env.CreateNode("nfs");
+
+        TServiceClient service(env.GetRuntime(), nodeIdx);
+        const TString fs = "test";
+        service.CreateFileStore(fs, 1000);
+
+        NProto::TError error;
+        error.SetCode(E_REJECTED);
+
+        env.GetRuntime().SetEventFilter(
+            [&](auto& runtime, auto& event)
+            {
+                Y_UNUSED(runtime);
+
+                switch (event->GetTypeRewrite()) {
+                    case TEvIndexTablet::EvGenerateBlobIdsResponse: {
+                        auto* msg = event->template Get<
+                            TEvIndexTablet::TEvGenerateBlobIdsResponse>();
+                        msg->Record.MutableError()->CopyFrom(error);
+                        break;
+                    }
+                }
+                return false;
+            });
+
+        {
+            NProto::TStorageConfig newConfig;
+            newConfig.SetThreeStageWriteEnabled(true);
+            const auto response =
+                ExecuteChangeStorageConfig(std::move(newConfig), service);
+            UNIT_ASSERT_VALUES_EQUAL(
+                response.GetStorageConfig().GetThreeStageWriteEnabled(),
+                true);
+            TDispatchOptions options;
+            env.GetRuntime().DispatchEvents(options, TDuration::Seconds(1));
+        }
+
+        auto headers = service.InitSession(fs, "client");
+        ui64 nodeId = service
+            .CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
+            ->Record.GetNode()
+            .GetId();
+        ui64 handle = service
+            .CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
+            ->Record.GetHandle();
+
+        // GenerateBlobIdsResponse fails
+        TString data = GenerateValidateData(256_KB);
+        service.WriteData(headers, fs, nodeId, handle, 0, data);
+        auto readDataResult =
+            service.ReadData(headers, fs, nodeId, handle, 0, data.Size());
+        UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data);
+        auto& runtime = env.GetRuntime();
+        // clang-format off
+        UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsResponse));
+        UNIT_ASSERT_VALUES_EQUAL(3, runtime.GetCounter(TEvService::EvWriteDataResponse));
+        // clang-format on
+        runtime.ClearCounters();
+
+        // AddDataResponse fails
+        env.GetRuntime().SetEventFilter(
+            [&](auto& runtime, auto& event)
+            {
+                Y_UNUSED(runtime);
+
+                switch (event->GetTypeRewrite()) {
+                    case TEvIndexTablet::EvAddDataResponse: {
+                        auto* msg = event->template Get<
+                            TEvIndexTablet::TEvAddDataResponse>();
+                        msg->Record.MutableError()->CopyFrom(error);
+                        break;
+                    }
+                }
+                return false;
+            });
+        data = GenerateValidateData(256_KB);
+        service.WriteData(headers, fs, nodeId, handle, 0, data);
+        readDataResult =
+            service.ReadData(headers, fs, nodeId, handle, 0, data.Size());
+        UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data);
+        // clang-format off
+        UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvAddDataResponse));
+        UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsResponse));
+        UNIT_ASSERT_VALUES_EQUAL(3, runtime.GetCounter(TEvService::EvWriteDataResponse));
+        // clang-format on
+
+        // TEvPutResult fails
+
+        runtime.ClearCounters();
+
+        TActorId worker;
+        ui32 evPuts = 0;
+        env.GetRuntime().SetEventFilter(
+            [&](auto& runtime, auto& event)
+            {
+                Y_UNUSED(runtime);
+
+                switch (event->GetTypeRewrite()) {
+                    case TEvIndexTablet::EvGenerateBlobIdsRequest: {
+                        if (!worker) {
+                            worker = event->Sender;
+                        }
+                        break;
+                    }
+                    case TEvBlobStorage::EvPutResult: {
+                        auto* msg =
+                            event->template Get<TEvBlobStorage::TEvPutResult>();
+                        if (event->Recipient == worker) {
+                            if (evPuts == 0) {
+                                msg->Status = NKikimrProto::ERROR;
+                            }
+                            ++evPuts;
+                        }
+                        break;
+                    }
+                }
+
+                return false;
+            });
+
+        data = GenerateValidateData(256_KB);
+        service.WriteData(headers, fs, nodeId, handle, 0, data);
+        readDataResult =
+            service.ReadData(headers, fs, nodeId, handle, 0, data.Size());
+        UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data);
+
+        // clang-format off
+        UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTablet::EvAddDataResponse));
+        UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsResponse));
+        UNIT_ASSERT_VALUES_EQUAL(3, runtime.GetCounter(TEvService::EvWriteDataResponse));
+        UNIT_ASSERT_VALUES_EQUAL(1, evPuts);
+        // clang-format on
+    }
 }
 
 }   // namespace NCloud::NFileStore::NStorage
diff --git a/cloud/filestore/libs/storage/service/ya.make b/cloud/filestore/libs/storage/service/ya.make
index eb27a0f3bf5..46eac233826 100644
--- a/cloud/filestore/libs/storage/service/ya.make
+++ b/cloud/filestore/libs/storage/service/ya.make
@@ -28,6 +28,7 @@ SRCS(
     service_actor_readdata.cpp
service_actor_statfs.cpp service_actor_update_stats.cpp + service_actor_writedata.cpp service_state.cpp ) diff --git a/cloud/filestore/libs/storage/tablet/CMakeLists.darwin-x86_64.txt b/cloud/filestore/libs/storage/tablet/CMakeLists.darwin-x86_64.txt index 928c3c4088e..6ab82c87d55 100644 --- a/cloud/filestore/libs/storage/tablet/CMakeLists.darwin-x86_64.txt +++ b/cloud/filestore/libs/storage/tablet/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(actors) add_subdirectory(model) add_subdirectory(protos) add_subdirectory(ut) @@ -35,6 +36,7 @@ target_link_libraries(libs-storage-tablet PUBLIC filestore-libs-storage-api filestore-libs-storage-core filestore-libs-storage-model + storage-tablet-actors storage-tablet-model storage-tablet-protos core-libs-api @@ -66,6 +68,7 @@ target_sources(libs-storage-tablet PRIVATE ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_accessnode.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_acquirelock.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_addblob.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_allocatedata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_change_storage_config.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_cleanup.cpp diff --git a/cloud/filestore/libs/storage/tablet/CMakeLists.linux-aarch64.txt b/cloud/filestore/libs/storage/tablet/CMakeLists.linux-aarch64.txt index e083d23a375..18d36e0dcab 100644 --- a/cloud/filestore/libs/storage/tablet/CMakeLists.linux-aarch64.txt +++ b/cloud/filestore/libs/storage/tablet/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(actors) add_subdirectory(model) add_subdirectory(protos) add_subdirectory(ut) @@ -36,6 +37,7 @@ target_link_libraries(libs-storage-tablet PUBLIC filestore-libs-storage-api filestore-libs-storage-core filestore-libs-storage-model + storage-tablet-actors storage-tablet-model storage-tablet-protos core-libs-api @@ -67,6 +69,7 @@ target_sources(libs-storage-tablet PRIVATE ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_accessnode.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_acquirelock.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_addblob.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_allocatedata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_change_storage_config.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_cleanup.cpp diff --git a/cloud/filestore/libs/storage/tablet/CMakeLists.linux-x86_64.txt b/cloud/filestore/libs/storage/tablet/CMakeLists.linux-x86_64.txt index e083d23a375..18d36e0dcab 100644 --- a/cloud/filestore/libs/storage/tablet/CMakeLists.linux-x86_64.txt +++ b/cloud/filestore/libs/storage/tablet/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(actors) add_subdirectory(model) add_subdirectory(protos) add_subdirectory(ut) @@ -36,6 +37,7 @@ target_link_libraries(libs-storage-tablet PUBLIC filestore-libs-storage-api filestore-libs-storage-core filestore-libs-storage-model + storage-tablet-actors storage-tablet-model storage-tablet-protos core-libs-api @@ -67,6 +69,7 @@ target_sources(libs-storage-tablet PRIVATE ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_accessnode.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_acquirelock.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_addblob.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_allocatedata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_change_storage_config.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_cleanup.cpp diff --git a/cloud/filestore/libs/storage/tablet/CMakeLists.windows-x86_64.txt b/cloud/filestore/libs/storage/tablet/CMakeLists.windows-x86_64.txt index 928c3c4088e..6ab82c87d55 100644 --- a/cloud/filestore/libs/storage/tablet/CMakeLists.windows-x86_64.txt +++ b/cloud/filestore/libs/storage/tablet/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(actors) add_subdirectory(model) add_subdirectory(protos) add_subdirectory(ut) @@ -35,6 +36,7 @@ target_link_libraries(libs-storage-tablet PUBLIC filestore-libs-storage-api filestore-libs-storage-core filestore-libs-storage-model + storage-tablet-actors storage-tablet-model storage-tablet-protos core-libs-api @@ -66,6 +68,7 @@ target_sources(libs-storage-tablet PRIVATE ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_accessnode.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_acquirelock.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_addblob.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_allocatedata.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_change_storage_config.cpp ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/tablet_actor_cleanup.cpp diff --git a/cloud/filestore/libs/storage/tablet/actors/CMakeLists.darwin-x86_64.txt b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..79b8f54f4dc --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(storage-tablet-actors) +target_link_libraries(storage-tablet-actors PUBLIC + contrib-libs-cxxsupp + yutil + storage-tablet-model + core-libs-common +) +target_sources(storage-tablet-actors PRIVATE + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.cpp +) diff --git a/cloud/filestore/libs/storage/tablet/actors/CMakeLists.linux-aarch64.txt b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..118d27c081f --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(storage-tablet-actors) +target_link_libraries(storage-tablet-actors PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + storage-tablet-model + core-libs-common +) +target_sources(storage-tablet-actors PRIVATE + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.cpp +) diff --git a/cloud/filestore/libs/storage/tablet/actors/CMakeLists.linux-x86_64.txt b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..bd69273789d --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + +add_library(storage-tablet-actors) +target_link_libraries(storage-tablet-actors PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + tools-enum_parser-enum_serialization_runtime + storage-tablet-model + core-libs-common + libs-tablet-model +) +target_sources(storage-tablet-actors PRIVATE + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.cpp +) diff --git a/cloud/filestore/libs/storage/tablet/actors/CMakeLists.txt b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/cloud/filestore/libs/storage/tablet/actors/CMakeLists.windows-x86_64.txt b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..e3a6ff07290 --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/CMakeLists.windows-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_library(storage-tablet-actors) +target_link_libraries(storage-tablet-actors PUBLIC + contrib-libs-cxxsupp + yutil + storage-tablet-model + core-libs-common +) +target_sources(storage-tablet-actors PRIVATE + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp + ${CMAKE_SOURCE_DIR}/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.cpp +) diff --git a/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp new file mode 100644 index 00000000000..40a8454ed57 --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp @@ -0,0 +1,135 @@ +#include "tablet_adddata.h" + +#include +#include +#include +#include +#include + +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; + +//////////////////////////////////////////////////////////////////////////////// + +LWTRACE_USING(FILESTORE_STORAGE_PROVIDER); + +//////////////////////////////////////////////////////////////////////////////// + +TAddDataActor::TAddDataActor( + ITraceSerializerPtr traceSerializer, + TString logTag, + TActorId tablet, + TRequestInfoPtr requestInfo, + ui64 commitId, + TVector blobs, + TWriteRange writeRange) + : TraceSerializer(std::move(traceSerializer)) + , LogTag(std::move(logTag)) + , Tablet(tablet) + , RequestInfo(std::move(requestInfo)) + , CommitId(commitId) + , Blobs(std::move(blobs)) + , WriteRange(writeRange) +{} + +void TAddDataActor::Bootstrap(const TActorContext& ctx) +{ + FILESTORE_TRACK( + RequestReceived_TabletWorker, + RequestInfo->CallContext, + "AddData"); + + AddBlob(ctx); + Become(&TThis::StateWork); +} + +void TAddDataActor::AddBlob(const TActorContext& ctx) +{ + auto request = std::make_unique( + RequestInfo->CallContext); + request->Mode = EAddBlobMode::Write; + request->WriteRanges.push_back(WriteRange); + + for (const auto& blob: Blobs) { + request->MergedBlobs.emplace_back( + blob.BlobId, + blob.Block, + 
blob.BlocksCount); + } + + NCloud::Send(ctx, Tablet, std::move(request)); +} + +void TAddDataActor::HandleAddBlobResponse( + const TEvIndexTabletPrivate::TEvAddBlobResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + ReplyAndDie(ctx, msg->GetError()); +} + +void TAddDataActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + ReplyAndDie(ctx, MakeError(E_REJECTED, "tablet is shutting down")); +} + +void TAddDataActor::ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& error) +{ + { + // notify tablet + using TCompletion = TEvIndexTabletPrivate::TEvAddDataCompleted; + auto response = std::make_unique(error); + response->CommitId = CommitId; + NCloud::Send(ctx, Tablet, std::move(response)); + } + + FILESTORE_TRACK( + ResponseSent_TabletWorker, + RequestInfo->CallContext, + "AddData"); + + if (RequestInfo->Sender != Tablet) { + auto response = + std::make_unique(error); + LOG_DEBUG( + ctx, + TFileStoreComponents::TABLET_WORKER, + "%s AddData: #%lu completed (%s)", + LogTag.c_str(), + RequestInfo->CallContext->RequestId, + FormatError(response->Record.GetError()).c_str()); + + BuildTraceInfo( + TraceSerializer, + RequestInfo->CallContext, + response->Record); + BuildThrottlerInfo(*RequestInfo->CallContext, response->Record); + + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + } + + Die(ctx); +} + +STFUNC(TAddDataActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + + HFunc(TEvIndexTabletPrivate::TEvAddBlobResponse, HandleAddBlobResponse); + + default: + HandleUnexpectedEvent(ev, TFileStoreComponents::TABLET_WORKER); + break; + } +} + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.h b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.h new file mode 100644 index 00000000000..98d46509e81 --- /dev/null +++ 
b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.h @@ -0,0 +1,63 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; +using namespace NKikimr; + +//////////////////////////////////////////////////////////////////////////////// + +// Worker actor for AddData: sends an AddBlob request for the given blobs to +// the tablet and relays the resulting error to the tablet and the requester. +class TAddDataActor final: public TActorBootstrapped +{ +private: + const ITraceSerializerPtr TraceSerializer; + + const TString LogTag; + const TActorId Tablet; + const TRequestInfoPtr RequestInfo; + + const ui64 CommitId; + /*const*/ TVector Blobs; + const TWriteRange WriteRange; + +public: + TAddDataActor( + ITraceSerializerPtr traceSerializer, + TString logTag, + TActorId tablet, + TRequestInfoPtr requestInfo, + ui64 commitId, + TVector blobs, + TWriteRange writeRange); + + void Bootstrap(const TActorContext& ctx); + +private: + STFUNC(StateWork); + + void AddBlob(const TActorContext& ctx); + void HandleAddBlobResponse( + const TEvIndexTabletPrivate::TEvAddBlobResponse::TPtr& ev, + const TActorContext& ctx); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void + ReplyAndDie(const TActorContext& ctx, const NProto::TError& error = {}); +}; + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.cpp b/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.cpp new file mode 100644 index 00000000000..d372a0ba986 --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.cpp @@ -0,0 +1,167 @@ +#include "tablet_writedata.h" + +#include +#include +#include +#include + +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; + +//////////////////////////////////////////////////////////////////////////////// + +LWTRACE_USING(FILESTORE_STORAGE_PROVIDER); + +//////////////////////////////////////////////////////////////////////////////// + +TWriteDataActor::TWriteDataActor( +
ITraceSerializerPtr traceSerializer, + TString logTag, + TActorId tablet, + TRequestInfoPtr requestInfo, + ui64 commitId, + TVector blobs, + TWriteRange writeRange) + : TraceSerializer(std::move(traceSerializer)) + , LogTag(std::move(logTag)) + , Tablet(tablet) + , RequestInfo(std::move(requestInfo)) + , CommitId(commitId) + , Blobs(std::move(blobs)) + , WriteRange(writeRange) +{ + for (const auto& blob: Blobs) { + BlobsSize += blob.BlobContent.Size(); + } +} + +void TWriteDataActor::Bootstrap(const TActorContext& ctx) +{ + FILESTORE_TRACK( + RequestReceived_TabletWorker, + RequestInfo->CallContext, + "WriteData"); + + WriteBlob(ctx); + Become(&TThis::StateWork); +} + +void TWriteDataActor::WriteBlob(const TActorContext& ctx) +{ + auto request = std::make_unique( + RequestInfo->CallContext + ); + + for (auto& blob: Blobs) { + request->Blobs.emplace_back(blob.BlobId, std::move(blob.BlobContent)); + } + + NCloud::Send(ctx, Tablet, std::move(request)); +} + +void TWriteDataActor::HandleWriteBlobResponse( + const TEvIndexTabletPrivate::TEvWriteBlobResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + if (FAILED(msg->GetStatus())) { + ReplyAndDie(ctx, msg->GetError()); + return; + } + + AddBlob(ctx); +} + +void TWriteDataActor::AddBlob(const TActorContext& ctx) +{ + auto request = std::make_unique( + RequestInfo->CallContext + ); + request->Mode = EAddBlobMode::Write; + request->WriteRanges.push_back(WriteRange); + + for (const auto& blob: Blobs) { + request->MergedBlobs.emplace_back( + blob.BlobId, + blob.Block, + blob.BlocksCount); + } + + NCloud::Send(ctx, Tablet, std::move(request)); +} + +void TWriteDataActor::HandleAddBlobResponse( + const TEvIndexTabletPrivate::TEvAddBlobResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + ReplyAndDie(ctx, msg->GetError()); +} + +void TWriteDataActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + 
ReplyAndDie(ctx, MakeError(E_REJECTED, "tablet is shutting down")); +} + +void TWriteDataActor::ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& error) +{ + { + // notify tablet + using TCompletion = TEvIndexTabletPrivate::TEvWriteDataCompleted; + auto response = std::make_unique(error); + response->CommitId = CommitId; + response->Count = 1; + response->Size = BlobsSize; + response->Time = ctx.Now() - RequestInfo->StartedTs; + NCloud::Send(ctx, Tablet, std::move(response)); + } + + FILESTORE_TRACK( + ResponseSent_TabletWorker, + RequestInfo->CallContext, + "WriteData"); + + if (RequestInfo->Sender != Tablet) { + auto response = std::make_unique(error); + LOG_DEBUG(ctx, TFileStoreComponents::TABLET_WORKER, + "%s WriteData: #%lu completed (%s)", + LogTag.c_str(), + RequestInfo->CallContext->RequestId, + FormatError(response->Record.GetError()).c_str()); + + BuildTraceInfo( + TraceSerializer, + RequestInfo->CallContext, + response->Record); + BuildThrottlerInfo(*RequestInfo->CallContext, response->Record); + + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + } + + Die(ctx); +} + +STFUNC(TWriteDataActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + + HFunc(TEvIndexTabletPrivate::TEvWriteBlobResponse, HandleWriteBlobResponse); + HFunc(TEvIndexTabletPrivate::TEvAddBlobResponse, HandleAddBlobResponse); + + default: + HandleUnexpectedEvent(ev, TFileStoreComponents::TABLET_WORKER); + break; + } +} + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.h b/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.h new file mode 100644 index 00000000000..a546390acdb --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/tablet_writedata.h @@ -0,0 +1,67 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; +using namespace +
NKikimr; + +//////////////////////////////////////////////////////////////////////////////// + +class TWriteDataActor final: public TActorBootstrapped +{ +private: + const ITraceSerializerPtr TraceSerializer; + + const TString LogTag; + const TActorId Tablet; + const TRequestInfoPtr RequestInfo; + + const ui64 CommitId; + /*const*/ TVector Blobs; + const TWriteRange WriteRange; + ui32 BlobsSize = 0; + +public: + TWriteDataActor( + ITraceSerializerPtr traceSerializer, + TString logTag, + TActorId tablet, + TRequestInfoPtr requestInfo, + ui64 commitId, + TVector blobs, + TWriteRange writeRange); + + void Bootstrap(const TActorContext& ctx); + +private: + STFUNC(StateWork); + + void WriteBlob(const TActorContext& ctx); + void HandleWriteBlobResponse( + const TEvIndexTabletPrivate::TEvWriteBlobResponse::TPtr& ev, + const TActorContext& ctx); + + void AddBlob(const TActorContext& ctx); + void HandleAddBlobResponse( + const TEvIndexTabletPrivate::TEvAddBlobResponse::TPtr& ev, + const TActorContext& ctx); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void + ReplyAndDie(const TActorContext& ctx, const NProto::TError& error = {}); +}; + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/actors/ya.make b/cloud/filestore/libs/storage/tablet/actors/ya.make new file mode 100644 index 00000000000..389d926dc02 --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + tablet_writedata.cpp + tablet_adddata.cpp +) + +PEERDIR( + cloud/filestore/libs/storage/tablet/model +) + +END() diff --git a/cloud/filestore/libs/storage/tablet/model/garbage_queue.cpp b/cloud/filestore/libs/storage/tablet/model/garbage_queue.cpp index b489439553a..1c47512b29d 100644 --- a/cloud/filestore/libs/storage/tablet/model/garbage_queue.cpp +++ b/cloud/filestore/libs/storage/tablet/model/garbage_queue.cpp @@ -182,15 +182,19 @@ void 
TGarbageQueue::AcquireCollectBarrier(ui64 commitId) } } -void TGarbageQueue::ReleaseCollectBarrier(ui64 commitId) +bool TGarbageQueue::TryReleaseCollectBarrier(ui64 commitId) { { auto it = Impl->Barriers.find(commitId); - Y_ABORT_UNLESS(it != Impl->Barriers.end()); + if (it == Impl->Barriers.end()) { + return false; + } auto& barrier = const_cast(*it); - Y_ABORT_UNLESS(barrier.RefCount > 0); + if (barrier.RefCount == 0) { + return false; + } --barrier.RefCount; } @@ -208,6 +212,13 @@ void TGarbageQueue::ReleaseCollectBarrier(ui64 commitId) it = Impl->Barriers.erase(it); } + return true; +} + +bool TGarbageQueue::IsCollectBarrierAcquired(ui64 commitId) const +{ + auto it = Impl->Barriers.find(commitId); + return it != Impl->Barriers.end() && it->RefCount > 0; } ui64 TGarbageQueue::GetCollectCommitId() const diff --git a/cloud/filestore/libs/storage/tablet/model/garbage_queue.h b/cloud/filestore/libs/storage/tablet/model/garbage_queue.h index 493a2edf59b..bbdb12da8ab 100644 --- a/cloud/filestore/libs/storage/tablet/model/garbage_queue.h +++ b/cloud/filestore/libs/storage/tablet/model/garbage_queue.h @@ -46,7 +46,8 @@ class TGarbageQueue // void AcquireCollectBarrier(ui64 commitId); - void ReleaseCollectBarrier(ui64 commitId); + [[ nodiscard ]] bool TryReleaseCollectBarrier(ui64 commitId); + bool IsCollectBarrierAcquired(ui64 commitId) const; ui64 GetCollectCommitId() const; }; diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor.cpp index adb045808fb..dc563b70ea5 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor.cpp @@ -319,6 +319,56 @@ void TIndexTabletActor::ResetThrottlingPolicy() //////////////////////////////////////////////////////////////////////////////// +template +NProto::TError TIndexTabletActor::ValidateWriteRequest( + const TActorContext& ctx, + const TRequest& request, + const TByteRange& range) +{ + if (auto error = 
ValidateRange(range); HasError(error)) { + return error; + } + + auto* handle = FindHandle(request.GetHandle()); + if (!handle || handle->GetSessionId() != GetSessionId(request)) { + return ErrorInvalidHandle(request.GetHandle()); + } + + if (!IsWriteAllowed(BuildBackpressureThresholds())) { + if (CompactionStateLoadStatus.Finished && + ++BackpressureErrorCount >= + Config->GetMaxBackpressureErrorsBeforeSuicide()) + { + LOG_WARN( + ctx, + TFileStoreComponents::TABLET_WORKER, + "%s Suiciding after %u backpressure errors", + LogTag.c_str(), + BackpressureErrorCount); + + Suicide(ctx); + } + + return MakeError(E_REJECTED, "rejected due to backpressure"); + } + + return NProto::TError{}; +} + +template NProto::TError +TIndexTabletActor::ValidateWriteRequest( + const TActorContext& ctx, + const NProto::TWriteDataRequest& request, + const TByteRange& range); + +template NProto::TError +TIndexTabletActor::ValidateWriteRequest( + const TActorContext& ctx, + const NProtoPrivate::TAddDataRequest& request, + const TByteRange& range); + +//////////////////////////////////////////////////////////////////////////////// + NProto::TError TIndexTabletActor::IsDataOperationAllowed() const { if (!CompactionStateLoadStatus.Finished) { @@ -507,6 +557,7 @@ STFUNC(TIndexTabletActor::StateBoot) IgnoreFunc(TEvLocal::TEvTabletMetrics); IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateCounters); IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters); + IgnoreFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier); IgnoreFunc(TEvHiveProxy::TEvReassignTabletResponse); @@ -530,6 +581,7 @@ STFUNC(TIndexTabletActor::StateInit) HFunc(TEvFileStore::TEvUpdateConfig, HandleUpdateConfig); HFunc(TEvIndexTabletPrivate::TEvUpdateCounters, HandleUpdateCounters); HFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters, HandleUpdateLeakyBucketCounters); + HFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier, HandleReleaseCollectBarrier); FILESTORE_HANDLE_REQUEST(WaitReady, TEvIndexTablet) @@ -553,10 
+605,13 @@ STFUNC(TIndexTabletActor::StateWork) switch (ev->GetTypeRewrite()) { HFunc(TEvIndexTabletPrivate::TEvReadDataCompleted, HandleReadDataCompleted); HFunc(TEvIndexTabletPrivate::TEvWriteDataCompleted, HandleWriteDataCompleted); + HFunc(TEvIndexTabletPrivate::TEvAddDataCompleted, HandleAddDataCompleted); HFunc(TEvIndexTabletPrivate::TEvUpdateCounters, HandleUpdateCounters); HFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters, HandleUpdateLeakyBucketCounters); + HFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier, HandleReleaseCollectBarrier); + HFunc(TEvents::TEvWakeup, HandleWakeup); HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); @@ -594,8 +649,11 @@ STFUNC(TIndexTabletActor::StateZombie) IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateCounters); IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters); + IgnoreFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier); + IgnoreFunc(TEvIndexTabletPrivate::TEvReadDataCompleted); IgnoreFunc(TEvIndexTabletPrivate::TEvWriteDataCompleted); + IgnoreFunc(TEvIndexTabletPrivate::TEvAddDataCompleted); // tablet related requests IgnoreFunc(TEvents::TEvPoisonPill); @@ -619,6 +677,7 @@ STFUNC(TIndexTabletActor::StateBroken) switch (ev->GetTypeRewrite()) { IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateCounters); IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters); + IgnoreFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier); HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); HFunc(TEvTablet::TEvTabletDead, HandleTabletDead); @@ -631,6 +690,7 @@ STFUNC(TIndexTabletActor::StateBroken) IgnoreFunc(TEvIndexTabletPrivate::TEvReadDataCompleted); IgnoreFunc(TEvIndexTabletPrivate::TEvWriteDataCompleted); + IgnoreFunc(TEvIndexTabletPrivate::TEvAddDataCompleted); IgnoreFunc(TEvHiveProxy::TEvReassignTabletResponse); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor.h b/cloud/filestore/libs/storage/tablet/tablet_actor.h index 7d2eb492865..588f94d4a7e 100644 --- 
a/cloud/filestore/libs/storage/tablet/tablet_actor.h +++ b/cloud/filestore/libs/storage/tablet/tablet_actor.h @@ -353,6 +353,12 @@ class TIndexTabletActor final const typename TMethod::TRequest::TPtr& ev, const NActors::TActorContext& ctx); + template + NProto::TError ValidateWriteRequest( + const NActors::TActorContext& ctx, + const TRequest& request, + const TByteRange& range); + NProto::TError IsDataOperationAllowed() const; void HandleWakeup( @@ -399,6 +405,10 @@ class TIndexTabletActor final const TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters::TPtr& ev, const NActors::TActorContext& ctx); + void HandleReleaseCollectBarrier( + const TEvIndexTabletPrivate::TEvReleaseCollectBarrier::TPtr& ev, + const NActors::TActorContext& ctx); + void HandleReadDataCompleted( const TEvIndexTabletPrivate::TEvReadDataCompleted::TPtr& ev, const NActors::TActorContext& ctx); @@ -407,6 +417,10 @@ class TIndexTabletActor final const TEvIndexTabletPrivate::TEvWriteDataCompleted::TPtr& ev, const NActors::TActorContext& ctx); + void HandleAddDataCompleted( + const TEvIndexTabletPrivate::TEvAddDataCompleted::TPtr& ev, + const NActors::TActorContext& ctx); + bool HandleRequests(STFUNC_SIG); bool RejectRequests(STFUNC_SIG); bool RejectRequestsByBrokenTablet(STFUNC_SIG); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp new file mode 100644 index 00000000000..f3ee083026c --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp @@ -0,0 +1,392 @@ +#include "tablet_actor.h" + +#include +#include + +#include + +#include + +#include +#include +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; + +using namespace NKikimr; +using namespace NKikimr::NTabletFlatExecutor; + + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +/** + * @param range Aligned byte range + * @returns The vector of sizes of the 
blobs that the data should be split into. + */ +TVector SplitData(ui32 blockSize, TByteRange range) +{ + TVector blobSizes; + blobSizes.reserve(range.AlignedBlockCount() / BlockGroupSize + 1); + + SplitRange( + range.FirstAlignedBlock(), + range.AlignedBlockCount(), + BlockGroupSize, + [&](ui32 /*blockOffset*/, ui64 blocksCount) + { blobSizes.push_back(blocksCount * blockSize); }); + + return blobSizes; +} + +} // namespace + +class TWriteDataActor; + +//////////////////////////////////////////////////////////////////////////////// + +bool TIndexTabletActor::PrepareTx_AddData( + const TActorContext& ctx, + TTransactionContext& tx, + TTxIndexTablet::TAddData& args) +{ + auto* session = + FindSession(args.ClientId, args.SessionId, args.SessionSeqNo); + if (!session) { + args.Error = ErrorInvalidSession( + args.ClientId, + args.SessionId, + args.SessionSeqNo); + return true; + } + + auto* handle = FindHandle(args.Handle); + if (!handle || handle->Session != session) { + args.Error = ErrorInvalidHandle(args.Handle); + return true; + } + + if (!HasFlag(handle->GetFlags(), NProto::TCreateHandleRequest::E_WRITE)) { + args.Error = ErrorInvalidHandle(args.Handle); + return true; + } + + args.NodeId = handle->GetNodeId(); + ui64 commitId = GetCurrentCommitId(); + + LOG_TRACE( + ctx, + TFileStoreComponents::TABLET, + "%s AddData tx %lu @%lu %s", + LogTag.c_str(), + args.CommitId, + args.NodeId, + args.ByteRange.Describe().c_str()); + + TIndexTabletDatabase db(tx.DB); + + if (!ReadNode(db, args.NodeId, commitId, args.Node)) { + return false; + } + + // TODO: access check + TABLET_VERIFY(args.Node); + if (!HasSpaceLeft(args.Node->Attrs, args.ByteRange.End())) { + args.Error = ErrorNoSpaceLeft(); + return true; + } + + return true; +} + +void TIndexTabletActor::ExecuteTx_AddData( + const TActorContext& ctx, + TTransactionContext& tx, + TTxIndexTablet::TAddData& args) +{ + Y_UNUSED(ctx, tx, args); +} + +void TIndexTabletActor::CompleteTx_AddData( + const TActorContext& ctx, + 
TTxIndexTablet::TAddData& args) +{ + RemoveTransaction(*args.RequestInfo); + + auto reply = [&](const TActorContext& ctx, TTxIndexTablet::TAddData& args) + { + TABLET_VERIFY(TryReleaseCollectBarrier(args.CommitId)); + TryReleaseCollectBarrier(args.CommitId); + + auto response = + std::make_unique(args.Error); + CompleteResponse( + response->Record, + args.RequestInfo->CallContext, + ctx); + + NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); + }; + + if (HasError(args.Error)) { + reply(ctx, args); + return; + } + + TVector blobs; + SplitRange( + args.ByteRange.FirstAlignedBlock(), + args.ByteRange.AlignedBlockCount(), + BlockGroupSize, + [&](ui32 blockOffset, ui32 blocksCount) + { + TBlock block{ + args.NodeId, + IntegerCast( + args.ByteRange.FirstAlignedBlock() + blockOffset), + // correct CommitId will be assigned later in AddBlobs + InvalidCommitId, + InvalidCommitId}; + blobs.emplace_back( + TPartialBlobId(), // placeholder, overwritten from args.BlobIds below + block, + blocksCount, + /* data buffer */ ""); + }); + + if (blobs.empty() || blobs.size() != args.BlobIds.size()) { + args.Error = MakeError( + E_ARGUMENT, + TStringBuilder() << "blobs count mismatch: expected " << blobs.size() + << " got " << args.BlobIds.size()); + reply(ctx, args); + return; + } + + for (size_t i = 0; i < blobs.size(); ++i) { + auto& targetBlob = blobs[i]; + auto& srcBlob = args.BlobIds[i]; + targetBlob.BlobId = TPartialBlobId( + srcBlob.Generation(), + srcBlob.Step(), + srcBlob.Channel(), + srcBlob.BlobSize(), + srcBlob.Cookie(), + srcBlob.PartId()); + } + auto actor = std::make_unique( + TraceSerializer, + LogTag, + ctx.SelfID, + args.RequestInfo, + args.CommitId, + std::move(blobs), + TWriteRange{args.NodeId, args.ByteRange.End()}); + + auto actorId = NCloud::Register(ctx, std::move(actor)); + WorkerActors.insert(actorId); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TIndexTabletActor::HandleGenerateBlobIds( + const
TEvIndexTablet::TEvGenerateBlobIdsRequest::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + LOG_DEBUG( + ctx, + TFileStoreComponents::TABLET, + "%s %s: %s", + LogTag.c_str(), + "GenerateBlobIds", + DumpMessage(msg->Record).c_str()); + + // It is up to the client to provide an aligned range, but we still verify + // it and reject the request if it is not aligned. + const ui32 blockSize = GetBlockSize(); + if (msg->Record.GetLength() % blockSize != 0 || + msg->Record.GetOffset() % blockSize != 0) + { + auto response = + std::make_unique( + MakeError(E_ARGUMENT, "unaligned range")); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + ui64 commitId = GenerateCommitId(); + if (commitId == InvalidCommitId) { + return RebootTabletOnCommitOverflow(ctx, "GenerateBlobIds"); + } + + // We schedule this event for the case if the client does not call AddData. + // Thus we ensure that the collect barrier will be released eventually. + ctx.Schedule( + Config->GetGenerateBlobIdsReleaseCollectBarrierTimeout(), + new TEvIndexTabletPrivate::TEvReleaseCollectBarrier(commitId, 1)); + AcquireCollectBarrier(commitId); + + TByteRange range( + msg->Record.GetOffset(), + msg->Record.GetLength(), + blockSize); + + auto response = + std::make_unique(); + ui64 offset = range.Offset; + for (auto [blobIndex, length]: Enumerate(SplitData(blockSize, range))) { + TPartialBlobId partialBlobId; + // TODO(debnatkh): better selection of channel + + const auto ok = + GenerateBlobId(commitId, length, blobIndex, &partialBlobId); + if (!ok) { + ReassignDataChannelsIfNeeded(ctx); + + auto response = + std::make_unique( + MakeError(E_FS_OUT_OF_SPACE, "failed to generate blobId")); + + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + auto generatedBlob = response->Record.MutableBlobs()->Add(); + LogoBlobIDFromLogoBlobID( + MakeBlobId(TabletID(), partialBlobId), + generatedBlob->MutableBlobId()); + generatedBlob->SetOffset(offset); + 
generatedBlob->SetBSGroupId(Info()->GroupFor( + partialBlobId.Channel(), + partialBlobId.Generation())); + offset += length; + } + + // TODO(debnatkh): Throttling + + response->Record.SetCommitId(commitId); + + NCloud::Reply(ctx, *ev, std::move(response)); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TIndexTabletActor::HandleAddData( + const TEvIndexTablet::TEvAddDataRequest::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + if (auto error = IsDataOperationAllowed(); HasError(error)) { + NCloud::Reply( + ctx, + *ev, + std::make_unique( + std::move(error))); + return; + } + + auto commitId = msg->Record.GetCommitId(); + if (!IsCollectBarrierAcquired(commitId)) { + // The client has sent the AddData request too late, after + // the lease has expired. + auto response = std::make_unique( + MakeError(E_REJECTED, "collect barrier expired")); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + // We acquire the collect barrier for the second time in order to prolong + // the already acquired lease + AcquireCollectBarrier(commitId); + bool txStarted = false; + Y_DEFER + { + // Until the tx is started, it is this method's responsibility to + // release the collect barrier + if (!txStarted) { + TABLET_VERIFY(TryReleaseCollectBarrier(commitId)); + // The second one is used to release the barrier, acquired in + // GenerateBlobIds method. Though it will be eventually released + // upon lease expiration, it is better to release it as soon as + // possible. 
+ TryReleaseCollectBarrier(commitId); + } + }; + + const TByteRange range( + msg->Record.GetOffset(), + msg->Record.GetLength(), + GetBlockSize()); + + auto validator = [&](const NProtoPrivate::TAddDataRequest& request) + { + return ValidateWriteRequest(ctx, request, range); + }; + + if (!AcceptRequest(ev, ctx, validator)) { + return; + } + + auto requestInfo = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); + requestInfo->StartedTs = ctx.Now(); + + TVector blobIds; + for (const auto& blobId: msg->Record.GetBlobIds()) { + blobIds.push_back(LogoBlobIDFromLogoBlobID(blobId)); + } + + if (blobIds.empty()) { + auto response = + std::make_unique(MakeError( + E_ARGUMENT, + "empty list of blobs given in AddData request")); + NCloud::Reply(ctx, *ev, std::move(response)); + } + + LOG_DEBUG( + ctx, + TFileStoreComponents::TABLET, + "%s %s: blobId: %s,... (total: %lu)", + LogTag.c_str(), + "AddData", + blobIds[0].ToString().c_str(), + blobIds.size()); + + AddTransaction(*requestInfo); + + ExecuteTx( + ctx, + std::move(requestInfo), + msg->Record, + range, + std::move(blobIds), + msg->Record.GetCommitId()); + txStarted = true; +} + +//////////////////////////////////////////////////////////////////////////////// + +void TIndexTabletActor::HandleAddDataCompleted( + const TEvIndexTabletPrivate::TEvAddDataCompleted::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + // We try to release commit barrier twice: once for the lock + // acquired by the GenerateBlob request and once for the lock + // acquired by the AddData request. Though, the first lock is + // scheduled to be released, it is better to release it as early + // as possible. 
+ TABLET_VERIFY(TryReleaseCollectBarrier(msg->CommitId)); + TryReleaseCollectBarrier(msg->CommitId); + + WorkerActors.erase(ev->Sender); + EnqueueBlobIndexOpIfNeeded(ctx); +} + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_cleanup.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_cleanup.cpp index 30dad1a6304..da9d7983211 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_cleanup.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_cleanup.cpp @@ -122,7 +122,7 @@ void TIndexTabletActor::CompleteTx_Cleanup( BlobIndexOpState.Complete(); ReleaseMixedBlocks(args.RangeId); - ReleaseCollectBarrier(args.CollectBarrier); + TABLET_VERIFY(TryReleaseCollectBarrier(args.CollectBarrier)); FILESTORE_TRACK( ResponseSent_Tablet, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_collectgarbage.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_collectgarbage.cpp index 318123bb3eb..fa50f60118a 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_collectgarbage.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_collectgarbage.cpp @@ -473,4 +473,19 @@ void TIndexTabletActor::HandleCollectGarbageCompleted( EnqueueCollectGarbageIfNeeded(ctx); } +//////////////////////////////////////////////////////////////////////////////// + +void TIndexTabletActor::HandleReleaseCollectBarrier( + const TEvIndexTabletPrivate::TEvReleaseCollectBarrier::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ctx); + auto commitId = ev->Get()->CommitId; + for (ui32 i = 0; i < ev->Get()->Count; ++i) { + // We do not check if the barrier was acquired, because the barrier may + // have already been released by a completed three-stage write operation + TryReleaseCollectBarrier(commitId); + } +} + } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_compaction.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_compaction.cpp index 
8a78eb21bfa..8c0052e1822 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_compaction.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_compaction.cpp @@ -664,7 +664,7 @@ void TIndexTabletActor::HandleCompactionCompleted( FormatError(msg->GetError()).c_str()); ReleaseMixedBlocks(msg->MixedBlocksRanges); - ReleaseCollectBarrier(msg->CommitId); + TABLET_VERIFY(TryReleaseCollectBarrier(msg->CommitId)); BlobIndexOpState.Complete(); EnqueueBlobIndexOpIfNeeded(ctx); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp index 1196ce6a58f..cc5cf820bad 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp @@ -15,10 +15,13 @@ void FillFeatures(const TStorageConfig& config, NProto::TFileStore& fileStore) { auto* features = fileStore.MutableFeatures(); features->SetTwoStageReadEnabled(config.GetTwoStageReadEnabled()); + features->SetThreeStageWriteEnabled(config.GetThreeStageWriteEnabled()); features->SetEntryTimeout(config.GetEntryTimeout().MilliSeconds()); features->SetNegativeEntryTimeout( config.GetNegativeEntryTimeout().MilliSeconds()); features->SetAttrTimeout(config.GetAttrTimeout().MilliSeconds()); + features->SetThreeStageWriteEnabled(config.GetThreeStageWriteEnabled()); + features->SetThreeStageWriteThreshold(config.GetThreeStageWriteThreshold()); } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp index a7f6fe76194..4cfff2cbc7e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp @@ -255,7 +255,7 @@ void TIndexTabletActor::CompleteTx_DeleteCheckpoint( 
FormatError(args.Error).c_str()); ReleaseMixedBlocks(args.MixedBlocksRanges); - ReleaseCollectBarrier(args.CollectBarrier); + TABLET_VERIFY(TryReleaseCollectBarrier(args.CollectBarrier)); auto response = std::make_unique(args.Error); NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_flush.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_flush.cpp index 7013600aad0..961945d338a 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_flush.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_flush.cpp @@ -377,7 +377,7 @@ void TIndexTabletActor::HandleFlushCompleted( LogTag.c_str(), FormatError(msg->GetError()).c_str()); - ReleaseCollectBarrier(msg->CommitId); + TABLET_VERIFY(TryReleaseCollectBarrier(msg->CommitId)); FlushState.Complete(); WorkerActors.erase(ev->Sender); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp index 7996fbe7281..3ff7470b93a 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp @@ -829,7 +829,7 @@ void TIndexTabletActor::HandleFlushBytesCompleted( FormatError(msg->GetError()).c_str()); ReleaseMixedBlocks(msg->MixedBlocksRanges); - ReleaseCollectBarrier(msg->CollectCommitId); + TABLET_VERIFY(TryReleaseCollectBarrier(msg->CollectCommitId)); WorkerActors.erase(ev->Sender); auto requestInfo = CreateRequestInfo( diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp index 4ce4419bccc..6af7c77cb52 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp @@ -565,7 +565,7 @@ void TIndexTabletActor::HandleReadDataCompleted( const auto* msg = ev->Get(); ReleaseMixedBlocks(msg->MixedBlocksRanges); - 
ReleaseCollectBarrier(msg->CommitId); + TABLET_VERIFY(TryReleaseCollectBarrier(msg->CommitId)); WorkerActors.erase(ev->Sender); Metrics.ReadData.Count.fetch_add(msg->Count, std::memory_order_relaxed); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_request.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_request.cpp index 5ffe365cc3b..f693317d350 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_request.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_request.cpp @@ -120,6 +120,8 @@ template void TIndexTabletActor::CompleteResponse( FILESTORE_SERVICE(FILESTORE_GENERATE_IMPL, TEvService) FILESTORE_GENERATE_IMPL(DescribeData, TEvIndexTablet) FILESTORE_GENERATE_IMPL(DescribeSessions, TEvIndexTablet) +FILESTORE_GENERATE_IMPL(GenerateBlobIds, TEvIndexTablet) +FILESTORE_GENERATE_IMPL(AddData, TEvIndexTablet) #undef FILESTORE_GENERATE_IMPL diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp index 9898907a10f..e4107830e4e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp @@ -286,7 +286,7 @@ void TIndexTabletActor::HandleWriteBatchCompleted( FormatError(msg->GetError()).c_str()); } - ReleaseCollectBarrier(msg->CommitId); + TABLET_VERIFY(TryReleaseCollectBarrier(msg->CommitId)); WorkerActors.erase(ev->Sender); EnqueueBlobIndexOpIfNeeded(ctx); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_writedata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_writedata.cpp index 624afefbaa4..df669f0d8d5 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_writedata.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_writedata.cpp @@ -1,14 +1,11 @@ #include "tablet_actor.h" -#include "helpers.h" - #include #include +#include #include #include -#include - #include #include #include @@ -20,210 +17,6 @@ using namespace NActors; 
using namespace NKikimr; using namespace NKikimr::NTabletFlatExecutor; -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -class TWriteDataActor final - : public TActorBootstrapped -{ -private: - const ITraceSerializerPtr TraceSerializer; - - const TString LogTag; - const TActorId Tablet; - const TRequestInfoPtr RequestInfo; - - const ui64 CommitId; - /*const*/ TVector Blobs; - const TWriteRange WriteRange; - ui32 BlobsSize = 0; - -public: - TWriteDataActor( - ITraceSerializerPtr traceSerializer, - TString logTag, - TActorId tablet, - TRequestInfoPtr requestInfo, - ui64 commitId, - TVector blobs, - TWriteRange writeRange); - - void Bootstrap(const TActorContext& ctx); - -private: - STFUNC(StateWork); - - void WriteBlob(const TActorContext& ctx); - void HandleWriteBlobResponse( - const TEvIndexTabletPrivate::TEvWriteBlobResponse::TPtr& ev, - const TActorContext& ctx); - - void AddBlob(const TActorContext& ctx); - void HandleAddBlobResponse( - const TEvIndexTabletPrivate::TEvAddBlobResponse::TPtr& ev, - const TActorContext& ctx); - - void HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx); - - void ReplyAndDie( - const TActorContext& ctx, - const NProto::TError& error = {}); -}; - -//////////////////////////////////////////////////////////////////////////////// - -TWriteDataActor::TWriteDataActor( - ITraceSerializerPtr traceSerializer, - TString logTag, - TActorId tablet, - TRequestInfoPtr requestInfo, - ui64 commitId, - TVector blobs, - TWriteRange writeRange) - : TraceSerializer(std::move(traceSerializer)) - , LogTag(std::move(logTag)) - , Tablet(tablet) - , RequestInfo(std::move(requestInfo)) - , CommitId(commitId) - , Blobs(std::move(blobs)) - , WriteRange(writeRange) -{ - for (const auto& blob: Blobs) { - BlobsSize += blob.BlobContent.Size(); - } -} - -void TWriteDataActor::Bootstrap(const TActorContext& ctx) -{ - FILESTORE_TRACK( - RequestReceived_TabletWorker, - 
RequestInfo->CallContext, - "WriteData"); - - WriteBlob(ctx); - Become(&TThis::StateWork); -} - -void TWriteDataActor::WriteBlob(const TActorContext& ctx) -{ - auto request = std::make_unique( - RequestInfo->CallContext - ); - - for (auto& blob: Blobs) { - request->Blobs.emplace_back(blob.BlobId, std::move(blob.BlobContent)); - } - - NCloud::Send(ctx, Tablet, std::move(request)); -} - -void TWriteDataActor::HandleWriteBlobResponse( - const TEvIndexTabletPrivate::TEvWriteBlobResponse::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - - if (FAILED(msg->GetStatus())) { - ReplyAndDie(ctx, msg->GetError()); - return; - } - - AddBlob(ctx); -} - -void TWriteDataActor::AddBlob(const TActorContext& ctx) -{ - auto request = std::make_unique( - RequestInfo->CallContext - ); - request->Mode = EAddBlobMode::Write; - request->WriteRanges.push_back(WriteRange); - - for (const auto& blob: Blobs) { - request->MergedBlobs.emplace_back( - blob.BlobId, - blob.Block, - blob.BlocksCount); - } - - NCloud::Send(ctx, Tablet, std::move(request)); -} - -void TWriteDataActor::HandleAddBlobResponse( - const TEvIndexTabletPrivate::TEvAddBlobResponse::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - ReplyAndDie(ctx, msg->GetError()); -} - -void TWriteDataActor::HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - ReplyAndDie(ctx, MakeError(E_REJECTED, "request cancelled")); -} - -void TWriteDataActor::ReplyAndDie( - const TActorContext& ctx, - const NProto::TError& error) -{ - { - // notify tablet - using TCompletion = TEvIndexTabletPrivate::TEvWriteDataCompleted; - auto response = std::make_unique(error); - response->CommitId = CommitId; - response->Count = 1; - response->Size = BlobsSize; - response->Time = ctx.Now() - RequestInfo->StartedTs; - NCloud::Send(ctx, Tablet, std::move(response)); - } - - FILESTORE_TRACK( - ResponseSent_TabletWorker, - RequestInfo->CallContext, - "WriteData"); - 
- if (RequestInfo->Sender != Tablet) { - auto response = std::make_unique(error); - LOG_DEBUG(ctx, TFileStoreComponents::TABLET_WORKER, - "%s WriteData: #%lu completed (%s)", - LogTag.c_str(), - RequestInfo->CallContext->RequestId, - FormatError(response->Record.GetError()).c_str()); - - BuildTraceInfo( - TraceSerializer, - RequestInfo->CallContext, - response->Record); - BuildThrottlerInfo(*RequestInfo->CallContext, response->Record); - - NCloud::Reply(ctx, *RequestInfo, std::move(response)); - } - - Die(ctx); -} - -STFUNC(TWriteDataActor::StateWork) -{ - switch (ev->GetTypeRewrite()) { - HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); - - HFunc(TEvIndexTabletPrivate::TEvWriteBlobResponse, HandleWriteBlobResponse); - HFunc(TEvIndexTabletPrivate::TEvAddBlobResponse, HandleAddBlobResponse); - - default: - HandleUnexpectedEvent(ev, TFileStoreComponents::TABLET_WORKER); - break; - } -} - -} // namespace - //////////////////////////////////////////////////////////////////////////////// void TIndexTabletActor::HandleWriteData( @@ -303,33 +96,9 @@ void TIndexTabletActor::HandleWriteData( } } - auto validator = [&] (const NProto::TWriteDataRequest& request) { - if (auto error = ValidateRange(range); HasError(error)) { - return error; - } - - auto* handle = FindHandle(request.GetHandle()); - if (!handle || handle->GetSessionId() != GetSessionId(request)) { - return ErrorInvalidHandle(request.GetHandle()); - } - - if (!IsWriteAllowed(BuildBackpressureThresholds())) { - if (CompactionStateLoadStatus.Finished - && ++BackpressureErrorCount >= - Config->GetMaxBackpressureErrorsBeforeSuicide()) - { - LOG_WARN(ctx, TFileStoreComponents::TABLET_WORKER, - "%s Suiciding after %u backpressure errors", - LogTag.c_str(), - BackpressureErrorCount); - - Suicide(ctx); - } - - return MakeError(E_REJECTED, "rejected due to backpressure"); - } - - return NProto::TError{}; + auto validator = [&](const NProto::TWriteDataRequest& request) + { + return ValidateWriteRequest(ctx, request, range); 
}; if (!AcceptRequest(ev, ctx, validator)) { @@ -380,7 +149,7 @@ void TIndexTabletActor::HandleWriteDataCompleted( { const auto* msg = ev->Get(); - ReleaseCollectBarrier(msg->CommitId); + TABLET_VERIFY(TryReleaseCollectBarrier(msg->CommitId)); WorkerActors.erase(ev->Sender); EnqueueBlobIndexOpIfNeeded(ctx); diff --git a/cloud/filestore/libs/storage/tablet/tablet_counters.h b/cloud/filestore/libs/storage/tablet/tablet_counters.h index 64105d4ffd4..134da5514d0 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_counters.h +++ b/cloud/filestore/libs/storage/tablet/tablet_counters.h @@ -31,6 +31,7 @@ namespace NCloud::NFileStore::NStorage { xxx(CheckpointBlocksCount, __VA_ARGS__) \ xxx(CheckpointBlobsCount, __VA_ARGS__) \ xxx(FreshBytesCount, __VA_ARGS__) \ + xxx(LastCollectCommitId, __VA_ARGS__) \ // FILESTORE_TABLET_STATS #define FILESTORE_TABLET_SIMPLE_COUNTERS(xxx) \ diff --git a/cloud/filestore/libs/storage/tablet/tablet_private.h b/cloud/filestore/libs/storage/tablet/tablet_private.h index 5f7fd530374..6118dfd6c87 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_private.h +++ b/cloud/filestore/libs/storage/tablet/tablet_private.h @@ -252,6 +252,15 @@ struct TEvIndexTabletPrivate { }; + // + // AddData completion + // + + struct TAddDataCompleted + { + ui64 CommitId = 0; + }; + // // AddBlob // @@ -580,6 +589,23 @@ struct TEvIndexTabletPrivate TSet Nodes; }; + // + // Release collect barrier + // + + struct TReleaseCollectBarrier + { + // Commit id to release + ui64 CommitId; + // Number of times to perform the release + ui32 Count; + + TReleaseCollectBarrier(ui64 commitId, ui32 count) + : CommitId(commitId) + , Count(count) + {} + }; + // // Events declaration // @@ -596,6 +622,9 @@ struct TEvIndexTabletPrivate EvReadDataCompleted, EvWriteDataCompleted, + EvAddDataCompleted, + + EvReleaseCollectBarrier, EvEnd }; @@ -609,8 +638,12 @@ struct TEvIndexTabletPrivate using TEvUpdateCounters = TRequestEvent; using TEvUpdateLeakyBucketCounters = TRequestEvent; 
+ using TEvReleaseCollectBarrier = + TRequestEvent; + using TEvReadDataCompleted = TResponseEvent; using TEvWriteDataCompleted = TResponseEvent; + using TEvAddDataCompleted = TResponseEvent; }; } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/tablet_state.h b/cloud/filestore/libs/storage/tablet/tablet_state.h index 7fc6d16a603..ba92e3649ce 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_state.h +++ b/cloud/filestore/libs/storage/tablet/tablet_state.h @@ -765,7 +765,8 @@ FILESTORE_DUPCACHE_REQUESTS(FILESTORE_DECLARE_DUPCACHE) } void AcquireCollectBarrier(ui64 commitId); - void ReleaseCollectBarrier(ui64 commitId); + bool TryReleaseCollectBarrier(ui64 commitId); + bool IsCollectBarrierAcquired(ui64 commitId) const; ui64 GetCollectCommitId() const; diff --git a/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp b/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp index 04066188b2b..8525d48788f 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp @@ -921,9 +921,15 @@ void TIndexTabletState::AcquireCollectBarrier(ui64 commitId) Impl->GarbageQueue.AcquireCollectBarrier(commitId); } -void TIndexTabletState::ReleaseCollectBarrier(ui64 commitId) +// returns true if the barrier was present +bool TIndexTabletState::TryReleaseCollectBarrier(ui64 commitId) { - Impl->GarbageQueue.ReleaseCollectBarrier(commitId); + return Impl->GarbageQueue.TryReleaseCollectBarrier(commitId); +} + +bool TIndexTabletState::IsCollectBarrierAcquired(ui64 commitId) const +{ + return Impl->GarbageQueue.IsCollectBarrierAcquired(commitId); } ui64 TIndexTabletState::GetCollectCommitId() const diff --git a/cloud/filestore/libs/storage/tablet/tablet_tx.h b/cloud/filestore/libs/storage/tablet/tablet_tx.h index af5e3e47196..4d51f412f38 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_tx.h +++ b/cloud/filestore/libs/storage/tablet/tablet_tx.h @@ -97,6 +97,7 
@@ namespace NCloud::NFileStore::NStorage { \ xxx(ReadData, __VA_ARGS__) \ xxx(WriteData, __VA_ARGS__) \ + xxx(AddData, __VA_ARGS__) \ xxx(WriteBatch, __VA_ARGS__) \ xxx(AllocateData, __VA_ARGS__) \ \ @@ -1212,6 +1213,42 @@ struct TTxIndexTablet } }; + // + // AddData + // + + struct TAddData : TSessionAware + { + const TRequestInfoPtr RequestInfo; + const ui64 Handle; + const TByteRange ByteRange; + TVector BlobIds; + ui64 CommitId; + + ui64 NodeId = InvalidNodeId; + TMaybe Node; + + TAddData( + TRequestInfoPtr requestInfo, + const NProtoPrivate::TAddDataRequest& request, + TByteRange byteRange, + TVector blobIds, + ui64 commitId) + : TSessionAware(request) + , RequestInfo(std::move(requestInfo)) + , Handle(request.GetHandle()) + , ByteRange(byteRange) + , BlobIds(std::move(blobIds)) + , CommitId(commitId) + {} + + void Clear() + { + NodeId = InvalidNodeId; + Node.Clear(); + } + }; + // // WriteBatch // diff --git a/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp b/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp index 24112f169e6..1a8aa5a4d4e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp @@ -7,6 +7,7 @@ #include +#include #include #include @@ -4309,6 +4310,426 @@ Y_UNIT_TEST_SUITE(TIndexTabletTest_Data) response->GetErrorReason()); } +#define CHECK_GENERATED_BLOB(offset, length, expected) \ + { \ + ui64 currentOffset = offset; \ + TVector expectedSizes = expected; \ + auto blobs = tablet.GenerateBlobIds(node, handle, offset, length); \ + auto commitId = blobs->Record.GetCommitId(); \ + const auto [generation, step] = ParseCommitId(commitId); \ + UNIT_ASSERT_VALUES_EQUAL( \ + blobs->Record.BlobsSize(), \ + expectedSizes.size()); \ + for (size_t i = 0; i < expectedSizes.size(); ++i) { \ + auto generatedBlob = blobs->Record.GetBlobs(i); \ + auto blob = LogoBlobIDFromLogoBlobID(generatedBlob.GetBlobId()); \ + UNIT_ASSERT_VALUES_EQUAL(blob.BlobSize(), expectedSizes[i]); \ + 
UNIT_ASSERT_VALUES_EQUAL(blob.Generation(), generation); \ + UNIT_ASSERT_VALUES_EQUAL(blob.Step(), step); \ + UNIT_ASSERT_VALUES_EQUAL( \ + generatedBlob.GetOffset(), \ + currentOffset); \ + currentOffset += expectedSizes[i]; \ + } \ + } + + TABLET_TEST(ShouldGenerateBlobIds) + { + auto block = tabletConfig.BlockSize; + + TTestEnv env; + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto node = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + ui64 handle = CreateHandle(tablet, node); + + CHECK_GENERATED_BLOB(0, block, TVector{block}); + CHECK_GENERATED_BLOB(block, block, TVector{block}); + CHECK_GENERATED_BLOB(3 * block, 2 * block, TVector{2 * block}); + CHECK_GENERATED_BLOB( + BlockGroupSize * block / 2, + block * BlockGroupSize, + (TVector{ + BlockGroupSize * block / 2, + BlockGroupSize * block / 2})); + CHECK_GENERATED_BLOB( + 3 * block * BlockGroupSize, + 3 * block * BlockGroupSize, + (TVector{ + block * BlockGroupSize, + block * BlockGroupSize, + block * BlockGroupSize})); + CHECK_GENERATED_BLOB( + block, + 2 * block * BlockGroupSize, + (TVector{ + block * (BlockGroupSize - 1), + block * BlockGroupSize, + block})); + } + +#undef CHECK_GENERATED_BLOB + + TABLET_TEST(ShouldAcquireLockForCollectGarbageOnGenerateBlobIds) + { + auto block = tabletConfig.BlockSize; + + TTestEnv env; + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto moveBarrier = [&tablet, block] + { + auto node = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + ui64 handle = CreateHandle(tablet, node); + tablet.WriteData(handle, 0, block, 'a'); + 
tablet.Flush(); + tablet.DestroyHandle(handle); + tablet.UnlinkNode(RootNodeId, "test", false); + tablet.CollectGarbage(); + }; + moveBarrier(); + + ui64 lastCollectGarbage = 0; + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + lastCollectGarbage = stats.GetLastCollectCommitId(); + } + UNIT_ASSERT_GT(lastCollectGarbage, 0); + + auto node = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test2")); + ui64 handle = CreateHandle(tablet, node); + + auto blobs = tablet.GenerateBlobIds(node, handle, 0, block); + auto commitId = blobs->Record.GetCommitId(); + + moveBarrier(); + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + UNIT_ASSERT_LE(stats.GetLastCollectCommitId(), commitId); + } + + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(15)); + + moveBarrier(); + // After the GenerateBlobIdsReleaseCollectBarrierTimeout has passed, we + // can observe that the last collect garbage has moved beyond the commit + // id of the generated blob. 
+ { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + UNIT_ASSERT_GT(stats.GetLastCollectCommitId(), commitId); + } + + // Now we validate that the barrier is released even if the TX fails + TVector blobIds; + + NProto::TError error; + error.SetCode(E_REJECTED); + + auto filter = [&](auto& runtime, auto& event) + { + Y_UNUSED(runtime); + + switch (event->GetTypeRewrite()) { + case TEvBlobStorage::EvPutResult: { + using TResponse = TEvBlobStorage::TEvPutResult; + auto* msg = event->template Get(); + if (msg->Id.Channel() >= TIndexTabletSchema::DataChannel) { + blobIds.push_back(msg->Id); + } + return false; + } + case TEvIndexTabletPrivate::EvWriteBlobResponse: { + using TResponse = + TEvIndexTabletPrivate::TEvWriteBlobResponse; + auto* msg = event->template Get(); + auto& e = const_cast(msg->Error); + e.SetCode(E_REJECTED); + return false; + } + } + + return false; + }; + + env.GetRuntime().SetEventFilter(filter); + + auto generateResult = + tablet.GenerateBlobIds(node, handle, 0, block * BlockGroupSize); + commitId = generateResult->Record.GetCommitId(); + + // intercepted blob was successfully written to BlobStorage, yet the + // following operation is expected to fail + tablet.AssertWriteDataFailed(handle, 0, block * BlockGroupSize, 'x'); + + env.GetRuntime().SetEventFilter( + TTestActorRuntimeBase::DefaultFilterFunc); + + // because we use handle + 1 instead of handle, it is expected that the + // handler will fail will fail + tablet.AssertAddDataFailed( + node, + handle + 1, + 0, + block * BlockGroupSize, + blobIds, + commitId); + + moveBarrier(); + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + // We expect that upon failre the barrier was released, thus moving + // the last collect garbage beyond the commit id of the issued blob + UNIT_ASSERT_GT(stats.GetLastCollectCommitId(), commitId); + } + + node = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, 
"test3")); + handle = CreateHandle(tablet, node, "", TCreateHandleArgs::RDNLY); + + + // now we do the same thing, but expect HandleAddData to execute tx, yet + // the tx to fail + generateResult = + tablet.GenerateBlobIds(node, handle, 0, block * BlockGroupSize); + commitId = generateResult->Record.GetCommitId(); + + tablet.AssertAddDataFailed( + node, + handle, + 0, + block * BlockGroupSize, + blobIds, + commitId); + + moveBarrier(); + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + // We expect that upon failre the barrier was released, thus moving + // the last collect garbage beyond the commit id of the issued blob + UNIT_ASSERT_GT(stats.GetLastCollectCommitId(), commitId); + } + } + + TABLET_TEST(ShouldAddData) + { + const auto block = tabletConfig.BlockSize; + + NProto::TStorageConfig storageConfig; + storageConfig.SetCompactionThreshold(999'999); + storageConfig.SetCleanupThreshold(999'999); + storageConfig.SetWriteBlobThreshold(block); + + TTestEnv env({}, std::move(storageConfig)); + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto id = CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + auto handle = CreateHandle(tablet, id); + + TVector blobIds; + bool shouldDropPutResult = true; + + // We can't make direct writes to BlobStorage, so we store the blob ids + // from an ordinary write and then use them in AddData + env.GetRuntime().SetObserverFunc( + [&](TTestActorRuntimeBase& runtime, TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvBlobStorage::EvPutResult: { + // We intercept all PutResult events in order for tablet + // to consider them as written. 
Nevertheless, these + // blobs are already written and we will use them in + // AddData + auto* msg = event->Get(); + if (msg->Id.Channel() >= + TIndexTabletSchema::DataChannel) { + blobIds.push_back(msg->Id); + } + if (shouldDropPutResult) { + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + + return TTestActorRuntime::DefaultObserverFunc(runtime, event); + }); + + TString data(block * BlockGroupSize * 2, '\0'); + for (size_t i = 0; i < data.size(); ++i) { + // 77 and 256 are coprimes + data[i] = static_cast(i % 77); + } + + tablet.SendWriteDataRequest(handle, 0, data.size(), data.data()); + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(blobIds.size(), 2); + shouldDropPutResult = false; + + // We acquire commitId just so there is something to release on + // completion + auto commitId = tablet.GenerateBlobIds(id, handle, 0, block) + ->Record.GetCommitId(); + + auto id2 = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test2")); + auto handle2 = CreateHandle(tablet, id2); + + Sort(blobIds.begin(), blobIds.end()); + + // Now we try to submit the same blobs for another node + auto request = tablet.CreateAddDataRequest( + id2, + handle2, + 0, + data.size(), + blobIds, + commitId); + + tablet.SendRequest(std::move(request)); + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(2)); + + auto readData = tablet.ReadData(handle2, 0, data.size()); + + // After AddData, we should receive AddDataResponse + auto response = tablet.RecvAddDataResponse(); + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + + // Use DescribeData to check that proper blobs were added + auto describe = tablet.DescribeData(handle2, 0, data.size()); + UNIT_ASSERT_VALUES_EQUAL(describe->Record.BlobPiecesSize(), 2); + for (auto [i, blobPiece]: Enumerate(describe->Record.GetBlobPieces())) { + UNIT_ASSERT_VALUES_EQUAL(1, blobPiece.RangesSize()); + UNIT_ASSERT_VALUES_EQUAL( + i * (block * BlockGroupSize), + 
blobPiece.GetRanges(0).GetOffset()); + UNIT_ASSERT_VALUES_EQUAL(0, blobPiece.GetRanges(0).GetBlobOffset()); + UNIT_ASSERT_VALUES_EQUAL( + block * BlockGroupSize, + blobPiece.GetRanges(0).GetLength()); + + auto blobId = LogoBlobIDFromLogoBlobID(blobPiece.GetBlobId()); + UNIT_ASSERT_VALUES_EQUAL(blobId, blobIds[i]); + } + + // validate, that no more BlobStorage requests were made + UNIT_ASSERT_VALUES_EQUAL(blobIds.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(data, readData->Record.GetBuffer()); + } + + TABLET_TEST(ShouldRejectAddDataIfCollectBarrierIsAlreadyReleased) + { + const auto block = tabletConfig.BlockSize; + + NProto::TStorageConfig storageConfig; + storageConfig.SetCompactionThreshold(999'999); + storageConfig.SetCleanupThreshold(999'999); + storageConfig.SetWriteBlobThreshold(block); + + TTestEnv env({}, std::move(storageConfig)); + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto id = CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + auto handle = CreateHandle(tablet, id); + + TVector blobIds; + + env.GetRuntime().SetObserverFunc( + [&](TTestActorRuntimeBase& runtime, TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvBlobStorage::EvPutResult: { + auto* msg = event->Get(); + if (msg->Id.Channel() >= + TIndexTabletSchema::DataChannel) { + blobIds.push_back(msg->Id); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + + return TTestActorRuntime::DefaultObserverFunc(runtime, event); + }); + + TString data(block * BlockGroupSize * 2, 'x'); + + tablet.SendWriteDataRequest(handle, 0, data.size(), data.data()); + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(blobIds.size(), 2); + + auto commitId = tablet.GenerateBlobIds(id, handle, 0, block) + ->Record.GetCommitId(); + 
+ // We wait for the collect barrier lease to expire. We expect that the + // following AddData request will be rejected + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(15)); + + auto id2 = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test2")); + auto handle2 = CreateHandle(tablet, id2); + + Sort(blobIds.begin(), blobIds.end()); + + tablet.SendAddDataRequest( + id2, + handle2, + 0, + data.size(), + blobIds, + commitId); + + auto response = tablet.RecvAddDataResponse(); + UNIT_ASSERT_VALUES_EQUAL(E_REJECTED, response->GetStatus()); + } + #undef TABLET_TEST } diff --git a/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp b/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp index 8970bc13476..888f8e17879 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp @@ -824,16 +824,24 @@ Y_UNIT_TEST_SUITE(TIndexTabletTest_Sessions) { NProto::TStorageConfig config; NProto::TFileStoreFeatures features; + features.SetThreeStageWriteThreshold(64_KB); + DoTestShouldReturnFeaturesInCreateSessionResponse(config, features); config.SetTwoStageReadEnabled(true); + config.SetThreeStageWriteEnabled(true); + config.SetThreeStageWriteThreshold(10_MB); config.SetEntryTimeout(TDuration::Seconds(10).MilliSeconds()); config.SetNegativeEntryTimeout(TDuration::Seconds(1).MilliSeconds()); config.SetAttrTimeout(TDuration::Seconds(20).MilliSeconds()); + features.SetTwoStageReadEnabled(true); features.SetEntryTimeout(TDuration::Seconds(10).MilliSeconds()); features.SetNegativeEntryTimeout(TDuration::Seconds(1).MilliSeconds()); features.SetAttrTimeout(TDuration::Seconds(20).MilliSeconds()); + features.SetThreeStageWriteEnabled(true); + features.SetThreeStageWriteThreshold(10_MB); + DoTestShouldReturnFeaturesInCreateSessionResponse(config, features); } } diff --git a/cloud/filestore/libs/storage/tablet/ya.make b/cloud/filestore/libs/storage/tablet/ya.make index 
a9bdf2ae413..7f2ba9f3cc7 100644 --- a/cloud/filestore/libs/storage/tablet/ya.make +++ b/cloud/filestore/libs/storage/tablet/ya.make @@ -15,6 +15,7 @@ SRCS( tablet_actor_accessnode.cpp tablet_actor_acquirelock.cpp tablet_actor_addblob.cpp + tablet_actor_adddata.cpp tablet_actor_allocatedata.cpp tablet_actor_change_storage_config.cpp tablet_actor_cleanup.cpp @@ -87,6 +88,7 @@ PEERDIR( cloud/filestore/libs/storage/api cloud/filestore/libs/storage/core cloud/filestore/libs/storage/model + cloud/filestore/libs/storage/tablet/actors cloud/filestore/libs/storage/tablet/model cloud/filestore/libs/storage/tablet/protos diff --git a/cloud/filestore/libs/storage/testlib/tablet_client.h b/cloud/filestore/libs/storage/testlib/tablet_client.h index 1e114d3e0ed..cd9b9ba6652 100644 --- a/cloud/filestore/libs/storage/testlib/tablet_client.h +++ b/cloud/filestore/libs/storage/testlib/tablet_client.h @@ -557,6 +557,46 @@ class TIndexTabletClient return request; } + // Builds a TEvGenerateBlobIdsRequest for the given node, handle and range. + auto CreateGenerateBlobIdsRequest( + ui64 nodeId, + ui64 handle, + ui64 offset, + ui64 length) + { + auto request = + CreateSessionRequest<TEvIndexTablet::TEvGenerateBlobIdsRequest>(); + request->Record.SetNodeId(nodeId); + request->Record.SetHandle(handle); + request->Record.SetOffset(offset); + request->Record.SetLength(length); + return request; + } + + // Builds a TEvAddDataRequest for the given range with blobIds and commitId. + auto CreateAddDataRequest( + ui64 nodeId, + ui64 handle, + ui64 offset, + ui64 length, // ui64: TAddDataRequest.Length is uint64 + const TVector<NKikimr::TLogoBlobID>& blobIds, + ui64 commitId) + { + auto request = CreateSessionRequest< + TEvIndexTablet::TEvAddDataRequest>(); + request->Record.SetNodeId(nodeId); + request->Record.SetHandle(handle); + request->Record.SetOffset(offset); + request->Record.SetLength(length); + for (const auto& blobId: blobIds) { + NKikimr::LogoBlobIDFromLogoBlobID( + blobId, + request->Record.MutableBlobIds()->Add()); + } + request->Record.SetCommitId(commitId); + return request; + } + auto CreateAcquireLockRequest( ui64 handle, ui64 owner, diff --git a/cloud/filestore/private/api/protos/tablet.proto
b/cloud/filestore/private/api/protos/tablet.proto index 7c5d8a6ee7d..a31a9add8c4 100644 --- a/cloud/filestore/private/api/protos/tablet.proto +++ b/cloud/filestore/private/api/protos/tablet.proto @@ -117,6 +117,7 @@ message TStorageStats uint64 FreshBytesCount = 109; uint64 AllocatedCompactionRanges = 110; uint64 UsedCompactionRanges = 111; + uint64 LastCollectCommitId = 112; // channel stats uint64 TabletChannelCount = 1000; @@ -368,3 +369,88 @@ message TDescribeSessionsResponse // All tablet sessions. repeated TTabletSessionInfo Sessions = 2; } + +//////////////////////////////////////////////////////////////////////////////// +// GenerateBlobIds request/response. + +message TGenerateBlobIdsRequest +{ + // Optional request headers. + NProto.THeaders Headers = 1; + + // FileSystem identifier. + string FileSystemId = 2; + + // Node. + uint64 NodeId = 3; + + // IO handle. + uint64 Handle = 4; + + // Starting offset for write. Expected to be aligned to the block size. + uint64 Offset = 5; + + // Length of data to write. Expected to be aligned to the block size. + uint64 Length = 6; +} + +message TGeneratedBlob +{ + // Blob id. + NKikimrProto.TLogoBlobID BlobId = 1; + + // Offset + uint64 Offset = 2; + + // Group id. + uint32 BSGroupId = 3; +} + +message TGenerateBlobIdsResponse +{ + // Optional error, set only if error happened. + NCloud.NProto.TError Error = 1; + + // Blob ids, in the same order as in the request. + repeated TGeneratedBlob Blobs = 2; + + // AcquireCollectBarrier has been executed for this commit id. + uint64 CommitId = 4; +} + +//////////////////////////////////////////////////////////////////////////////// + +// AddData request/response. + +message TAddDataRequest +{ + // Optional request headers. + NProto.THeaders Headers = 1; + + // FileSystem identifier. + string FileSystemId = 2; + + // Node. + uint64 NodeId = 3; + + // IO handle. + uint64 Handle = 4; + + // Starting offset for write. + uint64 Offset = 5; + + // Data size.
+ uint64 Length = 6; + + // Blob ids to be added. Ordered by the offset in the original data. + repeated NKikimrProto.TLogoBlobID BlobIds = 7; + + // Commit id. + uint64 CommitId = 8; +} + +message TAddDataResponse +{ + // Optional error, set only if error happened. + NCloud.NProto.TError Error = 1; +} diff --git a/cloud/filestore/public/api/protos/fs.proto b/cloud/filestore/public/api/protos/fs.proto index 2ade37cbc11..0fee40de775 100644 --- a/cloud/filestore/public/api/protos/fs.proto +++ b/cloud/filestore/public/api/protos/fs.proto @@ -17,6 +17,8 @@ message TFileStoreFeatures uint32 EntryTimeout = 2; uint32 NegativeEntryTimeout = 3; uint32 AttrTimeout = 4; + bool ThreeStageWriteEnabled = 5; + uint32 ThreeStageWriteThreshold = 6; } message TFileStore diff --git a/cloud/filestore/tests/loadtest/service-kikimr-newfeatures-test/nfs-storage.txt b/cloud/filestore/tests/loadtest/service-kikimr-newfeatures-test/nfs-storage.txt index 14fae8613da..f51f3cd1130 100644 --- a/cloud/filestore/tests/loadtest/service-kikimr-newfeatures-test/nfs-storage.txt +++ b/cloud/filestore/tests/loadtest/service-kikimr-newfeatures-test/nfs-storage.txt @@ -1,3 +1,4 @@ TwoStageReadEnabled: true NewCompactionEnabled: true NewCleanupEnabled: true +ThreeStageWriteEnabled: true