Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller committed Nov 6, 2024
2 parents 39d6716 + 3db95bc commit 6ba589a
Showing 165 changed files with 8,233 additions and 551 deletions.
@@ -153,14 +153,16 @@ void TMirrorPartitionActor::StartScrubbingRange(
void TMirrorPartitionActor::CompareChecksums(const TActorContext& ctx)
{
const auto& checksums = ChecksumRangeActorCompanion.GetChecksums();
bool equal = true;
for (size_t i = 1; i < checksums.size(); i++) {
if (checksums[i] != checksums[0]) {
equal = false;
break;
THashMap<ui64, ui32> checksumCount;
ui32 majorCount = 0;
for (size_t i = 0; i < checksums.size(); i++) {
ui64 checksum = checksums[i];
if (++checksumCount[checksum] > majorCount) {
majorCount = checksumCount[checksum];
}
}

const bool equal = (majorCount == checksums.size());
if (!equal && WriteIntersectsWithScrubbing) {
LOG_DEBUG(
ctx,
@@ -207,7 +209,8 @@ void TMirrorPartitionActor::CompareChecksums(const TActorContext& ctx)
++ChecksumMismatches;
ReportMirroredDiskChecksumMismatch();

if (Config->GetResyncRangeAfterScrubbing()) {
const bool hasQuorum = majorCount > checksums.size() / 2;
if (Config->GetResyncRangeAfterScrubbing() && hasQuorum) {
StartResyncRange(ctx);
return;
}
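For illustration, here is a minimal, self-contained sketch of the majority-vote logic introduced in CompareChecksums above, using std:: containers in place of THashMap and ui64. FindMajorCount and the sample values are illustrative names, not part of the patch.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

// Return the number of occurrences of the most frequent checksum
// (the "majorCount" computed by CompareChecksums).
std::uint32_t FindMajorCount(const std::vector<std::uint64_t>& checksums)
{
    std::unordered_map<std::uint64_t, std::uint32_t> checksumCount;
    std::uint32_t majorCount = 0;
    for (std::uint64_t checksum: checksums) {
        majorCount = std::max(majorCount, ++checksumCount[checksum]);
    }
    return majorCount;
}

int main()
{
    const std::vector<std::uint64_t> allDiffer = {1, 2, 3};
    const std::vector<std::uint64_t> twoAgree = {1, 2, 1};

    // All replicas disagree: majorCount == 1, so there is no strict majority
    // (1 > 3 / 2 is false) and scrubbing only reports the mismatch.
    std::cout << (FindMajorCount(allDiffer) > allDiffer.size() / 2) << "\n";

    // Two of three replicas agree: majorCount == 2 > 3 / 2, so the range
    // would be resynced to the majority value.
    std::cout << (FindMajorCount(twoAgree) > twoAgree.size() / 2) << "\n";
}

When every checksum matches, the major count equals checksums.size(), which is exactly the equal condition in the patch.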
116 changes: 77 additions & 39 deletions cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp
@@ -291,6 +291,23 @@ struct TTestEnv
}
};


void WaitUntilScrubbingFinishesCurrentCycle(TTestEnv& testEnv)
{
auto& counters = testEnv.StorageStatsServiceState->Counters;
ui64 prevScrubbingProgress = counters.Simple.ScrubbingProgress.Value;
ui32 iterations = 0;
while (iterations++ < 100) {
testEnv.Runtime.AdvanceCurrentTime(UpdateCountersInterval);
testEnv.Runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
if (prevScrubbingProgress > counters.Simple.ScrubbingProgress.Value)
{
break;
}
prevScrubbingProgress = counters.Simple.ScrubbingProgress.Value;
}
}

} // namespace

////////////////////////////////////////////////////////////////////////////////
@@ -963,7 +980,6 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)
// TODO trigger and test migration for petya and petya#1
}


void DoShouldTransformAnyErrorToRetriable(NProto::TError error)
{
TTestBasicRuntime runtime;
@@ -1194,7 +1210,6 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)
runtime.EnableScheduleForActor(actorId);
});


TDynamicCountersPtr critEventsCounters = new TDynamicCounters();
InitCriticalEventsCounter(critEventsCounters);

@@ -1209,21 +1224,9 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)
env.WriteMirror(range2, 'A');
env.WriteReplica(2, range2, 'B');

auto& counters = env.StorageStatsServiceState->Counters;
ui64 prevScrubbingProgress = 101;
ui32 fullCyclesCount = 0;
ui32 iterations = 0;
while (fullCyclesCount < 2 && iterations++ < 100) {
if (prevScrubbingProgress != 0 &&
counters.Simple.ScrubbingProgress.Value == 0)
{
++fullCyclesCount;
}
prevScrubbingProgress = counters.Simple.ScrubbingProgress.Value;
runtime.AdvanceCurrentTime(UpdateCountersInterval);
runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
}
WaitUntilScrubbingFinishesCurrentCycle(env);

auto& counters = env.StorageStatsServiceState->Counters;
auto mirroredDiskChecksumMismatch = critEventsCounters->GetCounter(
"AppCriticalEvents/MirroredDiskChecksumMismatch",
true);
@@ -1235,38 +1238,21 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)
env.WriteMirror(range3, 'A');
env.WriteReplica(1, range3, 'B');

iterations = 0;
// at this point, scrubbing may not start from the beginning,
// so we need to wait for 2 cycles to be sure that
// it has scanned the entire disk at least once
while (fullCyclesCount < 4 && iterations++ < 100) {
if (prevScrubbingProgress != 0 &&
counters.Simple.ScrubbingProgress.Value == 0)
{
++fullCyclesCount;
}
prevScrubbingProgress = counters.Simple.ScrubbingProgress.Value;
runtime.AdvanceCurrentTime(UpdateCountersInterval);
runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
}
WaitUntilScrubbingFinishesCurrentCycle(env);
WaitUntilScrubbingFinishesCurrentCycle(env);
UNIT_ASSERT_VALUES_EQUAL(3, counters.Simple.ChecksumMismatches.Value);
UNIT_ASSERT_VALUES_EQUAL(3, mirroredDiskChecksumMismatch->Val());

// check that all ranges were resynced and there are no more mismatches
iterations = 0;
// at this point, scrubbing may not start from the beginning,
// so we need to wait for 2 cycles to be sure that
// it has scanned the entire disk at least once
while (fullCyclesCount < 6 && iterations++ < 100) {
if (prevScrubbingProgress != 0 &&
counters.Simple.ScrubbingProgress.Value == 0)
{
++fullCyclesCount;
}
prevScrubbingProgress = counters.Simple.ScrubbingProgress.Value;
runtime.AdvanceCurrentTime(UpdateCountersInterval);
runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
}
WaitUntilScrubbingFinishesCurrentCycle(env);
WaitUntilScrubbingFinishesCurrentCycle(env);

// check that all ranges were resynced and there are no more mismatches
UNIT_ASSERT_VALUES_EQUAL(3, counters.Simple.ChecksumMismatches.Value);
UNIT_ASSERT_VALUES_EQUAL(3, mirroredDiskChecksumMismatch->Val());
}
@@ -1642,6 +1628,58 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)
}
}
}

Y_UNIT_TEST(ShouldStartResyncAfterScrubbingOnlyIfMajorityOfChecksumsAreEqual)
{
using namespace NMonitoring;

TTestBasicRuntime runtime;

runtime.SetRegistrationObserverFunc(
[] (auto& runtime, const auto& parentId, const auto& actorId)
{
Y_UNUSED(parentId);
runtime.EnableScheduleForActor(actorId);
});

ui32 rangeResynced = 0;
runtime.SetEventFilter([&] (auto& runtime, auto& event) {
Y_UNUSED(runtime);
if (event->GetTypeRewrite() ==
TEvNonreplPartitionPrivate::EvRangeResynced)
{
++rangeResynced;
}
return false;
});

TTestEnv env(runtime);
auto& counters = env.StorageStatsServiceState->Counters;

// Write different data to all replicas
const auto range = TBlockRange64::WithLength(2049, 50);
env.WriteReplica(0, range, 'A');
env.WriteReplica(1, range, 'B');
env.WriteReplica(2, range, 'C');

// Wait until all ranges have been processed by scrubbing at least twice.
// We need to be sure that resync was not started.
WaitUntilScrubbingFinishesCurrentCycle(env);
WaitUntilScrubbingFinishesCurrentCycle(env);
UNIT_ASSERT_VALUES_EQUAL(2, counters.Simple.ChecksumMismatches.Value);
UNIT_ASSERT_VALUES_EQUAL(0, rangeResynced);

// Make the data in the 1st and 3rd replicas the same.
env.WriteReplica(2, range, 'A');

// Wait again until all ranges have been processed by scrubbing at least twice.
// Check that the mismatch was found and the range has now been resynced.
WaitUntilScrubbingFinishesCurrentCycle(env);
WaitUntilScrubbingFinishesCurrentCycle(env);
UNIT_ASSERT_VALUES_EQUAL(3, counters.Simple.ChecksumMismatches.Value);
UNIT_ASSERT_VALUES_EQUAL(1, rangeResynced);
}

}

} // namespace NCloud::NBlockStore::NStorage
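As a side note on the WaitUntilScrubbingFinishesCurrentCycle helper above: the waiting strategy boils down to polling a progress counter that wraps back below its previously observed value when a cycle completes, with a bounded iteration budget. A generic, self-contained sketch of that pattern follows; readProgress and advance stand in for the counter read and the AdvanceCurrentTime/DispatchEvents calls, and all names are assumptions for illustration.

#include <cstdint>
#include <functional>

// Poll until the progress value wraps (drops below the previously observed
// value), which marks the end of the current cycle, or until the iteration
// budget is exhausted. Returns true if a cycle boundary was observed.
bool WaitForCycleBoundary(
    const std::function<std::uint64_t()>& readProgress,
    const std::function<void()>& advance,
    std::uint32_t maxIterations = 100)
{
    std::uint64_t prev = readProgress();
    for (std::uint32_t i = 0; i < maxIterations; ++i) {
        advance();
        const std::uint64_t current = readProgress();
        if (prev > current) {
            return true;
        }
        prev = current;
    }
    return false;
}

Because scrubbing may already be mid-cycle when the wait starts, the tests call the helper twice to guarantee that the whole disk has been scanned at least once before asserting on counters.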
39 changes: 38 additions & 1 deletion cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp
@@ -59,6 +59,39 @@ void RejectVolumeRequest(
NCloud::Send(ctx, caller, std::move(response), callerCookie);
}

////////////////////////////////////////////////////////////////////////////////

void CopyDataBuffer(TEvService::TEvWriteBlocksLocalRequest& request)
{
auto& record = request.Record;
auto g = record.Sglist.Acquire();
if (!g) {
return;
}

const auto& sgList = g.Get();
STORAGE_VERIFY_C(
record.GetBlocks().BuffersSize() == 0,
TWellKnownEntityTypes::DISK,
record.GetDiskId(),
TStringBuilder() << "Buffers: " << record.GetBlocks().BuffersSize());
TSgList newSgList;
newSgList.reserve(sgList.size());
for (const auto& block: sgList) {
auto& buffer = *record.MutableBlocks()->AddBuffers();
buffer.ReserveAndResize(block.Size());
memcpy(buffer.begin(), block.Data(), block.Size());
newSgList.emplace_back(buffer.data(), buffer.size());
}
record.Sglist.SetSgList(std::move(newSgList));
}

template <typename T>
void CopyDataBuffer(T& t)
{
Y_UNUSED(t);
}

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -241,7 +274,7 @@ void TVolumeActor::SendRequestToPartition(
ctx,
wrappedRequest,
partActorId,
TabletID()))
volumeRequestId))
{
// The request was sent to the partition with tracking of used blocks.
return;
@@ -719,6 +752,10 @@ void TVolumeActor::ForwardRequest(
* to the underlying (storage) layer.
*/
if constexpr (IsWriteMethod<TMethod>) {
if (State->GetUseIntermediateWriteBuffer()) {
CopyDataBuffer(*msg);
}

const auto range = BuildRequestBlockRange(
*msg,
State->GetBlockSize());
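For context, the essence of CopyDataBuffer above is to detach a write request from caller-owned memory: every block referenced by the scatter-gather list is copied into buffers owned by the request itself, and the list is repointed at those copies. A simplified, self-contained sketch without the actor and protobuf types; TBlockRef, TFakeWriteRequest and CopyIntoOwnedBuffers are illustrative stand-ins, not names from the patch.

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// A read-only view of one block of caller memory.
struct TBlockRef
{
    const char* Data = nullptr;
    std::size_t Size = 0;
};

struct TFakeWriteRequest
{
    std::vector<std::string> Buffers;   // owned storage (MutableBlocks() in the patch)
    std::vector<TBlockRef> SgList;      // views into caller memory (record.Sglist)
};

void CopyIntoOwnedBuffers(TFakeWriteRequest& request)
{
    // Reserve up front so the strings are not moved and the pointers stored
    // in the rebuilt scatter-gather list stay valid.
    request.Buffers.reserve(request.Buffers.size() + request.SgList.size());

    std::vector<TBlockRef> newSgList;
    newSgList.reserve(request.SgList.size());
    for (const auto& block: request.SgList) {
        auto& buffer = request.Buffers.emplace_back(block.Data, block.Size);
        newSgList.push_back({buffer.data(), buffer.size()});
    }
    request.SgList = std::move(newSgList);
}

After the copy, the caller's buffers can be released or reused without affecting the in-flight request, which appears to be the point of the new use-intermediate-write-buffer volume tag.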
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/storage/volume/volume_state.cpp
@@ -239,6 +239,7 @@ void TVolumeState::Reset()
UseFastPath = false;
UseRdmaForThisVolume = false;
AcceptInvalidDiskAllocationResponse = false;
UseIntermediateWriteBuffer = false;

if (IsDiskRegistryMediaKind()) {
if (Meta.GetDevices().size()) {
@@ -297,6 +298,8 @@ void TVolumeState::Reset()
TDuration::TryParse(value, MaxTimedOutDeviceStateDuration);
} else if (tag == "use-fastpath") {
UseFastPath = true;
} else if (tag == "use-intermediate-write-buffer") {
UseIntermediateWriteBuffer = true;
}
}

6 changes: 6 additions & 0 deletions cloud/blockstore/libs/storage/volume/volume_state.h
@@ -226,6 +226,7 @@ class TVolumeState
bool UseRdmaForThisVolume = false;
bool RdmaUnavailable = false;
TDuration MaxTimedOutDeviceStateDuration;
bool UseIntermediateWriteBuffer = false;

bool UseMirrorResync = false;
bool ForceMirrorResync = false;
@@ -677,6 +678,11 @@ class TVolumeState
return MaxTimedOutDeviceStateDuration;
}

bool GetUseIntermediateWriteBuffer() const
{
return UseIntermediateWriteBuffer;
}

size_t GetUsedBlockCount() const
{
return UsedBlocks ? UsedBlocks->Count() : 0;
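Taken together, the volume_state changes wire a per-volume text tag into a boolean that the write path checks before copying buffers. An illustrative sketch of that toggle pattern follows; TFakeVolumeState and ApplyTags are stand-ins, not the real TVolumeState API.

#include <string>
#include <vector>

struct TFakeVolumeState
{
    bool UseIntermediateWriteBuffer = false;

    // Mirrors the pattern in Reset(): clear the flag, then re-derive it
    // from the volume's tag list.
    void ApplyTags(const std::vector<std::string>& tags)
    {
        UseIntermediateWriteBuffer = false;
        for (const auto& tag: tags) {
            if (tag == "use-intermediate-write-buffer") {
                UseIntermediateWriteBuffer = true;
            }
        }
    }

    bool GetUseIntermediateWriteBuffer() const
    {
        return UseIntermediateWriteBuffer;
    }
};

On the request path, ForwardRequest calls CopyDataBuffer(*msg) only when GetUseIntermediateWriteBuffer() returns true, so the extra copy is opt-in per volume.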