From 15d6bd5085a7710cd2f11b562b7109ec6d279289 Mon Sep 17 00:00:00 2001
From: Evgeny Budilovsky
Date: Tue, 16 Jul 2024 11:25:39 +0300
Subject: [PATCH] NBSNEBIUS-258: Support Zero Copy for RDMA Data Path on Disk Agent (#1324)

* NBSNEBIUS-258: Support Zero Copy for RDMA Data Path on Disk Agent

The Disk Agent currently copies data buffers multiple times for READ/WRITE
requests received using RDMA transport.

For WRITE requests, the RDMA data buffer is first copied into the memory of
TWriteBlocksRequest and then into a disk block-aligned buffer allocated by
Storage.

For READ requests, disk data is first read into a disk block-aligned buffer
allocated by Storage and then copied into the TReadBlocksResponse message.
This message is then serialized into the RDMA buffer.

To avoid these expensive copies and maintain compatibility with older clients,
we introduce the RDMA_PROTO_FLAG_DATA_AT_THE_END flag, which signals the data
layout relative to the allocated RDMA buffer.

Previously, the data layout was:

```
    buffer
    |--------------+-------+------+--------|
    | TProtoHeader | Proto | Data | unused |
    |--------------+-------+------+--------|
```

With this layout the data offset in memory is not guaranteed to be aligned to
512/4096 bytes, even though the RDMA buffer is allocated in 4096-byte chunks.
Libaio requires block-aligned memory buffers for writing to the underlying
block device with O_DIRECT, necessitating a different data layout.

With RDMA_PROTO_FLAG_DATA_AT_THE_END, the data layout becomes:

```
    buffer
    |--------------+-------+--------+------|
    | TProtoHeader | Proto | unused | Data |
    |--------------+-------+--------+------|
```

Since the Data size is a multiple of 512/4096 bytes (depending on the device
block size) and the buffer size is a multiple of 4096 bytes, the data offset
in memory will be 512/4096-byte aligned, allowing its use with libaio.
--- cloud/blockstore/config/rdma.proto | 2 + .../libs/client_rdma/rdma_client.cpp | 3 + .../libs/daemon/common/bootstrap.cpp | 1 + .../socket_endpoint_listener.cpp | 8 +- .../libs/endpoints_rdma/rdma_server.cpp | 3 + .../endpoints_vhost/external_vhost_server.cpp | 11 ++ .../endpoints_vhost/external_vhost_server.h | 2 + .../external_vhost_server_ut.cpp | 54 +++++- cloud/blockstore/libs/rdma/iface/client.cpp | 2 + cloud/blockstore/libs/rdma/iface/client.h | 3 + cloud/blockstore/libs/rdma/iface/protobuf.cpp | 42 ++++- cloud/blockstore/libs/rdma/iface/protobuf.h | 9 + .../libs/rdma/iface/protobuf_ut.cpp | 65 ++++--- cloud/blockstore/libs/rdma/iface/protocol.h | 31 +++- cloud/blockstore/libs/rdma/iface/server.cpp | 1 + cloud/blockstore/libs/rdma/impl/client.cpp | 10 +- .../blockstore/libs/rdma_test/client_test.cpp | 4 + cloud/blockstore/libs/rdma_test/client_test.h | 5 + .../libs/rdma_test/server_test_async.cpp | 1 + cloud/blockstore/libs/service/storage.cpp | 118 +++++++++---- cloud/blockstore/libs/service/storage.h | 12 +- cloud/blockstore/libs/service/storage_ut.cpp | 159 +++++++++++++++--- .../libs/service_local/service_local.cpp | 8 +- .../libs/service_local/storage_rdma.cpp | 30 +++- .../libs/service_local/storage_rdma_ut.cpp | 5 + .../libs/service_local/storage_spdk_ut.cpp | 8 +- .../storage/disk_agent/disk_agent_state.cpp | 12 +- .../libs/storage/disk_agent/rdma_target.cpp | 78 +++++++-- .../part_nonrepl_rdma_actor.cpp | 7 + ...part_nonrepl_rdma_actor_checksumblocks.cpp | 1 + .../part_nonrepl_rdma_actor_writeblocks.cpp | 18 +- .../part_nonrepl_rdma_actor_zeroblocks.cpp | 1 + .../tools/testing/rdma-test/storage_rdma.cpp | 2 + .../tools/testing/rdma-test/target.cpp | 2 + .../blockstore/vhost-server/backend_rdma.cpp | 1 + cloud/blockstore/vhost-server/options.cpp | 4 + cloud/blockstore/vhost-server/options.h | 1 + 37 files changed, 585 insertions(+), 139 deletions(-) diff --git a/cloud/blockstore/config/rdma.proto b/cloud/blockstore/config/rdma.proto index 
4107d65c25f..aa6541e7afb 100644 --- a/cloud/blockstore/config/rdma.proto +++ b/cloud/blockstore/config/rdma.proto @@ -26,6 +26,7 @@ message TRdmaServer uint32 MaxInflightBytes = 7; // per client uint64 AdaptiveWaitSleepDelay = 8; // in microseconds uint64 AdaptiveWaitSleepDuration = 9; // in microseconds + bool AlignedDataEnabled = 10; } message TRdmaClient @@ -36,6 +37,7 @@ message TRdmaClient EWaitMode WaitMode = 4; uint64 AdaptiveWaitSleepDelay = 5; // in microseconds uint64 AdaptiveWaitSleepDuration = 6; // in microseconds + bool AlignedDataEnabled = 7; } message TRdmaTarget diff --git a/cloud/blockstore/libs/client_rdma/rdma_client.cpp b/cloud/blockstore/libs/client_rdma/rdma_client.cpp index e5f5a83f3b5..608fa9bbe06 100644 --- a/cloud/blockstore/libs/client_rdma/rdma_client.cpp +++ b/cloud/blockstore/libs/client_rdma/rdma_client.cpp @@ -110,6 +110,7 @@ class TReadBlocksHandler final return Serializer->Serialize( buffer, TBlockStoreProtocol::ReadBlocksRequest, + 0, // flags *Request, TContIOVector(nullptr, 0)); } @@ -211,6 +212,7 @@ class TWriteBlocksHandler final return Serializer->Serialize( buffer, TBlockStoreProtocol::WriteBlocksRequest, + 0, // flags *Request, TContIOVector((IOutputStream::TPart*)sglist.begin(), sglist.size())); } @@ -284,6 +286,7 @@ class TZeroBlocksHandler final return Serializer->Serialize( buffer, TBlockStoreProtocol::ZeroBlocksRequest, + 0, // flags *Request, TContIOVector(nullptr, 0)); } diff --git a/cloud/blockstore/libs/daemon/common/bootstrap.cpp b/cloud/blockstore/libs/daemon/common/bootstrap.cpp index aeb7e527aee..863c459e89a 100644 --- a/cloud/blockstore/libs/daemon/common/bootstrap.cpp +++ b/cloud/blockstore/libs/daemon/common/bootstrap.cpp @@ -420,6 +420,7 @@ void TBootstrapBase::Init() : FQDNHostName(), Configs->ServerConfig->GetSocketAccessMode(), Configs->ServerConfig->GetVhostServerTimeoutAfterParentExit(), + RdmaClient && RdmaClient->IsAlignedDataEnabled(), std::move(vhostEndpointListener)); STORAGE_INFO("VHOST External 
Vhost EndpointListener initialized"); diff --git a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp index 55707d18dc0..539b570a0fc 100644 --- a/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp +++ b/cloud/blockstore/libs/endpoints_grpc/socket_endpoint_listener.cpp @@ -153,7 +153,9 @@ class TEndpointService final Now(), std::move(callContext), std::move(request), - BlockSize); + BlockSize, + {} // no data buffer + ); } TFuture WriteBlocks( @@ -169,7 +171,9 @@ class TEndpointService final Now(), std::move(callContext), std::move(request), - BlockSize); + BlockSize, + {} // no data buffer + ); } TFuture ZeroBlocks( diff --git a/cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp b/cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp index aeec35eaed9..efb34994cbb 100644 --- a/cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp +++ b/cloud/blockstore/libs/endpoints_rdma/rdma_server.cpp @@ -208,6 +208,7 @@ NProto::TError TRdmaEndpoint::HandleReadBlocksRequest( size_t responseBytes = Serializer->Serialize( out, TBlockStoreProtocol::ReadBlocksResponse, + 0, // flags response, TContIOVector((IOutputStream::TPart*)sglist.begin(), sglist.size())); @@ -249,6 +250,7 @@ NProto::TError TRdmaEndpoint::HandleWriteBlocksRequest( size_t responseBytes = Serializer->Serialize( out, TBlockStoreProtocol::WriteBlocksResponse, + 0, // flags response, TContIOVector(nullptr, 0)); @@ -280,6 +282,7 @@ NProto::TError TRdmaEndpoint::HandleZeroBlocksRequest( size_t responseBytes = Serializer->Serialize( out, TBlockStoreProtocol::ZeroBlocksResponse, + 0, // flags response, TContIOVector(nullptr, 0)); diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp index 7c6f128ebb5..3f1a1cacc10 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp +++ 
b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp @@ -699,6 +699,7 @@ class TExternalVhostEndpointListener final const TExecutorPtr Executor; const TString LocalAgentId; const ui32 SocketAccessMode; + const bool IsAlignedDataEnabled; const IEndpointListenerPtr FallbackListener; const TExternalEndpointFactory EndpointFactory; const TDuration VhostServerTimeoutAfterParentExit; @@ -716,6 +717,7 @@ class TExternalVhostEndpointListener final TString localAgentId, ui32 socketAccessMode, TDuration vhostServerTimeoutAfterParentExit, + bool isAlignedDataEnabled, IEndpointListenerPtr fallbackListener, TExternalEndpointFactory endpointFactory) : Logging {std::move(logging)} @@ -723,6 +725,7 @@ class TExternalVhostEndpointListener final , Executor {std::move(executor)} , LocalAgentId {std::move(localAgentId)} , SocketAccessMode {socketAccessMode} + , IsAlignedDataEnabled(isAlignedDataEnabled) , FallbackListener {std::move(fallbackListener)} , EndpointFactory {std::move(endpointFactory)} , VhostServerTimeoutAfterParentExit{vhostServerTimeoutAfterParentExit} @@ -1011,6 +1014,10 @@ class TExternalVhostEndpointListener final args.emplace_back("--block-size"); args.emplace_back(ToString(volume.GetBlockSize())); + + if (IsAlignedDataEnabled) { + args.emplace_back("--rdma-aligned-data"); + } } for (const auto& device: volume.GetDevices()) { @@ -1161,6 +1168,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener( TString localAgentId, ui32 socketAccessMode, TDuration vhostServerTimeoutAfterParentExit, + bool isAlignedDataEnabled, IEndpointListenerPtr fallbackListener) { auto defaultFactory = [=] ( @@ -1191,6 +1199,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener( std::move(localAgentId), socketAccessMode, vhostServerTimeoutAfterParentExit, + isAlignedDataEnabled, std::move(fallbackListener), std::move(defaultFactory)); } @@ -1202,6 +1211,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener( TString localAgentId, ui32 socketAccessMode, 
TDuration vhostServerTimeoutAfterParentExit, + bool isAlignedDataEnabled, IEndpointListenerPtr fallbackListener, TExternalEndpointFactory factory) { @@ -1212,6 +1222,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener( std::move(localAgentId), socketAccessMode, vhostServerTimeoutAfterParentExit, + isAlignedDataEnabled, std::move(fallbackListener), std::move(factory)); } diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.h b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.h index 520c5b83c46..4d35c8baf56 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.h +++ b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.h @@ -41,6 +41,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener( TString localAgentId, ui32 socketAccessMode, TDuration vhostServerTimeoutAfterParentExit, + bool isAlignedDataEnabled, IEndpointListenerPtr fallbackListener); IEndpointListenerPtr CreateExternalVhostEndpointListener( @@ -50,6 +51,7 @@ IEndpointListenerPtr CreateExternalVhostEndpointListener( TString localAgentId, ui32 socketAccessMode, TDuration vhostServerTimeoutAfterParentExit, + bool isAlignedDataEnabled, IEndpointListenerPtr fallbackListener, TExternalEndpointFactory factory); diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp index 9332eaa9a33..3e351a15436 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp +++ b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp @@ -255,17 +255,24 @@ struct TFixture THistory History; - IEndpointListenerPtr Listener = CreateExternalVhostEndpointListener( - Logging, - ServerStats, - Executor, - LocalAgentId, - S_IRGRP | S_IWGRP | S_IRUSR | S_IWUSR, - TDuration::Seconds(30), - CreateFallbackListener(), - CreateExternalEndpointFactory()); + IEndpointListenerPtr Listener = + CreateEndpointListener(false); // no rdma aligned data 
public: + IEndpointListenerPtr CreateEndpointListener(bool isAlignedDataEnabled) + { + return CreateExternalVhostEndpointListener( + Logging, + ServerStats, + Executor, + LocalAgentId, + S_IRGRP | S_IWGRP | S_IRUSR | S_IWUSR, + TDuration::Seconds(30), + isAlignedDataEnabled, + CreateFallbackListener(), + CreateExternalEndpointFactory()); + } + NProto::TStartEndpointRequest CreateDefaultStartEndpointRequest() { NProto::TStartEndpointRequest request; @@ -366,6 +373,11 @@ TString GetArg(const TVector& args, TStringBuf name) return *std::next(it); } +bool HasArg(const TVector& args, TStringBuf name) +{ + return Find(args, name) != args.end(); +} + TVector GetArgN(const TVector& args, TStringBuf name) { TVector values; @@ -564,6 +576,30 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest) UNIT_ASSERT_VALUES_EQUAL(1, History.size()); auto* stop = std::get_if(&History[0]); UNIT_ASSERT_C(stop, "actual entry: " << History[0].index()); + + History.clear(); + } + + { + auto alignedDataListener = CreateEndpointListener(true); + + auto request = CreateDefaultStartEndpointRequest(); + + auto error = alignedDataListener + ->StartEndpoint(request, FastPathVolume, Session) + .GetValueSync(); + UNIT_ASSERT_C(!HasError(error), error); + + UNIT_ASSERT_VALUES_EQUAL(3, History.size()); + + auto* create = std::get_if(&History[0]); + UNIT_ASSERT_C(create, "actual entry: " << History[0].index()); + + UNIT_ASSERT_C( + HasArg(create->CmdArgs, "--rdma-aligned-data"), + "missing --rdma-aligned-data arg"); + + History.clear(); } } diff --git a/cloud/blockstore/libs/rdma/iface/client.cpp b/cloud/blockstore/libs/rdma/iface/client.cpp index bd6fd1cd89a..f392519b378 100644 --- a/cloud/blockstore/libs/rdma/iface/client.cpp +++ b/cloud/blockstore/libs/rdma/iface/client.cpp @@ -42,6 +42,7 @@ TClientConfig::TClientConfig(const NProto::TRdmaClient& config) SET(PollerThreads); SET(AdaptiveWaitSleepDelay, TDuration::MicroSeconds); SET(AdaptiveWaitSleepDuration, TDuration::MicroSeconds); + SET(AlignedDataEnabled); 
} #undef SET @@ -65,6 +66,7 @@ void TClientConfig::DumpHtml(IOutputStream& out) const ENTRY(MaxResponseDelay, MaxResponseDelay.ToString()); ENTRY(AdaptiveWaitSleepDelay, AdaptiveWaitSleepDelay.ToString()); ENTRY(AdaptiveWaitSleepDuration, AdaptiveWaitSleepDuration.ToString()); + ENTRY(AlignedDataEnabled, AlignedDataEnabled); } } } diff --git a/cloud/blockstore/libs/rdma/iface/client.h b/cloud/blockstore/libs/rdma/iface/client.h index 14f07c65291..6d12e899de0 100644 --- a/cloud/blockstore/libs/rdma/iface/client.h +++ b/cloud/blockstore/libs/rdma/iface/client.h @@ -29,6 +29,7 @@ struct TClientConfig TDuration MaxResponseDelay = TDuration::Seconds(60); TDuration AdaptiveWaitSleepDelay = TDuration::MilliSeconds(10); TDuration AdaptiveWaitSleepDuration = TDuration::MicroSeconds(100); + bool AlignedDataEnabled = false; TClientConfig() = default; @@ -113,6 +114,8 @@ struct IClient ui32 port) = 0; virtual void DumpHtml(IOutputStream& out) const = 0; + + virtual bool IsAlignedDataEnabled() const = 0; }; } // namespace NCloud::NBlockStore::NRdma diff --git a/cloud/blockstore/libs/rdma/iface/protobuf.cpp b/cloud/blockstore/libs/rdma/iface/protobuf.cpp index 02e2893227f..903d675c7a2 100644 --- a/cloud/blockstore/libs/rdma/iface/protobuf.cpp +++ b/cloud/blockstore/libs/rdma/iface/protobuf.cpp @@ -3,6 +3,7 @@ #include "protocol.h" #include +#include #include @@ -56,13 +57,38 @@ size_t TProtoMessageSerializer::MessageByteSize( size_t TProtoMessageSerializer::Serialize( TStringBuf buffer, ui32 msgId, + ui32 flags, const TProtoMessage& proto, TContIOVector data) +{ + size_t dataLen = data.Bytes(); + char* ptr = const_cast(buffer.data()); + ptr += Serialize(buffer, msgId, flags, proto, dataLen); + + if (HasProtoFlag(flags, RDMA_PROTO_FLAG_DATA_AT_THE_END)) { + ptr = const_cast(buffer.data()) + buffer.length() - dataLen; + } + + for (size_t i = 0; i < data.Count(); ++i) { + const auto& part = data.Parts()[i]; + memcpy(ptr, part.buf, part.len); + ptr += part.len; + } + + return ptr - 
buffer.data(); +} + +// static +size_t TProtoMessageSerializer::Serialize( + TStringBuf buffer, + ui32 msgId, + ui32 flags, + const TProtoMessage& proto, + size_t dataLen) { size_t protoLen = proto.ByteSizeLong(); Y_ENSURE(protoLen < RDMA_MAX_PROTO_LEN, "protobuf message is too big"); - size_t dataLen = data.Bytes(); Y_ENSURE(dataLen < RDMA_MAX_DATA_LEN, "attached data is too big"); size_t totalLen = NRdma::MessageByteSize(protoLen, dataLen); @@ -73,6 +99,7 @@ size_t TProtoMessageSerializer::Serialize( TProtoHeader header = { .MsgId = msgId, + .Flags = flags, .ProtoLen = (ui32)protoLen, .DataLen = (ui32)dataLen, }; @@ -85,12 +112,6 @@ size_t TProtoMessageSerializer::Serialize( Y_ENSURE(succeeded, "could not serialize protobuf message"); ptr += protoLen; - for (size_t i = 0; i < data.Count(); ++i) { - const auto& part = data.Parts()[i]; - memcpy(ptr, part.buf, part.len); - ptr += part.len; - } - return ptr - buffer.data(); } @@ -120,10 +141,15 @@ TProtoMessageSerializer::Parse(TStringBuf buffer) const Y_ENSURE_RETURN(succeeded, "could not parse protobuf message"); ptr += header.ProtoLen; + if (HasProtoFlag(header.Flags, RDMA_PROTO_FLAG_DATA_AT_THE_END)) { + ptr = + const_cast(buffer.data()) + buffer.length() - header.DataLen; + } + auto data = TStringBuf(ptr, header.DataLen); ptr += header.DataLen; - return TParseResult { header.MsgId, std::move(proto), data }; + return TParseResult { header.MsgId, header.Flags, std::move(proto), data }; } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/rdma/iface/protobuf.h b/cloud/blockstore/libs/rdma/iface/protobuf.h index d079a54263b..bb84c9a8b96 100644 --- a/cloud/blockstore/libs/rdma/iface/protobuf.h +++ b/cloud/blockstore/libs/rdma/iface/protobuf.h @@ -56,12 +56,21 @@ class TProtoMessageSerializer static size_t Serialize( TStringBuf buffer, ui32 msgId, + ui32 flags, const TProtoMessage& proto, TContIOVector data); + static size_t Serialize( + TStringBuf 
buffer, + ui32 msgId, + ui32 flags, + const TProtoMessage& proto, + size_t dataLen); + struct TParseResult { ui32 MsgId; + ui32 Flags; TProtoMessagePtr Proto; TStringBuf Data; }; diff --git a/cloud/blockstore/libs/rdma/iface/protobuf_ut.cpp b/cloud/blockstore/libs/rdma/iface/protobuf_ut.cpp index ad17284d909..480c95c3713 100644 --- a/cloud/blockstore/libs/rdma/iface/protobuf_ut.cpp +++ b/cloud/blockstore/libs/rdma/iface/protobuf_ut.cpp @@ -1,9 +1,13 @@ #include "protobuf.h" +#include "protocol.h" + #include #include +#include + #include namespace NCloud::NBlockStore::NRdma { @@ -48,28 +52,49 @@ Y_UNIT_TEST_SUITE(TProtoMessageSerializerTest) NProto::TReadBlocksRequest proto; proto.SetDiskId("test"); - auto data = TString(1024, 0); + auto data = TString(1024, 'A'); IOutputStream::TPart part(data.data(), data.length()); - size_t byteSize = serializer->MessageByteSize(proto, data.length()); - auto buffer = TString::Uninitialized(byteSize); - - size_t serializedBytes = serializer->Serialize( - buffer, - TBlockStoreProtocol::ReadBlocksRequest, - proto, - TContIOVector(&part, 1)); - UNIT_ASSERT_EQUAL(serializedBytes, byteSize); - - auto resultOrError = serializer->Parse(buffer); - UNIT_ASSERT(!HasError(resultOrError)); - - const auto& result = resultOrError.GetResult(); - UNIT_ASSERT_EQUAL(result.MsgId, TBlockStoreProtocol::ReadBlocksRequest); - UNIT_ASSERT_EQUAL(result.Data, data); - - const auto& proto2 = static_cast(*result.Proto); - UNIT_ASSERT_EQUAL(proto2.GetDiskId(), "test"); + size_t msgByteSize = serializer->MessageByteSize(proto, data.length()); + TVector testedFlags{0U, RDMA_PROTO_FLAG_DATA_AT_THE_END}; + TVector testedBufferSizes{ + msgByteSize, + msgByteSize + 1024, + msgByteSize + 4096}; + + for (auto bufferSize: testedBufferSizes) { + for (auto flag: testedFlags) { + ui32 flags = 0; + if (flag) { + SetProtoFlag(flags, flag); + } + + auto buffer = TString::Uninitialized(bufferSize); + + size_t serializedBytes = serializer->Serialize( + buffer, + 
TBlockStoreProtocol::ReadBlocksRequest, + flags, + proto, + TContIOVector(&part, 1)); + + if (HasProtoFlag(flags, RDMA_PROTO_FLAG_DATA_AT_THE_END)) { + UNIT_ASSERT_VALUES_EQUAL(bufferSize, serializedBytes); + } else { + UNIT_ASSERT_VALUES_EQUAL(msgByteSize, serializedBytes); + } + + auto resultOrError = serializer->Parse(buffer); + UNIT_ASSERT(!HasError(resultOrError)); + + const auto& result = resultOrError.GetResult(); + UNIT_ASSERT_EQUAL(TBlockStoreProtocol::ReadBlocksRequest, result.MsgId); + UNIT_ASSERT_EQUAL(data, result.Data); + + const auto& proto2 = static_cast(*result.Proto); + UNIT_ASSERT_EQUAL("test", proto2.GetDiskId()); + } + } } }; diff --git a/cloud/blockstore/libs/rdma/iface/protocol.h b/cloud/blockstore/libs/rdma/iface/protocol.h index d83a3233a1d..2c0315a886c 100644 --- a/cloud/blockstore/libs/rdma/iface/protocol.h +++ b/cloud/blockstore/libs/rdma/iface/protocol.h @@ -38,6 +38,28 @@ enum { RDMA_PROTO_FAIL = 4, }; +enum { + RDMA_PROTO_FLAG_NONE = 0, + // encode data at the end of the buffer like this: + // + // buffer + // |--------------+-------+--------+------| + // | TProtoHeader | Proto | unused | Data | + // |--------------+-------+--------+------| + // + // instead of: + // buffer + // |--------------+-------+------+--------| + // | TProtoHeader | Proto | Data | unused | + // |--------------+-------+------+--------| + // + // + // If buffer is allocated in chunks of 4k + // and data is allocated in multiple of block size (512, 4k) + // then data address is also aligned to (512, 4k) + RDMA_PROTO_FLAG_DATA_AT_THE_END = 1, +}; + //////////////////////////////////////////////////////////////////////////////// struct Y_PACKED TMessageHeader @@ -122,9 +144,9 @@ static_assert(sizeof(TRejectMessage) == RDMA_PRIVATE_SIZE); struct Y_PACKED TBufferDesc { - ui64 Address; - ui32 Length; - ui32 Key; + ui64 Address = 0; + ui32 Length = 0; + ui32 Key = 0; }; //////////////////////////////////////////////////////////////////////////////// @@ -166,7 +188,8 @@ 
static_assert(sizeof(TResponseMessage) == RDMA_RESPONSE_SIZE); struct Y_PACKED TProtoHeader { - ui32 MsgId : 16; + ui32 MsgId : 8; + ui32 Flags : 8; ui32 ProtoLen : 16; ui32 DataLen; }; diff --git a/cloud/blockstore/libs/rdma/iface/server.cpp b/cloud/blockstore/libs/rdma/iface/server.cpp index 8ee45e54563..a4a74307112 100644 --- a/cloud/blockstore/libs/rdma/iface/server.cpp +++ b/cloud/blockstore/libs/rdma/iface/server.cpp @@ -68,6 +68,7 @@ void TServerConfig::DumpHtml(IOutputStream& out) const ENTRY(MaxInflightBytes, MaxInflightBytes); ENTRY(AdaptiveWaitSleepDelay, AdaptiveWaitSleepDelay.ToString()); ENTRY(AdaptiveWaitSleepDuration, AdaptiveWaitSleepDuration.ToString()); + ENTRY(AlignedDataEnabled, true); } } } diff --git a/cloud/blockstore/libs/rdma/impl/client.cpp b/cloud/blockstore/libs/rdma/impl/client.cpp index 0b4c9df7157..ac5f3bcefa8 100644 --- a/cloud/blockstore/libs/rdma/impl/client.cpp +++ b/cloud/blockstore/libs/rdma/impl/client.cpp @@ -737,11 +737,11 @@ TResultOrError TClientEndpoint::AllocateRequest( req->RequestBuffer = TStringBuf { reinterpret_cast(req->InBuffer.Address), - requestBytes, + req->InBuffer.Length, }; req->ResponseBuffer = TStringBuf { reinterpret_cast(req->OutBuffer.Address), - responseBytes, + req->OutBuffer.Length, }; return TClientRequestPtr(std::move(req)); @@ -1583,6 +1583,7 @@ class TClient final TString host, ui32 port) noexcept override; void DumpHtml(IOutputStream& out) const override; + bool IsAlignedDataEnabled() const override; private: // called from external thread @@ -2069,6 +2070,11 @@ void TClient::DumpHtml(IOutputStream& out) const } } +bool TClient::IsAlignedDataEnabled() const +{ + return Config->AlignedDataEnabled; +} + } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/rdma_test/client_test.cpp b/cloud/blockstore/libs/rdma_test/client_test.cpp index a0acc96b5fd..739328450b3 100644 --- a/cloud/blockstore/libs/rdma_test/client_test.cpp 
+++ b/cloud/blockstore/libs/rdma_test/client_test.cpp @@ -131,6 +131,7 @@ struct TRdmaClientTest::TRdmaEndpointImpl responseBytes = serializer->Serialize( req->ResponseBuffer, TBlockStoreProtocol::ReadDeviceBlocksResponse, + 0, // flags response, TContIOVector( (IOutputStream::TPart*)sglist.begin(), @@ -168,6 +169,7 @@ struct TRdmaClientTest::TRdmaEndpointImpl responseBytes = serializer->Serialize( req->ResponseBuffer, TBlockStoreProtocol::WriteDeviceBlocksResponse, + 0, // flags response, TContIOVector(nullptr, 0)); @@ -198,6 +200,7 @@ struct TRdmaClientTest::TRdmaEndpointImpl responseBytes = serializer->Serialize( req->ResponseBuffer, TBlockStoreProtocol::ZeroDeviceBlocksResponse, + 0, // flags response, TContIOVector(nullptr, 0)); @@ -231,6 +234,7 @@ struct TRdmaClientTest::TRdmaEndpointImpl responseBytes = serializer->Serialize( req->ResponseBuffer, TBlockStoreProtocol::ChecksumDeviceBlocksResponse, + 0, // flags response, TContIOVector(nullptr, 0)); diff --git a/cloud/blockstore/libs/rdma_test/client_test.h b/cloud/blockstore/libs/rdma_test/client_test.h index 8ede1697186..fdd08447372 100644 --- a/cloud/blockstore/libs/rdma_test/client_test.h +++ b/cloud/blockstore/libs/rdma_test/client_test.h @@ -46,6 +46,11 @@ struct TRdmaClientTest: NRdma::IClient Y_UNUSED(out); } + bool IsAlignedDataEnabled() const override + { + return false; + } + void InjectErrors( NProto::TError allocationError, NProto::TError rdmaResponseError, diff --git a/cloud/blockstore/libs/rdma_test/server_test_async.cpp b/cloud/blockstore/libs/rdma_test/server_test_async.cpp index b0db5e991f4..08ed41e2842 100644 --- a/cloud/blockstore/libs/rdma_test/server_test_async.cpp +++ b/cloud/blockstore/libs/rdma_test/server_test_async.cpp @@ -141,6 +141,7 @@ class TRdmaAsyncTestEndpoint: public NRdma::IServerEndpoint const size_t serializedSize = Serializer->Serialize( wrapper->Serialized, messageType, + 0, // flags request, ioVector); UNIT_ASSERT(expectedSerializedSize >= serializedSize); diff --git 
a/cloud/blockstore/libs/service/storage.cpp b/cloud/blockstore/libs/service/storage.cpp index c855885cb27..8c4a25a6264 100644 --- a/cloud/blockstore/libs/service/storage.cpp +++ b/cloud/blockstore/libs/service/storage.cpp @@ -197,13 +197,15 @@ class TStorageAdapter::TImpl TInstant now, TCallContextPtr callContext, std::shared_ptr request, - ui32 requestBlockSize) const; + ui32 requestBlockSize, + TStringBuf dataBuffer) const; TFuture WriteBlocks( TInstant now, TCallContextPtr callContext, std::shared_ptr request, - ui32 requestBlockSize) const; + ui32 requestBlockSize, + TStringBuf dataBuffer) const; TFuture ZeroBlocks( TInstant now, @@ -225,6 +227,7 @@ class TStorageAdapter::TImpl ui32 VerifyRequestSize(const NProto::TIOVector& iov) const; ui32 VerifyRequestSize(ui32 blocksCount, ui32 blockSize) const; + ui32 VerifyRequestSize(ui32 bytesCount) const; template void CheckIOTimeouts(TInflightTracker& inflights, TInstant now); @@ -250,7 +253,8 @@ TFuture TStorageAdapter::TImpl::ReadBlocks( TInstant now, TCallContextPtr callContext, std::shared_ptr request, - ui32 requestBlockSize) const + ui32 requestBlockSize, + TStringBuf dataBuffer) const { const auto bytesCount = VerifyRequestSize( request->GetBlocksCount(), @@ -281,28 +285,33 @@ TFuture TStorageAdapter::TImpl::ReadBlocks( auto response = std::make_shared(); - // We are trying to allocate memory for request using Storage. If the memory - // is not allocated, then we will read immediately to the buffers in the - // protobuf. - auto buffer = Storage->AllocateBuffer(bytesCount); - TSgList sgList; + TStorageBuffer buffer; - if (buffer) { - sgList = {{ buffer.get(), bytesCount }}; + if (dataBuffer) { + sgList = {{dataBuffer.data(), dataBuffer.size()}}; } else { - sgList = ResizeIOVector( - *response->MutableBlocks(), - request->GetBlocksCount(), - requestBlockSize); + // We are trying to allocate memory for request using Storage. 
If the memory + // is not allocated, then we will read immediately to the buffers in the + // protobuf. + buffer = Storage->AllocateBuffer(bytesCount); + + if (buffer) { + sgList = {{ buffer.get(), bytesCount }}; + } else { + sgList = ResizeIOVector( + *response->MutableBlocks(), + request->GetBlocksCount(), + requestBlockSize); + } + } if (Normalize) { - if (buffer || requestBlockSize != StorageBlockSize) { + if (dataBuffer || buffer || requestBlockSize != StorageBlockSize) { // not normalized yet - auto sgListOrError = SgListNormalize( - std::move(sgList), - StorageBlockSize); + auto sgListOrError = + SgListNormalize(std::move(sgList), StorageBlockSize); if (HasError(sgListOrError)) { return MakeFuture( @@ -330,6 +339,7 @@ TFuture TStorageAdapter::TImpl::ReadBlocks( promise, response = std::move(response), buffer = std::move(buffer), + dataBuffer, guardedSgList = std::move(guardedSgList), requestBlocksCount = request->GetBlocksCount(), requestBlockSize, @@ -367,8 +377,9 @@ TFuture TStorageAdapter::TImpl::ReadBlocks( Y_ABORT_UNLESS(bytesCopied == bytesCount); } } else { - if (optimizeNetworkTransfer == - NProto::EOptimizeNetworkTransfer::SKIP_VOID_BLOCKS) + if (dataBuffer.empty() && + optimizeNetworkTransfer == + NProto::EOptimizeNetworkTransfer::SKIP_VOID_BLOCKS) { // If we read the data directly to the final destination, // then we clean out the void buffers where the data @@ -388,11 +399,18 @@ TFuture TStorageAdapter::TImpl::WriteBlocks( TInstant now, TCallContextPtr callContext, std::shared_ptr request, - ui32 requestBlockSize) const + ui32 requestBlockSize, + TStringBuf dataBuffer) const { VerifyBlockSize(requestBlockSize); - const auto bytesCount = VerifyRequestSize(request->GetBlocks()); + ui32 bytesCount = 0; + if (dataBuffer) { + bytesCount = VerifyRequestSize(dataBuffer.size()); + } else { + bytesCount = VerifyRequestSize(request->GetBlocks()); + } + const ui32 localBlocksCount = bytesCount / StorageBlockSize; const ui64 localStartIndex = 
requestBlockSize == StorageBlockSize ? request->GetStartIndex() @@ -407,21 +425,28 @@ TFuture TStorageAdapter::TImpl::WriteBlocks( localRequest->BlocksCount = localBlocksCount; localRequest->BlockSize = StorageBlockSize; - auto sgList = GetSgList(*request); - auto buffer = Storage->AllocateBuffer(bytesCount); + TStorageBuffer buffer; + TSgList sgList; + + if (dataBuffer) { + sgList = TSgList{{dataBuffer.data(), dataBuffer.size()}}; + } else { + sgList = GetSgList(*request); + buffer = Storage->AllocateBuffer(bytesCount); + + if (buffer) { + TSgList bufferSgList = {{ buffer.get(), bytesCount }}; + size_t bytesCopied = SgListCopy(sgList, bufferSgList); + Y_ABORT_UNLESS(bytesCopied == bytesCount); + sgList = std::move(bufferSgList); + } - if (buffer) { - TSgList bufferSgList = {{ buffer.get(), bytesCount }}; - size_t bytesCopied = SgListCopy(sgList, bufferSgList); - Y_ABORT_UNLESS(bytesCopied == bytesCount); - sgList = std::move(bufferSgList); } if (Normalize && sgList.size() != localBlocksCount) { // not normalized yet - auto sgListOrError = SgListNormalize( - std::move(sgList), - StorageBlockSize); + auto sgListOrError = + SgListNormalize(std::move(sgList), StorageBlockSize); if (HasError(sgListOrError)) { return MakeFuture( @@ -596,6 +621,23 @@ ui32 TStorageAdapter::TImpl::VerifyRequestSize(ui32 blocksCount, ui32 blockSize) return static_cast(bytesCount); } +ui32 TStorageAdapter::TImpl::VerifyRequestSize(ui32 bytesCount) const +{ + if (bytesCount == 0 || bytesCount % StorageBlockSize != 0) { + ythrow TServiceError(E_ARGUMENT) + << "buffer size (" << bytesCount << ") is not a multiple of storage block size" + << " (storage block size = " << StorageBlockSize << ")"; + } + + if (MaxRequestSize > 0 && bytesCount > MaxRequestSize) { + ythrow TServiceError(E_ARGUMENT) + << "invalid request size: " << bytesCount + << " (max request size = " << MaxRequestSize << ")"; + } + + return bytesCount; +} + ui32 TStorageAdapter::TImpl::VerifyRequestSize(const NProto::TIOVector& 
iov) const { ui64 bytesCount = 0; @@ -645,26 +687,30 @@ TFuture TStorageAdapter::ReadBlocks( TInstant now, TCallContextPtr callContext, std::shared_ptr request, - ui32 requestBlockSize) const + ui32 requestBlockSize, + TStringBuf dataBuffer) const { return Impl->ReadBlocks( now, std::move(callContext), std::move(request), - requestBlockSize); + requestBlockSize, + dataBuffer); } TFuture TStorageAdapter::WriteBlocks( TInstant now, TCallContextPtr callContext, std::shared_ptr request, - ui32 requestBlockSize) const + ui32 requestBlockSize, + TStringBuf dataBuffer) const { return Impl->WriteBlocks( now, std::move(callContext), std::move(request), - requestBlockSize); + requestBlockSize, + dataBuffer); } TFuture TStorageAdapter::ZeroBlocks( diff --git a/cloud/blockstore/libs/service/storage.h b/cloud/blockstore/libs/service/storage.h index 65cf1974d2d..9b026ef1f25 100644 --- a/cloud/blockstore/libs/service/storage.h +++ b/cloud/blockstore/libs/service/storage.h @@ -67,13 +67,21 @@ class TStorageAdapter TInstant now, TCallContextPtr callContext, std::shared_ptr request, - ui32 requestBlockSize) const; + ui32 requestBlockSize, + TStringBuf dataBuffer // if non empty, + // response data is written into the buffer + // instead of TReadBlocksResponse + ) const; NThreading::TFuture WriteBlocks( TInstant now, TCallContextPtr callContext, std::shared_ptr request, - ui32 requestBlockSize) const; + ui32 requestBlockSize, + TStringBuf dataBuffer // if non empty, + // data is read from the buffer + // instead of TWriteBlocksRequest + ) const; NThreading::TFuture ZeroBlocks( TInstant now, diff --git a/cloud/blockstore/libs/service/storage_ut.cpp b/cloud/blockstore/libs/service/storage_ut.cpp index 7a4565d38f7..10b9ac250c6 100644 --- a/cloud/blockstore/libs/service/storage_ut.cpp +++ b/cloud/blockstore/libs/service/storage_ut.cpp @@ -52,7 +52,7 @@ class TTickOnGetNowTimer: public ITimer Y_UNIT_TEST_SUITE(TStorageTest) { - void ShouldHandleNonNormalizedRequests(ui32 requestBlockSize) 
+ void ShouldHandleNonNormalizedRequests(ui32 requestBlockSize, bool useDataBuffer) { auto storage = std::make_shared(); storage->WriteBlocksLocalHandler = [] (auto ctx, auto request) { @@ -81,17 +81,27 @@ Y_UNIT_TEST_SUITE(TStorageTest) auto request = std::make_shared(); request->SetStartIndex(1000); - auto& iov = *request->MutableBlocks(); - auto& buffers = *iov.MutableBuffers(); - auto& buffer = *buffers.Add(); - buffer.ReserveAndResize(1_MB); - memset(const_cast(buffer.data()), 'X', buffer.size()); + TString data; + TStringBuf dataBuffer; + + if (useDataBuffer) { + data = TString(1_MB, 'X'); + dataBuffer = TStringBuf(data); + } else { + auto& iov = *request->MutableBlocks(); + auto& buffers = *iov.MutableBuffers(); + auto& buffer = *buffers.Add(); + buffer.ReserveAndResize(1_MB); + memset(const_cast(buffer.data()), 'X', buffer.size()); + } auto future = adapter->WriteBlocks( Now(), MakeIntrusive(), std::move(request), - requestBlockSize); + requestBlockSize, + dataBuffer + ); auto response = future.GetValue(TDuration::Seconds(5)); UNIT_ASSERT(!HasError(response)); @@ -99,15 +109,17 @@ Y_UNIT_TEST_SUITE(TStorageTest) Y_UNIT_TEST(ShouldHandleNonNormalizedRequests4K) { - ShouldHandleNonNormalizedRequests(4_KB); + ShouldHandleNonNormalizedRequests(4_KB, false); + ShouldHandleNonNormalizedRequests(4_KB, true); } Y_UNIT_TEST(ShouldHandleNonNormalizedRequests8K) { - ShouldHandleNonNormalizedRequests(8_KB); + ShouldHandleNonNormalizedRequests(8_KB, false); + ShouldHandleNonNormalizedRequests(8_KB, true); } - void ShouldNormalizeRequests(ui32 requestBlockSize) + void ShouldNormalizeRequests(ui32 requestBlockSize, bool useDataBuffer) { auto storage = std::make_shared(); storage->WriteBlocksLocalHandler = [] (auto ctx, auto request) { @@ -138,17 +150,27 @@ Y_UNIT_TEST_SUITE(TStorageTest) auto request = std::make_shared(); request->SetStartIndex(1000); - auto& iov = *request->MutableBlocks(); - auto& buffers = *iov.MutableBuffers(); - auto& buffer = *buffers.Add(); - 
buffer.ReserveAndResize(1_MB); - memset(const_cast(buffer.data()), 'X', buffer.size()); + TString data; + TStringBuf dataBuffer; + + if (!useDataBuffer) { + auto& iov = *request->MutableBlocks(); + auto& buffers = *iov.MutableBuffers(); + auto& buffer = *buffers.Add(); + buffer.ReserveAndResize(1_MB); + memset(const_cast(buffer.data()), 'X', buffer.size()); + } else { + data = TString(1_MB, 'X'); + dataBuffer = TStringBuf(data); + } auto future = adapter->WriteBlocks( Now(), MakeIntrusive(), std::move(request), - requestBlockSize); + requestBlockSize, + dataBuffer + ); auto response = future.GetValue(TDuration::Seconds(5)); UNIT_ASSERT(!HasError(response)); @@ -156,12 +178,14 @@ Y_UNIT_TEST_SUITE(TStorageTest) Y_UNIT_TEST(ShouldNormalizeRequests4K) { - ShouldNormalizeRequests(4_KB); + ShouldNormalizeRequests(4_KB, false); + ShouldNormalizeRequests(4_KB, true); } Y_UNIT_TEST(ShouldNormalizeRequests8K) { - ShouldNormalizeRequests(8_KB); + ShouldNormalizeRequests(8_KB, false); + ShouldNormalizeRequests(8_KB, true); } template @@ -233,7 +257,9 @@ Y_UNIT_TEST_SUITE(TStorageTest) now, MakeIntrusive(), std::move(request), - 4096); + 4096, + {} // no data buffer + ); }; DoShouldHandleTimedOutRequests(runRequest); } @@ -255,7 +281,9 @@ Y_UNIT_TEST_SUITE(TStorageTest) now, MakeIntrusive(), std::move(request), - 4096); + 4096, + {} // no data buffer + ); }; DoShouldHandleTimedOutRequests(runRequest); } @@ -325,7 +353,9 @@ Y_UNIT_TEST_SUITE(TStorageTest) now, MakeIntrusive(), std::move(request), - 4096); + 4096, + {} // no data buffer + ); // Assert request is not handled. bool responseReady = response.Wait(waitTimeout); @@ -344,7 +374,9 @@ Y_UNIT_TEST_SUITE(TStorageTest) now, MakeIntrusive(), std::move(request), - 4096); + 4096, + {} // no data buffer + ); // Assert request is not handled. 
bool responseReady = response.Wait(waitTimeout); @@ -449,7 +481,9 @@ Y_UNIT_TEST_SUITE(TStorageTest) Now(), MakeIntrusive(), std::move(request), - blockSize); + blockSize, + {} // no data buffer + ); response.Wait(); const auto& value = response.GetValue(); UNIT_ASSERT_EQUAL(S_OK, value.GetError().GetCode()); @@ -477,6 +511,85 @@ Y_UNIT_TEST_SUITE(TStorageTest) DoShouldOptimizeVoidBlocks(true, blockSize, true, true); } } + + void DoShouldReadCorrectData(bool normalize, bool useDataBuffer) + { + auto storage = std::make_shared(); + + auto firstBlock = TString(4096, 'A'); + auto secondBlock = TString(4096, 'B'); + + storage->ReadBlocksLocalHandler = + [&firstBlock, &secondBlock]( + auto ctx, + std::shared_ptr request) + { + Y_UNUSED(ctx); + + UNIT_ASSERT_EQUAL(request->GetBlocksCount(), 2); + + TString blocks = firstBlock + secondBlock; + auto guard = request->Sglist.Acquire(); + UNIT_ASSERT(guard); + SgListCopy( + TBlockDataRef{blocks.data(), blocks.Size()}, + guard.Get()); + + NProto::TReadBlocksLocalResponse r; + *r.MutableError() = MakeError(S_OK); + return MakeFuture<>(std::move(r)); + }; + + auto adapter = std::make_shared( + storage, + 4096, // storageBlockSize + normalize, + 32_MB); // maxRequestSize + + { + auto request = std::make_shared(); + request->SetBlocksCount(2); + + TString data(4096 * 2 , '\0'); + TStringBuf dataBuf; + + if (useDataBuffer) { + dataBuf = TStringBuf(data); + } + + auto response = adapter->ReadBlocks( + Now(), + MakeIntrusive(), + std::move(request), + 4096, // block size + dataBuf + ); + + response.Wait(); + const auto& value = response.GetValue(); + UNIT_ASSERT_EQUAL(S_OK, value.GetError().GetCode()); + + if (useDataBuffer) { + UNIT_ASSERT_EQUAL(dataBuf.SubString(0, 4096), firstBlock); + UNIT_ASSERT_EQUAL(dataBuf.SubString(4096, 4096), secondBlock); + } else { + const auto& blockBuffers = value.GetBlocks().GetBuffers(); + UNIT_ASSERT_EQUAL(blockBuffers.size(), 2); + UNIT_ASSERT_EQUAL(firstBlock, blockBuffers[0]); + 
UNIT_ASSERT_EQUAL(secondBlock, blockBuffers[1]); + } + + } + } + + Y_UNIT_TEST(ShouldReadCorrectData) + { + DoShouldReadCorrectData(false, true); + DoShouldReadCorrectData(false, false); + DoShouldReadCorrectData(true, true); + DoShouldReadCorrectData(true, false); + } + } } // namespace NCloud::NBlockStore diff --git a/cloud/blockstore/libs/service_local/service_local.cpp b/cloud/blockstore/libs/service_local/service_local.cpp index 971e4a270bb..335308f7300 100644 --- a/cloud/blockstore/libs/service_local/service_local.cpp +++ b/cloud/blockstore/libs/service_local/service_local.cpp @@ -670,7 +670,9 @@ TFuture TLocalService::ReadBlocks( Now(), std::move(ctx), std::move(request), - volume->Volume.GetBlockSize()); + volume->Volume.GetBlockSize(), + {} // no data buffer + ); }); } @@ -727,7 +729,9 @@ TFuture TLocalService::WriteBlocks( Now(), std::move(ctx), std::move(request), - volume->Volume.GetBlockSize()); + volume->Volume.GetBlockSize(), + {} // no data buffer + ); }); } diff --git a/cloud/blockstore/libs/service_local/storage_rdma.cpp b/cloud/blockstore/libs/service_local/storage_rdma.cpp index b51725410aa..97dd4ca7f1b 100644 --- a/cloud/blockstore/libs/service_local/storage_rdma.cpp +++ b/cloud/blockstore/libs/service_local/storage_rdma.cpp @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -85,11 +86,17 @@ class TReadBlocksHandler final return Response.GetFuture(); } - size_t PrepareRequest(TStringBuf buffer) + size_t PrepareRequest(TStringBuf buffer, bool isAlignedDataEnabled) { + ui32 flags = 0; + if (isAlignedDataEnabled) { + SetProtoFlag(flags, NRdma::RDMA_PROTO_FLAG_DATA_AT_THE_END); + } + return Serializer->Serialize( buffer, TBlockStoreProtocol::ReadDeviceBlocksRequest, + flags, Proto, TContIOVector(nullptr, 0)); } @@ -191,16 +198,22 @@ class TWriteBlocksHandler final return Response.GetFuture(); } - size_t PrepareRequest(TStringBuf buffer) + size_t PrepareRequest(TStringBuf buffer, bool isAlignedDataEnabled) { auto guard = 
Request->Sglist.Acquire(); Y_ENSURE(guard); const auto& sglist = guard.Get(); + ui32 flags = 0; + if (isAlignedDataEnabled) { + SetProtoFlag(flags, NRdma::RDMA_PROTO_FLAG_DATA_AT_THE_END); + } + return Serializer->Serialize( buffer, TBlockStoreProtocol::WriteDeviceBlocksRequest, + flags, Proto, TContIOVector((IOutputStream::TPart*)sglist.data(), sglist.size())); } @@ -278,11 +291,14 @@ class TZeroBlocksHandler final return Response.GetFuture(); } - size_t PrepareRequest(TStringBuf buffer) + size_t PrepareRequest(TStringBuf buffer, bool isAlignedDataEnabled) { + Y_UNUSED(isAlignedDataEnabled); + return Serializer->Serialize( buffer, TBlockStoreProtocol::ZeroDeviceBlocksRequest, + 0, // flags Proto, TContIOVector(nullptr, 0)); } @@ -324,6 +340,7 @@ class TRdmaStorage ITaskQueuePtr TaskQueue; NRdma::IClientEndpointPtr Endpoint; + bool IsAlignedDataEnabled = false; public: static std::shared_ptr Create( TString uuid, @@ -377,9 +394,10 @@ class TRdmaStorage void ReportIOError() override {} - void Init(NRdma::IClientEndpointPtr endpoint) + void Init(NRdma::IClientEndpointPtr endpoint, bool isAlignedDataEnabled) { Endpoint = std::move(endpoint); + IsAlignedDataEnabled = isAlignedDataEnabled; } private: @@ -423,7 +441,7 @@ class TRdmaStorage return MakeFuture(TErrorResponse(err)); } - handler->PrepareRequest(req->RequestBuffer); + handler->PrepareRequest(req->RequestBuffer, IsAlignedDataEnabled); auto response = handler->GetResponse(); req->Context = std::move(handler); Endpoint->SendRequest(std::move(req), std::move(callContext)); @@ -547,7 +565,7 @@ class TRdmaStorageProvider final auto endpoint = Client->StartEndpoint(ep.Host, ep.Port) .Subscribe([=] (const auto& future) { - storage->Init(future.GetValue()); + storage->Init(future.GetValue(), Client->IsAlignedDataEnabled()); }); endpoints.emplace_back(std::move(endpoint)); diff --git a/cloud/blockstore/libs/service_local/storage_rdma_ut.cpp b/cloud/blockstore/libs/service_local/storage_rdma_ut.cpp index 
4c4920ed93e..90da8425f36 100644 --- a/cloud/blockstore/libs/service_local/storage_rdma_ut.cpp +++ b/cloud/blockstore/libs/service_local/storage_rdma_ut.cpp @@ -68,6 +68,11 @@ class TRdmaClientHelper: public NRdma::IClient { Y_UNUSED(out); } + + bool IsAlignedDataEnabled() const override + { + return false; + } }; } // namespace diff --git a/cloud/blockstore/libs/service_local/storage_spdk_ut.cpp b/cloud/blockstore/libs/service_local/storage_spdk_ut.cpp index 2387d395e48..7c8a8bb653a 100644 --- a/cloud/blockstore/libs/service_local/storage_spdk_ut.cpp +++ b/cloud/blockstore/libs/service_local/storage_spdk_ut.cpp @@ -670,7 +670,9 @@ Y_UNIT_TEST_SUITE(TSpdkStorageTest) Now(), MakeIntrusive(), std::move(request), - blockSize); + blockSize, + {} // no data buffer + ); const auto& response = future.GetValue(TDuration::Seconds(5)); UNIT_ASSERT(!HasError(response)); @@ -685,7 +687,9 @@ Y_UNIT_TEST_SUITE(TSpdkStorageTest) Now(), MakeIntrusive(), std::move(request), - blockSize); + blockSize, + {} // no data buffer + ); const auto& response = future.GetValue(TDuration::Seconds(5)); UNIT_ASSERT(!HasError(response)); diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp index 887cac3c841..18a3f040ace 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp @@ -547,7 +547,9 @@ TFuture TDiskAgentState::Read( now, MakeIntrusive(), std::move(readRequest), - request.GetBlockSize()); + request.GetBlockSize(), + {} // no data buffer + ); return result.Apply( [] (auto future) { @@ -610,7 +612,9 @@ TFuture TDiskAgentState::Write( now, MakeIntrusive(), std::move(writeRequest), - request.GetBlockSize()); + request.GetBlockSize(), + {} // no data buffer + ); return result.Apply( [] (const auto& future) { @@ -727,7 +731,9 @@ TFuture TDiskAgentState::Checksum( now, MakeIntrusive(), std::move(readRequest), - 
request.GetBlockSize()); + request.GetBlockSize(), + {} // no data buffer + ); return result.Apply( [] (auto future) { diff --git a/cloud/blockstore/libs/storage/disk_agent/rdma_target.cpp b/cloud/blockstore/libs/storage/disk_agent/rdma_target.cpp index f873793267e..64d32e246ea 100644 --- a/cloud/blockstore/libs/storage/disk_agent/rdma_target.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/rdma_target.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -49,6 +50,7 @@ struct TRequestDetails { void* Context = nullptr; TStringBuf Out; + TStringBuf DataBuffer; // if non empty, zero copy is possible TString DeviceUUID; TString ClientId; @@ -246,6 +248,9 @@ class TRequestHandler final const auto& request = resultOrError.GetResult(); + const bool isZeroCopyDataSupported = + HasProtoFlag(request.Flags, NRdma::RDMA_PROTO_FLAG_DATA_AT_THE_END); + switch (request.MsgId) { case TBlockStoreProtocol::ReadDeviceBlocksRequest: return HandleReadBlocksRequest( @@ -253,6 +258,7 @@ class TRequestHandler final std::move(callContext), static_cast( *request.Proto), + isZeroCopyDataSupported, request.Data, out); @@ -261,6 +267,7 @@ class TRequestHandler final context, std::move(callContext), static_cast(*request.Proto), + isZeroCopyDataSupported, request.Data, out); @@ -504,6 +511,7 @@ class TRequestHandler final void* context, TCallContextPtr callContext, NProto::TReadDeviceBlocksRequest& request, + bool isZeroCopyDataSupported, TStringBuf requestData, TStringBuf out) const { @@ -537,17 +545,25 @@ class TRequestHandler final req->SetStartIndex(request.GetStartIndex()); req->SetBlocksCount(request.GetBlocksCount()); + TStringBuf dataBuffer; + if (isZeroCopyDataSupported) { + dataBuffer = out; + dataBuffer.RSeek(request.GetBlocksCount() * request.GetBlockSize()); + } + auto future = device->ReadBlocks( Now(), std::move(callContext), std::move(req), - request.GetBlockSize()); + request.GetBlockSize(), + dataBuffer); SubscribeForResponse( 
std::move(future), TRequestDetails{ context, out, + dataBuffer, request.GetDeviceUUID(), request.GetHeaders().GetClientId()}, &TRequestHandler::HandleReadBlocksResponse); @@ -575,19 +591,34 @@ class TRequestHandler final << error.GetMessage() << " (" << error.GetCode() << ")"); } - TStackVec parts; - parts.reserve(blocks.BuffersSize()); + size_t bytes; + ui32 flags = 0; + + if (requestDetails.DataBuffer.size()) { + SetProtoFlag(flags, NRdma::RDMA_PROTO_FLAG_DATA_AT_THE_END); + NRdma::TProtoMessageSerializer::Serialize( + requestDetails.Out, + TBlockStoreProtocol::ReadDeviceBlocksResponse, + flags, + proto, + requestDetails.DataBuffer.size()); + bytes = requestDetails.Out.size(); + } else { + TStackVec parts; + parts.reserve(blocks.BuffersSize()); + + for (const auto& buffer: blocks.GetBuffers()) { + parts.emplace_back(TStringBuf(buffer)); + } - for (const auto& buffer: blocks.GetBuffers()) { - parts.emplace_back(TStringBuf(buffer)); + bytes = NRdma::TProtoMessageSerializer::Serialize( + requestDetails.Out, + TBlockStoreProtocol::ReadDeviceBlocksResponse, + flags, + proto, + TContIOVector(parts.data(), parts.size())); } - size_t bytes = NRdma::TProtoMessageSerializer::Serialize( - requestDetails.Out, - TBlockStoreProtocol::ReadDeviceBlocksResponse, - proto, - TContIOVector(parts.data(), parts.size())); - if (auto ep = Endpoint.lock()) { ep->SendResponse(requestDetails.Context, bytes); } @@ -597,6 +628,7 @@ class TRequestHandler final void* context, TCallContextPtr callContext, NProto::TWriteDeviceBlocksRequest& request, + bool isZeroCopyDataSupported, TStringBuf requestData, TStringBuf out) const { @@ -610,17 +642,23 @@ class TRequestHandler final NProto::VOLUME_ACCESS_READ_WRITE); auto req = std::make_shared(); + TStringBuf dataBuffer; req->SetStartIndex(request.GetStartIndex()); - req->MutableBlocks()->AddBuffers( - requestData.data(), - requestData.length()); + if (isZeroCopyDataSupported) { + dataBuffer = requestData; + } else { + 
req->MutableBlocks()->AddBuffers( + requestData.data(), + requestData.length()); + } const ui32 blockCount = requestData.length() / request.GetBlockSize(); TWriteRequestContinuationData continuationData{ {context, out, + dataBuffer, request.GetDeviceUUID(), request.GetHeaders().GetClientId(), request.GetVolumeRequestId(), @@ -666,7 +704,8 @@ class TRequestHandler final Now(), std::move(continuationData.ExecutionData.CallContext), std::move(continuationData.ExecutionData.Request), - continuationData.ExecutionData.BlockSize); + continuationData.ExecutionData.BlockSize, + continuationData.RequestDetails.DataBuffer); SubscribeForResponse( std::move(future), @@ -708,6 +747,7 @@ class TRequestHandler final size_t bytes = NRdma::TProtoMessageSerializer::Serialize( requestDetails.Out, TBlockStoreProtocol::WriteDeviceBlocksResponse, + 0, // flags proto, TContIOVector(nullptr, 0)); @@ -739,6 +779,7 @@ class TRequestHandler final TZeroRequestContinuationData continuationData{ {context, out, + {}, // no data buffer request.GetDeviceUUID(), request.GetHeaders().GetClientId(), request.GetVolumeRequestId(), @@ -827,6 +868,7 @@ class TRequestHandler final size_t bytes = NRdma::TProtoMessageSerializer::Serialize( requestDetails.Out, TBlockStoreProtocol::ZeroDeviceBlocksResponse, + 0, // flags proto, TContIOVector(nullptr, 0)); @@ -860,13 +902,16 @@ class TRequestHandler final Now(), std::move(callContext), std::move(req), - request.GetBlockSize()); + request.GetBlockSize(), + {} // no data buffer + ); SubscribeForResponse( std::move(future), TRequestDetails{ context, out, + {}, // no data buffer request.GetDeviceUUID(), request.GetHeaders().GetClientId()}, &TRequestHandler::HandleChecksumBlocksResponse); @@ -903,6 +948,7 @@ class TRequestHandler final size_t bytes = NRdma::TProtoMessageSerializer::Serialize( requestDetails.Out, TBlockStoreProtocol::ChecksumDeviceBlocksResponse, + 0, // flags proto, TContIOVector(nullptr, 0)); diff --git 
a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.cpp index 7ed6aa3edaa..d80386a39ba 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -279,9 +280,15 @@ NProto::TError TNonreplicatedPartitionRdmaActor::SendReadRequests( return err; } + ui32 flags = 0; + if (RdmaClient->IsAlignedDataEnabled()) { + SetProtoFlag(flags, NRdma::RDMA_PROTO_FLAG_DATA_AT_THE_END); + } + NRdma::TProtoMessageSerializer::Serialize( req->RequestBuffer, TBlockStoreProtocol::ReadDeviceBlocksRequest, + flags, deviceRequest, TContIOVector(nullptr, 0)); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_checksumblocks.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_checksumblocks.cpp index b3ea0e57f7b..c0baaf5a71e 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_checksumblocks.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_checksumblocks.cpp @@ -258,6 +258,7 @@ void TNonreplicatedPartitionRdmaActor::HandleChecksumBlocks( serializer->Serialize( req->RequestBuffer, TBlockStoreProtocol::ChecksumDeviceBlocksRequest, + 0, // flags deviceRequest, TContIOVector(nullptr, 0)); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_writeblocks.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_writeblocks.cpp index d5f8fbd7bf7..3101318a0a4 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_writeblocks.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_writeblocks.cpp @@ -282,9 +282,15 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocks( TVector 
parts; builder.BuildNextRequest(&parts); + ui32 flags = 0; + if (RdmaClient->IsAlignedDataEnabled()) { + SetProtoFlag(flags, NRdma::RDMA_PROTO_FLAG_DATA_AT_THE_END); + } + NRdma::TProtoMessageSerializer::Serialize( req->RequestBuffer, TBlockStoreProtocol::WriteDeviceBlocksRequest, + flags, deviceRequest, TContIOVector(parts.data(), parts.size())); @@ -424,18 +430,22 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocksLocal( return; } + ui32 flags = 0; + if (RdmaClient->IsAlignedDataEnabled()) { + SetProtoFlag(flags, NRdma::RDMA_PROTO_FLAG_DATA_AT_THE_END); + } + NRdma::TProtoMessageSerializer::Serialize( req->RequestBuffer, TBlockStoreProtocol::WriteDeviceBlocksRequest, + flags, deviceRequest, // XXX (cast) TContIOVector( const_cast( reinterpret_cast( - sglist.begin() + blocks - )), - r.DeviceBlockRange.Size() - )); + sglist.begin() + blocks)), + r.DeviceBlockRange.Size())); blocks += r.DeviceBlockRange.Size(); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_zeroblocks.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_zeroblocks.cpp index e80c8a265b6..005c991a0e7 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_zeroblocks.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_actor_zeroblocks.cpp @@ -229,6 +229,7 @@ void TNonreplicatedPartitionRdmaActor::HandleZeroBlocks( serializer->Serialize( req->RequestBuffer, TBlockStoreProtocol::ZeroDeviceBlocksRequest, + 0, // flags deviceRequest, TContIOVector(nullptr, 0)); diff --git a/cloud/blockstore/tools/testing/rdma-test/storage_rdma.cpp b/cloud/blockstore/tools/testing/rdma-test/storage_rdma.cpp index 179f60b37fa..36ad5d1419d 100644 --- a/cloud/blockstore/tools/testing/rdma-test/storage_rdma.cpp +++ b/cloud/blockstore/tools/testing/rdma-test/storage_rdma.cpp @@ -85,6 +85,7 @@ class TReadBlocksHandler final Serializer->Serialize( req->RequestBuffer, TBlockStoreProtocol::ReadBlocksRequest, + 
0, // flags *Request, TContIOVector(nullptr, 0)); @@ -176,6 +177,7 @@ class TWriteBlocksHandler final Serializer->Serialize( req->RequestBuffer, TBlockStoreProtocol::WriteBlocksRequest, + 0, // flags *Request, TContIOVector((IOutputStream::TPart*)sglist.begin(), sglist.size())); diff --git a/cloud/blockstore/tools/testing/rdma-test/target.cpp b/cloud/blockstore/tools/testing/rdma-test/target.cpp index 2ec2c063a49..4551ed81dc2 100644 --- a/cloud/blockstore/tools/testing/rdma-test/target.cpp +++ b/cloud/blockstore/tools/testing/rdma-test/target.cpp @@ -160,6 +160,7 @@ class TRequestHandler final size_t responseBytes = Serializer->Serialize( out, TBlockStoreProtocol::ReadBlocksResponse, + 0, // flags response, TContIOVector((IOutputStream::TPart*)sglist.begin(), sglist.size())); @@ -199,6 +200,7 @@ class TRequestHandler final size_t responseBytes = Serializer->Serialize( out, TBlockStoreProtocol::WriteBlocksResponse, + 0, // flags response, TContIOVector(nullptr, 0)); diff --git a/cloud/blockstore/vhost-server/backend_rdma.cpp b/cloud/blockstore/vhost-server/backend_rdma.cpp index 8e95cc87a86..af184142df1 100644 --- a/cloud/blockstore/vhost-server/backend_rdma.cpp +++ b/cloud/blockstore/vhost-server/backend_rdma.cpp @@ -185,6 +185,7 @@ vhd_bdev_info TRdmaBackend::Init(const TOptions& options) auto rdmaClientConfig = std::make_shared(); rdmaClientConfig->QueueSize = options.RdmaClient.QueueSize; rdmaClientConfig->MaxBufferSize = options.RdmaClient.MaxBufferSize; + rdmaClientConfig->AlignedDataEnabled = options.RdmaClient.AlignedData; RdmaClient = NRdma::CreateClient( NRdma::NVerbs::CreateVerbs(), diff --git a/cloud/blockstore/vhost-server/options.cpp b/cloud/blockstore/vhost-server/options.cpp index 88367393c13..c6b234d2ac2 100644 --- a/cloud/blockstore/vhost-server/options.cpp +++ b/cloud/blockstore/vhost-server/options.cpp @@ -132,6 +132,10 @@ void TOptions::Parse(int argc, char** argv) [this](const auto& timeout) { WaitAfterParentExit = TDuration::Seconds(timeout); 
}); + opts.AddLongOption("rdma-aligned-data", "enable rdma aligned data") + .NoArgument() + .SetFlag(&RdmaClient.AlignedData); + TOptsParseResultException res(&opts, argc, argv); if (res.FindLongOptParseResult("verbose") && VerboseLevel.empty()) { diff --git a/cloud/blockstore/vhost-server/options.h b/cloud/blockstore/vhost-server/options.h index 060b76fdcb7..b406c053d8c 100644 --- a/cloud/blockstore/vhost-server/options.h +++ b/cloud/blockstore/vhost-server/options.h @@ -44,6 +44,7 @@ struct TOptions { ui32 QueueSize = 256; ui32 MaxBufferSize = 4_MB + 4_KB; + bool AlignedData = false; } RdmaClient; void Parse(int argc, char** argv);