Skip to content

Commit

Permalink
resync range if scrubbing found a mismatch (#1591) (#1608)
Browse files Browse the repository at this point in the history
* resync range if scrubbing found a mismatch

* fix

* fix issues

* fix issues
  • Loading branch information
WilyTiger authored Jul 15, 2024
1 parent 80f8203 commit 06ba994
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 3 deletions.
3 changes: 3 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1004,4 +1004,7 @@ message TStorageServiceConfig
// CMS actions such as "REMOVE_HOST" and "REMOVE_DEVICE" will attempt to
// remove devices and the host from DR memory (including persistent storage).
optional bool CleanupDRConfigOnCMSActions = 375;

// Resync range, if scrubbing found a mismatch.
optional bool ResyncRangeAfterScrubbing = 376;
}
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ TDuration MSeconds(ui32 value)
xxx(WaitDependentDisksRetryRequestDelay, TDuration, Seconds(1) )\
\
xxx(DataScrubbingEnabled, bool, false )\
xxx(ResyncRangeAfterScrubbing, bool, false )\
xxx(ScrubbingInterval, TDuration, MSeconds(50) )\
xxx(ScrubbingChecksumMismatchTimeout, TDuration, Seconds(300) )\
xxx(ScrubbingBandwidth, ui64, 20 )\
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ class TStorageConfig
TDuration GetServiceSelfPingInterval() const;

bool GetDataScrubbingEnabled() const;
bool GetResyncRangeAfterScrubbing() const;
TDuration GetScrubbingInterval() const;
TDuration GetScrubbingChecksumMismatchTimeout() const;
ui64 GetScrubbingBandwidth() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "part_mirror_resync_util.h"
#include "part_nonrepl.h"
#include "part_nonrepl_migration.h"
#include "resync_range.h"

#include <cloud/blockstore/libs/diagnostics/critical_events.h>

Expand Down Expand Up @@ -204,11 +205,56 @@ void TMirrorPartitionActor::CompareChecksums(const TActorContext& ctx)
checksums[i]);
}
ReportMirroredDiskChecksumMismatch();

if (Config->GetResyncRangeAfterScrubbing()) {
StartResyncRange(ctx);
return;
}
}

StartScrubbingRange(ctx, ScrubbingRangeId + 1);
}

// Starts resyncing the range that is currently being scrubbed.
//
// Sets ResyncRangeStarted and registers a TResyncRangeActor that is given
// every replica for which DevicesReadyForReading() holds on that range.
void TMirrorPartitionActor::StartResyncRange(
    const NActors::TActorContext& ctx)
{
    // The same range is used for logging, replica filtering and the actor.
    const auto scrubbingRange = GetScrubbingRange();

    LOG_WARN(
        ctx,
        TBlockStoreComponents::PARTITION,
        "[%s] Resyncing range %s",
        DiskId.c_str(),
        DescribeRange(scrubbingRange).c_str());
    ResyncRangeStarted = true;

    auto requestInfo = CreateRequestInfo(
        SelfId(),
        0,   // cookie
        MakeIntrusive<TCallContext>());

    // Pick the replicas that can take part in the resync of this range.
    const auto& infos = State.GetReplicaInfos();
    const auto& actors = State.GetReplicaActors();

    TVector<TReplicaDescriptor> replicas;
    for (ui32 replicaIndex = 0; replicaIndex < infos.size(); replicaIndex++) {
        const auto& replicaConfig = infos[replicaIndex].Config;
        if (!replicaConfig->DevicesReadyForReading(scrubbingRange)) {
            continue;
        }

        replicas.emplace_back(
            replicaConfig->GetName(),
            replicaIndex,
            actors[replicaIndex]);
    }

    NCloud::Register<TResyncRangeActor>(
        ctx,
        std::move(requestInfo),
        State.GetBlockSize(),
        scrubbingRange,
        std::move(replicas),
        State.GetRWClientId(),
        BlockDigestGenerator);
}

