Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge to stable-23-3 #340

Merged
merged 8 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ void TBootstrapBase::Init()

STORAGE_INFO("Server initialized");

GrpcEndpointListener->SetClientAcceptor(Server->GetClientAcceptor());
GrpcEndpointListener->SetClientStorageFactory(
Server->GetClientStorageFactory());

TVector<IIncompleteRequestProviderPtr> requestProviders = {
Server,
Expand Down
4 changes: 2 additions & 2 deletions cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <cloud/blockstore/libs/encryption/encryption_client.h>
#include <cloud/blockstore/libs/encryption/encryption_key.h>
#include <cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.h>
#include <cloud/blockstore/libs/server/client_acceptor.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/blockstore/libs/service/service_test.h>
#include <cloud/blockstore/libs/service/storage_provider.h>

Expand Down Expand Up @@ -913,7 +913,7 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest)
TBootstrap bootstrap(CreateTestService(mountedVolumes));

auto grpcListener = CreateSocketEndpointListener(bootstrap.Logging, 16);
grpcListener->SetClientAcceptor(CreateClientAcceptorStub());
grpcListener->SetClientStorageFactory(CreateClientStorageFactoryStub());

auto manager = CreateEndpointManager(
bootstrap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ target_link_libraries(blockstore-libs-endpoints_grpc PUBLIC
blockstore-libs-endpoints
blockstore-libs-server
blockstore-libs-service
core-libs-uds
)
target_sources(blockstore-libs-endpoints_grpc PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ target_link_libraries(blockstore-libs-endpoints_grpc PUBLIC
blockstore-libs-endpoints
blockstore-libs-server
blockstore-libs-service
core-libs-uds
)
target_sources(blockstore-libs-endpoints_grpc PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ target_link_libraries(blockstore-libs-endpoints_grpc PUBLIC
blockstore-libs-endpoints
blockstore-libs-server
blockstore-libs-service
core-libs-uds
)
target_sources(blockstore-libs-endpoints_grpc PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ target_link_libraries(blockstore-libs-endpoints_grpc PUBLIC
blockstore-libs-endpoints
blockstore-libs-server
blockstore-libs-service
core-libs-uds
)
target_sources(blockstore-libs-endpoints_grpc PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
#include <cloud/blockstore/libs/client/session.h>
#include <cloud/blockstore/libs/common/iovector.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/server/endpoint_poller.h>
#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service.h>
#include <cloud/blockstore/libs/service/storage.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>
#include <cloud/storage/core/libs/uds/endpoint_poller.h>

namespace NCloud::NBlockStore::NServer {

Expand Down Expand Up @@ -210,7 +211,8 @@ class TSocketEndpointListener final
private:
const ui32 UnixSocketBacklog;

std::unique_ptr<TEndpointPoller> EndpointPoller;
std::unique_ptr<NStorage::NServer::TEndpointPoller> EndpointPoller;
IClientStorageFactoryPtr ClientStorageFactory;

TLog Log;

Expand All @@ -228,11 +230,11 @@ class TSocketEndpointListener final
Stop();
}

void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) override
void SetClientStorageFactory(IClientStorageFactoryPtr factory) override
{
Y_ABORT_UNLESS(!EndpointPoller);
EndpointPoller = std::make_unique<TEndpointPoller>(
std::move(clientAcceptor));
ClientStorageFactory = std::move(factory);
EndpointPoller = std::make_unique<NStorage::NServer::TEndpointPoller>();
}

void Start() override
Expand Down Expand Up @@ -263,7 +265,8 @@ class TSocketEndpointListener final
UnixSocketBacklog,
false, // multiClient
NProto::SOURCE_FD_DATA_CHANNEL,
std::move(sessionService));
ClientStorageFactory->CreateClientStorage(
std::move(sessionService)));

return MakeFuture(std::move(error));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ISocketEndpointListener
: public IEndpointListener
, public IStartable
{
virtual void SetClientAcceptor(IClientAcceptorPtr clientAcceptor) = 0;
virtual void SetClientStorageFactory(IClientStorageFactoryPtr factory) = 0;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <cloud/blockstore/libs/diagnostics/server_stats.h>
#include <cloud/blockstore/libs/diagnostics/volume_stats.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/server/client_acceptor.h>
#include <cloud/blockstore/libs/server/client_storage_factory.h>
#include <cloud/blockstore/libs/server/server.h>
#include <cloud/blockstore/libs/server/server_test.h>
#include <cloud/blockstore/libs/service/service.h>
Expand Down Expand Up @@ -38,30 +38,38 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

class TTestClientAcceptor final
: public IClientAcceptor
using NStorage::NServer::IClientStorage;
using NStorage::NServer::IClientStoragePtr;

class TTestClientStorage
: public IClientStorageFactory
, public IClientStorage
, public std::enable_shared_from_this<TTestClientStorage>
{
private:
TMutex Lock;
TMap<SOCKET, IBlockStorePtr> Sessions;
TSet<SOCKET> Sessions;

public:
void Accept(
IClientStoragePtr CreateClientStorage(IBlockStorePtr service) override
{
Y_UNUSED(service);
return shared_from_this();
}

void AddClient(
const TSocketHolder& socket,
IBlockStorePtr sessionService,
NProto::ERequestSource source) override
{
Y_UNUSED(source);

with_lock (Lock) {
auto [it, inserted] = Sessions.emplace(
socket,
std::move(sessionService));
auto [it, inserted] = Sessions.emplace(socket);
UNIT_ASSERT(inserted);
}
}

void Remove(const TSocketHolder& socket) override
void RemoveClient(const TSocketHolder& socket) override
{
with_lock (Lock) {
auto it = Sessions.find(socket);
Expand Down Expand Up @@ -267,7 +275,8 @@ TBootstrap CreateBootstrap(TOptions options)
auto endpointListener = CreateSocketEndpointListener(
testFactory.Logging,
options.UnixSocketBacklog);
endpointListener->SetClientAcceptor(server->GetClientAcceptor());
endpointListener->SetClientStorageFactory(
server->GetClientStorageFactory());

NProto::TStartEndpointRequest request;
request.SetUnixSocketPath(options.UnixSocketPath);
Expand Down Expand Up @@ -331,9 +340,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto logging = CreateLoggingService("console");
TFsPath unixSocket(CreateGuidAsString() + ".sock");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand All @@ -359,7 +368,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
};

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1);
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);

{
auto future = listener->StopEndpoint(unixSocket.GetPath());
Expand All @@ -368,17 +377,17 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
}

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0);
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
}

Y_UNIT_TEST(ShouldHandleClientDisconnection)
{
auto logging = CreateLoggingService("console");
TFsPath unixSocket(CreateGuidAsString() + ".sock");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand All @@ -397,7 +406,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
clientEndpoint->Start();

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 1);
UNIT_ASSERT(clientStorage->GetSessionCount() == 1);

clientEndpoint->Stop();
clientEndpoint.reset();
Expand All @@ -406,7 +415,7 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
client.reset();

Sleep(TDuration::MilliSeconds(100));
UNIT_ASSERT(clientAcceptor->GetSessionCount() == 0);
UNIT_ASSERT(clientStorage->GetSessionCount() == 0);
}

Y_UNIT_TEST(ShouldNotAcceptClientAfterServerStopped)
Expand All @@ -429,7 +438,8 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto endpointListener = CreateSocketEndpointListener(
testFactory.Logging,
unixSocketBacklog);
endpointListener->SetClientAcceptor(server->GetClientAcceptor());
endpointListener->SetClientStorageFactory(
server->GetClientStorageFactory());

endpointListener->Start();
server->Start();
Expand Down Expand Up @@ -1101,9 +1111,9 @@ Y_UNIT_TEST_SUITE(TSocketEndpointListenerTest)
auto logging = CreateLoggingService("console");
TString unixSocket("./invalid/path/to/socket");

auto clientAcceptor = std::make_shared<TTestClientAcceptor>();
auto clientStorage = std::make_shared<TTestClientStorage>();
auto listener = CreateSocketEndpointListener(logging, 16);
listener->SetClientAcceptor(clientAcceptor);
listener->SetClientStorageFactory(clientStorage);
listener->Start();
Y_DEFER {
listener->Stop();
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/endpoints_grpc/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ PEERDIR(
cloud/blockstore/libs/endpoints
cloud/blockstore/libs/server
cloud/blockstore/libs/service
cloud/storage/core/libs/uds
)

END()
Expand Down
5 changes: 2 additions & 3 deletions cloud/blockstore/libs/server/CMakeLists.darwin-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ target_link_libraries(blockstore-libs-server PUBLIC
cpp-monlib-service
monlib-service-pages
contrib-libs-grpc
core-libs-uds
)
target_sources(blockstore-libs-server PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_acceptor.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_storage_factory.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/config.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/endpoint_poller.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/socket_poller.cpp
)
5 changes: 2 additions & 3 deletions cloud/blockstore/libs/server/CMakeLists.linux-aarch64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ target_link_libraries(blockstore-libs-server PUBLIC
cpp-monlib-service
monlib-service-pages
contrib-libs-grpc
core-libs-uds
)
target_sources(blockstore-libs-server PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_acceptor.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_storage_factory.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/config.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/endpoint_poller.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/socket_poller.cpp
)
5 changes: 2 additions & 3 deletions cloud/blockstore/libs/server/CMakeLists.linux-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ target_link_libraries(blockstore-libs-server PUBLIC
cpp-monlib-service
monlib-service-pages
contrib-libs-grpc
core-libs-uds
)
target_sources(blockstore-libs-server PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_acceptor.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_storage_factory.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/config.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/endpoint_poller.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/socket_poller.cpp
)
5 changes: 2 additions & 3 deletions cloud/blockstore/libs/server/CMakeLists.windows-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ target_link_libraries(blockstore-libs-server PUBLIC
cpp-monlib-service
monlib-service-pages
contrib-libs-grpc
core-libs-uds
)
target_sources(blockstore-libs-server PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_acceptor.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/client_storage_factory.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/config.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/endpoint_poller.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/server_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/server/socket_poller.cpp
)
37 changes: 0 additions & 37 deletions cloud/blockstore/libs/server/client_acceptor.cpp

This file was deleted.

Loading
Loading