Skip to content

Commit

Permalink
issue-2100: extract TSSProxyActor from cloud/filestore - this actor i…
Browse files Browse the repository at this point in the history
…mplements DescribeScheme, ModifyScheme and WaitSchemeTx requests
  • Loading branch information
Mikhail Montsev committed Sep 25, 2024
1 parent 079fdb6 commit fcfdfb6
Show file tree
Hide file tree
Showing 17 changed files with 506 additions and 256 deletions.
127 changes: 36 additions & 91 deletions cloud/filestore/libs/storage/api/ss_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "components.h"
#include "events.h"

#include <cloud/storage/core/libs/api/ss_proxy.h>

#include <contrib/ydb/core/protos/flat_tx_scheme.pb.h>
#include <contrib/ydb/core/protos/flat_scheme_op.pb.h>
#include <contrib/ydb/core/protos/filestore_config.pb.h>
Expand All @@ -21,6 +23,7 @@ namespace NCloud::NFileStore::NStorage {
xxx(DescribeScheme, __VA_ARGS__) \
xxx(ModifyScheme, __VA_ARGS__) \
xxx(WaitSchemeTx, __VA_ARGS__) \
\
xxx(DescribeFileStore, __VA_ARGS__) \
xxx(CreateFileStore, __VA_ARGS__) \
xxx(AlterFileStore, __VA_ARGS__) \
Expand All @@ -31,84 +34,20 @@ namespace NCloud::NFileStore::NStorage {

struct TEvSSProxy
{
//
// DescribeScheme
//

struct TDescribeSchemeRequest
{
const TString Path;

TDescribeSchemeRequest(TString path)
: Path(std::move(path))
{}
};

struct TDescribeSchemeResponse
{
const TString Path;
const NKikimrSchemeOp::TPathDescription PathDescription;

TDescribeSchemeResponse() = default;

TDescribeSchemeResponse(
TString path,
NKikimrSchemeOp::TPathDescription pathDescription)
: Path(std::move(path))
, PathDescription(std::move(pathDescription))
{}
};

//
// ModifyScheme
//
using TDescribeSchemeRequest =
::NCloud::NStorage::TEvSSProxy::TDescribeSchemeRequest;
using TDescribeSchemeResponse =
::NCloud::NStorage::TEvSSProxy::TDescribeSchemeResponse;

struct TModifySchemeRequest
{
const NKikimrSchemeOp::TModifyScheme ModifyScheme;

TModifySchemeRequest(
NKikimrSchemeOp::TModifyScheme modifyScheme)
: ModifyScheme(std::move(modifyScheme))
{}
};
using TModifySchemeRequest =
::NCloud::NStorage::TEvSSProxy::TModifySchemeRequest;
using TModifySchemeResponse =
::NCloud::NStorage::TEvSSProxy::TModifySchemeResponse;

struct TModifySchemeResponse
{
const ui64 SchemeShardTabletId;
const NKikimrScheme::EStatus Status;
const TString Reason;

TModifySchemeResponse(
ui64 schemeShardTabletId = 0,
NKikimrScheme::EStatus status = NKikimrScheme::StatusSuccess,
TString reason = TString())
: SchemeShardTabletId(schemeShardTabletId)
, Status(status)
, Reason(std::move(reason))
{}
};

//
// WaitSchemeTx
//

struct TWaitSchemeTxRequest
{
const ui64 SchemeShardTabletId;
const ui64 TxId;

TWaitSchemeTxRequest(
ui64 schemeShardTabletId,
ui64 txId)
: SchemeShardTabletId(schemeShardTabletId)
, TxId(txId)
{}
};

struct TWaitSchemeTxResponse
{
};
using TWaitSchemeTxRequest =
::NCloud::NStorage::TEvSSProxy::TWaitSchemeTxRequest;
using TWaitSchemeTxResponse =
::NCloud::NStorage::TEvSSProxy::TWaitSchemeTxResponse;

//
// DescribeFileStore
Expand Down Expand Up @@ -223,28 +162,34 @@ struct TEvSSProxy

enum EEvents
{
EvBegin = TFileStoreEvents::SS_PROXY_START,
EvDescribeSchemeRequest =
::NCloud::NStorage::TEvSSProxy::EEvents::EvDescribeSchemeRequest,
EvDescribeSchemeResponse =
::NCloud::NStorage::TEvSSProxy::EEvents::EvDescribeSchemeResponse,

EvDescribeSchemeRequest = EvBegin + 1,
EvDescribeSchemeResponse = EvBegin + 2,
EvModifySchemeRequest =
::NCloud::NStorage::TEvSSProxy::EEvents::EvModifySchemeRequest,
EvModifySchemeResponse =
::NCloud::NStorage::TEvSSProxy::EEvents::EvModifySchemeResponse,

EvModifySchemeRequest = EvBegin + 3,
EvModifySchemeResponse = EvBegin + 4,
EvWaitSchemeTxRequest =
::NCloud::NStorage::TEvSSProxy::EEvents::EvWaitSchemeTxRequest,
EvWaitSchemeTxResponse =
::NCloud::NStorage::TEvSSProxy::EEvents::EvWaitSchemeTxResponse,

EvWaitSchemeTxRequest = EvBegin + 5,
EvWaitSchemeTxResponse = EvBegin + 6,
EvBegin = TFileStoreEvents::SS_PROXY_START,

EvDescribeFileStoreRequest = EvBegin + 7,
EvDescribeFileStoreResponse = EvBegin + 8,
EvDescribeFileStoreRequest = EvBegin + 1,
EvDescribeFileStoreResponse = EvBegin + 2,

EvCreateFileStoreRequest = EvBegin + 9,
EvCreateFileStoreResponse = EvBegin + 10,
EvCreateFileStoreRequest = EvBegin + 3,
EvCreateFileStoreResponse = EvBegin + 4,

EvAlterFileStoreRequest = EvBegin + 11,
EvAlterFileStoreResponse = EvBegin + 12,
EvAlterFileStoreRequest = EvBegin + 5,
EvAlterFileStoreResponse = EvBegin + 6,

EvDestroyFileStoreRequest = EvBegin + 13,
EvDestroyFileStoreResponse = EvBegin + 14,
EvDestroyFileStoreRequest = EvBegin + 7,
EvDestroyFileStoreResponse = EvBegin + 8,

EvEnd
};
Expand Down
100 changes: 38 additions & 62 deletions cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <cloud/filestore/libs/storage/core/config.h>

#include <cloud/storage/core/libs/ss_proxy/ss_proxy_actor.h>

namespace NCloud::NFileStore::NStorage {

using namespace NActors;
Expand All @@ -13,7 +15,7 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

std::unique_ptr<NTabletPipe::IClientCache> CreateTabletPipeClientCache(
NTabletPipe::TClientConfig CreateTabletPipeClientConfig(
const TStorageConfig& config)
{
NTabletPipe::TClientConfig clientConfig;
Expand All @@ -22,66 +24,27 @@ std::unique_ptr<NTabletPipe::IClientCache> CreateTabletPipeClientCache(
.MinRetryTime = config.GetPipeClientMinRetryTime(),
.MaxRetryTime = config.GetPipeClientMaxRetryTime()
};

return std::unique_ptr<NTabletPipe::IClientCache>(
NTabletPipe::CreateUnboundedClientCache(clientConfig));
return clientConfig;
}

} // namespace

////////////////////////////////////////////////////////////////////////////////

TSSProxyActor::TSSProxyActor(TStorageConfigPtr config)
: TActor(&TThis::StateWork)
, Config(std::move(config))
, ClientCache(CreateTabletPipeClientCache(*Config))
: Config(std::move(config))
{}

////////////////////////////////////////////////////////////////////////////////

void TSSProxyActor::HandleConnect(
TEvTabletPipe::TEvClientConnected::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();

if (!ClientCache->OnConnect(ev)) {
auto error = MakeKikimrError(msg->Status, TStringBuilder()
<< "Connect to schemeshard " << msg->TabletId << " failed");

OnConnectionError(ctx, error, msg->TabletId);
}
}

void TSSProxyActor::HandleDisconnect(
TEvTabletPipe::TEvClientDestroyed::TPtr& ev,
const TActorContext& ctx)
void TSSProxyActor::Bootstrap(const TActorContext& ctx)
{
const auto* msg = ev->Get();

ClientCache->OnDisconnect(ev);

auto error = MakeError(E_REJECTED, TStringBuilder()
<< "Disconnected from schemeshard " << msg->TabletId);

OnConnectionError(ctx, error, msg->TabletId);
}

void TSSProxyActor::OnConnectionError(
const TActorContext& ctx,
const NProto::TError& error,
ui64 schemeShard)
{
Y_UNUSED(error);

// SchemeShard is a tablet, so it should eventually get up
// Re-send all outstanding requests
if (auto* state = SchemeShardStates.FindPtr(schemeShard)) {
for (const auto& kv : state->TxToRequests) {
ui64 txId = kv.first;
SendWaitTxRequest(ctx, schemeShard, txId);
}
}
TThis::Become(&TThis::StateWork);

auto actor = std::make_unique<::NCloud::NStorage::TSSProxyActor>(
TFileStoreComponents::SS_PROXY,
Config->GetSchemeShardDir(),
CreateTabletPipeClientConfig(*Config)
);
StorageSSProxyActor = ctx.Register(actor.release());
}

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -100,19 +63,32 @@ bool TSSProxyActor::HandleRequests(STFUNC_SIG)

STFUNC(TSSProxyActor::StateWork)
{
switch (ev->GetTypeRewrite()) {
HFunc(TEvTabletPipe::TEvClientConnected, HandleConnect);
HFunc(TEvTabletPipe::TEvClientDestroyed, HandleDisconnect);
if (!HandleRequests(ev)) {
HandleUnexpectedEvent(ev, TFileStoreComponents::SS_PROXY);
}
}

HFunc(TEvSchemeShard::TEvNotifyTxCompletionRegistered, HandleTxRegistered);
HFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, HandleTxResult);
////////////////////////////////////////////////////////////////////////////////

default:
if (!HandleRequests(ev)) {
HandleUnexpectedEvent(ev, TFileStoreComponents::SS_PROXY);
}
break;
}
void TSSProxyActor::HandleDescribeScheme(
const TEvSSProxy::TEvDescribeSchemeRequest::TPtr& ev,
const TActorContext& ctx)
{
ctx.Send(ev->Forward(StorageSSProxyActor));
}

void TSSProxyActor::HandleModifyScheme(
const TEvSSProxy::TEvModifySchemeRequest::TPtr& ev,
const TActorContext& ctx)
{
ctx.Send(ev->Forward(StorageSSProxyActor));
}

void TSSProxyActor::HandleWaitSchemeTx(
const TEvSSProxy::TEvWaitSchemeTxRequest::TPtr& ev,
const TActorContext& ctx)
{
ctx.Send(ev->Forward(StorageSSProxyActor));
}

} // namespace NCloud::NFileStore::NStorage
46 changes: 6 additions & 40 deletions cloud/filestore/libs/storage/ss_proxy/ss_proxy_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@

#include <cloud/storage/core/libs/kikimr/helpers.h>

#include <contrib/ydb/core/tablet/tablet_pipe_client_cache.h>
#include <contrib/ydb/core/tx/schemeshard/schemeshard.h>

#include <contrib/ydb/library/actors/core/actor.h>
#include <contrib/ydb/library/actors/core/actor_bootstrapped.h>
#include <contrib/ydb/library/actors/core/events.h>
#include <contrib/ydb/library/actors/core/hfunc.h>
#include <contrib/ydb/library/actors/core/log.h>
Expand All @@ -24,50 +21,19 @@ namespace NCloud::NFileStore::NStorage {
////////////////////////////////////////////////////////////////////////////////

class TSSProxyActor final
: public NActors::TActor<TSSProxyActor>
: public NActors::TActorBootstrapped<TSSProxyActor>
{
struct TSchemeShardState
{
NActors::TActorId ReplyProxy;
THashMap<ui64, TDeque<TRequestInfoPtr>> TxToRequests;
};

private:
const TStorageConfigPtr Config;
std::unique_ptr<NKikimr::NTabletPipe::IClientCache> ClientCache;
THashMap<ui64, TSchemeShardState> SchemeShardStates;

public:
TSSProxyActor(TStorageConfigPtr config);
NActors::TActorId StorageSSProxyActor;

private:
void SendWaitTxRequest(
const NActors::TActorContext& ctx,
ui64 schemeShard,
ui64 txId);
public:
explicit TSSProxyActor(TStorageConfigPtr config);

void OnConnectionError(
const NActors::TActorContext& ctx,
const NProto::TError& error,
ui64 schemeShard);
void Bootstrap(const NActors::TActorContext& ctx);

private:
void HandleConnect(
NKikimr::TEvTabletPipe::TEvClientConnected::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleDisconnect(
NKikimr::TEvTabletPipe::TEvClientDestroyed::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleTxRegistered(
const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleTxResult(
const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev,
const NActors::TActorContext& ctx);

bool HandleRequests(STFUNC_SIG);

FILESTORE_SS_PROXY_REQUESTS(FILESTORE_IMPLEMENT_REQUEST, TEvSSProxy)
Expand Down
Loading

0 comments on commit fcfdfb6

Please sign in to comment.