void TMirrorPartitionActor::ReplyAndDie(const TActorContext& ctx)
{
NCloud::Reply(ctx, *Poisoner, std::make_unique<TEvents::TEvPoisonTaken>());
Expand Down Expand Up @@ -408,6 +454,23 @@ void TMirrorPartitionActor::HandleChecksumResponse(
CompareChecksums(ctx);
}

// Handles the TEvRangeResynced notification: logs the resynced range together
// with the reported error, clears ResyncRangeStarted and starts scrubbing the
// next range.
void TMirrorPartitionActor::HandleRangeResynced(
    const TEvNonreplPartitionPrivate::TEvRangeResynced::TPtr& ev,
    const TActorContext& ctx)
{
    const auto& resynced = *ev->Get();

    LOG_WARN(
        ctx,
        TBlockStoreComponents::PARTITION,
        "[%s] Range %s resync finished: %s",
        DiskId.c_str(),
        DescribeRange(resynced.Range).c_str(),
        FormatError(resynced.GetError()).c_str());

    // The resync result is only logged here; scrubbing moves on to the next
    // range whatever the reported error is.
    ResyncRangeStarted = false;
    StartScrubbingRange(ctx, ScrubbingRangeId + 1);
}

////////////////////////////////////////////////////////////////////////////////

void TMirrorPartitionActor::HandleRWClientIdChanged(
Expand Down Expand Up @@ -466,6 +529,10 @@ STFUNC(TMirrorPartitionActor::StateWork)
TEvNonreplPartitionPrivate::TEvChecksumBlocksResponse,
HandleChecksumResponse);

HFunc(
TEvNonreplPartitionPrivate::TEvRangeResynced,
HandleRangeResynced);

HFunc(TEvService::TEvReadBlocksRequest, HandleReadBlocks);
HFunc(TEvService::TEvWriteBlocksRequest, HandleWriteBlocks);
HFunc(TEvService::TEvZeroBlocksRequest, HandleZeroBlocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class TMirrorPartitionActor final
bool WriteIntersectsWithScrubbing = false;
ui64 ScrubbingThroughput = 0;
TInstant ScrubbingRangeStarted;
bool ResyncRangeStarted = false;

public:
TMirrorPartitionActor(
Expand Down Expand Up @@ -106,6 +107,7 @@ class TMirrorPartitionActor final
void StartScrubbingRange(
const NActors::TActorContext& ctx,
ui64 scrubbingRangeId);
void StartResyncRange(const NActors::TActorContext& ctx);

private:
STFUNC(StateWork);
Expand Down Expand Up @@ -139,6 +141,10 @@ class TMirrorPartitionActor final
const TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleRangeResynced(
const TEvNonreplPartitionPrivate::TEvRangeResynced::TPtr& ev,
const NActors::TActorContext& ctx);

void HandlePoisonPill(
const NActors::TEvents::TEvPoisonPill::TPtr& ev,
const NActors::TActorContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,20 @@ void TMirrorPartitionActor::MirrorRequest(
*ev->Get(),
State.GetBlockSize());
const auto requestIdentityKey = ev->Cookie;
RequestsInProgress.AddWriteRequest(requestIdentityKey, range);

if (GetScrubbingRange().Overlaps(range)) {
if (ResyncRangeStarted) {
auto response = std::make_unique<typename TMethod::TResponse>(
MakeError(
E_REJECTED,
TStringBuilder()
<< "Request " << TMethod::Name
<< " intersects with currently resyncing range"));
NCloud::Reply(ctx, *ev, std::move(response));
return;
}
WriteIntersectsWithScrubbing = true;
}
RequestsInProgress.AddWriteRequest(requestIdentityKey, range);

NCloud::Register<TMirrorRequestActor<TMethod>>(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,17 @@ void TMirrorPartitionActor::ReadBlocks(
record.GetStartIndex(),
record.GetBlocksCount());

if (ResyncRangeStarted && GetScrubbingRange().Overlaps(blockRange)) {
auto response = std::make_unique<typename TMethod::TResponse>(
MakeError(
E_REJECTED,
TStringBuilder()
<< "Request " << TMethod::Name
<< " intersects with currently resyncing range"));
NCloud::Reply(ctx, *ev, std::move(response));
return;
}

TActorId replicaActorId;
const auto error = State.NextReadReplica(blockRange, &replicaActorId);
if (HasError(error)) {
Expand Down
124 changes: 123 additions & 1 deletion cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ struct TTestEnv
storageConfig.SetDataScrubbingEnabled(true);
// set bandwidth to reach maximum bandwidth for scrubbing - 50 MiB/s
storageConfig.SetScrubbingBandwidth(20000000);
storageConfig.SetResyncRangeAfterScrubbing(true);

Config = std::make_shared<TStorageConfig>(
std::move(storageConfig),
Expand Down Expand Up @@ -1249,7 +1250,21 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)
runtime.AdvanceCurrentTime(UpdateCountersInterval);
runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
}
UNIT_ASSERT_VALUES_EQUAL(5, mirroredDiskChecksumMismatch->Val());
UNIT_ASSERT_VALUES_EQUAL(3, mirroredDiskChecksumMismatch->Val());

// check that all ranges were resynced and there are no more mismatches
iterations = 0;
while (fullCyclesCount < 4 && iterations++ < 100) {
if (prevScrubbingProgress != 0 &&
counters.Simple.ScrubbingProgress.Value == 0)
{
++fullCyclesCount;
}
prevScrubbingProgress = counters.Simple.ScrubbingProgress.Value;
runtime.AdvanceCurrentTime(UpdateCountersInterval);
runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
}
UNIT_ASSERT_VALUES_EQUAL(3, mirroredDiskChecksumMismatch->Val());
}

Y_UNIT_TEST(ShouldPostponeScrubbingIfIntersectingWritePending)
Expand Down Expand Up @@ -1516,6 +1531,113 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)

UNIT_ASSERT_VALUES_EQUAL(0, mirroredDiskChecksumMismatch->Val());
}

// Verifies that, while a range is being resynced after a scrubbing checksum
// mismatch, reads and writes that overlap that range are rejected with
// E_REJECTED, and that they succeed again once the resync completion event
// has been delivered to the partition.
Y_UNIT_TEST(ShouldRejectRequestsIfRangeResyncingAfterChecksumMismatch)
{
    using namespace NMonitoring;

    TTestBasicRuntime runtime;

    runtime.SetRegistrationObserverFunc(
        [] (auto& runtime, const auto& parentId, const auto& actorId)
    {
        Y_UNUSED(parentId);
        runtime.EnableScheduleForActor(actorId);
    });

    // Intercept the resync completion event so that the partition stays in
    // the "resync in progress" state until the test releases the event.
    TAutoPtr<IEventHandle> delayedRangeResynced;
    runtime.SetEventFilter([&] (auto& runtime, auto& event) {
        Y_UNUSED(runtime);
        if (event->GetTypeRewrite() ==
                TEvNonreplPartitionPrivate::EvRangeResynced)
        {
            delayedRangeResynced = event.Release();
            return true;
        }
        return false;
    });

    // Count scheduled EvScrubbingNextRange events to track scrubbing progress.
    ui32 rangeCount = 0;
    runtime.SetScheduledEventFilter(
        [&] (auto& runtime, auto& event, auto&& delay, auto&& deadline)
    {
        Y_UNUSED(runtime);
        Y_UNUSED(delay);
        Y_UNUSED(deadline);
        if (event->GetTypeRewrite() ==
                TEvNonreplPartitionPrivate::EvScrubbingNextRange)
        {
            ++rangeCount;
        }

        return false;
    });

    TDynamicCountersPtr critEventsCounters = new TDynamicCounters();
    InitCriticalEventsCounter(critEventsCounters);

    TTestEnv env(runtime);

    // Make replica 2 differ from the others within range1; range2 lies
    // inside range1 and is used for the client requests below.
    const auto range1 = TBlockRange64::WithLength(3100, 100);
    const auto range2 = TBlockRange64::WithLength(3150, 2);
    env.WriteMirror(range1, 'A');
    env.WriteReplica(2, range1, 'B');

    // wait for scrubbing 3rd range
    ui32 iterations = 0;
    while (rangeCount < 3 && iterations++ < 100) {
        runtime.DispatchEvents({}, env.ScrubbingInterval);
    }

    // wait for resync 3rd range
    iterations = 0;
    while (!delayedRangeResynced && iterations++ < 100) {
        runtime.AdvanceCurrentTime(
            env.Config->GetScrubbingChecksumMismatchTimeout());
        runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
    }

    // Fail explicitly if the resync never finished within the iteration
    // budget: otherwise the checks below would run without a resync in
    // progress and a null event handle would be sent to the runtime.
    UNIT_ASSERT_C(
        delayedRangeResynced,
        "EvRangeResynced was not intercepted");

    // check that read/write to 3rd range will be rejected
    TPartitionClient client(runtime, env.ActorId);
    {
        TString data(DefaultBlockSize, 'B');
        client.SendWriteBlocksLocalRequest(range2, data);
        auto response = client.RecvWriteBlocksLocalResponse();
        UNIT_ASSERT_VALUES_EQUAL_C(
            E_REJECTED,
            response->GetStatus(),
            response->GetErrorReason());
    }

    {
        TVector<TString> blocks;
        client.SendReadBlocksLocalRequest(
            range2,
            TGuardedSgList(ResizeBlocks(
                blocks,
                range2.Size(),
                TString(DefaultBlockSize, '\0')
            )));
        auto response = client.RecvReadBlocksLocalResponse();
        UNIT_ASSERT_VALUES_EQUAL_C(
            E_REJECTED,
            response->GetStatus(),
            response->GetErrorReason());
    }

    // check that after resync requests to range complete successfully
    runtime.Send(delayedRangeResynced.Release());
    runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
    {
        client.WriteBlocks(range2, 'C');
        auto response = client.ReadBlocks(range2);
        const auto& blocks = response->Record.GetBlocks();
        for (ui32 i = 0; i < blocks.BuffersSize(); ++i) {
            UNIT_ASSERT_VALUES_EQUAL(
                TString(DefaultBlockSize, 'C'),
                blocks.GetBuffers(i));
        }
    }
}
}

} // namespace NCloud::NBlockStore::NStorage

0 comments on commit 06ba994

Please sign in to comment.