diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/mirror_request_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/mirror_request_actor.h index 90cbb40349b..497cb36b6e9 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/mirror_request_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/mirror_request_actor.h @@ -26,35 +26,41 @@ class TMirrorRequestActor final : public NActors::TActorBootstrapped> { private: + using TBase = NActors::TActorBootstrapped>; + using TResponseProto = typename TMethod::TResponse::ProtoRecordType; + const TRequestInfoPtr RequestInfo; - const TVector Partitions; + const TVector LeaderPartitions; + const NActors::TActorId FollowerPartition; const typename TMethod::TRequest::ProtoRecordType Request; const TString DiskId; const NActors::TActorId ParentActorId; const ui64 NonreplicatedRequestCounter; - const bool ShouldProcessError; TVector ForkedCallContexts; ui32 Responses = 0; - typename TMethod::TResponse::ProtoRecordType Record; - - using TBase = NActors::TActorBootstrapped>; + TResponseProto LeadersCollectiveResponse; + TResponseProto FollowerResponse; public: TMirrorRequestActor( TRequestInfoPtr requestInfo, - TVector partitions, + TVector leaderPartitions, + NActors::TActorId followerPartition, typename TMethod::TRequest::ProtoRecordType request, TString diskId, NActors::TActorId parentActorId, - ui64 nonreplicatedRequestCounter, - bool shouldProcessError); + ui64 nonreplicatedRequestCounter); void Bootstrap(const NActors::TActorContext& ctx); private: void SendRequests(const NActors::TActorContext& ctx); void Done(const NActors::TActorContext& ctx); + size_t GetTotalPartitionCount() const; + void UpdateResponse( + const NActors::TActorId& sender, + TResponseProto&& response); private: STFUNC(StateWork); @@ -77,19 +83,19 @@ class TMirrorRequestActor final template TMirrorRequestActor::TMirrorRequestActor( TRequestInfoPtr requestInfo, - TVector partitions, + TVector leaderPartitions, + NActors::TActorId followerPartition, typename TMethod::TRequest::ProtoRecordType request, TString diskId, NActors::TActorId parentActorId, - ui64 nonreplicatedRequestCounter, - bool shouldProcessError) + ui64 nonreplicatedRequestCounter) : RequestInfo(std::move(requestInfo)) - , Partitions(std::move(partitions)) + , LeaderPartitions(std::move(leaderPartitions)) + , FollowerPartition(followerPartition) , Request(std::move(request)) , DiskId(std::move(diskId)) , ParentActorId(parentActorId) , NonreplicatedRequestCounter(nonreplicatedRequestCounter) - , ShouldProcessError(shouldProcessError) {} template @@ -111,7 +117,7 @@ void TMirrorRequestActor::Bootstrap(const NActors::TActorContext& ctx) template void TMirrorRequestActor::SendRequests(const NActors::TActorContext& ctx) { - for (const auto& actorId: Partitions) { + auto sendRequest = [&](const NActors::TActorId& actorId) { auto request = std::make_unique(); auto& callContext = *RequestInfo->CallContext; if (!callContext.LWOrbit.Fork(request->CallContext->LWOrbit)) { @@ -134,14 +140,32 @@ void TMirrorRequestActor::SendRequests(const NActors::TActorContext& ct ); ctx.Send(std::move(event)); + }; + + for (const auto& actorId: LeaderPartitions) { + sendRequest(actorId); + } + if (FollowerPartition) { + sendRequest(FollowerPartition); } } template void TMirrorRequestActor::Done(const NActors::TActorContext& ctx) { + const bool hasFollower = FollowerPartition != NActors::TActorId(); + const bool isFollowerResponseError = + hasFollower && HasError(FollowerResponse); + const bool isFollowerResponseFatal = + isFollowerResponseError && + GetErrorKind(FollowerResponse.GetError()) == EErrorKind::ErrorFatal; + + if (isFollowerResponseError && !isFollowerResponseFatal) { + UpdateResponse({}, std::move(FollowerResponse)); + } + auto response = std::make_unique(); - response->Record = std::move(Record); + response->Record = std::move(LeadersCollectiveResponse); auto& callContext = *RequestInfo->CallContext; for (auto& cc: ForkedCallContexts) { @@ -156,18 +180,44 @@ void TMirrorRequestActor::Done(const NActors::TActorContext& ctx) NCloud::Reply(ctx, *RequestInfo, std::move(response)); - using TCompletion = - TEvNonreplPartitionPrivate::TEvWriteOrZeroCompleted; auto completion = - std::make_unique( + std::make_unique( NonreplicatedRequestCounter, - RequestInfo->GetTotalCycles()); - + RequestInfo->GetTotalCycles(), + isFollowerResponseFatal); NCloud::Send(ctx, ParentActorId, std::move(completion)); TBase::Die(ctx); } +template +size_t TMirrorRequestActor::GetTotalPartitionCount() const +{ + const bool hasFollower = FollowerPartition != NActors::TActorId(); + return LeaderPartitions.size() + (hasFollower ? 1 : 0); +} + +template +void TMirrorRequestActor::UpdateResponse( + const NActors::TActorId& sender, + TResponseProto&& response) +{ + if (sender == FollowerPartition) { + FollowerResponse = std::move(response); + return; + } + + const bool hasFollower = FollowerPartition != NActors::TActorId(); + + if (!hasFollower) { + ProcessMirrorActorError(*response.MutableError()); + } + + if (!HasError(LeadersCollectiveResponse)) { + LeadersCollectiveResponse = std::move(response); + } +} + //////////////////////////////////////////////////////////////////////////////// template @@ -182,10 +232,12 @@ void TMirrorRequestActor::HandleUndelivery( DiskId.c_str(), TMethod::Name); - Record.MutableError()->CopyFrom(MakeError(E_REJECTED, TStringBuilder() - << TMethod::Name << " request undelivered to some nonrepl partitions")); + LeadersCollectiveResponse.MutableError()->CopyFrom(MakeError( + E_REJECTED, + TStringBuilder() << TMethod::Name + << " request undelivered to some nonrepl partitions")); - if (++Responses < Partitions.size()) { + if (++Responses < GetTotalPartitionCount()) { return; } @@ -207,15 +259,9 @@ void TMirrorRequestActor::HandleResponse( FormatError(msg->Record.GetError()).c_str()); } - if (ShouldProcessError) { - ProcessMirrorActorError(*msg->Record.MutableError()); - } + UpdateResponse(ev->Sender, std::move(msg->Record)); - if (!HasError(Record)) { - Record = std::move(msg->Record); - } - - if (++Responses < Partitions.size()) { + if (++Responses < GetTotalPartitionCount()) { return; } @@ -229,7 +275,9 @@ void TMirrorRequestActor::HandlePoisonPill( { Y_UNUSED(ev); - Record.MutableError()->CopyFrom(MakeError(E_REJECTED, "Dead")); + LeadersCollectiveResponse.MutableError()->CopyFrom( + MakeError(E_REJECTED, "Dead")); + Done(ctx); } diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_mirror.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_mirror.cpp index 64d266909bf..316a6c7d23a 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_mirror.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_mirror.cpp @@ -78,12 +78,11 @@ void TMirrorPartitionActor::MirrorRequest( ctx, std::move(requestInfo), State.GetReplicaActors(), + TActorId{}, std::move(msg->Record), State.GetReplicaInfos()[0].Config->GetName(), SelfId(), - requestIdentityKey, - true // shouldProcessError - ); + requestIdentityKey); } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_events_private.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_events_private.h index 4525ee53a66..e71d1002466 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_events_private.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_events_private.h @@ -83,12 +83,15 @@ struct TEvNonreplPartitionPrivate { ui64 RequestCounter; ui64 TotalCycles; + bool FollowerGotNonretriableError; TWriteOrZeroCompleted( ui64 requestCounter, - ui64 totalCycles) + ui64 totalCycles, + bool followerGotNonretriableError) : RequestCounter(requestCounter) , TotalCycles(totalCycles) + , FollowerGotNonretriableError(followerGotNonretriableError) { } }; diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h index 95b5b87c583..d20afa02978 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor.h @@ -212,6 +212,8 @@ class TNonreplicatedPartitionMigrationCommonActor TDuration CalculateMigrationTimeout(TBlockRange64 range) const; void DoRegisterTrafficSource(const NActors::TActorContext& ctx); + void OnMigrationNonRetriableError(const NActors::TActorContext& ctx); + private: STFUNC(StateWork); STFUNC(StateZombie); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_migration.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_migration.cpp index 87633cde7cc..12c8ca0e83c 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_migration.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_migration.cpp @@ -253,9 +253,7 @@ void TNonreplicatedPartitionMigrationCommonActor::HandleRangeMigrated( FormatError(msg->GetError()).c_str()); if (GetErrorKind(msg->GetError()) != EErrorKind::ErrorRetriable) { - ReportMigrationFailed(); - MigrationOwner->OnMigrationError(ctx); - MigrationEnabled = false; + OnMigrationNonRetriableError(ctx); return; } @@ -354,12 +352,21 @@ void TNonreplicatedPartitionMigrationCommonActor::DoRegisterTrafficSource( new TEvents::TEvWakeup(WR_REGISTER_TRAFFIC_SOURCE)); } +void TNonreplicatedPartitionMigrationCommonActor::OnMigrationNonRetriableError( + const NActors::TActorContext& ctx) +{ + ReportMigrationFailed(); + MigrationOwner->OnMigrationError(ctx); + MigrationEnabled = false; +} + //////////////////////////////////////////////////////////////////////////////// void TNonreplicatedPartitionMigrationCommonActor::ScheduleRangeMigration( const TActorContext& ctx) { - if (RangeMigrationScheduled || IsIoDepthLimitReached()) { + if (!MigrationEnabled || RangeMigrationScheduled || IsIoDepthLimitReached()) + { return; } diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_mirror.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_mirror.cpp index 96565670e98..014a95c41d5 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_mirror.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_common_actor_mirror.cpp @@ -19,11 +19,16 @@ void TNonreplicatedPartitionMigrationCommonActor::HandleWriteOrZeroCompleted( const TEvNonreplPartitionPrivate::TEvWriteOrZeroCompleted::TPtr& ev, const TActorContext& ctx) { - const auto counter = ev->Get()->RequestCounter; + auto * msg = ev->Get(); + const auto counter = msg->RequestCounter; if (!WriteAndZeroRequestsInProgress.RemoveRequest(counter)) { Y_DEBUG_ABORT_UNLESS(0); } + if (msg->FollowerGotNonretriableError) { + OnMigrationNonRetriableError(ctx); + } + DrainActorCompanion.ProcessDrainRequests(ctx); ScheduleRangeMigration(ctx); } @@ -97,13 +102,12 @@ void TNonreplicatedPartitionMigrationCommonActor::MirrorRequest( NCloud::Register>( ctx, std::move(requestInfo), - TVector{SrcActorId, DstActorId}, + TVector{SrcActorId}, + DstActorId, std::move(msg->Record), DiskId, SelfId(), - WriteAndZeroRequestsInProgress.AddWriteRequest(range), - false // shouldProcessError - ); + WriteAndZeroRequestsInProgress.AddWriteRequest(range)); if constexpr (IsExactlyWriteMethod) { ChangedRangesMap.MarkChanged(range);