Skip to content

Commit

Permalink
issue-2100: introduce cloud/storage/core/libs/ss_proxy lib - later it…
Browse files Browse the repository at this point in the history
… will be used to unify blockstore & filestore SS proxy common part
  • Loading branch information
Mikhail Montsev committed Sep 25, 2024
1 parent 079fdb6 commit 1490c5e
Show file tree
Hide file tree
Showing 13 changed files with 954 additions and 2 deletions.
1 change: 1 addition & 0 deletions cloud/storage/core/libs/api/ss_proxy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "ss_proxy.h"
134 changes: 134 additions & 0 deletions cloud/storage/core/libs/api/ss_proxy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#pragma once

#include "public.h"

#include <cloud/storage/core/libs/kikimr/components.h>
#include <cloud/storage/core/libs/kikimr/events.h>

#include <contrib/ydb/core/protos/flat_tx_scheme.pb.h>
#include <contrib/ydb/core/protos/flat_scheme_op.pb.h>

#include <contrib/ydb/library/actors/core/actorid.h>

#include <util/generic/string.h>

namespace NCloud::NStorage {

////////////////////////////////////////////////////////////////////////////////

#define STORAGE_SS_PROXY_REQUESTS(xxx, ...) \
xxx(DescribeScheme, __VA_ARGS__) \
xxx(ModifyScheme, __VA_ARGS__) \
xxx(WaitSchemeTx, __VA_ARGS__) \
// STORAGE_SS_PROXY_REQUESTS

////////////////////////////////////////////////////////////////////////////////

struct TEvSSProxy
{
//
// DescribeScheme
//

struct TDescribeSchemeRequest
{
const TString Path;

TDescribeSchemeRequest(TString path)
: Path(std::move(path))
{}
};

struct TDescribeSchemeResponse
{
const TString Path;
const NKikimrSchemeOp::TPathDescription PathDescription;

TDescribeSchemeResponse() = default;

TDescribeSchemeResponse(
TString path,
NKikimrSchemeOp::TPathDescription pathDescription)
: Path(std::move(path))
, PathDescription(std::move(pathDescription))
{}
};

//
// ModifyScheme
//

struct TModifySchemeRequest
{
const NKikimrSchemeOp::TModifyScheme ModifyScheme;

TModifySchemeRequest(
NKikimrSchemeOp::TModifyScheme modifyScheme)
: ModifyScheme(std::move(modifyScheme))
{}
};

struct TModifySchemeResponse
{
const ui64 SchemeShardTabletId;
const NKikimrScheme::EStatus Status;
const TString Reason;

TModifySchemeResponse(
ui64 schemeShardTabletId = 0,
NKikimrScheme::EStatus status = NKikimrScheme::StatusSuccess,
TString reason = TString())
: SchemeShardTabletId(schemeShardTabletId)
, Status(status)
, Reason(std::move(reason))
{}
};

//
// WaitSchemeTx
//

struct TWaitSchemeTxRequest
{
const ui64 SchemeShardTabletId;
const ui64 TxId;

TWaitSchemeTxRequest(
ui64 schemeShardTabletId,
ui64 txId)
: SchemeShardTabletId(schemeShardTabletId)
, TxId(txId)
{}
};

struct TWaitSchemeTxResponse
{
};

//
// Events declaration
//

enum EEvents
{
EvBegin = TStorageEvents::SS_PROXY_START,

EvDescribeSchemeRequest = EvBegin + 1,
EvDescribeSchemeResponse = EvBegin + 2,

EvModifySchemeRequest = EvBegin + 3,
EvModifySchemeResponse = EvBegin + 4,

EvWaitSchemeTxRequest = EvBegin + 5,
EvWaitSchemeTxResponse = EvBegin + 6,

EvEnd
};

static_assert(EvEnd < (int)TStorageEvents::SS_PROXY_END,
"EvEnd expected to be < TStorageEvents::SS_PROXY_END");

STORAGE_SS_PROXY_REQUESTS(STORAGE_DECLARE_EVENTS)
};

} // namespace NCloud::NStorage
5 changes: 4 additions & 1 deletion cloud/storage/core/libs/api/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ LIBRARY()
SRCS(
authorizer.cpp
hive_proxy.cpp
ss_proxy.cpp
user_stats.cpp
)

PEERDIR(
cloud/storage/core/libs/kikimr
contrib/ydb/library/actors/core

contrib/ydb/core/base

contrib/ydb/library/actors/core
)

END()
2 changes: 1 addition & 1 deletion cloud/storage/core/libs/hive_proxy/hive_proxy_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

#include "hive_proxy_events_private.h"

#include <cloud/storage/core/libs/kikimr/helpers.h>
#include <cloud/storage/core/libs/api/hive_proxy.h>
#include <cloud/storage/core/libs/kikimr/helpers.h>

