Skip to content

Commit

Permalink
NBS-1704: extract common code for unix domain sockets (#283)
Browse files Browse the repository at this point in the history
* NBS-1704: extract common

* fix tests

* update
  • Loading branch information
yegorskii committed Feb 5, 2024
1 parent 0d7f342 commit a3b0687
Show file tree
Hide file tree
Showing 21 changed files with 270 additions and 149 deletions.
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ void TBootstrapBase::Init()

STORAGE_INFO("Server initialized");

GrpcEndpointListener->SetClientAcceptor(Server->GetClientAcceptor());
GrpcEndpointListener->SetClientStorageFactory(
Server->GetClientStorageFactory());

TVector<IIncompleteRequestProviderPtr> requestProviders = {
Server,
Expand Down
4 changes: 2 additions & 2 deletions cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <cloud/blockstore/libs/encryption/encryption_client.h>
#include <cloud/blockstore/libs/encryption/encryption_key.h>
#include <cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.h>
#include <cloud/blockstore/libs/server/client_acceptor.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/blockstore/libs/service/service_test.h>
#include <cloud/blockstore/libs/service/storage_provider.h>

Expand Down Expand Up @@ -913,7 +913,7 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest)
TBootstrap bootstrap(CreateTestService(mountedVolumes));

auto grpcListener = CreateSocketEndpointListener(bootstrap.Logging, 16);
grpcListener->SetClientAcceptor(CreateClientAcceptorStub());
grpcListener->SetClientStorageFactory(CreateClientStorageFactoryStub());

auto manager = CreateEndpointManager(
bootstrap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
#include <cloud/blockstore/libs/client/session.h>
#include <cloud/blockstore/libs/common/iovector.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/server/endpoint_poller.h>
#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service.h>
#include <cloud/blockstore/libs/service/storage.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>
#include <cloud/storage/core/libs/uds/endpoint_poller.h>

namespace NCloud::NBlockStore::NServer {

Expand Down Expand Up @@ -210,7 +211,8 @@ class TSocketEndpointListener final
private:
const ui32 UnixSocketBacklog;

std::unique_ptr<TEndpointPoller> EndpointPoller;
std::unique_ptr<NStorage::NServer::TEndpointPoller> EndpointPoller;
IClientStorageFactoryPtr ClientStorageFactory;

TLog Log;

Expand All @@ -228,11 +230,11 @@ class TSocketEndpointListener final
Stop();
}

void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) override
void SetClientStorageFactory(IClientStorageFactoryPtr factory) override
{
Y_ABORT_UNLESS(!EndpointPoller);
EndpointPoller = std::make_unique<TEndpointPoller>(
std::move(clientAcceptor));
ClientStorageFactory = std::move(factory);
EndpointPoller = std::make_unique<NStorage::NServer::TEndpointPoller>();
}

void Start() override
Expand Down Expand Up @@ -263,7 +265,8 @@ class TSocketEndpointListener final
UnixSocketBacklog,
false, // multiClient
NProto::SOURCE_FD_DATA_CHANNEL,
std::move(sessionService));
ClientStorageFactory->CreateClientStorage(
std::move(sessionService)));

return MakeFuture(std::move(error));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ISocketEndpointListener
: public IEndpointListener
, public IStartable
{
virtual void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) = 0;
virtual void SetClientStorageFactory(IClientStorageFactoryPtr factory) = 0;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <cloud/blockstore/libs/diagnostics/server_stats.h>
#include <cloud/blockstore/libs/diagnostics/volume_stats.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/server/client_acceptor.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/blockstore/libs/server/server.h>
#include <cloud/blockstore/libs/server/server_test.h>
#include <cloud/blockstore/libs/service/service.h>
Expand Down Expand Up @@ -38,30 +38,38 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

class TTestClientAcceptor final
: public IClientAcceptor
using NStorage::NServer::IClientStorage;
using NStorage::NServer::IClientStoragePtr;

