Skip to content

Commit

Permalink
NBS #179: use durable client to retry network errors in vhost-server …
Browse files Browse the repository at this point in the history
…rdma backend
  • Loading branch information
budevg committed Feb 12, 2024
1 parent 8dc993f commit 5b13bce
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 4 deletions.
129 changes: 125 additions & 4 deletions cloud/blockstore/vhost-server/backend_rdma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@

#include "backend.h"

#include <cloud/blockstore/libs/client/config.h>
#include <cloud/blockstore/libs/client/durable.h>
#include <cloud/blockstore/libs/common/device_path.h>
#include <cloud/blockstore/libs/diagnostics/request_stats.h>
#include <cloud/blockstore/libs/diagnostics/server_stats.h>
#include <cloud/blockstore/libs/diagnostics/volume_stats.h>
#include <cloud/blockstore/libs/rdma/impl/client.h>
#include <cloud/blockstore/libs/rdma/impl/verbs.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service.h>
#include <cloud/blockstore/libs/service/storage.h>
#include <cloud/blockstore/libs/service/storage_provider.h>
#include <cloud/blockstore/libs/service_local/storage_rdma.h>
#include <cloud/blockstore/public/api/protos/volume.pb.h>
#include <cloud/contrib/vhost/include/vhost/server.h>
#include <cloud/storage/core/libs/common/scheduler.h>
#include <cloud/storage/core/libs/common/task_queue.h>
#include <cloud/storage/core/libs/common/timer.h>
#include <cloud/storage/core/libs/common/verify.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>
#include <cloud/storage/core/libs/diagnostics/monitoring.h>
Expand All @@ -22,19 +29,102 @@
namespace NCloud::NBlockStore::NVHostServer {

using namespace NCloud::NBlockStore;
using namespace NThreading;

namespace {

////////////////////////////////////////////////////////////////////////////////

constexpr ui32 REQUEST_TIMEOUT_MSEC = 86400000;

////////////////////////////////////////////////////////////////////////////////

class TStorageDataClient final
: public IBlockStore
{
private:
const IStoragePtr Storage;

public:
TStorageDataClient(IStoragePtr storage)
: Storage(std::move(storage))
{}

void Start() override
{}

void Stop() override
{}

TStorageBuffer AllocateBuffer(size_t bytesCount) override
{
return Storage->AllocateBuffer(bytesCount);
}

#define BLOCKSTORE_DONT_IMPLEMENT_METHOD(name, ...) \
TFuture<NProto::T##name##Response> name( \
TCallContextPtr callContext, \
std::shared_ptr<NProto::T##name##Request> request) override \
{ \
Y_UNUSED(callContext); \
Y_UNUSED(request); \
const auto& type = GetBlockStoreRequestName(EBlockStoreRequest::name); \
return MakeFuture<NProto::T##name##Response>(TErrorResponse( \
E_NOT_IMPLEMENTED, \
TStringBuilder() << "Unsupported request " << type.Quote())); \
} \

#define BLOCKSTORE_IMPLEMENT_METHOD(name, ...) \
TFuture<NProto::T##name##Response> name( \
TCallContextPtr callContext, \
std::shared_ptr<NProto::T##name##Request> request) override \
{ \
return Storage->name(std::move(callContext), std::move(request)); \
} \

BLOCKSTORE_GRPC_SERVICE(BLOCKSTORE_DONT_IMPLEMENT_METHOD)
BLOCKSTORE_LOCAL_SERVICE(BLOCKSTORE_IMPLEMENT_METHOD)

#undef BLOCKSTORE_IMPLEMENT_METHOD
#undef BLOCKSTORE_DONT_IMPLEMENT_METHOD
};

////////////////////////////////////////////////////////////////////////////////

class TRetryPolicy : public NClient::IRetryPolicy
{
private:
NClient::IRetryPolicyPtr Impl;

public:
TRetryPolicy(NClient::TClientAppConfigPtr config)
: Impl(NClient::CreateRetryPolicy(std::move(config)))
{}

NClient::TRetrySpec ShouldRetry(
NClient::TRetryState& state,
const NProto::TError& error) override
{
if (error.GetCode() == E_BS_INVALID_SESSION) {
return Impl->ShouldRetry(state, MakeError(E_REJECTED));
}

return Impl->ShouldRetry(state, error);
}
};

