diff --git a/cloud/blockstore/libs/daemon/common/bootstrap.cpp b/cloud/blockstore/libs/daemon/common/bootstrap.cpp index 8aa63c30d24..bdb40720d30 100644 --- a/cloud/blockstore/libs/daemon/common/bootstrap.cpp +++ b/cloud/blockstore/libs/daemon/common/bootstrap.cpp @@ -579,7 +579,8 @@ void TBootstrapBase::Init() STORAGE_INFO("Server initialized"); - GrpcEndpointListener->SetClientAcceptor(Server->GetClientAcceptor()); + GrpcEndpointListener->SetClientStorageFactory( + Server->GetClientStorageFactory()); TVector requestProviders = { Server, diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp index ea9ec185aef..4f56633fbfd 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include @@ -913,7 +913,7 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) TBootstrap bootstrap(CreateTestService(mountedVolumes)); auto grpcListener = CreateSocketEndpointListener(bootstrap.Logging, 16); - grpcListener->SetClientAcceptor(CreateClientAcceptorStub()); + grpcListener->SetClientStorageFactory(CreateClientStorageFactoryStub()); auto manager = CreateEndpointManager( bootstrap, diff --git a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp index 3e9be9f0767..78af40fe826 100644 --- a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp +++ b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp @@ -3,13 +3,14 @@ #include #include #include -#include #include #include #include #include +#include #include #include +#include namespace NCloud::NBlockStore::NServer { @@ -210,7 +211,8 @@ class TSocketEndpointListener final private: const ui32 UnixSocketBacklog; - std::unique_ptr EndpointPoller; + std::unique_ptr EndpointPoller; + IClientStorageFactoryPtr ClientStorageFactory; TLog Log; @@ -228,11 +230,11 @@ class TSocketEndpointListener final Stop(); } - void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) override + void SetClientStorageFactory(IClientStorageFactoryPtr factory) override { Y_ABORT_UNLESS(!EndpointPoller); - EndpointPoller = std::make_unique( - std::move(clientAcceptor)); + ClientStorageFactory = std::move(factory); + EndpointPoller = std::make_unique(); } void Start() override @@ -263,7 +265,8 @@ class TSocketEndpointListener final UnixSocketBacklog, false, // multiClient NProto::SOURCE_FD_DATA_CHANNEL, - std::move(sessionService)); + ClientStorageFactory->CreateClientStorage( + std::move(sessionService))); return MakeFuture(std::move(error)); } diff --git a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.h b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.h index 11e61c7e7ca..8d9b2e14f53 100644 --- a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.h +++ b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.h @@ -15,7 +15,7 @@ struct ISocketEndpointListener : public IEndpointListener , public IStartable { - virtual void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) = 0; + virtual void SetClientStorageFactory(IClientStorageFactoryPtr factory) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener_ut.cpp b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener_ut.cpp index 5fe709abf66..804d00dd48b 100644 --- a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener_ut.cpp +++ b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener_ut.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include @@ -38,30 +38,38 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -class TTestClientAcceptor final - : public IClientAcceptor +using NStorage::NServer::IClientStorage; +using NStorage::NServer::IClientStoragePtr; + +class TTestClientStorage + : public IClientStorageFactory + , public IClientStorage + , public std::enable_shared_from_this { private: TMutex Lock; - TMap Sessions; + TSet Sessions; public: - void Accept( + IClientStoragePtr CreateClientStorage(IBlockStorePtr service) override + { + Y_UNUSED(service); + return shared_from_this(); + } + + void AddClient( const TSocketHolder& socket, - IBlockStorePtr sessionService, NProto::ERequestSource source) override { Y_UNUSED(source); with_lock (Lock) { - auto [it, inserted] = Sessions.emplace( - socket, - std::move(sessionService)); + auto [it, inserted] = Sessions.emplace(socket); UNIT_ASSERT(inserted); } } - void Remove(const TSocketHolder& socket) override + void RemoveClient(const TSocketHolder& socket) override { with_lock (Lock) { auto it = Sessions.find(socket); @@ -267,7 +275,8 @@ TBootstrap CreateBootstrap(TOptions options) auto endpointListener = CreateSocketEndpointListener( testFactory.Logging, options.UnixSocketBacklog); - endpointListener->SetClientAcceptor(server->GetClientAcceptor()); + endpointListener->SetClientStorageFactory( + server->GetClientStorageFactory()); NProto::TStartEndpointRequest request; request.SetUnixSocketPath(options.UnixSocketPath); @@ -331,9 +340,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) auto logging = CreateLoggingService("console"); TFsPath unixSocket(CreateGuidAsString() + ".sock"); - auto clientAcceptor = std::make_shared(); + auto clientStorage = std::make_shared(); auto listener = CreateSocketEndpointListener(logging, 16); - listener->SetClientAcceptor(clientAcceptor); + listener->SetClientStorageFactory(clientStorage); listener->Start(); Y_DEFER { listener->Stop(); @@ -359,7 +368,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) }; Sleep(TDuration::MilliSeconds(100)); - UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1); + UNIT_ASSERT(clientStorage->GetSessionCount() == 1); { auto future = listener->StopEndpoint(unixSocket.GetPath()); @@ -368,7 +377,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) } Sleep(TDuration::MilliSeconds(100)); - UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0); + UNIT_ASSERT(clientStorage->GetSessionCount() == 0); } Y_UNIT_TEST(ShouldHandleClientDisconnection) @@ -376,9 +385,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) auto logging = CreateLoggingService("console"); TFsPath unixSocket(CreateGuidAsString() + ".sock"); - auto clientAcceptor = std::make_shared(); + auto clientStorage = std::make_shared(); auto listener = CreateSocketEndpointListener(logging, 16); - listener->SetClientAcceptor(clientAcceptor); + listener->SetClientStorageFactory(clientStorage); listener->Start(); Y_DEFER { listener->Stop(); @@ -397,7 +406,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) clientEndpoint->Start(); Sleep(TDuration::MilliSeconds(100)); - UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1); + UNIT_ASSERT(clientStorage->GetSessionCount() == 1); clientEndpoint->Stop(); clientEndpoint.reset(); @@ -406,7 +415,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) client.reset(); Sleep(TDuration::MilliSeconds(100)); - UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0); + UNIT_ASSERT(clientStorage->GetSessionCount() == 0); } Y_UNIT_TEST(ShouldNotAcceptClientAfterServerStopped) @@ -429,7 +438,8 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) auto endpointListener = CreateSocketEndpointListener( testFactory.Logging, unixSocketBacklog); - endpointListener->SetClientAcceptor(server->GetClientAcceptor()); + endpointListener->SetClientStorageFactory( + server->GetClientStorageFactory()); endpointListener->Start(); server->Start(); @@ -1101,9 +1111,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest) auto logging = CreateLoggingService("console"); TString unixSocket("./invalid/path/to/socket"); - auto clientAcceptor = std::make_shared(); + auto clientStorage = std::make_shared(); auto listener = CreateSocketEndpointListener(logging, 16); - listener->SetClientAcceptor(clientAcceptor); + listener->SetClientStorageFactory(clientStorage); listener->Start(); Y_DEFER { listener->Stop(); diff --git a/cloud/blockstore/libs/endpoints_grpc/ya.make b/cloud/blockstore/libs/endpoints_grpc/ya.make index f906e1efea1..52185a9408e 100644 --- a/cloud/blockstore/libs/endpoints_grpc/ya.make +++ b/cloud/blockstore/libs/endpoints_grpc/ya.make @@ -13,6 +13,7 @@ PEERDIR( cloud/blockstore/libs/endpoints cloud/blockstore/libs/server cloud/blockstore/libs/service + cloud/storage/core/libs/uds ) END() diff --git a/cloud/blockstore/libs/server/client_acceptor.cpp b/cloud/blockstore/libs/server/client_acceptor.cpp deleted file mode 100644 index 7bee955ff79..00000000000 --- a/cloud/blockstore/libs/server/client_acceptor.cpp +++ /dev/null @@ -1,37 +0,0 @@ -#include "client_acceptor.h" - -namespace NCloud::NBlockStore::NServer { - -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -struct TClientAcceptorStub final - : public IClientAcceptor -{ - void Accept( - const TSocketHolder& socket, - IBlockStorePtr sessionService, - NCloud::NProto::ERequestSource source) override - { - Y_UNUSED(socket); - Y_UNUSED(sessionService); - Y_UNUSED(source); - } - - void Remove(const TSocketHolder& socket) override - { - Y_UNUSED(socket); - } -}; - -} // namespace - -//////////////////////////////////////////////////////////////////////////////// - -IClientAcceptorPtr CreateClientAcceptorStub() -{ - return std::make_shared(); -} - -} // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/server/client_acceptor.h b/cloud/blockstore/libs/server/client_acceptor.h deleted file mode 100644 index 6b8215fc967..00000000000 --- a/cloud/blockstore/libs/server/client_acceptor.h +++ /dev/null @@ -1,31 +0,0 @@ -#pragma once - -#include "public.h" - -#include - -#include - -#include - -namespace NCloud::NBlockStore::NServer { - -//////////////////////////////////////////////////////////////////////////////// - -struct IClientAcceptor -{ - virtual ~IClientAcceptor() = default; - - virtual void Accept( - const TSocketHolder& clientSocket, - IBlockStorePtr service, - NCloud::NProto::ERequestSource source) = 0; - - virtual void Remove(const TSocketHolder& clientSocket) = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -IClientAcceptorPtr CreateClientAcceptorStub(); - -} // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/server/client_storage_factory.cpp b/cloud/blockstore/libs/server/client_storage_factory.cpp new file mode 100644 index 00000000000..58562cfdc2c --- /dev/null +++ b/cloud/blockstore/libs/server/client_storage_factory.cpp @@ -0,0 +1,29 @@ +#include "client_storage_factory.h" + +namespace NCloud::NBlockStore::NServer { + +using namespace NCloud::NStorage::NServer; + +//////////////////////////////////////////////////////////////////////////////// + +struct TClientStorageFactoryStub + : public IClientStorageFactory +{ + NStorage::NServer::IClientStoragePtr CreateClientStorage( + IBlockStorePtr service) override + { + Y_UNUSED(service); + return CreateClientStorageStub(); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IClientStorageFactoryPtr CreateClientStorageFactoryStub() +{ + return std::make_shared(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCloud::NBlockStore::NServer \ No newline at end of file diff --git a/cloud/blockstore/libs/server/client_storage_factory.h b/cloud/blockstore/libs/server/client_storage_factory.h new file mode 100644 index 00000000000..4f0d2cd448f --- /dev/null +++ b/cloud/blockstore/libs/server/client_storage_factory.h @@ -0,0 +1,27 @@ +#pragma once + +#include "public.h" + +#include + +#include + +namespace NCloud::NBlockStore::NServer { + +//////////////////////////////////////////////////////////////////////////////// + +struct IClientStorageFactory +{ + virtual ~IClientStorageFactory() = default; + + virtual NStorage::NServer::IClientStoragePtr CreateClientStorage( + IBlockStorePtr service) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IClientStorageFactoryPtr CreateClientStorageFactoryStub(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCloud::NBlockStore::NServer \ No newline at end of file diff --git a/cloud/blockstore/libs/server/public.h b/cloud/blockstore/libs/server/public.h index 548dcc26452..5b4b0d0ed1a 100644 --- a/cloud/blockstore/libs/server/public.h +++ b/cloud/blockstore/libs/server/public.h @@ -18,8 +18,8 @@ using TServerAppConfigPtr = std::shared_ptr; struct IServer; using IServerPtr = std::shared_ptr; -struct IClientAcceptor; -using IClientAcceptorPtr = std::shared_ptr; +struct IClientStorageFactory; +using IClientStorageFactoryPtr = std::shared_ptr; } // namespace NServer } // namespace NCloud::NBlockStore diff --git a/cloud/blockstore/libs/server/server.cpp b/cloud/blockstore/libs/server/server.cpp index 942ba774b7f..8405d81a561 100644 --- a/cloud/blockstore/libs/server/server.cpp +++ b/cloud/blockstore/libs/server/server.cpp @@ -1,8 +1,7 @@ #include "server.h" -#include "client_acceptor.h" #include "config.h" -#include "endpoint_poller.h" +#include "client_storage_factory.h" #include @@ -25,6 +24,8 @@ #include #include #include +#include +#include #include @@ -193,8 +194,22 @@ BLOCKSTORE_GRPC_SERVICE(BLOCKSTORE_DECLARE_METHOD) //////////////////////////////////////////////////////////////////////////////// +using NCloud::NStorage::NServer::IClientStorage; +using NCloud::NStorage::NServer::IClientStoragePtr; + +//////////////////////////////////////////////////////////////////////////////// + +class TSessionStorage; + +IClientStoragePtr CreateEndpointClientStorage( + const std::shared_ptr& sessionStorage, + IBlockStorePtr service); + +//////////////////////////////////////////////////////////////////////////////// + class TSessionStorage final - : public IClientAcceptor + : public IClientStorageFactory + , public std::enable_shared_from_this { struct TClientInfo { @@ -214,10 +229,10 @@ class TSessionStorage final : AppCtx(appCtx) {} - void Accept( + void AddClient( const TSocketHolder& socket, IBlockStorePtr sessionService, - NProto::ERequestSource source) override + NProto::ERequestSource source) { if (AtomicGet(AppCtx.ShouldStop)) { return; @@ -245,7 +260,7 @@ class TSessionStorage final grpc::AddInsecureChannelFromFd(AppCtx.Server.get(), dupSocket.Release()); } - void Remove(const TSocketHolder& socket) override + void RemoveClient(const TSocketHolder& socket) { if (AtomicGet(AppCtx.ShouldStop)) { return; @@ -271,6 +286,13 @@ class TSessionStorage final return nullptr; } + IClientStoragePtr CreateClientStorage(IBlockStorePtr service) + { + return CreateEndpointClientStorage( + shared_from_this(), + std::move(service)); + } + private: static TSocketHolder CreateDuplicate(const TSocketHolder& socket) { @@ -310,6 +332,44 @@ class TSessionStorage final //////////////////////////////////////////////////////////////////////////////// +class TClientStorage final + : public IClientStorage +{ + std::shared_ptr Storage; + IBlockStorePtr Service; + +public: + TClientStorage( + std::shared_ptr storage, + IBlockStorePtr service) + : Storage(std::move(storage)) + , Service(std::move(service)) + {} + + void AddClient( + const TSocketHolder& clientSocket, + NCloud::NProto::ERequestSource source) override + { + Storage->AddClient(clientSocket, Service, source); + } + + void RemoveClient(const TSocketHolder& clientSocket) override + { + Storage->RemoveClient(clientSocket); + } +}; + +IClientStoragePtr CreateEndpointClientStorage( + const std::shared_ptr& sessionStorage, + IBlockStorePtr service) +{ + return std::make_shared( + sessionStorage, + std::move(service)); +} + +//////////////////////////////////////////////////////////////////////////////// + template struct TRequestDataHolder {}; @@ -861,7 +921,7 @@ class TServer final TVector> Executors; - std::unique_ptr EndpointPoller; + std::unique_ptr EndpointPoller; public: TServer( @@ -876,7 +936,7 @@ class TServer final void Start() override; void Stop() override; - IClientAcceptorPtr GetClientAcceptor() override; + IClientStorageFactoryPtr GetClientStorageFactory() override; size_t CollectRequests( const TIncompleteRequestsCollector& collector) override; @@ -1078,7 +1138,7 @@ void TServer::StartListenUnixSocket( { STORAGE_INFO("Listen on (control) " << unixSocketPath.Quote()); - EndpointPoller = std::make_unique(SessionStorage); + EndpointPoller = std::make_unique(); EndpointPoller->Start(); auto error = EndpointPoller->StartListenEndpoint( @@ -1086,7 +1146,7 @@ void TServer::StartListenUnixSocket( backlog, true, // multiClient NProto::SOURCE_FD_CONTROL_CHANNEL, - UdsService); + SessionStorage->CreateClientStorage(UdsService)); if (HasError(error)) { ReportEndpointStartingError(); @@ -1144,7 +1204,7 @@ void TServer::Stop() Executors.clear(); } -IClientAcceptorPtr TServer::GetClientAcceptor() +IClientStorageFactoryPtr TServer::GetClientStorageFactory() { return SessionStorage; } diff --git a/cloud/blockstore/libs/server/server.h b/cloud/blockstore/libs/server/server.h index f40b4656ad7..dadfcb16001 100644 --- a/cloud/blockstore/libs/server/server.h +++ b/cloud/blockstore/libs/server/server.h @@ -16,7 +16,7 @@ struct IServer : public IStartable , public IIncompleteRequestProvider { - virtual IClientAcceptorPtr GetClientAcceptor() = 0; + virtual IClientStorageFactoryPtr GetClientStorageFactory() = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/server/ya.make b/cloud/blockstore/libs/server/ya.make index b1869a1ee57..a56c02d7edc 100644 --- a/cloud/blockstore/libs/server/ya.make +++ b/cloud/blockstore/libs/server/ya.make @@ -1,12 +1,10 @@ LIBRARY() SRCS( - client_acceptor.cpp + client_storage_factory.cpp config.cpp - endpoint_poller.cpp server.cpp server_test.cpp - socket_poller.cpp ) PEERDIR( @@ -18,6 +16,7 @@ PEERDIR( cloud/blockstore/libs/diagnostics cloud/blockstore/libs/service cloud/storage/core/libs/grpc + cloud/storage/core/libs/uds library/cpp/actors/prof library/cpp/monlib/service diff --git a/cloud/storage/core/libs/uds/client_storage.cpp b/cloud/storage/core/libs/uds/client_storage.cpp new file mode 100644 index 00000000000..e793d61fd50 --- /dev/null +++ b/cloud/storage/core/libs/uds/client_storage.cpp @@ -0,0 +1,35 @@ +#include "client_storage.h" + +namespace NCloud::NStorage::NServer { + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +struct TClientStorageStub final + : public IClientStorage +{ + void AddClient( + const TSocketHolder& socket, + NCloud::NProto::ERequestSource source) override + { + Y_UNUSED(socket); + Y_UNUSED(source); + } + + void RemoveClient(const TSocketHolder& socket) override + { + Y_UNUSED(socket); + } +}; + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +IClientStoragePtr CreateClientStorageStub() +{ + return std::make_shared(); +} + +} // namespace NCloud::NStorage::NServer diff --git a/cloud/storage/core/libs/uds/client_storage.h b/cloud/storage/core/libs/uds/client_storage.h new file mode 100644 index 00000000000..0b57272fded --- /dev/null +++ b/cloud/storage/core/libs/uds/client_storage.h @@ -0,0 +1,28 @@ +#pragma once + +#include + +#include + +namespace NCloud::NStorage::NServer { + +//////////////////////////////////////////////////////////////////////////////// + +struct IClientStorage +{ + virtual ~IClientStorage() = default; + + virtual void AddClient( + const TSocketHolder& clientSocket, + NCloud::NProto::ERequestSource source) = 0; + + virtual void RemoveClient(const TSocketHolder& clientSocket) = 0; +}; + +using IClientStoragePtr = std::shared_ptr; + +//////////////////////////////////////////////////////////////////////////////// + +IClientStoragePtr CreateClientStorageStub(); + +} // namespace NCloud::NStorage::NServer diff --git a/cloud/blockstore/libs/server/endpoint_poller.cpp b/cloud/storage/core/libs/uds/endpoint_poller.cpp similarity index 92% rename from cloud/blockstore/libs/server/endpoint_poller.cpp rename to cloud/storage/core/libs/uds/endpoint_poller.cpp index 45e24cb3c9b..7d0ca45d88b 100644 --- a/cloud/blockstore/libs/server/endpoint_poller.cpp +++ b/cloud/storage/core/libs/uds/endpoint_poller.cpp @@ -1,6 +1,5 @@ #include "endpoint_poller.h" -#include "client_acceptor.h" #include "socket_poller.h" #include @@ -18,7 +17,7 @@ #include -namespace NCloud::NBlockStore::NServer { +namespace NCloud::NStorage::NServer { namespace { @@ -42,7 +41,7 @@ using TConnectionPtr = std::shared_ptr; struct TEndpoint final : public ICookie { - const IBlockStorePtr Service; + const IClientStoragePtr ClientStorage; const bool MultiClient; const NProto::ERequestSource Source; @@ -51,10 +50,10 @@ struct TEndpoint final TSet Connections; TEndpoint( - IBlockStorePtr service, + IClientStoragePtr clientStorage, bool multiClient, NProto::ERequestSource source) - : Service(std::move(service)) + : ClientStorage(std::move(clientStorage)) , MultiClient(multiClient) , Source(source) {} @@ -142,8 +141,6 @@ class TEndpointPoller::TImpl final : public ISimpleThread { private: - const IClientAcceptorPtr ClientAcceptor; - TSocketPoller SocketPoller; TMutex CookieLock; @@ -153,14 +150,10 @@ class TEndpointPoller::TImpl final TMap Endpoints; public: - TImpl(IClientAcceptorPtr clientAcceptor) - : ClientAcceptor(std::move(clientAcceptor)) - {} - void Stop() { with_lock (EndpointLock) { - for (const auto& item : Endpoints) { + for (const auto& item: Endpoints) { const auto& endpoint = item.second; StopListenEndpointImpl(endpoint, false); } @@ -186,10 +179,10 @@ class TEndpointPoller::TImpl final ui32 unixSocketBacklog, bool multiClient, NProto::ERequestSource source, - IBlockStorePtr service) + IClientStoragePtr clientStorage) { auto endpoint = std::make_shared( - std::move(service), + std::move(clientStorage), multiClient, source); @@ -320,9 +313,8 @@ class TEndpointPoller::TImpl final SocketPoller.WaitClose(connection->Socket, connection.get()); - ClientAcceptor->Accept( + endpoint->ClientStorage->AddClient( connection->Socket, - endpoint->Service, endpoint->Source); } } @@ -331,7 +323,7 @@ class TEndpointPoller::TImpl final { CookieHolders.insert(connection->shared_from_this()); - ClientAcceptor->Remove(connection->Socket); + connection->Endpoint->ClientStorage->RemoveClient(connection->Socket); SocketPoller.Unwait(connection->Socket); @@ -341,8 +333,8 @@ class TEndpointPoller::TImpl final //////////////////////////////////////////////////////////////////////////////// -TEndpointPoller::TEndpointPoller(IClientAcceptorPtr clientAcceptor) - : Impl(std::make_unique(std::move(clientAcceptor))) +TEndpointPoller::TEndpointPoller() + : Impl(std::make_unique()) {} TEndpointPoller::~TEndpointPoller() @@ -363,14 +355,14 @@ NProto::TError TEndpointPoller::StartListenEndpoint( ui32 backlog, bool multiClient, NProto::ERequestSource source, - IBlockStorePtr service) + IClientStoragePtr clientStorage) { return Impl->StartListenEndpoint( unixSocketPath, backlog, multiClient, source, - std::move(service)); + std::move(clientStorage)); } NProto::TError TEndpointPoller::StopListenEndpoint(const TString& unixSocketPath) @@ -378,4 +370,4 @@ NProto::TError TEndpointPoller::StopListenEndpoint(const TString& unixSocketPath return Impl->StopListenEndpoint(unixSocketPath); } -} // namespace NCloud::NBlockStore::NServer +} // namespace NCloud::NStorage::NServer diff --git a/cloud/blockstore/libs/server/endpoint_poller.h b/cloud/storage/core/libs/uds/endpoint_poller.h similarity index 72% rename from cloud/blockstore/libs/server/endpoint_poller.h rename to cloud/storage/core/libs/uds/endpoint_poller.h index 550ef432002..396c7009345 100644 --- a/cloud/blockstore/libs/server/endpoint_poller.h +++ b/cloud/storage/core/libs/uds/endpoint_poller.h @@ -1,8 +1,8 @@ #pragma once -#include "public.h" +#include "client_storage.h" -#include +#include #include #include @@ -10,7 +10,7 @@ #include -namespace NCloud::NBlockStore::NServer { +namespace NCloud::NStorage::NServer { //////////////////////////////////////////////////////////////////////////////// @@ -21,7 +21,7 @@ class TEndpointPoller std::unique_ptr Impl; public: - TEndpointPoller(IClientAcceptorPtr clientAcceptor); + TEndpointPoller(); ~TEndpointPoller(); void Start(); @@ -32,9 +32,9 @@ class TEndpointPoller ui32 backlog, bool multiClient, NProto::ERequestSource source, - IBlockStorePtr service); + IClientStoragePtr clientStorage); NProto::TError StopListenEndpoint(const TString& unixSocketPath); }; -} // namespace NCloud::NBlockStore::NServer +} // namespace NCloud::NStorage::NServer diff --git a/cloud/blockstore/libs/server/socket_poller.cpp b/cloud/storage/core/libs/uds/socket_poller.cpp similarity index 97% rename from cloud/blockstore/libs/server/socket_poller.cpp rename to cloud/storage/core/libs/uds/socket_poller.cpp index 773890767f9..b17ee8039a6 100644 --- a/cloud/blockstore/libs/server/socket_poller.cpp +++ b/cloud/storage/core/libs/uds/socket_poller.cpp @@ -12,7 +12,7 @@ #include #endif -namespace NCloud::NBlockStore::NServer { +namespace NCloud::NStorage::NServer { //////////////////////////////////////////////////////////////////////////////// @@ -163,4 +163,4 @@ size_t TSocketPoller::Wait(void** events, size_t len) return Impl->Wait(events, len); } -} // namespace NCloud::NBlockStore::NServer \ No newline at end of file +} // namespace NCloud::NStorage::NServer \ No newline at end of file diff --git a/cloud/blockstore/libs/server/socket_poller.h b/cloud/storage/core/libs/uds/socket_poller.h similarity index 83% rename from cloud/blockstore/libs/server/socket_poller.h rename to cloud/storage/core/libs/uds/socket_poller.h index 35994849433..ff0c0f4984d 100644 --- a/cloud/blockstore/libs/server/socket_poller.h +++ b/cloud/storage/core/libs/uds/socket_poller.h @@ -4,7 +4,7 @@ #include -namespace NCloud::NBlockStore::NServer { +namespace NCloud::NStorage::NServer { //////////////////////////////////////////////////////////////////////////////// @@ -26,4 +26,4 @@ class TSocketPoller size_t Wait(void** events, size_t len); }; -} // namespace NCloud::NBlockStore::NServer +} // namespace NCloud::NStorage::NServer diff --git a/cloud/storage/core/libs/uds/ya.make b/cloud/storage/core/libs/uds/ya.make index 94acd7d9f6b..0035cd539fc 100644 --- a/cloud/storage/core/libs/uds/ya.make +++ b/cloud/storage/core/libs/uds/ya.make @@ -1,11 +1,15 @@ LIBRARY() SRCS( + client_storage.cpp + endpoint_poller.cpp + socket_poller.cpp uds_socket_client.cpp ) PEERDIR( cloud/storage/core/libs/common + cloud/storage/core/protos contrib/libs/grpc )