Skip to content

Commit

Permalink
NBS-3864: introduce fast data path external endpoint
Browse files Browse the repository at this point in the history
volumes with "use-fastpath" tag can connect directly to disk-agent without
passing the actor system. When migration needed the endpoint will switch back to
actor system based data path
  • Loading branch information
budevg committed Jan 2, 2024
1 parent f562f8a commit e00e9c5
Show file tree
Hide file tree
Showing 42 changed files with 655 additions and 63 deletions.
55 changes: 55 additions & 0 deletions cloud/blockstore/libs/common/device_path.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "device_path.h"

#include <library/cpp/uri/uri.h>

#include <util/string/builder.h>

namespace NCloud::NBlockStore {

NProto::TError DevicePath::Parse(const TString& devicePath)
{
// rdma://myt1-ct5-13.cloud.yandex.net:10020/62ccf40f2743308191205ad6391bfa06

NUri::TUri uri;
auto res = uri.Parse(
devicePath,
NUri::TFeature::FeaturesDefault |
NUri::TFeature::FeatureSchemeFlexible);
if (res != NUri::TState::ParsedOK) {
return MakeError(
E_FAIL,
TStringBuilder() << "invalid device path " << devicePath);
}

if (uri.GetField(NUri::TField::FieldScheme) != Protocol) {
return MakeError(
E_FAIL,
TStringBuilder()
<< "device path doesn't start with "
<< Protocol << "://, " << devicePath);
}

auto path = uri.GetField(NUri::TField::FieldPath);
if (path.size() < 2 || path[0] != '/') {
return MakeError(
E_FAIL,
TStringBuilder()
<< "invalid uuid inside device path " << devicePath);
}

Host = uri.GetHost();
Port = uri.GetPort();
Uuid = path.substr(1);

return {};
}

TString DevicePath::Serialize() const
{
return TStringBuilder()
<< Protocol << "://"
<< Host << ":" << Port
<< "/" << Uuid;
}

} // namespace NCloud::NBlockStore
32 changes: 32 additions & 0 deletions cloud/blockstore/libs/common/device_path.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <cloud/storage/core/libs/common/error.h>

#include <util/generic/string.h>

namespace NCloud::NBlockStore {

////////////////////////////////////////////////////////////////////////////////

struct DevicePath
{
TString Protocol;
TString Host;
ui16 Port;
TString Uuid;

DevicePath(
const TString protocol,
const TString& host = {},
ui16 port = 0,
const TString& uuid = {})
: Protocol(protocol)
, Host(host)
, Port(port)
, Uuid(uuid)
{}
NProto::TError Parse(const TString& devicePath);
TString Serialize() const;
};

} // namespace NCloud::NBlockStore
50 changes: 50 additions & 0 deletions cloud/blockstore/libs/common/device_path_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "device_path.h"

#include <library/cpp/testing/unittest/registar.h>

namespace NCloud::NBlockStore {

////////////////////////////////////////////////////////////////////////////////

Y_UNIT_TEST_SUITE(TDevicePathTest)
{
Y_UNIT_TEST(TestParseValidDevicePath)
{
DevicePath devPath("rdma");
auto error = devPath.Parse("rdma://a.b.c.d:10020/11223344");
UNIT_ASSERT_C(!HasError(error), error);

UNIT_ASSERT_EQUAL(devPath.Protocol, "rdma");
UNIT_ASSERT_EQUAL(devPath.Host, "a.b.c.d");
UNIT_ASSERT_EQUAL(devPath.Port, 10020);
UNIT_ASSERT_EQUAL(devPath.Uuid, "11223344");
}

Y_UNIT_TEST(TestParseInvalidProtocol)
{
DevicePath devPath("unknown");
auto error = devPath.Parse("rdma://a.b.c.d:10020/11223344");
UNIT_ASSERT(HasError(error));
}

Y_UNIT_TEST(TestParseNoUuid)
{
DevicePath devPath("rdma");
auto error = devPath.Parse("rdma://a.b.c.d:10020");
UNIT_ASSERT(HasError(error));
}

Y_UNIT_TEST(TestSerialize)
{
auto expectedPath = "rdma://a.b.c.d:10020/11223344";
DevicePath devPath("rdma", "a.b.c.d", 10020, "11223344");
UNIT_ASSERT_EQUAL(devPath.Serialize(), expectedPath);

DevicePath devPath2("rdma");
auto error = devPath2.Parse(expectedPath);
UNIT_ASSERT_C(!HasError(error), error);
UNIT_ASSERT_EQUAL(expectedPath, devPath2.Serialize());
}
}

} // namespace NCloud::NBlockStore
5 changes: 5 additions & 0 deletions cloud/blockstore/libs/common/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ SRCS(
caching_allocator_ut.cpp
block_checksum_ut.cpp
block_range_ut.cpp
device_path_ut.cpp
iovector_ut.cpp
)

PEERDIR(
library/cpp/uri
)

END()
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ SRCS(
block_checksum.cpp
block_range.cpp
caching_allocator.cpp
device_path.cpp
iovector.cpp
typeinfo.cpp
)

