Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBS-3864: introduce fast data path external endpoint #18

Merged
merged 1 commit into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions cloud/blockstore/libs/common/device_path.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "device_path.h"

#include <library/cpp/uri/uri.h>

#include <util/string/builder.h>

namespace NCloud::NBlockStore {

NProto::TError DevicePath::Parse(const TString& devicePath)
{
// rdma://myt1-ct5-13.cloud.yandex.net:10020/62ccf40f2743308191205ad6391bfa06

NUri::TUri uri;
auto res = uri.Parse(
devicePath,
NUri::TFeature::FeaturesDefault |
NUri::TFeature::FeatureSchemeFlexible);
if (res != NUri::TState::ParsedOK) {
return MakeError(
E_FAIL,
TStringBuilder() << "invalid device path " << devicePath);
}

if (uri.GetField(NUri::TField::FieldScheme) != Protocol) {
return MakeError(
E_FAIL,
TStringBuilder()
<< "device path doesn't start with "
<< Protocol << "://, " << devicePath);
}

auto path = uri.GetField(NUri::TField::FieldPath);
if (path.size() < 2 || path[0] != '/') {
return MakeError(
E_FAIL,
TStringBuilder()
<< "invalid uuid inside device path " << devicePath);
}

Host = uri.GetHost();
Port = uri.GetPort();
Uuid = path.substr(1);

return {};
}

TString DevicePath::Serialize() const
{
return TStringBuilder()
<< Protocol << "://"
<< Host << ":" << Port
<< "/" << Uuid;
}

} // namespace NCloud::NBlockStore
32 changes: 32 additions & 0 deletions cloud/blockstore/libs/common/device_path.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <cloud/storage/core/libs/common/error.h>

#include <util/generic/string.h>

namespace NCloud::NBlockStore {

////////////////////////////////////////////////////////////////////////////////

struct DevicePath
{
TString Protocol;
TString Host;
ui16 Port;
TString Uuid;

DevicePath(
const TString protocol,
const TString& host = {},
ui16 port = 0,
const TString& uuid = {})
: Protocol(protocol)
, Host(host)
, Port(port)
, Uuid(uuid)
{}
NProto::TError Parse(const TString& devicePath);
TString Serialize() const;
};

} // namespace NCloud::NBlockStore
50 changes: 50 additions & 0 deletions cloud/blockstore/libs/common/device_path_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "device_path.h"

#include <library/cpp/testing/unittest/registar.h>

namespace NCloud::NBlockStore {

////////////////////////////////////////////////////////////////////////////////

Y_UNIT_TEST_SUITE(TDevicePathTest)
{
Y_UNIT_TEST(TestParseValidDevicePath)
{
DevicePath devPath("rdma");
auto error = devPath.Parse("rdma://a.b.c.d:10020/11223344");
UNIT_ASSERT_C(!HasError(error), error);

UNIT_ASSERT_EQUAL(devPath.Protocol, "rdma");
UNIT_ASSERT_EQUAL(devPath.Host, "a.b.c.d");
UNIT_ASSERT_EQUAL(devPath.Port, 10020);
UNIT_ASSERT_EQUAL(devPath.Uuid, "11223344");
}

Y_UNIT_TEST(TestParseInvalidProtocol)
{
DevicePath devPath("unknown");
auto error = devPath.Parse("rdma://a.b.c.d:10020/11223344");
UNIT_ASSERT(HasError(error));
}

Y_UNIT_TEST(TestParseNoUuid)
{
DevicePath devPath("rdma");
auto error = devPath.Parse("rdma://a.b.c.d:10020");
UNIT_ASSERT(HasError(error));
}

Y_UNIT_TEST(TestSerialize)
{
auto expectedPath = "rdma://a.b.c.d:10020/11223344";
DevicePath devPath("rdma", "a.b.c.d", 10020, "11223344");
UNIT_ASSERT_EQUAL(devPath.Serialize(), expectedPath);

DevicePath devPath2("rdma");
auto error = devPath2.Parse(expectedPath);
UNIT_ASSERT_C(!HasError(error), error);
UNIT_ASSERT_EQUAL(expectedPath, devPath2.Serialize());
}
}

} // namespace NCloud::NBlockStore
5 changes: 5 additions & 0 deletions cloud/blockstore/libs/common/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ SRCS(
caching_allocator_ut.cpp
block_checksum_ut.cpp
block_range_ut.cpp
device_path_ut.cpp
iovector_ut.cpp
)

PEERDIR(
library/cpp/uri
)

END()
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ SRCS(
block_checksum.cpp
block_range.cpp
caching_allocator.cpp
device_path.cpp
iovector.cpp
typeinfo.cpp
)

