Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBS-1704: extract common code for unix domain sockets #283

Merged
merged 3 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ void TBootstrapBase::Init()

STORAGE_INFO("Server initialized");

GrpcEndpointListener->SetClientAcceptor(Server->GetClientAcceptor());
GrpcEndpointListener->SetClientStorageFactory(
Server->GetClientStorageFactory());

TVector<IIncompleteRequestProviderPtr> requestProviders = {
Server,
Expand Down
4 changes: 2 additions & 2 deletions cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <cloud/blockstore/libs/encryption/encryption_client.h>
#include <cloud/blockstore/libs/encryption/encryption_key.h>
#include <cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.h>
#include <cloud/blockstore/libs/server/client_acceptor.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/blockstore/libs/service/service_test.h>
#include <cloud/blockstore/libs/service/storage_provider.h>

Expand Down Expand Up @@ -913,7 +913,7 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest)
TBootstrap bootstrap(CreateTestService(mountedVolumes));

auto grpcListener = CreateSocketEndpointListener(bootstrap.Logging, 16);
grpcListener->SetClientAcceptor(CreateClientAcceptorStub());
grpcListener->SetClientStorageFactory(CreateClientStorageFactoryStub());

auto manager = CreateEndpointManager(
bootstrap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
#include <cloud/blockstore/libs/client/session.h>
#include <cloud/blockstore/libs/common/iovector.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/server/endpoint_poller.h>
#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service.h>
#include <cloud/blockstore/libs/service/storage.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>
#include <cloud/storage/core/libs/uds/endpoint_poller.h>

namespace NCloud::NBlockStore::NServer {

Expand Down Expand Up @@ -210,7 +211,8 @@ class TSocketEndpointListener final
private:
const ui32 UnixSocketBacklog;

std::unique_ptr<TEndpointPoller> EndpointPoller;
std::unique_ptr<NStorage::NServer::TEndpointPoller> EndpointPoller;
IClientStorageFactoryPtr ClientStorageFactory;

TLog Log;

Expand All @@ -228,11 +230,11 @@ class TSocketEndpointListener final
Stop();
}

void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) override
void SetClientStorageFactory(IClientStorageFactoryPtr factory) override
{
Y_ABORT_UNLESS(!EndpointPoller);
EndpointPoller = std::make_unique<TEndpointPoller>(
std::move(clientAcceptor));
ClientStorageFactory = std::move(factory);
EndpointPoller = std::make_unique<NStorage::NServer::TEndpointPoller>();
}

void Start() override
Expand Down Expand Up @@ -263,7 +265,8 @@ class TSocketEndpointListener final
UnixSocketBacklog,
false, // multiClient
NProto::SOURCE_FD_DATA_CHANNEL,
std::move(sessionService));
ClientStorageFactory->CreateClientStorage(
std::move(sessionService)));

return MakeFuture(std::move(error));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ISocketEndpointListener
: public IEndpointListener
, public IStartable
{
virtual void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) = 0;
virtual void SetClientStorageFactory(IClientStorageFactoryPtr factory) = 0;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <cloud/blockstore/libs/diagnostics/server_stats.h>
#include <cloud/blockstore/libs/diagnostics/volume_stats.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/server/client_acceptor.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/blockstore/libs/server/server.h>
#include <cloud/blockstore/libs/server/server_test.h>
#include <cloud/blockstore/libs/service/service.h>
Expand Down Expand Up @@ -38,30 +38,39 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

class TTestClientAcceptor final
: public IClientAcceptor
using NStorage::NServer::IClientStorage;
using NStorage::NServer::IClientStoragePtr;

