Skip to content

Commit

Permalink
Merge to stable-23-3 (#340)
Browse files Browse the repository at this point in the history
* NBS-1704: extract common code for unix domain sockets (#283)

* NBS-1704: extract common

* fix tests

* update

* tablet history entries should contain fqdn (#325)

* tablet history entries should contain fqdn

* add ut

* removed useless code

* fix test

* fix test

* update CMakeLists

* update CMakeLists

* update

* update

* update
  • Loading branch information
yegorskii authored Feb 5, 2024
1 parent 7664a23 commit ad640f9
Show file tree
Hide file tree
Showing 35 changed files with 334 additions and 163 deletions.
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ void TBootstrapBase::Init()

STORAGE_INFO("Server initialized");

GrpcEndpointListener->SetClientAcceptor(Server->GetClientAcceptor());
GrpcEndpointListener->SetClientStorageFactory(
Server->GetClientStorageFactory());

TVector<IIncompleteRequestProviderPtr> requestProviders = {
Server,
Expand Down
4 changes: 2 additions & 2 deletions cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <cloud/blockstore/libs/encryption/encryption_client.h>
#include <cloud/blockstore/libs/encryption/encryption_key.h>
#include <cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.h>
#include <cloud/blockstore/libs/server/client_acceptor.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/blockstore/libs/service/service_test.h>
#include <cloud/blockstore/libs/service/storage_provider.h>

Expand Down Expand Up @@ -913,7 +913,7 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest)
TBootstrap bootstrap(CreateTestService(mountedVolumes));

auto grpcListener = CreateSocketEndpointListener(bootstrap.Logging, 16);
grpcListener->SetClientAcceptor(CreateClientAcceptorStub());
grpcListener->SetClientStorageFactory(CreateClientStorageFactoryStub());

auto manager = CreateEndpointManager(
bootstrap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ target_link_libraries(blockstore-libs-endpoints_grpc PUBLIC
blockstore-libs-endpoints
blockstore-libs-server
blockstore-libs-service
core-libs-uds
)
target_sources(blockstore-libs-endpoints_grpc PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ target_link_libraries(blockstore-libs-endpoints_grpc PUBLIC
blockstore-libs-endpoints
blockstore-libs-server
blockstore-libs-service
core-libs-uds
)
target_sources(blockstore-libs-endpoints_grpc PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ target_link_libraries(blockstore-libs-endpoints_grpc PUBLIC
blockstore-libs-endpoints
blockstore-libs-server
blockstore-libs-service
core-libs-uds
)
target_sources(blockstore-libs-endpoints_grpc PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ target_link_libraries(blockstore-libs-endpoints_grpc PUBLIC
blockstore-libs-endpoints
blockstore-libs-server
blockstore-libs-service
core-libs-uds
)
target_sources(blockstore-libs-endpoints_grpc PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
#include <cloud/blockstore/libs/client/session.h>
#include <cloud/blockstore/libs/common/iovector.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/server/endpoint_poller.h>
#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service.h>
#include <cloud/blockstore/libs/service/storage.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>
#include <cloud/storage/core/libs/uds/endpoint_poller.h>

namespace NCloud::NBlockStore::NServer {

Expand Down Expand Up @@ -210,7 +211,8 @@ class TSocketEndpointListener final
private:
const ui32 UnixSocketBacklog;

std::unique_ptr<TEndpointPoller> EndpointPoller;
std::unique_ptr<NStorage::NServer::TEndpointPoller> EndpointPoller;
IClientStorageFactoryPtr ClientStorageFactory;

TLog Log;

Expand All @@ -228,11 +230,11 @@ class TSocketEndpointListener final
Stop();
}

void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) override
void SetClientStorageFactory(IClientStorageFactoryPtr factory) override
{
Y_ABORT_UNLESS(!EndpointPoller);
EndpointPoller = std::make_unique<TEndpointPoller>(
std::move(clientAcceptor));
ClientStorageFactory = std::move(factory);
EndpointPoller = std::make_unique<NStorage::NServer::TEndpointPoller>();
}

void Start() override
Expand Down Expand Up @@ -263,7 +265,8 @@ class TSocketEndpointListener final
UnixSocketBacklog,
false, // multiClient
NProto::SOURCE_FD_DATA_CHANNEL,
std::move(sessionService));
ClientStorageFactory->CreateClientStorage(
std::move(sessionService)));

return MakeFuture(std::move(error));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ISocketEndpointListener
: public IEndpointListener
, public IStartable
{
virtual void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) = 0;
virtual void SetClientStorageFactory(IClientStorageFactoryPtr factory) = 0;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <cloud/blockstore/libs/diagnostics/server_stats.h>
#include <cloud/blockstore/libs/diagnostics/volume_stats.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/server/client_acceptor.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/blockstore/libs/server/server.h>
#include <cloud/blockstore/libs/server/server_test.h>
#include <cloud/blockstore/libs/service/service.h>
Expand Down Expand Up @@ -38,30 +38,38 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

class TTestClientAcceptor final
: public IClientAcceptor
using NStorage::NServer::IClientStorage;
using NStorage::NServer::IClientStoragePtr;

class TTestClientStorage
: public IClientStorageFactory
, public IClientStorage
, public std::enable_shared_from_this<TTestClientStorage>
{
private:
TMutex Lock;
TMap<SOCKET, IBlockStorePtr> Sessions;
TSet<SOCKET> Sessions;

public:
void Accept(
IClientStoragePtr CreateClientStorage(IBlockStorePtr service) override
{
Y_UNUSED(service);
return shared_from_this();
}

void AddClient(
const TSocketHolder& socket,
IBlockStorePtr sessionService,
NProto::ERequestSource source) override
{
Y_UNUSED(source);

with_lock (Lock) {
auto [it, inserted] = Sessions.emplace(
socket,
std::move(sessionService));
auto [it, inserted] = Sessions.emplace(socket);
UNIT_ASSERT(inserted);
}
}

void Remove(const TSocketHolder& socket) override
void RemoveClient(const TSocketHolder& socket) override
{
with_lock (Lock) {
auto it = Sessions.find(socket);
Expand Down Expand Up @@ -267,7 +275,8 @@ TBootstrap CreateBootstrap(TOptions options)
auto endpointListener = CreateSocketEndpointListener(
testFactory.Logging,
options.UnixSocketBacklog);
endpointListener->SetClientAcceptor(server->GetClientAcceptor());
endpointListener->SetClientStorageFactory(
server->GetClientStorageFactory());

NProto::TStartEndpointRequest request;
request.SetUnixSocketPath(options.UnixSocketPath);
Expand Down Expand Up @@ -331,9 +340,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto logging = CreateLoggingService("console");
TFsPath unixSocket(CreateGuidAsString() + ".sock");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand All @@ -359,7 +368,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
};

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1);
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);

{
auto future = listener->StopEndpoint(unixSocket.GetPath());
Expand All @@ -368,17 +377,17 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
}

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0);
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
}

Y_UNIT_TEST(ShouldHandleClientDisconnection)
{
auto logging = CreateLoggingService("console");
TFsPath unixSocket(CreateGuidAsString() + ".sock");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand All @@ -397,7 +406,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
clientEndpoint->Start();

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1);
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);

clientEndpoint->Stop();
clientEndpoint.reset();
Expand All @@ -406,7 +415,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
client.reset();

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0);
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
}

Y_UNIT_TEST(ShouldNotAcceptClientAfterServerStopped)
Expand All @@ -429,7 +438,8 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto endpointListener = CreateSocketEndpointListener(
testFactory.Logging,
unixSocketBacklog);
endpointListener->SetClientAcceptor(server->GetClientAcceptor());
endpointListener->SetClientStorageFactory(
server->GetClientStorageFactory());

endpointListener->Start();
server->Start();
Expand Down Expand Up @@ -1101,9 +1111,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto logging = CreateLoggingService("console");
TString unixSocket("./invalid/path/to/socket");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/endpoints_grpc/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ PEERDIR(
cloud/blockstore/libs/endpoints
cloud/blockstore/libs/server
cloud/blockstore/libs/service
cloud/storage/core/libs/uds
)

END()
Expand Down
5 changes: 2 additions & 3 deletions cloud/blockstore/libs/server/CMakeLists.darwin-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ target_link_libraries(blockstore-libs-server PUBLIC
cpp-monlib-service
monlib-service-pages
contrib-libs-grpc
core-libs-uds
)
target_sources(blockstore-libs-server PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_acceptor.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_storage_factory.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/config.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/endpoint_poller.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/socket_poller.cpp
)
5 changes: 2 additions & 3 deletions cloud/blockstore/libs/server/CMakeLists.linux-aarch64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ target_link_libraries(blockstore-libs-server PUBLIC
cpp-monlib-service
monlib-service-pages
contrib-libs-grpc
core-libs-uds
)
target_sources(blockstore-libs-server PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_acceptor.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_storage_factory.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/config.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/endpoint_poller.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/socket_poller.cpp
)
5 changes: 2 additions & 3 deletions cloud/blockstore/libs/server/CMakeLists.linux-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ target_link_libraries(blockstore-libs-server PUBLIC
cpp-monlib-service
monlib-service-pages
contrib-libs-grpc
core-libs-uds
)
target_sources(blockstore-libs-server PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_acceptor.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_storage_factory.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/config.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/endpoint_poller.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/socket_poller.cpp
)
5 changes: 2 additions & 3 deletions cloud/blockstore/libs/server/CMakeLists.windows-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ target_link_libraries(blockstore-libs-server PUBLIC
cpp-monlib-service
monlib-service-pages
contrib-libs-grpc
core-libs-uds
)
target_sources(blockstore-libs-server PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_acceptor.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_storage_factory.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/config.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/endpoint_poller.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/socket_poller.cpp
)
37 changes: 0 additions & 37 deletions cloud/blockstore/libs/server/client_acceptor.cpp

This file was deleted.

Loading

0 comments on commit ad640f9

Please sign in to comment.