Skip to content

Commit

Permalink
Handle migration partition non-retriable errors separately
Browse files Browse the repository at this point in the history
  • Loading branch information
drbasic committed Sep 26, 2024
1 parent 9c55e94 commit 1ab51d4
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 45 deletions.
112 changes: 80 additions & 32 deletions cloud/blockstore/libs/storage/partition_nonrepl/mirror_request_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,41 @@ class TMirrorRequestActor final
: public NActors::TActorBootstrapped<TMirrorRequestActor<TMethod>>
{
private:
using TBase = NActors::TActorBootstrapped<TMirrorRequestActor<TMethod>>;
using TResponseProto = typename TMethod::TResponse::ProtoRecordType;

const TRequestInfoPtr RequestInfo;
const TVector<NActors::TActorId> Partitions;
const TVector<NActors::TActorId> LeaderPartitions;
const NActors::TActorId FollowerPartition;
const typename TMethod::TRequest::ProtoRecordType Request;
const TString DiskId;
const NActors::TActorId ParentActorId;
const ui64 NonreplicatedRequestCounter;
const bool ShouldProcessError;

TVector<TCallContextPtr> ForkedCallContexts;
ui32 Responses = 0;
typename TMethod::TResponse::ProtoRecordType Record;

using TBase = NActors::TActorBootstrapped<TMirrorRequestActor<TMethod>>;
TResponseProto LeadersCollectiveResponse;
TResponseProto FollowerResponse;

public:
TMirrorRequestActor(
TRequestInfoPtr requestInfo,
TVector<NActors::TActorId> partitions,
TVector<NActors::TActorId> leaderPartitions,
NActors::TActorId followerPartition,
typename TMethod::TRequest::ProtoRecordType request,
TString diskId,
NActors::TActorId parentActorId,
ui64 nonreplicatedRequestCounter,
bool shouldProcessError);
ui64 nonreplicatedRequestCounter);

void Bootstrap(const NActors::TActorContext& ctx);

private:
void SendRequests(const NActors::TActorContext& ctx);
void Done(const NActors::TActorContext& ctx);
size_t GetTotalPartitionCount() const;
void UpdateResponse(
const NActors::TActorId& sender,
TResponseProto&& response);

private:
STFUNC(StateWork);
Expand All @@ -77,19 +83,19 @@ class TMirrorRequestActor final
template <typename TMethod>
TMirrorRequestActor<TMethod>::TMirrorRequestActor(
TRequestInfoPtr requestInfo,
TVector<NActors::TActorId> partitions,
TVector<NActors::TActorId> leaderPartitions,
NActors::TActorId followerPartition,
typename TMethod::TRequest::ProtoRecordType request,
TString diskId,
NActors::TActorId parentActorId,
ui64 nonreplicatedRequestCounter,
bool shouldProcessError)
ui64 nonreplicatedRequestCounter)
: RequestInfo(std::move(requestInfo))
, Partitions(std::move(partitions))
, LeaderPartitions(std::move(leaderPartitions))
, FollowerPartition(followerPartition)
, Request(std::move(request))
, DiskId(std::move(diskId))
, ParentActorId(parentActorId)
, NonreplicatedRequestCounter(nonreplicatedRequestCounter)
, ShouldProcessError(shouldProcessError)
{}

