Skip to content

Commit

Permalink
NBSNEBIUS-258: Support Zero Copy for RDMA Data Path on Disk Agent (#1324
Browse files Browse the repository at this point in the history
)

* NBSNEBIUS-258: Support Zero Copy for RDMA Data Path on Disk Agent

The Disk Agent currently copies data buffers multiple times for READ/WRITE
requests received using RDMA transport.

For WRITE requests, the RDMA data buffer is first copied into the memory of
TWriteBlocksRequest and then into a disk block-aligned buffer allocated by
Storage.

For READ requests, disk data is first read into a disk block-aligned buffer
allocated by Storage and then copied into the TReadBlocksResponse message. This
message is then serialized into the RDMA buffer.

To avoid these expensive copies and maintain compatibility with older clients,
we introduce the RDMA_PROTO_FLAG_RDATA flag, which signals the data layout
relative to the allocated RDMA buffer.

Previously, the data layout was:

```
             buffer
|--------------+-------+------+--------|
| TProtoHeader | Proto | Data | unused |
|--------------+-------+------+--------|
```

This layout allows the data offset in memory to be unaligned to 512/4096 bytes,
even though the RDMA buffer is allocated in 4096-byte chunks. Libaio requires
block-aligned memory buffers for writing to the underlying block device with
O_DIRECT, necessitating a different data layout.

With RDMA_PROTO_FLAG_RDATA, the data layout becomes:

```
|--------------+-------+--------+------|
| TProtoHeader | Proto | unused | Data |
|--------------+-------+--------+------|
```

Since the Data buffer size is a multiple of 512/4096 bytes (depending on the
device block size) and the buffer is a multiple of 4096-byte chunks, the data
offset in memory will be 512/4096 bytes aligned, allowing its use with libaio.
  • Loading branch information
budevg committed Jul 16, 2024
1 parent 97bc0cf commit 15d6bd5
Show file tree
Hide file tree
Showing 37 changed files with 585 additions and 139 deletions.
2 changes: 2 additions & 0 deletions cloud/blockstore/config/rdma.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ message TRdmaServer
uint32 MaxInflightBytes = 7; // per client
uint64 AdaptiveWaitSleepDelay = 8; // in microseconds
uint64 AdaptiveWaitSleepDuration = 9; // in microseconds
bool AlignedDataEnabled = 10;
}

message TRdmaClient
Expand All @@ -36,6 +37,7 @@ message TRdmaClient
EWaitMode WaitMode = 4;
uint64 AdaptiveWaitSleepDelay = 5; // in microseconds
uint64 AdaptiveWaitSleepDuration = 6; // in microseconds
bool AlignedDataEnabled = 7;
}

message TRdmaTarget
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/client_rdma/rdma_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class TReadBlocksHandler final
return Serializer->Serialize(
buffer,
TBlockStoreProtocol::ReadBlocksRequest,
0, // flags
*Request,
TContIOVector(nullptr, 0));
}
Expand Down Expand Up @@ -211,6 +212,7 @@ class TWriteBlocksHandler final
return Serializer->Serialize(
buffer,
TBlockStoreProtocol::WriteBlocksRequest,
0, // flags
*Request,
TContIOVector((IOutputStream::TPart*)sglist.begin(), sglist.size()));
}
Expand Down Expand Up @@ -284,6 +286,7 @@ class TZeroBlocksHandler final
return Serializer->Serialize(
buffer,
TBlockStoreProtocol::ZeroBlocksRequest,
0, // flags
*Request,
TContIOVector(nullptr, 0));
}
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ void TBootstrapBase::Init()
: FQDNHostName(),
Configs->ServerConfig->GetSocketAccessMode(),
Configs->ServerConfig->GetVhostServerTimeoutAfterParentExit(),
RdmaClient && RdmaClient->IsAlignedDataEnabled(),
std::move(vhostEndpointListener));

STORAGE_INFO("VHOST External Vhost EndpointListener initialized");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ class TEndpointService final
Now(),
std::move(callContext),
std::move(request),
BlockSize);
BlockSize,
{} // no data buffer
);
}

TFuture<NProto::TWriteBlocksResponse> WriteBlocks(
Expand All @@ -169,7 +171,9 @@ class TEndpointService final
Now(),
std::move(callContext),
std::move(request),
BlockSize);
BlockSize,
{} // no data buffer
);
}

