diff --git a/.gitignore b/.gitignore index 95705174c6d..d5f4abf8ce7 100644 --- a/.gitignore +++ b/.gitignore @@ -63,3 +63,47 @@ compile_commands.json *.exe *.out *.app +*.debug + +# Generated proto files +*.pb.h +*.pb.cc +*.pb.go +*.pb.h_serialized.cpp +*_pb2.py +*_pb2_grpc.py +*_pb2.pyi + +# Ya make test files +test-results + +# Test binaries and generated dirs +*-ut +**/tests/tests + +# Autogenerated util, library and contrib files +/util/all_*.cpp +/util/charset/all_charset.cpp +/contrib/tools/ragel6/all_*.cpp +/contrib/go/_std_1.21/src/net/_cgo_export.h +/contrib/go/_std_1.21/src/runtime/cgo/_cgo_export.h +/contrib/libs/apache/arrow/cpp/src/generated +/contrib/python/protobuf/py3/google.protobuf.internal._api_implementation.reg3.cpp +/contrib/python/protobuf/py3/google.protobuf.pyext._message.reg3.cpp +/contrib/tools/python3/src/Modules/_sqlite/_sqlite3.reg3.cpp +/library/python/runtime_py3/__res.pyx.cpp +/library/python/runtime_py3/__res.reg3.cpp +/library/python/runtime_py3/sitecustomize.pyx.cpp +/library/python/runtime_py3/sitecustomize.reg3.cpp +/library/python/symbols/module/library.python.symbols.module.syms.reg3.cpp + +# IDE files +*.swp +.vscode/* +*.code-workspace +*.code-workspace.bak + +# act files +.input +.secrets +.vars diff --git a/cloud/blockstore/libs/endpoints/endpoint_events.cpp b/cloud/blockstore/libs/endpoints/endpoint_events.cpp index 6f80a7e4741..7808284d2c0 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_events.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_events.cpp @@ -2,6 +2,8 @@ namespace NCloud::NBlockStore::NServer { +using namespace NThreading; + namespace { //////////////////////////////////////////////////////////////////////////////// @@ -12,17 +14,23 @@ class TEndpointEventProxy: public IEndpointEventProxy IEndpointEventHandlerPtr Handler; public: - void OnVolumeConnectionEstablished(const TString& diskId) override; + TFuture SwitchEndpointIfNeeded( + const TString& diskId, + const TString& reason) override; void Register(IEndpointEventHandlerPtr listener) override; }; //////////////////////////////////////////////////////////////////////////////// -void TEndpointEventProxy::OnVolumeConnectionEstablished(const TString& diskId) +TFuture TEndpointEventProxy::SwitchEndpointIfNeeded( + const TString& diskId, + const TString& reason) { if (Handler) { - Handler->OnVolumeConnectionEstablished(diskId); + return Handler->SwitchEndpointIfNeeded(diskId, reason); } + + return MakeFuture(MakeError(S_OK)); } void TEndpointEventProxy::Register(IEndpointEventHandlerPtr listener) diff --git a/cloud/blockstore/libs/endpoints/endpoint_events.h b/cloud/blockstore/libs/endpoints/endpoint_events.h index 60d3d6b1f5b..dbba788a455 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_events.h +++ b/cloud/blockstore/libs/endpoints/endpoint_events.h @@ -3,6 +3,7 @@ #include "public.h" #include +#include namespace NCloud::NBlockStore::NServer { @@ -12,7 +13,9 @@ struct IEndpointEventHandler { virtual ~IEndpointEventHandler() = default; - virtual void OnVolumeConnectionEstablished(const TString& diskId) = 0; + virtual NThreading::TFuture SwitchEndpointIfNeeded( + const TString& diskId, + const TString& reason) = 0; }; struct IEndpointEventProxy: public IEndpointEventHandler diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index fc2ae2066a0..691dd16432c 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -147,6 +147,52 @@ bool CompareRequests( #undef BLOCKSTORE_DECLARE_METHOD +class TSwitchEndpointRequest +{ +private: + TString DiskId; + TString UnixSocketPath; + TString Reason; + +public: + const TString& GetDiskId() const + { + return DiskId; + } + + void SetDiskId(const TString& diskId) + { + DiskId = diskId; + } + + const TString& GetUnixSocketPath() const + { + return UnixSocketPath; + } + + void SetUnixSocketPath(const TString& socketPath) + { + UnixSocketPath = socketPath; + } + + const TString& GetReason() const + { + return Reason; + } + + void SetReason(const TString& reason) + { + Reason = reason; + } + +}; + +struct TSwitchEndpointMethod +{ + using TRequest = TSwitchEndpointRequest; + using TResponse = NProto::TError; +}; + //////////////////////////////////////////////////////////////////////////////// template @@ -174,7 +220,8 @@ class TEndpointManager final using TRequestStateVariant = std::variant< TRequestState, TRequestState, - TRequestState + TRequestState, + TRequestState >; THashMap ProcessingSockets; @@ -197,10 +244,10 @@ class TEndpointManager final Log = logging->CreateLog("BLOCKSTORE_SERVER"); } -#define ENDPOINT_IMPLEMENT_METHOD(name, ...) \ - TFuture name( \ +#define ENDPOINT_IMPLEMENT_METHOD(name, specifier, ...) \ + TFuture name( \ TCallContextPtr callContext, \ - std::shared_ptr request) override \ + std::shared_ptr request) specifier \ { \ return Executor->Execute([ \ ctx = std::move(callContext), \ @@ -211,20 +258,23 @@ class TEndpointManager final }); \ } \ \ - NProto::T##name##Response Do##name( \ + T##name##Method::TResponse Do##name( \ TCallContextPtr ctx, \ - std::shared_ptr req); \ + std::shared_ptr req); \ // ENDPOINT_IMPLEMENT_METHOD - ENDPOINT_IMPLEMENT_METHOD(StartEndpoint) - ENDPOINT_IMPLEMENT_METHOD(StopEndpoint) - ENDPOINT_IMPLEMENT_METHOD(ListEndpoints) - ENDPOINT_IMPLEMENT_METHOD(DescribeEndpoint) - ENDPOINT_IMPLEMENT_METHOD(RefreshEndpoint) + ENDPOINT_IMPLEMENT_METHOD(StartEndpoint, override) + ENDPOINT_IMPLEMENT_METHOD(StopEndpoint, override) + ENDPOINT_IMPLEMENT_METHOD(ListEndpoints, override) + ENDPOINT_IMPLEMENT_METHOD(DescribeEndpoint, override) + ENDPOINT_IMPLEMENT_METHOD(RefreshEndpoint, override) + ENDPOINT_IMPLEMENT_METHOD(SwitchEndpoint, ) #undef ENDPOINT_IMPLEMENT_METHOD - void OnVolumeConnectionEstablished(const TString& diskId) override; + TFuture SwitchEndpointIfNeeded( + const TString& diskId, + const TString& reason) override; private: NProto::TStartEndpointResponse StartEndpointImpl( @@ -239,6 +289,10 @@ class TEndpointManager final TCallContextPtr ctx, std::shared_ptr request); + NProto::TError SwitchEndpointImpl( + TCallContextPtr ctx, + std::shared_ptr request); + NProto::TStartEndpointResponse AlterEndpoint( TCallContextPtr ctx, const NProto::TStartEndpointRequest& newRequest, @@ -261,8 +315,6 @@ class TEndpointManager final std::shared_ptr CreateNbdStartEndpointRequest( const NProto::TStartEndpointRequest& request); - void TrySwitchEndpoint(const TString& diskId); - template TPromise AddProcessingSocket( const typename TMethod::TRequest& request) @@ -296,6 +348,9 @@ class TEndpointManager final [] (const TRequestState&) { return "refreshing"; }, + [] (const TRequestState&) { + return "switching"; + }, [](const auto&) { return "busy (undefined process)"; } @@ -717,42 +772,82 @@ TStartEndpointRequestPtr TEndpointManager::CreateNbdStartEndpointRequest( return nbdRequest; } -void TEndpointManager::TrySwitchEndpoint(const TString& diskId) +NProto::TError TEndpointManager::DoSwitchEndpoint( + TCallContextPtr ctx, + std::shared_ptr request) { - auto it = FindIf(Requests, [&] (auto& v) { + const auto& diskId = request->GetDiskId(); + + auto reqIt = FindIf(Requests, [&] (auto& v) { const auto& [_, req] = v; return req->GetDiskId() == diskId && req->GetIpcType() == NProto::IPC_VHOST; }); + if (reqIt == Requests.end()) { + return TErrorResponse(S_OK); + } + + const auto& socketPath = reqIt->first; + + auto sockIt = ProcessingSockets.find(socketPath); + if (sockIt != ProcessingSockets.end()) { + const auto& st = sockIt->second; + auto* state = std::get_if>(&st); + if (!state) { + return MakeBusySocketError(socketPath, st); + } + + return Executor->WaitFor(state->Result); + } + + request->SetUnixSocketPath(socketPath); + auto promise = AddProcessingSocket(*request); + + auto response = SwitchEndpointImpl(std::move(ctx), std::move(request)); + promise.SetValue(response); + + RemoveProcessingSocket(socketPath); + return response; +} + +NProto::TError TEndpointManager::SwitchEndpointImpl( + TCallContextPtr ctx, + std::shared_ptr request) +{ + const auto& socketPath = request->GetUnixSocketPath(); + + auto it = Requests.find(socketPath); if (it == Requests.end()) { - return; + return TErrorResponse(S_FALSE, TStringBuilder() + << "endpoint " << socketPath.Quote() << " not started"); } - const auto& req = it->second; - auto listenerIt = EndpointListeners.find(req->GetIpcType()); + auto& startRequest = it->second; + auto listenerIt = EndpointListeners.find(startRequest->GetIpcType()); STORAGE_VERIFY( listenerIt != EndpointListeners.end(), TWellKnownEntityTypes::ENDPOINT, - req->GetUnixSocketPath()); + socketPath); const auto& listener = listenerIt->second; - auto ctx = MakeIntrusive(); auto future = SessionManager->GetSession( - std::move(ctx), - req->GetUnixSocketPath(), - req->GetHeaders()); + ctx, + startRequest->GetUnixSocketPath(), + startRequest->GetHeaders()); auto [sessionInfo, error] = Executor->WaitFor(future); if (HasError(error)) { - return; + return error; } - STORAGE_INFO("Switching endpoint for volume " << sessionInfo.Volume.GetDiskId() + STORAGE_INFO("Switching endpoint" + << ", reason=" << request->GetReason() + << ", volume=" << sessionInfo.Volume.GetDiskId() << ", IsFastPathEnabled=" << sessionInfo.Volume.GetIsFastPathEnabled() << ", Migrations=" << sessionInfo.Volume.GetMigrations().size()); auto switchFuture = listener->SwitchEndpoint( - *it->second, + *startRequest, sessionInfo.Volume, sessionInfo.Session); error = Executor->WaitFor(switchFuture); @@ -762,16 +857,20 @@ void TEndpointManager::TrySwitchEndpoint(const TString& diskId) << sessionInfo.Volume.GetDiskId() << ", " << error.GetMessage()); } + + return TErrorResponse(error); } -void TEndpointManager::OnVolumeConnectionEstablished(const TString& diskId) +TFuture TEndpointManager::SwitchEndpointIfNeeded( + const TString& diskId, + const TString& reason) { - Y_UNUSED(diskId); + auto ctx = MakeIntrusive(); + auto request = std::make_shared(); + request->SetDiskId(diskId); + request->SetReason(reason); - // TODO: NBS-312 safely call TrySwitchEndpoint - // Executor->ExecuteSimple([this, diskId] () { - // return TrySwitchEndpoint(diskId); - // }); + return SwitchEndpoint(std::move(ctx), std::move(request)); } } // namespace diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp index 5d997f7e201..a6f7f6e82a3 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp @@ -138,6 +138,7 @@ class TTestEndpointListener final public: ui32 AlterEndpointCounter = 0; + ui32 SwitchEndpointCounter = 0; public: TTestEndpointListener( @@ -204,6 +205,8 @@ class TTestEndpointListener final Y_UNUSED(volume); Y_UNUSED(session); + ++SwitchEndpointCounter; + return MakeFuture(); } @@ -366,7 +369,8 @@ IEndpointManagerPtr CreateEndpointManager( TBootstrap& bootstrap, THashMap endpointListeners, IServerStatsPtr serverStats = CreateServerStatsStub(), - TString nbdSocketSuffix = "") + TString nbdSocketSuffix = "", + IEndpointEventProxyPtr endpointEventHandler = CreateEndpointEventProxy()) { TSessionManagerOptions sessionManagerOptions; sessionManagerOptions.DefaultClientConfig.SetRequestTimeout( @@ -394,7 +398,7 @@ IEndpointManagerPtr CreateEndpointManager( bootstrap.Logging, serverStats, bootstrap.Executor, - CreateEndpointEventProxy(), + std::move(endpointEventHandler), std::move(sessionManager), std::move(endpointListeners), std::move(nbdSocketSuffix)); @@ -1182,6 +1186,68 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) response.GetError()); } } + + Y_UNIT_TEST(ShouldSwitchEndpointWhenEndpointStarted) + { + TMap mountedVolumes; + TBootstrap bootstrap(CreateTestService(mountedVolumes)); + + auto endpointEventHandler = CreateEndpointEventProxy(); + auto listener = std::make_shared(); + auto manager = CreateEndpointManager( + bootstrap, + {{ NProto::IPC_VHOST, listener }}, + CreateServerStatsStub(), + "", + endpointEventHandler); + + bootstrap.Start(); + + auto socketPath = "testSocketPath"; + auto diskId = "testDiskId"; + + { + // without started endpoint SwitchEndpointIfNeeded is ignored + auto future = endpointEventHandler->SwitchEndpointIfNeeded( + diskId, "test"); + auto error = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + error.GetCode(), + error); + UNIT_ASSERT_VALUES_EQUAL(0, listener->SwitchEndpointCounter); + } + + NProto::TStartEndpointRequest startRequest; + SetDefaultHeaders(startRequest); + startRequest.SetUnixSocketPath(socketPath); + startRequest.SetDiskId(diskId); + startRequest.SetClientId(TestClientId); + startRequest.SetIpcType(NProto::IPC_VHOST); + + { + auto future = StartEndpoint(*manager, startRequest); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + response.GetError().GetCode(), + response.GetError()); + } + + { + // with started endpoint SwitchEndpointIfNeeded leads to + // SwitchEndpoint call + auto future = endpointEventHandler->SwitchEndpointIfNeeded( + diskId, + "test"); + auto error = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + error.GetCode(), + error); + UNIT_ASSERT_VALUES_EQUAL(1, listener->SwitchEndpointCounter); + } + } } } // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/endpoints/session_manager.cpp b/cloud/blockstore/libs/endpoints/session_manager.cpp index 5a4cbd50f9d..c17e4ff1eea 100644 --- a/cloud/blockstore/libs/endpoints/session_manager.cpp +++ b/cloud/blockstore/libs/endpoints/session_manager.cpp @@ -684,7 +684,12 @@ TResultOrError TSessionManager::CreateEndpoint( std::move(throttler)); } - if (Options.StrictContractValidation) { + if (Options.StrictContractValidation && + !volume.GetIsFastPathEnabled() // switching fast path to slow path + // during migration might lead to + // validation false positives + ) + { client = CreateValidationClient( Logging, Monitoring, diff --git a/cloud/blockstore/libs/storage/api/volume.h b/cloud/blockstore/libs/storage/api/volume.h index 53716a136a5..de4d96512a2 100644 --- a/cloud/blockstore/libs/storage/api/volume.h +++ b/cloud/blockstore/libs/storage/api/volume.h @@ -133,14 +133,20 @@ struct TEvVolume struct TDiskRegistryBasedPartitionCounters { TPartitionDiskCountersPtr DiskCounters; + TString DiskId; ui64 NetworkBytes = 0; TDuration CpuUsage; TDiskRegistryBasedPartitionCounters( - TPartitionDiskCountersPtr diskCounters) + TPartitionDiskCountersPtr diskCounters, + TString diskId, + ui64 networkBytes, + TDuration cpuUsage) : DiskCounters(std::move(diskCounters)) - { - } + , DiskId(std::move(diskId)) + , NetworkBytes(networkBytes) + , CpuUsage(cpuUsage) + {} }; // @@ -207,6 +213,25 @@ struct TEvVolume {} }; + // + // PreparePartitionMigrationRequest + // + struct TPreparePartitionMigrationRequest + { + }; + + // + // PreparePartitionMigrationResponse + // + struct TPreparePartitionMigrationResponse + { + bool IsMigrationAllowed; + + explicit TPreparePartitionMigrationResponse(bool isMigrationAllowed) + : IsMigrationAllowed(isMigrationAllowed) + {} + }; + // // Events declaration // @@ -295,6 +320,9 @@ struct TEvVolume EvChangeStorageConfigRequest = EvBegin + 54, EvChangeStorageConfigResponse = EvBegin + 55, + EvPreparePartitionMigrationRequest = EvBegin + 56, + EvPreparePartitionMigrationResponse = EvBegin + 57, + EvEnd }; @@ -357,6 +385,16 @@ struct TEvVolume TClearBaseDiskIdToTabletIdMapping, EvClearBaseDiskIdToTabletIdMapping >; + + using TEvPreparePartitionMigrationRequest = TRequestEvent< + TPreparePartitionMigrationRequest, + EvPreparePartitionMigrationRequest + >; + + using TEvPreparePartitionMigrationResponse = TRequestEvent< + TPreparePartitionMigrationResponse, + EvPreparePartitionMigrationResponse + >; }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/disk_counters.h b/cloud/blockstore/libs/storage/core/disk_counters.h index fe9d54dbd36..522e0394db2 100644 --- a/cloud/blockstore/libs/storage/core/disk_counters.h +++ b/cloud/blockstore/libs/storage/core/disk_counters.h @@ -122,6 +122,7 @@ enum class EPublishingPolicy xxx(HasStorageConfigPatch, Generic, Permanent, __VA_ARGS__)\ xxx(LongRunningReadBlob, Generic, Expiring, __VA_ARGS__)\ xxx(LongRunningWriteBlob, Generic, Expiring, __VA_ARGS__)\ + xxx(UseFastPath, Generic, Permanent, __VA_ARGS__)\ #define BLOCKSTORE_VOLUME_SELF_COMMON_CUMULATIVE_COUNTERS(xxx, ...) \ diff --git a/cloud/blockstore/libs/storage/init/server/actorsystem.cpp b/cloud/blockstore/libs/storage/init/server/actorsystem.cpp index 17c56b18ab1..1ea2e4c9366 100644 --- a/cloud/blockstore/libs/storage/init/server/actorsystem.cpp +++ b/cloud/blockstore/libs/storage/init/server/actorsystem.cpp @@ -343,10 +343,10 @@ class TCustomLocalServiceInitializer final const IProfileLogPtr ProfileLog; const IBlockDigestGeneratorPtr BlockDigestGenerator; const ITraceSerializerPtr TraceSerializer; - const NServer::IEndpointEventHandlerPtr EndpointEventHandler; const NLogbroker::IServicePtr LogbrokerService; const NNotify::IServicePtr NotifyService; const NRdma::IClientPtr RdmaClient; + const NServer::IEndpointEventHandlerPtr EndpointEventHandler; const bool IsDiskRegistrySpareNode; public: @@ -361,6 +361,7 @@ class TCustomLocalServiceInitializer final NLogbroker::IServicePtr logbrokerService, NNotify::IServicePtr notifyService, NRdma::IClientPtr rdmaClient, + NServer::IEndpointEventHandlerPtr endpointEventHandler, bool isDiskRegistrySpareNode) : AppConfig(appConfig) , Logging(std::move(logging)) @@ -372,6 +373,7 @@ class TCustomLocalServiceInitializer final , LogbrokerService(std::move(logbrokerService)) , NotifyService(std::move(notifyService)) , RdmaClient(std::move(rdmaClient)) + , EndpointEventHandler(std::move(endpointEventHandler)) , IsDiskRegistrySpareNode(isDiskRegistrySpareNode) {} @@ -387,6 +389,7 @@ class TCustomLocalServiceInitializer final auto logbrokerService = LogbrokerService; auto notifyService = NotifyService; auto rdmaClient = RdmaClient; + auto endpointEventHandler = EndpointEventHandler; auto logging = Logging; auto volumeFactory = [=] (const TActorId& owner, TTabletStorageInfo* storage) { @@ -400,7 +403,8 @@ class TCustomLocalServiceInitializer final profileLog, blockDigestGenerator, traceSerializer, - rdmaClient + rdmaClient, + endpointEventHandler ); return actor.release(); }; @@ -496,6 +500,7 @@ IActorSystemPtr CreateActorSystem(const TServerActorSystemArgs& sArgs) sArgs.LogbrokerService, sArgs.NotifyService, sArgs.RdmaClient, + sArgs.EndpointEventHandler, sArgs.IsDiskRegistrySpareNode)); }; diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_stats.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_stats.cpp index 87ffa28718e..1402e5a54ab 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_stats.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_stats.cpp @@ -62,12 +62,14 @@ void TMirrorPartitionActor::SendStats(const TActorContext& ctx) } } - auto request = std::make_unique( - MakeIntrusive(), - std::move(stats)); + auto request = + std::make_unique( + MakeIntrusive(), + std::move(stats), + DiskId, + NetworkBytes, + CpuUsage); - request->NetworkBytes = NetworkBytes; - request->CpuUsage = CpuUsage; NetworkBytes = 0; CpuUsage = {}; diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_actor_stats.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_actor_stats.cpp index 70dd3d2b029..d2163fbed6d 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_actor_stats.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_actor_stats.cpp @@ -39,12 +39,13 @@ void TMirrorPartitionResyncActor::SendStats(const TActorContext& ctx) stats->AggregateWith(*MirrorCounters); } - auto request = std::make_unique( - MakeIntrusive(), - std::move(stats)); - - request->NetworkBytes = NetworkBytes; - request->CpuUsage = CpuUsage; + auto request = + std::make_unique( + MakeIntrusive(), + std::move(stats), + PartConfig->GetName(), + NetworkBytes, + CpuUsage); NCloud::Send(ctx, StatActorId, std::move(request)); } diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.cpp index 7e86538fe5f..aeb78703318 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.cpp @@ -157,8 +157,7 @@ bool TNonreplicatedPartitionActor::InitRequests( reply( ctx, requestInfo, - PartConfig->MakeError(E_ARGUMENT, TStringBuilder() - << "checkpoints not supported")); + PartConfig->MakeError(E_ARGUMENT, "checkpoints not supported")); return false; } diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor_stats.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor_stats.cpp index 6853240daa7..06d9bc51e2e 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor_stats.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor_stats.cpp @@ -40,12 +40,14 @@ void TNonreplicatedPartitionActor::SendStats(const TActorContext& ctx) && IOErrorCooldownPassed(ctx.Now())); PartCounters->Simple.HasBrokenDeviceSilent.Set(HasBrokenDevice); - auto request = std::make_unique( - MakeIntrusive(), - std::move(PartCounters)); + auto request = + std::make_unique( + MakeIntrusive(), + std::move(PartCounters), + PartConfig->GetName(), + NetworkBytes, + CpuUsage); - request->NetworkBytes = NetworkBytes; - request->CpuUsage = CpuUsage; NetworkBytes = 0; CpuUsage = {}; diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.cpp index 653188a9060..7b71b9c9ebc 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.cpp @@ -11,6 +11,10 @@ using namespace NActors; //////////////////////////////////////////////////////////////////////////////// +constexpr TDuration PrepareMigrationInterval = TDuration::Seconds(5); + +//////////////////////////////////////////////////////////////////////////////// + TNonreplicatedPartitionMigrationActor::TNonreplicatedPartitionMigrationActor( TStorageConfigPtr config, IProfileLogPtr profileLog, @@ -42,7 +46,9 @@ TNonreplicatedPartitionMigrationActor::TNonreplicatedPartitionMigrationActor( void TNonreplicatedPartitionMigrationActor::OnBootstrap( const NActors::TActorContext& ctx) { - StartWork(ctx, CreateSrcActor(ctx), CreateDstActor(ctx)); + InitWork(ctx, CreateSrcActor(ctx), CreateDstActor(ctx)); + + PrepareForMigration(ctx); } bool TNonreplicatedPartitionMigrationActor::OnMessage( @@ -52,6 +58,12 @@ bool TNonreplicatedPartitionMigrationActor::OnMessage( Y_UNUSED(ctx); switch (ev->GetTypeRewrite()) { + HFunc( + TEvVolume::TEvPreparePartitionMigrationRequest, + HandlePreparePartitionMigrationRequest); + HFunc( + TEvVolume::TEvPreparePartitionMigrationResponse, + HandlePreparePartitionMigrationResponse); HFunc(TEvVolume::TEvMigrationStateUpdated, HandleMigrationStateUpdated); HFunc( TEvDiskRegistry::TEvFinishMigrationResponse, @@ -243,4 +255,40 @@ void TNonreplicatedPartitionMigrationActor::HandleFinishMigrationResponse( } } +void TNonreplicatedPartitionMigrationActor::PrepareForMigration( + const NActors::TActorContext& ctx) +{ + auto request = + std::make_unique(); + + NCloud::Send( + ctx, + SrcConfig->GetParentActorId(), + std::move(request)); +} + +void TNonreplicatedPartitionMigrationActor::HandlePreparePartitionMigrationRequest( + const TEvVolume::TEvPreparePartitionMigrationRequest::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + PrepareForMigration(ctx); +} + +void TNonreplicatedPartitionMigrationActor::HandlePreparePartitionMigrationResponse( + const TEvVolume::TEvPreparePartitionMigrationResponse::TPtr& ev, + const TActorContext& ctx) +{ + bool isMigrationAllowed = ev->Get()->IsMigrationAllowed; + if (!isMigrationAllowed) { + ctx.Schedule( + PrepareMigrationInterval, + new TEvVolume::TEvPreparePartitionMigrationRequest()); + return; + } + + StartWork(ctx); +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.h index bca86876e77..3c3fe85c1b4 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.h @@ -47,6 +47,7 @@ class TNonreplicatedPartitionMigrationActor final void OnMigrationError(const NActors::TActorContext& ctx) override; private: + void PrepareForMigration(const NActors::TActorContext& ctx); void FinishMigration(const NActors::TActorContext& ctx, bool isRetry); NActors::TActorId CreateSrcActor(const NActors::TActorContext& ctx); NActors::TActorId CreateDstActor(const NActors::TActorContext& ctx); @@ -58,6 +59,14 @@ class TNonreplicatedPartitionMigrationActor final void HandleFinishMigrationResponse( const TEvDiskRegistry::TEvFinishMigrationResponse::TPtr& ev, const NActors::TActorContext& ctx); + + void HandlePreparePartitionMigrationRequest( + const TEvVolume::TEvPreparePartitionMigrationRequest::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandlePreparePartitionMigrationResponse( + const TEvVolume::TEvPreparePartitionMigrationResponse::TPtr& ev, + const NActors::TActorContext& ctx); }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h index dc7483014fc..1a1f851afe4 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h @@ -62,8 +62,8 @@ class IMigrationOwner //////////////////////////////////////////////////////////////////////////////// // To migrate data, it is necessary to inherit from this class. To get started, -// you need to call the StartWork() method and pass the source and destination -// actors to it. +// you need to call the InitWork() method and pass the source and destination +// actors to it. Then when you ready to run migration call StartWork() // About error handling. If migration errors occur, they cannot be fixed, on the // VolumeActor/PartitionActor side since DiskRegistry manages the allocation of @@ -94,6 +94,7 @@ class TNonreplicatedPartitionMigrationCommonActor TProcessingBlocks ProcessingBlocks; bool MigrationInProgress = false; + bool MigrationStarted = false; TRequestsInProgress WriteAndZeroRequestsInProgress{ EAllowedRequests::WriteOnly}; @@ -134,12 +135,15 @@ class TNonreplicatedPartitionMigrationCommonActor virtual void Bootstrap(const NActors::TActorContext& ctx); - // Called from the inheritor to get started. - void StartWork( + // Called from the inheritor to initialize migration. + void InitWork( const NActors::TActorContext& ctx, NActors::TActorId srcActorId, NActors::TActorId dstActorId); + // Called from the inheritor to start migration. + void StartWork(const NActors::TActorContext& ctx); + // Called from the inheritor to mark ranges that do not need to be // processed. void MarkMigratedBlocks(TBlockRange64 range); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_migration.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_migration.cpp index 3a2a3cc2534..a66d02fabfe 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_migration.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_migration.cpp @@ -19,7 +19,7 @@ LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); //////////////////////////////////////////////////////////////////////////////// -void TNonreplicatedPartitionMigrationCommonActor::StartWork( +void TNonreplicatedPartitionMigrationCommonActor::InitWork( const NActors::TActorContext& ctx, NActors::TActorId srcActorId, NActors::TActorId dstActorId) @@ -36,13 +36,21 @@ void TNonreplicatedPartitionMigrationCommonActor::StartWork( ProcessingBlocks.SkipProcessedRanges(); } +} + +void TNonreplicatedPartitionMigrationCommonActor::StartWork( + const NActors::TActorContext& ctx) +{ + MigrationStarted = true; ContinueMigrationIfNeeded(ctx); } void TNonreplicatedPartitionMigrationCommonActor::ContinueMigrationIfNeeded( const NActors::TActorContext& ctx) { - if (MigrationInProgress || !ProcessingBlocks.IsProcessingStarted()) { + if (!MigrationStarted || MigrationInProgress || + !ProcessingBlocks.IsProcessingStarted()) + { return; } diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_stats.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_stats.cpp index 1e45bc9e18e..99ff0783af5 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_stats.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_stats.cpp @@ -57,12 +57,14 @@ void TNonreplicatedPartitionMigrationCommonActor::SendStats( DstCounters->Simple.IORequestsInFlight.Value); } - auto request = std::make_unique( - MakeIntrusive(), - std::move(stats)); + auto request = + std::make_unique( + MakeIntrusive(), + std::move(stats), + DiskId, + NetworkBytes, + CpuUsage); - request->NetworkBytes = NetworkBytes; - request->CpuUsage = CpuUsage; NetworkBytes = 0; CpuUsage = {}; diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp index 623076c829c..88cd8b3b5e9 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp @@ -78,7 +78,8 @@ struct TTestEnv TDevices devices, TMigrations migrations, NProto::EVolumeIOMode ioMode, - bool useRdma) + bool useRdma, + TMigrationStatePtr migrationState) : Runtime(runtime) , ActorId(0, "YYY") , VolumeActorId(0, "VVV") @@ -164,7 +165,7 @@ struct TTestEnv TActorSetupCmd(part.release(), TMailboxType::Simple, 0) ); - auto dummy = std::make_unique(); + auto dummy = std::make_unique(std::move(migrationState)); Runtime.AddLocalService( VolumeActorId, @@ -231,7 +232,9 @@ Y_UNIT_TEST_SUITE(TNonreplicatedPartitionMigrationTest) TTestEnv::DefaultDevices(runtime.GetNodeId(0)), TTestEnv::DefaultMigrations(runtime.GetNodeId(0)), NProto::VOLUME_IO_OK, - false); + false, + nullptr // no migration state + ); TPartitionClient client(runtime, env.ActorId); @@ -263,7 +266,9 @@ Y_UNIT_TEST_SUITE(TNonreplicatedPartitionMigrationTest) TTestEnv::DefaultDevices(runtime.GetNodeId(0)), TTestEnv::DefaultMigrations(runtime.GetNodeId(0)), NProto::VOLUME_IO_OK, - true); + true, + nullptr // no migration state + ); env.Rdma().InitAllEndpoints(); @@ -280,7 +285,9 @@ Y_UNIT_TEST_SUITE(TNonreplicatedPartitionMigrationTest) TTestEnv::DefaultDevices(runtime.GetNodeId(0)), TTestEnv::DefaultMigrations(runtime.GetNodeId(0)), NProto::VOLUME_IO_ERROR_READ_ONLY, - false); + false, + nullptr // no migration state + ); // petya should be migrated => 3 ranges WaitForMigrations(runtime, 3); @@ -295,7 +302,9 @@ Y_UNIT_TEST_SUITE(TNonreplicatedPartitionMigrationTest) TTestEnv::DefaultDevices(runtime.GetNodeId(0)), TTestEnv::DefaultMigrations(runtime.GetNodeId(0)), NProto::VOLUME_IO_OK, - false); + false, + nullptr // no migration state + ); TPartitionClient client(runtime, env.ActorId); @@ -309,6 +318,27 @@ Y_UNIT_TEST_SUITE(TNonreplicatedPartitionMigrationTest) 5 * 1024 * DefaultBlockSize, counters.BytesCount.Value); } + + Y_UNIT_TEST(ShouldDelayMigration) + { + TTestBasicRuntime runtime; + + auto migrationState = std::make_shared(); + migrationState->IsMigrationAllowed = false; + + TTestEnv env( + runtime, + TTestEnv::DefaultDevices(runtime.GetNodeId(0)), + TTestEnv::DefaultMigrations(runtime.GetNodeId(0)), + NProto::VOLUME_IO_OK, + false, + migrationState); + + WaitForNoMigrations(runtime, TDuration::Seconds(5)); + + migrationState->IsMigrationAllowed = true; + WaitForMigrations(runtime, 3); + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.cpp index 489256afff5..7634bc8f2b9 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.cpp @@ -89,12 +89,13 @@ void TNonreplicatedPartitionRdmaActor::ScheduleCountersUpdate( template bool TNonreplicatedPartitionRdmaActor::InitRequests( + const typename TMethod::TRequest& msg, const NActors::TActorContext& ctx, TRequestInfo& requestInfo, const TBlockRange64& blockRange, TVector* deviceRequests) { - auto reply = [=] ( + auto reply = [] ( const TActorContext& ctx, TRequestInfo& requestInfo, NProto::TError error) @@ -121,6 +122,26 @@ bool TNonreplicatedPartitionRdmaActor::InitRequests( return false; } + if (!msg.Record.GetHeaders().GetIsBackgroundRequest() + && RequiresReadWriteAccess + && PartConfig->IsReadOnly()) + { + reply( + ctx, + requestInfo, + PartConfig->MakeIOError( + "disk in error state", + true // cooldown passed + )); + return false; + } else if (RequiresCheckpointSupport(msg.Record)) { + reply( + ctx, + requestInfo, + PartConfig->MakeError(E_ARGUMENT, "checkpoints not supported")); + return false; + } + *deviceRequests = PartConfig->ToDeviceRequests(blockRange); if (deviceRequests->empty()) { @@ -169,36 +190,42 @@ bool TNonreplicatedPartitionRdmaActor::InitRequests( } template bool TNonreplicatedPartitionRdmaActor::InitRequests( + const TEvService::TWriteBlocksMethod::TRequest& msg, const TActorContext& ctx, TRequestInfo& requestInfo, const TBlockRange64& blockRange, TVector* deviceRequests); template bool TNonreplicatedPartitionRdmaActor::InitRequests( + const TEvService::TWriteBlocksLocalMethod::TRequest& msg, const TActorContext& ctx, TRequestInfo& requestInfo, const TBlockRange64& blockRange, TVector* deviceRequests); template bool TNonreplicatedPartitionRdmaActor::InitRequests( + const TEvService::TZeroBlocksMethod::TRequest& msg, const TActorContext& ctx, TRequestInfo& requestInfo, const TBlockRange64& blockRange, TVector* deviceRequests); template bool TNonreplicatedPartitionRdmaActor::InitRequests( + const TEvService::TReadBlocksMethod::TRequest& msg, const TActorContext& ctx, TRequestInfo& requestInfo, const TBlockRange64& blockRange, TVector* deviceRequests); template bool TNonreplicatedPartitionRdmaActor::InitRequests( + const TEvService::TReadBlocksLocalMethod::TRequest& msg, const TActorContext& ctx, TRequestInfo& requestInfo, const TBlockRange64& blockRange, TVector* deviceRequests); template bool TNonreplicatedPartitionRdmaActor::InitRequests( + const TEvNonreplPartitionPrivate::TChecksumBlocksMethod::TRequest& msg, const TActorContext& ctx, TRequestInfo& requestInfo, const TBlockRange64& blockRange, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.h index d7867fbc8cc..f8939c694ac 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.h @@ -97,6 +97,7 @@ class TNonreplicatedPartitionRdmaActor final template bool InitRequests( + const typename TMethod::TRequest& msg, const NActors::TActorContext& ctx, TRequestInfo& requestInfo, const TBlockRange64& blockRange, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_checksumblocks.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_checksumblocks.cpp index 2c74082e144..b3ea0e57f7b 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_checksumblocks.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_checksumblocks.cpp @@ -189,6 +189,7 @@ void TNonreplicatedPartitionRdmaActor::HandleChecksumBlocks( TVector deviceRequests; bool ok = InitRequests( + *msg, ctx, *requestInfo, blockRange, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_readblocks.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_readblocks.cpp index b88ab57cfa9..80828e6f638 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_readblocks.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_readblocks.cpp @@ -195,6 +195,7 @@ void TNonreplicatedPartitionRdmaActor::HandleReadBlocks( TVector deviceRequests; bool ok = InitRequests( + *msg, ctx, *requestInfo, blockRange, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_readblocks_local.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_readblocks_local.cpp index 34368d7b3e8..13db9b4a0f1 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_readblocks_local.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_readblocks_local.cpp @@ -185,6 +185,7 @@ void TNonreplicatedPartitionRdmaActor::HandleReadBlocksLocal( TVector deviceRequests; bool ok = InitRequests( + *msg, ctx, *requestInfo, blockRange, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_stats.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_stats.cpp index 770ef746506..f06d72a9d72 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_stats.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_stats.cpp @@ -29,12 +29,14 @@ void TNonreplicatedPartitionRdmaActor::SendStats(const TActorContext& ctx) PartConfig->GetBlockCount() * PartConfig->GetBlockSize() ); - auto request = std::make_unique( - MakeIntrusive(), - std::move(PartCounters)); + auto request = + std::make_unique( + MakeIntrusive(), + std::move(PartCounters), + PartConfig->GetName(), + NetworkBytes, + CpuUsage); - request->NetworkBytes = NetworkBytes; - request->CpuUsage = CpuUsage; NetworkBytes = 0; CpuUsage = {}; diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_writeblocks.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_writeblocks.cpp index c1b6852391b..02db4097990 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_writeblocks.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_writeblocks.cpp @@ -207,6 +207,7 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocks( TVector deviceRequests; bool ok = InitRequests( + *msg, ctx, *requestInfo, blockRange, @@ -350,6 +351,7 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocksLocal( TVector deviceRequests; bool ok = InitRequests( + *msg, ctx, *requestInfo, blockRange, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_zeroblocks.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_zeroblocks.cpp index b29c0bceee4..e78adeb1bb5 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_zeroblocks.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_zeroblocks.cpp @@ -156,6 +156,7 @@ void TNonreplicatedPartitionRdmaActor::HandleZeroBlocks( TVector deviceRequests; bool ok = InitRequests( + *msg, ctx, *requestInfo, blockRange, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_ut.cpp index cc26193ab9d..8fff5b97c17 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_ut.cpp @@ -758,6 +758,103 @@ Y_UNIT_TEST_SUITE(TNonreplicatedPartitionRdmaTest) UNIT_ASSERT_VALUES_EQUAL(30, zeroRequestId); } } + + Y_UNIT_TEST(ShouldSupportReadOnlyMode) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime, NProto::VOLUME_IO_ERROR_READ_ONLY); + + env.Rdma().InitAllEndpoints(); + + TPartitionClient client(runtime, env.ActorId); + + TString zeroedBlockData(DefaultBlockSize, 0); + + auto readBlocks = [&](const auto& expectedBlockData) + { + auto response = client.ReadBlocks( + TBlockRange64::WithLength(1024, 3072)); + const auto& blocks = response->Record.GetBlocks(); + + UNIT_ASSERT_VALUES_EQUAL(3072, blocks.BuffersSize()); + UNIT_ASSERT_VALUES_EQUAL(DefaultBlockSize, blocks.GetBuffers(0).size()); + UNIT_ASSERT_VALUES_EQUAL(expectedBlockData, blocks.GetBuffers(0)); + + UNIT_ASSERT_VALUES_EQUAL(DefaultBlockSize, blocks.GetBuffers(3071).size()); + UNIT_ASSERT_VALUES_EQUAL(expectedBlockData, blocks.GetBuffers(3071)); + }; + + readBlocks(zeroedBlockData); + + // write blocks requests are fobidden in read only mode + { + client.SendWriteBlocksRequest( + TBlockRange64::WithLength(1024, 3072), + 1); + auto response = client.RecvWriteBlocksResponse(); + UNIT_ASSERT_VALUES_EQUAL(E_IO, response->GetStatus()); + } + + readBlocks(zeroedBlockData); + + // zero blocks requests are fobidden in read only mode + { + client.SendZeroBlocksRequest( + TBlockRange64::WithLength(1024, 3072)); + auto response = client.RecvZeroBlocksResponse(); + UNIT_ASSERT_VALUES_EQUAL(E_IO, response->GetStatus()); + } + + readBlocks(zeroedBlockData); + + // background write requests are allowed + // background requests are requests that originate from + // blockstore-server itself e.g. NRD migration-related reads and writes. + + TString modifiedBlockData(DefaultBlockSize, 'A'); + + { + auto request = client.CreateWriteBlocksLocalRequest( + TBlockRange64::WithLength(1024, 3072), + modifiedBlockData); + request->Record.MutableHeaders()->SetIsBackgroundRequest(true); + client.SendRequest(client.GetActorId(), std::move(request)); + auto response = client.RecvWriteBlocksLocalResponse(); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + response->GetStatus(), + response->GetErrorReason()); + } + + readBlocks(modifiedBlockData); + } + + Y_UNIT_TEST(ShouldNotHandleRequestsWithRequiredCheckpointSupport) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + + env.Rdma().InitAllEndpoints(); + + TPartitionClient client(runtime, env.ActorId); + + { + auto request = client.CreateReadBlocksRequest( + TBlockRange64::WithLength(1024, 3072)); + request->Record.SetCheckpointId("abc"); + client.SendRequest(client.GetActorId(), std::move(request)); + + runtime.DispatchEvents(); + + auto response = client.RecvReadBlocksResponse(); + UNIT_ASSERT_VALUES_EQUAL(E_ARGUMENT, response->GetStatus()); + UNIT_ASSERT(response->GetErrorReason().Contains( + "checkpoints not supported")); + } + } + } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h b/cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h index 654217abf92..b13d11a01f3 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h @@ -85,12 +85,25 @@ class TStorageStatsServiceMock final //////////////////////////////////////////////////////////////////////////////// +struct TMigrationState +{ + bool IsMigrationAllowed = true; +}; + +using TMigrationStatePtr = std::shared_ptr; + +//////////////////////////////////////////////////////////////////////////////// + class TDummyActor final : public NActors::TActor { +private: + TMigrationStatePtr MigrationState; + public: - TDummyActor() + TDummyActor(TMigrationStatePtr migrationState = nullptr) : TActor(&TThis::StateWork) + , MigrationState(std::move(migrationState)) { } @@ -112,6 +125,8 @@ class TDummyActor final HFunc(TEvDiskRegistry::TEvFinishMigrationRequest, HandleFinishMigration); + HFunc(TEvVolume::TEvPreparePartitionMigrationRequest, HandlePreparePartitionMigration); + default: Y_ABORT("Unexpected event %x", ev->GetTypeRewrite()); } @@ -176,6 +191,17 @@ class TDummyActor final NCloud::Reply(ctx, *ev, std::make_unique()); } + + void HandlePreparePartitionMigration( + const TEvVolume::TEvPreparePartitionMigrationRequest::TPtr& ev, + const NActors::TActorContext& ctx) + { + NCloud::Reply( + ctx, + *ev, + std::make_unique( + MigrationState ? MigrationState->IsMigrationAllowed : true)); + } }; //////////////////////////////////////////////////////////////////////////////// @@ -397,4 +423,32 @@ inline void WaitForMigrations( UNIT_ASSERT_VALUES_EQUAL(rangeCount, migratedRanges); } +inline void WaitForNoMigrations(NActors::TTestBasicRuntime& runtime, TDuration timeout) +{ + ui32 migratedRanges = 0; + runtime.SetObserverFunc([&] (auto& runtime, auto& event) { + switch (event->GetTypeRewrite()) { + case TEvNonreplPartitionPrivate::EvRangeMigrated: { + auto* msg = + event->template Get(); + if (!HasError(msg->Error)) { + ++migratedRanges; + } + break; + } + } + return NActors::TTestActorRuntime::DefaultObserverFunc(runtime, event); + }); + + NActors::TDispatchOptions options; + options.FinalEvents = { + NActors::TDispatchOptions::TFinalEventCondition( + TEvNonreplPartitionPrivate::EvRangeMigrated) + }; + + runtime.DispatchEvents(options, timeout); + + UNIT_ASSERT_VALUES_EQUAL(0, migratedRanges); +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/protos/disk.proto b/cloud/blockstore/libs/storage/protos/disk.proto index f61b6c95424..18e5ecd30f5 100644 --- a/cloud/blockstore/libs/storage/protos/disk.proto +++ b/cloud/blockstore/libs/storage/protos/disk.proto @@ -742,6 +742,7 @@ message TAcquireDiskResponse NCloud.NProto.TError Error = 1; // List of devices that make up the disk. + // Note. Contains only the devices located at available agents. repeated TDeviceConfig Devices = 2; // Migration configuration. diff --git a/cloud/blockstore/libs/storage/service/volume_client_actor.cpp b/cloud/blockstore/libs/storage/service/volume_client_actor.cpp index 11cee1d1ee3..c2aab270c5c 100644 --- a/cloud/blockstore/libs/storage/service/volume_client_actor.cpp +++ b/cloud/blockstore/libs/storage/service/volume_client_actor.cpp @@ -212,7 +212,7 @@ void TVolumeClientActor::HandleConnect( "Connection to tablet: " << msg->TabletId << " has been established"); - EndpointEventHandler->OnVolumeConnectionEstablished(DiskId); + EndpointEventHandler->SwitchEndpointIfNeeded(DiskId, "volume connected"); } void TVolumeClientActor::HandleDisconnect( diff --git a/cloud/blockstore/libs/storage/service/volume_session_actor_start.cpp b/cloud/blockstore/libs/storage/service/volume_session_actor_start.cpp index c63083d3ef3..469bfb140a4 100644 --- a/cloud/blockstore/libs/storage/service/volume_session_actor_start.cpp +++ b/cloud/blockstore/libs/storage/service/volume_session_actor_start.cpp @@ -453,6 +453,7 @@ void TStartVolumeActor::StartTablet(const TActorContext& ctx) blockDigestGenerator, traceSerializer, rdmaClient, + endpointEventHandler, EVolumeStartMode::MOUNTED); return actor.release(); }; diff --git a/cloud/blockstore/libs/storage/testlib/test_env.cpp b/cloud/blockstore/libs/storage/testlib/test_env.cpp index dd8f50ecd26..feac6feded6 100644 --- a/cloud/blockstore/libs/storage/testlib/test_env.cpp +++ b/cloud/blockstore/libs/storage/testlib/test_env.cpp @@ -242,7 +242,8 @@ ui32 TTestEnv::CreateBlockStoreNode( CreateProfileLogStub(), CreateBlockDigestGeneratorStub(), TraceSerializer, - nullptr // RdmaClient + nullptr, // RdmaClient + NServer::CreateEndpointEventProxy() ); return actor.release(); }; diff --git a/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp b/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp index 7dc81b5a8b5..6ad9329745b 100644 --- a/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp +++ b/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp @@ -1,5 +1,6 @@ #include "test_env.h" +#include #include namespace NCloud::NBlockStore::NStorage { @@ -700,7 +701,8 @@ std::unique_ptr PrepareTestActorRuntime( CreateLoggingService("console"), "BLOCKSTORE_TRACE", NLwTraceMonPage::TraceManager(false)), - rdmaClient + rdmaClient, + NServer::CreateEndpointEventProxy() ); return tablet.release(); }; diff --git a/cloud/blockstore/libs/storage/volume/volume.cpp b/cloud/blockstore/libs/storage/volume/volume.cpp index 74855b06499..259e11f3da3 100644 --- a/cloud/blockstore/libs/storage/volume/volume.cpp +++ b/cloud/blockstore/libs/storage/volume/volume.cpp @@ -19,6 +19,7 @@ IActorPtr CreateVolumeTablet( IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, NRdma::IClientPtr rdmaClient, + NServer::IEndpointEventHandlerPtr endpointEventHandler, EVolumeStartMode startMode) { return std::make_unique( @@ -30,6 +31,7 @@ IActorPtr CreateVolumeTablet( std::move(blockDigestGenerator), std::move(traceSerializer), std::move(rdmaClient), + std::move(endpointEventHandler), startMode); } diff --git a/cloud/blockstore/libs/storage/volume/volume.h b/cloud/blockstore/libs/storage/volume/volume.h index 669b4ce5be1..bcffd8c0c18 100644 --- a/cloud/blockstore/libs/storage/volume/volume.h +++ b/cloud/blockstore/libs/storage/volume/volume.h @@ -3,6 +3,7 @@ #include "public.h" #include +#include #include #include #include @@ -30,6 +31,7 @@ NActors::IActorPtr CreateVolumeTablet( IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, NRdma::IClientPtr rdmaClient, + NServer::IEndpointEventHandlerPtr endpointEventHandler, EVolumeStartMode startMode = EVolumeStartMode::ONLINE); } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.cpp b/cloud/blockstore/libs/storage/volume/volume_actor.cpp index b40aa97cc80..432734f6ed0 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor.cpp @@ -57,6 +57,7 @@ TVolumeActor::TVolumeActor( IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, NRdma::IClientPtr rdmaClient, + NServer::IEndpointEventHandlerPtr endpointEventHandler, EVolumeStartMode startMode) : TActor(&TThis::StateBoot) , TTabletBase(owner, std::move(storage)) @@ -67,6 +68,7 @@ TVolumeActor::TVolumeActor( , BlockDigestGenerator(std::move(blockDigestGenerator)) , TraceSerializer(std::move(traceSerializer)) , RdmaClient(std::move(rdmaClient)) + , EndpointEventHandler(std::move(endpointEventHandler)) , StartMode(startMode) , ThrottlerLogger( TabletID(), @@ -938,6 +940,8 @@ STFUNC(TVolumeActor::StateWork) HFunc(TEvLocal::TEvTabletMetrics, HandleTabletMetrics); + HFunc(TEvVolume::TEvPreparePartitionMigrationRequest, HandlePreparePartitionMigration); + HFunc(TEvVolume::TEvUpdateMigrationState, HandleUpdateMigrationState); HFunc(TEvVolume::TEvUpdateResyncState, HandleUpdateResyncState); HFunc(TEvVolume::TEvResyncFinished, HandleResyncFinished); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.h b/cloud/blockstore/libs/storage/volume/volume_actor.h index 7370f651c7e..905b0a94feb 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.h +++ b/cloud/blockstore/libs/storage/volume/volume_actor.h @@ -190,6 +190,7 @@ class TVolumeActor final const IBlockDigestGeneratorPtr BlockDigestGenerator; const ITraceSerializerPtr TraceSerializer; const NRdma::IClientPtr RdmaClient; + NServer::IEndpointEventHandlerPtr EndpointEventHandler; const EVolumeStartMode StartMode; TVolumeThrottlerLogger ThrottlerLogger; @@ -349,6 +350,7 @@ class TVolumeActor final IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, NRdma::IClientPtr rdmaClient, + NServer::IEndpointEventHandlerPtr endpointEventHandler, EVolumeStartMode startMode); ~TVolumeActor() override; @@ -752,6 +754,10 @@ class TVolumeActor final const TEvVolume::TEvUpdateMigrationState::TPtr& ev, const NActors::TActorContext& ctx); + void HandlePreparePartitionMigration( + const TEvVolume::TEvPreparePartitionMigrationRequest::TPtr& ev, + const NActors::TActorContext& ctx); + void HandleUpdateResyncState( const TEvVolume::TEvUpdateResyncState::TPtr& ev, const NActors::TActorContext& ctx); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp index ebe6f41bbd3..c66327d52bb 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp @@ -1428,7 +1428,8 @@ void TVolumeActor::CompleteUpdateShadowDiskState( ctx, *args.RequestInfo, std::make_unique( - newState)); + newState, + args.ProcessedBlockCount)); } void TVolumeActor::HandleUpdateShadowDiskState( @@ -1448,7 +1449,9 @@ void TVolumeActor::HandleUpdateShadowDiskState( auto reply = [&](EShadowDiskState newState) { auto response = std::make_unique< - TEvVolumePrivate::TEvUpdateShadowDiskStateResponse>(newState); + TEvVolumePrivate::TEvUpdateShadowDiskStateResponse>( + newState, + msg->ProcessedBlockCount); NCloud::Reply(ctx, *ev, std::move(response)); }; diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_migration.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_migration.cpp index dff019b9b43..85246235151 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_migration.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_migration.cpp @@ -1,5 +1,6 @@ #include "volume_actor.h" +#include #include namespace NCloud::NBlockStore::NStorage { @@ -89,4 +90,45 @@ void TVolumeActor::CompleteUpdateMigrationState( std::make_unique()); } +//////////////////////////////////////////////////////////////////////////////// + +void TVolumeActor::HandlePreparePartitionMigration( + const TEvVolume::TEvPreparePartitionMigrationRequest::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + auto requestInfo = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); + + if (!State->GetUseFastPath()) { + auto response = + std::make_unique( + true // migration allowed + ); + NCloud::Reply(ctx, *requestInfo, std::move(response)); + return; + } + + EndpointEventHandler + ->SwitchEndpointIfNeeded(State->GetDiskId(), "partition migration") + .Subscribe( + [actorSystem = ctx.ActorSystem(), + replyFrom = ctx.SelfID, + requestInfo = std::move(requestInfo)](const auto& future) + { + bool migrationAllowed = !HasError(future.GetValue()); + auto response = std::make_unique< + TEvVolume::TEvPreparePartitionMigrationResponse>( + migrationAllowed); + + actorSystem->Send(new IEventHandle( + requestInfo->Sender, + replyFrom, + response.release(), + 0, // flags + requestInfo->Cookie)); + }); +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_monitoring.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_monitoring.cpp index c31497c95dd..477aba45593 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_monitoring.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_monitoring.cpp @@ -1094,6 +1094,13 @@ void TVolumeActor::RenderConfig(IOutputStream& out) const } } + TABLER() { + TABLED() { out << "UseFastPath"; } + TABLED() { + out << State->GetUseFastPath(); + } + } + TABLER() { TABLED() { out << "Throttler"; } TABLED() { diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_stats.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_stats.cpp index 953511010dd..57ebe11dc7a 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_stats.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_stats.cpp @@ -127,10 +127,16 @@ void TVolumeActor::HandleDiskRegistryBasedPartCounters( msg->CallContext ); - if (State->GetDiskRegistryBasedPartitionActor() != ev->Sender) { - LOG_INFO(ctx, TBlockStoreComponents::VOLUME, - "Partition %s for disk %s counters not found", + const bool doesPartitionBelongToDisk = + State->GetDiskRegistryBasedPartitionActor() == ev->Sender || + State->GetDiskId() == msg->DiskId; + if (!doesPartitionBelongToDisk) { + LOG_INFO( + ctx, + TBlockStoreComponents::VOLUME, + "Counters from partition %s (%s) do not belong to disk %s", ToString(ev->Sender).c_str(), + msg->DiskId.Quote().c_str(), State->GetDiskId().Quote().c_str()); return; } @@ -371,6 +377,9 @@ void TVolumeActor::SendSelfStatsToService(const TActorContext& ctx) simple.LastVolumeLoadTime.Set(GetLoadTime().MicroSeconds()); simple.LastVolumeStartTime.Set(GetStartTime().MicroSeconds()); simple.HasStorageConfigPatch.Set(HasStorageConfigPatch); + simple.UseFastPath.Set( + State->GetUseFastPath() && + State->GetMeta().GetMigrations().size() == 0); SendVolumeSelfCounters(ctx); VolumeSelfCounters = CreateVolumeSelfCounters(CountersPolicy); diff --git a/cloud/blockstore/libs/storage/volume/volume_events_private.h b/cloud/blockstore/libs/storage/volume/volume_events_private.h index 9c30d574b5b..c4da52c1509 100644 --- a/cloud/blockstore/libs/storage/volume/volume_events_private.h +++ b/cloud/blockstore/libs/storage/volume/volume_events_private.h @@ -247,11 +247,15 @@ struct TEvVolumePrivate struct TUpdateShadowDiskStateResponse { EShadowDiskState NewState = EShadowDiskState::None; + ui64 ProcessedBlockCount = 0; - TUpdateShadowDiskStateResponse() {} + TUpdateShadowDiskStateResponse() = default; - TUpdateShadowDiskStateResponse(EShadowDiskState newState) + TUpdateShadowDiskStateResponse( + EShadowDiskState newState, + ui64 processedBlockCount) : NewState(newState) + , ProcessedBlockCount(processedBlockCount) {} }; diff --git a/cloud/blockstore/libs/storage/volume/volume_state.cpp b/cloud/blockstore/libs/storage/volume/volume_state.cpp index ffa7561f1ae..92d79fb680e 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_state.cpp @@ -185,6 +185,7 @@ void TVolumeState::Reset() Meta.GetConfig().GetCloudId(), Meta.GetConfig().GetFolderId(), Meta.GetConfig().GetDiskId()); + UseFastPath = false; UseRdmaForThisVolume = false; AcceptInvalidDiskAllocationResponse = false; @@ -237,6 +238,8 @@ void TVolumeState::Reset() UseRdmaForThisVolume = true; } else if (tag == "max-timed-out-device-state-duration") { TDuration::TryParse(value, MaxTimedOutDeviceStateDuration); + } else if (tag == "use-fastpath") { + UseFastPath = true; } } diff --git a/cloud/blockstore/libs/storage/volume/volume_state.h b/cloud/blockstore/libs/storage/volume/volume_state.h index cc399195ee8..3e3929db613 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.h +++ b/cloud/blockstore/libs/storage/volume/volume_state.h @@ -143,6 +143,7 @@ class TVolumeState bool TrackUsedBlocks = false; bool MaskUnusedBlocks = false; bool UseRdma = false; + bool UseFastPath = false; bool UseRdmaForThisVolume = false; bool RdmaUnavailable = false; TDuration MaxTimedOutDeviceStateDuration; @@ -568,6 +569,11 @@ class TVolumeState return UseRdmaForThisVolume; } + bool GetUseFastPath() const + { + return UseFastPath; + } + void SetRdmaUnavailable() { RdmaUnavailable = true; diff --git a/cloud/blockstore/libs/storage/volume/volume_ut.cpp b/cloud/blockstore/libs/storage/volume/volume_ut.cpp index 0b9ff392636..a2a14d047c9 100644 --- a/cloud/blockstore/libs/storage/volume/volume_ut.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_ut.cpp @@ -7166,6 +7166,69 @@ Y_UNIT_TEST_SUITE(TVolumeTest) UNIT_ASSERT_VALUES_EQUAL(mapping["Unknown"], "Not found"); } + Y_UNIT_TEST(ShouldGetUseFastPathStats) + { + NProto::TStorageServiceConfig config; + auto runtime = PrepareTestActorRuntime(config); + + ui32 hasUseFastPathCounter = 0; + + runtime->SetObserverFunc( + [&](TTestActorRuntimeBase& runtime, TAutoPtr& event) + { + if (event->Recipient == MakeStorageStatsServiceId() && + event->GetTypeRewrite() == + TEvStatsService::EvVolumeSelfCounters) + { + auto* msg = + event->Get(); + + hasUseFastPathCounter = + msg->VolumeSelfCounters->Simple.UseFastPath.Value; + } + + return TTestActorRuntime::DefaultObserverFunc(runtime, event); + }); + + TVolumeClient volume(*runtime); + + int version = 0; + + auto updateConfig = [&](auto tags) + { + volume.UpdateVolumeConfig( + 0, // maxBandwidth + 0, // maxIops + 0, // burstPercentage + 0, // maxPostponedWeight + false, // throttlingEnabled + ++version, + NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED, + 1024, // block count per partition + "vol0", // diskId + "cloud", // cloudId + "folder", // folderId + 1, // partition count + 0, // blocksPerStripe + tags); + volume.WaitReady(); + }; + + auto checkUseFastPath = [&](auto expectedVal) + { + volume.SendToPipe( + std::make_unique()); + runtime->DispatchEvents({}, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(expectedVal, hasUseFastPathCounter); + }; + + updateConfig(""); + checkUseFastPath(0); + + updateConfig("use-fastpath"); + checkUseFastPath(1); + } + Y_UNIT_TEST(ShouldDisableByFlag) { NProto::TStorageServiceConfig config; diff --git a/cloud/filestore/config/filesystem.proto b/cloud/filestore/config/filesystem.proto index 036bd2f5eab..23e684bf48e 100644 --- a/cloud/filestore/config/filesystem.proto +++ b/cloud/filestore/config/filesystem.proto @@ -28,4 +28,8 @@ message TFileSystemConfig // Filesystem max buffer size per request. optional uint32 MaxBufferSize = 8; + + // Inode entry timeout for negative responses (responses with errors). + // The most notable one is ENOENT for getattr. + optional uint32 NegativeEntryTimeout = 9; } diff --git a/cloud/filestore/config/storage.proto b/cloud/filestore/config/storage.proto index 5e75b2b8cd9..4866a29c5f7 100644 --- a/cloud/filestore/config/storage.proto +++ b/cloud/filestore/config/storage.proto @@ -210,4 +210,10 @@ message TStorageConfig // number of backpressure errors. Needed to automatically recover after // various races that may happen during index tablet startup due to bugs. optional uint32 MaxBackpressureErrorsBeforeSuicide = 334; + + // Params that are passed to filestore-vhost via TCreateSessionResponse via + // TFilestore::Features. They do not have any effect on the tablet itself. + optional uint32 EntryTimeout = 335; + optional uint32 NegativeEntryTimeout = 336; + optional uint32 AttrTimeout = 337; } diff --git a/cloud/filestore/libs/diagnostics/CMakeLists.darwin-x86_64.txt b/cloud/filestore/libs/diagnostics/CMakeLists.darwin-x86_64.txt index 3dca19fb125..965d23e7229 100644 --- a/cloud/filestore/libs/diagnostics/CMakeLists.darwin-x86_64.txt +++ b/cloud/filestore/libs/diagnostics/CMakeLists.darwin-x86_64.txt @@ -18,6 +18,8 @@ target_link_libraries(filestore-libs-diagnostics PUBLIC filestore-libs-diagnostics-events filestore-libs-service storage-tablet-protos + filestore-public-api-protos + filestore-private-api-protos core-libs-diagnostics core-libs-user_stats core-libs-version diff --git a/cloud/filestore/libs/diagnostics/CMakeLists.linux-aarch64.txt b/cloud/filestore/libs/diagnostics/CMakeLists.linux-aarch64.txt index cc6810175d2..991bed32411 100644 --- a/cloud/filestore/libs/diagnostics/CMakeLists.linux-aarch64.txt +++ b/cloud/filestore/libs/diagnostics/CMakeLists.linux-aarch64.txt @@ -19,6 +19,8 @@ target_link_libraries(filestore-libs-diagnostics PUBLIC filestore-libs-diagnostics-events filestore-libs-service storage-tablet-protos + filestore-public-api-protos + filestore-private-api-protos core-libs-diagnostics core-libs-user_stats core-libs-version diff --git a/cloud/filestore/libs/diagnostics/CMakeLists.linux-x86_64.txt b/cloud/filestore/libs/diagnostics/CMakeLists.linux-x86_64.txt index cc6810175d2..991bed32411 100644 --- a/cloud/filestore/libs/diagnostics/CMakeLists.linux-x86_64.txt +++ b/cloud/filestore/libs/diagnostics/CMakeLists.linux-x86_64.txt @@ -19,6 +19,8 @@ target_link_libraries(filestore-libs-diagnostics PUBLIC filestore-libs-diagnostics-events filestore-libs-service storage-tablet-protos + filestore-public-api-protos + filestore-private-api-protos core-libs-diagnostics core-libs-user_stats core-libs-version diff --git a/cloud/filestore/libs/diagnostics/CMakeLists.windows-x86_64.txt b/cloud/filestore/libs/diagnostics/CMakeLists.windows-x86_64.txt index 3dca19fb125..965d23e7229 100644 --- a/cloud/filestore/libs/diagnostics/CMakeLists.windows-x86_64.txt +++ b/cloud/filestore/libs/diagnostics/CMakeLists.windows-x86_64.txt @@ -18,6 +18,8 @@ target_link_libraries(filestore-libs-diagnostics PUBLIC filestore-libs-diagnostics-events filestore-libs-service storage-tablet-protos + filestore-public-api-protos + filestore-private-api-protos core-libs-diagnostics core-libs-user_stats core-libs-version diff --git a/cloud/filestore/libs/diagnostics/critical_events.h b/cloud/filestore/libs/diagnostics/critical_events.h index d33fc1c7521..381d79fe017 100644 --- a/cloud/filestore/libs/diagnostics/critical_events.h +++ b/cloud/filestore/libs/diagnostics/critical_events.h @@ -13,6 +13,9 @@ namespace NCloud::NFileStore{ xxx(TabletBSFailure) \ xxx(TabletCommitIdOverflow) \ xxx(VfsQueueRunningError) \ + xxx(MissingSessionId) \ + xxx(CreateSessionError) \ + xxx(DescribeFileStoreError) \ // FILESTORE_CRITICAL_EVENTS //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/filestore/libs/diagnostics/profile_log_events.cpp b/cloud/filestore/libs/diagnostics/profile_log_events.cpp index eff81242d61..22f290774be 100644 --- a/cloud/filestore/libs/diagnostics/profile_log_events.cpp +++ b/cloud/filestore/libs/diagnostics/profile_log_events.cpp @@ -3,6 +3,7 @@ #include "profile_log.h" #include +#include #include #include #include @@ -178,6 +179,18 @@ void InitProfileLogRequestInfo( rangeInfo->SetBytes(request.GetLength()); } +template <> +void InitProfileLogRequestInfo( + NProto::TProfileLogRequestInfo& profileLogRequest, + const NProtoPrivate::TDescribeDataRequest& request) +{ + auto* rangeInfo = profileLogRequest.AddRanges(); + rangeInfo->SetNodeId(request.GetNodeId()); + rangeInfo->SetHandle(request.GetHandle()); + rangeInfo->SetOffset(request.GetOffset()); + rangeInfo->SetBytes(request.GetLength()); +} + template <> void InitProfileLogRequestInfo( NProto::TProfileLogRequestInfo& profileLogRequest, @@ -383,57 +396,58 @@ void InitProfileLogRequestInfo( //////////////////////////////////////////////////////////////////////////////// -#define IMPLEMENT_DEFAULT_METHOD(name) \ +#define IMPLEMENT_DEFAULT_METHOD(name, ns) \ template <> \ void FinalizeProfileLogRequestInfo( \ NProto::TProfileLogRequestInfo& profileLogRequest, \ - const NProto::T##name##Response& response) \ + const ns::T##name##Response& response) \ { \ Y_UNUSED(profileLogRequest, response); \ } \ // IMPLEMENT_DEFAULT_METHOD - IMPLEMENT_DEFAULT_METHOD(Ping) - IMPLEMENT_DEFAULT_METHOD(CreateFileStore) - IMPLEMENT_DEFAULT_METHOD(DestroyFileStore) - IMPLEMENT_DEFAULT_METHOD(AlterFileStore) - IMPLEMENT_DEFAULT_METHOD(ResizeFileStore) - IMPLEMENT_DEFAULT_METHOD(DescribeFileStoreModel) - IMPLEMENT_DEFAULT_METHOD(GetFileStoreInfo) - IMPLEMENT_DEFAULT_METHOD(ListFileStores) - IMPLEMENT_DEFAULT_METHOD(CreateSession) - IMPLEMENT_DEFAULT_METHOD(DestroySession) - IMPLEMENT_DEFAULT_METHOD(PingSession) - IMPLEMENT_DEFAULT_METHOD(AddClusterNode) - IMPLEMENT_DEFAULT_METHOD(RemoveClusterNode) - IMPLEMENT_DEFAULT_METHOD(ListClusterNodes) - IMPLEMENT_DEFAULT_METHOD(AddClusterClients) - IMPLEMENT_DEFAULT_METHOD(RemoveClusterClients) - IMPLEMENT_DEFAULT_METHOD(ListClusterClients) - IMPLEMENT_DEFAULT_METHOD(UpdateCluster) - IMPLEMENT_DEFAULT_METHOD(StatFileStore) - IMPLEMENT_DEFAULT_METHOD(SubscribeSession) - IMPLEMENT_DEFAULT_METHOD(GetSessionEvents) - IMPLEMENT_DEFAULT_METHOD(ResetSession) - IMPLEMENT_DEFAULT_METHOD(CreateCheckpoint) - IMPLEMENT_DEFAULT_METHOD(DestroyCheckpoint) - IMPLEMENT_DEFAULT_METHOD(ResolvePath) - IMPLEMENT_DEFAULT_METHOD(UnlinkNode) - IMPLEMENT_DEFAULT_METHOD(RenameNode) - IMPLEMENT_DEFAULT_METHOD(AccessNode) - IMPLEMENT_DEFAULT_METHOD(ReadLink) - IMPLEMENT_DEFAULT_METHOD(RemoveNodeXAttr) - IMPLEMENT_DEFAULT_METHOD(DestroyHandle) - IMPLEMENT_DEFAULT_METHOD(AcquireLock) - IMPLEMENT_DEFAULT_METHOD(ReleaseLock) - IMPLEMENT_DEFAULT_METHOD(ReadData) - IMPLEMENT_DEFAULT_METHOD(WriteData) - IMPLEMENT_DEFAULT_METHOD(AllocateData) - IMPLEMENT_DEFAULT_METHOD(StartEndpoint) - IMPLEMENT_DEFAULT_METHOD(StopEndpoint) - IMPLEMENT_DEFAULT_METHOD(ListEndpoints) - IMPLEMENT_DEFAULT_METHOD(KickEndpoint) - IMPLEMENT_DEFAULT_METHOD(ExecuteAction) + IMPLEMENT_DEFAULT_METHOD(Ping, NProto) + IMPLEMENT_DEFAULT_METHOD(CreateFileStore, NProto) + IMPLEMENT_DEFAULT_METHOD(DestroyFileStore, NProto) + IMPLEMENT_DEFAULT_METHOD(AlterFileStore, NProto) + IMPLEMENT_DEFAULT_METHOD(ResizeFileStore, NProto) + IMPLEMENT_DEFAULT_METHOD(DescribeFileStoreModel, NProto) + IMPLEMENT_DEFAULT_METHOD(GetFileStoreInfo, NProto) + IMPLEMENT_DEFAULT_METHOD(ListFileStores, NProto) + IMPLEMENT_DEFAULT_METHOD(CreateSession, NProto) + IMPLEMENT_DEFAULT_METHOD(DestroySession, NProto) + IMPLEMENT_DEFAULT_METHOD(PingSession, NProto) + IMPLEMENT_DEFAULT_METHOD(AddClusterNode, NProto) + IMPLEMENT_DEFAULT_METHOD(RemoveClusterNode, NProto) + IMPLEMENT_DEFAULT_METHOD(ListClusterNodes, NProto) + IMPLEMENT_DEFAULT_METHOD(AddClusterClients, NProto) + IMPLEMENT_DEFAULT_METHOD(RemoveClusterClients, NProto) + IMPLEMENT_DEFAULT_METHOD(ListClusterClients, NProto) + IMPLEMENT_DEFAULT_METHOD(UpdateCluster, NProto) + IMPLEMENT_DEFAULT_METHOD(StatFileStore, NProto) + IMPLEMENT_DEFAULT_METHOD(SubscribeSession, NProto) + IMPLEMENT_DEFAULT_METHOD(GetSessionEvents, NProto) + IMPLEMENT_DEFAULT_METHOD(ResetSession, NProto) + IMPLEMENT_DEFAULT_METHOD(CreateCheckpoint, NProto) + IMPLEMENT_DEFAULT_METHOD(DestroyCheckpoint, NProto) + IMPLEMENT_DEFAULT_METHOD(ResolvePath, NProto) + IMPLEMENT_DEFAULT_METHOD(UnlinkNode, NProto) + IMPLEMENT_DEFAULT_METHOD(RenameNode, NProto) + IMPLEMENT_DEFAULT_METHOD(AccessNode, NProto) + IMPLEMENT_DEFAULT_METHOD(ReadLink, NProto) + IMPLEMENT_DEFAULT_METHOD(RemoveNodeXAttr, NProto) + IMPLEMENT_DEFAULT_METHOD(DestroyHandle, NProto) + IMPLEMENT_DEFAULT_METHOD(AcquireLock, NProto) + IMPLEMENT_DEFAULT_METHOD(ReleaseLock, NProto) + IMPLEMENT_DEFAULT_METHOD(ReadData, NProto) + IMPLEMENT_DEFAULT_METHOD(WriteData, NProto) + IMPLEMENT_DEFAULT_METHOD(AllocateData, NProto) + IMPLEMENT_DEFAULT_METHOD(StartEndpoint, NProto) + IMPLEMENT_DEFAULT_METHOD(StopEndpoint, NProto) + IMPLEMENT_DEFAULT_METHOD(ListEndpoints, NProto) + IMPLEMENT_DEFAULT_METHOD(KickEndpoint, NProto) + IMPLEMENT_DEFAULT_METHOD(ExecuteAction, NProto) + IMPLEMENT_DEFAULT_METHOD(DescribeData, NProtoPrivate) #undef IMPLEMENT_DEFAULT_METHOD diff --git a/cloud/filestore/libs/diagnostics/ya.make b/cloud/filestore/libs/diagnostics/ya.make index c41a9062a23..2062abe0d0b 100644 --- a/cloud/filestore/libs/diagnostics/ya.make +++ b/cloud/filestore/libs/diagnostics/ya.make @@ -18,6 +18,8 @@ PEERDIR( cloud/filestore/libs/service # FIXME use public api protos cloud/filestore/libs/storage/tablet/protos + cloud/filestore/private/api/protos + cloud/filestore/public/api/protos cloud/storage/core/libs/diagnostics cloud/storage/core/libs/user_stats diff --git a/cloud/filestore/libs/service/auth_scheme.cpp b/cloud/filestore/libs/service/auth_scheme.cpp index 77007e7855f..ecd500a7432 100644 --- a/cloud/filestore/libs/service/auth_scheme.cpp +++ b/cloud/filestore/libs/service/auth_scheme.cpp @@ -40,6 +40,7 @@ TPermissionList GetRequestPermissions(EFileStoreRequest requestType) case EFileStoreRequest::AcquireLock: case EFileStoreRequest::ReleaseLock: case EFileStoreRequest::TestLock: + case EFileStoreRequest::DescribeData: return CreatePermissionList({}); case EFileStoreRequest::AddClusterNode: diff --git a/cloud/filestore/libs/service/request.cpp b/cloud/filestore/libs/service/request.cpp index 3d5bfe50612..090d000636e 100644 --- a/cloud/filestore/libs/service/request.cpp +++ b/cloud/filestore/libs/service/request.cpp @@ -115,8 +115,13 @@ ui64 CreateRequestId() static const TString RequestNames[] = { FILESTORE_REQUESTS(FILESTORE_DECLARE_REQUEST) + "DescribeData", }; +static_assert( + sizeof(RequestNames) / sizeof(RequestNames[0]) == FileStoreRequestCount, + "RequestNames size mismatch"); + #undef FILESTORE_DECLARE_REQUEST const TString& GetFileStoreRequestName(EFileStoreRequest requestType) diff --git a/cloud/filestore/libs/service/request.h b/cloud/filestore/libs/service/request.h index 9ead2735cf2..4d5784e8763 100644 --- a/cloud/filestore/libs/service/request.h +++ b/cloud/filestore/libs/service/request.h @@ -127,6 +127,7 @@ namespace NCloud::NFileStore { enum class EFileStoreRequest { FILESTORE_REQUESTS(FILESTORE_DECLARE_REQUEST) + DescribeData, MAX }; diff --git a/cloud/filestore/libs/storage/core/config.cpp b/cloud/filestore/libs/storage/core/config.cpp index 76ae8f72f83..eaf9b22961c 100644 --- a/cloud/filestore/libs/storage/core/config.cpp +++ b/cloud/filestore/libs/storage/core/config.cpp @@ -20,7 +20,7 @@ namespace { xxx(PipeClientMaxRetryTime, TDuration, TDuration::Seconds(4) )\ \ xxx(EstablishSessionTimeout, TDuration, TDuration::Seconds(30) )\ - xxx(IdleSessionTimeout, TDuration, TDuration::Seconds(30) )\ + xxx(IdleSessionTimeout, TDuration, TDuration::Minutes(5) )\ \ xxx(WriteBatchEnabled, bool, false )\ xxx(WriteBatchTimeout, TDuration, TDuration::MilliSeconds(0) )\ @@ -33,7 +33,7 @@ namespace { xxx(CollectGarbageThreshold, ui32, 4_MB )\ xxx(FlushBytesThreshold, ui32, 4_MB )\ xxx(MaxDeleteGarbageBlobsPerTx, ui32, 16384 )\ - xxx(LoadedCompactionRangesPerTx, ui32, 1048576 )\ + xxx(LoadedCompactionRangesPerTx, ui32, 10 * 1024 * 1024 )\ xxx(MaxBlocksPerTruncateTx, ui32, 0 /*TODO: 8388608 32gb/4kb*/)\ xxx(MaxTruncateTxInflight, ui32, 10 )\ xxx(CompactionRetryTimeout, TDuration, TDuration::Seconds(1) )\ @@ -133,6 +133,9 @@ namespace { NCloud::NProto::AUTHORIZATION_IGNORE )\ \ xxx(TwoStageReadEnabled, bool, false )\ + xxx(EntryTimeout, TDuration, TDuration::Zero() )\ + xxx(NegativeEntryTimeout, TDuration, TDuration::Zero() )\ + xxx(AttrTimeout, TDuration, TDuration::Zero() )\ xxx(MaxOutOfOrderCompactionMapLoadRequestsInQueue, ui32, 5 )\ xxx(MaxBackpressureErrorsBeforeSuicide, ui32, 1000 )\ // FILESTORE_STORAGE_CONFIG diff --git a/cloud/filestore/libs/storage/core/config.h b/cloud/filestore/libs/storage/core/config.h index 8800033f787..4b262a05b7d 100644 --- a/cloud/filestore/libs/storage/core/config.h +++ b/cloud/filestore/libs/storage/core/config.h @@ -175,6 +175,10 @@ class TStorageConfig NCloud::NProto::EAuthorizationMode GetAuthorizationMode() const; bool GetTwoStageReadEnabled() const; + TDuration GetEntryTimeout() const; + TDuration GetNegativeEntryTimeout() const; + TDuration GetAttrTimeout() const; + ui32 GetMaxOutOfOrderCompactionMapLoadRequestsInQueue() const; bool GetConfigsDispatcherServiceEnabled() const; diff --git a/cloud/filestore/libs/storage/core/public.h b/cloud/filestore/libs/storage/core/public.h index eaa01b356d0..adbdbf63c80 100644 --- a/cloud/filestore/libs/storage/core/public.h +++ b/cloud/filestore/libs/storage/core/public.h @@ -16,6 +16,6 @@ class TStorageConfig; using TStorageConfigPtr = std::shared_ptr; struct TRequestInfo; -using TRequestInfoPtr = std::shared_ptr; +using TRequestInfoPtr = TIntrusivePtr; } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/core/request_info.h b/cloud/filestore/libs/storage/core/request_info.h index 848a4ee52e0..6df414a608c 100644 --- a/cloud/filestore/libs/storage/core/request_info.h +++ b/cloud/filestore/libs/storage/core/request_info.h @@ -18,6 +18,8 @@ namespace NCloud::NFileStore::NStorage { //////////////////////////////////////////////////////////////////////////////// struct TRequestInfo + : public TAtomicRefCount + , public TIntrusiveListItem { using TCancelRoutine = void( const NActors::TActorContext& ctx, @@ -46,14 +48,6 @@ struct TRequestInfo , CallContext(std::move(callContext)) {} - TRequestInfo(TRequestInfo&& other) = default; - - void CancelRequest(const NActors::TActorContext& ctx) - { - Y_ABORT_UNLESS(CancelRoutine); - CancelRoutine(ctx, *this); - } - void AddExecCycles(ui64 cycles) { AtomicAdd(ExecCycles, cycles); @@ -105,7 +99,7 @@ inline TRequestInfoPtr CreateRequestInfo( ui64 cookie, TCallContextPtr callContext) { - return std::make_shared( + return MakeIntrusive( sender, cookie, std::move(callContext)); @@ -117,7 +111,7 @@ TRequestInfoPtr CreateRequestInfo( ui64 cookie, TCallContextPtr callContext) { - auto requestInfo = std::make_shared( + auto requestInfo = MakeIntrusive( sender, cookie, std::move(callContext)); diff --git a/cloud/filestore/libs/storage/service/service_actor.cpp b/cloud/filestore/libs/storage/service/service_actor.cpp index 1aecb18dc16..b8a0bc4c2f2 100644 --- a/cloud/filestore/libs/storage/service/service_actor.cpp +++ b/cloud/filestore/libs/storage/service/service_actor.cpp @@ -75,7 +75,7 @@ void TStorageServiceActor::ScheduleUpdateStats(const NActors::TActorContext& ctx } std::pair TStorageServiceActor::CreateInFlightRequest( - TRequestInfo&& info, + const TRequestInfo& info, NProto::EStorageMediaKind media, IRequestStatsPtr requestStats, TInstant start) @@ -85,7 +85,7 @@ std::pair TStorageServiceActor::CreateInFlightRequest( std::piecewise_construct, std::forward_as_tuple(cookie), std::forward_as_tuple( - std::move(info), + info, ProfileLog, media, requestStats)); diff --git a/cloud/filestore/libs/storage/service/service_actor.h b/cloud/filestore/libs/storage/service/service_actor.h index 3bb9b450363..8e055d39df5 100644 --- a/cloud/filestore/libs/storage/service/service_actor.h +++ b/cloud/filestore/libs/storage/service/service_actor.h @@ -116,7 +116,7 @@ class TStorageServiceActor final const NActors::TActorContext& ctx); std::pair CreateInFlightRequest( - TRequestInfo&& info, + const TRequestInfo& info, NProto::EStorageMediaKind media, IRequestStatsPtr requestStats, TInstant currentTs); diff --git a/cloud/filestore/libs/storage/service/service_actor_createsession.cpp b/cloud/filestore/libs/storage/service/service_actor_createsession.cpp index 1812abf9856..eaccb115a2d 100644 --- a/cloud/filestore/libs/storage/service/service_actor_createsession.cpp +++ b/cloud/filestore/libs/storage/service/service_actor_createsession.cpp @@ -2,6 +2,7 @@ #include "helpers.h" +#include #include #include #include @@ -67,7 +68,6 @@ class TCreateSessionActor final google::protobuf::RepeatedPtrField StoredEvents; TActorId EventListener; - bool Shutdown = false; bool FirstWakeupScheduled = false; TActorId Owner; @@ -91,6 +91,7 @@ class TCreateSessionActor final private: STFUNC(StateResolve); STFUNC(StateWork); + STFUNC(StateShutdown); void DescribeFileStore(const TActorContext& ctx); void HandleDescribeFileStoreResponse( @@ -116,6 +117,10 @@ class TCreateSessionActor final const TEvServicePrivate::TEvCreateSession::TPtr& ev, const TActorContext& ctx); + void RejectCreateSession( + const TEvServicePrivate::TEvCreateSession::TPtr& ev, + const TActorContext& ctx); + void HandlePingSession( const TEvServicePrivate::TEvPingSession::TPtr& ev, const TActorContext& ctx); @@ -134,17 +139,16 @@ class TCreateSessionActor final const TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); + void OnDisconnect(const TActorContext& ctx); + void HandlePoisonPill( const TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); - void ReplyAndDie( - const TActorContext& ctx, - const NProto::TError& error); - void Notify( const TActorContext& ctx, - const NProto::TError& error); + const NProto::TError& error, + bool shutdown); void CancelPendingRequests( const TActorContext& ctx, @@ -213,7 +217,12 @@ void TCreateSessionActor::HandleDescribeFileStoreResponse( const auto* msg = ev->Get(); if (FAILED(msg->GetStatus())) { - return ReplyAndDie(ctx, msg->GetError()); + ReportDescribeFileStoreError(); + + Notify(ctx, msg->GetError(), false); + Die(ctx); + + return; } const auto& pathDescr = msg->PathDescription; @@ -256,14 +265,7 @@ void TCreateSessionActor::HandleConnect( TabletId, NKikimrProto::EReplyStatus_Name(msg->Status).data()); - NTabletPipe::CloseClient(ctx, PipeClient); - PipeClient = {}; - - if (!FirstWakeupScheduled) { - // Wakeup cycle is inactive => reconnect won't be initiated - // if we don't initiate it here - CreatePipe(ctx); - } + OnDisconnect(ctx); return; } @@ -277,10 +279,6 @@ void TCreateSessionActor::HandleConnect( LastPing = ctx.Now(); CreateSession(ctx); - if (!FirstWakeupScheduled) { - ScheduleWakeup(ctx); - FirstWakeupScheduled = true; - } } void TCreateSessionActor::HandleDisconnect( @@ -292,43 +290,35 @@ void TCreateSessionActor::HandleDisconnect( LogTag().c_str(), TabletId); + OnDisconnect(ctx); +} + +void TCreateSessionActor::OnDisconnect(const TActorContext& ctx) +{ NTabletPipe::CloseClient(ctx, PipeClient); PipeClient = {}; + + if (!FirstWakeupScheduled) { + // Wakeup cycle is inactive => reconnect won't be initiated + // if we don't initiate it here + CreatePipe(ctx); + } } void TCreateSessionActor::HandleCreateSession( const TEvServicePrivate::TEvCreateSession::TPtr& ev, const TActorContext& ctx) { + auto* msg = ev->Get(); + LOG_INFO(ctx, TFileStoreComponents::SERVICE_WORKER, "%s got create session: seqno %lu ro %u", LogTag().c_str(), - ev->Get()->SessionSeqNo, - ev->Get()->ReadOnly); + msg->SessionSeqNo, + msg->ReadOnly); - auto* msg = ev->Get(); LastPing = ctx.Now(); - if (Shutdown) { - auto error = MakeError(E_REJECTED, "TCreateSessionActor: shutting down"); - - LOG_INFO(ctx, TFileStoreComponents::SERVICE_WORKER, - "%s reject create session - TCreateSessionActor is shutting down %lu (%s)", - LogTag().c_str(), - msg->SessionSeqNo, - FormatError(error).c_str()); - - auto response = std::make_unique(error); - response->ClientId = msg->ClientId; - response->SessionId = msg->SessionId; - response->SessionSeqNo = msg->SessionSeqNo; - response->ReadOnly = msg->ReadOnly; - response->RequestInfo = std::move(msg->RequestInfo); - - NCloud::Send(ctx, MakeStorageServiceId(), std::move(response)); - return; - } - ClientId = msg->ClientId; FileSystemId = msg->FileSystemId; SessionId = msg->SessionId; @@ -341,6 +331,30 @@ void TCreateSessionActor::HandleCreateSession( CreateSession(ctx); } +void TCreateSessionActor::RejectCreateSession( + const TEvServicePrivate::TEvCreateSession::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + auto error = MakeError(E_REJECTED, "TCreateSessionActor: shutting down"); + + LOG_INFO(ctx, TFileStoreComponents::SERVICE_WORKER, + "%s reject create session: seqno %lu error %s", + LogTag().c_str(), + msg->SessionSeqNo, + FormatError(error).c_str()); + + auto response = std::make_unique(error); + response->ClientId = msg->ClientId; + response->SessionId = msg->SessionId; + response->SessionSeqNo = msg->SessionSeqNo; + response->ReadOnly = msg->ReadOnly; + response->RequestInfo = std::move(msg->RequestInfo); + + NCloud::Send(ctx, MakeStorageServiceId(), std::move(response)); +} + void TCreateSessionActor::CreateSession(const TActorContext& ctx) { auto request = std::make_unique(); @@ -372,11 +386,21 @@ void TCreateSessionActor::HandleCreateSessionResponse( const auto& sessionId = msg->Record.GetSessionId(); if (FAILED(msg->GetStatus())) { - return Notify(ctx, msg->GetError()); - } else if (!sessionId) { - auto error = MakeError(E_FAIL, "empty session id"); - return Notify(ctx, std::move(error)); - } else if (sessionId != SessionId) { + ReportCreateSessionError(); + + return Notify(ctx, msg->GetError(), false); + } + + if (!sessionId) { + ReportMissingSessionId(); + + return Notify( + ctx, + MakeError(E_FAIL, "empty session id"), + false); + } + + if (sessionId != SessionId) { LOG_INFO(ctx, TFileStoreComponents::SERVICE_WORKER, "%s restored session id: actual id %s, state(%lu)", LogTag().c_str(), @@ -395,7 +419,12 @@ void TCreateSessionActor::HandleCreateSessionResponse( SessionState = msg->Record.GetSessionState(); FileStore = msg->Record.GetFileStore(); - Notify(ctx, {}); + Notify(ctx, {}, false); + + if (!FirstWakeupScheduled) { + ScheduleWakeup(ctx); + FirstWakeupScheduled = true; + } } void TCreateSessionActor::HandlePingSession( @@ -483,8 +512,8 @@ void TCreateSessionActor::HandleWakeup( "%s closing idle session, last ping at %s", LogTag().c_str(), LastPing.ToStringUpToSeconds().c_str()); - Shutdown = true; - return Notify(ctx, MakeError(E_TIMEOUT, "closed idle session")); + Become(&TThis::StateShutdown); + return Notify(ctx, MakeError(E_TIMEOUT, "closed idle session"), true); } auto connectTimeout = Config->GetEstablishSessionTimeout(); @@ -493,8 +522,8 @@ void TCreateSessionActor::HandleWakeup( "%s create timeouted, couldn't connect from %s", LogTag().c_str(), LastPipeResetTime.ToStringUpToSeconds().c_str()); - Shutdown = true; - return Notify(ctx, MakeError(E_TIMEOUT, "failed to connect to fs")); + Become(&TThis::StateShutdown); + return Notify(ctx, MakeError(E_TIMEOUT, "failed to connect to fs"), true); } if (!PipeClient) { @@ -517,24 +546,17 @@ void TCreateSessionActor::HandlePoisonPill( ToString(SelfId()).c_str()); if (ev->Sender != Owner) { - Shutdown = true; - Notify(ctx, MakeError(E_REJECTED, "request cancelled")); + Become(&TThis::StateShutdown); + Notify(ctx, MakeError(E_REJECTED, "request cancelled"), true); } else { Die(ctx); } } -void TCreateSessionActor::ReplyAndDie( - const TActorContext& ctx, - const NProto::TError& error) -{ - Notify(ctx, error); - Die(ctx); -} - void TCreateSessionActor::Notify( const TActorContext& ctx, - const NProto::TError& error) + const NProto::TError& error, + bool shutdown) { Y_ABORT_UNLESS(SessionId); LOG_INFO(ctx, TFileStoreComponents::SERVICE_WORKER, @@ -552,9 +574,7 @@ void TCreateSessionActor::Notify( response->TabletId = TabletId; response->FileStore = FileStore; response->RequestInfo = std::move(RequestInfo); - if (Shutdown) { - response->Shutdown = true; - } + response->Shutdown = shutdown; NCloud::Send(ctx, MakeStorageServiceId(), std::move(response)); } @@ -603,6 +623,26 @@ STFUNC(TCreateSessionActor::StateWork) } } +STFUNC(TCreateSessionActor::StateShutdown) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvServicePrivate::TEvCreateSession, RejectCreateSession); + + IgnoreFunc(TEvents::TEvWakeup); + IgnoreFunc(TEvTabletPipe::TEvClientConnected); + IgnoreFunc(TEvTabletPipe::TEvClientDestroyed); + IgnoreFunc(TEvIndexTablet::TEvCreateSessionResponse); + IgnoreFunc(TEvServicePrivate::TEvPingSession); + IgnoreFunc(TEvService::TEvGetSessionEventsRequest); + IgnoreFunc(TEvService::TEvGetSessionEventsResponse); + + default: + HandleUnexpectedEvent(ev, TFileStoreComponents::SERVICE_WORKER); + break; + } +} + } // namespace //////////////////////////////////////////////////////////////////////////////// @@ -634,7 +674,9 @@ void TStorageServiceActor::HandleCreateSession( auto reply = [&] (auto* session, auto error) { auto response = std::make_unique(error); - session->GetInfo(*response->Record.MutableSession(), GetSessionSeqNo(msg->Record)); + session->GetInfo( + *response->Record.MutableSession(), + GetSessionSeqNo(msg->Record)); inflight->Complete(ctx.Now(), std::move(error)); NCloud::Reply(ctx, *ev, std::move(response)); }; @@ -663,7 +705,10 @@ void TStorageServiceActor::HandleCreateSession( NCloud::Send(ctx, std::move(actorId), std::move(request)); }; - if (auto* session = State->FindSession(sessionId, GetSessionSeqNo(msg->Record))) { + auto* session = + State->FindSession(sessionId, GetSessionSeqNo(msg->Record)); + + if (session) { if (session->ClientId != clientId) { auto error = MakeError( E_FS_INVALID_SESSION, @@ -674,7 +719,9 @@ void TStorageServiceActor::HandleCreateSession( return; } - if (session->CreateDestroyState != ESessionCreateDestroyState::STATE_NONE) { + if (session->CreateDestroyState + != ESessionCreateDestroyState::STATE_NONE) + { auto error = MakeError( E_REJECTED, "Another create or destroy request is in progress"); @@ -682,7 +729,8 @@ void TStorageServiceActor::HandleCreateSession( return; } - session->CreateDestroyState = ESessionCreateDestroyState::STATE_CREATE_SESSION; + session->CreateDestroyState = + ESessionCreateDestroyState::STATE_CREATE_SESSION; if (session->SessionActor) { return proceed(session->SessionActor); @@ -847,6 +895,10 @@ void TStorageServiceActor::HandleSessionCreated( msg->SessionId, msg->SessionSeqNo).c_str(), FormatError(msg->GetError()).c_str()); + + if (msg->Shutdown) { + ctx.Send(ev->Sender, new TEvents::TEvPoisonPill()); + } } if (session) { @@ -867,9 +919,12 @@ void TStorageServiceActor::HandleSessionCreated( return; } - auto response = std::make_unique(msg->GetError()); + auto response = std::make_unique( + msg->GetError()); if (session) { - session->GetInfo(*response->Record.MutableSession(), msg->SessionSeqNo); + session->GetInfo( + *response->Record.MutableSession(), + msg->SessionSeqNo); response->Record.MutableFileStore()->CopyFrom(msg->FileStore); } diff --git a/cloud/filestore/libs/storage/service/service_actor_readdata.cpp b/cloud/filestore/libs/storage/service/service_actor_readdata.cpp index 24e944e184f..ac3026aada4 100644 --- a/cloud/filestore/libs/storage/service/service_actor_readdata.cpp +++ b/cloud/filestore/libs/storage/service/service_actor_readdata.cpp @@ -40,12 +40,21 @@ class TReadDataActor final: public TActorBootstrapped ui32 RemainingBlobsToRead = 0; bool ReadDataFallbackEnabled = false; + // Stats for reporting + IRequestStatsPtr RequestStats; + IProfileLogPtr ProfileLog; + TMaybe InFlightRequest; + const NCloud::NProto::EStorageMediaKind MediaKind; + public: TReadDataActor( TRequestInfoPtr requestInfo, NProto::TReadDataRequest readRequest, TString logTag, - ui32 blockSize); + ui32 blockSize, + IRequestStatsPtr requestStats, + IProfileLogPtr profileLog, + NCloud::NProto::EStorageMediaKind mediaKind); void Bootstrap(const TActorContext& ctx); @@ -84,7 +93,10 @@ TReadDataActor::TReadDataActor( TRequestInfoPtr requestInfo, NProto::TReadDataRequest readRequest, TString logTag, - ui32 blockSize) + ui32 blockSize, + IRequestStatsPtr requestStats, + IProfileLogPtr profileLog, + NCloud::NProto::EStorageMediaKind mediaKind) : RequestInfo(std::move(requestInfo)) , ReadRequest(std::move(readRequest)) , LogTag(std::move(logTag)) @@ -95,6 +107,9 @@ TReadDataActor::TReadDataActor( BlockSize) , AlignedByteRange(OriginByteRange.AlignedSuperRange()) , BlockBuffer(CreateBlockBuffer(AlignedByteRange)) + , RequestStats(std::move(requestStats)) + , ProfileLog(std::move(profileLog)) + , MediaKind(mediaKind) { } @@ -124,6 +139,22 @@ void TReadDataActor::DescribeData(const TActorContext& ctx) request->Record.SetOffset(ReadRequest.GetOffset()); request->Record.SetLength(ReadRequest.GetLength()); + // RequestType is set in order to properly record the request type + RequestInfo->CallContext->RequestType = EFileStoreRequest::DescribeData; + InFlightRequest.ConstructInPlace( + TRequestInfo( + RequestInfo->Sender, + RequestInfo->Cookie, + RequestInfo->CallContext), + ProfileLog, + MediaKind, + RequestStats); + + InFlightRequest->Start(ctx.Now()); + InitProfileLogRequestInfo( + InFlightRequest->ProfileLogRequest, + request->Record); + // forward request through tablet proxy ctx.Send(MakeIndexTabletProxyServiceId(), request.release()); } @@ -223,6 +254,16 @@ void TReadDataActor::HandleDescribeDataResponse( { const auto* msg = ev->Get(); + Y_ABORT_UNLESS(InFlightRequest); + + InFlightRequest->Complete(ctx.Now(), msg->GetError()); + FinalizeProfileLogRequestInfo( + InFlightRequest->ProfileLogRequest, + msg->Record); + // After the DescribeData response is received, we continue to consider the + // request as a ReadData request + RequestInfo->CallContext->RequestType = EFileStoreRequest::ReadData; + if (FAILED(msg->GetStatus())) { ReadData(ctx, FormatError(msg->GetError())); return; @@ -584,7 +625,10 @@ void TStorageServiceActor::HandleReadData( std::move(requestInfo), std::move(msg->Record), filestore.GetFileSystemId(), - filestore.GetBlockSize()); + filestore.GetBlockSize(), + session->RequestStats, + ProfileLog, + session->MediaKind); NCloud::Register(ctx, std::move(actor)); } diff --git a/cloud/filestore/libs/storage/service/service_state.h b/cloud/filestore/libs/storage/service/service_state.h index 1fe07531b34..f79978f70bd 100644 --- a/cloud/filestore/libs/storage/service/service_state.h +++ b/cloud/filestore/libs/storage/service/service_state.h @@ -41,11 +41,11 @@ struct TInFlightRequest public: TInFlightRequest( - TRequestInfo&& info, + const TRequestInfo& info, IProfileLogPtr profileLog, NCloud::NProto::EStorageMediaKind mediaKind, IRequestStatsPtr requestStats) - : TRequestInfo(std::move(info)) + : TRequestInfo(info.Sender, info.Cookie, info.CallContext) , MediaKind(mediaKind) , RequestStats(std::move(requestStats)) , ProfileLog(std::move(profileLog)) diff --git a/cloud/filestore/libs/storage/service/service_ut.cpp b/cloud/filestore/libs/storage/service/service_ut.cpp index 2516f8ef244..1ff093d2144 100644 --- a/cloud/filestore/libs/storage/service/service_ut.cpp +++ b/cloud/filestore/libs/storage/service/service_ut.cpp @@ -1528,6 +1528,176 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest) service.DestroySession(headers); } + Y_UNIT_TEST(ShouldProperlyProcessSlowSessionCreation) + { + NProto::TStorageConfig config; + config.SetIdleSessionTimeout(5'000); // 5s + TTestEnv env({}, config); + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + + auto& runtime = env.GetRuntime(); + + // enabling scheduling for all actors + runtime.SetRegistrationObserverFunc( + [] (auto& runtime, const auto& parentId, const auto& actorId) { + Y_UNUSED(parentId); + runtime.EnableScheduleForActor(actorId); + }); + + TServiceClient service(env.GetRuntime(), nodeIdx); + service.CreateFileStore("test", 1000); + + THeaders headers = {"test", "client", "", 0}; + + // delaying session creation response + bool rescheduled = false; + ui32 createSessionResponses = 0; + TActorId createSessionActor; + + runtime.SetEventFilter( + [&] (TTestActorRuntimeBase&, TAutoPtr& event) { + switch (event->GetTypeRewrite()) { + case TEvSSProxy::EvDescribeFileStoreResponse: { + createSessionActor = event->Recipient; + + break; + } + case TEvIndexTablet::EvCreateSessionResponse: { + ++createSessionResponses; + + if (!rescheduled) { + runtime.Schedule(event, TDuration::Seconds(10), nodeIdx); + rescheduled = true; + return true; + } + + break; + } + } + + return false; + }); + + // creating session + service.SendCreateSessionRequest(headers); + auto response = service.RecvCreateSessionResponse(); + headers.SessionId = response->Record.GetSession().GetSessionId(); + // immediately pinging session to signal that it's not idle + service.PingSession(headers); + + // just checking that we observed the events that we are expecting + UNIT_ASSERT(rescheduled); + UNIT_ASSERT_VALUES_EQUAL(1, createSessionResponses); + + // can't call RebootTablet here because it resets our registration + // observer and thus disables wakeup event scheduling + auto msg = std::make_unique( + static_cast(0), + TActorId(), + TActorId()); + + runtime.Send( + new IEventHandle( + createSessionActor, + runtime.AllocateEdgeActor(nodeIdx), + msg.release(), + 0, // flags + 0), + nodeIdx); + + runtime.AdvanceCurrentTime(TDuration::Seconds(1)); + runtime.DispatchEvents({}, TDuration::MilliSeconds(100)); + + // checking that session was recreated + UNIT_ASSERT_VALUES_EQUAL(2, createSessionResponses); + + service.DestroySession(headers); + } + + Y_UNIT_TEST(UnsuccessfulSessionActorShouldStopWorking) + { + NProto::TStorageConfig config; + config.SetIdleSessionTimeout(5'000); // 5s + TTestEnv env({}, config); + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + + auto& runtime = env.GetRuntime(); + + // enabling scheduling for all actors + runtime.SetRegistrationObserverFunc( + [] (auto& runtime, const auto& parentId, const auto& actorId) { + Y_UNUSED(parentId); + runtime.EnableScheduleForActor(actorId); + }); + + TServiceClient service(env.GetRuntime(), nodeIdx); + service.CreateFileStore("test", 1000); + + THeaders headers = {"test", "client", "", 0}; + + ui32 sessionCreated = 0; + bool rescheduled = false; + + runtime.SetEventFilter( + [&] (TTestActorRuntimeBase&, TAutoPtr& event) { + switch (event->GetTypeRewrite()) { + case TEvIndexTablet::EvCreateSessionResponse: { + if (!rescheduled) { + auto* msg = event->Get(); + *msg->Record.MutableError() = MakeError(E_TIMEOUT, "timeout"); + + runtime.Schedule(event, TDuration::Seconds(10), nodeIdx); + rescheduled = true; + return true; + } + + break; + } + + case TEvServicePrivate::EvSessionCreated: { + ++sessionCreated; + + break; + } + } + + return false; + }); + + // creating session + service.SendCreateSessionRequest(headers); + runtime.DispatchEvents({}, TDuration::MilliSeconds(100)); + UNIT_ASSERT(rescheduled); + runtime.AdvanceCurrentTime(TDuration::Seconds(5)); + auto response = service.RecvCreateSessionResponse(); + UNIT_ASSERT_VALUES_EQUAL_C( + E_TIMEOUT, + response->GetStatus(), + response->GetErrorReason()); + + runtime.AdvanceCurrentTime(TDuration::Seconds(5)); + runtime.DispatchEvents({}, TDuration::MilliSeconds(100)); + + // we should have observed exactly 1 CreateSessionResponse + // if we observe more than 1 it means that our CreateSessionActor + // remained active after the first failure + UNIT_ASSERT_VALUES_EQUAL(1, sessionCreated); + + // this time session creation should be successful + service.SendCreateSessionRequest(headers); + response = service.RecvCreateSessionResponse(); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + response->GetStatus(), + response->GetErrorReason()); + + UNIT_ASSERT_VALUES_EQUAL(2, sessionCreated); + } + Y_UNIT_TEST(ShouldFillOriginFqdnWhenCreatingSession) { TTestEnv env; @@ -1628,11 +1798,21 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest) ->FindSubgroup("component", "service_fs") ->FindSubgroup("host", "cluster") ->FindSubgroup("filesystem", fs) - ->FindSubgroup("client", "client") - ->FindSubgroup("request", "ReadData"); - UNIT_ASSERT(counters); - - UNIT_ASSERT_EQUAL(4, counters->GetCounter("Count")->GetAtomic()); + ->FindSubgroup("client", "client"); + { + auto subgroup = counters->FindSubgroup("request", "DescribeData"); + UNIT_ASSERT(subgroup); + UNIT_ASSERT_VALUES_EQUAL( + 4, + subgroup->GetCounter("Count")->GetAtomic()); + } + { + auto subgroup = counters->FindSubgroup("request", "ReadData"); + UNIT_ASSERT(subgroup); + UNIT_ASSERT_VALUES_EQUAL( + 4, + subgroup->GetCounter("Count")->GetAtomic()); + } } Y_UNIT_TEST(ShouldFallbackToReadDataIfDescribeDataFails) diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor.cpp index b1502cc8d6c..ae021f68610 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor.cpp @@ -59,7 +59,9 @@ TIndexTabletActor::TIndexTabletActor( } TIndexTabletActor::~TIndexTabletActor() -{} +{ + ReleaseTransactions(); +} TString TIndexTabletActor::GetStateName(ui32 state) { @@ -223,10 +225,18 @@ void TIndexTabletActor::OnTabletDead( { Y_UNUSED(ev); + TerminateTransactions(ctx); + for (const auto& actor: WorkerActors) { ctx.Send(actor, new TEvents::TEvPoisonPill()); } + auto writeBatch = DequeueWriteBatch(); + for (const auto& request: writeBatch) { + TRequestInfo& requestInfo = *request.RequestInfo; + requestInfo.CancelRoutine(ctx, requestInfo); + } + WorkerActors.clear(); UnregisterFileStore(ctx); @@ -235,6 +245,49 @@ void TIndexTabletActor::OnTabletDead( //////////////////////////////////////////////////////////////////////////////// +void TIndexTabletActor::AddTransaction( + TRequestInfo& transaction, + TRequestInfo::TCancelRoutine cancelRoutine) +{ + transaction.CancelRoutine = cancelRoutine; + + transaction.Ref(); + + TABLET_VERIFY(transaction.Empty()); + ActiveTransactions.PushBack(&transaction); +} + +void TIndexTabletActor::RemoveTransaction(TRequestInfo& requestInfo) +{ + TABLET_VERIFY(!requestInfo.Empty()); + requestInfo.Unlink(); + + TABLET_VERIFY(requestInfo.RefCount() > 1); + requestInfo.UnRef(); +} + +void TIndexTabletActor::TerminateTransactions(const TActorContext& ctx) +{ + while (ActiveTransactions) { + TRequestInfo* requestInfo = ActiveTransactions.PopFront(); + TABLET_VERIFY(requestInfo->RefCount() >= 1); + + requestInfo->CancelRoutine(ctx, *requestInfo); + requestInfo->UnRef(); + } +} + +void TIndexTabletActor::ReleaseTransactions() +{ + while (ActiveTransactions) { + TRequestInfo* requestInfo = ActiveTransactions.PopFront(); + TABLET_VERIFY(requestInfo->RefCount() >= 1); + requestInfo->UnRef(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + using TThresholds = TIndexTabletState::TBackpressureThresholds; TThresholds TIndexTabletActor::BuildBackpressureThresholds() const { diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor.h b/cloud/filestore/libs/storage/tablet/tablet_actor.h index 54009c38891..2a6b5409c1d 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor.h +++ b/cloud/filestore/libs/storage/tablet/tablet_actor.h @@ -167,6 +167,7 @@ class TIndexTabletActor final TDeque WaitReadyRequests; TSet WorkerActors; + TIntrusiveList ActiveTransactions; TInstant ReassignRequestSentTs; @@ -237,9 +238,29 @@ class TIndexTabletActor final void ScheduleCleanupSessions(const NActors::TActorContext& ctx); void RestartCheckpointDestruction(const NActors::TActorContext& ctx); + template void EnqueueWriteBatch( const NActors::TActorContext& ctx, - std::unique_ptr request); + std::unique_ptr request) + { + request->RequestInfo->CancelRoutine = [] ( + const NActors::TActorContext& ctx, + TRequestInfo& requestInfo) + { + auto response = std::make_unique( + MakeError(E_REJECTED, "tablet is dead")); + + NCloud::Reply(ctx, requestInfo, std::move(response)); + }; + + if (TIndexTabletState::EnqueueWriteBatch(std::move(request))) { + if (auto timeout = Config->GetWriteBatchTimeout()) { + ctx.Schedule(timeout, new TEvIndexTabletPrivate::TEvWriteBatchRequest()); + } else { + ctx.Send(SelfId(), new TEvIndexTabletPrivate::TEvWriteBatchRequest()); + } + } + } void EnqueueFlushIfNeeded(const NActors::TActorContext& ctx); void EnqueueBlobIndexOpIfNeeded(const NActors::TActorContext& ctx); @@ -248,6 +269,30 @@ class TIndexTabletActor final void EnqueueForcedRangeOperationIfNeeded(const NActors::TActorContext& ctx); void LoadNextCompactionMapChunkIfNeeded(const NActors::TActorContext& ctx); + void AddTransaction( + TRequestInfo& transaction, + TRequestInfo::TCancelRoutine cancelRoutine); + + template + void AddTransaction(TRequestInfo& transaction) + { + auto cancelRoutine = [] ( + const NActors::TActorContext& ctx, + TRequestInfo& requestInfo) + { + auto response = std::make_unique( + MakeError(E_REJECTED, "request cancelled")); + + NCloud::Reply(ctx, requestInfo, std::move(response)); + }; + + AddTransaction(transaction, cancelRoutine); + } + + void RemoveTransaction(TRequestInfo& transaction); + void TerminateTransactions(const NActors::TActorContext& ctx); + void ReleaseTransactions(); + void NotifySessionEvent( const NActors::TActorContext& ctx, const NProto::TSessionEvent& event); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_accessnode.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_accessnode.cpp index 17b9cd71a67..69a779ca49a 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_accessnode.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_accessnode.cpp @@ -38,6 +38,8 @@ void TIndexTabletActor::HandleAccessNode( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -104,6 +106,8 @@ void TIndexTabletActor::CompleteTx_AccessNode( const TActorContext& ctx, TTxIndexTablet::TAccessNode& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); CompleteResponse( response->Record, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_acquirelock.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_acquirelock.cpp index 4669a0c15a7..29ea10d60d1 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_acquirelock.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_acquirelock.cpp @@ -51,6 +51,8 @@ void TIndexTabletActor::HandleAcquireLock( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -102,6 +104,8 @@ void TIndexTabletActor::CompleteTx_AcquireLock( const TActorContext& ctx, TTxIndexTablet::TAcquireLock& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); CompleteResponse( response->Record, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_allocatedata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_allocatedata.cpp index 9db99da5941..7c03737b749 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_allocatedata.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_allocatedata.cpp @@ -74,6 +74,8 @@ void TIndexTabletActor::HandleAllocateData( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -223,6 +225,8 @@ void TIndexTabletActor::CompleteTx_AllocateData( const TActorContext& ctx, TTxIndexTablet::TAllocateData& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); } diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_change_storage_config.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_change_storage_config.cpp index b0e46eb0698..823c08ca381 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_change_storage_config.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_change_storage_config.cpp @@ -43,6 +43,8 @@ void TIndexTabletActor::CompleteTx_ChangeStorageConfig( const TActorContext& ctx, TTxIndexTablet::TChangeStorageConfig& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(); *response->Record.MutableStorageConfig() = @@ -62,6 +64,8 @@ void TIndexTabletActor::HandleChangeStorageConfig( ev->Cookie, MakeIntrusive()); + AddTransaction(*requestInfo); + const auto* msg = ev->Get(); ExecuteTx(ctx, CreateTx( requestInfo, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_createcheckpoint.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_createcheckpoint.cpp index ce29b81ce41..e0a58137882 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_createcheckpoint.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_createcheckpoint.cpp @@ -29,6 +29,8 @@ void TIndexTabletActor::HandleCreateCheckpoint( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -76,6 +78,8 @@ void TIndexTabletActor::CompleteTx_CreateCheckpoint( const TActorContext& ctx, TTxIndexTablet::TCreateCheckpoint& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); CompleteResponse( response->Record, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_createhandle.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_createhandle.cpp index d2643f77fe9..38c2450f51e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_createhandle.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_createhandle.cpp @@ -60,6 +60,8 @@ void TIndexTabletActor::HandleCreateHandle( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -259,6 +261,8 @@ void TIndexTabletActor::CompleteTx_CreateHandle( const TActorContext& ctx, TTxIndexTablet::TCreateHandle& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (SUCCEEDED(args.Error.GetCode())) { CommitDupCacheEntry(args.SessionId, args.RequestId); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_createnode.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_createnode.cpp index e7dd351cbc0..274c9ccf27e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_createnode.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_createnode.cpp @@ -87,6 +87,8 @@ void TIndexTabletActor::HandleCreateNode( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -252,6 +254,8 @@ void TIndexTabletActor::CompleteTx_CreateNode( const TActorContext& ctx, TTxIndexTablet::TCreateNode& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (SUCCEEDED(args.Error.GetCode())) { CommitDupCacheEntry(args.SessionId, args.RequestId); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp index 777bf8b5e20..1196ce6a58f 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp @@ -15,6 +15,10 @@ void FillFeatures(const TStorageConfig& config, NProto::TFileStore& fileStore) { auto* features = fileStore.MutableFeatures(); features->SetTwoStageReadEnabled(config.GetTwoStageReadEnabled()); + features->SetEntryTimeout(config.GetEntryTimeout().MilliSeconds()); + features->SetNegativeEntryTimeout( + config.GetNegativeEntryTimeout().MilliSeconds()); + features->SetAttrTimeout(config.GetAttrTimeout().MilliSeconds()); } //////////////////////////////////////////////////////////////////////////////// @@ -101,6 +105,8 @@ void TIndexTabletActor::HandleCreateSession( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -237,6 +243,8 @@ void TIndexTabletActor::CompleteTx_CreateSession( const TActorContext& ctx, TTxIndexTablet::TCreateSession& args) { + RemoveTransaction(*args.RequestInfo); + LOG_INFO(ctx, TFileStoreComponents::TABLET, "%s CreateSession completed (%s)", LogTag.c_str(), diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp index a91ad152e90..bbaeebb34f1 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp @@ -31,6 +31,8 @@ void TIndexTabletActor::HandleDeleteCheckpoint( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -235,6 +237,8 @@ void TIndexTabletActor::CompleteTx_DeleteCheckpoint( const TActorContext& ctx, TTxIndexTablet::TDeleteCheckpoint& args) { + RemoveTransaction(*args.RequestInfo); + LOG_DEBUG(ctx, TFileStoreComponents::TABLET, "%s DeleteCheckpoint completed (%s)", LogTag.c_str(), diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_destroyhandle.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_destroyhandle.cpp index 4f758d58715..c8ad9ff7057 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_destroyhandle.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_destroyhandle.cpp @@ -33,6 +33,8 @@ void TIndexTabletActor::HandleDestroyHandle( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -106,6 +108,8 @@ void TIndexTabletActor::CompleteTx_DestroyHandle( const TActorContext& ctx, TTxIndexTablet::TDestroyHandle& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); CompleteResponse( response->Record, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_destroysession.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_destroysession.cpp index 1a1683898db..6303fe031b4 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_destroysession.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_destroysession.cpp @@ -47,6 +47,8 @@ void TIndexTabletActor::HandleDestroySession( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -151,6 +153,8 @@ void TIndexTabletActor::CompleteTx_DestroySession( const TActorContext& ctx, TTxIndexTablet::TDestroySession& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(); NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); } diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_dumprange.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_dumprange.cpp index f8b9cee80de..15561c16b8d 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_dumprange.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_dumprange.cpp @@ -25,10 +25,11 @@ void TIndexTabletActor::HandleDumpCompactionRange( msg->CallContext, "DumpCompactionRange"); - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); + auto requestInfo = + CreateRequestInfo( + ev->Sender, + ev->Cookie, + msg->CallContext); ExecuteTx( ctx, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_filteralivenodes.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_filteralivenodes.cpp index ca965d8efd2..ba9ba1b7c82 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_filteralivenodes.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_filteralivenodes.cpp @@ -37,10 +37,11 @@ void TIndexTabletActor::HandleFilterAliveNodes( LogTag.c_str(), DescribeNodes(msg->Nodes).c_str()); - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); + auto requestInfo = + CreateRequestInfo( + ev->Sender, + ev->Cookie, + msg->CallContext); FILESTORE_TRACK( BackgroundRequestReceived_Tablet, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_getnodeattr.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_getnodeattr.cpp index b6b421c982d..80920fd6b24 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_getnodeattr.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_getnodeattr.cpp @@ -46,6 +46,8 @@ void TIndexTabletActor::HandleGetNodeAttr( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -144,6 +146,8 @@ void TIndexTabletActor::CompleteTx_GetNodeAttr( const TActorContext& ctx, TTxIndexTablet::TGetNodeAttr& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (SUCCEEDED(args.Error.GetCode())) { TABLET_VERIFY(args.TargetNode); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_getnodexattr.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_getnodexattr.cpp index 24bb5020b78..e6598c5d2f9 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_getnodexattr.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_getnodexattr.cpp @@ -42,6 +42,8 @@ void TIndexTabletActor::HandleGetNodeXAttr( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -116,6 +118,8 @@ void TIndexTabletActor::CompleteTx_GetNodeXAttr( const TActorContext& ctx, TTxIndexTablet::TGetNodeXAttr& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (SUCCEEDED(args.Error.GetCode())) { TABLET_VERIFY(args.Attr); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_listnodes.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_listnodes.cpp index b5e777b1893..f535f14e3f2 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_listnodes.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_listnodes.cpp @@ -50,6 +50,8 @@ void TIndexTabletActor::HandleListNodes( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + auto maxBytes = Config->GetMaxResponseEntries() * MaxName; if (auto bytes = msg->Record.GetMaxBytes()) { maxBytes = Min(bytes, maxBytes); @@ -162,6 +164,8 @@ void TIndexTabletActor::CompleteTx_ListNodes( const TActorContext& ctx, TTxIndexTablet::TListNodes& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (SUCCEEDED(args.Error.GetCode())) { auto& record = response->Record; diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_listnodexattr.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_listnodexattr.cpp index 2a7ae8f6bc5..9583afce628 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_listnodexattr.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_listnodexattr.cpp @@ -38,6 +38,8 @@ void TIndexTabletActor::HandleListNodeXAttr( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -106,6 +108,8 @@ void TIndexTabletActor::CompleteTx_ListNodeXAttr( const TActorContext& ctx, TTxIndexTablet::TListNodeXAttr& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (SUCCEEDED(args.Error.GetCode())) { response->Record.MutableNames()->Reserve(args.Attrs.size()); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp index 15e962a729e..cc89945d30d 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp @@ -544,6 +544,8 @@ void TIndexTabletActor::HandleReadData( msg->CallContext); requestInfo->StartedTs = ctx.Now(); + AddTransaction(*requestInfo); + TByteRange alignedByteRange = byteRange.AlignedSuperRange(); auto blockBuffer = CreateBlockBuffer(alignedByteRange); @@ -601,6 +603,8 @@ void TIndexTabletActor::HandleDescribeData( msg->CallContext); requestInfo->StartedTs = ctx.Now(); + AddTransaction(*requestInfo); + TByteRange alignedByteRange = byteRange.AlignedSuperRange(); auto blockBuffer = CreateLazyBlockBuffer(alignedByteRange); @@ -741,6 +745,8 @@ void TIndexTabletActor::CompleteTx_ReadData( const TActorContext& ctx, TTxIndexTablet::TReadData& args) { + RemoveTransaction(*args.RequestInfo); + if (args.DescribeOnly && !HasError(args.Error)) { auto response = std::make_unique(); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_readlink.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_readlink.cpp index 75060a99771..ee18d2e2bbe 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_readlink.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_readlink.cpp @@ -38,6 +38,8 @@ void TIndexTabletActor::HandleReadLink( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -89,6 +91,8 @@ void TIndexTabletActor::CompleteTx_ReadLink( const TActorContext& ctx, TTxIndexTablet::TReadLink& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (SUCCEEDED(args.Error.GetCode())) { response->Record.SetSymLink(args.Node->Attrs.GetSymLink()); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_releaselock.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_releaselock.cpp index 4ad97dfb77f..e816cedabb7 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_releaselock.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_releaselock.cpp @@ -23,6 +23,8 @@ void TIndexTabletActor::HandleReleaseLock( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -89,6 +91,8 @@ void TIndexTabletActor::CompleteTx_ReleaseLock( const TActorContext& ctx, TTxIndexTablet::TReleaseLock& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); CompleteResponse( response->Record, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_removenodexattr.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_removenodexattr.cpp index f8bbab092bc..4b61364cd1b 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_removenodexattr.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_removenodexattr.cpp @@ -42,6 +42,8 @@ void TIndexTabletActor::HandleRemoveNodeXAttr( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -113,6 +115,8 @@ void TIndexTabletActor::CompleteTx_RemoveNodeXAttr( const TActorContext& ctx, TTxIndexTablet::TRemoveNodeXAttr& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); CompleteResponse( response->Record, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_renamenode.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_renamenode.cpp index 5018271b89b..4bbc7a20e5e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_renamenode.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_renamenode.cpp @@ -60,6 +60,8 @@ void TIndexTabletActor::HandleRenameNode( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -284,6 +286,8 @@ void TIndexTabletActor::CompleteTx_RenameNode( const TActorContext& ctx, TTxIndexTablet::TRenameNode& args) { + RemoveTransaction(*args.RequestInfo); + if (SUCCEEDED(args.Error.GetCode())) { TABLET_VERIFY(args.ChildRef); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_resolvepath.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_resolvepath.cpp index be73a568c0c..e8f469810e1 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_resolvepath.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_resolvepath.cpp @@ -43,6 +43,8 @@ void TIndexTabletActor::HandleResolvePath( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -78,6 +80,8 @@ void TIndexTabletActor::CompleteTx_ResolvePath( const TActorContext& ctx, TTxIndexTablet::TResolvePath& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); CompleteResponse( response->Record, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_setnodeattr.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_setnodeattr.cpp index 11da4a19073..9fb5a938895 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_setnodeattr.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_setnodeattr.cpp @@ -52,6 +52,8 @@ void TIndexTabletActor::HandleSetNodeAttr( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -171,6 +173,8 @@ void TIndexTabletActor::CompleteTx_SetNodeAttr( const TActorContext& ctx, TTxIndexTablet::TSetNodeAttr& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (SUCCEEDED(args.Error.GetCode())) { TABLET_VERIFY(args.Node); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_setnodexattr.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_setnodexattr.cpp index ad0f4e7c1e0..8b5d6bcf072 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_setnodexattr.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_setnodexattr.cpp @@ -47,6 +47,8 @@ void TIndexTabletActor::HandleSetNodeXAttr( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -134,6 +136,8 @@ void TIndexTabletActor::CompleteTx_SetNodeXAttr( const TActorContext& ctx, TTxIndexTablet::TSetNodeXAttr& args) { + RemoveTransaction(*args.RequestInfo); + if (SUCCEEDED(args.Error.GetCode())) { NProto::TSessionEvent sessionEvent; { diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_testlock.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_testlock.cpp index ab6a12b252a..9ec33aa85c0 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_testlock.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_testlock.cpp @@ -55,6 +55,8 @@ void TIndexTabletActor::HandleTestLock( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -111,6 +113,8 @@ void TIndexTabletActor::CompleteTx_TestLock( const TActorContext& ctx, TTxIndexTablet::TTestLock& args) { + RemoveTransaction(*args.RequestInfo); + auto response = std::make_unique(args.Error); if (args.Incompatible.has_value()) { SetResponseDetails(response->Record, std::move(*args.Incompatible)); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_unlinknode.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_unlinknode.cpp index 7024a749fd6..08ace989d4e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_unlinknode.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_unlinknode.cpp @@ -49,6 +49,8 @@ void TIndexTabletActor::HandleUnlinkNode( ev->Cookie, msg->CallContext); + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -163,6 +165,8 @@ void TIndexTabletActor::CompleteTx_UnlinkNode( const TActorContext& ctx, TTxIndexTablet::TUnlinkNode& args) { + RemoveTransaction(*args.RequestInfo); + LOG_DEBUG(ctx, TFileStoreComponents::TABLET, "%s[%s] UnlinkNode completed (%s)", LogTag.c_str(), diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp index ec5b81e64a4..d81e726edd9 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp @@ -225,19 +225,6 @@ STFUNC(TWriteBatchActor::StateWork) //////////////////////////////////////////////////////////////////////////////// -void TIndexTabletActor::EnqueueWriteBatch( - const TActorContext& ctx, - std::unique_ptr request) -{ - if (TIndexTabletState::EnqueueWriteBatch(std::move(request))) { - if (auto timeout = Config->GetWriteBatchTimeout()) { - ctx.Schedule(timeout, new TEvIndexTabletPrivate::TEvWriteBatchRequest()); - } else { - ctx.Send(SelfId(), new TEvIndexTabletPrivate::TEvWriteBatchRequest()); - } - } -} - void TIndexTabletActor::HandleWriteBatch( const TEvIndexTabletPrivate::TEvWriteBatchRequest::TPtr& ev, const TActorContext& ctx) @@ -258,6 +245,10 @@ void TIndexTabletActor::HandleWriteBatch( return; } + for (const auto& request: writeBatch) { + AddTransaction(*request.RequestInfo, request.RequestInfo->CancelRoutine); + } + auto batchInfo = GetBatchInfo(writeBatch); LOG_DEBUG(ctx, TFileStoreComponents::TABLET, "%s WriteBatch started (%u bytes, %s)", @@ -499,6 +490,10 @@ void TIndexTabletActor::CompleteTx_WriteBatch( const TActorContext& ctx, TTxIndexTablet::TWriteBatch& args) { + for (const auto& request: args.WriteBatch) { + RemoveTransaction(*request.RequestInfo); + } + auto reply = [] ( const TActorContext& ctx, TTxIndexTablet::TWriteBatch& args) diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_writedata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_writedata.cpp index 60663219c97..dc2bd56dace 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_writedata.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_writedata.cpp @@ -361,10 +361,12 @@ void TIndexTabletActor::HandleWriteData( range, std::move(blockBuffer)); - EnqueueWriteBatch(ctx, std::move(request)); + EnqueueWriteBatch(ctx, std::move(request)); return; } + AddTransaction(*requestInfo); + ExecuteTx( ctx, std::move(requestInfo), @@ -548,6 +550,8 @@ void TIndexTabletActor::CompleteTx_WriteData( const TActorContext& ctx, TTxIndexTablet::TWriteData& args) { + RemoveTransaction(*args.RequestInfo); + auto reply = [&] ( const TActorContext& ctx, TTxIndexTablet::TWriteData& args) diff --git a/cloud/filestore/libs/storage/tablet/tablet_ut_counters.cpp b/cloud/filestore/libs/storage/tablet/tablet_ut_counters.cpp index 55583aeaab5..0667631f847 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_ut_counters.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_ut_counters.cpp @@ -411,7 +411,7 @@ Y_UNIT_TEST_SUITE(TIndexTabletTest_Counters) }); Tablet->RebootTablet(); - Tablet->AdvanceTime(TDuration::Minutes(1)); + Tablet->AdvanceTime(TDuration::Minutes(10)); Tablet->CleanupSessions(); Tablet->AdvanceTime(TDuration::Seconds(15)); diff --git a/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp b/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp index 25404fe23df..bdcf6dc58bf 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp @@ -9,12 +9,16 @@ #include +#include #include +#include #include namespace NCloud::NFileStore::NStorage { +using namespace NKikimr; + //////////////////////////////////////////////////////////////////////////////// Y_UNIT_TEST_SUITE(TIndexTabletTest_Data) @@ -3719,6 +3723,123 @@ Y_UNIT_TEST_SUITE(TIndexTabletTest_Data) response->GetErrorReason()); } + void DoTestWriteRequestCancellationOnTabletReboot( + bool writeBatchEnabled, + const TFileSystemConfig& tabletConfig) + { + NProto::TStorageConfig storageConfig; + storageConfig.SetWriteBatchEnabled(writeBatchEnabled); + + TTestEnv env({}, std::move(storageConfig)); + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto id = CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + ui64 handle = CreateHandle(tablet, id); + + tablet.SendRequest( + tablet.CreateWriteDataRequest(handle, 0, tabletConfig.BlockSize, 'a')); + + bool putSeen = false; + env.GetRuntime().SetEventFilter([&] (auto& runtime, auto& event) { + Y_UNUSED(runtime); + + switch (event->GetTypeRewrite()) { + case TEvBlobStorage::EvPutResult: { + putSeen = true; + return true; + } + } + + return false; + }); + + env.GetRuntime().DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); + + UNIT_ASSERT(putSeen); + + tablet.RebootTablet(); + + auto response = tablet.RecvWriteDataResponse(); + UNIT_ASSERT_VALUES_EQUAL_C( + E_REJECTED, + response->GetStatus(), + response->GetErrorReason()); + } + + TABLET_TEST(ShouldCancelWriteRequestsIfTabletIsRebooted) + { + DoTestWriteRequestCancellationOnTabletReboot(false, tabletConfig); + } + + TABLET_TEST(ShouldCancelBatchedRequestsIfTabletIsRebooted) + { + DoTestWriteRequestCancellationOnTabletReboot(true, tabletConfig); + } + + TABLET_TEST(ShouldCancelReadRequestsIfTabletIsRebooted) + { + TTestEnv env; + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto id = CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + ui64 handle = CreateHandle(tablet, id); + tablet.WriteData(handle, 0, 1_MB, '0'); + tablet.Flush(); + + tablet.SendRequest( + tablet.CreateReadDataRequest(handle, 0, tabletConfig.BlockSize)); + + bool getSeen = false; + bool failGet = true; + env.GetRuntime().SetEventFilter([&] (auto& runtime, auto& event) { + Y_UNUSED(runtime); + + switch (event->GetTypeRewrite()) { + case TEvBlobStorage::EvGetResult: { + if (failGet) { + getSeen = true; + return true; + } + } + } + + return false; + }); + + env.GetRuntime().DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); + + UNIT_ASSERT(getSeen); + + failGet = false; + tablet.RebootTablet(); + + auto response = tablet.RecvReadDataResponse(); + UNIT_ASSERT_VALUES_EQUAL_C( + E_REJECTED, + response->GetStatus(), + response->GetErrorReason()); + } + #undef TABLET_TEST } diff --git a/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp b/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp index 65915fab3e1..8970bc13476 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp @@ -827,7 +827,13 @@ Y_UNIT_TEST_SUITE(TIndexTabletTest_Sessions) DoTestShouldReturnFeaturesInCreateSessionResponse(config, features); config.SetTwoStageReadEnabled(true); + config.SetEntryTimeout(TDuration::Seconds(10).MilliSeconds()); + config.SetNegativeEntryTimeout(TDuration::Seconds(1).MilliSeconds()); + config.SetAttrTimeout(TDuration::Seconds(20).MilliSeconds()); features.SetTwoStageReadEnabled(true); + features.SetEntryTimeout(TDuration::Seconds(10).MilliSeconds()); + features.SetNegativeEntryTimeout(TDuration::Seconds(1).MilliSeconds()); + features.SetAttrTimeout(TDuration::Seconds(20).MilliSeconds()); DoTestShouldReturnFeaturesInCreateSessionResponse(config, features); } } diff --git a/cloud/filestore/libs/vfs_fuse/config.cpp b/cloud/filestore/libs/vfs_fuse/config.cpp index 4a4fc9b70c1..82269eb4edc 100644 --- a/cloud/filestore/libs/vfs_fuse/config.cpp +++ b/cloud/filestore/libs/vfs_fuse/config.cpp @@ -17,6 +17,7 @@ namespace { \ xxx(LockRetryTimeout, TDuration, TDuration::Seconds(1) )\ xxx(EntryTimeout, TDuration, TDuration::Seconds(15) )\ + xxx(NegativeEntryTimeout, TDuration, TDuration::Zero() )\ xxx(AttrTimeout, TDuration, TDuration::Seconds(15) )\ \ xxx(XAttrCacheLimit, ui32, 512 )\ diff --git a/cloud/filestore/libs/vfs_fuse/config.h b/cloud/filestore/libs/vfs_fuse/config.h index 74a3f6822f1..c1a97aa2072 100644 --- a/cloud/filestore/libs/vfs_fuse/config.h +++ b/cloud/filestore/libs/vfs_fuse/config.h @@ -27,6 +27,7 @@ struct TFileSystemConfig TDuration GetLockRetryTimeout() const; TDuration GetEntryTimeout() const; + TDuration GetNegativeEntryTimeout() const; TDuration GetAttrTimeout() const; ui32 GetXAttrCacheLimit() const; diff --git a/cloud/filestore/libs/vfs_fuse/fs_impl_node.cpp b/cloud/filestore/libs/vfs_fuse/fs_impl_node.cpp index 8b2c530a7fd..253575caaea 100644 --- a/cloud/filestore/libs/vfs_fuse/fs_impl_node.cpp +++ b/cloud/filestore/libs/vfs_fuse/fs_impl_node.cpp @@ -38,7 +38,8 @@ void TFileSystem::Lookup( response.GetNode()); } else { fuse_entry_param entry = {}; - entry.entry_timeout = Config->GetEntryTimeout().Seconds(); + entry.entry_timeout = + Config->GetNegativeEntryTimeout().Seconds(); self->ReplyEntry( *callContext, error, diff --git a/cloud/filestore/libs/vfs_fuse/loop.cpp b/cloud/filestore/libs/vfs_fuse/loop.cpp index 84e1974ba92..631912ad812 100644 --- a/cloud/filestore/libs/vfs_fuse/loop.cpp +++ b/cloud/filestore/libs/vfs_fuse/loop.cpp @@ -736,6 +736,17 @@ class TFileSystemLoop final config.SetMaxBufferSize(pages * NSystemInfo::GetPageSize()); } + const auto& features = filestore.GetFeatures(); + if (features.GetEntryTimeout()) { + config.SetEntryTimeout(features.GetEntryTimeout()); + } + if (features.GetNegativeEntryTimeout()) { + config.SetNegativeEntryTimeout(features.GetNegativeEntryTimeout()); + } + if (features.GetAttrTimeout()) { + config.SetAttrTimeout(features.GetAttrTimeout()); + } + return std::make_shared(config); } diff --git a/cloud/filestore/public/api/protos/fs.proto b/cloud/filestore/public/api/protos/fs.proto index 19ca63061ee..2ade37cbc11 100644 --- a/cloud/filestore/public/api/protos/fs.proto +++ b/cloud/filestore/public/api/protos/fs.proto @@ -14,6 +14,9 @@ option go_package = "a.yandex-team.ru/cloud/filestore/public/api/protos"; message TFileStoreFeatures { bool TwoStageReadEnabled = 1; + uint32 EntryTimeout = 2; + uint32 NegativeEntryTimeout = 3; + uint32 AttrTimeout = 4; } message TFileStore diff --git a/cloud/filestore/tools/analytics/libs/event-log/dump_ut.cpp b/cloud/filestore/tools/analytics/libs/event-log/dump_ut.cpp index 209648ded26..3d4dbc6098f 100644 --- a/cloud/filestore/tools/analytics/libs/event-log/dump_ut.cpp +++ b/cloud/filestore/tools/analytics/libs/event-log/dump_ut.cpp @@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(TDumpTest) { const auto requests = GetRequestTypes(); - UNIT_ASSERT_VALUES_EQUAL(65, requests.size()); + UNIT_ASSERT_VALUES_EQUAL(66, requests.size()); #define TEST_REQUEST_TYPE(index, id, name) \ UNIT_ASSERT_VALUES_EQUAL(id, requests[index].Id); \ @@ -153,24 +153,25 @@ Y_UNIT_TEST_SUITE(TDumpTest) TEST_REQUEST_TYPE(48, 48, StopEndpoint); TEST_REQUEST_TYPE(49, 49, ListEndpoints); TEST_REQUEST_TYPE(50, 50, KickEndpoint); + TEST_REQUEST_TYPE(51, 51, DescribeData); // Fuse - TEST_REQUEST_TYPE(51, 1001, Flush); - TEST_REQUEST_TYPE(52, 1002, Fsync); + TEST_REQUEST_TYPE(52, 1001, Flush); + TEST_REQUEST_TYPE(53, 1002, Fsync); // Tablet - TEST_REQUEST_TYPE(53, 10001, Flush); - TEST_REQUEST_TYPE(54, 10002, FlushBytes); - TEST_REQUEST_TYPE(55, 10003, Compaction); - TEST_REQUEST_TYPE(56, 10004, Cleanup); - TEST_REQUEST_TYPE(57, 10005, TrimBytes); - TEST_REQUEST_TYPE(58, 10006, CollectGarbage); - TEST_REQUEST_TYPE(59, 10007, DeleteGarbage); - TEST_REQUEST_TYPE(60, 10008, ReadBlob); - TEST_REQUEST_TYPE(61, 10009, WriteBlob); - TEST_REQUEST_TYPE(62, 10010, AddBlob); - TEST_REQUEST_TYPE(63, 10011, TruncateRange); - TEST_REQUEST_TYPE(64, 10012, ZeroRange); + TEST_REQUEST_TYPE(54, 10001, Flush); + TEST_REQUEST_TYPE(55, 10002, FlushBytes); + TEST_REQUEST_TYPE(56, 10003, Compaction); + TEST_REQUEST_TYPE(57, 10004, Cleanup); + TEST_REQUEST_TYPE(58, 10005, TrimBytes); + TEST_REQUEST_TYPE(59, 10006, CollectGarbage); + TEST_REQUEST_TYPE(60, 10007, DeleteGarbage); + TEST_REQUEST_TYPE(61, 10008, ReadBlob); + TEST_REQUEST_TYPE(62, 10009, WriteBlob); + TEST_REQUEST_TYPE(63, 10010, AddBlob); + TEST_REQUEST_TYPE(64, 10011, TruncateRange); + TEST_REQUEST_TYPE(65, 10012, ZeroRange); #undef TEST_REQUEST_TYPE } diff --git a/cloud/storage/core/libs/actors/poison_pill_helper.cpp b/cloud/storage/core/libs/actors/poison_pill_helper.cpp index d4b0a436f6c..8fb3f045fca 100644 --- a/cloud/storage/core/libs/actors/poison_pill_helper.cpp +++ b/cloud/storage/core/libs/actors/poison_pill_helper.cpp @@ -63,9 +63,7 @@ void TPoisonPillHelper::KillActors(const TActorContext& ctx) void TPoisonPillHelper::ReplyAndDie(const TActorContext& ctx) { - Y_DEBUG_ABORT_UNLESS(Poisoner); - - if (!OwnedActors.empty()) { + if (!Poisoner || !OwnedActors.empty()) { return; } diff --git a/cloud/storage/core/libs/actors/poison_pill_helper_ut.cpp b/cloud/storage/core/libs/actors/poison_pill_helper_ut.cpp index 2a93d41f2cf..7328fad16b8 100644 --- a/cloud/storage/core/libs/actors/poison_pill_helper_ut.cpp +++ b/cloud/storage/core/libs/actors/poison_pill_helper_ut.cpp @@ -125,11 +125,20 @@ class TParentActor { Y_UNUSED(ev); + { // We give ownership and take it away immediately. + auto childId = ctx.Register(new TChildActor()); + PoisonPillHelper.TakeOwnership(ctx, childId); + ctx.Send(childId, std::make_unique()); + PoisonPillHelper.ReleaseOwnership(ctx, childId); + } + + // Give ownership for long time. for (ui32 i = 0; i < ChildCount; ++i) { PoisonPillHelper.TakeOwnership( ctx, ctx.Register(new TChildActor())); } + } };