class TTestClientStorage
: public IClientStorageFactory
, public IClientStorage
, public std::enable_shared_from_this<TTestClientStorage>
{
private:
TMutex Lock;
TMap<SOCKET, IBlockStorePtr> Sessions;
TSet<SOCKET> Sessions;

public:
void Accept(
IClientStoragePtr CreateClientStorage(
IBlockStorePtr service) override
yegorskii marked this conversation as resolved.
Show resolved Hide resolved
{
Y_UNUSED(service);
return shared_from_this();
}

void AddClient(
const TSocketHolder& socket,
IBlockStorePtr sessionService,
NProto::ERequestSource source) override
{
Y_UNUSED(source);

with_lock (Lock) {
auto [it, inserted] = Sessions.emplace(
socket,
std::move(sessionService));
auto [it, inserted] = Sessions.emplace(socket);
UNIT_ASSERT(inserted);
}
}

void Remove(const TSocketHolder& socket) override
void RemoveClient(const TSocketHolder& socket) override
{
with_lock (Lock) {
auto it = Sessions.find(socket);
Expand Down Expand Up @@ -267,7 +276,8 @@ TBootstrap CreateBootstrap(TOptions options)
auto endpointListener = CreateSocketEndpointListener(
testFactory.Logging,
options.UnixSocketBacklog);
endpointListener->SetClientAcceptor(server->GetClientAcceptor());
endpointListener->SetClientStorageFactory(
server->GetClientStorageFactory());

NProto::TStartEndpointRequest request;
request.SetUnixSocketPath(options.UnixSocketPath);
Expand Down Expand Up @@ -331,9 +341,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto logging = CreateLoggingService("console");
TFsPath unixSocket("./testSocket");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand All @@ -359,7 +369,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
};

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1);
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);

{
auto future = listener->StopEndpoint(unixSocket.GetPath());
Expand All @@ -368,17 +378,17 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
}

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0);
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
}

Y_UNIT_TEST(ShouldHandleClientDisconnection)
{
auto logging = CreateLoggingService("console");
TFsPath unixSocket("./testSocket");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand All @@ -397,7 +407,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
clientEndpoint->Start();

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1);
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);

clientEndpoint->Stop();
clientEndpoint.reset();
Expand All @@ -406,7 +416,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
client.reset();

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0);
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
}

Y_UNIT_TEST(ShouldNotAcceptClientAfterServerStopped)
Expand All @@ -429,7 +439,8 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto endpointListener = CreateSocketEndpointListener(
testFactory.Logging,
unixSocketBacklog);
endpointListener->SetClientAcceptor(server->GetClientAcceptor());
endpointListener->SetClientStorageFactory(
server->GetClientStorageFactory());

endpointListener->Start();
server->Start();
Expand Down Expand Up @@ -1101,9 +1112,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto logging = CreateLoggingService("console");
TString unixSocket("./invalid/path/to/socket");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/endpoints_grpc/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ PEERDIR(
cloud/blockstore/libs/endpoints
cloud/blockstore/libs/server
cloud/blockstore/libs/service
cloud/storage/core/libs/uds
)

END()
Expand Down
37 changes: 0 additions & 37 deletions cloud/blockstore/libs/server/client_acceptor.cpp

This file was deleted.

31 changes: 0 additions & 31 deletions cloud/blockstore/libs/server/client_acceptor.h

This file was deleted.

29 changes: 29 additions & 0 deletions cloud/blockstore/libs/server/client_storage_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "client_storage_factory.h"

namespace NCloud::NBlockStore::NServer {

using namespace NCloud::NStorage::NServer;

////////////////////////////////////////////////////////////////////////////////

struct TClientStorageFactoryStub
: public IClientStorageFactory
{
NStorage::NServer::IClientStoragePtr CreateClientStorage(
IBlockStorePtr service) override
{
Y_UNUSED(service);
return CreateClientStorageStub();
}
};

////////////////////////////////////////////////////////////////////////////////

IClientStorageFactoryPtr CreateClientStorageFactoryStub()
{
return std::make_shared<TClientStorageFactoryStub>();
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NCloud::NBlockStore::NServer
27 changes: 27 additions & 0 deletions cloud/blockstore/libs/server/client_storage_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "public.h"

#include <cloud/blockstore/libs/service/public.h>

#include <cloud/storage/core/libs/uds/client_storage.h>

namespace NCloud::NBlockStore::NServer {

////////////////////////////////////////////////////////////////////////////////

struct IClientStorageFactory
{
virtual ~IClientStorageFactory() = default;

virtual NStorage::NServer::IClientStoragePtr CreateClientStorage(
IBlockStorePtr service) = 0;
};

////////////////////////////////////////////////////////////////////////////////

IClientStorageFactoryPtr CreateClientStorageFactoryStub();

////////////////////////////////////////////////////////////////////////////////

} // namespace NCloud::NBlockStore::NServer
4 changes: 2 additions & 2 deletions cloud/blockstore/libs/server/public.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ using TServerAppConfigPtr = std::shared_ptr<TServerAppConfig>;
struct IServer;
using IServerPtr = std::shared_ptr<IServer>;

struct IClientAcceptor;
using IClientAcceptorPtr = std::shared_ptr<IClientAcceptor>;
struct IClientStorageFactory;
using IClientStorageFactoryPtr = std::shared_ptr<IClientStorageFactory>;

} // namespace NServer
} // namespace NCloud::NBlockStore
Loading
Loading