Skip to content

Commit

Permalink
issue-652: do not acquire barriers from the past upon FlushBytes (#…
Browse files Browse the repository at this point in the history
…1919)

issue-652: add test reproducing the error
  • Loading branch information
debnatkh committed Sep 18, 2024
1 parent dffe3f9 commit a35f345
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 20 deletions.
30 changes: 15 additions & 15 deletions cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class TFlushBytesActor final
const TString FileSystemId;
const TActorId Tablet;
const TRequestInfoPtr RequestInfo;
const ui64 CollectCommitId;
const ui64 CommitId;
const ui32 BlockSize;
const ui64 ChunkId;
const IProfileLogPtr ProfileLog;
Expand All @@ -133,7 +133,7 @@ class TFlushBytesActor final
TString fileSystemId,
TActorId tablet,
TRequestInfoPtr requestInfo,
ui64 collectCommitId,
ui64 commitId,
ui32 blockSize,
ui64 chunkId,
IProfileLogPtr profileLog,
Expand Down Expand Up @@ -180,7 +180,7 @@ TFlushBytesActor::TFlushBytesActor(
TString fileSystemId,
TActorId tablet,
TRequestInfoPtr requestInfo,
ui64 collectCommitId,
ui64 commitId,
ui32 blockSize,
ui64 chunkId,
IProfileLogPtr profileLog,
Expand All @@ -194,7 +194,7 @@ TFlushBytesActor::TFlushBytesActor(
, FileSystemId(std::move(fileSystemId))
, Tablet(tablet)
, RequestInfo(std::move(requestInfo))
, CollectCommitId(collectCommitId)
, CommitId(commitId)
, BlockSize(blockSize)
, ChunkId(chunkId)
, ProfileLog(std::move(profileLog))
Expand Down Expand Up @@ -441,7 +441,7 @@ void TFlushBytesActor::ReplyAndDie(
ctx.Now() - RequestInfo->StartedTs,
RequestInfo->CallContext,
std::move(MixedBlocksRanges),
CollectCommitId,
CommitId,
ChunkId);

NCloud::Send(ctx, Tablet, std::move(response));
Expand Down Expand Up @@ -672,11 +672,7 @@ void TIndexTabletActor::CompleteTx_FlushBytes(
}
};

args.CollectCommitId = Max<ui64>();

for (const auto& bytes: args.Bytes) {
args.CollectCommitId = Min(args.CollectCommitId, bytes.MinCommitId);
}

THashMap<TBlockLocation, TBlockWithBytes, TBlockLocationHash> blockMap;

Expand Down Expand Up @@ -808,15 +804,15 @@ void TIndexTabletActor::CompleteTx_FlushBytes(
auto dstBlobs = builder.Finish();
TABLET_VERIFY(dstBlobs);

auto commitId = GenerateCommitId();
if (commitId == InvalidCommitId) {
args.CommitId = GenerateCommitId();
if (args.CommitId == InvalidCommitId) {
return RebootTabletOnCommitOverflow(ctx, "FlushBytes");
}

ui32 blobIndex = 0;
for (auto& blob: dstBlobs) {
const auto ok = GenerateBlobId(
commitId,
args.CommitId,
blob.Blocks.size() * GetBlockSize(),
blobIndex++,
&blob.BlobId);
Expand All @@ -838,14 +834,18 @@ void TIndexTabletActor::CompleteTx_FlushBytes(
}
}

AcquireCollectBarrier(args.CollectCommitId);
AcquireCollectBarrier(args.CommitId);
// TODO(#1923): it may be problematic to acquire the barrier only upon
// completion of the transaction, because blobs, that have been read at the
// prepare stage, may be tampered with by the time of the transaction
// completion

auto actor = std::make_unique<TFlushBytesActor>(
LogTag,
GetFileSystemId(),
ctx.SelfID,
args.RequestInfo,
args.CollectCommitId,
args.CommitId,
GetBlockSize(),
args.ChunkId,
ProfileLog,
Expand Down Expand Up @@ -874,7 +874,7 @@ void TIndexTabletActor::HandleFlushBytesCompleted(
FormatError(msg->GetError()).c_str());

ReleaseMixedBlocks(msg->MixedBlocksRanges);
TABLET_VERIFY(TryReleaseCollectBarrier(msg->CollectCommitId));
TABLET_VERIFY(TryReleaseCollectBarrier(msg->CommitId));
WorkerActors.erase(ev->Sender);

Metrics.FlushBytes.Update(1, msg->Size, msg->Time);
Expand Down
6 changes: 3 additions & 3 deletions cloud/filestore/libs/storage/tablet/tablet_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ struct TEvIndexTabletPrivate
{
const TCallContextPtr CallContext;
const TSet<ui32> MixedBlocksRanges;
const ui64 CollectCommitId;
const ui64 CommitId;
const ui64 ChunkId;

TFlushBytesCompleted(
Expand All @@ -419,12 +419,12 @@ struct TEvIndexTabletPrivate
TDuration d,
TCallContextPtr callContext,
TSet<ui32> mixedBlocksRanges,
ui64 collectCommitId,
ui64 commitId,
ui64 chunkId)
: TDataOperationCompleted(requestCount, requestBytes, d)
, CallContext(std::move(callContext))
, MixedBlocksRanges(std::move(mixedBlocksRanges))
, CollectCommitId(collectCommitId)
, CommitId(commitId)
, ChunkId(chunkId)
{
}
Expand Down
4 changes: 2 additions & 2 deletions cloud/filestore/libs/storage/tablet/tablet_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,7 @@ struct TTxIndexTablet
const ui64 ChunkId;
const TVector<TBytes> Bytes;

ui64 CollectCommitId = InvalidCommitId;
ui64 CommitId = InvalidCommitId;

// NOTE: should persist state across tx restarts
TSet<ui32> MixedBlocksRanges;
Expand All @@ -1593,7 +1593,7 @@ struct TTxIndexTablet
{
TProfileAware::Clear();

CollectCommitId = InvalidCommitId;
CommitId = InvalidCommitId;
}
};

Expand Down
123 changes: 123 additions & 0 deletions cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5791,6 +5791,129 @@ Y_UNIT_TEST_SUITE(TIndexTabletTest_Data)
}
}

TTestActorRuntimeBase::TEventFilter MakeCollectGarbageFilter(
ui64 keepCount,
ui64 doNotKeepCount)
{
return [keepCount, doNotKeepCount](auto& runtime, auto& event)
{
Y_UNUSED(runtime);
switch (event->GetTypeRewrite()) {
case TEvBlobStorage::EvCollectGarbage: {
auto* msg =
event
->template Get<TEvBlobStorage::TEvCollectGarbage>();
if (msg->Channel >= TIndexTabletSchema::DataChannel) {
UNIT_ASSERT_VALUES_EQUAL(keepCount, msg->Keep->size());
UNIT_ASSERT_VALUES_EQUAL(
doNotKeepCount,
msg->DoNotKeep->size());
}
break;
}
}
return false;
};
}

TABLET_TEST(FlushBytesShouldNotInterfereWithCollectGarbage)
{
const auto block = tabletConfig.BlockSize;
NProto::TStorageConfig storageConfig;

storageConfig.SetCompactionThreshold(999'999);
storageConfig.SetCleanupThreshold(999'999);
storageConfig.SetCollectGarbageThreshold(1_GB);
storageConfig.SetMinChannelCount(1);

// ensure that all blobs use the same channel
TTestEnv env(
{.ChannelCount = TIndexTabletSchema::DataChannel + 1},
std::move(storageConfig));

env.CreateSubDomain("nfs");

ui32 nodeIdx = env.CreateNode("nfs");
ui64 tabletId = env.BootIndexTablet(nodeIdx);

TIndexTabletClient tablet(
env.GetRuntime(),
nodeIdx,
tabletId,
tabletConfig);
tablet.InitSession("client", "session");
auto id = CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test"));

auto handle = CreateHandle(tablet, id);

const int size = BlockGroupSize * block;
tablet.WriteData(handle, 0 * size, size, 'a');
tablet.WriteData(handle, 1 * size, size, 'b');
tablet.WriteData(handle, 2 * size, size, 'c');
tablet.WriteData(handle, 2 * size, 1_KB, 'f');
tablet.WriteData(handle, 0, size, 'd');

// File contents:
// [dddddd][bbbbbb][fcccccc]

auto& runtime = env.GetRuntime();

runtime.SetEventFilter(MakeCollectGarbageFilter(4, 0));
tablet.CollectGarbage();
// left to right: growth of commitId
// new new new new |
// [ a ][ b ][ c ][fresh][ d ] |
// |
// lastCollectCommitId

tablet.Cleanup(GetMixedRangeIndex(id, 0));
// garbage |
// [ a ][ b ][ c ][fresh][ d ] |
// |
// lastCollectCommitId
// runtime.SetEventFilter(MakeCollectGarbageFilter(0, 1));

// during the FlushBytes (if the barrier would have been acquired for
// the fresh bytes commit Id)
//
// garbage | |
// [ a ][ b ][ c ][fresh][ d ] |
// | |
// barrier lastCollectCommitId
//
// in order to ensure the acquired barrier is held for long enough time,
// we postpone the ReadBlob request, that is supposed to be sent during
// the FlushBytes

TAutoPtr<IEventHandle> readBlob;
runtime.SetEventFilter(
[&readBlob](auto& runtime, auto& event)
{
Y_UNUSED(runtime);
switch (event->GetTypeRewrite()) {
case TEvIndexTabletPrivate::EvReadBlobRequest: {
readBlob = event.Release();
return true;
}
}
return false;
});
tablet.SendFlushBytesRequest();
runtime.DispatchEvents(TDispatchOptions{
.CustomFinalCondition = [&]() -> bool
{
return readBlob.Get();
}});

runtime.SetEventFilter(MakeCollectGarbageFilter(0, 1));

tablet.CollectGarbage();
env.GetRuntime().Send(readBlob.Release(), nodeIdx);

auto result = tablet.RecvFlushBytesResponse();
UNIT_ASSERT_VALUES_EQUAL(S_OK, result->GetStatus());
}

TABLET_TEST(ShouldNotCollectGarbageWithPreviousGeneration)
{
const auto block = tabletConfig.BlockSize;
Expand Down

0 comments on commit a35f345

Please sign in to comment.