diff --git a/cloud/filestore/libs/storage/api/ss_proxy.h b/cloud/filestore/libs/storage/api/ss_proxy.h index 4c32f8a3a20..5904f162632 100644 --- a/cloud/filestore/libs/storage/api/ss_proxy.h +++ b/cloud/filestore/libs/storage/api/ss_proxy.h @@ -5,6 +5,8 @@ #include "components.h" #include "events.h" +#include + #include #include #include @@ -21,6 +23,7 @@ namespace NCloud::NFileStore::NStorage { xxx(DescribeScheme, __VA_ARGS__) \ xxx(ModifyScheme, __VA_ARGS__) \ xxx(WaitSchemeTx, __VA_ARGS__) \ + \ xxx(DescribeFileStore, __VA_ARGS__) \ xxx(CreateFileStore, __VA_ARGS__) \ xxx(AlterFileStore, __VA_ARGS__) \ @@ -31,84 +34,20 @@ namespace NCloud::NFileStore::NStorage { struct TEvSSProxy { - // - // DescribeScheme - // - - struct TDescribeSchemeRequest - { - const TString Path; - - TDescribeSchemeRequest(TString path) - : Path(std::move(path)) - {} - }; - - struct TDescribeSchemeResponse - { - const TString Path; - const NKikimrSchemeOp::TPathDescription PathDescription; - - TDescribeSchemeResponse() = default; - - TDescribeSchemeResponse( - TString path, - NKikimrSchemeOp::TPathDescription pathDescription) - : Path(std::move(path)) - , PathDescription(std::move(pathDescription)) - {} - }; - - // - // ModifyScheme - // + using TDescribeSchemeRequest = + ::NCloud::NStorage::TEvSSProxy::TDescribeSchemeRequest; + using TDescribeSchemeResponse = + ::NCloud::NStorage::TEvSSProxy::TDescribeSchemeResponse; - struct TModifySchemeRequest - { - const NKikimrSchemeOp::TModifyScheme ModifyScheme; - - TModifySchemeRequest( - NKikimrSchemeOp::TModifyScheme modifyScheme) - : ModifyScheme(std::move(modifyScheme)) - {} - }; + using TModifySchemeRequest = + ::NCloud::NStorage::TEvSSProxy::TModifySchemeRequest; + using TModifySchemeResponse = + ::NCloud::NStorage::TEvSSProxy::TModifySchemeResponse; - struct TModifySchemeResponse - { - const ui64 SchemeShardTabletId; - const NKikimrScheme::EStatus Status; - const TString Reason; - - TModifySchemeResponse( - ui64 schemeShardTabletId = 0, - NKikimrScheme::EStatus status = NKikimrScheme::StatusSuccess, - TString reason = TString()) - : SchemeShardTabletId(schemeShardTabletId) - , Status(status) - , Reason(std::move(reason)) - {} - }; - - // - // WaitSchemeTx - // - - struct TWaitSchemeTxRequest - { - const ui64 SchemeShardTabletId; - const ui64 TxId; - - TWaitSchemeTxRequest( - ui64 schemeShardTabletId, - ui64 txId) - : SchemeShardTabletId(schemeShardTabletId) - , TxId(txId) - {} - }; - - struct TWaitSchemeTxResponse - { - }; + using TWaitSchemeTxRequest = + ::NCloud::NStorage::TEvSSProxy::TWaitSchemeTxRequest; + using TWaitSchemeTxResponse = + ::NCloud::NStorage::TEvSSProxy::TWaitSchemeTxResponse; // // DescribeFileStore @@ -223,28 +162,34 @@ struct TEvSSProxy enum EEvents { - EvBegin = TFileStoreEvents::SS_PROXY_START, + EvDescribeSchemeRequest = + ::NCloud::NStorage::TEvSSProxy::EEvents::EvDescribeSchemeRequest, + EvDescribeSchemeResponse = + ::NCloud::NStorage::TEvSSProxy::EEvents::EvDescribeSchemeResponse, - EvDescribeSchemeRequest = EvBegin + 1, - EvDescribeSchemeResponse = EvBegin + 2, + EvModifySchemeRequest = + ::NCloud::NStorage::TEvSSProxy::EEvents::EvModifySchemeRequest, + EvModifySchemeResponse = + ::NCloud::NStorage::TEvSSProxy::EEvents::EvModifySchemeResponse, - EvModifySchemeRequest = EvBegin + 3, - EvModifySchemeResponse = EvBegin + 4, + EvWaitSchemeTxRequest = + ::NCloud::NStorage::TEvSSProxy::EEvents::EvWaitSchemeTxRequest, + EvWaitSchemeTxResponse = + ::NCloud::NStorage::TEvSSProxy::EEvents::EvWaitSchemeTxResponse, - EvWaitSchemeTxRequest = EvBegin + 5, - EvWaitSchemeTxResponse = EvBegin + 6, + EvBegin = TFileStoreEvents::SS_PROXY_START, - EvDescribeFileStoreRequest = EvBegin + 7, - EvDescribeFileStoreResponse = EvBegin + 8, + EvDescribeFileStoreRequest = EvBegin + 1, + EvDescribeFileStoreResponse = EvBegin + 2, - EvCreateFileStoreRequest = EvBegin + 9, - EvCreateFileStoreResponse = EvBegin + 10, + EvCreateFileStoreRequest = EvBegin + 3, + EvCreateFileStoreResponse = EvBegin + 4, - EvAlterFileStoreRequest = EvBegin + 11, - EvAlterFileStoreResponse = EvBegin + 12, + EvAlterFileStoreRequest = EvBegin + 5, + EvAlterFileStoreResponse = EvBegin + 6, - EvDestroyFileStoreRequest = EvBegin + 13, - EvDestroyFileStoreResponse = EvBegin + 14, + EvDestroyFileStoreRequest = EvBegin + 7, + EvDestroyFileStoreResponse = EvBegin + 8, EvEnd }; diff --git a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.cpp b/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.cpp index 5cf902d7383..66c7e6d3994 100644 --- a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.cpp +++ b/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.cpp @@ -2,6 +2,8 @@ #include +#include + namespace NCloud::NFileStore::NStorage { using namespace NActors; @@ -13,7 +15,7 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -std::unique_ptr CreateTabletPipeClientCache( +NTabletPipe::TClientConfig CreateTabletPipeClientConfig( const TStorageConfig& config) { NTabletPipe::TClientConfig clientConfig; @@ -22,9 +24,7 @@ std::unique_ptr CreateTabletPipeClientCache( .MinRetryTime = config.GetPipeClientMinRetryTime(), .MaxRetryTime = config.GetPipeClientMaxRetryTime() }; - - return std::unique_ptr( - NTabletPipe::CreateUnboundedClientCache(clientConfig)); + return clientConfig; } } // namespace @@ -32,56 +32,19 @@ std::unique_ptr CreateTabletPipeClientCache( //////////////////////////////////////////////////////////////////////////////// TSSProxyActor::TSSProxyActor(TStorageConfigPtr config) - : TActor(&TThis::StateWork) - , Config(std::move(config)) - , ClientCache(CreateTabletPipeClientCache(*Config)) + : Config(std::move(config)) {} -//////////////////////////////////////////////////////////////////////////////// - -void TSSProxyActor::HandleConnect( - TEvTabletPipe::TEvClientConnected::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - - if (!ClientCache->OnConnect(ev)) { - auto error = MakeKikimrError(msg->Status, TStringBuilder() - << "Connect to schemeshard " << msg->TabletId << " failed"); - - OnConnectionError(ctx, error, msg->TabletId); - } -} - -void TSSProxyActor::HandleDisconnect( - TEvTabletPipe::TEvClientDestroyed::TPtr& ev, - const TActorContext& ctx) +void TSSProxyActor::Bootstrap(const TActorContext& ctx) { - const auto* msg = ev->Get(); - - ClientCache->OnDisconnect(ev); - - auto error = MakeError(E_REJECTED, TStringBuilder() - << "Disconnected from schemeshard " << msg->TabletId); - - OnConnectionError(ctx, error, msg->TabletId); -} - -void TSSProxyActor::OnConnectionError( - const TActorContext& ctx, - const NProto::TError& error, - ui64 schemeShard) -{ - Y_UNUSED(error); - - // SchemeShard is a tablet, so it should eventually get up - // Re-send all outstanding requests - if (auto* state = SchemeShardStates.FindPtr(schemeShard)) { - for (const auto& kv : state->TxToRequests) { - ui64 txId = kv.first; - SendWaitTxRequest(ctx, schemeShard, txId); - } - } + TThis::Become(&TThis::StateWork); + + auto actor = std::make_unique<::NCloud::NStorage::TSSProxyActor>( + TFileStoreComponents::SS_PROXY, + Config->GetSchemeShardDir(), + CreateTabletPipeClientConfig(*Config) + ); + StorageSSProxyActor = ctx.Register(actor.release()); } //////////////////////////////////////////////////////////////////////////////// @@ -100,19 +63,32 @@ bool TSSProxyActor::HandleRequests(STFUNC_SIG) STFUNC(TSSProxyActor::StateWork) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvTabletPipe::TEvClientConnected, HandleConnect); - HFunc(TEvTabletPipe::TEvClientDestroyed, HandleDisconnect); + if (!HandleRequests(ev)) { + HandleUnexpectedEvent(ev, TFileStoreComponents::SS_PROXY); + } +} - HFunc(TEvSchemeShard::TEvNotifyTxCompletionRegistered, HandleTxRegistered); - HFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, HandleTxResult); +//////////////////////////////////////////////////////////////////////////////// - default: - if (!HandleRequests(ev)) { - HandleUnexpectedEvent(ev, TFileStoreComponents::SS_PROXY); - } - break; - } +void TSSProxyActor::HandleDescribeScheme( + const TEvSSProxy::TEvDescribeSchemeRequest::TPtr& ev, + const TActorContext& ctx) +{ + ctx.Send(ev->Forward(StorageSSProxyActor)); +} + +void TSSProxyActor::HandleModifyScheme( + const TEvSSProxy::TEvModifySchemeRequest::TPtr& ev, + const TActorContext& ctx) +{ + ctx.Send(ev->Forward(StorageSSProxyActor)); +} + +void TSSProxyActor::HandleWaitSchemeTx( + const TEvSSProxy::TEvWaitSchemeTxRequest::TPtr& ev, + const TActorContext& ctx) +{ + ctx.Send(ev->Forward(StorageSSProxyActor)); } } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.h b/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.h index 774ce63a172..9ee19be9e36 100644 --- a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.h +++ b/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.h @@ -8,10 +8,7 @@ #include -#include -#include - -#include +#include #include #include #include @@ -24,50 +21,19 @@ namespace NCloud::NFileStore::NStorage { //////////////////////////////////////////////////////////////////////////////// class TSSProxyActor final - : public NActors::TActor + : public NActors::TActorBootstrapped { - struct TSchemeShardState - { - NActors::TActorId ReplyProxy; - THashMap> TxToRequests; - }; - private: const TStorageConfigPtr Config; - std::unique_ptr ClientCache; - THashMap SchemeShardStates; -public: - TSSProxyActor(TStorageConfigPtr config); + NActors::TActorId StorageSSProxyActor; -private: - void SendWaitTxRequest( - const NActors::TActorContext& ctx, - ui64 schemeShard, - ui64 txId); +public: + explicit TSSProxyActor(TStorageConfigPtr config); - void OnConnectionError( - const NActors::TActorContext& ctx, - const NProto::TError& error, - ui64 schemeShard); + void Bootstrap(const NActors::TActorContext& ctx); private: - void HandleConnect( - NKikimr::TEvTabletPipe::TEvClientConnected::TPtr& ev, - const NActors::TActorContext& ctx); - - void HandleDisconnect( - NKikimr::TEvTabletPipe::TEvClientDestroyed::TPtr& ev, - const NActors::TActorContext& ctx); - - void HandleTxRegistered( - const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr& ev, - const NActors::TActorContext& ctx); - - void HandleTxResult( - const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, - const NActors::TActorContext& ctx); - bool HandleRequests(STFUNC_SIG); FILESTORE_SS_PROXY_REQUESTS(FILESTORE_IMPLEMENT_REQUEST, TEvSSProxy) diff --git a/cloud/filestore/libs/storage/ss_proxy/ya.make b/cloud/filestore/libs/storage/ss_proxy/ya.make index 34caa3bc273..eecbb34b96d 100644 --- a/cloud/filestore/libs/storage/ss_proxy/ya.make +++ b/cloud/filestore/libs/storage/ss_proxy/ya.make @@ -7,22 +7,24 @@ SRCS( ss_proxy_actor_alterfs.cpp ss_proxy_actor_createfs.cpp ss_proxy_actor_describefs.cpp - ss_proxy_actor_describescheme.cpp ss_proxy_actor_destroyfs.cpp - ss_proxy_actor_modifyscheme.cpp - ss_proxy_actor_waitschemetx.cpp ) PEERDIR( cloud/filestore/libs/storage/api cloud/filestore/libs/storage/core + cloud/storage/core/libs/kikimr - contrib/ydb/library/actors/core - library/cpp/string_utils/quote + cloud/storage/core/libs/ss_proxy + contrib/ydb/core/base contrib/ydb/core/tablet contrib/ydb/core/tx/schemeshard contrib/ydb/core/tx/tx_proxy + + contrib/ydb/library/actors/core + + library/cpp/string_utils/quote ) END() diff --git a/cloud/storage/core/libs/api/ss_proxy.cpp b/cloud/storage/core/libs/api/ss_proxy.cpp new file mode 100644 index 00000000000..191b2a5d398 --- /dev/null +++ b/cloud/storage/core/libs/api/ss_proxy.cpp @@ -0,0 +1 @@ +#include "ss_proxy.h" diff --git a/cloud/storage/core/libs/api/ss_proxy.h b/cloud/storage/core/libs/api/ss_proxy.h new file mode 100644 index 00000000000..06deb9c4335 --- /dev/null +++ b/cloud/storage/core/libs/api/ss_proxy.h @@ -0,0 +1,134 @@ +#pragma once + +#include "public.h" + +#include +#include + +#include +#include + +#include + +#include + +namespace NCloud::NStorage { + +//////////////////////////////////////////////////////////////////////////////// + +#define STORAGE_SS_PROXY_REQUESTS(xxx, ...) \ + xxx(DescribeScheme, __VA_ARGS__) \ + xxx(ModifyScheme, __VA_ARGS__) \ + xxx(WaitSchemeTx, __VA_ARGS__) \ +// STORAGE_SS_PROXY_REQUESTS + +//////////////////////////////////////////////////////////////////////////////// + +struct TEvSSProxy +{ + // + // DescribeScheme + // + + struct TDescribeSchemeRequest + { + const TString Path; + + TDescribeSchemeRequest(TString path) + : Path(std::move(path)) + {} + }; + + struct TDescribeSchemeResponse + { + const TString Path; + const NKikimrSchemeOp::TPathDescription PathDescription; + + TDescribeSchemeResponse() = default; + + TDescribeSchemeResponse( + TString path, + NKikimrSchemeOp::TPathDescription pathDescription) + : Path(std::move(path)) + , PathDescription(std::move(pathDescription)) + {} + }; + + // + // ModifyScheme + // + + struct TModifySchemeRequest + { + const NKikimrSchemeOp::TModifyScheme ModifyScheme; + + TModifySchemeRequest( + NKikimrSchemeOp::TModifyScheme modifyScheme) + : ModifyScheme(std::move(modifyScheme)) + {} + }; + + struct TModifySchemeResponse + { + const ui64 SchemeShardTabletId; + const NKikimrScheme::EStatus Status; + const TString Reason; + + TModifySchemeResponse( + ui64 schemeShardTabletId = 0, + NKikimrScheme::EStatus status = NKikimrScheme::StatusSuccess, + TString reason = TString()) + : SchemeShardTabletId(schemeShardTabletId) + , Status(status) + , Reason(std::move(reason)) + {} + }; + + // + // WaitSchemeTx + // + + struct TWaitSchemeTxRequest + { + const ui64 SchemeShardTabletId; + const ui64 TxId; + + TWaitSchemeTxRequest( + ui64 schemeShardTabletId, + ui64 txId) + : SchemeShardTabletId(schemeShardTabletId) + , TxId(txId) + {} + }; + + struct TWaitSchemeTxResponse + { + }; + + // + // Events declaration + // + + enum EEvents + { + EvBegin = TStorageEvents::SS_PROXY_START, + + EvDescribeSchemeRequest = EvBegin + 1, + EvDescribeSchemeResponse = EvBegin + 2, + + EvModifySchemeRequest = EvBegin + 3, + EvModifySchemeResponse = EvBegin + 4, + + EvWaitSchemeTxRequest = EvBegin + 5, + EvWaitSchemeTxResponse = EvBegin + 6, + + EvEnd + }; + + static_assert(EvEnd < (int)TStorageEvents::SS_PROXY_END, + "EvEnd expected to be < TStorageEvents::SS_PROXY_END"); + + STORAGE_SS_PROXY_REQUESTS(STORAGE_DECLARE_EVENTS) +}; + +} // namespace NCloud::NStorage diff --git a/cloud/storage/core/libs/api/ya.make b/cloud/storage/core/libs/api/ya.make index b1096bfca87..8c10e0cdbe9 100644 --- a/cloud/storage/core/libs/api/ya.make +++ b/cloud/storage/core/libs/api/ya.make @@ -3,13 +3,16 @@ LIBRARY() SRCS( authorizer.cpp hive_proxy.cpp + ss_proxy.cpp user_stats.cpp ) PEERDIR( cloud/storage/core/libs/kikimr - contrib/ydb/library/actors/core + contrib/ydb/core/base + + contrib/ydb/library/actors/core ) END() diff --git a/cloud/storage/core/libs/hive_proxy/hive_proxy_actor.h b/cloud/storage/core/libs/hive_proxy/hive_proxy_actor.h index 1efb1b0984c..75920d897e1 100644 --- a/cloud/storage/core/libs/hive_proxy/hive_proxy_actor.h +++ b/cloud/storage/core/libs/hive_proxy/hive_proxy_actor.h @@ -4,8 +4,8 @@ #include "hive_proxy_events_private.h" -#include #include +#include #include #include diff --git a/cloud/storage/core/libs/kikimr/components.h b/cloud/storage/core/libs/kikimr/components.h index bfa7a8cabe0..3d430713c8c 100644 --- a/cloud/storage/core/libs/kikimr/components.h +++ b/cloud/storage/core/libs/kikimr/components.h @@ -15,6 +15,7 @@ namespace NCloud { #define STORAGE_ACTORS(xxx) \ xxx(HIVE_PROXY) \ + xxx(SS_PROXY) \ // STORAGE_ACTORS //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/storage/core/libs/ss_proxy/public.h b/cloud/storage/core/libs/ss_proxy/public.h new file mode 100644 index 00000000000..6f70f09beec --- /dev/null +++ b/cloud/storage/core/libs/ss_proxy/public.h @@ -0,0 +1 @@ +#pragma once diff --git a/cloud/storage/core/libs/ss_proxy/ss_proxy_actor.cpp b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor.cpp new file mode 100644 index 00000000000..cc1557a7a65 --- /dev/null +++ b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor.cpp @@ -0,0 +1,100 @@ +#include "ss_proxy_actor.h" + +namespace NCloud::NStorage { + +using namespace NActors; + +using namespace NKikimr; +using namespace NKikimr::NSchemeShard; + +//////////////////////////////////////////////////////////////////////////////// + +TSSProxyActor::TSSProxyActor( + int logComponent, + TString schemeShardDir, + NKikimr::NTabletPipe::TClientConfig pipeClientConfig) + : TActor(&TThis::StateWork) + , LogComponent(logComponent) + , SchemeShardDir(schemeShardDir) + , ClientCache(NTabletPipe::CreateUnboundedClientCache(pipeClientConfig)) +{} + +//////////////////////////////////////////////////////////////////////////////// + +void TSSProxyActor::HandleConnect( + TEvTabletPipe::TEvClientConnected::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + if (!ClientCache->OnConnect(ev)) { + auto error = MakeKikimrError(msg->Status, TStringBuilder() + << "Connect to schemeshard " << msg->TabletId << " failed"); + + OnConnectionError(ctx, error, msg->TabletId); + } +} + +void TSSProxyActor::HandleDisconnect( + TEvTabletPipe::TEvClientDestroyed::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + ClientCache->OnDisconnect(ev); + + auto error = MakeError(E_REJECTED, TStringBuilder() + << "Disconnected from schemeshard " << msg->TabletId); + + OnConnectionError(ctx, error, msg->TabletId); +} + +void TSSProxyActor::OnConnectionError( + const TActorContext& ctx, + const NProto::TError& error, + ui64 schemeShard) +{ + Y_UNUSED(error); + + // SchemeShard is a tablet, so it should eventually get up + // Re-send all outstanding requests + if (auto* state = SchemeShardStates.FindPtr(schemeShard)) { + for (const auto& kv : state->TxToRequests) { + ui64 txId = kv.first; + SendWaitTxRequest(ctx, schemeShard, txId); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +bool TSSProxyActor::HandleRequests(STFUNC_SIG) +{ + switch (ev->GetTypeRewrite()) { + STORAGE_SS_PROXY_REQUESTS(STORAGE_HANDLE_REQUEST, TEvSSProxy) + + default: + return false; + } + + return true; +} + +STFUNC(TSSProxyActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvTabletPipe::TEvClientConnected, HandleConnect); + HFunc(TEvTabletPipe::TEvClientDestroyed, HandleDisconnect); + + HFunc(TEvSchemeShard::TEvNotifyTxCompletionRegistered, HandleTxRegistered); + HFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, HandleTxResult); + + default: + if (!HandleRequests(ev)) { + HandleUnexpectedEvent(ev, LogComponent); + } + break; + } +} + +} // namespace NCloud::NStorage diff --git a/cloud/storage/core/libs/ss_proxy/ss_proxy_actor.h b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor.h new file mode 100644 index 00000000000..a449a51f599 --- /dev/null +++ b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor.h @@ -0,0 +1,97 @@ +#pragma once + +#include "public.h" + +#include +#include + +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace NCloud::NStorage { + +//////////////////////////////////////////////////////////////////////////////// + +class TSSProxyActor final + : public NActors::TActor +{ +public: + struct TRequestInfo + { + NActors::TActorId Sender; + ui64 Cookie = 0; + + TRequestInfo() = default; + TRequestInfo(const TRequestInfo&) = default; + TRequestInfo& operator=(const TRequestInfo&) = default; + + TRequestInfo(NActors::TActorId sender, ui64 cookie) + : Sender(sender) + , Cookie(cookie) + {} + }; + +private: + struct TSchemeShardState + { + NActors::TActorId ReplyProxy; + THashMap> TxToRequests; + }; + +private: + const int LogComponent; + const TString SchemeShardDir; + + std::unique_ptr ClientCache; + THashMap SchemeShardStates; + +public: + TSSProxyActor( + int logComponent, + TString schemeShardDir, + NKikimr::NTabletPipe::TClientConfig pipeClientConfig); + +private: + void SendWaitTxRequest( + const NActors::TActorContext& ctx, + ui64 schemeShard, + ui64 txId); + + void OnConnectionError( + const NActors::TActorContext& ctx, + const NProto::TError& error, + ui64 schemeShard); + +private: + void HandleConnect( + NKikimr::TEvTabletPipe::TEvClientConnected::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleDisconnect( + NKikimr::TEvTabletPipe::TEvClientDestroyed::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleTxRegistered( + const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleTxResult( + const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, + const NActors::TActorContext& ctx); + + bool HandleRequests(STFUNC_SIG); + + STORAGE_SS_PROXY_REQUESTS(STORAGE_IMPLEMENT_REQUEST, TEvSSProxy) + + STFUNC(StateWork); +}; + +} // namespace NCloud::NStorage diff --git a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_describescheme.cpp b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor_describescheme.cpp similarity index 80% rename from cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_describescheme.cpp rename to cloud/storage/core/libs/ss_proxy/ss_proxy_actor_describescheme.cpp index f8c491f701e..026d185a91c 100644 --- a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_describescheme.cpp +++ b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor_describescheme.cpp @@ -1,12 +1,10 @@ #include "ss_proxy_actor.h" -#include - #include #include -namespace NCloud::NFileStore::NStorage { +namespace NCloud::NStorage { using namespace NActors; @@ -21,15 +19,17 @@ class TDescribeSchemeActor final : public TActorBootstrapped { private: - const TRequestInfoPtr RequestInfo; + const int LogComponent; + const TSSProxyActor::TRequestInfo RequestInfo; - const TStorageConfigPtr Config; + const TString SchemeShardDir; const TString Path; public: TDescribeSchemeActor( - TRequestInfoPtr requestInfo, - TStorageConfigPtr config, + int logComponent, + TSSProxyActor::TRequestInfo requestInfo, + TString schemeShardDir, TString path); void Bootstrap(const TActorContext& ctx); @@ -54,11 +54,13 @@ class TDescribeSchemeActor final //////////////////////////////////////////////////////////////////////////////// TDescribeSchemeActor::TDescribeSchemeActor( - TRequestInfoPtr requestInfo, - TStorageConfigPtr config, + int logComponent, + TSSProxyActor::TRequestInfo requestInfo, + TString schemeShardDir, TString path) - : RequestInfo(std::move(requestInfo)) - , Config(std::move(config)) + : LogComponent(logComponent) + , RequestInfo(std::move(requestInfo)) + , SchemeShardDir(std::move(schemeShardDir)) , Path(std::move(path)) {} @@ -72,7 +74,7 @@ void TDescribeSchemeActor::DescribeScheme(const TActorContext& ctx) { auto request = std::make_unique(); request->Record.MutableDescribePath()->SetPath(Path); - request->Record.SetDatabaseName(Config->GetSchemeShardDir()); + request->Record.SetDatabaseName(SchemeShardDir); NCloud::Send(ctx, MakeTxProxyID(), std::move(request)); } @@ -94,7 +96,7 @@ void TDescribeSchemeActor::ReplyAndDie( const TActorContext& ctx, std::unique_ptr response) { - NCloud::Reply(ctx, *RequestInfo, std::move(response)); + NCloud::Reply(ctx, RequestInfo, std::move(response)); Die(ctx); } @@ -124,7 +126,7 @@ STFUNC(TDescribeSchemeActor::StateWork) HFunc(TEvSchemeShard::TEvDescribeSchemeResult, HandleDescribeSchemeResult); default: - HandleUnexpectedEvent(ev, TFileStoreComponents::SS_PROXY); + HandleUnexpectedEvent(ev, LogComponent); break; } } @@ -139,17 +141,13 @@ void TSSProxyActor::HandleDescribeScheme( { const auto* msg = ev->Get(); - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); - NCloud::Register( ctx, std::make_unique( - std::move(requestInfo), - Config, + LogComponent, + TRequestInfo(ev->Sender, ev->Cookie), + SchemeShardDir, msg->Path)); } -} // namespace NCloud::NFileStore::NStorage +} // namespace NCloud::NStorage diff --git a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_modifyscheme.cpp b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor_modifyscheme.cpp similarity index 89% rename from cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_modifyscheme.cpp rename to cloud/storage/core/libs/ss_proxy/ss_proxy_actor_modifyscheme.cpp index cf56922d236..5ee99ce206b 100644 --- a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_modifyscheme.cpp +++ b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor_modifyscheme.cpp @@ -4,7 +4,7 @@ #include -namespace NCloud::NFileStore::NStorage { +namespace NCloud::NStorage { using namespace NActors; @@ -41,7 +41,8 @@ class TModifySchemeActor final : public TActorBootstrapped { private: - const TRequestInfoPtr RequestInfo; + const int LogComponent; + const TSSProxyActor::TRequestInfo RequestInfo; const TActorId Owner; const NKikimrSchemeOp::TModifyScheme ModifyScheme; @@ -52,7 +53,8 @@ class TModifySchemeActor final public: TModifySchemeActor( - TRequestInfoPtr requestInfo, + int logComponent, + TSSProxyActor::TRequestInfo requestInfo, const TActorId& owner, NKikimrSchemeOp::TModifyScheme modifyScheme); @@ -77,10 +79,12 @@ class TModifySchemeActor final //////////////////////////////////////////////////////////////////////////////// TModifySchemeActor::TModifySchemeActor( - TRequestInfoPtr requestInfo, + int logComponent, + TSSProxyActor::TRequestInfo requestInfo, const TActorId& owner, NKikimrSchemeOp::TModifyScheme modifyScheme) - : RequestInfo(std::move(requestInfo)) + : LogComponent(logComponent) + , RequestInfo(std::move(requestInfo)) , Owner(owner) , ModifyScheme(std::move(modifyScheme)) {} @@ -111,7 +115,7 @@ void TModifySchemeActor::HandleStatus( auto status = (TEvTxUserProxy::TEvProposeTransactionStatus::EStatus) record.GetStatus(); switch (status) { case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete: - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Request %s with TxId# %lu completed immediately", NKikimrSchemeOp::EOperationType_Name(ModifyScheme.GetOperationType()).c_str(), TxId); @@ -120,7 +124,7 @@ void TModifySchemeActor::HandleStatus( break; case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress: { - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Request %s with TxId# %lu in progress, waiting for completion", NKikimrSchemeOp::EOperationType_Name(ModifyScheme.GetOperationType()).c_str(), TxId); @@ -134,7 +138,7 @@ void TModifySchemeActor::HandleStatus( } case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecError: - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Request %s with TxId# %lu failed with status %s", NKikimrSchemeOp::EOperationType_Name(ModifyScheme.GetOperationType()).c_str(), TxId, @@ -144,7 +148,7 @@ void TModifySchemeActor::HandleStatus( (record.GetPathCreateTxId() != 0 || record.GetPathDropTxId() != 0)) { ui64 txId = record.GetPathCreateTxId() != 0 ? record.GetPathCreateTxId() : record.GetPathDropTxId(); - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Waiting for a different TxId# %lu", txId); auto request = std::make_unique( @@ -169,7 +173,7 @@ void TModifySchemeActor::HandleStatus( case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ResolveError: if (SchemeShardStatus == NKikimrScheme::StatusPathDoesNotExist) { - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Request %s failed to resolve parent path", NKikimrSchemeOp::EOperationType_Name(ModifyScheme.GetOperationType()).c_str()); @@ -189,7 +193,7 @@ void TModifySchemeActor::HandleStatus( /* fall through */ default: - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Request %s to tx_proxy failed with code %u", NKikimrSchemeOp::EOperationType_Name(ModifyScheme.GetOperationType()).c_str(), status); @@ -208,7 +212,7 @@ void TModifySchemeActor::HandleTxDone( { const auto* msg = ev->Get(); - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "TModifySchemeActor received TEvWaitSchemeTxResponse"); ReplyAndDie(ctx, msg->GetError()); @@ -228,7 +232,7 @@ void TModifySchemeActor::ReplyAndDie( SchemeShardStatus, SchemeShardReason); - NCloud::Reply(ctx, *RequestInfo, std::move(response)); + NCloud::Reply(ctx, RequestInfo, std::move(response)); Die(ctx); } @@ -239,7 +243,7 @@ STFUNC(TModifySchemeActor::StateWork) HFunc(TEvSSProxy::TEvWaitSchemeTxResponse, HandleTxDone); default: - HandleUnexpectedEvent(ev, TFileStoreComponents::SS_PROXY); + HandleUnexpectedEvent(ev, LogComponent); break; } } @@ -254,17 +258,13 @@ void TSSProxyActor::HandleModifyScheme( { const auto* msg = ev->Get(); - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); - NCloud::Register( ctx, std::make_unique( - std::move(requestInfo), + LogComponent, + TRequestInfo(ev->Sender, ev->Cookie), ctx.SelfID, msg->ModifyScheme)); } -} // namespace NCloud::NFileStore::NStorage +} // namespace NCloud::NStorage diff --git a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_waitschemetx.cpp b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor_waitschemetx.cpp similarity index 87% rename from cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_waitschemetx.cpp rename to cloud/storage/core/libs/ss_proxy/ss_proxy_actor_waitschemetx.cpp index 8128342c8d3..1060b630dff 100644 --- a/cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor_waitschemetx.cpp +++ b/cloud/storage/core/libs/ss_proxy/ss_proxy_actor_waitschemetx.cpp @@ -1,6 +1,6 @@ #include "ss_proxy_actor.h" -namespace NCloud::NFileStore::NStorage { +namespace NCloud::NStorage { using namespace NActors; @@ -15,14 +15,17 @@ class TReplyProxyActor final : public TActor { private: + const int LogComponent; const TActorId Owner; const ui64 TabletId; public: TReplyProxyActor( + int logComponent, const TActorId& owner, const ui64 tabletId) : TActor(&TThis::StateWork) + , LogComponent(logComponent) , Owner(owner) , TabletId(tabletId) {} @@ -48,7 +51,7 @@ STFUNC(TReplyProxyActor::StateWork) HFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, Handle); default: - HandleUnexpectedEvent(ev, TFileStoreComponents::SS_PROXY); + HandleUnexpectedEvent(ev, LogComponent); break; } } @@ -82,10 +85,7 @@ void TSSProxyActor::HandleWaitSchemeTx( auto& state = SchemeShardStates[msg->SchemeShardTabletId]; auto& requests = state.TxToRequests[msg->TxId]; - requests.emplace_back(CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext)); + requests.emplace_back(TRequestInfo(ev->Sender, ev->Cookie)); if (requests.size() == 1) { // This is the first request for this tabletId/txId @@ -100,16 +100,19 @@ void TSSProxyActor::SendWaitTxRequest( { auto& state = SchemeShardStates[schemeShard]; if (!state.ReplyProxy) { - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Creating reply proxy actor for schemeshard %lu", schemeShard); state.ReplyProxy = NCloud::Register( ctx, - std::make_unique(ctx.SelfID, schemeShard)); + std::make_unique( + LogComponent, + ctx.SelfID, + schemeShard)); } - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Sending NotifyTxCompletion to %lu for txId# %lu", schemeShard, txId); @@ -130,7 +133,7 @@ void TSSProxyActor::HandleTxRegistered( const auto* msg = ev->Get(); ui64 txId = msg->Record.GetTxId(); - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Received NotifyTxCompletionRegistered from %lu for txId# %lu", schemeShard, txId); @@ -146,7 +149,7 @@ void TSSProxyActor::HandleTxResult( const auto* msg = ev->Get(); ui64 txId = msg->Record.GetTxId(); - LOG_DEBUG(ctx, TFileStoreComponents::SS_PROXY, + LOG_DEBUG(ctx, LogComponent, "Received NotifyTxCompletionResult from %lu for txId# %lu", schemeShard, txId); @@ -156,11 +159,11 @@ void TSSProxyActor::HandleTxResult( for (const auto& request : it->second) { NCloud::Reply( ctx, - *request, + request, std::make_unique()); } state.TxToRequests.erase(it); } } -} // namespace NCloud::NFileStore::NStorage +} // namespace NCloud::NStorage diff --git a/cloud/storage/core/libs/ss_proxy/ya.make b/cloud/storage/core/libs/ss_proxy/ya.make new file mode 100644 index 00000000000..91d604c21e7 --- /dev/null +++ b/cloud/storage/core/libs/ss_proxy/ya.make @@ -0,0 +1,22 @@ +LIBRARY() + +SRCS( + ss_proxy_actor.cpp + ss_proxy_actor_describescheme.cpp + ss_proxy_actor_modifyscheme.cpp + ss_proxy_actor_waitschemetx.cpp +) + +PEERDIR( + cloud/storage/core/libs/api + cloud/storage/core/libs/kikimr + + contrib/ydb/core/base + contrib/ydb/core/tablet + contrib/ydb/core/tx/schemeshard + contrib/ydb/core/tx/tx_proxy + + contrib/ydb/library/actors/core +) + +END() diff --git a/cloud/storage/core/libs/ya.make b/cloud/storage/core/libs/ya.make index b9dce86c622..2056c321172 100644 --- a/cloud/storage/core/libs/ya.make +++ b/cloud/storage/core/libs/ya.make @@ -13,6 +13,7 @@ RECURSE( hive_proxy http kikimr + ss_proxy tablet throttling uds