Skip to content

Commit

Permalink
Merge branch 'stable-23-3' into users/vlad-serikov/merge-to-stable-23-3
Browse files Browse the repository at this point in the history
  • Loading branch information
NewBediver authored Mar 4, 2024
2 parents e864d63 + 4ea24aa commit ab8e37b
Show file tree
Hide file tree
Showing 110 changed files with 1,681 additions and 283 deletions.
44 changes: 44 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,47 @@ compile_commands.json
*.exe
*.out
*.app
*.debug

# Generated proto files
*.pb.h
*.pb.cc
*.pb.go
*.pb.h_serialized.cpp
*_pb2.py
*_pb2_grpc.py
*_pb2.pyi

# Ya make test files
test-results

# Test binaries and generated dirs
*-ut
**/tests/tests

# Autogenerated util, library and contrib files
/util/all_*.cpp
/util/charset/all_charset.cpp
/contrib/tools/ragel6/all_*.cpp
/contrib/go/_std_1.21/src/net/_cgo_export.h
/contrib/go/_std_1.21/src/runtime/cgo/_cgo_export.h
/contrib/libs/apache/arrow/cpp/src/generated
/contrib/python/protobuf/py3/google.protobuf.internal._api_implementation.reg3.cpp
/contrib/python/protobuf/py3/google.protobuf.pyext._message.reg3.cpp
/contrib/tools/python3/src/Modules/_sqlite/_sqlite3.reg3.cpp
/library/python/runtime_py3/__res.pyx.cpp
/library/python/runtime_py3/__res.reg3.cpp
/library/python/runtime_py3/sitecustomize.pyx.cpp
/library/python/runtime_py3/sitecustomize.reg3.cpp
/library/python/symbols/module/library.python.symbols.module.syms.reg3.cpp