TFuture<NProto::TZeroBlocksResponse> ZeroBlocks(
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ NProto::TError TRdmaEndpoint::HandleReadBlocksRequest(
size_t responseBytes = Serializer->Serialize(
out,
TBlockStoreProtocol::ReadBlocksResponse,
0, // flags
response,
TContIOVector((IOutputStream::TPart*)sglist.begin(), sglist.size()));

Expand Down Expand Up @@ -249,6 +250,7 @@ NProto::TError TRdmaEndpoint::HandleWriteBlocksRequest(
size_t responseBytes = Serializer->Serialize(
out,
TBlockStoreProtocol::WriteBlocksResponse,
0, // flags
response,
TContIOVector(nullptr, 0));

Expand Down Expand Up @@ -280,6 +282,7 @@ NProto::TError TRdmaEndpoint::HandleZeroBlocksRequest(
size_t responseBytes = Serializer->Serialize(
out,
TBlockStoreProtocol::ZeroBlocksResponse,
0, // flags
response,
TContIOVector(nullptr, 0));

Expand Down
11 changes: 11 additions & 0 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ class TExternalVhostEndpointListener final
const TExecutorPtr Executor;
const TString LocalAgentId;
const ui32 SocketAccessMode;
const bool IsAlignedDataEnabled;
const IEndpointListenerPtr FallbackListener;
const TExternalEndpointFactory EndpointFactory;
const TDuration VhostServerTimeoutAfterParentExit;
Expand All @@ -716,13 +717,15 @@ class TExternalVhostEndpointListener final
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener,
TExternalEndpointFactory endpointFactory)
: Logging {std::move(logging)}
, ServerStats {std::move(serverStats)}
, Executor {std::move(executor)}
, LocalAgentId {std::move(localAgentId)}
, SocketAccessMode {socketAccessMode}
, IsAlignedDataEnabled(isAlignedDataEnabled)
, FallbackListener {std::move(fallbackListener)}
, EndpointFactory {std::move(endpointFactory)}
, VhostServerTimeoutAfterParentExit{vhostServerTimeoutAfterParentExit}
Expand Down Expand Up @@ -1011,6 +1014,10 @@ class TExternalVhostEndpointListener final

args.emplace_back("--block-size");
args.emplace_back(ToString(volume.GetBlockSize()));

if (IsAlignedDataEnabled) {
args.emplace_back("--rdma-aligned-data");
}
}

for (const auto& device: volume.GetDevices()) {
Expand Down Expand Up @@ -1161,6 +1168,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener)
{
auto defaultFactory = [=] (
Expand Down Expand Up @@ -1191,6 +1199,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
std::move(localAgentId),
socketAccessMode,
vhostServerTimeoutAfterParentExit,
isAlignedDataEnabled,
std::move(fallbackListener),
std::move(defaultFactory));
}
Expand All @@ -1202,6 +1211,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener,
TExternalEndpointFactory factory)
{
Expand All @@ -1212,6 +1222,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
std::move(localAgentId),
socketAccessMode,
vhostServerTimeoutAfterParentExit,
isAlignedDataEnabled,
std::move(fallbackListener),
std::move(factory));
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener);

IEndpointListenerPtr CreateExternalVhostEndpointListener(
Expand All @@ -50,6 +51,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener,
TExternalEndpointFactory factory);

Expand Down
54 changes: 45 additions & 9 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,24 @@ struct TFixture

THistory History;

IEndpointListenerPtr Listener = CreateExternalVhostEndpointListener(
Logging,
ServerStats,
Executor,
LocalAgentId,
S_IRGRP | S_IWGRP | S_IRUSR | S_IWUSR,
TDuration::Seconds(30),
CreateFallbackListener(),
CreateExternalEndpointFactory());
IEndpointListenerPtr Listener =
CreateEndpointListener(false); // no rdma aligned data

public:
IEndpointListenerPtr CreateEndpointListener(bool isAlignedDataEnabled)
{
return CreateExternalVhostEndpointListener(
Logging,
ServerStats,
Executor,
LocalAgentId,
S_IRGRP | S_IWGRP | S_IRUSR | S_IWUSR,
TDuration::Seconds(30),
isAlignedDataEnabled,
CreateFallbackListener(),
CreateExternalEndpointFactory());
}

NProto::TStartEndpointRequest CreateDefaultStartEndpointRequest()
{
NProto::TStartEndpointRequest request;
Expand Down Expand Up @@ -366,6 +373,11 @@ TString GetArg(const TVector<TString>& args, TStringBuf name)
return *std::next(it);
}

bool HasArg(const TVector<TString>& args, TStringBuf name)
{
return Find(args, name) != args.end();
}

TVector<TString> GetArgN(const TVector<TString>& args, TStringBuf name)
{
TVector<TString> values;
Expand Down Expand Up @@ -564,6 +576,30 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest)
UNIT_ASSERT_VALUES_EQUAL(1, History.size());
auto* stop = std::get_if<TStopExternalEndpoint>(&History[0]);
UNIT_ASSERT_C(stop, "actual entry: " << History[0].index());

History.clear();
}

{
auto alignedDataListener = CreateEndpointListener(true);

auto request = CreateDefaultStartEndpointRequest();

auto error = alignedDataListener
->StartEndpoint(request, FastPathVolume, Session)
.GetValueSync();
UNIT_ASSERT_C(!HasError(error), error);

UNIT_ASSERT_VALUES_EQUAL(3, History.size());

auto* create = std::get_if<TCreateExternalEndpoint>(&History[0]);
UNIT_ASSERT_C(create, "actual entry: " << History[0].index());

UNIT_ASSERT_C(
HasArg(create->CmdArgs, "--rdma-aligned-data"),
"missing --rdma-aligned-data arg");

History.clear();
}
}

Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/rdma/iface/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ TClientConfig::TClientConfig(const NProto::TRdmaClient& config)
SET(PollerThreads);
SET(AdaptiveWaitSleepDelay, TDuration::MicroSeconds);
SET(AdaptiveWaitSleepDuration, TDuration::MicroSeconds);
SET(AlignedDataEnabled);
}

