diff --git a/cloud/blockstore/libs/service/CMakeLists.linux-x86_64.txt b/cloud/blockstore/libs/service/CMakeLists.linux-x86_64.txt index 3a1a5dd034..3c0c875943 100644 --- a/cloud/blockstore/libs/service/CMakeLists.linux-x86_64.txt +++ b/cloud/blockstore/libs/service/CMakeLists.linux-x86_64.txt @@ -36,4 +36,5 @@ target_sources(blockstore-libs-service PRIVATE ${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/service/storage.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/service/storage_provider.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/service/storage_test.cpp + ${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/service/unaligned_device_handler.cpp ) diff --git a/cloud/blockstore/libs/service/aligned_device_handler.cpp b/cloud/blockstore/libs/service/aligned_device_handler.cpp index 0ca47875eb..3f6afbe8eb 100644 --- a/cloud/blockstore/libs/service/aligned_device_handler.cpp +++ b/cloud/blockstore/libs/service/aligned_device_handler.cpp @@ -11,6 +11,16 @@ using namespace NThreading; namespace { +TErrorResponse CreateErrorAcquireResponse() +{ + return {E_CANCELLED, "failed to acquire sglist in DeviceHandler"}; +} + +TErrorResponse CreateRequestNotAlignedResponse() +{ + return {E_ARGUMENT, "Request is not aligned"}; +} + // Removes the first blockCount elements from the sgList. Returns these removed // items in TGuardedSgList. TGuardedSgList TakeHeadBlocks(TGuardedSgList& sgList, ui32 blockCount) @@ -29,21 +39,18 @@ TGuardedSgList TakeHeadBlocks(TGuardedSgList& sgList, ui32 blockCount) } // namespace -TResultOrError TryToNormalize( +NProto::TError TryToNormalize( TGuardedSgList& guardedSgList, - const TBlocksInfo& blocksInfo, - ui64 length, - ui32 blockSize) + TBlocksInfo& blocksInfo) { + const auto length = blocksInfo.BufferSize(); if (length == 0) { return MakeError(E_ARGUMENT, "Local request has zero length"); } auto guard = guardedSgList.Acquire(); if (!guard) { - return MakeError( - E_CANCELLED, - "failed to acquire sglist in DeviceHandler"); + return CreateErrorAcquireResponse(); } auto bufferSize = SgListGetSize(guard.Get()); @@ -51,45 +58,37 @@ TResultOrError TryToNormalize( return MakeError( E_ARGUMENT, TStringBuilder() - << "Invalid local request:" << " buffer size " << bufferSize + << "Invalid local request: buffer size " << bufferSize << " not equal to length " << length); } - if (blocksInfo.BeginOffset != 0 || blocksInfo.EndOffset != 0) { - return false; + if (!blocksInfo.IsAligned()) { + return MakeError(S_OK); } - for (const auto& buffer: guard.Get()) { - if (buffer.Size() % blockSize != 0) { - return false; - } + bool allBuffersAligned = AllOf( + guard.Get(), + [blockSize = blocksInfo.BlockSize](const auto& buffer) + { return buffer.Size() % blockSize == 0; }); + + if (!allBuffersAligned) { + blocksInfo.SgListAligned = false; + return MakeError(S_OK); } - auto sgListOrError = SgListNormalize(guard.Get(), blockSize); + auto sgListOrError = SgListNormalize(guard.Get(), blocksInfo.BlockSize); if (HasError(sgListOrError)) { return sgListOrError.GetError(); } guardedSgList.SetSgList(sgListOrError.ExtractResult()); - return true; -} - -//////////////////////////////////////////////////////////////////////////////// - -TStorageBuffer AllocateStorageBuffer(IStorage& storage, size_t bytesCount) -{ - auto buffer = storage.AllocateBuffer(bytesCount); - if (!buffer) { - buffer = std::shared_ptr( - new char[bytesCount], - std::default_delete()); - } - return buffer; + return MakeError(S_OK); } //////////////////////////////////////////////////////////////////////////////// TBlocksInfo::TBlocksInfo(ui64 from, ui64 length, ui32 blockSize) + : BlockSize(blockSize) { ui64 startIndex = from / blockSize; ui64 beginOffset = from - startIndex * blockSize; @@ -108,6 +107,25 @@ TBlocksInfo::TBlocksInfo(ui64 from, ui64 length, ui32 blockSize) EndOffset = endOffset; } +size_t TBlocksInfo::BufferSize() const +{ + return Range.Size() * BlockSize - BeginOffset - EndOffset; +} + +bool TBlocksInfo::IsAligned() const +{ + return SgListAligned && BeginOffset == 0 && EndOffset == 0; +} + +TBlocksInfo TBlocksInfo::MakeAligned() const +{ + TBlocksInfo result(*this); + result.BeginOffset = 0; + result.EndOffset = 0; + result.SgListAligned = true; + return result; +} + //////////////////////////////////////////////////////////////////////////////// TAlignedDeviceHandler::TAlignedDeviceHandler( @@ -131,16 +149,15 @@ TFuture TAlignedDeviceHandler::Read( const TString& checkpointId) { auto blocksInfo = TBlocksInfo(from, length, BlockSize); - - auto aligned = TryToNormalize(sgList, blocksInfo, length, BlockSize); - if (HasError(aligned)) { + auto normalizeError = TryToNormalize(sgList, blocksInfo); + if (HasError(normalizeError)) { return MakeFuture( - TErrorResponse(aligned.GetError())); + TErrorResponse(normalizeError)); } - if (!aligned.GetResult()) { + if (!blocksInfo.IsAligned()) { return MakeFuture( - TErrorResponse(E_ARGUMENT, "Request is not aligned")); + CreateRequestNotAlignedResponse()); } return ExecuteReadRequest( @@ -158,15 +175,15 @@ TFuture TAlignedDeviceHandler::Write( { auto blocksInfo = TBlocksInfo(from, length, BlockSize); - auto aligned = TryToNormalize(sgList, blocksInfo, length, BlockSize); - if (HasError(aligned)) { + auto normalizeError = TryToNormalize(sgList, blocksInfo); + if (HasError(normalizeError)) { return MakeFuture( - TErrorResponse(aligned.GetError())); + TErrorResponse(normalizeError)); } - if (!aligned.GetResult()) { + if (!blocksInfo.IsAligned()) { return MakeFuture( - TErrorResponse(E_ARGUMENT, "Request is not aligned")); + CreateRequestNotAlignedResponse()); } return ExecuteWriteRequest(std::move(ctx), blocksInfo, std::move(sgList)); @@ -181,20 +198,23 @@ TAlignedDeviceHandler::Zero(TCallContextPtr ctx, ui64 from, ui64 length) } auto blocksInfo = TBlocksInfo(from, length, BlockSize); - if (blocksInfo.BeginOffset != 0 || blocksInfo.EndOffset != 0) { + if (!blocksInfo.IsAligned()) { return MakeFuture( - TErrorResponse(E_ARGUMENT, "Request is not aligned")); + CreateRequestNotAlignedResponse()); } - return ExecuteZeroRequest( - std::move(ctx), - blocksInfo.Range.Start, - blocksInfo.Range.Size()); + return ExecuteZeroRequest(std::move(ctx), blocksInfo); } TStorageBuffer TAlignedDeviceHandler::AllocateBuffer(size_t bytesCount) { - return AllocateStorageBuffer(*Storage, bytesCount); + auto buffer = Storage->AllocateBuffer(bytesCount); + if (!buffer) { + buffer = std::shared_ptr( + new char[bytesCount], + std::default_delete()); + } + return buffer; } TFuture @@ -204,6 +224,8 @@ TAlignedDeviceHandler::ExecuteReadRequest( TGuardedSgList sgList, TString checkpointId) const { + Y_DEBUG_ABORT_UNLESS(blocksInfo.IsAligned()); + auto requestBlockCount = std::min(blocksInfo.Range.Size(), MaxBlockCount); @@ -226,9 +248,8 @@ TAlignedDeviceHandler::ExecuteReadRequest( // sub-request and leave the rest in original sgList. request->Sglist = TakeHeadBlocks(sgList, requestBlockCount); if (request->Sglist.Empty()) { - return MakeFuture(TErrorResponse( - E_CANCELLED, - "failed to acquire sglist in DeviceHandler")); + return MakeFuture( + CreateErrorAcquireResponse()); } auto result = Storage->ReadBlocksLocal(ctx, std::move(request)); @@ -268,6 +289,8 @@ TAlignedDeviceHandler::ExecuteWriteRequest( TBlocksInfo blocksInfo, TGuardedSgList sgList) const { + Y_DEBUG_ABORT_UNLESS(blocksInfo.IsAligned()); + auto requestBlockCount = std::min(blocksInfo.Range.Size(), MaxBlockCount); @@ -289,9 +312,8 @@ TAlignedDeviceHandler::ExecuteWriteRequest( // sub-request and leave the rest in original sgList. request->Sglist = TakeHeadBlocks(sgList, requestBlockCount); if (request->Sglist.Empty()) { - return MakeFuture(TErrorResponse( - E_CANCELLED, - "failed to acquire sglist in DeviceHandler")); + return MakeFuture( + CreateErrorAcquireResponse()); } auto result = Storage->WriteBlocksLocal(ctx, std::move(request)); @@ -325,31 +347,32 @@ TAlignedDeviceHandler::ExecuteWriteRequest( TFuture TAlignedDeviceHandler::ExecuteZeroRequest( TCallContextPtr ctx, - ui64 startIndex, - ui32 blockCount) const + TBlocksInfo blocksInfo) const { - auto requestBlockCount = std::min(blockCount, MaxBlockCount); + Y_DEBUG_ABORT_UNLESS(blocksInfo.IsAligned()); + + auto requestBlockCount = std::min(blocksInfo.Range.Size(), MaxBlockCount); auto request = std::make_shared(); request->MutableHeaders()->SetRequestId(ctx->RequestId); request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds()); request->MutableHeaders()->SetClientId(ClientId); - request->SetStartIndex(startIndex); + request->SetStartIndex(blocksInfo.Range.Start); request->SetBlocksCount(requestBlockCount); - if (requestBlockCount == blockCount) { + if (requestBlockCount == blocksInfo.Range.Size()) { // The request size is quite small. We do all work at once. return Storage->ZeroBlocks(std::move(ctx), std::move(request)); } auto result = Storage->ZeroBlocks(ctx, std::move(request)); + blocksInfo.Range.Start += requestBlockCount; + return result.Apply( [ctx = std::move(ctx), weakPtr = weak_from_this(), - startIndex = startIndex + requestBlockCount, - blocksCount = - blockCount - requestBlockCount](const auto& future) mutable + blocksInfo = blocksInfo](const auto& future) mutable { // Only part of the request was completed. Continue doing the // rest of the work @@ -360,10 +383,7 @@ TFuture TAlignedDeviceHandler::ExecuteZeroRequest( } if (auto self = weakPtr.lock()) { - return self->ExecuteZeroRequest( - std::move(ctx), - startIndex, - blocksCount); + return self->ExecuteZeroRequest(std::move(ctx), blocksInfo); } return MakeFuture( TErrorResponse(E_CANCELLED)); diff --git a/cloud/blockstore/libs/service/aligned_device_handler.h b/cloud/blockstore/libs/service/aligned_device_handler.h index 6ce367a4de..20feab0f3c 100644 --- a/cloud/blockstore/libs/service/aligned_device_handler.h +++ b/cloud/blockstore/libs/service/aligned_device_handler.h @@ -14,14 +14,35 @@ struct TBlocksInfo TBlocksInfo(ui64 from, ui64 length, ui32 blockSize); TBlocksInfo(const TBlocksInfo&) = default; + [[nodiscard]] size_t BufferSize() const; + + // The data may be misaligned for two reasons: if the start or end of the + // block do not correspond to the block boundaries, or if the client buffers + // are not a multiple of the block size. + [[nodiscard]] bool IsAligned() const; + + // Creates an aligned TBlocksInfo. + [[nodiscard]] TBlocksInfo MakeAligned() const; + TBlockRange64 Range; + // Offset relative to the beginning of the range. ui64 BeginOffset = 0; + // Offset relative to the ending of the range. ui64 EndOffset = 0; + const ui32 BlockSize = 0; + // The request also unaligned if the sglist buffer sizes are not multiples + // of the block size + bool SgListAligned = true; }; //////////////////////////////////////////////////////////////////////////////// -class TAlignedDeviceHandler +// The TAlignedDeviceHandler can only process requests that are aligned. If the +// size of a request exceeds the maximum size that the underlying layer can +// handle, the TAlignedDeviceHandler will break the request into smaller parts +// and execute them separately. If a request contains unaligned data, the +// E_ARGUMENT error is returned. +class TAlignedDeviceHandler final : public IDeviceHandler , public std::enable_shared_from_this { @@ -38,6 +59,7 @@ class TAlignedDeviceHandler ui32 blockSize, ui32 maxBlockCount); + // implements IDeviceHandler NThreading::TFuture Read( TCallContextPtr ctx, ui64 from, @@ -56,32 +78,37 @@ class TAlignedDeviceHandler TStorageBuffer AllocateBuffer(size_t bytesCount) override; -private: + // Performs a read. It can only be called for aligned data. NThreading::TFuture ExecuteReadRequest( TCallContextPtr ctx, TBlocksInfo blocksInfo, TGuardedSgList sgList, TString checkpointId) const; + // Performs a write. It can only be called for aligned data. NThreading::TFuture ExecuteWriteRequest( TCallContextPtr ctx, TBlocksInfo blocksInfo, TGuardedSgList sgList) const; + // Performs a zeroes. It can only be called for aligned data. NThreading::TFuture ExecuteZeroRequest( TCallContextPtr ctx, - ui64 startIndex, - ui32 blockCount) const; + TBlocksInfo blocksInfo) const; }; //////////////////////////////////////////////////////////////////////////////// -TStorageBuffer AllocateStorageBuffer(IStorage& storage, size_t bytesCount); - -TResultOrError TryToNormalize( +// Normalizes the SgList in guardedSgList. If the total size of the buffers does +// not match the request size, an error is returned. If it is not possible to +// normalize the number of buffers so that they correspond to the number of +// requested blocks and the size of each buffer is equal to the specified block +// size, the SgListAligned flag is set in the blocksInfo structure, but no error +// is returned. This indicates that the request is valid, but not aligned. +NProto::TError TryToNormalize( TGuardedSgList& guardedSgList, - const TBlocksInfo& blocksInfo, - ui64 length, - ui32 blockSize); + TBlocksInfo& blocksInfo); + +//////////////////////////////////////////////////////////////////////////////// } // namespace NCloud::NBlockStore diff --git a/cloud/blockstore/libs/service/device_handler.cpp b/cloud/blockstore/libs/service/device_handler.cpp index b3643d8dd8..2573a12f13 100644 --- a/cloud/blockstore/libs/service/device_handler.cpp +++ b/cloud/blockstore/libs/service/device_handler.cpp @@ -1,17 +1,7 @@ #include "device_handler.h" #include "aligned_device_handler.h" -#include "context.h" - -#include -#include -#include - -#include -#include -#include - -#include +#include "unaligned_device_handler.h" namespace NCloud::NBlockStore { @@ -21,699 +11,7 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -using TReadBlocksResponseFuture = TFuture; -using TWriteBlocksResponseFuture = TFuture; -using TZeroBlocksResponseFuture = TFuture; - -//////////////////////////////////////////////////////////////////////////////// - - -class TDeviceHandler final - : public IDeviceHandler - , public std::enable_shared_from_this -{ - class TModifyRequest; - using TModifyRequestPtr = std::shared_ptr; - - template - class TModifyRequestImpl; - using TWriteRequest = TModifyRequestImpl; - using TZeroRequest = TModifyRequestImpl; - -private: - static constexpr ui32 MaxUnalignedRequestSize = 32_MB; - - const IStoragePtr Storage; - const TString ClientId; - const ui32 BlockSize; - const ui32 ZeroBlocksCountLimit; - - TList AlignedRequests; - TList UnalignedRequests; - TAdaptiveLock RequestsLock; - -public: - TDeviceHandler( - IStoragePtr storage, - TString clientId, - ui32 blockSize, - ui32 zeroBlocksCountLimit); - - TReadBlocksResponseFuture Read( - TCallContextPtr ctx, - ui64 from, - ui64 length, - TGuardedSgList sgList, - const TString& checkpointId) override; - - TWriteBlocksResponseFuture Write( - TCallContextPtr ctx, - ui64 from, - ui64 length, - TGuardedSgList sgList) override; - - TZeroBlocksResponseFuture Zero( - TCallContextPtr ctx, - ui64 from, - ui64 length) override; - - TStorageBuffer AllocateBuffer(size_t bytesCount) override - { - return AllocateStorageBuffer(*Storage, bytesCount); - } - -private: - TReadBlocksResponseFuture ExecuteAlignedReadRequest( - TCallContextPtr ctx, - const TBlocksInfo& blocksInfo, - TGuardedSgList sgList, - const TString& checkpointId); - - TReadBlocksResponseFuture ExecuteUnalignedReadRequest( - TCallContextPtr ctx, - const TBlocksInfo& blocksInfo, - TGuardedSgList guardedSgList, - const TString& checkpointId); - - template - TFuture Modify( - TCallContextPtr ctx, - const TBlocksInfo& blocksInfo, - bool aligned, - TGuardedSgList sgList); - - template - TFuture ExecuteModifyRequest(std::shared_ptr request); - - void HandleExecutedModifyRequest(TModifyRequestPtr request); - - template - TFuture ExecuteAlignedModifyRequest(TRequest& request); - - TFuture ExecuteUnalignedModifyRequest( - TModifyRequestPtr request); - - static TFuture HandleRmwReadResponse( - IStoragePtr storage, - TModifyRequestPtr request, - TStorageBuffer buffer, - size_t bufferSize); - - template - static TFuture CreateResponseFuture( - const TFuture& future); - - static void RemoveRequest( - TList& requests, - const TModifyRequestPtr& request); - - static void PrepareRequests( - const TList& requests, - TVector& result); -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TDeviceHandler::TModifyRequest -{ -protected: - enum EStatus - { - Waiting, - InFlight, - Completed, - }; - -public: - const TString ClientId; - const ui32 BlockSize; - const TCallContextPtr CallContext; - const TBlocksInfo BlocksInfo; - const bool Aligned; - const TGuardedSgList SgList; - -protected: - std::atomic Status = Waiting; - TVector Dependencies; - -public: - TModifyRequest( - TString clientId, - ui32 blockSize, - TCallContextPtr callContext, - const TBlocksInfo& blocksInfo, - bool aligned, - TGuardedSgList sgList) - : ClientId(std::move(clientId)) - , BlockSize(blockSize) - , CallContext(std::move(callContext)) - , BlocksInfo(blocksInfo) - , Aligned(aligned) - , SgList(std::move(sgList)) - {} - - virtual ~TModifyRequest() = default; - - bool IsAligned() const - { - return Aligned; - } - - void AddDependencies(const TList& requests) - { - for (const auto& request: requests) { - if (request->Status.load() != Completed && - BlocksInfo.Range.Overlaps(request->BlocksInfo.Range)) - { - Dependencies.push_back(request); - } - } - } - - bool Prepare() - { - if (Status.load() != Waiting) { - return false; - } - - for (const auto& dependency: Dependencies) { - if (dependency->Status.load() != Completed) { - return false; - } - } - - Dependencies.clear(); - - Status.store(InFlight); - return true; - } - - virtual TFuture BaseExecute(const IStoragePtr& storage) = 0; - - virtual void Complete(const NProto::TError& error) = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -template -class TDeviceHandler::TModifyRequestImpl - : public TModifyRequest -{ -private: - TPromise Promise; - -public: - using TModifyRequest::TModifyRequest; - - TFuture BaseExecute(const IStoragePtr& storage) override - { - return Execute(storage).Apply([=] (const auto& f) { - return f.GetValue().GetError(); - }); - } - - void Complete(const NProto::TError& error) override - { - Complete(static_cast(TErrorResponse(error))); - } - - TFuture Execute(const IStoragePtr& storage); - - TFuture GetFuture() - { - Y_ABORT_UNLESS(!Promise.Initialized()); - Promise = NewPromise(); - return Promise.GetFuture(); - } - - void Complete(const TResponse& response) - { - Status.store(Completed); - - if (Promise.Initialized()) { - Promise.SetValue(response); - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -template <> -TZeroBlocksResponseFuture TDeviceHandler::TZeroRequest::Execute( - const IStoragePtr& storage) -{ - auto request = std::make_shared(); - request->MutableHeaders()->SetRequestId(CallContext->RequestId); - request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds()); - request->MutableHeaders()->SetClientId(ClientId); - request->SetStartIndex(BlocksInfo.Range.Start); - request->SetBlocksCount(BlocksInfo.Range.Size()); - - return storage->ZeroBlocks(CallContext, std::move(request)); -} - -template <> -TWriteBlocksResponseFuture TDeviceHandler::TWriteRequest::Execute( - const IStoragePtr& storage) -{ - auto request = std::make_shared(); - request->MutableHeaders()->SetRequestId(CallContext->RequestId); - request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds()); - request->MutableHeaders()->SetClientId(ClientId); - request->SetStartIndex(BlocksInfo.Range.Start); - request->BlocksCount = BlocksInfo.Range.Size(); - request->BlockSize = BlockSize; - request->Sglist = SgList; - - return storage->WriteBlocksLocal(CallContext, std::move(request)); -} - -//////////////////////////////////////////////////////////////////////////////// - -TDeviceHandler::TDeviceHandler( - IStoragePtr storage, - TString clientId, - ui32 blockSize, - ui32 zeroBlocksCountLimit) - : Storage(std::move(storage)) - , ClientId(std::move(clientId)) - , BlockSize(blockSize) - , ZeroBlocksCountLimit(zeroBlocksCountLimit) -{ - Y_ABORT_UNLESS(ZeroBlocksCountLimit > 0); -} - -TReadBlocksResponseFuture TDeviceHandler::Read( - TCallContextPtr ctx, - ui64 from, - ui64 length, - TGuardedSgList sgList, - const TString& checkpointId) -{ - auto blocksInfo = TBlocksInfo(from, length, BlockSize); - - auto aligned = TryToNormalize(sgList, blocksInfo, length, BlockSize); - if (HasError(aligned)) { - return MakeFuture( - TErrorResponse(aligned.GetError())); - } - - if (!aligned.GetResult()) { - return ExecuteUnalignedReadRequest( - std::move(ctx), - blocksInfo, - std::move(sgList), - checkpointId); - } - - return ExecuteAlignedReadRequest( - std::move(ctx), - blocksInfo, - std::move(sgList), - checkpointId); -} - -TWriteBlocksResponseFuture TDeviceHandler::Write( - TCallContextPtr ctx, - ui64 from, - ui64 length, - TGuardedSgList sgList) -{ - auto blocksInfo = TBlocksInfo(from, length, BlockSize); - - auto aligned = TryToNormalize(sgList, blocksInfo, length, BlockSize); - if (HasError(aligned)) { - return MakeFuture( - TErrorResponse(aligned.GetError())); - } - - if (!aligned.GetResult() && - blocksInfo.Range.Size() > MaxUnalignedRequestSize / BlockSize) - { - return MakeFuture( - TErrorResponse(E_ARGUMENT, TStringBuilder() - << "Unaligned write request is too big. BlockCount=" - << blocksInfo.Range.Size())); - } - - return Modify( - std::move(ctx), - blocksInfo, - aligned.GetResult(), - std::move(sgList)); -} - -TZeroBlocksResponseFuture TDeviceHandler::Zero( - TCallContextPtr ctx, - ui64 from, - ui64 length) -{ - if (length == 0) { - return MakeFuture( - TErrorResponse(E_ARGUMENT, "Local request has zero length")); - } - - ui64 startIndex = from / BlockSize; - ui64 beginOffset = from - startIndex * BlockSize; - ui64 lengthLimit = static_cast(ZeroBlocksCountLimit) * BlockSize - beginOffset; - - auto requestLength = std::min(length, lengthLimit); - auto blocksInfo = TBlocksInfo(from, requestLength, BlockSize); - bool aligned = (blocksInfo.BeginOffset == 0 && blocksInfo.EndOffset == 0); - - auto result = Modify( - ctx, - blocksInfo, - aligned, - TGuardedSgList({TBlockDataRef::CreateZeroBlock(requestLength)})); - - return result.Apply([=] (const auto& future) mutable { - auto response = future.GetValue(); - - if (length <= requestLength || HasError(response)) { - return MakeFuture(response); - } - - return Zero(std::move(ctx), from + requestLength, length - requestLength); - }); -} - -template -TFuture TDeviceHandler::Modify( - TCallContextPtr ctx, - const TBlocksInfo& blocksInfo, - bool aligned, - TGuardedSgList sgList) -{ - auto request = std::make_shared>( - ClientId, - BlockSize, - std::move(ctx), - blocksInfo, - aligned, - std::move(sgList)); - - with_lock (RequestsLock) { - if (request->IsAligned()) { - request->AddDependencies(UnalignedRequests); - AlignedRequests.push_back(request); - } else { - request->AddDependencies(UnalignedRequests); - request->AddDependencies(AlignedRequests); - UnalignedRequests.push_back(request); - } - - if (!request->Prepare()) { - request->CallContext->Postpone(GetCycleCount()); - return request->GetFuture(); - } - } - - return ExecuteModifyRequest(std::move(request)); -} - -template -TFuture TDeviceHandler::ExecuteAlignedModifyRequest( - TRequest& request) -{ - return request.Execute(Storage); -} - -template <> -TFuture TDeviceHandler::ExecuteAlignedModifyRequest( - TModifyRequest& request) -{ - return request.BaseExecute(Storage); -} - -template -TFuture TDeviceHandler::ExecuteModifyRequest( - std::shared_ptr request) -{ - TFuture future; - - if (!request->IsAligned()) { - auto error = ExecuteUnalignedModifyRequest(request); - future = CreateResponseFuture(error); - } else { - future = ExecuteAlignedModifyRequest(*request); - } - - auto weakPtr = weak_from_this(); - - return future.Apply( - [request = std::move(request), weakPtr = std::move(weakPtr)] (const auto& f) { - request->Complete(f.GetValue()); - - if (auto p = weakPtr.lock()) { - p->HandleExecutedModifyRequest(std::move(request)); - } - - return f.GetValue(); - }); -} - -void TDeviceHandler::HandleExecutedModifyRequest(TModifyRequestPtr request) -{ - TVector preparedRequests; - - with_lock (RequestsLock) { - if (request->IsAligned()) { - RemoveRequest(AlignedRequests, request); - PrepareRequests(UnalignedRequests, preparedRequests); - } else { - RemoveRequest(UnalignedRequests, request); - PrepareRequests(AlignedRequests, preparedRequests); - PrepareRequests(UnalignedRequests, preparedRequests); - } - } - - for (auto& preparedRequest: preparedRequests) { - preparedRequest->CallContext->Advance(GetCycleCount()); - ExecuteModifyRequest(std::move(preparedRequest)); - } -} - -void TDeviceHandler::RemoveRequest( - TList& requests, - const TModifyRequestPtr& request) -{ - for (auto it = requests.begin(); it != requests.end(); ) { - if (*it == request) { - it = requests.erase(it); - } else { - ++it; - } - } -} - -void TDeviceHandler::PrepareRequests( - const TList& requests, - TVector& result) -{ - for (const auto& request: requests) { - if (request->Prepare()) { - result.push_back(request); - } - } -} - -TReadBlocksResponseFuture TDeviceHandler::ExecuteAlignedReadRequest( - TCallContextPtr ctx, - const TBlocksInfo& blocksInfo, - TGuardedSgList sgList, - const TString& checkpointId) -{ - auto request = std::make_shared(); - request->MutableHeaders()->SetRequestId(ctx->RequestId); - request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds()); - request->MutableHeaders()->SetClientId(ClientId); - request->SetCheckpointId(checkpointId); - request->SetStartIndex(blocksInfo.Range.Start); - request->SetBlocksCount(blocksInfo.Range.Size()); - request->BlockSize = BlockSize; - request->Sglist = std::move(sgList); - - return Storage->ReadBlocksLocal(std::move(ctx), std::move(request)); -} - -TReadBlocksResponseFuture TDeviceHandler::ExecuteUnalignedReadRequest( - TCallContextPtr ctx, - const TBlocksInfo& blocksInfo, - TGuardedSgList guardedSgList, - const TString& checkpointId) -{ - if (blocksInfo.Range.Size() > MaxUnalignedRequestSize / BlockSize) { - return MakeFuture( - TErrorResponse(E_ARGUMENT, TStringBuilder() - << "Unaligned read request is too big. BlockCount=" - << blocksInfo.Range.Size())); - } - - auto bufferSize = blocksInfo.Range.Size() * BlockSize; - auto buffer = AllocateBuffer(bufferSize); - - auto sgListOrError = SgListNormalize( - TBlockDataRef(buffer.get(), bufferSize), - BlockSize); - - if (HasError(sgListOrError)) { - return MakeFuture( - TErrorResponse(sgListOrError.GetError())); - } - - auto request = std::make_shared(); - request->MutableHeaders()->SetRequestId(ctx->RequestId); - request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds()); - request->MutableHeaders()->SetClientId(ClientId); - request->SetCheckpointId(checkpointId); - request->SetStartIndex(blocksInfo.Range.Start); - request->SetBlocksCount(blocksInfo.Range.Size()); - request->BlockSize = BlockSize; - request->Sglist = guardedSgList.Create(sgListOrError.ExtractResult()); - - return Storage->ReadBlocksLocal(std::move(ctx), std::move(request)) - .Apply([=, buf = std::move(buffer)] (const auto& future) { - const auto& response = future.GetValue(); - if (HasError(response)) { - return response; - } - - auto guard = guardedSgList.Acquire(); - if (!guard) { - return static_cast( - TErrorResponse( - E_CANCELLED, - "failed to acquire sglist in DeviceHandler")); - } - - const auto& dstSgList = guard.Get(); - auto size = SgListGetSize(dstSgList); - TBlockDataRef srcBuf(buf.get() + blocksInfo.BeginOffset, size); - auto cpSize = SgListCopy({srcBuf}, dstSgList); - Y_ABORT_UNLESS(cpSize == size); - - return response; - }); -} - -TFuture TDeviceHandler::ExecuteUnalignedModifyRequest( - TModifyRequestPtr request) -{ - const auto& ctx = request->CallContext; - const auto& blocksInfo = request->BlocksInfo; - - auto bufferSize = blocksInfo.Range.Size() * BlockSize; - auto buffer = AllocateBuffer(bufferSize); - - auto sgListOrError = SgListNormalize( - TBlockDataRef(buffer.get(), bufferSize), - BlockSize); - - if (HasError(sgListOrError)) { - return MakeFuture(sgListOrError.GetError()); - } - - TGuardedSgList readSgList(sgListOrError.ExtractResult()); - - auto readRequest = std::make_shared(); - readRequest->MutableHeaders()->SetRequestId(ctx->RequestId); - readRequest->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds()); - readRequest->MutableHeaders()->SetClientId(ClientId); - readRequest->SetStartIndex(blocksInfo.Range.Start); - readRequest->SetBlocksCount(blocksInfo.Range.Size()); - readRequest->BlockSize = BlockSize; - readRequest->Sglist = readSgList; - - auto future = Storage->ReadBlocksLocal(ctx, std::move(readRequest)); - - return future.Apply([=, req = std::move(request), buf = std::move(buffer)] - (const auto& f) mutable - { - readSgList.Close(); - - const auto& readResponse = f.GetValue(); - if (HasError(readResponse)) { - return MakeFuture(readResponse.GetError()); - } - - return HandleRmwReadResponse( - Storage, - std::move(req), - std::move(buf), - bufferSize); - }); -} - -TFuture TDeviceHandler::HandleRmwReadResponse( - IStoragePtr storage, - TModifyRequestPtr request, - TStorageBuffer buffer, - size_t bufferSize) -{ - const auto& ctx = request->CallContext; - const auto& blocksInfo = request->BlocksInfo; - - { - auto guard = request->SgList.Acquire(); - if (!guard) { - return MakeFuture(TErrorResponse( - E_CANCELLED, - "failed to acquire sglist in DeviceHandler")); - } - - const auto& srcSgList = guard.Get(); - auto size = SgListGetSize(srcSgList); - TBlockDataRef dstBuf(buffer.get() + blocksInfo.BeginOffset, size); - auto cpSize = SgListCopy(srcSgList, {dstBuf}); - Y_ABORT_UNLESS(cpSize == size); - } - - auto sgListOrError = SgListNormalize( - TBlockDataRef(buffer.get(), bufferSize), - request->BlockSize); - - if (HasError(sgListOrError)) { - return MakeFuture(sgListOrError.GetError()); - } - - TGuardedSgList writeSgList(sgListOrError.ExtractResult()); - - auto writeRequest = std::make_shared(); - writeRequest->MutableHeaders()->SetRequestId(ctx->RequestId); - writeRequest->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds()); - writeRequest->MutableHeaders()->SetClientId(request->ClientId); - writeRequest->SetStartIndex(blocksInfo.Range.Start); - writeRequest->BlocksCount = blocksInfo.Range.Size(); - writeRequest->BlockSize = request->BlockSize; - writeRequest->Sglist = writeSgList; - - return storage->WriteBlocksLocal(ctx, std::move(writeRequest)) - .Apply([=, buf = std::move(buffer)] (const auto& future) mutable { - writeSgList.Close(); - - Y_UNUSED(buf); - return future.GetValue().GetError(); - }); -} - -template -TFuture TDeviceHandler::CreateResponseFuture( - const TFuture& future) -{ - return future.Apply([] (const auto& f) { - TResponse response; - response.MutableError()->CopyFrom(f.GetValue()); - return response; - }); -} - -template <> -TFuture TDeviceHandler::CreateResponseFuture( - const TFuture& future) -{ - return future; -} +constexpr ui32 MaxUnalignedRequestSize = 32_MB; //////////////////////////////////////////////////////////////////////////////// @@ -735,11 +33,12 @@ struct TDefaultDeviceHandlerFactory final maxBlockCount); } - return std::make_shared( + return std::make_shared( std::move(storage), std::move(clientId), blockSize, - maxBlockCount); + maxBlockCount, + MaxUnalignedRequestSize); } }; diff --git a/cloud/blockstore/libs/service/device_handler_ut.cpp b/cloud/blockstore/libs/service/device_handler_ut.cpp index 539d55285a..33b3b589cd 100644 --- a/cloud/blockstore/libs/service/device_handler_ut.cpp +++ b/cloud/blockstore/libs/service/device_handler_ut.cpp @@ -211,18 +211,24 @@ class TTestEnvironment const auto& response = future.GetValue(TDuration::Seconds(5)); UNIT_ASSERT(!HasError(response)); + TString read; + read.resize(expected.size(), 0); const char* ptr = buffer.data(); - for (const char& c: expected) { - for (size_t i = 0; i < SectorSize; ++i) { - + bool allOk = true; + for (size_t i = 0; i != expected.size(); ++i) { + char c = expected[i]; + read[i] = *ptr == 0 ? 'Z' : *ptr; + for (size_t j = 0; j < SectorSize; ++j) { if (c == 'Z') { - UNIT_ASSERT(*ptr == 0); + allOk = allOk && *ptr == 0; } else { - UNIT_ASSERT(*ptr == c); + allOk = allOk && *ptr == c; } ++ptr; } } + UNIT_ASSERT_VALUES_EQUAL(expected, read); + UNIT_ASSERT(allOk); } ui32 GetReadRequestCount() const { @@ -236,6 +242,13 @@ class TTestEnvironment ui32 GetZeroRequestCount() const { return ZeroRequestCount; } + + void ResetRequestCounters() + { + ReadRequestCount = 0; + WriteRequestCount = 0; + ZeroRequestCount = 0; + } }; } // namespace @@ -516,18 +529,61 @@ Y_UNIT_TEST_SUITE(TDeviceHandlerTest) } } - Y_UNIT_TEST(ShouldSliceHugeReadAndWriteRequestsInAlignedBackend) + void ShouldSliceHugeAlignedRequests(bool unalignedRequestsDisabled) { - TTestEnvironment env(16, DefaultBlockSize, 1, 4, true); + TTestEnvironment + env(24, DefaultBlockSize, 1, 4, unalignedRequestsDisabled); env.RunWriteService(); + env.ZeroSectors(0, 24); + UNIT_ASSERT_VALUES_EQUAL(6, env.GetZeroRequestCount()); env.WriteSectors(0, 8, 'a'); env.WriteSectors(8, 8, 'b'); UNIT_ASSERT_VALUES_EQUAL(4, env.GetWriteRequestCount()); - env.ReadSectorsAndCheck(0, 16, "aaaaaaaabbbbbbbb"); - UNIT_ASSERT_VALUES_EQUAL(4, env.GetReadRequestCount()); + env.ReadSectorsAndCheck(0, 24, "aaaaaaaabbbbbbbbZZZZZZZZ"); + UNIT_ASSERT_VALUES_EQUAL(6, env.GetReadRequestCount()); + } + + Y_UNIT_TEST(ShouldSliceHugeAlignedRequestsInAlignedBackend) + { + ShouldSliceHugeAlignedRequests(true); + } + + Y_UNIT_TEST(ShouldSliceHugeAlignedRequestsInUnalignedBackend) + { + ShouldSliceHugeAlignedRequests(false); + } + + Y_UNIT_TEST (ShouldSliceHugeUnalignedRequests) + { + TTestEnvironment + env(24, DefaultBlockSize, 2, 4, false); + + env.RunWriteService(); + + // An unaligned request requires the execution of a read-modify-write pattern. + env.ZeroSectors(1, 46); + UNIT_ASSERT_VALUES_EQUAL(6, env.GetReadRequestCount()); + UNIT_ASSERT_VALUES_EQUAL(6, env.GetWriteRequestCount()); + env.ResetRequestCounters(); + + env.WriteSectors(3, 8, 'a'); + UNIT_ASSERT_VALUES_EQUAL(2, env.GetReadRequestCount()); + UNIT_ASSERT_VALUES_EQUAL(2, env.GetWriteRequestCount()); + env.ResetRequestCounters(); + + env.WriteSectors(13, 3, 'b'); + UNIT_ASSERT_VALUES_EQUAL(1, env.GetReadRequestCount()); + UNIT_ASSERT_VALUES_EQUAL(1, env.GetWriteRequestCount()); + env.ResetRequestCounters(); + + env.ReadSectorsAndCheck( + 0, + 48, + "0ZZaaaaaaaaZZbbbZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ0"); + UNIT_ASSERT_VALUES_EQUAL(6, env.GetReadRequestCount()); } void DoShouldSliceHugeZeroRequest(bool requestUnaligned, bool unalignedRequestDisabled) diff --git a/cloud/blockstore/libs/service/unaligned_device_handler.cpp b/cloud/blockstore/libs/service/unaligned_device_handler.cpp new file mode 100644 index 0000000000..97255c454c --- /dev/null +++ b/cloud/blockstore/libs/service/unaligned_device_handler.cpp @@ -0,0 +1,695 @@ + +#include "unaligned_device_handler.h" + +#include + +namespace NCloud::NBlockStore { + +using namespace NThreading; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TErrorResponse CreateErrorAcquireResponse() +{ + return {E_CANCELLED, "failed to acquire sglist in DeviceHandler"}; +} + +TErrorResponse CreateRequestDestroyedResponse() +{ + return {E_CANCELLED, "request destroyed"}; +} + +TErrorResponse CreateBackendDestroyedResponse() +{ + return {E_CANCELLED, "backend destroyed"}; +} + +TErrorResponse CreateUnalignedTooBigResponse(ui32 blockCount) +{ + return { + E_ARGUMENT, + TStringBuilder() << "Unaligned request is too big. BlockCount=" + << blockCount}; +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +// The base class for wrapping over a write or zero request. +class TModifyRequest: public std::enable_shared_from_this +{ +private: + enum class EStatus + { + Created, + Postponed, + InFlight, + Completed, + }; + // The current status of the request execution can be read and written + // only under the RequestsLock. + EStatus Status = EStatus::Created; + + // A list of requests that prevent the execution. + TVector Dependencies; + + // A list iterator that points to an object in the list of running requests. + // Makes it easier to delete an object after the request is completed. + TModifyRequestIt It; + +protected: + const std::weak_ptr Backend; + const TBlocksInfo BlocksInfo; + TCallContextPtr CallContext; + + // The buffer that was allocated to execute the unaligned request. Memory is + // allocated only if the request is not aligned. + TStorageBuffer RMWBuffer; + TSgList RMWBufferSgList; + +public: + TModifyRequest( + std::weak_ptr backend, + TCallContextPtr callContext, + const TBlocksInfo& blocksInfo); + + virtual ~TModifyRequest() = default; + + void SetIt(TModifyRequestIt it); + TModifyRequestIt GetIt() const; + + bool IsAligned() const; + + // Adds a list of overlapped requests that this request depends on. The + // execution of this request will not start until all this requests have + // been completed. + void AddDependencies(const TList& requests); + + // Returns a flag indicating whether the request can be executed. The method + // modifies the internal state and gives permission to execute only once. It + // is necessary to call the method under RequestsLock. + [[nodiscard]] bool PrepareToRun(); + + // Starts the execution of the postponed request. The method can only be + // called if the request has been postponed. + void ExecutePostponed(); + + // It is necessary to call the method under RequestsLock. + void SetCompleted(); + +protected: + void AllocateRMWBuffer(TAlignedDeviceHandler& backend); + + virtual void DoPostpone() = 0; + virtual void DoExecutePostponed() = 0; +}; + +// Wrapper for a write request. +class TWriteRequest final: public TModifyRequest +{ +public: + using TResponsePromise = TPromise; + using TResponseFuture = TFuture; + +private: + TResponsePromise Promise; + TGuardedSgList SgList; + +public: + TWriteRequest( + std::weak_ptr backend, + TCallContextPtr callContext, + const TBlocksInfo& blocksInfo, + TGuardedSgList sgList); + + TResponseFuture ExecuteOrPostpone(bool readyToRun); + +protected: + void DoPostpone() override; + void DoExecutePostponed() override; + +private: + TResponseFuture DoExecute(); + TResponseFuture ReadModifyWrite(TAlignedDeviceHandler& backend); + TResponseFuture ModifyAndWrite(); +}; + +// Wrapper for a zero request. +class TZeroRequest final: public TModifyRequest +{ +public: + using TResponsePromise = TPromise; + using TResponseFuture = TFuture; + +private: + TResponsePromise Promise; + TGuardedSgList SgList; + +public: + TZeroRequest( + std::weak_ptr backend, + TCallContextPtr callContext, + const TBlocksInfo& blocksInfo); + ~TZeroRequest() override; + + TResponseFuture ExecuteOrPostpone(bool readyToRun); + +protected: + void DoPostpone() override; + void DoExecutePostponed() override; + +private: + TResponseFuture DoExecute(); + TResponseFuture ReadModifyWrite(TAlignedDeviceHandler& backend); + TResponseFuture ModifyAndWrite(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TModifyRequest::TModifyRequest( + std::weak_ptr backend, + TCallContextPtr callContext, + const TBlocksInfo& blocksInfo) + : Backend(std::move(backend)) + , BlocksInfo(blocksInfo) + , CallContext(std::move(callContext)) +{} + +void TModifyRequest::AddDependencies(const TList& requests) +{ + for (const auto& request: requests) { + if (BlocksInfo.Range.Overlaps(request->BlocksInfo.Range)) { + Dependencies.push_back(request); + } + } +} + +void TModifyRequest::SetIt(TModifyRequestIt it) +{ + It = it; +} + +TModifyRequestIt TModifyRequest::GetIt() const +{ + return It; +} + +bool TModifyRequest::IsAligned() const +{ + return BlocksInfo.IsAligned(); +} + +bool TModifyRequest::PrepareToRun() +{ + if (Status == EStatus::InFlight) { + // can't run a request that is already running + return false; + } + + bool allDependenciesCompleted = AllOf( + Dependencies, + [](const TModifyRequestWeakPtr& weakRequest) + { + if (auto request = weakRequest.lock()) { + return request->Status == EStatus::Completed; + } + return true; + }); + if (!allDependenciesCompleted) { + if (Status == EStatus::Created) { + // Postponing the execution of the newly created request. + Status = EStatus::Postponed; + CallContext->Postpone(GetCycleCount()); + DoPostpone(); + } + return false; + } + + // The request can be executed. We update status and give a sign to caller + // that the request needs to be run for execution. We don't run here because + // we want to minimize the code executed under RequestsLock. + Status = EStatus::InFlight; + return true; +} + +void TModifyRequest::ExecutePostponed() +{ + CallContext->Advance(GetCycleCount()); + DoExecutePostponed(); +} + +void TModifyRequest::SetCompleted() +{ + Status = EStatus::Completed; +} + +void TModifyRequest::AllocateRMWBuffer(TAlignedDeviceHandler& backend) +{ + auto bufferSize = BlocksInfo.MakeAligned().BufferSize(); + RMWBuffer = backend.AllocateBuffer(bufferSize); + + auto sgListOrError = SgListNormalize( + TBlockDataRef(RMWBuffer.get(), bufferSize), + BlocksInfo.BlockSize); + Y_DEBUG_ABORT_UNLESS(!HasError(sgListOrError)); + + RMWBufferSgList = sgListOrError.ExtractResult(); +} + +//////////////////////////////////////////////////////////////////////////////// + +TWriteRequest::TWriteRequest( + std::weak_ptr backend, + TCallContextPtr callContext, + const TBlocksInfo& blocksInfo, + TGuardedSgList sgList) + : TModifyRequest( + std::move(backend), + std::move(callContext), + blocksInfo) + , SgList(std::move(sgList)) +{} + +TWriteRequest::TResponseFuture TWriteRequest::ExecuteOrPostpone(bool readyToRun) +{ + return readyToRun ? DoExecute() : Promise.GetFuture(); +} + +void TWriteRequest::DoPostpone() +{ + Y_ABORT_UNLESS(!Promise.Initialized()); + Promise = NewPromise(); +} + +void TWriteRequest::DoExecutePostponed() +{ + Y_ABORT_UNLESS(Promise.Initialized()); + auto future = DoExecute(); + future.Subscribe([promise = Promise](const TResponseFuture& f) mutable + { promise.SetValue(f.GetValue()); }); +} + +TWriteRequest::TResponseFuture TWriteRequest::DoExecute() +{ + if (auto backend = Backend.lock()) { + return IsAligned() ? backend->ExecuteWriteRequest( + std::move(CallContext), + BlocksInfo, + std::move(SgList)) + : ReadModifyWrite(*backend); + } + + return MakeFuture( + CreateBackendDestroyedResponse()); +} + +TWriteRequest::TResponseFuture TWriteRequest::ReadModifyWrite( + TAlignedDeviceHandler& backend) +{ + AllocateRMWBuffer(backend); + + auto read = backend.ExecuteReadRequest( + CallContext, + BlocksInfo.MakeAligned(), + SgList.Create(RMWBufferSgList), + {}); + + return read.Apply( + [weakPtr = weak_from_this()]( + const TFuture& future) mutable + { + const auto& response = future.GetValue(); + if (HasError(response)) { + return MakeFuture( + TErrorResponse(response.GetError())); + } + + if (auto p = weakPtr.lock()) { + return static_cast(p.get())->ModifyAndWrite(); + } + + return MakeFuture( + CreateRequestDestroyedResponse()); + }); +} + +TWriteRequest::TResponseFuture TWriteRequest::ModifyAndWrite() +{ + auto backend = Backend.lock(); + if (!backend) { + return MakeFuture( + CreateBackendDestroyedResponse()); + } + + if (auto guard = SgList.Acquire()) { + const auto& srcSgList = guard.Get(); + auto size = SgListGetSize(srcSgList); + TBlockDataRef dstBuf(RMWBuffer.get() + BlocksInfo.BeginOffset, size); + auto cpSize = SgListCopy(srcSgList, {dstBuf}); + Y_ABORT_UNLESS(cpSize == size); + } else { + return MakeFuture( + CreateErrorAcquireResponse()); + } + + return backend->ExecuteWriteRequest( + CallContext, + BlocksInfo.MakeAligned(), + SgList.Create(RMWBufferSgList)); +} + +//////////////////////////////////////////////////////////////////////////////// + +TZeroRequest::TZeroRequest( + std::weak_ptr backend, + TCallContextPtr callContext, + const TBlocksInfo& blocksInfo) + : TModifyRequest( + std::move(backend), + std::move(callContext), + blocksInfo) +{} + +TZeroRequest::~TZeroRequest() +{ + SgList.Close(); +} + +TZeroRequest::TResponseFuture TZeroRequest::ExecuteOrPostpone(bool readyToRun) +{ + return readyToRun ? DoExecute() : Promise.GetFuture(); +} + +void TZeroRequest::DoPostpone() +{ + Y_ABORT_UNLESS(!Promise.Initialized()); + Promise = NewPromise(); +} + +void TZeroRequest::DoExecutePostponed() +{ + Y_ABORT_UNLESS(Promise.Initialized()); + auto future = DoExecute(); + future.Subscribe([promise = Promise](const TResponseFuture& f) mutable + { promise.SetValue(f.GetValue()); }); +} + +TZeroRequest::TResponseFuture TZeroRequest::DoExecute() +{ + if (auto backend = Backend.lock()) { + return IsAligned() ? backend->ExecuteZeroRequest( + std::move(CallContext), + BlocksInfo) + : ReadModifyWrite(*backend); + } + + return MakeFuture( + CreateBackendDestroyedResponse()); +} + +TZeroRequest::TResponseFuture TZeroRequest::ReadModifyWrite( + TAlignedDeviceHandler& backend) +{ + AllocateRMWBuffer(backend); + SgList.SetSgList(RMWBufferSgList); + + auto read = backend.ExecuteReadRequest( + CallContext, + BlocksInfo.MakeAligned(), + SgList, + {}); + + return read.Apply( + [weakPtr = weak_from_this()]( + const TFuture& future) mutable + { + const auto& response = future.GetValue(); + if (HasError(response)) { + return MakeFuture( + TErrorResponse(response.GetError())); + } + + if (auto p = weakPtr.lock()) { + return static_cast(p.get())->ModifyAndWrite(); + } + + return MakeFuture( + CreateRequestDestroyedResponse()); + }); +} + +TZeroRequest::TResponseFuture TZeroRequest::ModifyAndWrite() +{ + auto backend = Backend.lock(); + if (!backend) { + return MakeFuture( + CreateBackendDestroyedResponse()); + } + + std::memset( + RMWBuffer.get() + BlocksInfo.BeginOffset, + 0, + BlocksInfo.BufferSize()); + + auto result = + backend->ExecuteWriteRequest(CallContext, BlocksInfo.MakeAligned(), SgList); + return result.Apply( + [](const TFuture& future) + { + return MakeFuture( + TErrorResponse(future.GetValue().GetError())); + }); +} + +//////////////////////////////////////////////////////////////////////////////// + +TUnalignedDeviceHandler::TUnalignedDeviceHandler( + IStoragePtr storage, + TString clientId, + ui32 blockSize, + ui32 maxBlockCount, + ui32 maxUnalignedRequestSize) + : Backend(std::make_shared( + std::move(storage), + std::move(clientId), + blockSize, + maxBlockCount)) + , BlockSize(blockSize) + , MaxUnalignedBlockCount(maxUnalignedRequestSize / BlockSize) +{} + +TFuture TUnalignedDeviceHandler::Read( + TCallContextPtr ctx, + ui64 from, + ui64 length, + TGuardedSgList sgList, + const TString& checkpointId) +{ + auto blocksInfo = TBlocksInfo(from, length, BlockSize); + auto normalizeError = TryToNormalize(sgList, blocksInfo); + if (HasError(normalizeError)) { + return MakeFuture( + TErrorResponse(normalizeError)); + } + return blocksInfo.IsAligned() ? Backend->ExecuteReadRequest( + std::move(ctx), + blocksInfo, + std::move(sgList), + checkpointId) + : ExecuteUnalignedReadRequest( + std::move(ctx), + blocksInfo, + std::move(sgList), + checkpointId); +} + +TFuture TUnalignedDeviceHandler::Write( + TCallContextPtr ctx, + ui64 from, + ui64 length, + TGuardedSgList sgList) +{ + auto blocksInfo = TBlocksInfo(from, length, BlockSize); + auto normalizeError = TryToNormalize(sgList, blocksInfo); + if (HasError(normalizeError)) { + return MakeFuture( + TErrorResponse(normalizeError)); + } + + if (!blocksInfo.IsAligned() && + blocksInfo.Range.Size() > MaxUnalignedBlockCount) + { + return MakeFuture( + CreateUnalignedTooBigResponse(blocksInfo.Range.Size())); + } + + auto request = std::make_shared( + Backend, + ctx, + blocksInfo, + std::move(sgList)); + auto weakRequest = request->weak_from_this(); + auto* rawRequest = request.get(); + + const bool readyToRun = RegisterRequest(std::move(request)); + auto result = rawRequest->ExecuteOrPostpone(readyToRun); + return result.Apply( + [weakDeviceHandler = weak_from_this(), + weakRequest = std::move(weakRequest)]( + const TFuture& f) mutable + { + if (auto p = weakDeviceHandler.lock()) { + p->OnRequestFinished(std::move(weakRequest)); + } + return f.GetValue(); + }); +} + +TFuture +TUnalignedDeviceHandler::Zero(TCallContextPtr ctx, ui64 from, ui64 length) +{ + auto blocksInfo = TBlocksInfo(from, length, BlockSize); + auto request = std::make_shared(Backend, ctx, blocksInfo); + auto weakRequest = request->weak_from_this(); + auto* rawRequest = request.get(); + + const bool readyToRun = RegisterRequest(std::move(request)); + auto result = rawRequest->ExecuteOrPostpone(readyToRun); + return result.Apply( + [weakDeviceHandler = weak_from_this(), + weakRequest = std::move(weakRequest)]( + const TFuture& f) mutable + { + if (auto p = weakDeviceHandler.lock()) { + p->OnRequestFinished(std::move(weakRequest)); + } + return f.GetValue(); + }); +} + +TStorageBuffer TUnalignedDeviceHandler::AllocateBuffer(size_t bytesCount) +{ + return Backend->AllocateBuffer(bytesCount); +} + +bool TUnalignedDeviceHandler::RegisterRequest(TModifyRequestPtr request) +{ + with_lock (RequestsLock) { + request->AddDependencies(UnalignedRequests); + if (request->IsAligned()) { + AlignedRequests.push_front(request); + request->SetIt(AlignedRequests.begin()); + } else { + request->AddDependencies(AlignedRequests); + UnalignedRequests.push_front(request); + request->SetIt(UnalignedRequests.begin()); + } + return request->PrepareToRun(); + } +} + +TFuture +TUnalignedDeviceHandler::ExecuteUnalignedReadRequest( + TCallContextPtr ctx, + TBlocksInfo blocksInfo, + TGuardedSgList sgList, + TString checkpointId) const +{ + if (blocksInfo.Range.Size() > MaxUnalignedBlockCount) { + return MakeFuture( + CreateUnalignedTooBigResponse(blocksInfo.Range.Size())); + } + + auto bufferSize = blocksInfo.MakeAligned().BufferSize(); + auto buffer = Backend->AllocateBuffer(bufferSize); + + auto sgListOrError = + SgListNormalize(TBlockDataRef(buffer.get(), bufferSize), BlockSize); + + if (HasError(sgListOrError)) { + return MakeFuture( + TErrorResponse(sgListOrError.GetError())); + } + + auto alignedRequest = Backend->ExecuteReadRequest( + std::move(ctx), + blocksInfo.MakeAligned(), + sgList.Create(sgListOrError.ExtractResult()), + std::move(checkpointId)); + + return alignedRequest.Apply( + [sgList = std::move(sgList), + buffer = std::move(buffer), + beginOffset = blocksInfo.BeginOffset]( + const TFuture& future) + { + const auto& response = future.GetValue(); + if (HasError(response)) { + return response; + } + + if (auto guard = sgList.Acquire()) { + const auto& dstSgList = guard.Get(); + auto size = SgListGetSize(dstSgList); + TBlockDataRef srcBuf(buffer.get() + beginOffset, size); + auto cpSize = SgListCopy({srcBuf}, dstSgList); + Y_ABORT_UNLESS(cpSize == size); + return response; + } + + return static_cast( + CreateErrorAcquireResponse()); + }); +} + +void TUnalignedDeviceHandler::OnRequestFinished( + TModifyRequestWeakPtr weakRequest) +{ + // When the request is complete, it must be unregistered. We + // also need to find and run postponed requests that are waiting for + // a completetion this one. + + auto request = weakRequest.lock(); + if (!request) { + return; + } + + TVector readyToRunPostponedRequests; + + auto collectReadyToRunRequests = + [&](const TList& requests) + { + for (const auto& request: requests) { + if (request->PrepareToRun()) { + readyToRunPostponedRequests.push_back(request.get()); + } + } + }; + + with_lock (RequestsLock) { + request->SetCompleted(); + const bool isAligned = request->IsAligned(); + if (isAligned) { + AlignedRequests.erase(request->GetIt()); + } else { + UnalignedRequests.erase(request->GetIt()); + } + // Here we reset the last reference to the request. + request.reset(); + Y_DEBUG_ABORT_UNLESS(weakRequest.expired()); + + collectReadyToRunRequests(UnalignedRequests); + if (!isAligned) { + collectReadyToRunRequests(AlignedRequests); + } + } + + for (auto* readyToRunRequest: readyToRunPostponedRequests) { + readyToRunRequest->ExecutePostponed(); + } +} + +} // namespace NCloud::NBlockStore diff --git a/cloud/blockstore/libs/service/unaligned_device_handler.h b/cloud/blockstore/libs/service/unaligned_device_handler.h new file mode 100644 index 0000000000..cc32d7e3e6 --- /dev/null +++ b/cloud/blockstore/libs/service/unaligned_device_handler.h @@ -0,0 +1,87 @@ +#pragma once + +#include "aligned_device_handler.h" + +#include + +#include +#include + +namespace NCloud::NBlockStore { + +//////////////////////////////////////////////////////////////////////////////// + +class TModifyRequest; +using TModifyRequestPtr = std::shared_ptr; +using TModifyRequestWeakPtr = std::weak_ptr; +using TModifyRequestIt = TList::iterator; + +// The TUnalignedDeviceHandler can handle both aligned and unaligned requests. +// If a request is unaligned, the read-modify-write sequence is used. +// TUnalignedDeviceHandler monitors the execution of all requests. If an +// unaligned request needs to be executed, it waits for all requests it +// intersects to be completed before run. While an unaligned request is being +// processed, it prevents other requests from being processed that overlap with +// it. +// The TAlignedDeviceHandler is used to process requests. Only aligned requests +// are sent to this handler. +class TUnalignedDeviceHandler final + : public IDeviceHandler + , public std::enable_shared_from_this +{ +private: + const std::shared_ptr Backend; + const ui32 BlockSize; + const ui32 MaxUnalignedBlockCount; + + // Requests that are currently in flight. These fields are only accessible + // under RequestsLock. + TList AlignedRequests; + TList UnalignedRequests; + TAdaptiveLock RequestsLock; + +public: + TUnalignedDeviceHandler( + IStoragePtr storage, + TString clientId, + ui32 blockSize, + ui32 maxBlockCount, + ui32 maxUnalignedRequestSize); + + NThreading::TFuture Read( + TCallContextPtr ctx, + ui64 from, + ui64 length, + TGuardedSgList sgList, + const TString& checkpointId) override; + + NThreading::TFuture Write( + TCallContextPtr ctx, + ui64 from, + ui64 length, + TGuardedSgList sgList) override; + + NThreading::TFuture + Zero(TCallContextPtr ctx, ui64 from, ui64 length) override; + + TStorageBuffer AllocateBuffer(size_t bytesCount) override; + +private: + // Registers the request in the in-flight lists. + // If the request needs to be processed immediately, a true value will be + // returned. This flag should not be disregarded. + [[nodiscard]] bool RegisterRequest(TModifyRequestPtr request); + + NThreading::TFuture + ExecuteUnalignedReadRequest( + TCallContextPtr ctx, + TBlocksInfo blocksInfo, + TGuardedSgList sgList, + TString checkpointId) const; + + void OnRequestFinished(TModifyRequestWeakPtr weakRequest); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCloud::NBlockStore diff --git a/cloud/blockstore/libs/service/ya.make b/cloud/blockstore/libs/service/ya.make index 7c54ea1138..9d2a9e246e 100644 --- a/cloud/blockstore/libs/service/ya.make +++ b/cloud/blockstore/libs/service/ya.make @@ -17,6 +17,7 @@ SRCS( storage_provider.cpp storage_test.cpp storage.cpp + unaligned_device_handler.cpp ) PEERDIR(