#include <contrib/ydb/core/base/hive.h>
#include <contrib/ydb/core/mind/local.h>
Expand Down
1 change: 1 addition & 0 deletions cloud/storage/core/libs/kikimr/components.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace NCloud {

#define STORAGE_ACTORS(xxx) \
xxx(HIVE_PROXY) \
xxx(SS_PROXY) \
// STORAGE_ACTORS

////////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions cloud/storage/core/libs/ss_proxy/public.h
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#pragma once
100 changes: 100 additions & 0 deletions cloud/storage/core/libs/ss_proxy/ss_proxy_actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include "ss_proxy_actor.h"

namespace NCloud::NStorage {

using namespace NActors;

using namespace NKikimr;
using namespace NKikimr::NSchemeShard;

////////////////////////////////////////////////////////////////////////////////

TSSProxyActor::TSSProxyActor(
int logComponent,
TString schemeShardDir,
NKikimr::NTabletPipe::TClientConfig pipeClientConfig)
: TActor(&TThis::StateWork)
, LogComponent(logComponent)
, SchemeShardDir(schemeShardDir)
, ClientCache(NTabletPipe::CreateUnboundedClientCache(pipeClientConfig))
{}

////////////////////////////////////////////////////////////////////////////////

void TSSProxyActor::HandleConnect(
TEvTabletPipe::TEvClientConnected::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();

if (!ClientCache->OnConnect(ev)) {
auto error = MakeKikimrError(msg->Status, TStringBuilder()
<< "Connect to schemeshard " << msg->TabletId << " failed");

OnConnectionError(ctx, error, msg->TabletId);
}
}

void TSSProxyActor::HandleDisconnect(
TEvTabletPipe::TEvClientDestroyed::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();

ClientCache->OnDisconnect(ev);

auto error = MakeError(E_REJECTED, TStringBuilder()
<< "Disconnected from schemeshard " << msg->TabletId);

OnConnectionError(ctx, error, msg->TabletId);
}

void TSSProxyActor::OnConnectionError(
const TActorContext& ctx,
const NProto::TError& error,
ui64 schemeShard)
{
Y_UNUSED(error);

// SchemeShard is a tablet, so it should eventually get up
// Re-send all outstanding requests
if (auto* state = SchemeShardStates.FindPtr(schemeShard)) {
for (const auto& kv : state->TxToRequests) {
ui64 txId = kv.first;
SendWaitTxRequest(ctx, schemeShard, txId);
}
}
}

////////////////////////////////////////////////////////////////////////////////

bool TSSProxyActor::HandleRequests(STFUNC_SIG)
{
switch (ev->GetTypeRewrite()) {
STORAGE_SS_PROXY_REQUESTS(STORAGE_HANDLE_REQUEST, TEvSSProxy)

default:
return false;
}

return true;
}

STFUNC(TSSProxyActor::StateWork)
{
switch (ev->GetTypeRewrite()) {
HFunc(TEvTabletPipe::TEvClientConnected, HandleConnect);
HFunc(TEvTabletPipe::TEvClientDestroyed, HandleDisconnect);

HFunc(TEvSchemeShard::TEvNotifyTxCompletionRegistered, HandleTxRegistered);
HFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, HandleTxResult);

default:
if (!HandleRequests(ev)) {
HandleUnexpectedEvent(ev, LogComponent);
}
break;
}
}

} // namespace NCloud::NStorage
97 changes: 97 additions & 0 deletions cloud/storage/core/libs/ss_proxy/ss_proxy_actor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#pragma once

#include "public.h"

#include <cloud/storage/core/libs/api/ss_proxy.h>
#include <cloud/storage/core/libs/kikimr/helpers.h>

#include <contrib/ydb/core/tablet/tablet_pipe_client_cache.h>
#include <contrib/ydb/core/tx/schemeshard/schemeshard.h>

#include <contrib/ydb/library/actors/core/actor.h>
#include <contrib/ydb/library/actors/core/events.h>
#include <contrib/ydb/library/actors/core/hfunc.h>
#include <contrib/ydb/library/actors/core/log.h>

#include <util/generic/hash.h>
#include <util/generic/deque.h>

namespace NCloud::NStorage {

////////////////////////////////////////////////////////////////////////////////

class TSSProxyActor final
: public NActors::TActor<TSSProxyActor>
{
public:
struct TRequestInfo
{
NActors::TActorId Sender;
ui64 Cookie = 0;

TRequestInfo() = default;
TRequestInfo(const TRequestInfo&) = default;
TRequestInfo& operator=(const TRequestInfo&) = default;

TRequestInfo(NActors::TActorId sender, ui64 cookie)
: Sender(sender)
, Cookie(cookie)
{}
};

private:
struct TSchemeShardState
{
NActors::TActorId ReplyProxy;
THashMap<ui64, TDeque<TRequestInfo>> TxToRequests;
};

private:
const int LogComponent;
const TString SchemeShardDir;

std::unique_ptr<NKikimr::NTabletPipe::IClientCache> ClientCache;
THashMap<ui64, TSchemeShardState> SchemeShardStates;

public:
TSSProxyActor(
int logComponent,
TString schemeShardDir,
NKikimr::NTabletPipe::TClientConfig pipeClientConfig);

private:
void SendWaitTxRequest(
const NActors::TActorContext& ctx,
ui64 schemeShard,
ui64 txId);

void OnConnectionError(
const NActors::TActorContext& ctx,
const NProto::TError& error,
ui64 schemeShard);

private:
void HandleConnect(
NKikimr::TEvTabletPipe::TEvClientConnected::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleDisconnect(
NKikimr::TEvTabletPipe::TEvClientDestroyed::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleTxRegistered(
const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleTxResult(
const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev,
const NActors::TActorContext& ctx);

bool HandleRequests(STFUNC_SIG);

STORAGE_SS_PROXY_REQUESTS(STORAGE_IMPLEMENT_REQUEST, TEvSSProxy)

STFUNC(StateWork);
};

} // namespace NCloud::NStorage
Loading

0 comments on commit 1490c5e

Please sign in to comment.