Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBSNEBIUS-258: Support Zero Copy for RDMA Data Path on Disk Agent (#1324) #1617

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/blockstore/config/rdma.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ message TRdmaServer
uint32 MaxInflightBytes = 7; // per client
uint64 AdaptiveWaitSleepDelay = 8; // in microseconds
uint64 AdaptiveWaitSleepDuration = 9; // in microseconds
bool AlignedDataEnabled = 10;
}

message TRdmaClient
Expand All @@ -36,6 +37,7 @@ message TRdmaClient
EWaitMode WaitMode = 4;
uint64 AdaptiveWaitSleepDelay = 5; // in microseconds
uint64 AdaptiveWaitSleepDuration = 6; // in microseconds
bool AlignedDataEnabled = 7;
}

message TRdmaTarget
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/client_rdma/rdma_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class TReadBlocksHandler final
return Serializer->Serialize(
buffer,
TBlockStoreProtocol::ReadBlocksRequest,
0, // flags
*Request,
TContIOVector(nullptr, 0));
}
Expand Down Expand Up @@ -211,6 +212,7 @@ class TWriteBlocksHandler final
return Serializer->Serialize(
buffer,
TBlockStoreProtocol::WriteBlocksRequest,
0, // flags
*Request,
TContIOVector((IOutputStream::TPart*)sglist.begin(), sglist.size()));
}
Expand Down Expand Up @@ -284,6 +286,7 @@ class TZeroBlocksHandler final
return Serializer->Serialize(
buffer,
TBlockStoreProtocol::ZeroBlocksRequest,
0, // flags
*Request,
TContIOVector(nullptr, 0));
}
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ void TBootstrapBase::Init()
: FQDNHostName(),
Configs->ServerConfig->GetSocketAccessMode(),
Configs->ServerConfig->GetVhostServerTimeoutAfterParentExit(),
RdmaClient && RdmaClient->IsAlignedDataEnabled(),
std::move(vhostEndpointListener));

STORAGE_INFO("VHOST External Vhost EndpointListener initialized");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ class TEndpointService final
Now(),
std::move(callContext),
std::move(request),
BlockSize);
BlockSize,
{} // no data buffer
);
}

TFuture<NProto::TWriteBlocksResponse> WriteBlocks(
Expand All @@ -169,7 +171,9 @@ class TEndpointService final
Now(),
std::move(callContext),
std::move(request),
BlockSize);
BlockSize,
{} // no data buffer
);
}

TFuture<NProto::TZeroBlocksResponse> ZeroBlocks(
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ NProto::TError TRdmaEndpoint::HandleReadBlocksRequest(
size_t responseBytes = Serializer->Serialize(
out,
TBlockStoreProtocol::ReadBlocksResponse,
0, // flags
response,
TContIOVector((IOutputStream::TPart*)sglist.begin(), sglist.size()));

Expand Down Expand Up @@ -249,6 +250,7 @@ NProto::TError TRdmaEndpoint::HandleWriteBlocksRequest(
size_t responseBytes = Serializer->Serialize(
out,
TBlockStoreProtocol::WriteBlocksResponse,
0, // flags
response,
TContIOVector(nullptr, 0));

Expand Down Expand Up @@ -280,6 +282,7 @@ NProto::TError TRdmaEndpoint::HandleZeroBlocksRequest(
size_t responseBytes = Serializer->Serialize(
out,
TBlockStoreProtocol::ZeroBlocksResponse,
0, // flags
response,
TContIOVector(nullptr, 0));

Expand Down
11 changes: 11 additions & 0 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ class TExternalVhostEndpointListener final
const TExecutorPtr Executor;
const TString LocalAgentId;
const ui32 SocketAccessMode;
const bool IsAlignedDataEnabled;
const IEndpointListenerPtr FallbackListener;
const TExternalEndpointFactory EndpointFactory;
const TDuration VhostServerTimeoutAfterParentExit;
Expand All @@ -716,13 +717,15 @@ class TExternalVhostEndpointListener final
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener,
TExternalEndpointFactory endpointFactory)
: Logging {std::move(logging)}
, ServerStats {std::move(serverStats)}
, Executor {std::move(executor)}
, LocalAgentId {std::move(localAgentId)}
, SocketAccessMode {socketAccessMode}
, IsAlignedDataEnabled(isAlignedDataEnabled)
, FallbackListener {std::move(fallbackListener)}
, EndpointFactory {std::move(endpointFactory)}
, VhostServerTimeoutAfterParentExit{vhostServerTimeoutAfterParentExit}
Expand Down Expand Up @@ -1011,6 +1014,10 @@ class TExternalVhostEndpointListener final

args.emplace_back("--block-size");
args.emplace_back(ToString(volume.GetBlockSize()));

if (IsAlignedDataEnabled) {
args.emplace_back("--rdma-aligned-data");
}
}

for (const auto& device: volume.GetDevices()) {
Expand Down Expand Up @@ -1161,6 +1168,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener)
{
auto defaultFactory = [=] (
Expand Down Expand Up @@ -1191,6 +1199,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
std::move(localAgentId),
socketAccessMode,
vhostServerTimeoutAfterParentExit,
isAlignedDataEnabled,
std::move(fallbackListener),
std::move(defaultFactory));
}
Expand All @@ -1202,6 +1211,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener,
TExternalEndpointFactory factory)
{
Expand All @@ -1212,6 +1222,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
std::move(localAgentId),
socketAccessMode,
vhostServerTimeoutAfterParentExit,
isAlignedDataEnabled,
std::move(fallbackListener),
std::move(factory));
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener);