PEERDIR(
cloud/blockstore/public/api/protos

cloud/storage/core/libs/common

library/cpp/digest/crc32c
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <cloud/blockstore/libs/encryption/encryption_client.h>
#include <cloud/blockstore/libs/encryption/encryption_key.h>
#include <cloud/blockstore/libs/encryption/encryption_service.h>
#include <cloud/blockstore/libs/endpoints/endpoint_events.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/endpoints/endpoint_manager.h>
#include <cloud/blockstore/libs/endpoints/service_endpoint.h>
Expand Down Expand Up @@ -210,6 +211,7 @@ void TBootstrapBase::Init()
Executor = TExecutor::Create("SVC");

VolumeBalancerSwitch = CreateVolumeBalancerSwitch();
EndpointEventHandler = CreateEndpointEventProxy();

switch (Configs->Options->ServiceKind) {
case TOptionsCommon::EServiceKind::Ydb:
Expand Down Expand Up @@ -479,6 +481,7 @@ void TBootstrapBase::Init()
Logging,
ServerStats,
Executor,
EndpointEventHandler,
std::move(sessionManager),
std::move(endpointListeners),
Configs->ServerConfig->GetNbdSocketSuffix());
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/daemon/common/bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TBootstrapBase
ICachingAllocatorPtr Allocator;
IStorageProviderPtr AioStorageProvider;
IEndpointServicePtr EndpointService;
IEndpointEventProxyPtr EndpointEventHandler;
NRdma::IServerPtr RdmaServer;
NRdma::IClientPtr RdmaClient;
ITaskQueuePtr RdmaThreadPool;
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/daemon/ydb/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <cloud/blockstore/libs/discovery/fetch.h>
#include <cloud/blockstore/libs/discovery/healthcheck.h>
#include <cloud/blockstore/libs/discovery/ping.h>
#include <cloud/blockstore/libs/endpoints/endpoint_events.h>
#include <cloud/blockstore/libs/kms/iface/compute_client.h>
#include <cloud/blockstore/libs/kms/iface/key_provider.h>
#include <cloud/blockstore/libs/kms/iface/kms_client.h>
Expand Down Expand Up @@ -528,6 +529,7 @@ void TBootstrapYdb::InitKikimrService()
return FindPtr(nodes, fqdn) || CityHash64(fqdn) % 100 < p;
}();
args.VolumeBalancerSwitch = VolumeBalancerSwitch;
args.EndpointEventHandler = EndpointEventHandler;

ActorSystem = NStorage::CreateActorSystem(args);

Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/diagnostics/critical_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace NCloud::NBlockStore {
xxx(DiskAgentConfigMismatch) \
xxx(DiskRegistryDeviceNotFoundSoft) \
xxx(DiskRegistrySourceDiskNotFound) \
xxx(EndpointSwitchFailure) \
// BLOCKSTORE_CRITICAL_EVENTS

#define BLOCKSTORE_IMPOSSIBLE_EVENTS(xxx) \
Expand Down
42 changes: 42 additions & 0 deletions cloud/blockstore/libs/endpoints/endpoint_events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "endpoint_events.h"

namespace NCloud::NBlockStore::NServer {

namespace {

////////////////////////////////////////////////////////////////////////////////

class TEndpointEventProxy: public IEndpointEventProxy
{
private:
IEndpointEventHandlerPtr Handler;

public:
void OnVolumeConnectionEstablished(const TString& diskId) override;
void Register(IEndpointEventHandlerPtr listener) override;
};

////////////////////////////////////////////////////////////////////////////////

void TEndpointEventProxy::OnVolumeConnectionEstablished(const TString& diskId)
{
if (Handler) {
Handler->OnVolumeConnectionEstablished(diskId);
}
}

void TEndpointEventProxy::Register(IEndpointEventHandlerPtr listener)
{
Handler = std::move(listener);
}

} // namespace

////////////////////////////////////////////////////////////////////////////////

IEndpointEventProxyPtr CreateEndpointEventProxy()
{
return std::make_shared<TEndpointEventProxy>();
}

} // namespace NCloud::NBlockStore::NServer
27 changes: 27 additions & 0 deletions cloud/blockstore/libs/endpoints/endpoint_events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "public.h"

#include <cloud/blockstore/public/api/protos/volume.pb.h>

namespace NCloud::NBlockStore::NServer {

////////////////////////////////////////////////////////////////////////////////

struct IEndpointEventHandler
{
virtual ~IEndpointEventHandler() = default;

virtual void OnVolumeConnectionEstablished(const TString& diskId) = 0;
};

struct IEndpointEventProxy: public IEndpointEventHandler
{
virtual ~IEndpointEventProxy() = default;

virtual void Register(IEndpointEventHandlerPtr eventHandler) = 0;
};

IEndpointEventProxyPtr CreateEndpointEventProxy();

} // namespace NCloud::NBlockStore::NServer
5 changes: 5 additions & 0 deletions cloud/blockstore/libs/endpoints/endpoint_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ struct IEndpointListener
virtual NProto::TError RefreshEndpoint(
const TString& socketPath,
const NProto::TVolume& volume) = 0;

virtual NThreading::TFuture<NProto::TError> SwitchEndpoint(
const NProto::TStartEndpointRequest& request,
const NProto::TVolume& volume,
NClient::ISessionPtr session) = 0;
};

} // namespace NCloud::NBlockStore::NServer
Loading
Loading