Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 23-3: resync range if scrubbing found a mismatch #1608

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1004,4 +1004,7 @@ message TStorageServiceConfig
// CMS actions such as "REMOVE_HOST" and "REMOVE_DEVICE" will attempt to
// remove devices and the host from DR memory (including persistent storage).
optional bool CleanupDRConfigOnCMSActions = 375;

// Resync range, if scrubbing found a mismatch.
optional bool ResyncRangeAfterScrubbing = 376;
}
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ TDuration MSeconds(ui32 value)
xxx(WaitDependentDisksRetryRequestDelay, TDuration, Seconds(1) )\
\
xxx(DataScrubbingEnabled, bool, false )\
xxx(ResyncRangeAfterScrubbing, bool, false )\
xxx(ScrubbingInterval, TDuration, MSeconds(50) )\
xxx(ScrubbingChecksumMismatchTimeout, TDuration, Seconds(300) )\
xxx(ScrubbingBandwidth, ui64, 20 )\
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ class TStorageConfig
TDuration GetServiceSelfPingInterval() const;

bool GetDataScrubbingEnabled() const;
bool GetResyncRangeAfterScrubbing() const;
TDuration GetScrubbingInterval() const;
TDuration GetScrubbingChecksumMismatchTimeout() const;
ui64 GetScrubbingBandwidth() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "part_mirror_resync_util.h"
#include "part_nonrepl.h"
#include "part_nonrepl_migration.h"
#include "resync_range.h"

#include <cloud/blockstore/libs/diagnostics/critical_events.h>

Expand Down Expand Up @@ -204,11 +205,56 @@ void TMirrorPartitionActor::CompareChecksums(const TActorContext& ctx)
checksums[i]);
}
ReportMirroredDiskChecksumMismatch();

if (Config->GetResyncRangeAfterScrubbing()) {
StartResyncRange(ctx);
return;
}
}

StartScrubbingRange(ctx, ScrubbingRangeId + 1);
}

// Starts a resync of the range that is currently being scrubbed.
//
// Logs the range, raises ResyncRangeStarted (cleared again in
// HandleRangeResynced), collects every replica whose devices are ready for
// reading the range and hands them to a newly registered TResyncRangeActor.
void TMirrorPartitionActor::StartResyncRange(
    const NActors::TActorContext& ctx)
{
    // Computed once and reused for logging, replica filtering and the actor.
    const auto scrubbingRange = GetScrubbingRange();

    LOG_WARN(
        ctx,
        TBlockStoreComponents::PARTITION,
        "[%s] Resyncing range %s",
        DiskId.c_str(),
        DescribeRange(scrubbingRange).c_str());
    ResyncRangeStarted = true;

    auto resyncRequestInfo = CreateRequestInfo(
        SelfId(),
        0,   // cookie
        MakeIntrusive<TCallContext>());

    const auto& infos = State.GetReplicaInfos();
    const auto& actors = State.GetReplicaActors();

    // Only replicas that can serve reads for the whole range take part.
    TVector<TReplicaDescriptor> readyReplicas;
    for (ui32 replicaIndex = 0; replicaIndex < infos.size(); replicaIndex++) {
        const auto& replicaConfig = infos[replicaIndex].Config;
        if (!replicaConfig->DevicesReadyForReading(scrubbingRange)) {
            continue;
        }

        readyReplicas.emplace_back(
            replicaConfig->GetName(),
            replicaIndex,
            actors[replicaIndex]);
    }

    NCloud::Register<TResyncRangeActor>(
        ctx,
        std::move(resyncRequestInfo),
        State.GetBlockSize(),
        scrubbingRange,
        std::move(readyReplicas),
        State.GetRWClientId(),
        BlockDigestGenerator);
}

