Skip to content

Commit

Permalink
set items count limit for TTrimBytes transaction (#1647)
Browse files Browse the repository at this point in the history
* set items count limit for TTrimBytes transaction

* update

* update

* update

* update

* update

* removed blank lines

* add STORAGE_VERIFY to FinishCleanup

* update

* add log tag to tfreshbytes

* more verify macroses

* add newline

---------

Co-authored-by: yegorskii <[email protected]>
  • Loading branch information
yegorskii and yegorskii committed Aug 5, 2024
1 parent 50a84ab commit d967cf1
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 52 deletions.
3 changes: 3 additions & 0 deletions cloud/filestore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ message TStorageConfig
// appropriate.
optional bool GetNodeAttrBatchEnabled = 358;

// Max number of items to delete during TrimBytes.
optional uint64 TrimBytesItemCount = 359;

// auth token for node registration via ydb discovery api.
optional string NodeRegistrationToken = 360;

Expand Down
1 change: 1 addition & 0 deletions cloud/filestore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ namespace {
xxx(MultiTabletForwardingEnabled, bool, false )\
xxx(GetNodeAttrBatchEnabled, bool, false )\
xxx(AllowFileStoreForceDestroy, bool, false )\
xxx(TrimBytesItemCount, ui64, 100'000 )\
xxx(NodeRegistrationRootCertsFile, TString, {} )\
xxx(NodeRegistrationCert, TCertificate, {} )\
xxx(NodeRegistrationToken, TString, "root@builtin")\
Expand Down
2 changes: 2 additions & 0 deletions cloud/filestore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ class TStorageConfig

bool GetAllowFileStoreForceDestroy() const;

ui64 GetTrimBytesItemCount() const;

void Dump(IOutputStream& out) const;
void DumpHtml(IOutputStream& out) const;
void DumpOverridesHtml(IOutputStream& out) const;
Expand Down
60 changes: 47 additions & 13 deletions cloud/filestore/libs/storage/tablet/model/fresh_bytes.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "fresh_bytes.h"
#include "verify.h"

#include <cloud/storage/core/libs/tablet/model/commit.h>

Expand Down Expand Up @@ -44,14 +45,14 @@ void TFreshBytes::DeleteBytes(

if (lo->second.Offset < offset) {
// cutting lo from the right side
Y_DEBUG_ABORT_UNLESS(lo->second.CommitId < commitId);
TABLET_VERIFY_DEBUG(lo->second.CommitId < commitId);
auto& loRef = c.Refs[{nodeId, offset}];
loRef = lo->second;
loRef.Buf = loRef.Buf.substr(0, offset - loRef.Offset);

if (lo->first.End <= end) {
// blockRange is not contained strictly inside lo
Y_DEBUG_ABORT_UNLESS(lo != hi);
TABLET_VERIFY_DEBUG(lo != hi);
c.Refs.erase(lo++);
}
}
Expand All @@ -62,7 +63,7 @@ void TFreshBytes::DeleteBytes(
{
// cutting hi from the left side
// hi might be equal to lo - it's not a problem
Y_DEBUG_ABORT_UNLESS(hi->second.CommitId < commitId);
TABLET_VERIFY_DEBUG(hi->second.CommitId < commitId);
const auto shift = end - hi->second.Offset;
hi->second.Buf = hi->second.Buf.substr(
shift,
Expand All @@ -88,7 +89,7 @@ void TFreshBytes::AddBytes(
if (c.FirstCommitId == InvalidCommitId) {
c.FirstCommitId = commitId;
} else {
Y_ABORT_UNLESS(commitId >= c.FirstCommitId);
TABLET_VERIFY(commitId >= c.FirstCommitId);
}

TByteVector buffer(Reserve(data.size()), Allocator);
Expand Down Expand Up @@ -116,7 +117,7 @@ void TFreshBytes::AddDeletionMarker(
{
auto& c = Chunks.back();
if (c.FirstCommitId != InvalidCommitId) {
Y_ABORT_UNLESS(commitId >= c.FirstCommitId);
TABLET_VERIFY(commitId >= c.FirstCommitId);
}

c.TotalDeletedBytes += len;
Expand All @@ -132,15 +133,15 @@ void TFreshBytes::AddDeletionMarker(

void TFreshBytes::Barrier(ui64 commitId)
{
Y_ABORT_UNLESS(!Chunks.empty());
TABLET_VERIFY(!Chunks.empty());
auto& c = Chunks.back();

if (!c.Data.empty() || !c.DeletionMarkers.empty()) {
if (!c.Data.empty()) {
Y_ABORT_UNLESS(c.Data.back().Descriptor.MinCommitId <= commitId);
TABLET_VERIFY(c.Data.back().Descriptor.MinCommitId <= commitId);
}
if (!c.DeletionMarkers.empty()) {
Y_ABORT_UNLESS(c.DeletionMarkers.back().MinCommitId <= commitId);
TABLET_VERIFY(c.DeletionMarkers.back().MinCommitId <= commitId);
}
Chunks.back().ClosingCommitId = commitId;
Chunks.emplace_back(Allocator);
Expand Down Expand Up @@ -173,23 +174,56 @@ TFlushBytesCleanupInfo TFreshBytes::StartCleanup(
return {Chunks.front().Id, Chunks.front().ClosingCommitId};
}

void TFreshBytes::VisitTop(const TChunkVisitor& visitor)
void TFreshBytes::VisitTop(
ui64 itemLimit,
const TChunkVisitor& visitor)
{
ui64 cnt = 0;
for (const auto& e: Chunks.front().Data) {
if (cnt++ == itemLimit) {
return;
}
visitor(e.Descriptor, false);
}

for (const auto& descriptor: Chunks.front().DeletionMarkers) {
if (cnt++ == itemLimit) {
return;
}
visitor(descriptor, true);
}
}

void TFreshBytes::FinishCleanup(ui64 chunkId)
bool TFreshBytes::FinishCleanup(
ui64 chunkId,
ui64 dataItemCount,
ui64 deletionMarkerCount)
{
Y_ABORT_UNLESS(Chunks.size() > 1);
Y_ABORT_UNLESS(Chunks.front().Id == chunkId);
TABLET_VERIFY(Chunks.size() > 1);
TABLET_VERIFY(Chunks.front().Id == chunkId);

auto& chunk = Chunks.front();

const auto dataSize = chunk.Data.size();
const auto deletionSize = chunk.DeletionMarkers.size();
if (dataItemCount == dataSize && deletionMarkerCount == deletionSize) {
Chunks.pop_front();
return true;
}

const auto check =
dataItemCount <= dataSize && deletionMarkerCount <= deletionSize;
TABLET_VERIFY(check);

chunk.Data.erase(
chunk.Data.begin(),
std::next(chunk.Data.begin(), dataItemCount));

chunk.DeletionMarkers.erase(
chunk.DeletionMarkers.begin(),
std::next(chunk.DeletionMarkers.begin(), deletionMarkerCount));

Chunks.pop_front();
return false;
}

void TFreshBytes::FindBytes(
Expand Down
13 changes: 11 additions & 2 deletions cloud/filestore/libs/storage/tablet/model/fresh_bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class TFreshBytes
IAllocator* Allocator;
TDeque<TChunk, TStlAllocator> Chunks;
ui64 LastChunkId = 0;
TString LogTag;

public:
TFreshBytes(IAllocator* allocator);
Expand All @@ -94,6 +95,11 @@ class TFreshBytes
return std::make_pair(bytes, deletedBytes);
}

void UpdateLogTag(TString logTag)
{
LogTag = std::move(logTag);
}

void AddBytes(ui64 nodeId, ui64 offset, TStringBuf data, ui64 commitId);
void AddDeletionMarker(ui64 nodeId, ui64 offset, ui64 len, ui64 commitId);

Expand All @@ -103,8 +109,11 @@ class TFreshBytes
ui64 commitId,
TVector<TBytes>* entries,
TVector<TBytes>* deletionMarkers);
void VisitTop(const TChunkVisitor& visitor);
void FinishCleanup(ui64 chunkId);
void VisitTop(ui64 itemLimit, const TChunkVisitor& visitor);
bool FinishCleanup(
ui64 chunkId,
ui64 dataItemCount,
ui64 deletionMarkerCount);

void FindBytes(
IFreshBytesVisitor& visitor,
Expand Down
152 changes: 144 additions & 8 deletions cloud/filestore/libs/storage/tablet/model/fresh_bytes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,27 @@ Y_UNIT_TEST_SUITE(TFreshBytesTest)
auto visitTop = [&] () {
visitedBytes.clear();
visitedDeletionMarkers.clear();
freshBytes.VisitTop([&] (const TBytes& bytes, bool isDel) {
if (isDel) {
visitedDeletionMarkers.push_back(bytes);
} else {
visitedBytes.push_back(bytes);
constexpr ui64 itemLimit = 100;
freshBytes.VisitTop(
itemLimit,
[&] (const TBytes& bytes, bool isDel) {
if (isDel) {
visitedDeletionMarkers.push_back(bytes);
} else {
visitedBytes.push_back(bytes);
}
}
});
);
};

visitTop();
COMPARE_BYTES(bytes, visitedBytes);
COMPARE_BYTES(deletionMarkers, visitedDeletionMarkers);

freshBytes.FinishCleanup(info.ChunkId);
UNIT_ASSERT(freshBytes.FinishCleanup(
info.ChunkId,
visitedBytes.size(),
visitedDeletionMarkers.size()));

{
TFreshBytesVisitor visitor;
Expand Down Expand Up @@ -172,7 +179,10 @@ Y_UNIT_TEST_SUITE(TFreshBytesTest)
COMPARE_BYTES(bytes, visitedBytes);
COMPARE_BYTES(deletionMarkers, visitedDeletionMarkers);

freshBytes.FinishCleanup(info.ChunkId);
UNIT_ASSERT(freshBytes.FinishCleanup(
info.ChunkId,
visitedBytes.size(),
visitedDeletionMarkers.size()));
}

Y_UNIT_TEST(ShouldInsertIntervalInTheMiddleOfAnotherInterval)
Expand Down Expand Up @@ -226,6 +236,132 @@ Y_UNIT_TEST_SUITE(TFreshBytesTest)
}
}

Y_UNIT_TEST(ShouldObeyLimitProvidedForVisitTop)
{
TFreshBytes freshBytes(TDefaultAllocator::Instance());

freshBytes.AddBytes(1, 100, "aAa", 10);
freshBytes.AddBytes(1, 101, "bBbB", 11);
freshBytes.AddBytes(1, 50, "cCc", 12);
freshBytes.AddBytes(1, 50, "dDd", 13);
freshBytes.AddBytes(2, 100, "eEeEe", 14);
freshBytes.AddBytes(2, 1000, "fFf", 15);
freshBytes.AddDeletionMarker(2, 100, 3, 16);

TVector<TBytes> bytes;
TVector<TBytes> deletionMarkers;
auto info = freshBytes.StartCleanup(17, &bytes, &deletionMarkers);

COMPARE_BYTES(
TVector<TBytes>({
{1, 100, 3, 10, InvalidCommitId},
{1, 101, 4, 11, InvalidCommitId},
{1, 50, 3, 12, InvalidCommitId},
{1, 50, 3, 13, InvalidCommitId},
{2, 100, 5, 14, InvalidCommitId},
{2, 1000, 3, 15, InvalidCommitId},
}), bytes);
COMPARE_BYTES(
TVector<TBytes>({
{2, 100, 3, 16, InvalidCommitId},
}), deletionMarkers);
UNIT_ASSERT_VALUES_EQUAL(17, info.ClosingCommitId);

TVector<TBytes> visitedBytes;
TVector<TBytes> visitedDeletionMarkers;

constexpr ui64 itemsLimit = 2;

auto visitTop = [&] () {
visitedBytes.clear();
visitedDeletionMarkers.clear();
freshBytes.VisitTop(
itemsLimit,
[&] (const TBytes& bytes, bool isDel) {
if (isDel) {
visitedDeletionMarkers.push_back(bytes);
} else {
visitedBytes.push_back(bytes);
}
}
);
};

{
TVector<TBytes> expectedBytes {
{1, 100, 3, 10, InvalidCommitId},
{1, 101, 4, 11, InvalidCommitId},
};
TVector<TBytes> expectedDeletionMarkers;

visitTop();
COMPARE_BYTES(expectedBytes, visitedBytes);
COMPARE_BYTES(expectedDeletionMarkers, visitedDeletionMarkers);

UNIT_ASSERT(!freshBytes.FinishCleanup(
info.ChunkId,
visitedBytes.size(),
visitedDeletionMarkers.size()));
}

{
TVector<TBytes> expectedBytes {
{1, 50, 3, 12, InvalidCommitId},
{1, 50, 3, 13, InvalidCommitId},
};
TVector<TBytes> expectedDeletionMarkers;

visitTop();
COMPARE_BYTES(expectedBytes, visitedBytes);
COMPARE_BYTES(expectedDeletionMarkers, visitedDeletionMarkers);

UNIT_ASSERT(!freshBytes.FinishCleanup(
info.ChunkId,
visitedBytes.size(),
visitedDeletionMarkers.size()));
}

{
TVector<TBytes> expectedBytes {
{2, 100, 5, 14, InvalidCommitId},
{2, 1000, 3, 15, InvalidCommitId},
};
TVector<TBytes> expectedDeletionMarkers;

visitTop();
COMPARE_BYTES(expectedBytes, visitedBytes);
COMPARE_BYTES(expectedDeletionMarkers, visitedDeletionMarkers);

UNIT_ASSERT(!freshBytes.FinishCleanup(
info.ChunkId,
visitedBytes.size(),
visitedDeletionMarkers.size()));
}

{
TVector<TBytes> expectedBytes;
TVector<TBytes> expectedDeletionMarkers {
{2, 100, 3, 16, InvalidCommitId},
};

visitTop();
COMPARE_BYTES(expectedBytes, visitedBytes);
COMPARE_BYTES(expectedDeletionMarkers, visitedDeletionMarkers);

UNIT_ASSERT(freshBytes.FinishCleanup(
info.ChunkId,
visitedBytes.size(),
visitedDeletionMarkers.size()));
}

{
TFreshBytesVisitor visitor;
freshBytes.FindBytes(visitor, 1, TByteRange(0, 1000, 4_KB), 14);
COMPARE_BYTES(TVector<TBytes>(), visitor.Bytes);
UNIT_ASSERT_VALUES_EQUAL(TString(), visitor.Data);
}
}

// TODO test all branches of AddBytes

// TODO test with multiple chunks
Expand Down
12 changes: 12 additions & 0 deletions cloud/filestore/libs/storage/tablet/model/verify.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,15 @@
#define TABLET_VERIFY(expr) \
TABLET_VERIFY_C(expr, "") \
// TABLET_VERIFY

#define TABLET_VERIFY_DEBUG_C(expr, message) \
STORAGE_VERIFY_DEBUG_C( \
expr, \
TWellKnownEntityTypes::FILESYSTEM, \
LogTag, \
message) \
// TABLET_VERIFY_DEBUG_C

#define TABLET_VERIFY_DEBUG(expr) \
TABLET_VERIFY_DEBUG_C(expr, "") \
// TABLET_VERIFY_DEBUG
Loading

0 comments on commit d967cf1

Please sign in to comment.