Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-2100: introduce cloud/storage/core/libs/ss_proxy lib - later it will be used to unify blockstore & filestore SS proxy common part #2134

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud/storage/core/libs/api/ss_proxy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "ss_proxy.h"
134 changes: 134 additions & 0 deletions cloud/storage/core/libs/api/ss_proxy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#pragma once

#include "public.h"

#include <cloud/storage/core/libs/kikimr/components.h>
#include <cloud/storage/core/libs/kikimr/events.h>

#include <contrib/ydb/core/protos/flat_tx_scheme.pb.h>
#include <contrib/ydb/core/protos/flat_scheme_op.pb.h>

#include <contrib/ydb/library/actors/core/actorid.h>

#include <util/generic/string.h>

namespace NCloud::NStorage {

////////////////////////////////////////////////////////////////////////////////

#define STORAGE_SS_PROXY_REQUESTS(xxx, ...) \
xxx(DescribeScheme, __VA_ARGS__) \
xxx(ModifyScheme, __VA_ARGS__) \
xxx(WaitSchemeTx, __VA_ARGS__) \
// STORAGE_SS_PROXY_REQUESTS

////////////////////////////////////////////////////////////////////////////////

struct TEvSSProxy
{
//
// DescribeScheme
//

struct TDescribeSchemeRequest
{
const TString Path;

TDescribeSchemeRequest(TString path)
: Path(std::move(path))
{}
};

struct TDescribeSchemeResponse
{
const TString Path;
const NKikimrSchemeOp::TPathDescription PathDescription;

TDescribeSchemeResponse() = default;

TDescribeSchemeResponse(
TString path,
NKikimrSchemeOp::TPathDescription pathDescription)
: Path(std::move(path))
, PathDescription(std::move(pathDescription))
{}
};

//
// ModifyScheme
//

struct TModifySchemeRequest
{
const NKikimrSchemeOp::TModifyScheme ModifyScheme;

TModifySchemeRequest(
NKikimrSchemeOp::TModifyScheme modifyScheme)
: ModifyScheme(std::move(modifyScheme))
{}
};

struct TModifySchemeResponse
{
const ui64 SchemeShardTabletId;
const NKikimrScheme::EStatus Status;
const TString Reason;

TModifySchemeResponse(
ui64 schemeShardTabletId = 0,
NKikimrScheme::EStatus status = NKikimrScheme::StatusSuccess,
TString reason = TString())
: SchemeShardTabletId(schemeShardTabletId)
, Status(status)
, Reason(std::move(reason))
{}
};

//
// WaitSchemeTx
//

struct TWaitSchemeTxRequest
{
const ui64 SchemeShardTabletId;
const ui64 TxId;

TWaitSchemeTxRequest(
ui64 schemeShardTabletId,
ui64 txId)
: SchemeShardTabletId(schemeShardTabletId)
, TxId(txId)
{}
};

struct TWaitSchemeTxResponse
{
};

//
// Events declaration
//

enum EEvents
{
EvBegin = TStorageEvents::SS_PROXY_START,

EvDescribeSchemeRequest = EvBegin + 1,
EvDescribeSchemeResponse = EvBegin + 2,

EvModifySchemeRequest = EvBegin + 3,
EvModifySchemeResponse = EvBegin + 4,

EvWaitSchemeTxRequest = EvBegin + 5,
EvWaitSchemeTxResponse = EvBegin + 6,

EvEnd
};

static_assert(EvEnd < (int)TStorageEvents::SS_PROXY_END,
"EvEnd expected to be < TStorageEvents::SS_PROXY_END");

STORAGE_SS_PROXY_REQUESTS(STORAGE_DECLARE_EVENTS)
};

} // namespace NCloud::NStorage
5 changes: 4 additions & 1 deletion cloud/storage/core/libs/api/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ LIBRARY()
SRCS(
authorizer.cpp
hive_proxy.cpp
ss_proxy.cpp
user_stats.cpp
)

PEERDIR(
cloud/storage/core/libs/kikimr
contrib/ydb/library/actors/core

contrib/ydb/core/base

contrib/ydb/library/actors/core
)

END()
2 changes: 1 addition & 1 deletion cloud/storage/core/libs/hive_proxy/hive_proxy_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

#include "hive_proxy_events_private.h"

#include <cloud/storage/core/libs/kikimr/helpers.h>
#include <cloud/storage/core/libs/api/hive_proxy.h>
#include <cloud/storage/core/libs/kikimr/helpers.h>

