diff --git a/cloud/blockstore/libs/common/device_path.cpp b/cloud/blockstore/libs/common/device_path.cpp
new file mode 100644
index 00000000000..660540b8d2e
--- /dev/null
+++ b/cloud/blockstore/libs/common/device_path.cpp
@@ -0,0 +1,62 @@
+#include "device_path.h"
+
+#include
+
+#include
+
+namespace NCloud::NBlockStore {
+
+// Parses a device path such as rdma://host:10020/uuid.
+// The scheme must be equal to Protocol. On success Host, Port and Uuid are
+// overwritten and a default-constructed error is returned; on failure E_FAIL
+// is returned and the fields are left untouched.
+NProto::TError DevicePath::Parse(const TString& devicePath)
+{
+    // Example:
+    // rdma://myt1-ct5-13.cloud.yandex.net:10020/62ccf40f2743308191205ad6391bfa06
+
+    NUri::TUri uri;
+    auto res = uri.Parse(
+        devicePath,
+        NUri::TFeature::FeaturesDefault |
+        NUri::TFeature::FeatureSchemeFlexible);
+    if (res != NUri::TState::ParsedOK) {
+        return MakeError(
+            E_FAIL,
+            TStringBuilder() << "invalid device path " << devicePath);
+    }
+
+    if (uri.GetField(NUri::TField::FieldScheme) != Protocol) {
+        return MakeError(
+            E_FAIL,
+            TStringBuilder()
+                << "device path doesn't start with "
+                << Protocol << "://, " << devicePath);
+    }
+
+    // The path component must be "/" followed by a non-empty uuid.
+    auto path = uri.GetField(NUri::TField::FieldPath);
+    if (path.size() < 2 || path[0] != '/') {
+        return MakeError(
+            E_FAIL,
+            TStringBuilder()
+                << "invalid uuid inside device path " << devicePath);
+    }
+
+    Host = uri.GetHost();
+    Port = uri.GetPort();
+    Uuid = path.substr(1);
+
+    return {};
+}
+
+// Formats the stored fields as <Protocol>://<Host>:<Port>/<Uuid>.
+TString DevicePath::Serialize() const
+{
+    return TStringBuilder()
+        << Protocol << "://"
+        << Host << ":" << Port
+        << "/" << Uuid;
+}
+
+} // namespace NCloud::NBlockStore
diff --git a/cloud/blockstore/libs/common/device_path.h b/cloud/blockstore/libs/common/device_path.h
new file mode 100644
index 00000000000..8ec0137e4db
--- /dev/null
+++ b/cloud/blockstore/libs/common/device_path.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include
+
+#include
+
+namespace NCloud::NBlockStore {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Device location of the form <Protocol>://<Host>:<Port>/<Uuid>,
+// e.g. rdma://a.b.c.d:10020/11223344.
+struct DevicePath
+{
+    TString Protocol;
+    TString Host;
+    ui16 Port;
+    TString Uuid;
+
+    // protocol is the scheme that Parse() requires; the other fields are
+    // optional here and are overwritten by a successful Parse().
+    DevicePath(
+        const TString& protocol,
+        const TString& host = {},
+        ui16 port = 0,
+        const TString& uuid = {})
+        : Protocol(protocol)
+        , Host(host)
+        , Port(port)
+        , Uuid(uuid)
+    {}
+
+    // Fills Host, Port and Uuid from devicePath. Returns an error if the path
+    // cannot be parsed or its scheme differs from Protocol.
+    NProto::TError Parse(const TString& devicePath);
+
+    // Returns <Protocol>://<Host>:<Port>/<Uuid>.
+    TString Serialize() const;
+};
+
+} // namespace NCloud::NBlockStore
diff --git a/cloud/blockstore/libs/common/device_path_ut.cpp b/cloud/blockstore/libs/common/device_path_ut.cpp
new file mode 100644
index 00000000000..e73596a6b7a
--- /dev/null
+++ b/cloud/blockstore/libs/common/device_path_ut.cpp
@@ -0,0 +1,50 @@
+#include "device_path.h"
+
+#include
+
+namespace NCloud::NBlockStore {
+
+////////////////////////////////////////////////////////////////////////////////
+
+Y_UNIT_TEST_SUITE(TDevicePathTest)
+{
+    Y_UNIT_TEST(TestParseValidDevicePath)
+    {
+        DevicePath devPath("rdma");
+        auto error = devPath.Parse("rdma://a.b.c.d:10020/11223344");
+        UNIT_ASSERT_C(!HasError(error), error);
+
+        UNIT_ASSERT_EQUAL(devPath.Protocol, "rdma");
+        UNIT_ASSERT_EQUAL(devPath.Host, "a.b.c.d");
+        UNIT_ASSERT_EQUAL(devPath.Port, 10020);
+        UNIT_ASSERT_EQUAL(devPath.Uuid, "11223344");
+    }
+
+    Y_UNIT_TEST(TestParseInvalidProtocol)
+    {
+        DevicePath devPath("unknown");
+        auto error = devPath.Parse("rdma://a.b.c.d:10020/11223344");
+        UNIT_ASSERT(HasError(error));
+    }
+
+    Y_UNIT_TEST(TestParseNoUuid)
+    {
+        DevicePath devPath("rdma");
+        auto error = devPath.Parse("rdma://a.b.c.d:10020");
+        UNIT_ASSERT(HasError(error));
+    }
+
+    Y_UNIT_TEST(TestSerialize)
+    {
+        auto expectedPath = "rdma://a.b.c.d:10020/11223344";
+        DevicePath devPath("rdma", "a.b.c.d", 10020, "11223344");
+        UNIT_ASSERT_EQUAL(devPath.Serialize(), expectedPath);
+
+        DevicePath devPath2("rdma");
+        auto error = devPath2.Parse(expectedPath);
+        UNIT_ASSERT_C(!HasError(error), error);
+        UNIT_ASSERT_EQUAL(expectedPath, devPath2.Serialize());
+    }
+}
+
+} // namespace NCloud::NBlockStore
diff --git a/cloud/blockstore/libs/common/ut/ya.make b/cloud/blockstore/libs/common/ut/ya.make
index 030c6f032bf..0c079cac593 100644
--- a/cloud/blockstore/libs/common/ut/ya.make
+++ 
b/cloud/blockstore/libs/common/ut/ya.make @@ -10,7 +10,12 @@ SRCS( caching_allocator_ut.cpp block_checksum_ut.cpp block_range_ut.cpp + device_path_ut.cpp iovector_ut.cpp ) +PEERDIR( + library/cpp/uri +) + END() diff --git a/cloud/blockstore/libs/common/ya.make b/cloud/blockstore/libs/common/ya.make index 28af8dadc90..4121bcfe5fe 100644 --- a/cloud/blockstore/libs/common/ya.make +++ b/cloud/blockstore/libs/common/ya.make @@ -4,13 +4,14 @@ SRCS( block_checksum.cpp block_range.cpp caching_allocator.cpp + device_path.cpp iovector.cpp typeinfo.cpp ) PEERDIR( cloud/blockstore/public/api/protos - + cloud/storage/core/libs/common library/cpp/digest/crc32c diff --git a/cloud/blockstore/libs/daemon/common/bootstrap.cpp b/cloud/blockstore/libs/daemon/common/bootstrap.cpp index 0e71ed0dc79..8aa63c30d24 100644 --- a/cloud/blockstore/libs/daemon/common/bootstrap.cpp +++ b/cloud/blockstore/libs/daemon/common/bootstrap.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -210,6 +211,7 @@ void TBootstrapBase::Init() Executor = TExecutor::Create("SVC"); VolumeBalancerSwitch = CreateVolumeBalancerSwitch(); + EndpointEventHandler = CreateEndpointEventProxy(); switch (Configs->Options->ServiceKind) { case TOptionsCommon::EServiceKind::Ydb: @@ -479,6 +481,7 @@ void TBootstrapBase::Init() Logging, ServerStats, Executor, + EndpointEventHandler, std::move(sessionManager), std::move(endpointListeners), Configs->ServerConfig->GetNbdSocketSuffix()); diff --git a/cloud/blockstore/libs/daemon/common/bootstrap.h b/cloud/blockstore/libs/daemon/common/bootstrap.h index fd93ed2b32e..71c4cabada1 100644 --- a/cloud/blockstore/libs/daemon/common/bootstrap.h +++ b/cloud/blockstore/libs/daemon/common/bootstrap.h @@ -69,6 +69,7 @@ class TBootstrapBase ICachingAllocatorPtr Allocator; IStorageProviderPtr AioStorageProvider; IEndpointServicePtr EndpointService; + IEndpointEventProxyPtr EndpointEventHandler; NRdma::IServerPtr RdmaServer; NRdma::IClientPtr RdmaClient; 
ITaskQueuePtr RdmaThreadPool; diff --git a/cloud/blockstore/libs/daemon/ydb/bootstrap.cpp b/cloud/blockstore/libs/daemon/ydb/bootstrap.cpp index 96fe70f740f..d303a66606e 100644 --- a/cloud/blockstore/libs/daemon/ydb/bootstrap.cpp +++ b/cloud/blockstore/libs/daemon/ydb/bootstrap.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -528,6 +529,7 @@ void TBootstrapYdb::InitKikimrService() return FindPtr(nodes, fqdn) || CityHash64(fqdn) % 100 < p; }(); args.VolumeBalancerSwitch = VolumeBalancerSwitch; + args.EndpointEventHandler = EndpointEventHandler; ActorSystem = NStorage::CreateActorSystem(args); diff --git a/cloud/blockstore/libs/diagnostics/critical_events.h b/cloud/blockstore/libs/diagnostics/critical_events.h index 4646973187f..96c0b65b592 100644 --- a/cloud/blockstore/libs/diagnostics/critical_events.h +++ b/cloud/blockstore/libs/diagnostics/critical_events.h @@ -50,6 +50,7 @@ namespace NCloud::NBlockStore { xxx(DiskAgentConfigMismatch) \ xxx(DiskRegistryDeviceNotFoundSoft) \ xxx(DiskRegistrySourceDiskNotFound) \ + xxx(EndpointSwitchFailure) \ // BLOCKSTORE_CRITICAL_EVENTS #define BLOCKSTORE_IMPOSSIBLE_EVENTS(xxx) \ diff --git a/cloud/blockstore/libs/endpoints/endpoint_events.cpp b/cloud/blockstore/libs/endpoints/endpoint_events.cpp new file mode 100644 index 00000000000..6f80a7e4741 --- /dev/null +++ b/cloud/blockstore/libs/endpoints/endpoint_events.cpp @@ -0,0 +1,42 @@ +#include "endpoint_events.h" + +namespace NCloud::NBlockStore::NServer { + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TEndpointEventProxy: public IEndpointEventProxy +{ +private: + IEndpointEventHandlerPtr Handler; + +public: + void OnVolumeConnectionEstablished(const TString& diskId) override; + void Register(IEndpointEventHandlerPtr listener) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +void 
TEndpointEventProxy::OnVolumeConnectionEstablished(const TString& diskId) +{ + if (Handler) { + Handler->OnVolumeConnectionEstablished(diskId); + } +} + +void TEndpointEventProxy::Register(IEndpointEventHandlerPtr listener) +{ + Handler = std::move(listener); +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +IEndpointEventProxyPtr CreateEndpointEventProxy() +{ + return std::make_shared(); +} + +} // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/endpoints/endpoint_events.h b/cloud/blockstore/libs/endpoints/endpoint_events.h new file mode 100644 index 00000000000..60d3d6b1f5b --- /dev/null +++ b/cloud/blockstore/libs/endpoints/endpoint_events.h @@ -0,0 +1,27 @@ +#pragma once + +#include "public.h" + +#include + +namespace NCloud::NBlockStore::NServer { + +//////////////////////////////////////////////////////////////////////////////// + +struct IEndpointEventHandler +{ + virtual ~IEndpointEventHandler() = default; + + virtual void OnVolumeConnectionEstablished(const TString& diskId) = 0; +}; + +struct IEndpointEventProxy: public IEndpointEventHandler +{ + virtual ~IEndpointEventProxy() = default; + + virtual void Register(IEndpointEventHandlerPtr eventHandler) = 0; +}; + +IEndpointEventProxyPtr CreateEndpointEventProxy(); + +} // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/endpoints/endpoint_listener.h b/cloud/blockstore/libs/endpoints/endpoint_listener.h index e66232bb1b5..d46ba449152 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_listener.h +++ b/cloud/blockstore/libs/endpoints/endpoint_listener.h @@ -36,6 +36,11 @@ struct IEndpointListener virtual NProto::TError RefreshEndpoint( const TString& socketPath, const NProto::TVolume& volume) = 0; + + virtual NThreading::TFuture SwitchEndpoint( + const NProto::TStartEndpointRequest& request, + const NProto::TVolume& volume, + NClient::ISessionPtr session) = 0; }; } // namespace 
NCloud::NBlockStore::NServer
diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp
index 0274406cf39..8592e05730f 100644
--- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp
+++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp
@@ -1,10 +1,12 @@
 #include "endpoint_manager.h"
 
+#include "endpoint_events.h"
 #include "endpoint_listener.h"
 #include "session_manager.h"
 
 #include
 #include
+#include
 #include
 #include
 #include
@@ -133,7 +135,9 @@ bool CompareRequests(
 ////////////////////////////////////////////////////////////////////////////////
 
 class TEndpointManager final
-    : public IEndpointManager
+    : public std::enable_shared_from_this
+    , public IEndpointManager
+    , public IEndpointEventHandler
 {
 private:
     const IServerStatsPtr ServerStats;
@@ -183,6 +187,8 @@ class TEndpointManager final
         TCallContextPtr ctx,
         std::shared_ptr request) override;
 
+    void OnVolumeConnectionEstablished(const TString& diskId) override;
+
 private:
     NProto::TStartEndpointResponse StartEndpointImpl(
         TCallContextPtr ctx,
@@ -216,6 +222,8 @@ class TEndpointManager final
 
     std::shared_ptr CreateNbdStartEndpointRequest(
         const NProto::TStartEndpointRequest& request);
+
+    void TrySwitchEndpoint(const TString& diskId);
 };
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -613,6 +621,76 @@ TStartEndpointRequestPtr TEndpointManager::CreateNbdStartEndpointRequest(
     return nbdRequest;
 }
 
+// Looks up the IPC_VHOST endpoint that serves |diskId|, fetches its current
+// session and asks the endpoint listener to switch the endpoint. Does nothing
+// if there is no such endpoint; a failed switch is reported as the
+// EndpointSwitchFailure critical event.
+void TEndpointManager::TrySwitchEndpoint(const TString& diskId)
+{
+    auto it = FindIf(Requests, [&] (auto& v) {
+        const auto& [_, req] = v;
+        return req->GetDiskId() == diskId
+            && req->GetIpcType() == NProto::IPC_VHOST;
+    });
+
+    if (it == Requests.end()) {
+        return;
+    }
+
+    // Hold our own reference to the request instead of going through |it|
+    // later: NOTE(review) WaitFor() below presumably lets other executor tasks
+    // run, and they could modify Requests and invalidate the iterator.
+    auto req = it->second;
+    auto listenerIt = EndpointListeners.find(req->GetIpcType());
+    STORAGE_VERIFY(
+        listenerIt != EndpointListeners.end(),
+        TWellKnownEntityTypes::ENDPOINT,
+        req->GetUnixSocketPath());
+    const auto& listener = listenerIt->second;
+
+    auto ctx = MakeIntrusive();
+    auto future = SessionManager->GetSession(
+        std::move(ctx),
+        req->GetUnixSocketPath(),
+        req->GetHeaders());
+    auto result = Executor->WaitFor(future);
+    if (HasError(result)) {
+        STORAGE_WARN("Failed to get session for volume " << diskId
+            << ", " << result.GetError().GetMessage());
+        return;
+    }
+
+    const auto& sessionInfo = result.GetResult();
+
+    STORAGE_INFO("Switching endpoint for volume " << sessionInfo.Volume.GetDiskId()
+        << ", IsFastPathEnabled=" << sessionInfo.Volume.GetIsFastPathEnabled()
+        << ", Migrations=" << sessionInfo.Volume.GetMigrations().size());
+
+    auto switchFuture = listener->SwitchEndpoint(
+        *req,
+        sessionInfo.Volume,
+        sessionInfo.Session);
+    auto error = Executor->WaitFor(switchFuture);
+    if (HasError(error)) {
+        ReportEndpointSwitchFailure(TStringBuilder()
+            << "Failed to switch endpoint for volume "
+            << sessionInfo.Volume.GetDiskId()
+            << ", " << error.GetMessage());
+    }
+}
+
+// IEndpointEventHandler: schedules TrySwitchEndpoint(diskId) on the executor.
+void TEndpointManager::OnVolumeConnectionEstablished(const TString& diskId)
+{
+    // Capture a weak reference rather than |this| so that a task that runs
+    // after the manager is gone becomes a no-op instead of a use-after-free.
+    Executor->ExecuteSimple([weakSelf = weak_from_this(), diskId] () {
+        if (auto self = weakSelf.lock()) {
+            self->TrySwitchEndpoint(diskId);
+        }
+    });
+}
+
 } // namespace
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -621,17 +699,20 @@ IEndpointManagerPtr CreateEndpointManager(
     ILoggingServicePtr logging,
     IServerStatsPtr serverStats,
     TExecutorPtr executor,
+    IEndpointEventProxyPtr eventProxy,
     ISessionManagerPtr sessionManager,
     THashMap listeners,
     TString nbdSocketSuffix)
 {
-    return std::make_shared(
+    auto manager = std::make_shared(
         std::move(logging),
         std::move(serverStats),
         std::move(executor),
         std::move(sessionManager),
         std::move(listeners),
         std::move(nbdSocketSuffix));
+    eventProxy->Register(manager);
+    return manager;
 }
 
 bool IsSameStartEndpointRequests(
diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.h b/cloud/blockstore/libs/endpoints/endpoint_manager.h
index 140399dfa4f..0491298b615 100644
--- a/cloud/blockstore/libs/endpoints/endpoint_manager.h
+++ b/cloud/blockstore/libs/endpoints/endpoint_manager.h
@@ -46,6 +46,7 @@ IEndpointManagerPtr CreateEndpointManager(
     ILoggingServicePtr logging,
     IServerStatsPtr serverStats,
     TExecutorPtr executor,
+    IEndpointEventProxyPtr eventProxy,
     ISessionManagerPtr sessionManager,
     THashMap listeners,
     TString nbdSocketSuffix);
diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
index 1422c239644..ea9ec185aef 100644
--- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
+++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
@@ -1,5 +1,6 @@
 #include "endpoint_manager.h"
 
+#include "endpoint_events.h"
 #include "endpoint_listener.h"
 #include "session_manager.h"
 
@@ -194,6 +195,18 @@ class TTestEndpointListener final
         return {};
     }
 
+    TFuture SwitchEndpoint(
+        const NProto::TStartEndpointRequest& request,
+        const NProto::TVolume& volume,
+        NClient::ISessionPtr session) override
+    {
+        Y_UNUSED(request);
+        Y_UNUSED(volume);
+        Y_UNUSED(session);
+
+        return MakeFuture();
+    }
+
     TMap GetEndpoints() const
     {
         return Endpoints;
@@ -377,10 +390,13 @@ IEndpointManagerPtr CreateEndpointManager(
         bootstrap.Executor,
         sessionManagerOptions);
 
+    auto eventProxy = CreateEndpointEventProxy();
+
     return NServer::CreateEndpointManager(
         bootstrap.Logging,
         serverStats,
         bootstrap.Executor,
+        std::move(eventProxy),
         std::move(sessionManager),
         std::move(endpointListeners),
         std::move(nbdSocketSuffix));
@@ -1012,6 +1028,7 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest)
             bootstrap.Logging,
             CreateServerStatsStub(),
             bootstrap.Executor,
+            CreateEndpointEventProxy(),
             sessionManager,
             {{ NProto::IPC_GRPC, std::make_shared() }},
             "" // NbdSocketSuffix
diff --git a/cloud/blockstore/libs/endpoints/public.h b/cloud/blockstore/libs/endpoints/public.h
index 51bc3bb74de..c98d99208ee 100644
--- a/cloud/blockstore/libs/endpoints/public.h
+++ b/cloud/blockstore/libs/endpoints/public.h
@@ -25,4 +25,10 @@ using IEndpointListenerPtr = std::shared_ptr;
 struct IEndpointService;
 using IEndpointServicePtr = std::shared_ptr;
 
+struct IEndpointEventHandler; +using IEndpointEventHandlerPtr = std::shared_ptr; + +struct IEndpointEventProxy; +using IEndpointEventProxyPtr = std::shared_ptr; + } // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/endpoints/ya.make b/cloud/blockstore/libs/endpoints/ya.make index 2b1da1f30c4..ecd11f7c208 100644 --- a/cloud/blockstore/libs/endpoints/ya.make +++ b/cloud/blockstore/libs/endpoints/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + endpoint_events.cpp endpoint_listener.cpp endpoint_manager.cpp service_endpoint.cpp diff --git a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp index dbc07de0141..3e9be9f0767 100644 --- a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp +++ b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp @@ -293,6 +293,18 @@ class TSocketEndpointListener final Y_UNUSED(volume); return {}; } + + TFuture SwitchEndpoint( + const NProto::TStartEndpointRequest& request, + const NProto::TVolume& volume, + NClient::ISessionPtr session) override + { + Y_UNUSED(request); + Y_UNUSED(volume); + Y_UNUSED(session); + return MakeFuture(); + } + }; } // namespace diff --git a/cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp b/cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp index 23b09834d59..49c6c736096 100644 --- a/cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp +++ b/cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp @@ -86,6 +86,18 @@ class TNbdEndpointListener final Y_UNUSED(volume); return {}; } + + TFuture SwitchEndpoint( + const NProto::TStartEndpointRequest& request, + const NProto::TVolume& volume, + NClient::ISessionPtr session) override + { + Y_UNUSED(request); + Y_UNUSED(volume); + Y_UNUSED(session); + return MakeFuture(MakeError(E_NOT_IMPLEMENTED)); + } + }; } // namespace diff --git a/cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp b/cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp 
index 35397f0eac5..aeec35eaed9 100644 --- a/cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp +++ b/cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp @@ -356,6 +356,18 @@ class TRdmaEndpointListener final return {}; } + TFuture SwitchEndpoint( + const NProto::TStartEndpointRequest& request, + const NProto::TVolume& volume, + NClient::ISessionPtr session) override + { + Y_UNUSED(request); + Y_UNUSED(volume); + Y_UNUSED(session); + return MakeFuture(MakeError(E_NOT_IMPLEMENTED)); + } + + private: NProto::TError DoStartEndpoint( const NProto::TStartEndpointRequest& request, diff --git a/cloud/blockstore/libs/endpoints_spdk/spdk_server.cpp b/cloud/blockstore/libs/endpoints_spdk/spdk_server.cpp index 72309c44a9d..4b324c08995 100644 --- a/cloud/blockstore/libs/endpoints_spdk/spdk_server.cpp +++ b/cloud/blockstore/libs/endpoints_spdk/spdk_server.cpp @@ -315,6 +315,17 @@ class TNVMeEndpointListener final return {}; } + TFuture SwitchEndpoint( + const NProto::TStartEndpointRequest& request, + const NProto::TVolume& volume, + NClient::ISessionPtr session) override + { + Y_UNUSED(request); + Y_UNUSED(volume); + Y_UNUSED(session); + return MakeFuture(MakeError(E_NOT_IMPLEMENTED)); + } + private: NProto::TError DoStartEndpoint( const NProto::TStartEndpointRequest& request, @@ -481,6 +492,17 @@ class TSCSIEndpointListener final return {}; } + TFuture SwitchEndpoint( + const NProto::TStartEndpointRequest& request, + const NProto::TVolume& volume, + NClient::ISessionPtr session) override + { + Y_UNUSED(request); + Y_UNUSED(volume); + Y_UNUSED(session); + return MakeFuture(MakeError(E_NOT_IMPLEMENTED)); + } + private: NProto::TError DoStartEndpoint( const NProto::TStartEndpointRequest& request, diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp index 6a2366bd5b2..2de95508be3 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp +++ 
b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
@@ -1,6 +1,7 @@
 #include "external_vhost_server.h"
 
 #include "external_endpoint_stats.h"
 
+#include
 #include
 #include
@@ -42,6 +43,33 @@ namespace {
 
 ////////////////////////////////////////////////////////////////////////////////
 
+// Kind of endpoint implementation that is selected for a volume.
+enum class EEndpointType
+{
+    Local,
+    Rdma,
+    Fallback
+};
+
+// Returns a printable name for |state| or "UNDEFINED" if the value is out of
+// range. The order of |names| must match the order of EEndpointType.
+const char* GetEndpointTypeName(EEndpointType state)
+{
+    static const char* names[] = {
+        "LOCAL",
+        "RDMA",
+        "FALLBACK",
+    };
+
+    const auto index = static_cast<size_t>(state);
+    if (index < Y_ARRAY_SIZE(names)) {
+        return names[index];
+    }
+    return "UNDEFINED";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
 struct TChild
 {
     pid_t Pid = 0;
@@ -572,6 +600,9 @@ class TExternalVhostEndpointListener final
         if (!TryStartExternalEndpoint(request, volume)) {
             Endpoints.emplace(socketPath, nullptr);
 
+            STORAGE_INFO("starting endpoint "
+                << request.GetUnixSocketPath() << ", epType=FALLBACK");
+
             return FallbackListener->StartEndpoint(
                 request,
                 volume,
@@ -598,20 +629,7 @@ class TExternalVhostEndpointListener final
             return FallbackListener->AlterEndpoint(request, volume, std::move(session));
         }
 
-        auto self = weak_from_this();
-
-        return StopEndpoint(socketPath)
-            .Apply([=] (const auto& future) {
-                if (future.HasException() || HasError(future.GetValue())) {
-                    return future;
-                }
-
-                if (auto p = self.lock()) {
-                    return p->StartEndpoint(request, volume, session);
-                }
-
-                return MakeFuture(MakeError(E_REJECTED, "Cancelled"));
-            });
+        return TrySwitchEndpoint(request, volume, session);
     }
 
     TFuture StopEndpoint(const TString& socketPath) override
@@ -652,7 +670,55 @@ class TExternalVhostEndpointListener final
         return {};
     }
 
+    TFuture SwitchEndpoint(
+        const NProto::TStartEndpointRequest& request,
+        const NProto::TVolume& volume,
+        NClient::ISessionPtr session) override
+    {
+        const auto& socketPath = request.GetUnixSocketPath();
+        auto it = Endpoints.find(socketPath);
+        if (it == Endpoints.end()) {
+            return 
MakeFuture(MakeError(E_NOT_FOUND, TStringBuilder()
+                << "endpoint " << socketPath.Quote() << " not found"));
+        }
+
+        bool isExternalEndpoint = it->second != nullptr;
+        if (isExternalEndpoint != CanStartExternalEndpoint(request, volume)) {
+            return TrySwitchEndpoint(request, volume, session);
+        }
+
+        return MakeFuture();
+    }
+
 private:
+    NThreading::TFuture TrySwitchEndpoint(
+        const NProto::TStartEndpointRequest& request,
+        const NProto::TVolume& volume,
+        NClient::ISessionPtr session)
+    {
+        auto epType = GetEndpointType(request, volume);
+
+        STORAGE_INFO(TStringBuilder()
+            << "switching endpoint " << request.GetUnixSocketPath()
+            << ", epType=" << GetEndpointTypeName(epType)
+            << ", external=" << CanStartExternalEndpoint(request, volume));
+
+        auto self = weak_from_this();
+
+        return StopEndpoint(request.GetUnixSocketPath())
+            .Apply([=] (const auto& future) {
+                if (future.HasException() || HasError(future.GetValue())) {
+                    return future;
+                }
+
+                if (auto p = self.lock()) {
+                    return p->StartEndpoint(request, volume, session);
+                }
+
+                return MakeFuture(MakeError(E_REJECTED, "Cancelled"));
+            });
+    }
+
     bool IsLocalMode(
         const NProto::TStartEndpointRequest& request,
         const NProto::TVolume& volume) const
@@ -662,23 +724,68 @@ class TExternalVhostEndpointListener final
             && request.GetIpcType() == NProto::IPC_VHOST;
     }
 
+    // True when the volume has fast path enabled and the request is a vhost
+    // endpoint with VOLUME_MOUNT_LOCAL mount mode.
+    bool IsFastPathMode(
+        const NProto::TStartEndpointRequest& request,
+        const NProto::TVolume& volume) const
+    {
+        return volume.GetIsFastPathEnabled()
+            && request.GetVolumeMountMode() == NProto::VOLUME_MOUNT_LOCAL
+            && request.GetIpcType() == NProto::IPC_VHOST;
+    }
+
     bool CanStartExternalEndpoint(
         const NProto::TStartEndpointRequest& request,
         const NProto::TVolume& volume) const
     {
-        return IsLocalMode(request, volume) && IsOnLocalAgent(volume);
+        auto epType = GetEndpointType(request, volume);
+        switch (epType) {
+            case EEndpointType::Local:
+                return true;
+            case EEndpointType::Rdma:
+                return volume.GetMigrations().empty();
+            case EEndpointType::Fallback:
+                return false;
+        }
+
+        // Only reached for an out-of-range EEndpointType value.
+        return false;
+    }
+
+    // Selects the endpoint type: Local if IsLocalMode() and IsOnLocalAgent()
+    // hold, otherwise Rdma if IsFastPathMode() and IsOnRdmaAgent() hold,
+    // otherwise Fallback.
+    EEndpointType GetEndpointType(
+        const NProto::TStartEndpointRequest& request,
+        const NProto::TVolume& volume) const
+    {
+        if (IsLocalMode(request, volume) && IsOnLocalAgent(volume)) {
+            return EEndpointType::Local;
+        }
+
+        if (IsFastPathMode(request, volume) && IsOnRdmaAgent(volume)) {
+            return EEndpointType::Rdma;
+        }
+
+        return EEndpointType::Fallback;
     }
 
     bool TryStartExternalEndpoint(
         const NProto::TStartEndpointRequest& request,
         const NProto::TVolume& volume)
     {
-        if (!IsLocalMode(request, volume) || !ValidateLocation(volume)) {
+        auto epType = GetEndpointType(request, volume);
+        if (epType == EEndpointType::Local && !ValidateLocalLocation(volume)) {
+            return false;
+        }
+
+        if (!CanStartExternalEndpoint(request, volume)) {
             return false;
         }
 
         try {
-            StartExternalEndpoint(request, volume);
+            StartExternalEndpoint(epType, request, volume);
 
             return true;
 
@@ -690,27 +797,79 @@ class TExternalVhostEndpointListener final
         return false;
     }
 
+    // Value passed to the external vhost server as --device-backend.
+    TString GetDeviceBackend(EEndpointType epType)
+    {
+        switch (epType) {
+            case EEndpointType::Local:
+                return "aio";
+            case EEndpointType::Rdma:
+                return "rdma";
+            case EEndpointType::Fallback:
+                return "fallback";
+        }
+
+        // Only reached for an out-of-range EEndpointType value.
+        return {};
+    }
+
+    // Path part of a --device argument: the device name for Local, a
+    // serialized rdma:// DevicePath for Rdma, an empty string for Fallback.
+    TString GetDevicePath(EEndpointType epType, const NProto::TDevice& device)
+    {
+        switch (epType) {
+            case EEndpointType::Local:
+                return device.GetDeviceName();
+            case EEndpointType::Rdma: {
+                DevicePath path(
+                    "rdma",
+                    device.GetRdmaEndpoint().GetHost(),
+                    device.GetRdmaEndpoint().GetPort(),
+                    device.GetDeviceUUID());
+                return path.Serialize();
+            }
+            case EEndpointType::Fallback:
+                return "";
+        }
+
+        // Only reached for an out-of-range EEndpointType value.
+        return {};
+    }
+
     void StartExternalEndpoint(
+        EEndpointType epType,
         const NProto::TStartEndpointRequest& request,
         const NProto::TVolume& volume)
     {
+        STORAGE_INFO("starting endpoint "
+            << request.GetUnixSocketPath() << ", epType="
+            << GetEndpointTypeName(epType));
+
         const auto& socketPath = request.GetUnixSocketPath();
         const auto& deviceName = request.GetDeviceName()
             ? 
request.GetDeviceName() : volume.GetDiskId(); + const auto& clientId = request.GetClientId() + ? request.GetClientId() + : request.GetInstanceId(); TVector args { + "--client-id", clientId, + "--disk-id", request.GetDiskId(), "--serial", deviceName, "--socket-path", socketPath, "-q", ToString(request.GetVhostQueuesCount()) }; + args.emplace_back("--device-backend"); + args.emplace_back(GetDeviceBackend(epType)); + for (const auto& device: volume.GetDevices()) { const ui64 size = device.GetBlockCount() * volume.GetBlockSize(); args.insert(args.end(), { "--device", TStringBuilder() - << device.GetDeviceName() << ":" + << GetDevicePath(epType, device) << ":" << size << ":" << device.GetPhysicalOffset() }); @@ -725,9 +867,6 @@ class TExternalVhostEndpointListener final request.GetClientCGroups().end() ); - const auto& clientId = request.GetClientId() - ? request.GetClientId() - : request.GetInstanceId(); auto ep = EndpointFactory( clientId, @@ -750,7 +889,17 @@ class TExternalVhostEndpointListener final }); } - bool ValidateLocation(const NProto::TVolume& volume) const + bool IsOnRdmaAgent(const NProto::TVolume& volume) const + { + return AllOf( + volume.GetDevices(), + [&] (const auto& d) { + return d.GetRdmaEndpoint().GetHost() == d.GetAgentId() + && d.GetAgentId() != ""; + }); + } + + bool ValidateLocalLocation(const NProto::TVolume& volume) const { if (LocalAgentId.empty()) { return true; diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp index ac551f7ede0..2a3b0fa8a23 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp +++ b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp @@ -118,6 +118,17 @@ struct TTestEndpointListener return {}; } + + TFuture SwitchEndpoint( + const NProto::TStartEndpointRequest& request, + const NProto::TVolume& volume, + NClient::ISessionPtr session) override + { + Y_UNUSED(request); + 
Y_UNUSED(volume); + Y_UNUSED(session); + return MakeFuture(); + } }; //////////////////////////////////////////////////////////////////////////////// @@ -345,15 +356,21 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest) UNIT_ASSERT_VALUES_EQUAL(Volume.GetDiskId(), create->DiskId); /* - --device ... 2 - --device ... 2 + --client-id ... 2 + --disk-id ... 2 --serial local0 2 --socket-path /tmp/socket.vhost 2 -q 2 2 + --device-backend ... 2 + --device ... 2 + --device ... 2 --read-only 1 - 11 + 17 */ - UNIT_ASSERT_VALUES_EQUAL(11, create->CmdArgs.size()); + + UNIT_ASSERT_VALUES_EQUAL(17, create->CmdArgs.size()); + UNIT_ASSERT_VALUES_EQUAL("client", GetArg(create->CmdArgs, "--client-id")); + UNIT_ASSERT_VALUES_EQUAL("vol0", GetArg(create->CmdArgs, "--disk-id")); UNIT_ASSERT_VALUES_EQUAL("local0", GetArg(create->CmdArgs, "--serial")); UNIT_ASSERT_VALUES_EQUAL( @@ -361,6 +378,7 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest) GetArg(create->CmdArgs, "--socket-path")); UNIT_ASSERT_VALUES_EQUAL("2", GetArg(create->CmdArgs, "-q")); + UNIT_ASSERT_VALUES_EQUAL("aio", GetArg(create->CmdArgs, "--device-backend")); UNIT_ASSERT(FindPtr(create->CmdArgs, "--read-only")); auto devices = GetArgN(create->CmdArgs, "--device"); diff --git a/cloud/blockstore/libs/endpoints_vhost/vhost_server.cpp b/cloud/blockstore/libs/endpoints_vhost/vhost_server.cpp index 017beffdf3d..53d35903924 100644 --- a/cloud/blockstore/libs/endpoints_vhost/vhost_server.cpp +++ b/cloud/blockstore/libs/endpoints_vhost/vhost_server.cpp @@ -65,6 +65,17 @@ class TVhostEndpointListener final { return Server->UpdateEndpoint(socketPath, volume.GetBlocksCount()); } + + TFuture SwitchEndpoint( + const NProto::TStartEndpointRequest& request, + const NProto::TVolume& volume, + NClient::ISessionPtr session) override + { + Y_UNUSED(request); + Y_UNUSED(volume); + Y_UNUSED(session); + return MakeFuture(); + } }; } // namespace diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.cpp 
b/cloud/blockstore/libs/storage/core/proto_helpers.cpp index 075bb5007db..348d6c35f7f 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.cpp +++ b/cloud/blockstore/libs/storage/core/proto_helpers.cpp @@ -184,6 +184,15 @@ void VolumeConfigToVolume( volume.SetBaseDiskCheckpointId(volumeConfig.GetBaseDiskCheckpointId()); volume.SetIsSystem(volumeConfig.GetIsSystem()); volume.SetIsFillFinished(volumeConfig.GetIsFillFinished()); + + TStringBuf sit(volumeConfig.GetTagsStr()); + TStringBuf tag; + while (sit.NextTok(',', tag)) { + if (tag == "use-fastpath") { + volume.SetIsFastPathEnabled(true); + } + } + } void VolumeConfigToVolumeModel( diff --git a/cloud/blockstore/libs/storage/init/server/actorsystem.cpp b/cloud/blockstore/libs/storage/init/server/actorsystem.cpp index 311ba4bf0ce..8a47eed4631 100644 --- a/cloud/blockstore/libs/storage/init/server/actorsystem.cpp +++ b/cloud/blockstore/libs/storage/init/server/actorsystem.cpp @@ -210,6 +210,7 @@ class TStorageServicesInitializer final Args.BlockDigestGenerator, Args.DiscoveryService, Args.TraceSerializer, + Args.EndpointEventHandler, Args.RdmaClient, Args.VolumeStats, Args.PreemptedVolumes); @@ -342,6 +343,7 @@ class TCustomLocalServiceInitializer final const IProfileLogPtr ProfileLog; const IBlockDigestGeneratorPtr BlockDigestGenerator; const ITraceSerializerPtr TraceSerializer; + const NServer::IEndpointEventHandlerPtr EndpointEventHandler; const NLogbroker::IServicePtr LogbrokerService; const NNotify::IServicePtr NotifyService; const NRdma::IClientPtr RdmaClient; diff --git a/cloud/blockstore/libs/storage/init/server/actorsystem.h b/cloud/blockstore/libs/storage/init/server/actorsystem.h index 722aa9dc461..8d4ed0b387a 100644 --- a/cloud/blockstore/libs/storage/init/server/actorsystem.h +++ b/cloud/blockstore/libs/storage/init/server/actorsystem.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -63,6 +64,7 @@ struct TServerActorSystemArgs TManuallyPreemptedVolumesPtr 
PreemptedVolumes; NNvme::INvmeManagerPtr NvmeManager; IVolumeBalancerSwitchPtr VolumeBalancerSwitch; + NServer::IEndpointEventHandlerPtr EndpointEventHandler; TVector UserCounterProviders; diff --git a/cloud/blockstore/libs/storage/service/service.cpp b/cloud/blockstore/libs/storage/service/service.cpp index 4de5ffa26d2..ec5a091a0d8 100644 --- a/cloud/blockstore/libs/storage/service/service.cpp +++ b/cloud/blockstore/libs/storage/service/service.cpp @@ -15,6 +15,7 @@ IActorPtr CreateStorageService( IBlockDigestGeneratorPtr blockDigestGenerator, NDiscovery::IDiscoveryServicePtr discoveryService, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, IVolumeStatsPtr volumeStats, TManuallyPreemptedVolumesPtr preemptedVolumes) @@ -26,6 +27,7 @@ IActorPtr CreateStorageService( std::move(blockDigestGenerator), std::move(discoveryService), std::move(traceSerializer), + std::move(endpointEventHandler), std::move(rdmaClient), std::move(volumeStats), std::move(preemptedVolumes)); diff --git a/cloud/blockstore/libs/storage/service/service.h b/cloud/blockstore/libs/storage/service/service.h index 473aae2aa65..41fec562de9 100644 --- a/cloud/blockstore/libs/storage/service/service.h +++ b/cloud/blockstore/libs/storage/service/service.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -19,6 +20,7 @@ NActors::IActorPtr CreateStorageService( IBlockDigestGeneratorPtr blockDigestGenerator, NDiscovery::IDiscoveryServicePtr discoveryService, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, IVolumeStatsPtr volumeStats, TManuallyPreemptedVolumesPtr preemptedVolumes); diff --git a/cloud/blockstore/libs/storage/service/service_actor.cpp b/cloud/blockstore/libs/storage/service/service_actor.cpp index b496c4a76da..2d9d42c1f14 100644 --- a/cloud/blockstore/libs/storage/service/service_actor.cpp +++ 
b/cloud/blockstore/libs/storage/service/service_actor.cpp @@ -24,6 +24,7 @@ TServiceActor::TServiceActor( IBlockDigestGeneratorPtr blockDigestGenerator, NDiscovery::IDiscoveryServicePtr discoveryService, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, IVolumeStatsPtr volumeStats, TManuallyPreemptedVolumesPtr preemptedVolumes) @@ -33,6 +34,7 @@ TServiceActor::TServiceActor( , BlockDigestGenerator(std::move(blockDigestGenerator)) , DiscoveryService(std::move(discoveryService)) , TraceSerializer(std::move(traceSerializer)) + , EndpointEventHandler(std::move(endpointEventHandler)) , RdmaClient(std::move(rdmaClient)) , VolumeStats(std::move(volumeStats)) , SharedCounters(MakeIntrusive(Config)) diff --git a/cloud/blockstore/libs/storage/service/service_actor.h b/cloud/blockstore/libs/storage/service/service_actor.h index 8468dc2ced0..7303699511f 100644 --- a/cloud/blockstore/libs/storage/service/service_actor.h +++ b/cloud/blockstore/libs/storage/service/service_actor.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,7 @@ class TServiceActor final const IBlockDigestGeneratorPtr BlockDigestGenerator; const NDiscovery::IDiscoveryServicePtr DiscoveryService; const ITraceSerializerPtr TraceSerializer; + const NServer::IEndpointEventHandlerPtr EndpointEventHandler; const NRdma::IClientPtr RdmaClient; const IVolumeStatsPtr VolumeStats; @@ -65,6 +67,7 @@ class TServiceActor final IBlockDigestGeneratorPtr blockDigestGenerator, NDiscovery::IDiscoveryServicePtr discoveryService, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, IVolumeStatsPtr volumeStats, TManuallyPreemptedVolumesPtr preemptedVolumes); @@ -381,6 +384,7 @@ NActors::IActorPtr CreateVolumeSessionActor( IProfileLogPtr profileLog, IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, + 
NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, std::shared_ptr counters, TSharedServiceCountersPtr sharedCounters); diff --git a/cloud/blockstore/libs/storage/service/service_actor_mount.cpp b/cloud/blockstore/libs/storage/service/service_actor_mount.cpp index 56648fa83ad..44dc6e0cf72 100644 --- a/cloud/blockstore/libs/storage/service/service_actor_mount.cpp +++ b/cloud/blockstore/libs/storage/service/service_actor_mount.cpp @@ -136,6 +136,7 @@ void TServiceActor::HandleMountVolume( ProfileLog, BlockDigestGenerator, TraceSerializer, + EndpointEventHandler, RdmaClient, Counters, SharedCounters diff --git a/cloud/blockstore/libs/storage/service/volume_client_actor.cpp b/cloud/blockstore/libs/storage/service/volume_client_actor.cpp index adb84466712..2a5e7f2e97c 100644 --- a/cloud/blockstore/libs/storage/service/volume_client_actor.cpp +++ b/cloud/blockstore/libs/storage/service/volume_client_actor.cpp @@ -2,6 +2,7 @@ #include "service_events_private.h" +#include #include #include #include @@ -71,6 +72,7 @@ class TVolumeClientActor final private: ITraceSerializerPtr TraceSerializer; + NServer::IEndpointEventHandlerPtr EndpointEventHandler; const TActorId SessionActorId; const TString DiskId; const ui64 TabletId; @@ -87,6 +89,7 @@ class TVolumeClientActor final TVolumeClientActor( TStorageConfigPtr config, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, const TActorId& sessionActorId, TString diskId, ui64 tabletId); @@ -136,11 +139,13 @@ class TVolumeClientActor final TVolumeClientActor::TVolumeClientActor( TStorageConfigPtr config, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, const TActorId& sessionActorId, TString diskId, ui64 tabletId) : TActor(&TThis::StateWork) , TraceSerializer(std::move(traceSerializer)) + , EndpointEventHandler(endpointEventHandler) , SessionActorId(sessionActorId) , DiskId(std::move(diskId)) , 
TabletId(tabletId) @@ -207,6 +212,7 @@ void TVolumeClientActor::HandleConnect( "Connection to tablet: " << msg->TabletId << " has been established"); + EndpointEventHandler->OnVolumeConnectionEstablished(DiskId); } void TVolumeClientActor::HandleDisconnect( @@ -466,6 +472,7 @@ STFUNC(TVolumeClientActor::StateWork) IActorPtr CreateVolumeClient( TStorageConfigPtr config, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, const TActorId& sessionActorId, TString diskId, ui64 tabletId) @@ -473,10 +480,10 @@ IActorPtr CreateVolumeClient( return std::make_unique( std::move(config), std::move(traceSerializer), + std::move(endpointEventHandler), sessionActorId, std::move(diskId), tabletId); } } // namespace NCloud::NBlockStore::NStorage - diff --git a/cloud/blockstore/libs/storage/service/volume_client_actor.h b/cloud/blockstore/libs/storage/service/volume_client_actor.h index 89839531b4f..a14d55b96f9 100644 --- a/cloud/blockstore/libs/storage/service/volume_client_actor.h +++ b/cloud/blockstore/libs/storage/service/volume_client_actor.h @@ -2,6 +2,7 @@ #include "public.h" +#include #include #include @@ -14,6 +15,7 @@ namespace NCloud::NBlockStore::NStorage { NActors::IActorPtr CreateVolumeClient( TStorageConfigPtr config, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, const NActors::TActorId& sessionActorId, TString diskId, ui64 tabletId); diff --git a/cloud/blockstore/libs/storage/service/volume_session_actor.cpp b/cloud/blockstore/libs/storage/service/volume_session_actor.cpp index 1c30dda9688..74881cfeb09 100644 --- a/cloud/blockstore/libs/storage/service/volume_session_actor.cpp +++ b/cloud/blockstore/libs/storage/service/volume_session_actor.cpp @@ -66,6 +66,7 @@ void TVolumeSessionActor::HandleDescribeVolumeResponse( CreateVolumeClient( Config, TraceSerializer, + EndpointEventHandler, SelfId(), VolumeInfo->DiskId, TabletId @@ -337,6 +338,7 @@ IActorPtr 
CreateVolumeSessionActor( IProfileLogPtr profileLog, IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, std::shared_ptr counters, TSharedServiceCountersPtr sharedCounters) @@ -348,6 +350,7 @@ IActorPtr CreateVolumeSessionActor( std::move(profileLog), std::move(blockDigestGenerator), std::move(traceSerializer), + std::move(endpointEventHandler), std::move(rdmaClient), std::move(counters), std::move(sharedCounters)); diff --git a/cloud/blockstore/libs/storage/service/volume_session_actor.h b/cloud/blockstore/libs/storage/service/volume_session_actor.h index 33c6e469715..7753000e045 100644 --- a/cloud/blockstore/libs/storage/service/volume_session_actor.h +++ b/cloud/blockstore/libs/storage/service/volume_session_actor.h @@ -7,6 +7,7 @@ #include "service_state.h" #include +#include #include #include #include @@ -47,6 +48,7 @@ class TVolumeSessionActor final const IProfileLogPtr ProfileLog; const IBlockDigestGeneratorPtr BlockDigestGenerator; const ITraceSerializerPtr TraceSerializer; + const NServer::IEndpointEventHandlerPtr EndpointEventHandler; const NRdma::IClientPtr RdmaClient; const std::shared_ptr Counters; const TSharedServiceCountersPtr SharedCounters; @@ -77,6 +79,7 @@ class TVolumeSessionActor final IProfileLogPtr profileLog, IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, std::shared_ptr counters, TSharedServiceCountersPtr sharedCounters) @@ -86,6 +89,7 @@ class TVolumeSessionActor final , ProfileLog(std::move(profileLog)) , BlockDigestGenerator(std::move(blockDigestGenerator)) , TraceSerializer(std::move(traceSerializer)) + , EndpointEventHandler(std::move(endpointEventHandler)) , RdmaClient(std::move(rdmaClient)) , Counters(std::move(counters)) , SharedCounters(std::move(sharedCounters)) diff --git 
a/cloud/blockstore/libs/storage/service/volume_session_actor_start.cpp b/cloud/blockstore/libs/storage/service/volume_session_actor_start.cpp index a0b33a12674..a3a0031e172 100644 --- a/cloud/blockstore/libs/storage/service/volume_session_actor_start.cpp +++ b/cloud/blockstore/libs/storage/service/volume_session_actor_start.cpp @@ -51,6 +51,7 @@ class TStartVolumeActor final const IProfileLogPtr ProfileLog; const IBlockDigestGeneratorPtr BlockDigestGenerator; const ITraceSerializerPtr TraceSerializer; + const NServer::IEndpointEventHandlerPtr EndpointEventHandler; const NRdma::IClientPtr RdmaClient; const TString DiskId; const ui64 VolumeTabletId; @@ -83,6 +84,7 @@ class TStartVolumeActor final IProfileLogPtr profileLog, IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, TString diskId, ui64 volumeTabletId = 0); @@ -159,6 +161,7 @@ TStartVolumeActor::TStartVolumeActor( IProfileLogPtr profileLog, IBlockDigestGeneratorPtr blockDigestGenerator, ITraceSerializerPtr traceSerializer, + NServer::IEndpointEventHandlerPtr endpointEventHandler, NRdma::IClientPtr rdmaClient, TString diskId, ui64 volumeTabletId) @@ -168,6 +171,7 @@ TStartVolumeActor::TStartVolumeActor( , ProfileLog(std::move(profileLog)) , BlockDigestGenerator(std::move(blockDigestGenerator)) , TraceSerializer(std::move(traceSerializer)) + , EndpointEventHandler(std::move(endpointEventHandler)) , RdmaClient(std::move(rdmaClient)) , DiskId(std::move(diskId)) , VolumeTabletId(volumeTabletId) @@ -435,6 +439,7 @@ void TStartVolumeActor::StartTablet(const TActorContext& ctx) auto profileLog = ProfileLog; auto blockDigestGenerator = BlockDigestGenerator; auto traceSerializer = TraceSerializer; + auto endpointEventHandler = EndpointEventHandler; auto rdmaClient = RdmaClient; auto factory = [=] (const TActorId& owner, TTabletStorageInfo* storage) { @@ -858,6 +863,7 @@ void 
TVolumeSessionActor::HandleStartVolumeRequest( ProfileLog, BlockDigestGenerator, TraceSerializer, + EndpointEventHandler, RdmaClient, diskId, TabletId); diff --git a/cloud/blockstore/libs/storage/service/volume_session_actor_unmount.cpp b/cloud/blockstore/libs/storage/service/volume_session_actor_unmount.cpp index a55a68c2f65..727a7a365f8 100644 --- a/cloud/blockstore/libs/storage/service/volume_session_actor_unmount.cpp +++ b/cloud/blockstore/libs/storage/service/volume_session_actor_unmount.cpp @@ -401,6 +401,7 @@ void TVolumeSessionActor::HandleUnmountRequestProcessed( CreateVolumeClient( Config, TraceSerializer, + EndpointEventHandler, SelfId(), VolumeInfo->DiskId, TabletId diff --git a/cloud/blockstore/libs/storage/testlib/test_env.cpp b/cloud/blockstore/libs/storage/testlib/test_env.cpp index 91d2012495d..0adcb56fd99 100644 --- a/cloud/blockstore/libs/storage/testlib/test_env.cpp +++ b/cloud/blockstore/libs/storage/testlib/test_env.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -371,6 +372,7 @@ ui32 TTestEnv::CreateBlockStoreNode( CreateBlockDigestGeneratorStub(), NDiscovery::CreateDiscoveryServiceStub(), TraceSerializer, + NServer::CreateEndpointEventProxy(), nullptr, // rdmaClient CreateVolumeStatsStub(), std::move(manuallyPreemptedVolumes)); diff --git a/cloud/blockstore/libs/storage/testlib/ya.make b/cloud/blockstore/libs/storage/testlib/ya.make index cb4e241669a..cb6b2a6bc2c 100644 --- a/cloud/blockstore/libs/storage/testlib/ya.make +++ b/cloud/blockstore/libs/storage/testlib/ya.make @@ -17,6 +17,7 @@ SRCS( PEERDIR( cloud/blockstore/libs/diagnostics cloud/blockstore/libs/discovery + cloud/blockstore/libs/endpoints cloud/blockstore/libs/kikimr cloud/blockstore/libs/service cloud/blockstore/libs/storage/api diff --git a/cloud/blockstore/public/api/protos/volume.proto b/cloud/blockstore/public/api/protos/volume.proto index 1b2bef81078..c0f799b6a89 100644 --- a/cloud/blockstore/public/api/protos/volume.proto +++ 
b/cloud/blockstore/public/api/protos/volume.proto @@ -276,6 +276,10 @@ message TVolume // Represents whether disk filling finished or not. bool IsFillFinished = 35; + + // Whether volume can use fast data path (external endpoint with direct + // rdma connection to engine) + bool IsFastPathEnabled = 36; } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/vhost-server/backend_rdma.cpp b/cloud/blockstore/vhost-server/backend_rdma.cpp index 29eccefc49f..7b053aff024 100644 --- a/cloud/blockstore/vhost-server/backend_rdma.cpp +++ b/cloud/blockstore/vhost-server/backend_rdma.cpp @@ -2,6 +2,7 @@ #include "backend.h" +#include #include #include #include @@ -12,11 +13,11 @@ #include #include #include +#include #include #include #include -#include namespace NCloud::NBlockStore::NVHostServer { @@ -68,32 +69,6 @@ TRdmaBackend::TRdmaBackend(ILoggingServicePtr logging) Log = Logging->CreateLog("RDMA"); } -std::tuple ParseRdmaDevicePath( - const TString& devicePath) -{ - // rdma://myt1-ct5-13.cloud.yandex.net:10020/62ccf40f2743308191205ad6391bfa06 - - NUri::TUri uri; - auto res = uri.Parse( - devicePath, - NUri::TFeature::FeaturesDefault | NUri::TFeature::FeatureSchemeFlexible); - Y_ENSURE( - res == NUri::TState::ParsedOK, - "invalid device path " << devicePath); - - Y_ENSURE(uri.GetField(NUri::TField::FieldScheme) == "rdma", - "device path doesn't start with rdma://, " << devicePath); - - auto path = uri.GetField(NUri::TField::FieldPath); - Y_ENSURE( - path.size() > 1 && path[0] == '/', - "invalid uuid inside device path " << devicePath); - - auto uuid = path.substr(1); - - return {TString{uri.GetHost()}, uri.GetPort(), TString{uuid}}; -} - vhd_bdev_info TRdmaBackend::Init(const TOptions& options) { STORAGE_INFO("Initializing RDMA backend"); @@ -123,20 +98,31 @@ vhd_bdev_info TRdmaBackend::Init(const TOptions& options) ui64 totalBytes = 0; for (auto& chunk: options.Layout) { - auto [host, port, uuid] = 
ParseRdmaDevicePath(chunk.DevicePath); + DevicePath devicePath("rdma"); + auto error = devicePath.Parse(chunk.DevicePath); + STORAGE_VERIFY_C( + !HasError(error), + TWellKnownEntityTypes::ENDPOINT, + ClientId, + "device parse error: " << error.GetMessage()); + auto* device = Volume.MutableDevices()->Add(); - device->SetDeviceUUID(uuid); - device->MutableRdmaEndpoint()->SetHost(host); - device->MutableRdmaEndpoint()->SetPort(port); + device->SetDeviceUUID(devicePath.Uuid); + device->MutableRdmaEndpoint()->SetHost(devicePath.Host); + device->MutableRdmaEndpoint()->SetPort(devicePath.Port); - Y_ENSURE( + STORAGE_VERIFY_C( chunk.Offset == 0, + TWellKnownEntityTypes::ENDPOINT, + ClientId, "device chunk offset is not 0" << ", device=" << chunk.DevicePath << ", offset=" << chunk.Offset); - Y_ENSURE( + STORAGE_VERIFY_C( chunk.ByteCount % Volume.GetBlockSize() == 0, + TWellKnownEntityTypes::ENDPOINT, + ClientId, "device chunk size is not aligned to " << Volume.GetBlockSize() << ", device=" << chunk.DevicePath