# IDE files
*.swp
.vscode/*
*.code-workspace
*.code-workspace.bak

# act files
.input
.secrets
.vars
14 changes: 11 additions & 3 deletions cloud/blockstore/libs/endpoints/endpoint_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace NCloud::NBlockStore::NServer {

using namespace NThreading;

namespace {

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -12,17 +14,23 @@ class TEndpointEventProxy: public IEndpointEventProxy
IEndpointEventHandlerPtr Handler;

public:
void OnVolumeConnectionEstablished(const TString& diskId) override;
TFuture<NProto::TError> SwitchEndpointIfNeeded(
const TString& diskId,
const TString& reason) override;
void Register(IEndpointEventHandlerPtr listener) override;
};

////////////////////////////////////////////////////////////////////////////////

void TEndpointEventProxy::OnVolumeConnectionEstablished(const TString& diskId)
TFuture<NProto::TError> TEndpointEventProxy::SwitchEndpointIfNeeded(
const TString& diskId,
const TString& reason)
{
if (Handler) {
Handler->OnVolumeConnectionEstablished(diskId);
return Handler->SwitchEndpointIfNeeded(diskId, reason);
}

return MakeFuture(MakeError(S_OK));
}

void TEndpointEventProxy::Register(IEndpointEventHandlerPtr listener)
Expand Down
5 changes: 4 additions & 1 deletion cloud/blockstore/libs/endpoints/endpoint_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "public.h"

#include <cloud/blockstore/public/api/protos/volume.pb.h>
#include <cloud/storage/core/libs/common/error.h>

namespace NCloud::NBlockStore::NServer {

Expand All @@ -12,7 +13,9 @@ struct IEndpointEventHandler
{
virtual ~IEndpointEventHandler() = default;

virtual void OnVolumeConnectionEstablished(const TString& diskId) = 0;
virtual NThreading::TFuture<NProto::TError> SwitchEndpointIfNeeded(
const TString& diskId,
const TString& reason) = 0;
};

struct IEndpointEventProxy: public IEndpointEventHandler
Expand Down
165 changes: 132 additions & 33 deletions cloud/blockstore/libs/endpoints/endpoint_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,52 @@ bool CompareRequests(

#undef BLOCKSTORE_DECLARE_METHOD

class TSwitchEndpointRequest
{
private:
TString DiskId;
TString UnixSocketPath;
TString Reason;

public:
const TString& GetDiskId() const
{
return DiskId;
}

void SetDiskId(const TString& diskId)
{
DiskId = diskId;
}

const TString& GetUnixSocketPath() const
{
return UnixSocketPath;
}

void SetUnixSocketPath(const TString& socketPath)
{
UnixSocketPath = socketPath;
}

const TString& GetReason() const
{
return Reason;
}

void SetReason(const TString& reason)
{
Reason = reason;
}

};

struct TSwitchEndpointMethod
{
using TRequest = TSwitchEndpointRequest;
using TResponse = NProto::TError;
};

////////////////////////////////////////////////////////////////////////////////

template <typename TMethod>
Expand Down Expand Up @@ -174,7 +220,8 @@ class TEndpointManager final
using TRequestStateVariant = std::variant<
TRequestState<TStartEndpointMethod>,
TRequestState<TStopEndpointMethod>,
TRequestState<TRefreshEndpointMethod>
TRequestState<TRefreshEndpointMethod>,
TRequestState<TSwitchEndpointMethod>
>;
THashMap<TString, TRequestStateVariant> ProcessingSockets;

Expand All @@ -197,10 +244,10 @@ class TEndpointManager final
Log = logging->CreateLog("BLOCKSTORE_SERVER");
}

#define ENDPOINT_IMPLEMENT_METHOD(name, ...) \
TFuture<NProto::T##name##Response> name( \
#define ENDPOINT_IMPLEMENT_METHOD(name, specifier, ...) \
TFuture<T##name##Method::TResponse> name( \
TCallContextPtr callContext, \
std::shared_ptr<NProto::T##name##Request> request) override \
std::shared_ptr<T##name##Method::TRequest> request) specifier \
{ \
return Executor->Execute([ \
ctx = std::move(callContext), \
Expand All @@ -211,20 +258,23 @@ class TEndpointManager final
}); \
} \
\
NProto::T##name##Response Do##name( \
T##name##Method::TResponse Do##name( \
TCallContextPtr ctx, \
std::shared_ptr<NProto::T##name##Request> req); \
std::shared_ptr<T##name##Method::TRequest> req); \
// ENDPOINT_IMPLEMENT_METHOD

ENDPOINT_IMPLEMENT_METHOD(StartEndpoint)
ENDPOINT_IMPLEMENT_METHOD(StopEndpoint)
ENDPOINT_IMPLEMENT_METHOD(ListEndpoints)
ENDPOINT_IMPLEMENT_METHOD(DescribeEndpoint)
ENDPOINT_IMPLEMENT_METHOD(RefreshEndpoint)
ENDPOINT_IMPLEMENT_METHOD(StartEndpoint, override)
ENDPOINT_IMPLEMENT_METHOD(StopEndpoint, override)
ENDPOINT_IMPLEMENT_METHOD(ListEndpoints, override)
ENDPOINT_IMPLEMENT_METHOD(DescribeEndpoint, override)
ENDPOINT_IMPLEMENT_METHOD(RefreshEndpoint, override)
ENDPOINT_IMPLEMENT_METHOD(SwitchEndpoint, )

#undef ENDPOINT_IMPLEMENT_METHOD

void OnVolumeConnectionEstablished(const TString& diskId) override;
TFuture<NProto::TError> SwitchEndpointIfNeeded(
const TString& diskId,
const TString& reason) override;

private:
NProto::TStartEndpointResponse StartEndpointImpl(
Expand All @@ -239,6 +289,10 @@ class TEndpointManager final
TCallContextPtr ctx,
std::shared_ptr<NProto::TRefreshEndpointRequest> request);

NProto::TError SwitchEndpointImpl(
TCallContextPtr ctx,
std::shared_ptr<TSwitchEndpointRequest> request);

NProto::TStartEndpointResponse AlterEndpoint(
TCallContextPtr ctx,
const NProto::TStartEndpointRequest& newRequest,
Expand All @@ -261,8 +315,6 @@ class TEndpointManager final
std::shared_ptr<NProto::TStartEndpointRequest> CreateNbdStartEndpointRequest(
const NProto::TStartEndpointRequest& request);

void TrySwitchEndpoint(const TString& diskId);

template <typename TMethod>
TPromise<typename TMethod::TResponse> AddProcessingSocket(
const typename TMethod::TRequest& request)
Expand Down Expand Up @@ -296,6 +348,9 @@ class TEndpointManager final
[] (const TRequestState<TRefreshEndpointMethod>&) {
return "refreshing";
},
[] (const TRequestState<TSwitchEndpointMethod>&) {
return "switching";
},
[](const auto&) {
return "busy (undefined process)";
}
Expand Down Expand Up @@ -717,42 +772,82 @@ TStartEndpointRequestPtr TEndpointManager::CreateNbdStartEndpointRequest(
return nbdRequest;
}

void TEndpointManager::TrySwitchEndpoint(const TString& diskId)
NProto::TError TEndpointManager::DoSwitchEndpoint(
TCallContextPtr ctx,
std::shared_ptr<TSwitchEndpointRequest> request)
{
auto it = FindIf(Requests, [&] (auto& v) {
const auto& diskId = request->GetDiskId();

auto reqIt = FindIf(Requests, [&] (auto& v) {
const auto& [_, req] = v;
return req->GetDiskId() == diskId
&& req->GetIpcType() == NProto::IPC_VHOST;
});

if (reqIt == Requests.end()) {
return TErrorResponse(S_OK);
}

const auto& socketPath = reqIt->first;

auto sockIt = ProcessingSockets.find(socketPath);
if (sockIt != ProcessingSockets.end()) {
const auto& st = sockIt->second;
auto* state = std::get_if<TRequestState<TSwitchEndpointMethod>>(&st);
if (!state) {
return MakeBusySocketError(socketPath, st);
}

return Executor->WaitFor(state->Result);
}

request->SetUnixSocketPath(socketPath);
auto promise = AddProcessingSocket<TSwitchEndpointMethod>(*request);

auto response = SwitchEndpointImpl(std::move(ctx), std::move(request));
promise.SetValue(response);

RemoveProcessingSocket(socketPath);
return response;
}

NProto::TError TEndpointManager::SwitchEndpointImpl(
TCallContextPtr ctx,
std::shared_ptr<TSwitchEndpointRequest> request)
{
const auto& socketPath = request->GetUnixSocketPath();

auto it = Requests.find(socketPath);
if (it == Requests.end()) {
return;
return TErrorResponse(S_FALSE, TStringBuilder()
<< "endpoint " << socketPath.Quote() << " not started");
}

const auto& req = it->second;
auto listenerIt = EndpointListeners.find(req->GetIpcType());
auto& startRequest = it->second;
auto listenerIt = EndpointListeners.find(startRequest->GetIpcType());
STORAGE_VERIFY(
listenerIt != EndpointListeners.end(),
TWellKnownEntityTypes::ENDPOINT,
req->GetUnixSocketPath());
socketPath);
const auto& listener = listenerIt->second;

auto ctx = MakeIntrusive<TCallContext>();
auto future = SessionManager->GetSession(
std::move(ctx),
req->GetUnixSocketPath(),
req->GetHeaders());
ctx,
startRequest->GetUnixSocketPath(),
startRequest->GetHeaders());
auto [sessionInfo, error] = Executor->WaitFor(future);
if (HasError(error)) {
return;
return error;
}

STORAGE_INFO("Switching endpoint for volume " << sessionInfo.Volume.GetDiskId()
STORAGE_INFO("Switching endpoint"
<< ", reason=" << request->GetReason()
<< ", volume=" << sessionInfo.Volume.GetDiskId()
<< ", IsFastPathEnabled=" << sessionInfo.Volume.GetIsFastPathEnabled()
<< ", Migrations=" << sessionInfo.Volume.GetMigrations().size());

auto switchFuture = listener->SwitchEndpoint(
*it->second,
*startRequest,
sessionInfo.Volume,
sessionInfo.Session);
error = Executor->WaitFor(switchFuture);
Expand All @@ -762,16 +857,20 @@ void TEndpointManager::TrySwitchEndpoint(const TString& diskId)
<< sessionInfo.Volume.GetDiskId()
<< ", " << error.GetMessage());
}

return TErrorResponse(error);
}

void TEndpointManager::OnVolumeConnectionEstablished(const TString& diskId)
TFuture<NProto::TError> TEndpointManager::SwitchEndpointIfNeeded(
const TString& diskId,
const TString& reason)
{
Y_UNUSED(diskId);
auto ctx = MakeIntrusive<TCallContext>();
auto request = std::make_shared<TSwitchEndpointRequest>();
request->SetDiskId(diskId);
request->SetReason(reason);

// TODO: NBS-312 safely call TrySwitchEndpoint
// Executor->ExecuteSimple([this, diskId] () {
// return TrySwitchEndpoint(diskId);
// });
return SwitchEndpoint(std::move(ctx), std::move(request));
}

} // namespace
Expand Down
Loading

0 comments on commit ab8e37b

Please sign in to comment.