#include <contrib/ydb/core/base/hive.h>
#include <contrib/ydb/core/mind/local.h>
Expand Down
1 change: 1 addition & 0 deletions cloud/storage/core/libs/kikimr/components.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace NCloud {

#define STORAGE_ACTORS(xxx) \
xxx(HIVE_PROXY) \
xxx(SS_PROXY) \
// STORAGE_ACTORS

////////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions cloud/storage/core/libs/ss_proxy/public.h
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#pragma once
100 changes: 100 additions & 0 deletions cloud/storage/core/libs/ss_proxy/ss_proxy_actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include "ss_proxy_actor.h"

namespace NCloud::NStorage {

using namespace NActors;

using namespace NKikimr;
using namespace NKikimr::NSchemeShard;

////////////////////////////////////////////////////////////////////////////////

TSSProxyActor::TSSProxyActor(
int logComponent,
TString schemeShardDir,
NKikimr::NTabletPipe::TClientConfig pipeClientConfig)
: TActor(&TThis::StateWork)
, LogComponent(logComponent)
, SchemeShardDir(schemeShardDir)
, ClientCache(NTabletPipe::CreateUnboundedClientCache(pipeClientConfig))
{}

////////////////////////////////////////////////////////////////////////////////

void TSSProxyActor::HandleConnect(
TEvTabletPipe::TEvClientConnected::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();

if (!ClientCache->OnConnect(ev)) {
auto error = MakeKikimrError(msg->Status, TStringBuilder()
<< "Connect to schemeshard " << msg->TabletId << " failed");

OnConnectionError(ctx, error, msg->TabletId);
}
}

void TSSProxyActor::HandleDisconnect(
TEvTabletPipe::TEvClientDestroyed::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();

ClientCache->OnDisconnect(ev);

auto error = MakeError(E_REJECTED, TStringBuilder()
<< "Disconnected from schemeshard " << msg->TabletId);

OnConnectionError(ctx, error, msg->TabletId);
}

void TSSProxyActor::OnConnectionError(
const TActorContext& ctx,
const NProto::TError& error,
ui64 schemeShard)
{
Y_UNUSED(error);

// SchemeShard is a tablet, so it should eventually get up
// Re-send all outstanding requests
if (auto* state = SchemeShardStates.FindPtr(schemeShard)) {
for (const auto& kv : state->TxToRequests) {
ui64 txId = kv.first;
SendWaitTxRequest(ctx, schemeShard, txId);
}
}
}

////////////////////////////////////////////////////////////////////////////////

bool TSSProxyActor::HandleRequests(STFUNC_SIG)
{
switch (ev->GetTypeRewrite()) {
STORAGE_SS_PROXY_REQUESTS(STORAGE_HANDLE_REQUEST, TEvSSProxy)

default:
return false;
}

return true;
}

STFUNC(TSSProxyActor::StateWork)
{
switch (ev->GetTypeRewrite()) {
HFunc(TEvTabletPipe::TEvClientConnected, HandleConnect);
HFunc(TEvTabletPipe::TEvClientDestroyed, HandleDisconnect);

HFunc(TEvSchemeShard::TEvNotifyTxCompletionRegistered, HandleTxRegistered);
HFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, HandleTxResult);

default:
if (!HandleRequests(ev)) {
HandleUnexpectedEvent(ev, LogComponent);
}
break;
}
}

} // namespace NCloud::NStorage
97 changes: 97 additions & 0 deletions cloud/storage/core/libs/ss_proxy/ss_proxy_actor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#pragma once

#include "public.h"

#include <cloud/storage/core/libs/api/ss_proxy.h>
#include <cloud/storage/core/libs/kikimr/helpers.h>

#include <contrib/ydb/core/tablet/tablet_pipe_client_cache.h>
#include <contrib/ydb/core/tx/schemeshard/schemeshard.h>

#include <contrib/ydb/library/actors/core/actor.h>
#include <contrib/ydb/library/actors/core/events.h>
#include <contrib/ydb/library/actors/core/hfunc.h>
#include <contrib/ydb/library/actors/core/log.h>

#include <util/generic/hash.h>
#include <util/generic/deque.h>

namespace NCloud::NStorage {

////////////////////////////////////////////////////////////////////////////////

class TSSProxyActor final
: public NActors::TActor<TSSProxyActor>
{
public:
struct TRequestInfo
{
NActors::TActorId Sender;
ui64 Cookie = 0;

TRequestInfo() = default;
TRequestInfo(const TRequestInfo&) = default;
TRequestInfo& operator=(const TRequestInfo&) = default;

TRequestInfo(NActors::TActorId sender, ui64 cookie)
: Sender(sender)
, Cookie(cookie)
{}
};

private:
struct TSchemeShardState
{
NActors::TActorId ReplyProxy;
THashMap<ui64, TDeque<TRequestInfo>> TxToRequests;
};

private:
const int LogComponent;
const TString SchemeShardDir;

std::unique_ptr<NKikimr::NTabletPipe::IClientCache> ClientCache;
THashMap<ui64, TSchemeShardState> SchemeShardStates;

public:
TSSProxyActor(
int logComponent,
TString schemeShardDir,
NKikimr::NTabletPipe::TClientConfig pipeClientConfig);

private:
void SendWaitTxRequest(
const NActors::TActorContext& ctx,
ui64 schemeShard,
ui64 txId);

void OnConnectionError(
const NActors::TActorContext& ctx,
const NProto::TError& error,
ui64 schemeShard);

private:
void HandleConnect(
NKikimr::TEvTabletPipe::TEvClientConnected::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleDisconnect(
NKikimr::TEvTabletPipe::TEvClientDestroyed::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleTxRegistered(
const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleTxResult(
const NKikimr::NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev,
const NActors::TActorContext& ctx);

bool HandleRequests(STFUNC_SIG);

STORAGE_SS_PROXY_REQUESTS(STORAGE_IMPLEMENT_REQUEST, TEvSSProxy)

STFUNC(StateWork);
};

} // namespace NCloud::NStorage
Loading
Loading