From 277c3ec709c70242f2d74c24724870297492b2ad Mon Sep 17 00:00:00 2001 From: Darya Frolova <47457802+WilyTiger@users.noreply.github.com> Date: Tue, 7 May 2024 19:29:33 +0200 Subject: [PATCH] NBSNEBIUS-127: add data scrubbing for mirrored disks (#954) * add data scrubbing for mirrored disks * fix issues * add processing of write requests intersecting with scrubbing * fix issues * fix issues * fix issues --- cloud/blockstore/config/storage.proto | 5 + .../libs/diagnostics/critical_events.h | 1 + cloud/blockstore/libs/storage/core/config.cpp | 3 + cloud/blockstore/libs/storage/core/config.h | 3 + .../libs/storage/core/disk_counters.h | 7 + .../partition_nonrepl/checksum_range.cpp | 34 +- .../partition_nonrepl/checksum_range.h | 19 +- .../partition_nonrepl/part_mirror_actor.cpp | 201 ++++++++++++ .../partition_nonrepl/part_mirror_actor.h | 25 +- .../part_mirror_actor_mirror.cpp | 10 +- .../part_mirror_actor_stats.cpp | 2 + .../partition_nonrepl/part_mirror_state.cpp | 21 +- .../partition_nonrepl/part_mirror_state.h | 5 + .../partition_nonrepl/part_mirror_ut.cpp | 299 +++++++++++++++++- .../part_nonrepl_events_private.h | 2 + .../part_nonrepl_migration_common_actor.cpp | 4 + .../part_nonrepl_migration_common_actor.h | 1 + ..._migration_common_actor_checksumblocks.cpp | 21 ++ .../partition_nonrepl/resync_range.cpp | 11 +- .../storage/partition_nonrepl/resync_range.h | 4 +- .../libs/storage/partition_nonrepl/ya.make | 1 + 21 files changed, 632 insertions(+), 47 deletions(-) create mode 100644 cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_checksumblocks.cpp diff --git a/cloud/blockstore/config/storage.proto b/cloud/blockstore/config/storage.proto index 76bc9c9254c..d3d198fc037 100644 --- a/cloud/blockstore/config/storage.proto +++ b/cloud/blockstore/config/storage.proto @@ -965,4 +965,9 @@ message TStorageServiceConfig // Max shadow disk fill io depth. optional uint32 MaxShadowDiskFillIoDepth = 363; + + // Enable data scrubbing for mirrored disks. + optional bool DataScrubbingEnabled = 364; + // Interval between scrubbing ranges in milliseconds + optional uint32 ScrubbingInterval = 365; } diff --git a/cloud/blockstore/libs/diagnostics/critical_events.h b/cloud/blockstore/libs/diagnostics/critical_events.h index 1f2cc921080..05639cb87d2 100644 --- a/cloud/blockstore/libs/diagnostics/critical_events.h +++ b/cloud/blockstore/libs/diagnostics/critical_events.h @@ -31,6 +31,7 @@ namespace NCloud::NBlockStore { xxx(MirroredDiskDeviceReplacementForbidden) \ xxx(MirroredDiskDeviceReplacementFailure) \ xxx(MirroredDiskDeviceReplacementRateLimitExceeded) \ + xxx(MirroredDiskChecksumMismatch) \ xxx(CounterUpdateRace) \ xxx(EndpointStartingError) \ xxx(ResyncFailed) \ diff --git a/cloud/blockstore/libs/storage/core/config.cpp b/cloud/blockstore/libs/storage/core/config.cpp index 1aac461c6d9..1d5e6770092 100644 --- a/cloud/blockstore/libs/storage/core/config.cpp +++ b/cloud/blockstore/libs/storage/core/config.cpp @@ -482,6 +482,9 @@ TDuration MSeconds(ui32 value) xxx(MaxAcquireShadowDiskRetryDelayWhenNonBlocked, TDuration, Seconds(10) )\ xxx(MaxAcquireShadowDiskTotalTimeoutWhenBlocked, TDuration, Seconds(5) )\ xxx(MaxAcquireShadowDiskTotalTimeoutWhenNonBlocked, TDuration, Seconds(600) )\ + \ + xxx(DataScrubbingEnabled, bool, false )\ + xxx(ScrubbingInterval, TDuration, MSeconds(50) )\ // BLOCKSTORE_STORAGE_CONFIG_RW diff --git a/cloud/blockstore/libs/storage/core/config.h b/cloud/blockstore/libs/storage/core/config.h index 557e0a0578a..c7543c5bd9b 100644 --- a/cloud/blockstore/libs/storage/core/config.h +++ b/cloud/blockstore/libs/storage/core/config.h @@ -569,6 +569,9 @@ class TStorageConfig TDuration GetVolumeProxyCacheRetryDuration() const; TDuration GetServiceSelfPingInterval() const; + + bool GetDataScrubbingEnabled() const; + TDuration GetScrubbingInterval() const; }; ui64 GetAllocationUnit( diff --git a/cloud/blockstore/libs/storage/core/disk_counters.h b/cloud/blockstore/libs/storage/core/disk_counters.h index 732dace809a..6bcc078b78c 100644 --- a/cloud/blockstore/libs/storage/core/disk_counters.h +++ b/cloud/blockstore/libs/storage/core/disk_counters.h @@ -58,6 +58,12 @@ enum class EPublishingPolicy //////////////////////////////////////////////////////////////////////////////// +#define BLOCKSTORE_DRBASED_PART_CUMULATIVE_COUNTERS(xxx, ...) \ + xxx(ScrubbingThroughput, Generic, Permanent, __VA_ARGS__)\ +// BLOCKSTORE_DRBASED_PART_CUMULATIVE_COUNTERS + +//////////////////////////////////////////////////////////////////////////////// + #define BLOCKSTORE_REPL_PART_CUMULATIVE_COUNTERS(xxx, ...) \ xxx(BytesWritten, Generic, Permanent, __VA_ARGS__)\ xxx(BytesRead, Generic, Permanent, __VA_ARGS__)\ @@ -191,6 +197,7 @@ struct TPartitionDiskCounters // BLOCKSTORE_CUMULATIVE_COUNTER BLOCKSTORE_REPL_PART_CUMULATIVE_COUNTERS(BLOCKSTORE_CUMULATIVE_COUNTER) + BLOCKSTORE_DRBASED_PART_CUMULATIVE_COUNTERS(BLOCKSTORE_CUMULATIVE_COUNTER) #undef BLOCKSTORE_CUMULATIVE_COUNTER } Cumulative; diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/checksum_range.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/checksum_range.cpp index 0621665b15b..27c8f984daf 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/checksum_range.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/checksum_range.cpp @@ -12,17 +12,15 @@ using namespace NActors; //////////////////////////////////////////////////////////////////////////////// TChecksumRangeActorCompanion::TChecksumRangeActorCompanion( - TBlockRange64 range, TVector replicas) - : Range(range) - , Replicas(std::move(replicas)) + : Replicas(std::move(replicas)) { Checksums.resize(Replicas.size()); } bool TChecksumRangeActorCompanion::IsFinished() const { - return Finished; + return CalculatedChecksumsCount == Replicas.size(); } const TVector& TChecksumRangeActorCompanion::GetChecksums() const @@ -45,19 +43,24 @@ TDuration TChecksumRangeActorCompanion::GetChecksumDuration() const return ChecksumDuration; } -void TChecksumRangeActorCompanion::CalculateChecksums(const TActorContext& ctx) +void TChecksumRangeActorCompanion::CalculateChecksums( + const TActorContext& ctx, + TBlockRange64 range) { for (size_t i = 0; i < Replicas.size(); ++i) { - CalculateReplicaChecksum(ctx, i); + CalculateReplicaChecksum(ctx, range, i); } ChecksumStartTs = ctx.Now(); } -void TChecksumRangeActorCompanion::CalculateReplicaChecksum(const TActorContext& ctx, int idx) +void TChecksumRangeActorCompanion::CalculateReplicaChecksum( + const TActorContext& ctx, + TBlockRange64 range, + int idx) { auto request = std::make_unique(); - request->Record.SetStartIndex(Range.Start); - request->Record.SetBlocksCount(Range.Size()); + request->Record.SetStartIndex(range.Start); + request->Record.SetBlocksCount(range.Size()); auto* headers = request->Record.MutableHeaders(); headers->SetIsBackgroundRequest(true); @@ -81,33 +84,30 @@ void TChecksumRangeActorCompanion::HandleChecksumResponse( const TEvNonreplPartitionPrivate::TEvChecksumBlocksResponse::TPtr& ev, const TActorContext& ctx) { + ++CalculatedChecksumsCount; auto* msg = ev->Get(); - - Error = msg->Record.GetError(); - - if (HasError(Error)) { + if (HasError(msg->Record.GetError())) { LOG_WARN(ctx, TBlockStoreComponents::PARTITION, "[%s] Checksum error %s", Replicas[0].Name.c_str(), FormatError(Error).c_str()); + Error = msg->Record.GetError(); ChecksumDuration = ctx.Now() - ChecksumStartTs; - Finished = true; return; } Checksums[ev->Cookie] = msg->Record.GetChecksum(); - if (++CalculatedChecksumsCount == Replicas.size()) { + if (CalculatedChecksumsCount == Replicas.size()) { ChecksumDuration = ctx.Now() - ChecksumStartTs; - Finished = true; } } void TChecksumRangeActorCompanion::HandleChecksumUndelivery( const NActors::TActorContext& ctx) { + ++CalculatedChecksumsCount; ChecksumDuration = ctx.Now() - ChecksumStartTs; - Error = MakeError(E_REJECTED, "ChecksumBlocks request undelivered"); } diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/checksum_range.h b/cloud/blockstore/libs/storage/partition_nonrepl/checksum_range.h index 1bdf4a1baf2..15fab2338d0 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/checksum_range.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/checksum_range.h @@ -14,22 +14,22 @@ namespace NCloud::NBlockStore::NStorage { class TChecksumRangeActorCompanion { private: - const TBlockRange64 Range; - const TVector Replicas; + TVector Replicas; TInstant ChecksumStartTs; TDuration ChecksumDuration; ui32 CalculatedChecksumsCount = 0; - bool Finished = false; TVector Checksums; NProto::TError Error; public: - TChecksumRangeActorCompanion( - TBlockRange64 range, - TVector replicas); + TChecksumRangeActorCompanion(TVector replicas); + + TChecksumRangeActorCompanion() = default; - void CalculateChecksums(const NActors::TActorContext& ctx); + void CalculateChecksums( + const NActors::TActorContext& ctx, + TBlockRange64 range); void HandleChecksumResponse( const TEvNonreplPartitionPrivate::TEvChecksumBlocksResponse::TPtr& ev, @@ -43,7 +43,10 @@ class TChecksumRangeActorCompanion TDuration GetChecksumDuration() const; private: - void CalculateReplicaChecksum(const NActors::TActorContext& ctx, int idx); + void CalculateReplicaChecksum( + const NActors::TActorContext& ctx, + TBlockRange64 range, + int idx); }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.cpp index fda9973291e..392400ad98b 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.cpp @@ -1,4 +1,5 @@ #include "part_mirror_actor.h" +#include "part_mirror_resync_util.h" #include "part_nonrepl.h" #include "part_nonrepl_migration.h" @@ -57,6 +58,10 @@ void TMirrorPartitionActor::Bootstrap(const TActorContext& ctx) SetupPartitions(ctx); ScheduleCountersUpdate(ctx); + if (Config->GetDataScrubbingEnabled() && !ResyncActorId) { + ScheduleScrubbingNextRange(ctx); + } + Become(&TThis::StateWork); } @@ -107,6 +112,56 @@ void TMirrorPartitionActor::SetupPartitions(const TActorContext& ctx) ReplicaCounters.resize(State.GetReplicaInfos().size()); } +void TMirrorPartitionActor::CompareChecksums(const TActorContext& ctx) +{ + const auto& checksums = ChecksumRangeActorCompanion.GetChecksums(); + bool equal = true; + for (size_t i = 1; i < checksums.size(); i++) { + if (checksums[i] != checksums[0]) { + equal = false; + break; + } + } + + if (!equal && WriteIntersectsWithScrubbing) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "[%s] Reschedule scrubbing for range %s due to inflight write", + DiskId.c_str(), + DescribeRange( + RangeId2BlockRange(ScrubbingRangeId, State.GetBlockSize())).c_str()); + ScheduleScrubbingNextRange(ctx); + return; + } + + if (!equal) { + LOG_ERROR( + ctx, + TBlockStoreComponents::PARTITION, + "[%s] Checksum mismatch for range %s", + DiskId.c_str(), + DescribeRange( + RangeId2BlockRange(ScrubbingRangeId, State.GetBlockSize())).c_str()); + + for (size_t i = 0; i < checksums.size(); i++) { + LOG_ERROR( + ctx, + TBlockStoreComponents::PARTITION, + "[%s] Replica %lu range %s checksum %lu", + DiskId.c_str(), + i, + DescribeRange( + RangeId2BlockRange(ScrubbingRangeId, State.GetBlockSize())).c_str(), + checksums[i]); + } + ReportMirroredDiskChecksumMismatch(); + } + + ++ScrubbingRangeId; + ScheduleScrubbingNextRange(ctx); +} + void TMirrorPartitionActor::ReplyAndDie(const TActorContext& ctx) { NCloud::Reply(ctx, *Poisoner, std::make_unique()); @@ -181,6 +236,137 @@ void TMirrorPartitionActor::HandleUpdateCounters( //////////////////////////////////////////////////////////////////////////////// +void TMirrorPartitionActor::ScheduleScrubbingNextRange( + const TActorContext& ctx) +{ + if (!ScrubbingScheduled) { + ctx.Schedule( + Config->GetScrubbingInterval(), + new TEvNonreplPartitionPrivate::TEvScrubbingNextRange()); + ScrubbingScheduled = true; + } +} + +void TMirrorPartitionActor::HandleScrubbingNextRange( + const TEvNonreplPartitionPrivate::TEvScrubbingNextRange::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + ScrubbingScheduled = false; + WriteIntersectsWithScrubbing = false; + auto scrubbingRange = + RangeId2BlockRange(ScrubbingRangeId, State.GetBlockSize()); + if (scrubbingRange.Start >= State.GetBlockCount()) { + ScrubbingRangeId = 0; + scrubbingRange = + RangeId2BlockRange(ScrubbingRangeId, State.GetBlockSize()); + } + + for (const auto& [key, requestInfo]: RequestsInProgress.AllRequests()) { + if (!requestInfo.Write) { + continue; + } + const auto& requestRange = requestInfo.Value; + if (scrubbingRange.Overlaps(requestRange)) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "[%s] Reschedule scrubbing for range %s due to inflight write to %s", + DiskId.c_str(), + DescribeRange(scrubbingRange).c_str(), + DescribeRange(requestRange).c_str()); + + ScheduleScrubbingNextRange(ctx); + return; + } + } + + TVector replicas; + const auto& replicaInfos = State.GetReplicaInfos(); + const auto& replicaActors = State.GetReplicaActors(); + for (ui32 i = 0; i < replicaInfos.size(); i++) { + if (replicaInfos[i].Config->DevicesReadyForReading(scrubbingRange)) { + replicas.emplace_back( + replicaInfos[i].Config->GetName(), + i, + replicaActors[i]); + } + } + + if (replicas.size() < 2) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "[%s] Skipping scrubbing for range %s, devices not ready for reading", + DiskId.c_str(), + DescribeRange(scrubbingRange).c_str()); + + ++ScrubbingRangeId; + ScheduleScrubbingNextRange(ctx); + return; + } + + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "[%s] Scrubbing range %s", + DiskId.c_str(), + DescribeRange(scrubbingRange).c_str()); + + ScrubbingThroughput += scrubbingRange.Size() * State.GetBlockSize(); + ChecksumRangeActorCompanion = TChecksumRangeActorCompanion(replicas); + ChecksumRangeActorCompanion.CalculateChecksums(ctx, scrubbingRange); +} + +void TMirrorPartitionActor::HandleChecksumUndelivery( + const TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + ChecksumRangeActorCompanion.HandleChecksumUndelivery(ctx); + if (ChecksumRangeActorCompanion.IsFinished()) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "[%s] Reschedule scrubbing for range %s due to checksum error %s", + DiskId.c_str(), + DescribeRange( + RangeId2BlockRange(ScrubbingRangeId, State.GetBlockSize())).c_str(), + FormatError(ChecksumRangeActorCompanion.GetError()).c_str()); + ScheduleScrubbingNextRange(ctx); + } +} + +void TMirrorPartitionActor::HandleChecksumResponse( + const TEvNonreplPartitionPrivate::TEvChecksumBlocksResponse::TPtr& ev, + const TActorContext& ctx) +{ + ChecksumRangeActorCompanion.HandleChecksumResponse(ev, ctx); + + if (!ChecksumRangeActorCompanion.IsFinished()) { + return; + } + + if (HasError(ChecksumRangeActorCompanion.GetError())) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "[%s] Reschedule scrubbing for range %s due to checksum error %s", + DiskId.c_str(), + DescribeRange( + RangeId2BlockRange(ScrubbingRangeId, State.GetBlockSize())).c_str(), + FormatError(ChecksumRangeActorCompanion.GetError()).c_str()); + ScheduleScrubbingNextRange(ctx); + return; + } + + CompareChecksums(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + void TMirrorPartitionActor::HandleRWClientIdChanged( const TEvVolume::TEvRWClientIdChanged::TPtr& ev, const TActorContext& ctx) @@ -226,6 +412,17 @@ STFUNC(TMirrorPartitionActor::StateWork) TEvNonreplPartitionPrivate::TEvUpdateCounters, HandleUpdateCounters); + HFunc( + TEvNonreplPartitionPrivate::TEvScrubbingNextRange, + HandleScrubbingNextRange); + + HFunc( + TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest, + HandleChecksumUndelivery); + HFunc( + TEvNonreplPartitionPrivate::TEvChecksumBlocksResponse, + HandleChecksumResponse); + HFunc(TEvService::TEvReadBlocksRequest, HandleReadBlocks); HFunc(TEvService::TEvWriteBlocksRequest, HandleWriteBlocks); HFunc(TEvService::TEvZeroBlocksRequest, HandleZeroBlocks); @@ -267,6 +464,10 @@ STFUNC(TMirrorPartitionActor::StateZombie) switch (ev->GetTypeRewrite()) { IgnoreFunc(TEvNonreplPartitionPrivate::TEvUpdateCounters); + IgnoreFunc(TEvNonreplPartitionPrivate::TEvScrubbingNextRange); + IgnoreFunc(TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest); + IgnoreFunc(TEvNonreplPartitionPrivate::TEvChecksumBlocksResponse); + HFunc(TEvService::TEvReadBlocksRequest, RejectReadBlocks); HFunc(TEvService::TEvWriteBlocksRequest, RejectWriteBlocks); HFunc(TEvService::TEvZeroBlocksRequest, RejectZeroBlocks); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.h index 377f631194f..50934bd2b43 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.h @@ -2,6 +2,7 @@ #include "public.h" +#include "checksum_range.h" #include "config.h" #include "part_nonrepl_events_private.h" #include "part_mirror_state.h" @@ -47,7 +48,8 @@ class TMirrorPartitionActor final ui64 NetworkBytes = 0; TDuration CpuUsage; - TRequestsInProgress RequestsInProgress{EAllowedRequests::ReadWrite}; + TRequestsInProgress RequestsInProgress{ + EAllowedRequests::ReadWrite}; TDrainActorCompanion DrainActorCompanion{ RequestsInProgress, DiskId}; @@ -57,6 +59,13 @@ class TMirrorPartitionActor final NProto::TError Status; + bool ScrubbingScheduled = false; + ui64 ScrubbingRangeId = 0; + TBlockRange64 ScrubbingRange; + TChecksumRangeActorCompanion ChecksumRangeActorCompanion; + bool WriteIntersectsWithScrubbing = false; + ui64 ScrubbingThroughput = 0; + public: TMirrorPartitionActor( TStorageConfigPtr config, @@ -78,7 +87,9 @@ class TMirrorPartitionActor final void KillActors(const NActors::TActorContext& ctx); void SetupPartitions(const NActors::TActorContext& ctx); void ScheduleCountersUpdate(const NActors::TActorContext& ctx); + void ScheduleScrubbingNextRange(const NActors::TActorContext& ctx); void SendStats(const NActors::TActorContext& ctx); + void CompareChecksums(const NActors::TActorContext& ctx); void ReplyAndDie(const NActors::TActorContext& ctx); private: @@ -101,6 +112,18 @@ class TMirrorPartitionActor final const TEvNonreplPartitionPrivate::TEvUpdateCounters::TPtr& ev, const NActors::TActorContext& ctx); + void HandleScrubbingNextRange( + const TEvNonreplPartitionPrivate::TEvScrubbingNextRange::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleChecksumResponse( + const TEvNonreplPartitionPrivate::TEvChecksumBlocksResponse::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleChecksumUndelivery( + const TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest::TPtr& ev, + const NActors::TActorContext& ctx); + void HandlePoisonPill( const NActors::TEvents::TEvPoisonPill::TPtr& ev, const NActors::TActorContext& ctx); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_mirror.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_mirror.cpp index cf8d60c9e7c..b545c19c9ea 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_mirror.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_mirror.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace NCloud::NBlockStore::NStorage { @@ -54,8 +55,15 @@ void TMirrorPartitionActor::MirrorRequest( return; } + const auto range = BuildRequestBlockRange( + *ev->Get(), + State.GetBlockSize()); const auto requestIdentityKey = ev->Cookie; - RequestsInProgress.AddWriteRequest(requestIdentityKey); + RequestsInProgress.AddWriteRequest(requestIdentityKey, range); + + if (ScrubbingRange.Overlaps(range)) { + WriteIntersectsWithScrubbing = true; + } NCloud::Register>( ctx, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_stats.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_stats.cpp index 1402e5a54ab..f70103097f4 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_stats.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_stats.cpp @@ -62,6 +62,7 @@ void TMirrorPartitionActor::SendStats(const TActorContext& ctx) } } + stats->Cumulative.ScrubbingThroughput.Value = ScrubbingThroughput; auto request = std::make_unique( MakeIntrusive(), @@ -72,6 +73,7 @@ void TMirrorPartitionActor::SendStats(const TActorContext& ctx) NetworkBytes = 0; CpuUsage = {}; + ScrubbingThroughput = 0; NCloud::Send( ctx, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state.cpp index e575acc2d53..7f3facb8dc5 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state.cpp @@ -21,12 +21,13 @@ TMirrorPartitionState::TMirrorPartitionState( TMigrations migrations, TVector replicaDevices) : Config(std::move(config)) + , PartConfig(std::move(partConfig)) , RWClientId(std::move(rwClientId)) , Migrations(std::move(migrations)) { - ReplicaInfos.push_back({partConfig->Fork(partConfig->GetDevices()), {}}); + ReplicaInfos.push_back({PartConfig->Fork(PartConfig->GetDevices()), {}}); for (auto& devices: replicaDevices) { - ReplicaInfos.push_back({partConfig->Fork(std::move(devices)), {}}); + ReplicaInfos.push_back({PartConfig->Fork(std::move(devices)), {}}); } ui32 freshDeviceCount = 0; @@ -34,11 +35,11 @@ TMirrorPartitionState::TMirrorPartitionState( freshDeviceCount += replicaInfo.Config->GetFreshDeviceIds().size(); } - if (freshDeviceCount != partConfig->GetFreshDeviceIds().size()) { + if (freshDeviceCount != PartConfig->GetFreshDeviceIds().size()) { ReportFreshDeviceNotFoundInConfig(TStringBuilder() << "Fresh device count mismatch: " << freshDeviceCount - << " != " << partConfig->GetFreshDeviceIds().size() - << " for disk " << partConfig->GetName()); + << " != " << PartConfig->GetFreshDeviceIds().size() + << " for disk " << PartConfig->GetName()); } } @@ -194,4 +195,14 @@ NProto::TError TMirrorPartitionState::NextReadReplica( << DescribeRange(readRange) << " targets only fresh/dummy devices"); } +ui32 TMirrorPartitionState::GetBlockSize() const +{ + return PartConfig->GetBlockSize(); +} + +ui64 TMirrorPartitionState::GetBlockCount() const +{ + return PartConfig->GetBlockCount(); +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state.h index 44616a527dd..a2ed29c6d67 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state.h @@ -19,6 +19,7 @@ class TMirrorPartitionState { private: const TStorageConfigPtr Config; + const TNonreplicatedPartitionConfigPtr PartConfig; TString RWClientId; TMigrations Migrations; @@ -69,6 +70,10 @@ class TMirrorPartitionState [[nodiscard]] NProto::TError NextReadReplica( const TBlockRange64 readRange, NActors::TActorId* actorId); + + ui32 GetBlockSize() const; + + ui64 GetBlockCount() const; }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp index a5286fdaa2c..01afcd7be8c 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp @@ -1,8 +1,11 @@ #include "part_mirror.h" #include "part_mirror_actor.h" +#include "part_mirror_resync_util.h" +#include "part_nonrepl_actor.h" #include "ut_env.h" #include +#include #include #include #include @@ -33,10 +36,12 @@ namespace { struct TTestEnv { TTestActorRuntime& Runtime; + TStorageConfigPtr Config; TActorId ActorId; TActorId VolumeActorId; TStorageStatsServiceStatePtr StorageStatsServiceState; TDiskAgentStatePtr DiskAgentState; + TVector ReplicaActors; static void AddDevice( ui32 nodeId, @@ -125,8 +130,9 @@ struct TTestEnv storageConfig.SetMaxTimedOutDeviceStateDuration(20'000); storageConfig.SetNonReplicatedMinRequestTimeoutSSD(1'000); storageConfig.SetNonReplicatedMaxRequestTimeoutSSD(5'000); + storageConfig.SetDataScrubbingEnabled(true); - auto config = std::make_shared( + Config = std::make_shared( std::move(storageConfig), std::make_shared( NCloud::NProto::TFeaturesConfig()) @@ -180,13 +186,13 @@ struct TTestEnv } auto part = std::make_unique( - std::move(config), + Config, CreateProfileLogStub(), CreateBlockDigestGeneratorStub(), "", // rwClientId - std::move(partConfig), + partConfig, std::move(migrations), - std::move(replicas), + replicas, nullptr, // rdmaClient VolumeActorId, TActorId() // resyncActorId @@ -197,6 +203,11 @@ struct TTestEnv TActorSetupCmd(part.release(), TMailboxType::Simple, 0) ); + AddReplica(partConfig->Fork(partConfig->GetDevices()), "ZZZ"); + for (size_t i = 0; i < replicas.size(); ++i) { + AddReplica(partConfig->Fork(replicas[i]), Sprintf("ZZZ%zu", i)); + } + auto volume = std::make_unique(); Runtime.AddLocalService( @@ -223,6 +234,41 @@ struct TTestEnv SetupTabletServices(Runtime); } + void AddReplica( + TNonreplicatedPartitionConfigPtr partConfig, + TString name) + { + auto part = std::make_unique( + Config, + partConfig, + TActorId() // do not send stats + ); + + TActorId actorId(0, name); + Runtime.AddLocalService( + actorId, + TActorSetupCmd(part.release(), TMailboxType::Simple, 0) + ); + + ReplicaActors.push_back(actorId); + } + + void WriteMirror(TBlockRange64 range, char fill) + { + WriteActor(ActorId, range, fill); + } + + void WriteReplica(int idx, TBlockRange64 range, char fill) + { + WriteActor(ReplicaActors[idx], range, fill); + } + + void WriteActor(TActorId actorId, TBlockRange64 range, char fill) + { + TPartitionClient client(Runtime, actorId); + client.WriteBlocks(range, fill); + } + void SetupLogging() { Runtime.AppendToLogSettings( @@ -231,7 +277,7 @@ struct TTestEnv GetComponentName); for (ui32 i = TBlockStoreComponents::START; i < TBlockStoreComponents::END; ++i) { - Runtime.SetLogPriority(i, NLog::PRI_INFO); + Runtime.SetLogPriority(i, NLog::PRI_DEBUG); } // Runtime.SetLogPriority(NLog::InvalidComponent, NLog::PRI_DEBUG); } @@ -1083,6 +1129,249 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest) 6 * 1024 * DefaultBlockSize, counters.BytesCount.Value); } + + Y_UNIT_TEST(ShouldReportScrubbingCounter) + { + using namespace NMonitoring; + + TTestBasicRuntime runtime; + runtime.SetRegistrationObserverFunc( + [] (auto& runtime, const auto& parentId, const auto& actorId) + { + Y_UNUSED(parentId); + runtime.EnableScheduleForActor(actorId); + }); + + TTestEnv env(runtime); + + auto& counter = env.StorageStatsServiceState->Counters.Cumulative.ScrubbingThroughput; + + runtime.DispatchEvents({}, env.Config->GetScrubbingInterval()); + runtime.AdvanceCurrentTime(UpdateCountersInterval); + runtime.DispatchEvents({}, TDuration::MilliSeconds(50)); + + UNIT_ASSERT_VALUES_EQUAL(2 * 4_MB, counter.Value); + } + + Y_UNIT_TEST(ShouldFindChecksumMismatch) + { + using namespace NMonitoring; + + TTestBasicRuntime runtime; + + runtime.SetRegistrationObserverFunc( + [] (auto& runtime, const auto& parentId, const auto& actorId) + { + Y_UNUSED(parentId); + runtime.EnableScheduleForActor(actorId); + }); + + ui32 rangeCount = 0; + runtime.SetScheduledEventFilter([&] (auto& runtime, auto& event, auto&& delay, auto&& deadline) { + Y_UNUSED(runtime); + Y_UNUSED(delay); + Y_UNUSED(deadline); + if (event->GetTypeRewrite() == TEvNonreplPartitionPrivate::EvScrubbingNextRange) { + ++rangeCount; + } + + return false; + }); + + TDynamicCountersPtr counters = new TDynamicCounters(); + InitCriticalEventsCounter(counters); + + TTestEnv env(runtime); + + const auto range1 = TBlockRange64::WithLength(0, 2); + env.WriteMirror(range1, 'A'); + env.WriteReplica(1, range1, 'B'); + env.WriteReplica(2, range1, 'B'); + + const auto range2 = TBlockRange64::WithLength(4096, 100); + env.WriteMirror(range2, 'A'); + env.WriteReplica(2, range2, 'B'); + + while (rangeCount < 6) { + runtime.DispatchEvents({}, env.Config->GetScrubbingInterval()); + } + + auto mirroredDiskChecksumMismatch = + counters->GetCounter("AppCriticalEvents/MirroredDiskChecksumMismatch", true); + + UNIT_ASSERT_VALUES_EQUAL(2, mirroredDiskChecksumMismatch->Val()); + + const auto range3 = TBlockRange64::WithLength(1025, 50); + env.WriteMirror(range3, 'A'); + env.WriteReplica(1, range3, 'B'); + + rangeCount = 0; + while (rangeCount < 6) { + runtime.DispatchEvents({}, env.Config->GetScrubbingInterval()); + } + UNIT_ASSERT_VALUES_EQUAL(5, mirroredDiskChecksumMismatch->Val()); + } + + Y_UNIT_TEST(ShouldPostponeScrubbingIfIntersectingWritePending) + { + using namespace NMonitoring; + + TDynamicCountersPtr counters = new TDynamicCounters(); + InitCriticalEventsCounter(counters); + + TTestBasicRuntime runtime; + runtime.SetRegistrationObserverFunc( + [] (auto& runtime, const auto& parentId, const auto& actorId) + { + Y_UNUSED(parentId); + runtime.EnableScheduleForActor(actorId); + }); + + TTestEnv env(runtime); + + const auto range = TBlockRange64::WithLength(1030, 200); + + env.WriteReplica(0, range, 'A'); + env.WriteReplica(1, range, 'B'); + env.WriteReplica(2, range, 'C'); + + ui32 rangeCount = 0; + TAutoPtr delayedRequest; + runtime.SetScheduledEventFilter([&] (auto& runtime, auto& event, auto&& delay, auto&& deadline) { + Y_UNUSED(runtime); + Y_UNUSED(delay); + Y_UNUSED(deadline); + if (event->GetTypeRewrite() == TEvNonreplPartitionPrivate::EvScrubbingNextRange) { + ++rangeCount; + if (delayedRequest && rangeCount > 5) { + runtime.Send(delayedRequest.Release()); + } + } + + return false; + }); + runtime.SetEventFilter([&] (auto& runtime, auto& event) { + Y_UNUSED(runtime); + if (event->GetTypeRewrite() == TEvDiskAgent::EvWriteDeviceBlocksRequest) { + if (!delayedRequest) { + delayedRequest = event.Release(); + return true; + } + } + + return false; + }); + env.WriteActor(env.ActorId, range, 'D'); + + auto mirroredDiskChecksumMismatch = + counters->GetCounter("AppCriticalEvents/MirroredDiskChecksumMismatch", true); + UNIT_ASSERT_VALUES_EQUAL(0, mirroredDiskChecksumMismatch->Val()); + + rangeCount = 0; + while (rangeCount < 5) { + runtime.DispatchEvents({}, env.Config->GetScrubbingInterval()); + } + + UNIT_ASSERT_VALUES_EQUAL(0, mirroredDiskChecksumMismatch->Val()); + } + + Y_UNIT_TEST(ShouldNotFindMismatchIfChecksumIntersectedWithWrite) + { + using namespace NMonitoring; + + TDynamicCountersPtr counters = new TDynamicCounters(); + InitCriticalEventsCounter(counters); + + TTestBasicRuntime runtime; + runtime.SetRegistrationObserverFunc( + [] (auto& runtime, const auto& parentId, const auto& actorId) + { + Y_UNUSED(parentId); + runtime.EnableScheduleForActor(actorId); + }); + + TTestEnv env(runtime); + + const auto range = TBlockRange64::WithLength(0, 200); + + env.WriteReplica(0, range, 'A'); + env.WriteReplica(1, range, 'B'); + env.WriteReplica(2, range, 'C'); + + enum { + INIT, + REQUESTS_RECEIVED, + CHECKSUM_SENT, + FINISH + } state = INIT; + + ui32 rangeCount = 0; + TAutoPtr delayedWriteRequest; + TAutoPtr delayedChecksumRequest; + runtime.SetEventFilter([&] (auto& runtime, auto& event) { + Y_UNUSED(runtime); + switch (state) { + case INIT: { + if (event->GetTypeRewrite() == TEvDiskAgent::EvWriteDeviceBlocksRequest) { + if (!delayedWriteRequest) { + delayedWriteRequest = event.Release(); + return true; + } + } + if (event->GetTypeRewrite() == TEvDiskAgent::EvChecksumDeviceBlocksRequest) { + if (!delayedChecksumRequest) { + delayedChecksumRequest = event.Release(); + return true; + } + } + if (delayedWriteRequest && delayedChecksumRequest) { + state = REQUESTS_RECEIVED; + } + break; + } + case REQUESTS_RECEIVED: { + state = CHECKSUM_SENT; + runtime.Send(delayedChecksumRequest.Release()); + break; + } + case CHECKSUM_SENT: { + if (event->GetTypeRewrite() == TEvDiskAgent::EvChecksumDeviceBlocksResponse) { + state = FINISH; + runtime.Send(delayedWriteRequest.Release()); + } + break; + } + default: + break; + } + + return false; + }); + runtime.SetScheduledEventFilter([&] (auto& runtime, auto& event, auto&& delay, auto&& deadline) { + Y_UNUSED(runtime); + Y_UNUSED(delay); + Y_UNUSED(deadline); + if (state == FINISH && event->GetTypeRewrite() == TEvNonreplPartitionPrivate::EvScrubbingNextRange) { + ++rangeCount; + } + + return false; + }); + + + runtime.DispatchEvents({}, env.Config->GetScrubbingInterval()); + + env.WriteActor(env.ActorId, range, 'D'); + + while (rangeCount < 5) { + runtime.DispatchEvents({}, env.Config->GetScrubbingInterval()); + } + + auto mirroredDiskChecksumMismatch = + counters->GetCounter("AppCriticalEvents/MirroredDiskChecksumMismatch", true); + + UNIT_ASSERT_VALUES_EQUAL(0, mirroredDiskChecksumMismatch->Val()); + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_events_private.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_events_private.h index aefe0e73b4a..2df8d8d5150 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_events_private.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_events_private.h @@ -169,6 +169,7 @@ struct TEvNonreplPartitionPrivate EvBegin = TBlockStorePrivateEvents::PARTITION_NONREPL_START, EvUpdateCounters, + EvScrubbingNextRange, EvReadBlocksCompleted, EvWriteBlocksCompleted, EvZeroBlocksCompleted, @@ -189,6 +190,7 @@ struct TEvNonreplPartitionPrivate "EvEnd expected to be < TBlockStorePrivateEvents::PARTITION_NONREPL_END"); using TEvUpdateCounters = TResponseEvent; + using TEvScrubbingNextRange = TResponseEvent; using TEvReadBlocksCompleted = TResponseEvent; using TEvWriteBlocksCompleted = TResponseEvent; using TEvZeroBlocksCompleted = TResponseEvent; diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.cpp index 744d1f8cd7a..def5b301761 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.cpp @@ -141,6 +141,8 @@ STFUNC(TNonreplicatedPartitionMigrationCommonActor::StateWork) HFunc(TEvService::TEvReadBlocksLocalRequest, HandleReadBlocksLocal); HFunc(TEvService::TEvWriteBlocksLocalRequest, HandleWriteBlocksLocal); + HFunc(TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest, HandleChecksumBlocks); + HFunc( NPartition::TEvPartition::TEvDrainRequest, DrainActorCompanion.HandleDrain); @@ -193,6 +195,8 @@ STFUNC(TNonreplicatedPartitionMigrationCommonActor::StateZombie) HFunc(TEvService::TEvWriteBlocksRequest, RejectWriteBlocks); HFunc(TEvService::TEvZeroBlocksRequest, RejectZeroBlocks); + HFunc(TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest, RejectChecksumBlocks); + HFunc(TEvService::TEvReadBlocksLocalRequest, RejectReadBlocksLocal); HFunc(TEvService::TEvWriteBlocksLocalRequest, RejectWriteBlocksLocal); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h index d499127bb02..17f462ae3b8 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h @@ -232,6 +232,7 @@ class TNonreplicatedPartitionMigrationCommonActor BLOCKSTORE_IMPLEMENT_REQUEST(ReadBlocksLocal, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(WriteBlocksLocal, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(ZeroBlocks, TEvService); + BLOCKSTORE_IMPLEMENT_REQUEST(ChecksumBlocks, TEvNonreplPartitionPrivate); BLOCKSTORE_IMPLEMENT_REQUEST(Drain, NPartition::TEvPartition); BLOCKSTORE_IMPLEMENT_REQUEST(DescribeBlocks, TEvVolume); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_checksumblocks.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_checksumblocks.cpp new file mode 100644 index 00000000000..2718487f05d --- /dev/null +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_checksumblocks.cpp @@ -0,0 +1,21 @@ +#include "part_nonrepl_migration_common_actor.h" + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +//////////////////////////////////////////////////////////////////////////////// + +void TNonreplicatedPartitionMigrationCommonActor::HandleChecksumBlocks( + const TEvNonreplPartitionPrivate::TEvChecksumBlocksRequest::TPtr& ev, + const TActorContext& ctx) +{ + ForwardRequestWithNondeliveryTracking( + ctx, + SrcActorId, + *ev); +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/resync_range.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/resync_range.cpp index 11afffd793b..115347783bc 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/resync_range.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/resync_range.cpp @@ -44,7 +44,7 @@ void TResyncRangeActor::Bootstrap(const TActorContext& ctx) "ResyncRange", RequestInfo->CallContext->RequestId); - ChecksumRangeActorCompanion.CalculateChecksums(ctx); + ChecksumRangeActorCompanion.CalculateChecksums(ctx, Range); } void TResyncRangeActor::CompareChecksums(const TActorContext& ctx) @@ -228,18 +228,15 @@ void TResyncRangeActor::HandleChecksumResponse( { ChecksumRangeActorCompanion.HandleChecksumResponse(ev, ctx); - if (!ChecksumRangeActorCompanion.IsFinished()) { - return; - } - Error = ChecksumRangeActorCompanion.GetError(); - if (HasError(Error)) { Done(ctx); return; } - CompareChecksums(ctx); + if (ChecksumRangeActorCompanion.IsFinished()) { + CompareChecksums(ctx); + } } void TResyncRangeActor::HandleReadUndelivery( diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/resync_range.h b/cloud/blockstore/libs/storage/partition_nonrepl/resync_range.h index a7ce5a05b02..74614d71955 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/resync_range.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/resync_range.h @@ -41,9 +41,7 @@ class TResyncRangeActor final TDuration WriteDuration; TVector AffectedBlockInfos; - TChecksumRangeActorCompanion ChecksumRangeActorCompanion{ - Range, - Replicas}; + TChecksumRangeActorCompanion ChecksumRangeActorCompanion{Replicas}; public: TResyncRangeActor( diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/ya.make b/cloud/blockstore/libs/storage/partition_nonrepl/ya.make index 7a2fbfe8a2f..be56a82c8d1 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/ya.make +++ b/cloud/blockstore/libs/storage/partition_nonrepl/ya.make @@ -49,6 +49,7 @@ SRCS( part_nonrepl_migration_actor.cpp part_nonrepl_migration_common_actor.cpp + part_nonrepl_migration_common_actor_checksumblocks.cpp part_nonrepl_migration_common_actor_migration.cpp part_nonrepl_migration_common_actor_mirror.cpp part_nonrepl_migration_common_actor_readblocks_local.cpp