void TMirrorPartitionActor::ReplyAndDie(const TActorContext& ctx)
{
NCloud::Reply(ctx, *Poisoner, std::make_unique<TEvents::TEvPoisonTaken>());
Expand Down Expand Up @@ -408,6 +454,23 @@ void TMirrorPartitionActor::HandleChecksumResponse(
CompareChecksums(ctx);
}

// Handles completion of a range resync started by StartResyncRange.
//
// Logs the resynced range together with the reported error, clears
// ResyncRangeStarted and moves scrubbing on to the next range. The error is
// only logged here; scrubbing continues regardless of its value.
void TMirrorPartitionActor::HandleRangeResynced(
    const TEvNonreplPartitionPrivate::TEvRangeResynced::TPtr& ev,
    const TActorContext& ctx)
{
    const auto& resynced = *ev->Get();

    LOG_WARN(
        ctx,
        TBlockStoreComponents::PARTITION,
        "[%s] Range %s resync finished: %s",
        DiskId.c_str(),
        DescribeRange(resynced.Range).c_str(),
        FormatError(resynced.GetError()).c_str());

    ResyncRangeStarted = false;

    const auto nextRangeId = ScrubbingRangeId + 1;
    StartScrubbingRange(ctx, nextRangeId);
}

////////////////////////////////////////////////////////////////////////////////

void TMirrorPartitionActor::HandleRWClientIdChanged(
Expand Down Expand Up @@ -466,6 +529,10 @@ STFUNC(TMirrorPartitionActor::StateWork)
TEvNonreplPartitionPrivate::TEvChecksumBlocksResponse,
HandleChecksumResponse);

HFunc(
TEvNonreplPartitionPrivate::TEvRangeResynced,
HandleRangeResynced);

HFunc(TEvService::TEvReadBlocksRequest, HandleReadBlocks);
HFunc(TEvService::TEvWriteBlocksRequest, HandleWriteBlocks);
HFunc(TEvService::TEvZeroBlocksRequest, HandleZeroBlocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class TMirrorPartitionActor final
bool WriteIntersectsWithScrubbing = false;
ui64 ScrubbingThroughput = 0;
TInstant ScrubbingRangeStarted;
bool ResyncRangeStarted = false;

public:
TMirrorPartitionActor(
Expand Down Expand Up @@ -106,6 +107,7 @@ class TMirrorPartitionActor final
void StartScrubbingRange(
const NActors::TActorContext& ctx,
ui64 scrubbingRangeId);
void StartResyncRange(const NActors::TActorContext& ctx);

private:
STFUNC(StateWork);
Expand Down Expand Up @@ -139,6 +141,10 @@ class TMirrorPartitionActor final
const TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleRangeResynced(
const TEvNonreplPartitionPrivate::TEvRangeResynced::TPtr& ev,
const NActors::TActorContext& ctx);

void HandlePoisonPill(
const NActors::TEvents::TEvPoisonPill::TPtr& ev,
const NActors::TActorContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,20 @@ void TMirrorPartitionActor::MirrorRequest(
*ev->Get(),
State.GetBlockSize());
const auto requestIdentityKey = ev->Cookie;
RequestsInProgress.AddWriteRequest(requestIdentityKey, range);

if (GetScrubbingRange().Overlaps(range)) {
if (ResyncRangeStarted) {
auto response = std::make_unique<typename TMethod::TResponse>(
MakeError(
E_REJECTED,
TStringBuilder()
<< "Request " << TMethod::Name
<< " intersects with currently resyncing range"));
NCloud::Reply(ctx, *ev, std::move(response));
return;
}
WriteIntersectsWithScrubbing = true;
}
RequestsInProgress.AddWriteRequest(requestIdentityKey, range);

NCloud::Register<TMirrorRequestActor<TMethod>>(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,17 @@ void TMirrorPartitionActor::ReadBlocks(
record.GetStartIndex(),
record.GetBlocksCount());

if (ResyncRangeStarted && GetScrubbingRange().Overlaps(blockRange)) {
auto response = std::make_unique<typename TMethod::TResponse>(
MakeError(
E_REJECTED,
TStringBuilder()
<< "Request " << TMethod::Name
<< " intersects with currently resyncing range"));
NCloud::Reply(ctx, *ev, std::move(response));
return;
}

TActorId replicaActorId;
const auto error = State.NextReadReplica(blockRange, &replicaActorId);
if (HasError(error)) {
Expand Down
124 changes: 123 additions & 1 deletion cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ struct TTestEnv
storageConfig.SetDataScrubbingEnabled(true);
// set bandwidth to reach maximum bandwidth for scrubbing - 50 MiB/s
storageConfig.SetScrubbingBandwidth(20000000);
storageConfig.SetResyncRangeAfterScrubbing(true);

Config = std::make_shared<TStorageConfig>(
std::move(storageConfig),
Expand Down Expand Up @@ -1249,7 +1250,21 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)
runtime.AdvanceCurrentTime(UpdateCountersInterval);
runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
}
UNIT_ASSERT_VALUES_EQUAL(5, mirroredDiskChecksumMismatch->Val());
UNIT_ASSERT_VALUES_EQUAL(3, mirroredDiskChecksumMismatch->Val());

// check that all ranges were resynced and there are no more mismatches
iterations = 0;
while (fullCyclesCount < 4 && iterations++ < 100) {
if (prevScrubbingProgress != 0 &&
counters.Simple.ScrubbingProgress.Value == 0)
{
++fullCyclesCount;
}
prevScrubbingProgress = counters.Simple.ScrubbingProgress.Value;
runtime.AdvanceCurrentTime(UpdateCountersInterval);
runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
}
UNIT_ASSERT_VALUES_EQUAL(3, mirroredDiskChecksumMismatch->Val());
}

Y_UNIT_TEST(ShouldPostponeScrubbingIfIntersectingWritePending)
Expand Down Expand Up @@ -1516,6 +1531,113 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)

UNIT_ASSERT_VALUES_EQUAL(0, mirroredDiskChecksumMismatch->Val());
}

// Verifies that, while a range is being resynced after a scrubbing checksum
// mismatch, reads and writes that overlap the range are rejected with
// E_REJECTED, and that they succeed again once the resync has completed.
Y_UNIT_TEST(ShouldRejectRequestsIfRangeResyncingAfterChecksumMismatch)
{
    using namespace NMonitoring;

    TTestBasicRuntime runtime;

    runtime.SetRegistrationObserverFunc(
        [] (auto& runtime, const auto& parentId, const auto& actorId)
    {
        Y_UNUSED(parentId);
        runtime.EnableScheduleForActor(actorId);
    });

    // Capture TEvRangeResynced instead of letting it through; the test
    // re-sends it manually below to finish the resync at a chosen moment.
    TAutoPtr<IEventHandle> delayedRangeResynced;
    runtime.SetEventFilter([&] (auto& runtime, auto& event) {
        Y_UNUSED(runtime);
        if (event->GetTypeRewrite() ==
            TEvNonreplPartitionPrivate::EvRangeResynced)
        {
            delayedRangeResynced = event.Release();
            return true;
        }
        return false;
    });

    // Count scheduled EvScrubbingNextRange events to track scrubbing progress.
    ui32 rangeCount = 0;
    runtime.SetScheduledEventFilter(
        [&] (auto& runtime, auto& event, auto&& delay, auto&& deadline)
    {
        Y_UNUSED(runtime);
        Y_UNUSED(delay);
        Y_UNUSED(deadline);
        if (event->GetTypeRewrite() ==
            TEvNonreplPartitionPrivate::EvScrubbingNextRange)
        {
            ++rangeCount;
        }

        return false;
    });

    TDynamicCountersPtr critEventsCounters = new TDynamicCounters();
    InitCriticalEventsCounter(critEventsCounters);

    TTestEnv env(runtime);

    // Make replica 2 diverge from the mirror on range1; range2 lies inside it.
    const auto range1 = TBlockRange64::WithLength(3100, 100);
    const auto range2 = TBlockRange64::WithLength(3150, 2);
    env.WriteMirror(range1, 'A');
    env.WriteReplica(2, range1, 'B');

    // wait for scrubbing 3rd range
    ui32 iterations = 0;
    while (rangeCount < 3 && iterations++ < 100) {
        runtime.DispatchEvents({}, env.ScrubbingInterval);
    }

    // wait for resync 3rd range
    iterations = 0;
    while (!delayedRangeResynced && iterations++ < 100) {
        runtime.AdvanceCurrentTime(env.Config->GetScrubbingChecksumMismatchTimeout());
        runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
    }

    // The rest of the test relies on the resync being in flight: fail here
    // with a clear message if the wait loop above gave up without capturing
    // the completion event, rather than sending a null handle later on.
    UNIT_ASSERT_C(
        delayedRangeResynced,
        "TEvRangeResynced was not captured, range resync did not start");

    // check that read/write to 3rd range will be rejected
    TPartitionClient client(runtime, env.ActorId);
    {
        TString data(DefaultBlockSize, 'B');
        client.SendWriteBlocksLocalRequest(range2, data);
        auto response = client.RecvWriteBlocksLocalResponse();
        UNIT_ASSERT_VALUES_EQUAL_C(
            E_REJECTED,
            response->GetStatus(),
            response->GetErrorReason());
    }

    {
        TVector<TString> blocks;
        client.SendReadBlocksLocalRequest(
            range2,
            TGuardedSgList(ResizeBlocks(
                blocks,
                range2.Size(),
                TString(DefaultBlockSize, '\0')
            )));
        auto response = client.RecvReadBlocksLocalResponse();
        UNIT_ASSERT_VALUES_EQUAL_C(
            E_REJECTED,
            response->GetStatus(),
            response->GetErrorReason());
    }

    // check that after resync requests to range complete successfully
    runtime.Send(delayedRangeResynced.Release());
    runtime.DispatchEvents({}, TDuration::MilliSeconds(50));
    {
        client.WriteBlocks(range2, 'C');
        auto response = client.ReadBlocks(range2);
        const auto& blocks = response->Record.GetBlocks();
        for (ui32 i = 0; i < blocks.BuffersSize(); ++i) {
            UNIT_ASSERT_VALUES_EQUAL(
                TString(DefaultBlockSize, 'C'),
                blocks.GetBuffers(i));
        }
    }
}
}

} // namespace NCloud::NBlockStore::NStorage
Loading