#undef SET
Expand All @@ -65,6 +66,7 @@ void TClientConfig::DumpHtml(IOutputStream& out) const
ENTRY(MaxResponseDelay, MaxResponseDelay.ToString());
ENTRY(AdaptiveWaitSleepDelay, AdaptiveWaitSleepDelay.ToString());
ENTRY(AdaptiveWaitSleepDuration, AdaptiveWaitSleepDuration.ToString());
ENTRY(AlignedDataEnabled, AlignedDataEnabled);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/rdma/iface/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct TClientConfig
TDuration MaxResponseDelay = TDuration::Seconds(60);
TDuration AdaptiveWaitSleepDelay = TDuration::MilliSeconds(10);
TDuration AdaptiveWaitSleepDuration = TDuration::MicroSeconds(100);
bool AlignedDataEnabled = false;

TClientConfig() = default;

Expand Down Expand Up @@ -113,6 +114,8 @@ struct IClient
ui32 port) = 0;

virtual void DumpHtml(IOutputStream& out) const = 0;

virtual bool IsAlignedDataEnabled() const = 0;
};

} // namespace NCloud::NBlockStore::NRdma
42 changes: 34 additions & 8 deletions cloud/blockstore/libs/rdma/iface/protobuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "protocol.h"

#include <cloud/blockstore/libs/diagnostics/critical_events.h>
#include <cloud/storage/core/libs/common/helpers.h>

#include <google/protobuf/message.h>

Expand Down Expand Up @@ -56,13 +57,38 @@ size_t TProtoMessageSerializer::MessageByteSize(
size_t TProtoMessageSerializer::Serialize(
TStringBuf buffer,
ui32 msgId,
ui32 flags,
const TProtoMessage& proto,
TContIOVector data)
{
size_t dataLen = data.Bytes();
char* ptr = const_cast<char*>(buffer.data());
ptr += Serialize(buffer, msgId, flags, proto, dataLen);

if (HasProtoFlag(flags, RDMA_PROTO_FLAG_DATA_AT_THE_END)) {
ptr = const_cast<char*>(buffer.data()) + buffer.length() - dataLen;
}

for (size_t i = 0; i < data.Count(); ++i) {
const auto& part = data.Parts()[i];
memcpy(ptr, part.buf, part.len);
ptr += part.len;
}

return ptr - buffer.data();
}

// static
size_t TProtoMessageSerializer::Serialize(
TStringBuf buffer,
ui32 msgId,
ui32 flags,
const TProtoMessage& proto,
size_t dataLen)
{
size_t protoLen = proto.ByteSizeLong();
Y_ENSURE(protoLen < RDMA_MAX_PROTO_LEN, "protobuf message is too big");

size_t dataLen = data.Bytes();
Y_ENSURE(dataLen < RDMA_MAX_DATA_LEN, "attached data is too big");

size_t totalLen = NRdma::MessageByteSize(protoLen, dataLen);
Expand All @@ -73,6 +99,7 @@ size_t TProtoMessageSerializer::Serialize(

TProtoHeader header = {
.MsgId = msgId,
.Flags = flags,
.ProtoLen = (ui32)protoLen,
.DataLen = (ui32)dataLen,
};
Expand All @@ -85,12 +112,6 @@ size_t TProtoMessageSerializer::Serialize(
Y_ENSURE(succeeded, "could not serialize protobuf message");
ptr += protoLen;

for (size_t i = 0; i < data.Count(); ++i) {
const auto& part = data.Parts()[i];
memcpy(ptr, part.buf, part.len);
ptr += part.len;
}

return ptr - buffer.data();
}

Expand Down Expand Up @@ -120,10 +141,15 @@ TProtoMessageSerializer::Parse(TStringBuf buffer) const
Y_ENSURE_RETURN(succeeded, "could not parse protobuf message");
ptr += header.ProtoLen;

if (HasProtoFlag(header.Flags, RDMA_PROTO_FLAG_DATA_AT_THE_END)) {
ptr =
const_cast<char*>(buffer.data()) + buffer.length() - header.DataLen;
}

auto data = TStringBuf(ptr, header.DataLen);
ptr += header.DataLen;

return TParseResult { header.MsgId, std::move(proto), data };
return TParseResult { header.MsgId, header.Flags, std::move(proto), data };
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
9 changes: 9 additions & 0 deletions cloud/blockstore/libs/rdma/iface/protobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,21 @@ class TProtoMessageSerializer
static size_t Serialize(
TStringBuf buffer,
ui32 msgId,
ui32 flags,
const TProtoMessage& proto,
TContIOVector data);

static size_t Serialize(
TStringBuf buffer,
ui32 msgId,
ui32 flags,
const TProtoMessage& proto,
size_t dataLen);

struct TParseResult
{
ui32 MsgId;
ui32 Flags;
TProtoMessagePtr Proto;
TStringBuf Data;
};
Expand Down
Loading

0 comments on commit 15d6bd5

Please sign in to comment.