PEERDIR(
cloud/blockstore/public/api/protos

cloud/storage/core/libs/common

library/cpp/digest/crc32c
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <cloud/blockstore/libs/encryption/encryption_client.h>
#include <cloud/blockstore/libs/encryption/encryption_key.h>
#include <cloud/blockstore/libs/encryption/encryption_service.h>
#include <cloud/blockstore/libs/endpoints/endpoint_events.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/endpoints/endpoint_manager.h>
#include <cloud/blockstore/libs/endpoints/service_endpoint.h>
Expand Down Expand Up @@ -210,6 +211,7 @@ void TBootstrapBase::Init()
Executor = TExecutor::Create("SVC");

VolumeBalancerSwitch = CreateVolumeBalancerSwitch();
EndpointEventHandler = CreateEndpointEventProxy();

switch (Configs->Options->ServiceKind) {
case TOptionsCommon::EServiceKind::Ydb:
Expand Down Expand Up @@ -479,6 +481,7 @@ void TBootstrapBase::Init()
Logging,
ServerStats,
Executor,
EndpointEventHandler,
std::move(sessionManager),
std::move(endpointListeners),
Configs->ServerConfig->GetNbdSocketSuffix());
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/daemon/common/bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TBootstrapBase
ICachingAllocatorPtr Allocator;
IStorageProviderPtr AioStorageProvider;
IEndpointServicePtr EndpointService;
IEndpointEventProxyPtr EndpointEventHandler;
NRdma::IServerPtr RdmaServer;
NRdma::IClientPtr RdmaClient;
ITaskQueuePtr RdmaThreadPool;
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/daemon/ydb/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <cloud/blockstore/libs/discovery/fetch.h>
#include <cloud/blockstore/libs/discovery/healthcheck.h>
#include <cloud/blockstore/libs/discovery/ping.h>
#include <cloud/blockstore/libs/endpoints/endpoint_events.h>
#include <cloud/blockstore/libs/kms/iface/compute_client.h>
#include <cloud/blockstore/libs/kms/iface/key_provider.h>
#include <cloud/blockstore/libs/kms/iface/kms_client.h>
Expand Down Expand Up @@ -528,6 +529,7 @@ void TBootstrapYdb::InitKikimrService()
return FindPtr(nodes, fqdn) || CityHash64(fqdn) % 100 < p;
}();
args.VolumeBalancerSwitch = VolumeBalancerSwitch;
args.EndpointEventHandler = EndpointEventHandler;

ActorSystem = NStorage::CreateActorSystem(args);

Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/diagnostics/critical_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace NCloud::NBlockStore {
xxx(DiskAgentConfigMismatch) \
xxx(DiskRegistryDeviceNotFoundSoft) \
xxx(DiskRegistrySourceDiskNotFound) \
xxx(EndpointSwitchFailure) \
// BLOCKSTORE_CRITICAL_EVENTS

#define BLOCKSTORE_IMPOSSIBLE_EVENTS(xxx) \
Expand Down
42 changes: 42 additions & 0 deletions cloud/blockstore/libs/endpoints/endpoint_events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "endpoint_events.h"

namespace NCloud::NBlockStore::NServer {

namespace {

////////////////////////////////////////////////////////////////////////////////

class TEndpointEventProxy: public IEndpointEventProxy
{
private:
IEndpointEventHandlerPtr Handler;

public:
void OnVolumeConnectionEstablished(const TString& diskId) override;
void Register(IEndpointEventHandlerPtr listener) override;
};

////////////////////////////////////////////////////////////////////////////////

void TEndpointEventProxy::OnVolumeConnectionEstablished(const TString& diskId)
{
if (Handler) {
Handler->OnVolumeConnectionEstablished(diskId);
}
}

void TEndpointEventProxy::Register(IEndpointEventHandlerPtr listener)
{
Handler = std::move(listener);
}

} // namespace

////////////////////////////////////////////////////////////////////////////////

IEndpointEventProxyPtr CreateEndpointEventProxy()
{
return std::make_shared<TEndpointEventProxy>();
}

} // namespace NCloud::NBlockStore::NServer
27 changes: 27 additions & 0 deletions cloud/blockstore/libs/endpoints/endpoint_events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "public.h"

#include <cloud/blockstore/public/api/protos/volume.pb.h>

namespace NCloud::NBlockStore::NServer {

////////////////////////////////////////////////////////////////////////////////

struct IEndpointEventHandler
{
virtual ~IEndpointEventHandler() = default;

virtual void OnVolumeConnectionEstablished(const TString& diskId) = 0;
};

struct IEndpointEventProxy: public IEndpointEventHandler
{
virtual ~IEndpointEventProxy() = default;

virtual void Register(IEndpointEventHandlerPtr eventHandler) = 0;
};

IEndpointEventProxyPtr CreateEndpointEventProxy();

} // namespace NCloud::NBlockStore::NServer
5 changes: 5 additions & 0 deletions cloud/blockstore/libs/endpoints/endpoint_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ struct IEndpointListener
virtual NProto::TError RefreshEndpoint(
const TString& socketPath,
const NProto::TVolume& volume) = 0;

virtual NThreading::TFuture<NProto::TError> SwitchEndpoint(
const NProto::TStartEndpointRequest& request,
const NProto::TVolume& volume,
NClient::ISessionPtr session) = 0;
};

} // namespace NCloud::NBlockStore::NServer
Loading

0 comments on commit e00e9c5

Please sign in to comment.