Skip to content

Commit

Permalink
Handle huge requests in unaligned device handler (#2035)
Browse files Browse the repository at this point in the history
* Handle huge requests in unaligned device handler

* Fix leak

* Comments and simplifications

* Fix comments and small fixes

* Added state Completed

* Add check for all reference gone
  • Loading branch information
drbasic committed Sep 19, 2024
1 parent 06126c3 commit 9ed8d4f
Show file tree
Hide file tree
Showing 8 changed files with 976 additions and 790 deletions.
1 change: 1 addition & 0 deletions cloud/blockstore/libs/service/CMakeLists.linux-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ target_sources(blockstore-libs-service PRIVATE
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/service/storage.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/service/storage_provider.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/service/storage_test.cpp
${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/service/unaligned_device_handler.cpp
)
150 changes: 85 additions & 65 deletions cloud/blockstore/libs/service/aligned_device_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ using namespace NThreading;

namespace {

TErrorResponse CreateErrorAcquireResponse()
{
return {E_CANCELLED, "failed to acquire sglist in DeviceHandler"};
}

TErrorResponse CreateRequestNotAlignedResponse()
{
return {E_ARGUMENT, "Request is not aligned"};
}

// Removes the first blockCount elements from the sgList. Returns these removed
// items in TGuardedSgList.
TGuardedSgList TakeHeadBlocks(TGuardedSgList& sgList, ui32 blockCount)
Expand All @@ -29,67 +39,56 @@ TGuardedSgList TakeHeadBlocks(TGuardedSgList& sgList, ui32 blockCount)

} // namespace

TResultOrError<bool> TryToNormalize(
NProto::TError TryToNormalize(
TGuardedSgList& guardedSgList,
const TBlocksInfo& blocksInfo,
ui64 length,
ui32 blockSize)
TBlocksInfo& blocksInfo)
{
const auto length = blocksInfo.BufferSize();
if (length == 0) {
return MakeError(E_ARGUMENT, "Local request has zero length");
}

auto guard = guardedSgList.Acquire();
if (!guard) {
return MakeError(
E_CANCELLED,
"failed to acquire sglist in DeviceHandler");
return CreateErrorAcquireResponse();
}

auto bufferSize = SgListGetSize(guard.Get());
if (bufferSize != length) {
return MakeError(
E_ARGUMENT,
TStringBuilder()
<< "Invalid local request:" << " buffer size " << bufferSize
<< "Invalid local request: buffer size " << bufferSize
<< " not equal to length " << length);
}

if (blocksInfo.BeginOffset != 0 || blocksInfo.EndOffset != 0) {
return false;
if (!blocksInfo.IsAligned()) {
return MakeError(S_OK);
}

for (const auto& buffer: guard.Get()) {
if (buffer.Size() % blockSize != 0) {
return false;
}
bool allBuffersAligned = AllOf(
guard.Get(),
[blockSize = blocksInfo.BlockSize](const auto& buffer)
{ return buffer.Size() % blockSize == 0; });

if (!allBuffersAligned) {
blocksInfo.SgListAligned = false;
return MakeError(S_OK);
}

auto sgListOrError = SgListNormalize(guard.Get(), blockSize);
auto sgListOrError = SgListNormalize(guard.Get(), blocksInfo.BlockSize);
if (HasError(sgListOrError)) {
return sgListOrError.GetError();
}

guardedSgList.SetSgList(sgListOrError.ExtractResult());
return true;
}

////////////////////////////////////////////////////////////////////////////////

TStorageBuffer AllocateStorageBuffer(IStorage& storage, size_t bytesCount)
{
auto buffer = storage.AllocateBuffer(bytesCount);
if (!buffer) {
buffer = std::shared_ptr<char>(
new char[bytesCount],
std::default_delete<char[]>());
}
return buffer;
return MakeError(S_OK);
}

////////////////////////////////////////////////////////////////////////////////

TBlocksInfo::TBlocksInfo(ui64 from, ui64 length, ui32 blockSize)
: BlockSize(blockSize)
{
ui64 startIndex = from / blockSize;
ui64 beginOffset = from - startIndex * blockSize;
Expand All @@ -108,6 +107,25 @@ TBlocksInfo::TBlocksInfo(ui64 from, ui64 length, ui32 blockSize)
EndOffset = endOffset;
}

size_t TBlocksInfo::BufferSize() const
{
return Range.Size() * BlockSize - BeginOffset - EndOffset;
}

bool TBlocksInfo::IsAligned() const
{
return SgListAligned && BeginOffset == 0 && EndOffset == 0;
}

TBlocksInfo TBlocksInfo::MakeAligned() const
{
TBlocksInfo result(*this);
result.BeginOffset = 0;
result.EndOffset = 0;
result.SgListAligned = true;
return result;
}

////////////////////////////////////////////////////////////////////////////////

TAlignedDeviceHandler::TAlignedDeviceHandler(
Expand All @@ -131,16 +149,15 @@ TFuture<NProto::TReadBlocksLocalResponse> TAlignedDeviceHandler::Read(
const TString& checkpointId)
{
auto blocksInfo = TBlocksInfo(from, length, BlockSize);

auto aligned = TryToNormalize(sgList, blocksInfo, length, BlockSize);
if (HasError(aligned)) {
auto normalizeError = TryToNormalize(sgList, blocksInfo);
if (HasError(normalizeError)) {
return MakeFuture<NProto::TReadBlocksLocalResponse>(
TErrorResponse(aligned.GetError()));
TErrorResponse(normalizeError));
}

if (!aligned.GetResult()) {
if (!blocksInfo.IsAligned()) {
return MakeFuture<NProto::TReadBlocksLocalResponse>(
TErrorResponse(E_ARGUMENT, "Request is not aligned"));
CreateRequestNotAlignedResponse());
}

return ExecuteReadRequest(
Expand All @@ -158,15 +175,15 @@ TFuture<NProto::TWriteBlocksLocalResponse> TAlignedDeviceHandler::Write(
{
auto blocksInfo = TBlocksInfo(from, length, BlockSize);

auto aligned = TryToNormalize(sgList, blocksInfo, length, BlockSize);
if (HasError(aligned)) {
auto normalizeError = TryToNormalize(sgList, blocksInfo);
if (HasError(normalizeError)) {
return MakeFuture<NProto::TWriteBlocksLocalResponse>(
TErrorResponse(aligned.GetError()));
TErrorResponse(normalizeError));
}

if (!aligned.GetResult()) {
if (!blocksInfo.IsAligned()) {
return MakeFuture<NProto::TWriteBlocksLocalResponse>(
TErrorResponse(E_ARGUMENT, "Request is not aligned"));
CreateRequestNotAlignedResponse());
}

return ExecuteWriteRequest(std::move(ctx), blocksInfo, std::move(sgList));
Expand All @@ -181,20 +198,23 @@ TAlignedDeviceHandler::Zero(TCallContextPtr ctx, ui64 from, ui64 length)
}

auto blocksInfo = TBlocksInfo(from, length, BlockSize);
if (blocksInfo.BeginOffset != 0 || blocksInfo.EndOffset != 0) {
if (!blocksInfo.IsAligned()) {
return MakeFuture<NProto::TZeroBlocksResponse>(
TErrorResponse(E_ARGUMENT, "Request is not aligned"));
CreateRequestNotAlignedResponse());
}

return ExecuteZeroRequest(
std::move(ctx),
blocksInfo.Range.Start,
blocksInfo.Range.Size());
return ExecuteZeroRequest(std::move(ctx), blocksInfo);
}

TStorageBuffer TAlignedDeviceHandler::AllocateBuffer(size_t bytesCount)
{
return AllocateStorageBuffer(*Storage, bytesCount);
auto buffer = Storage->AllocateBuffer(bytesCount);
if (!buffer) {
buffer = std::shared_ptr<char>(
new char[bytesCount],
std::default_delete<char[]>());
}
return buffer;
}

TFuture<NProto::TReadBlocksLocalResponse>
Expand All @@ -204,6 +224,8 @@ TAlignedDeviceHandler::ExecuteReadRequest(
TGuardedSgList sgList,
TString checkpointId) const
{
Y_DEBUG_ABORT_UNLESS(blocksInfo.IsAligned());

auto requestBlockCount =
std::min<ui32>(blocksInfo.Range.Size(), MaxBlockCount);

Expand All @@ -226,9 +248,8 @@ TAlignedDeviceHandler::ExecuteReadRequest(
// sub-request and leave the rest in original sgList.
request->Sglist = TakeHeadBlocks(sgList, requestBlockCount);
if (request->Sglist.Empty()) {
return MakeFuture<NProto::TReadBlocksResponse>(TErrorResponse(
E_CANCELLED,
"failed to acquire sglist in DeviceHandler"));
return MakeFuture<NProto::TReadBlocksResponse>(
CreateErrorAcquireResponse());
}

auto result = Storage->ReadBlocksLocal(ctx, std::move(request));
Expand Down Expand Up @@ -268,6 +289,8 @@ TAlignedDeviceHandler::ExecuteWriteRequest(
TBlocksInfo blocksInfo,
TGuardedSgList sgList) const
{
Y_DEBUG_ABORT_UNLESS(blocksInfo.IsAligned());

auto requestBlockCount =
std::min<ui32>(blocksInfo.Range.Size(), MaxBlockCount);

Expand All @@ -289,9 +312,8 @@ TAlignedDeviceHandler::ExecuteWriteRequest(
// sub-request and leave the rest in original sgList.
request->Sglist = TakeHeadBlocks(sgList, requestBlockCount);
if (request->Sglist.Empty()) {
return MakeFuture<NProto::TWriteBlocksResponse>(TErrorResponse(
E_CANCELLED,
"failed to acquire sglist in DeviceHandler"));
return MakeFuture<NProto::TWriteBlocksResponse>(
CreateErrorAcquireResponse());
}

auto result = Storage->WriteBlocksLocal(ctx, std::move(request));
Expand Down Expand Up @@ -325,31 +347,32 @@ TAlignedDeviceHandler::ExecuteWriteRequest(

TFuture<NProto::TZeroBlocksResponse> TAlignedDeviceHandler::ExecuteZeroRequest(
TCallContextPtr ctx,
ui64 startIndex,
ui32 blockCount) const
TBlocksInfo blocksInfo) const
{
auto requestBlockCount = std::min(blockCount, MaxBlockCount);
Y_DEBUG_ABORT_UNLESS(blocksInfo.IsAligned());

auto requestBlockCount = std::min<ui32>(blocksInfo.Range.Size(), MaxBlockCount);

auto request = std::make_shared<NProto::TZeroBlocksRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetStartIndex(startIndex);
request->SetStartIndex(blocksInfo.Range.Start);
request->SetBlocksCount(requestBlockCount);

if (requestBlockCount == blockCount) {
if (requestBlockCount == blocksInfo.Range.Size()) {
// The request size is quite small. We do all work at once.
return Storage->ZeroBlocks(std::move(ctx), std::move(request));
}

auto result = Storage->ZeroBlocks(ctx, std::move(request));

blocksInfo.Range.Start += requestBlockCount;

return result.Apply(
[ctx = std::move(ctx),
weakPtr = weak_from_this(),
startIndex = startIndex + requestBlockCount,
blocksCount =
blockCount - requestBlockCount](const auto& future) mutable
blocksInfo = blocksInfo](const auto& future) mutable
{
// Only part of the request was completed. Continue doing the
// rest of the work
Expand All @@ -360,10 +383,7 @@ TFuture<NProto::TZeroBlocksResponse> TAlignedDeviceHandler::ExecuteZeroRequest(
}

if (auto self = weakPtr.lock()) {
return self->ExecuteZeroRequest(
std::move(ctx),
startIndex,
blocksCount);
return self->ExecuteZeroRequest(std::move(ctx), blocksInfo);
}
return MakeFuture<NProto::TZeroBlocksResponse>(
TErrorResponse(E_CANCELLED));
Expand Down
47 changes: 37 additions & 10 deletions cloud/blockstore/libs/service/aligned_device_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,35 @@ struct TBlocksInfo
TBlocksInfo(ui64 from, ui64 length, ui32 blockSize);
TBlocksInfo(const TBlocksInfo&) = default;

[[nodiscard]] size_t BufferSize() const;

// The data may be misaligned for two reasons: if the start or end of the
// block do not correspond to the block boundaries, or if the client buffers
// are not a multiple of the block size.
[[nodiscard]] bool IsAligned() const;

// Creates an aligned TBlocksInfo.
[[nodiscard]] TBlocksInfo MakeAligned() const;

TBlockRange64 Range;
// Offset relative to the beginning of the range.
ui64 BeginOffset = 0;
// Offset relative to the ending of the range.
ui64 EndOffset = 0;
const ui32 BlockSize = 0;
// The request also unaligned if the sglist buffer sizes are not multiples
// of the block size
bool SgListAligned = true;
};

////////////////////////////////////////////////////////////////////////////////

class TAlignedDeviceHandler
// The TAlignedDeviceHandler can only process requests that are aligned. If the
// size of a request exceeds the maximum size that the underlying layer can
// handle, the TAlignedDeviceHandler will break the request into smaller parts
// and execute them separately. If a request contains unaligned data, the
// E_ARGUMENT error is returned.
class TAlignedDeviceHandler final
: public IDeviceHandler
, public std::enable_shared_from_this<TAlignedDeviceHandler>
{
Expand All @@ -38,6 +59,7 @@ class TAlignedDeviceHandler
ui32 blockSize,
ui32 maxBlockCount);

// implements IDeviceHandler
NThreading::TFuture<NProto::TReadBlocksLocalResponse> Read(
TCallContextPtr ctx,
ui64 from,
Expand All @@ -56,32 +78,37 @@ class TAlignedDeviceHandler

TStorageBuffer AllocateBuffer(size_t bytesCount) override;

private:
// Performs a read. It can only be called for aligned data.
NThreading::TFuture<NProto::TReadBlocksResponse> ExecuteReadRequest(
TCallContextPtr ctx,
TBlocksInfo blocksInfo,
TGuardedSgList sgList,
TString checkpointId) const;

// Performs a write. It can only be called for aligned data.
NThreading::TFuture<NProto::TWriteBlocksResponse> ExecuteWriteRequest(
TCallContextPtr ctx,
TBlocksInfo blocksInfo,
TGuardedSgList sgList) const;

// Performs a zeroes. It can only be called for aligned data.
NThreading::TFuture<NProto::TZeroBlocksResponse> ExecuteZeroRequest(
TCallContextPtr ctx,
ui64 startIndex,
ui32 blockCount) const;
TBlocksInfo blocksInfo) const;
};

////////////////////////////////////////////////////////////////////////////////

TStorageBuffer AllocateStorageBuffer(IStorage& storage, size_t bytesCount);

TResultOrError<bool> TryToNormalize(
// Normalizes the SgList in guardedSgList. If the total size of the buffers does
// not match the request size, an error is returned. If it is not possible to
// normalize the number of buffers so that they correspond to the number of
// requested blocks and the size of each buffer is equal to the specified block
// size, the SgListAligned flag is set in the blocksInfo structure, but no error
// is returned. This indicates that the request is valid, but not aligned.
NProto::TError TryToNormalize(
TGuardedSgList& guardedSgList,
const TBlocksInfo& blocksInfo,
ui64 length,
ui32 blockSize);
TBlocksInfo& blocksInfo);

////////////////////////////////////////////////////////////////////////////////

} // namespace NCloud::NBlockStore
Loading

0 comments on commit 9ed8d4f

Please sign in to comment.