IEndpointListenerPtr CreateExternalVhostEndpointListener(
Expand All @@ -50,6 +51,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener(
TString localAgentId,
ui32 socketAccessMode,
TDuration vhostServerTimeoutAfterParentExit,
bool isAlignedDataEnabled,
IEndpointListenerPtr fallbackListener,
TExternalEndpointFactory factory);

Expand Down
54 changes: 45 additions & 9 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,24 @@ struct TFixture

THistory History;

IEndpointListenerPtr Listener = CreateExternalVhostEndpointListener(
Logging,
ServerStats,
Executor,
LocalAgentId,
S_IRGRP | S_IWGRP | S_IRUSR | S_IWUSR,
TDuration::Seconds(30),
CreateFallbackListener(),
CreateExternalEndpointFactory());
IEndpointListenerPtr Listener =
CreateEndpointListener(false); // no rdma aligned data

public:
IEndpointListenerPtr CreateEndpointListener(bool isAlignedDataEnabled)
{
return CreateExternalVhostEndpointListener(
Logging,
ServerStats,
Executor,
LocalAgentId,
S_IRGRP | S_IWGRP | S_IRUSR | S_IWUSR,
TDuration::Seconds(30),
isAlignedDataEnabled,
CreateFallbackListener(),
CreateExternalEndpointFactory());
}

NProto::TStartEndpointRequest CreateDefaultStartEndpointRequest()
{
NProto::TStartEndpointRequest request;
Expand Down Expand Up @@ -366,6 +373,11 @@ TString GetArg(const TVector<TString>& args, TStringBuf name)
return *std::next(it);
}

bool HasArg(const TVector<TString>& args, TStringBuf name)
{
return Find(args, name) != args.end();
}

TVector<TString> GetArgN(const TVector<TString>& args, TStringBuf name)
{
TVector<TString> values;
Expand Down Expand Up @@ -564,6 +576,30 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest)
UNIT_ASSERT_VALUES_EQUAL(1, History.size());
auto* stop = std::get_if<TStopExternalEndpoint>(&History[0]);
UNIT_ASSERT_C(stop, "actual entry: " << History[0].index());

History.clear();
}

{
auto alignedDataListener = CreateEndpointListener(true);

auto request = CreateDefaultStartEndpointRequest();

auto error = alignedDataListener
->StartEndpoint(request, FastPathVolume, Session)
.GetValueSync();
UNIT_ASSERT_C(!HasError(error), error);

UNIT_ASSERT_VALUES_EQUAL(3, History.size());

auto* create = std::get_if<TCreateExternalEndpoint>(&History[0]);
UNIT_ASSERT_C(create, "actual entry: " << History[0].index());

UNIT_ASSERT_C(
HasArg(create->CmdArgs, "--rdma-aligned-data"),
"missing --rdma-aligned-data arg");

History.clear();
}
}

Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/rdma/iface/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ TClientConfig::TClientConfig(const NProto::TRdmaClient& config)
SET(PollerThreads);
SET(AdaptiveWaitSleepDelay, TDuration::MicroSeconds);
SET(AdaptiveWaitSleepDuration, TDuration::MicroSeconds);
SET(AlignedDataEnabled);
}