class TTestClientStorage
: public IClientStorageFactory
, public IClientStorage
, public std::enable_shared_from_this<TTestClientStorage>
{
private:
TMutex Lock;
TMap<SOCKET, IBlockStorePtr> Sessions;
TSet<SOCKET> Sessions;

public:
void Accept(
IClientStoragePtr CreateClientStorage(IBlockStorePtr service) override
{
Y_UNUSED(service);
return shared_from_this();
}

void AddClient(
const TSocketHolder& socket,
IBlockStorePtr sessionService,
NProto::ERequestSource source) override
{
Y_UNUSED(source);

with_lock (Lock) {
auto [it, inserted] = Sessions.emplace(
socket,
std::move(sessionService));
auto [it, inserted] = Sessions.emplace(socket);
UNIT_ASSERT(inserted);
}
}

void Remove(const TSocketHolder& socket) override
void RemoveClient(const TSocketHolder& socket) override
{
with_lock (Lock) {
auto it = Sessions.find(socket);
Expand Down Expand Up @@ -267,7 +275,8 @@ TBootstrap CreateBootstrap(TOptions options)
auto endpointListener = CreateSocketEndpointListener(
testFactory.Logging,
options.UnixSocketBacklog);
endpointListener->SetClientAcceptor(server->GetClientAcceptor());
endpointListener->SetClientStorageFactory(
server->GetClientStorageFactory());

NProto::TStartEndpointRequest request;
request.SetUnixSocketPath(options.UnixSocketPath);
Expand Down Expand Up @@ -331,9 +340,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto logging = CreateLoggingService("console");
TFsPath unixSocket(CreateGuidAsString() + ".sock");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand All @@ -359,7 +368,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
};

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1);
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);

{
auto future = listener->StopEndpoint(unixSocket.GetPath());
Expand All @@ -368,17 +377,17 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
}

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0);
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
}

Y_UNIT_TEST(ShouldHandleClientDisconnection)
{
auto logging = CreateLoggingService("console");
TFsPath unixSocket(CreateGuidAsString() + ".sock");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand All @@ -397,7 +406,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
clientEndpoint->Start();

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1);
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);

clientEndpoint->Stop();
clientEndpoint.reset();
Expand All @@ -406,7 +415,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
client.reset();

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0);
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
}

Y_UNIT_TEST(ShouldNotAcceptClientAfterServerStopped)
Expand All @@ -429,7 +438,8 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto endpointListener = CreateSocketEndpointListener(
testFactory.Logging,
unixSocketBacklog);
endpointListener->SetClientAcceptor(server->GetClientAcceptor());
endpointListener->SetClientStorageFactory(
server->GetClientStorageFactory());

endpointListener->Start();
server->Start();
Expand Down Expand Up @@ -1101,9 +1111,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto logging = CreateLoggingService("console");
TString unixSocket("./invalid/path/to/socket");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/endpoints_grpc/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ PEERDIR(
cloud/blockstore/libs/endpoints
cloud/blockstore/libs/server
cloud/blockstore/libs/service
cloud/storage/core/libs/uds
)

END()
Expand Down
37 changes: 0 additions & 37 deletions cloud/blockstore/libs/server/client_acceptor.cpp

This file was deleted.

31 changes: 0 additions & 31 deletions cloud/blockstore/libs/server/client_acceptor.h

This file was deleted.

29 changes: 29 additions & 0 deletions cloud/blockstore/libs/server/client_storage_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "client_storage_factory.h"

namespace NCloud::NBlockStore::NServer {

using namespace NCloud::NStorage::NServer;

////////////////////////////////////////////////////////////////////////////////

struct TClientStorageFactoryStub
: public IClientStorageFactory
{
NStorage::NServer::IClientStoragePtr CreateClientStorage(
IBlockStorePtr service) override
{
Y_UNUSED(service);
return CreateClientStorageStub();
}
};

////////////////////////////////////////////////////////////////////////////////

IClientStorageFactoryPtr CreateClientStorageFactoryStub()
{
return std::make_shared<TClientStorageFactoryStub>();
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NCloud::NBlockStore::NServer
27 changes: 27 additions & 0 deletions cloud/blockstore/libs/server/client_storage_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "public.h"

#include <cloud/blockstore/libs/service/public.h>

#include <cloud/storage/core/libs/uds/client_storage.h>

namespace NCloud::NBlockStore::NServer {

////////////////////////////////////////////////////////////////////////////////

struct IClientStorageFactory
{
virtual ~IClientStorageFactory() = default;

virtual NStorage::NServer::IClientStoragePtr CreateClientStorage(
IBlockStorePtr service) = 0;
};

////////////////////////////////////////////////////////////////////////////////

IClientStorageFactoryPtr CreateClientStorageFactoryStub();

////////////////////////////////////////////////////////////////////////////////

} // namespace NCloud::NBlockStore::NServer
4 changes: 2 additions & 2 deletions cloud/blockstore/libs/server/public.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ using TServerAppConfigPtr = std::shared_ptr<TServerAppConfig>;
struct IServer;
using IServerPtr = std::shared_ptr<IServer>;

struct IClientAcceptor;
using IClientAcceptorPtr = std::shared_ptr<IClientAcceptor>;
struct IClientStorageFactory;
using IClientStorageFactoryPtr = std::shared_ptr<IClientStorageFactory>;

} // namespace NServer
} // namespace NCloud::NBlockStore
Loading

0 comments on commit a3b0687

Please sign in to comment.