////////////////////////////////////////////////////////////////////////////////

class TRdmaBackend final: public IBackend
{
private:
const ILoggingServicePtr Logging;
TLog Log;
NRdma::IClientPtr RdmaClient;
IStorageProviderPtr StorageProvider;
IStoragePtr Storage;
IBlockStorePtr DataClient;
ISchedulerPtr Scheduler;
ITimerPtr Timer;
NProto::TVolume Volume;
TString ClientId;
ICompletionStatsPtr CompletionStats;
Expand Down Expand Up @@ -62,6 +152,7 @@ class TRdmaBackend final: public IBackend
struct vhd_io* io,
TCpuCycles startCycles,
bool isError);
IBlockStorePtr CreateDataClient(IStoragePtr storage);
};

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -77,6 +168,9 @@ vhd_bdev_info TRdmaBackend::Init(const TOptions& options)
{
STORAGE_INFO("Initializing RDMA backend");

Scheduler = CreateScheduler();
Timer = CreateWallClockTimer();

ClientId = options.ClientId;
ReadOnly = options.ReadOnly;

Expand Down Expand Up @@ -156,12 +250,37 @@ vhd_bdev_info TRdmaBackend::Init(const TOptions& options)
.num_queues = options.QueueCount, // Max count of virtio queues
.total_blocks = totalBytes / BlockSize,
.features = ReadOnly ? VHD_BDEV_F_READONLY : 0};

}

IBlockStorePtr TRdmaBackend::CreateDataClient(IStoragePtr storage)
{
NProto::TClientAppConfig config;
config.MutableClientConfig()->SetRequestTimeout(REQUEST_TIMEOUT_MSEC);

auto clientConfig =
std::make_shared<NClient::TClientAppConfig>(std::move(config));

auto retryPolicy = std::make_shared<TRetryPolicy>(clientConfig);

auto client = std::make_shared<TStorageDataClient>(std::move(storage));

return CreateDurableClient(
std::move(clientConfig),
std::move(client),
std::move(retryPolicy),
Logging,
Timer,
Scheduler,
CreateRequestStatsStub(),
CreateVolumeStatsStub());
}

void TRdmaBackend::Start()
{
STORAGE_INFO("Starting RDMA backend");

Scheduler->Start();
RdmaClient->Start();

auto accessMode = ReadOnly ? NProto::VOLUME_ACCESS_READ_ONLY
Expand All @@ -170,14 +289,16 @@ void TRdmaBackend::Start()
Volume,
ClientId,
accessMode);
Storage = future.GetValueSync();
auto storage = future.GetValueSync();
DataClient = CreateDataClient(std::move(storage));
}

void TRdmaBackend::Stop()
{
STORAGE_INFO("Stopping RDMA backend");

RdmaClient->Stop();
Scheduler->Stop();
}

void TRdmaBackend::ProcessQueue(
Expand Down Expand Up @@ -254,7 +375,7 @@ void TRdmaBackend::ProcessReadRequest(struct vhd_io* io, TCpuCycles startCycles)
request->GetBlocksCount(),
request->BlockSize);
auto future =
Storage->ReadBlocksLocal(std::move(callContext), std::move(request));
DataClient->ReadBlocksLocal(std::move(callContext), std::move(request));
future.Subscribe(
[this, io, requestId, startCycles](const auto& future)
{
Expand Down Expand Up @@ -296,7 +417,7 @@ void TRdmaBackend::ProcessWriteRequest(
request->BlocksCount,
request->BlockSize);
auto future =
Storage->WriteBlocksLocal(std::move(callContext), std::move(request));
DataClient->WriteBlocksLocal(std::move(callContext), std::move(request));
future.Subscribe(
[this, io, requestId, startCycles](const auto& future)
{
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/vhost-server/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ADDINCL(
)

PEERDIR(
cloud/blockstore/libs/client
cloud/blockstore/libs/rdma/impl
cloud/blockstore/libs/service_local

Expand Down

0 comments on commit 5b13bce

Please sign in to comment.