#undef SET
Expand All @@ -65,6 +66,7 @@ void TClientConfig::DumpHtml(IOutputStream& out) const
ENTRY(MaxResponseDelay, MaxResponseDelay.ToString());
ENTRY(AdaptiveWaitSleepDelay, AdaptiveWaitSleepDelay.ToString());
ENTRY(AdaptiveWaitSleepDuration, AdaptiveWaitSleepDuration.ToString());
ENTRY(AlignedDataEnabled, AlignedDataEnabled);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/rdma/iface/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct TClientConfig
TDuration MaxResponseDelay = TDuration::Seconds(60);
TDuration AdaptiveWaitSleepDelay = TDuration::MilliSeconds(10);
TDuration AdaptiveWaitSleepDuration = TDuration::MicroSeconds(100);
bool AlignedDataEnabled = false;

TClientConfig() = default;

Expand Down Expand Up @@ -113,6 +114,8 @@ struct IClient
ui32 port) = 0;

virtual void DumpHtml(IOutputStream& out) const = 0;

virtual bool IsAlignedDataEnabled() const = 0;
};

} // namespace NCloud::NBlockStore::NRdma
42 changes: 34 additions & 8 deletions cloud/blockstore/libs/rdma/iface/protobuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "protocol.h"

#include <cloud/blockstore/libs/diagnostics/critical_events.h>
#include <cloud/storage/core/libs/common/helpers.h>

#include <google/protobuf/message.h>

Expand Down Expand Up @@ -56,13 +57,38 @@ size_t TProtoMessageSerializer::MessageByteSize(
size_t TProtoMessageSerializer::Serialize(
TStringBuf buffer,
ui32 msgId,
ui32 flags,
const TProtoMessage& proto,
TContIOVector data)
{
size_t dataLen = data.Bytes();
char* ptr = const_cast<char*>(buffer.data());
ptr += Serialize(buffer, msgId, flags, proto, dataLen);

if (HasProtoFlag(flags, RDMA_PROTO_FLAG_DATA_AT_THE_END)) {
ptr = const_cast<char*>(buffer.data()) + buffer.length() - dataLen;
}

for (size_t i = 0; i < data.Count(); ++i) {
const auto& part = data.Parts()[i];
memcpy(ptr, part.buf, part.len);
ptr += part.len;
}

return ptr - buffer.data();
}

// static
size_t TProtoMessageSerializer::Serialize(
TStringBuf buffer,
ui32 msgId,
ui32 flags,
const TProtoMessage& proto,
size_t dataLen)
{
size_t protoLen = proto.ByteSizeLong();
Y_ENSURE(protoLen < RDMA_MAX_PROTO_LEN, "protobuf message is too big");

size_t dataLen = data.Bytes();
Y_ENSURE(dataLen < RDMA_MAX_DATA_LEN, "attached data is too big");

size_t totalLen = NRdma::MessageByteSize(protoLen, dataLen);
Expand All @@ -73,6 +99,7 @@ size_t TProtoMessageSerializer::Serialize(

TProtoHeader header = {
.MsgId = msgId,
.Flags = flags,
.ProtoLen = (ui32)protoLen,
.DataLen = (ui32)dataLen,
};
Expand All @@ -85,12 +112,6 @@ size_t TProtoMessageSerializer::Serialize(
Y_ENSURE(succeeded, "could not serialize protobuf message");
ptr += protoLen;

for (size_t i = 0; i < data.Count(); ++i) {
const auto& part = data.Parts()[i];
memcpy(ptr, part.buf, part.len);
ptr += part.len;
}

return ptr - buffer.data();
}

Expand Down Expand Up @@ -120,10 +141,15 @@ TProtoMessageSerializer::Parse(TStringBuf buffer) const
Y_ENSURE_RETURN(succeeded, "could not parse protobuf message");
ptr += header.ProtoLen;

if (HasProtoFlag(header.Flags, RDMA_PROTO_FLAG_DATA_AT_THE_END)) {
ptr =
const_cast<char*>(buffer.data()) + buffer.length() - header.DataLen;
}

auto data = TStringBuf(ptr, header.DataLen);
ptr += header.DataLen;

return TParseResult { header.MsgId, std::move(proto), data };
return TParseResult { header.MsgId, header.Flags, std::move(proto), data };
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
9 changes: 9 additions & 0 deletions cloud/blockstore/libs/rdma/iface/protobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,21 @@ class TProtoMessageSerializer
static size_t Serialize(
TStringBuf buffer,
ui32 msgId,
ui32 flags,
const TProtoMessage& proto,
TContIOVector data);

static size_t Serialize(
TStringBuf buffer,
ui32 msgId,
ui32 flags,
const TProtoMessage& proto,
size_t dataLen);

struct TParseResult
{
ui32 MsgId;
ui32 Flags;
TProtoMessagePtr Proto;
TStringBuf Data;
};
Expand Down
Loading
Loading