template <typename TMethod>
Expand All @@ -111,7 +117,7 @@ void TMirrorRequestActor<TMethod>::Bootstrap(const NActors::TActorContext& ctx)
template <typename TMethod>
void TMirrorRequestActor<TMethod>::SendRequests(const NActors::TActorContext& ctx)
{
for (const auto& actorId: Partitions) {
auto sendRequest = [&](const NActors::TActorId& actorId) {
auto request = std::make_unique<typename TMethod::TRequest>();
auto& callContext = *RequestInfo->CallContext;
if (!callContext.LWOrbit.Fork(request->CallContext->LWOrbit)) {
Expand All @@ -134,14 +140,32 @@ void TMirrorRequestActor<TMethod>::SendRequests(const NActors::TActorContext& ct
);

ctx.Send(std::move(event));
};

for (const auto& actorId: LeaderPartitions) {
sendRequest(actorId);
}
if (FollowerPartition) {
sendRequest(FollowerPartition);
}
}

template <typename TMethod>
void TMirrorRequestActor<TMethod>::Done(const NActors::TActorContext& ctx)
{
const bool hasFollower = FollowerPartition != NActors::TActorId();
const bool isFollowerResponseError =
hasFollower && HasError(FollowerResponse);
const bool isFollowerResponseFatal =
isFollowerResponseError &&
GetErrorKind(FollowerResponse.GetError()) == EErrorKind::ErrorFatal;

if (isFollowerResponseError && !isFollowerResponseFatal) {
UpdateResponse({}, std::move(FollowerResponse));
}

auto response = std::make_unique<typename TMethod::TResponse>();
response->Record = std::move(Record);
response->Record = std::move(LeadersCollectiveResponse);

auto& callContext = *RequestInfo->CallContext;
for (auto& cc: ForkedCallContexts) {
Expand All @@ -156,18 +180,44 @@ void TMirrorRequestActor<TMethod>::Done(const NActors::TActorContext& ctx)

NCloud::Reply(ctx, *RequestInfo, std::move(response));

using TCompletion =
TEvNonreplPartitionPrivate::TEvWriteOrZeroCompleted;
auto completion =
std::make_unique<TCompletion>(
std::make_unique<TEvNonreplPartitionPrivate::TEvWriteOrZeroCompleted>(
NonreplicatedRequestCounter,
RequestInfo->GetTotalCycles());

RequestInfo->GetTotalCycles(),
isFollowerResponseFatal);
NCloud::Send(ctx, ParentActorId, std::move(completion));

TBase::Die(ctx);
}

template <typename TMethod>
size_t TMirrorRequestActor<TMethod>::GetTotalPartitionCount() const
{
    // Number of partitions a request is sent to: every leader partition plus
    // the follower, when one is set (i.e. FollowerPartition is not a
    // default-constructed actor id).
    size_t partitionCount = LeaderPartitions.size();
    if (FollowerPartition != NActors::TActorId()) {
        partitionCount += 1;
    }
    return partitionCount;
}

// Records a single partition response.
//
// A response whose sender is the follower partition is stored on its own in
// FollowerResponse and does not touch the leaders' aggregate. Any other
// response is folded into LeadersCollectiveResponse: the aggregate is
// overwritten only while it holds no error, so the first errored response
// is the one that is kept.
//
// Done() also calls this with a default-constructed sender to merge a
// non-fatal follower error into the leaders' aggregate; since a follower is
// configured in that case, such a call takes the leaders path.
template <typename TMethod>
void TMirrorRequestActor<TMethod>::UpdateResponse(
    const NActors::TActorId& sender,
    TResponseProto&& response)
{
    const bool sentByFollower = (sender == FollowerPartition);
    if (sentByFollower) {
        FollowerResponse = std::move(response);
        return;
    }

    // Without a follower the error is first passed through
    // ProcessMirrorActorError (defined elsewhere; NOTE(review): presumably
    // adjusts the error for plain mirror requests -- confirm).
    const bool followerConfigured = FollowerPartition != NActors::TActorId();
    if (!followerConfigured) {
        ProcessMirrorActorError(*response.MutableError());
    }

    // An already recorded error must not be replaced by a later response.
    if (HasError(LeadersCollectiveResponse)) {
        return;
    }
    LeadersCollectiveResponse = std::move(response);
}

////////////////////////////////////////////////////////////////////////////////

template <typename TMethod>
Expand All @@ -182,10 +232,12 @@ void TMirrorRequestActor<TMethod>::HandleUndelivery(
DiskId.c_str(),
TMethod::Name);

Record.MutableError()->CopyFrom(MakeError(E_REJECTED, TStringBuilder()
<< TMethod::Name << " request undelivered to some nonrepl partitions"));
LeadersCollectiveResponse.MutableError()->CopyFrom(MakeError(
E_REJECTED,
TStringBuilder() << TMethod::Name
<< " request undelivered to some nonrepl partitions"));

if (++Responses < Partitions.size()) {
if (++Responses < GetTotalPartitionCount()) {
return;
}

Expand All @@ -207,15 +259,9 @@ void TMirrorRequestActor<TMethod>::HandleResponse(
FormatError(msg->Record.GetError()).c_str());
}

if (ShouldProcessError) {
ProcessMirrorActorError(*msg->Record.MutableError());
}
UpdateResponse(ev->Sender, std::move(msg->Record));

if (!HasError(Record)) {
Record = std::move(msg->Record);
}

if (++Responses < Partitions.size()) {
if (++Responses < GetTotalPartitionCount()) {
return;
}

Expand All @@ -229,7 +275,9 @@ void TMirrorRequestActor<TMethod>::HandlePoisonPill(
{
Y_UNUSED(ev);

Record.MutableError()->CopyFrom(MakeError(E_REJECTED, "Dead"));
LeadersCollectiveResponse.MutableError()->CopyFrom(
MakeError(E_REJECTED, "Dead"));

Done(ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ void TMirrorPartitionActor::MirrorRequest(
ctx,
std::move(requestInfo),
State.GetReplicaActors(),
TActorId{},
std::move(msg->Record),
State.GetReplicaInfos()[0].Config->GetName(),
SelfId(),
requestIdentityKey,
true // shouldProcessError
);
requestIdentityKey);
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,15 @@ struct TEvNonreplPartitionPrivate
{
ui64 RequestCounter;
ui64 TotalCycles;
bool FollowerGotNonretriableError;

TWriteOrZeroCompleted(
ui64 requestCounter,
ui64 totalCycles)
ui64 totalCycles,
bool followerGotNonretriableError)
: RequestCounter(requestCounter)
, TotalCycles(totalCycles)
, FollowerGotNonretriableError(followerGotNonretriableError)
{
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ class TNonreplicatedPartitionMigrationCommonActor
TDuration CalculateMigrationTimeout(TBlockRange64 range) const;
void DoRegisterTrafficSource(const NActors::TActorContext& ctx);

void OnMigrationNonRetriableError(const NActors::TActorContext& ctx);

private:
STFUNC(StateWork);
STFUNC(StateZombie);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,7 @@ void TNonreplicatedPartitionMigrationCommonActor::HandleRangeMigrated(
FormatError(msg->GetError()).c_str());

if (GetErrorKind(msg->GetError()) != EErrorKind::ErrorRetriable) {
ReportMigrationFailed();
MigrationOwner->OnMigrationError(ctx);
MigrationEnabled = false;
OnMigrationNonRetriableError(ctx);
return;
}

Expand Down Expand Up @@ -354,12 +352,21 @@ void TNonreplicatedPartitionMigrationCommonActor::DoRegisterTrafficSource(
new TEvents::TEvWakeup(WR_REGISTER_TRAFFIC_SOURCE));
}

// Shared reaction to a migration error that is not retriable. It is invoked
// from HandleRangeMigrated when the range error kind is not ErrorRetriable,
// and from HandleWriteOrZeroCompleted when the completion event carries
// FollowerGotNonretriableError.
//
// The statements run in this exact order: report the failure, notify the
// migration owner, then switch migration off.
void TNonreplicatedPartitionMigrationCommonActor::OnMigrationNonRetriableError(
const NActors::TActorContext& ctx)
{
// NOTE(review): ReportMigrationFailed is defined elsewhere; presumably it
// raises a failure counter / critical event -- confirm.
ReportMigrationFailed();
// Let the owner of the migration react to the error.
MigrationOwner->OnMigrationError(ctx);
// Once cleared, ScheduleRangeMigration returns early and schedules no
// further ranges.
MigrationEnabled = false;
}

////////////////////////////////////////////////////////////////////////////////

void TNonreplicatedPartitionMigrationCommonActor::ScheduleRangeMigration(
const TActorContext& ctx)
{
if (RangeMigrationScheduled || IsIoDepthLimitReached()) {
if (!MigrationEnabled || RangeMigrationScheduled || IsIoDepthLimitReached())
{
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ void TNonreplicatedPartitionMigrationCommonActor::HandleWriteOrZeroCompleted(
const TEvNonreplPartitionPrivate::TEvWriteOrZeroCompleted::TPtr& ev,
const TActorContext& ctx)
{
const auto counter = ev->Get()->RequestCounter;
auto * msg = ev->Get();
const auto counter = msg->RequestCounter;
if (!WriteAndZeroRequestsInProgress.RemoveRequest(counter)) {
Y_DEBUG_ABORT_UNLESS(0);
}

if (msg->FollowerGotNonretriableError) {
OnMigrationNonRetriableError(ctx);
}

DrainActorCompanion.ProcessDrainRequests(ctx);
ScheduleRangeMigration(ctx);
}
Expand Down Expand Up @@ -97,13 +102,12 @@ void TNonreplicatedPartitionMigrationCommonActor::MirrorRequest(
NCloud::Register<TMirrorRequestActor<TMethod>>(
ctx,
std::move(requestInfo),
TVector<TActorId>{SrcActorId, DstActorId},
TVector<TActorId>{SrcActorId},
DstActorId,
std::move(msg->Record),
DiskId,
SelfId(),
WriteAndZeroRequestsInProgress.AddWriteRequest(range),
false // shouldProcessError
);
WriteAndZeroRequestsInProgress.AddWriteRequest(range));

if constexpr (IsExactlyWriteMethod<TMethod>) {
ChangedRangesMap.MarkChanged(range);
Expand Down

0 comments on commit 1ab51d4

Please sign in to comment.