Skip to content

Commit

Permalink
Add status check + timeouts before next launch
Browse files Browse the repository at this point in the history
  • Loading branch information
robdrynkin committed Sep 26, 2024
1 parent 5807f62 commit f28c72c
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
QLOG_DEBUG_S("BSQ34", "Status# " << status
<< " VDiskId# " << VDiskId
<< " Cookie# " << cookie);
auto response = std::make_unique<TEvBlobStorage::TEvVStatusResult>(status, VDiskId, false, false, 0);
auto response = std::make_unique<TEvBlobStorage::TEvVStatusResult>(status, VDiskId, false, false, false, 0);
ctx.Send(sender, response.release(), 0, cookie);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/base/blobstorage_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,13 @@ namespace NKikimr {
{
TEvVStatusResult() = default;

TEvVStatusResult(NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, bool joinedGroup, bool replicated,
TEvVStatusResult(NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, bool joinedGroup, bool replicated, bool isReadOnly,
ui64 incarnationGuid)
{
Record.SetStatus(status);
Record.SetJoinedGroup(joinedGroup);
Record.SetReplicated(replicated);
Record.SetIsReadOnly(isReadOnly);
VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID());
if (status == NKikimrProto::OK) {
Record.SetIncarnationGuid(incarnationGuid);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ namespace NKikimr::NStorage {
vdiskConfig->BalancingRequestBlobsOnMainTimeout = TDuration::MilliSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetRequestBlobsOnMainTimeoutMs());
vdiskConfig->BalancingDeleteBatchTimeout = TDuration::MilliSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetDeleteBatchTimeoutMs());
vdiskConfig->BalancingEpochTimeout = TDuration::MilliSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetEpochTimeoutMs());
vdiskConfig->BalancingTimeToSleepIfNothingToDo = TDuration::Seconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetSecondsToSleepIfNothingToDo());

// issue initial report to whiteboard before creating actor to avoid races
Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate(vdiskId, groupInfo->GetStoragePoolName(),
Expand Down
70 changes: 70 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct TTestEnv {
balancingConf->SetEnableSend(true);
balancingConf->SetEnableDelete(true);
balancingConf->SetBalanceOnlyHugeBlobs(false);
balancingConf->SetSecondsToSleepIfNothingToDo(1);
},
})
{
Expand Down Expand Up @@ -182,6 +183,23 @@ struct TTestEnv {
Queues.clear();
}

// Puts the VDisk at group position `position` into read-only mode (value == true)
// or takes it out of that mode (value == false), mirroring the flag on the PDisk mock.
// The mock is switched before the VDisk when clearing the flag and after the
// 30-second simulation when setting it.
void SetVDiskReadOnly(ui32 position, bool value) {
    const TVDiskID& vdiskId = GroupInfo->GetVDiskId(position);
    auto baseConfig = Env.FetchBaseConfig();

    // NOTE(review): `position` is used to index PDisks and VSlots in the base config;
    // this presumably relies on them being listed in group order - confirm.
    const auto& pdisk = baseConfig.GetPDisk(position);
    const auto& vslot = baseConfig.GetVSlot(position);

    // Applies the requested flag to the PDisk mock entry for this VDisk.
    const auto applyToPDiskMock = [&] {
        Env.PDiskMockStates[{pdisk.GetNodeId(), pdisk.GetPDiskId()}]->SetReadOnly(vdiskId, value);
    };

    Cerr << "Setting VDisk read-only to " << value << " for position " << position << Endl;

    if (!value) {
        applyToPDiskMock();
    }

    Env.SetVDiskReadOnly(pdisk.GetNodeId(), pdisk.GetPDiskId(), vslot.GetVSlotId().GetVSlotId(), vdiskId, value);
    Env.Sim(TDuration::Seconds(30));

    if (value) {
        applyToPDiskMock();
    }
}

TEnvironmentSetup* operator->() {
return &Env;
}
Expand Down Expand Up @@ -228,7 +246,56 @@ struct TStopOneNodeTest {
Env.SendPut(step, data, NKikimrProto::OK);
Env->Sim(TDuration::Seconds(10));
Env.StartNode(nodeIdWithBlob);
Env->Sim(TDuration::Seconds(60));
Cerr << "Start compaction" << Endl;
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
}
Env->Sim(TDuration::Seconds(30));
Cerr << "Finish compaction" << Endl;

Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data);
}
}
};

struct TDontSendToReadOnlyTest {
TTestEnv Env;
TString data;

void RunTest() {
ui32 step = 0;

{ // Check just a normal put works
Env.SendPut(++step, data, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data);
Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
}


{
auto blobId = MakeLogoBlobId(++step, data.size());
auto locations = Env.GetExpectedPartsLocations(blobId);
ui32 nodeIdWithBlob = 0;
while (locations[nodeIdWithBlob].size() == 0) ++nodeIdWithBlob;

Env.SetVDiskReadOnly(nodeIdWithBlob, true);
Env.SendPut(step, data, NKikimrProto::OK);

// Check that balancing does not send anything when vdisk in readonly mode
Env.Env.Runtime->FilterFunction = [&](ui32, std::unique_ptr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvBlobStorage::EvVPut || ev->GetTypeRewrite() == TEvBlobStorage::EvVMultiPut) {
UNIT_ASSERT(false);
}
return true;
};
Env->Sim(TDuration::Seconds(60));
Env.Env.Runtime->FilterFunction = {};

Env.SetVDiskReadOnly(nodeIdWithBlob, false);

Env->Sim(TDuration::Seconds(60));

Cerr << "Start compaction" << Endl;
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
Expand Down Expand Up @@ -430,4 +497,7 @@ Y_UNIT_TEST_SUITE(VDiskBalancing) {
TTwoPartsOnOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(521_KB * 6)}.RunTest();
}

// Runs TDontSendToReadOnlyTest against a 4+2 block-erasure group.
// NOTE(review): the TTestEnv argument 8 is presumably the node count and GenData(100)
// presumably yields a 100-byte payload - confirm against their definitions.
Y_UNIT_TEST(TestDontSendToReadOnlyTest_Block42) {
    TDontSendToReadOnlyTest scenario{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(100)};
    scenario.RunTest();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TNodeWardenMockActor::TVDiskMockActor : public TActorBootstrapped<TVDiskMo
auto& record = ev->Get()->Record;
if (const auto& vdisk = VDisk.lock()) {
auto response = std::make_unique<TEvBlobStorage::TEvVStatusResult>(NKikimrProto::ERROR, vdisk->GetVDiskId(),
false, false, 0);
false, false, false, 0);
auto& r = response->Record;
if (VDiskIDFromVDiskID(record.GetVDiskID()) != vdisk->GetVDiskId()) { // RACE
r.SetStatus(NKikimrProto::RACE);
Expand Down
107 changes: 75 additions & 32 deletions ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ namespace NBalancing {
TLogoBlobsSnapshot::TForwardIterator It;
TQueueActorMapPtr QueueActorMapPtr;
THashSet<TVDiskID> ConnectedVDisks;
THashSet<TVDiskID> GoodStatusVDisks;

TBatchedQueue<TPartInfo> SendOnMainParts;
TBatchedQueue<TLogoBlobID> TryDeleteParts;
Expand All @@ -83,46 +84,46 @@ namespace NBalancing {
TInstant StartTime;

///////////////////////////////////////////////////////////////////////////////////////////
// Main logic
// Init logic
///////////////////////////////////////////////////////////////////////////////////////////

void ContinueBalancing() {
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();
void SendCheckVDisksStatusRequests() {
for (ui32 i = 0; i < GInfo->GetTotalVDisksNum(); ++i) {
const auto vdiskId = GInfo->GetVDiskId(i);
const auto actorId = GInfo->GetActorId(i);
if (TVDiskIdShort(vdiskId) != Ctx->VCtx->ShortSelfVDisk) {
Send(actorId, new TEvBlobStorage::TEvVStatus(vdiskId));
}
}
}

if (SendOnMainParts.Empty() && TryDeleteParts.Empty()) {
// no more parts to send or delete
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed"));
PassAway();
void Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev) {
auto msg = ev->Get();
auto vdiskId = VDiskIDFromVDiskID(msg->Record.GetVDiskID());
auto status = msg->Record.GetStatus();
bool replicated = msg->Record.GetReplicated();
bool isReadOnly = msg->Record.GetIsReadOnly();

if (status != NKikimrProto::EReplyStatus::OK || !replicated || isReadOnly) {
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "VDisk is not ready. Stop balancing"),
(VDiskId, vdiskId), (Status, NKikimrProto::EReplyStatus_Name(status)), (Replicated, replicated), (ReadOnly, isReadOnly));
Stop(TDuration::Seconds(10));
return;
}

// ask for repl token to continue balancing
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB01, VDISKP(Ctx->VCtx, "Ask repl token to continue balancing"), (SelfId, SelfId()), (PDiskId, Ctx->VDiskCfg->BaseInfo.PDiskId));
Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(Ctx->VDiskCfg->BaseInfo.PDiskId), NActors::IEventHandle::FlagTrackDelivery);
GoodStatusVDisks.insert(vdiskId);
}

void ScheduleJobQuant() {
Ctx->MonGroup.ReplTokenAquired()++;

// once repl token received, start balancing - waking up sender and deleter
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"),
(SendPartsLeft, SendOnMainParts.Size()), (DeletePartsLeft, TryDeleteParts.Size()),
(ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum()));

// register sender and deleter actors
BatchManager = TBatchManager(
CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx),
CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx)
);
// Returns true only when both GoodStatusVDisks and ConnectedVDisks contain
// exactly one entry fewer than the total number of VDisks in the group
// (i.e. every VDisk but one is present in each set).
bool ReadyToBalance() const {
    if (GoodStatusVDisks.size() + 1 != GInfo->GetTotalVDisksNum()) {
        return false;
    }
    return ConnectedVDisks.size() + 1 == GInfo->GetTotalVDisksNum();
}

void CollectKeys() {
if (ConnectedVDisks.size() + 1 != GInfo->GetTotalVDisksNum()) {
if (!ReadyToBalance()) {
// not all vdisks are connected
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB11, VDISKP(Ctx->VCtx, "Not all vdisks are connected, balancing should work only for full groups"),
(ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisksInGroup, GInfo->GetTotalVDisksNum()));
PassAway();
(ConnectedVDisks, ConnectedVDisks.size()), (GoodStatusVDisks, GoodStatusVDisks.size()), (TotalVDisksInGroup, GInfo->GetTotalVDisksNum()));
Stop(TDuration::Seconds(10));
return;
}

Expand Down Expand Up @@ -197,6 +198,42 @@ namespace NBalancing {
ContinueBalancing();
}

///////////////////////////////////////////////////////////////////////////////////////////
// Main logic
///////////////////////////////////////////////////////////////////////////////////////////

// Publishes the planned send/delete counts to the monitoring group, then either
// requests a repl token to process more parts or, when both queues report Empty(),
// stops the actor. On stop, the next launch is scheduled immediately if either
// queue collected at least its per-epoch maximum, otherwise after
// Ctx->Cfg.TimeToSleepIfNothingToDo.
void ContinueBalancing() {
    Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
    Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();

    const bool queuesDrained = SendOnMainParts.Empty() && TryDeleteParts.Empty();
    if (!queuesDrained) {
        // there is still work in the queues: ask for repl token to continue balancing
        STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB01, VDISKP(Ctx->VCtx, "Ask repl token to continue balancing"), (SelfId, SelfId()), (PDiskId, Ctx->VDiskCfg->BaseInfo.PDiskId));
        Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(Ctx->VDiskCfg->BaseInfo.PDiskId), NActors::IEventHandle::FlagTrackDelivery);
        return;
    }

    // no more parts to send or delete
    STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed"));

    // a queue filled up to its per-epoch limit is taken as a sign that more work remains
    const bool hasSomeWorkForNextEpoch = SendOnMainParts.Data.size() >= Ctx->Cfg.MaxToSendPerEpoch || TryDeleteParts.Data.size() >= Ctx->Cfg.MaxToDeletePerEpoch;
    if (hasSomeWorkForNextEpoch) {
        Stop(TDuration::Seconds(0));
    } else {
        Stop(Ctx->Cfg.TimeToSleepIfNothingToDo);
    }
}

// Starts one quant of balancing work: bumps the ReplTokenAquired monitoring counter,
// logs how many parts remain in each queue, and replaces BatchManager with a new
// TBatchManager built from a fresh sender actor and a fresh deleter actor, each
// given the next batch from its queue.
void ScheduleJobQuant() {
Ctx->MonGroup.ReplTokenAquired()++;

// once repl token received, start balancing - waking up sender and deleter
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"),
(SendPartsLeft, SendOnMainParts.Size()), (DeletePartsLeft, TryDeleteParts.Size()),
(ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum()));

// register sender and deleter actors
// NOTE(review): the registration itself presumably happens inside TBatchManager - confirm.
BatchManager = TBatchManager(
CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx),
CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx)
);
}

void Handle(NActors::TEvents::TEvCompleted::TPtr ev) {
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "TEvCompleted"), (Type, ev->Type));
BatchManager.Handle(ev);
Expand All @@ -205,7 +242,7 @@ namespace NBalancing {
Ctx->MonGroup.EpochTimeouts()++;
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Epoch timeout"));
PassAway();
Stop(TDuration::Seconds(0));
return;
}

Expand Down Expand Up @@ -280,19 +317,24 @@ namespace NBalancing {
Send(BatchManager.DeleterId, msg->Clone());
}

void PassAway() override {
void Stop(TDuration timeoutBeforeNextLaunch) {
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB12, VDISKP(Ctx->VCtx, "Stop balancing"), (SendOnMainParts, SendOnMainParts.Data.size()), (TryDeleteParts, TryDeleteParts.Data.size()), (SecondsBeforeNextLaunch, timeoutBeforeNextLaunch.Seconds()));

Send(BatchManager.SenderId, new NActors::TEvents::TEvPoison);
Send(BatchManager.DeleterId, new NActors::TEvents::TEvPoison);
for (const auto& kv : *QueueActorMapPtr) {
Send(kv.second, new TEvents::TEvPoison);
}
Send(Ctx->SkeletonId, new TEvStartBalancing());
TActorBootstrapped::PassAway();
TlsActivationContext->Schedule(timeoutBeforeNextLaunch, new IEventHandle(Ctx->SkeletonId, SelfId(), new TEvStartBalancing()));
PassAway();
}

STRICT_STFUNC(StateFunc,
// Logic events
// Init events
cFunc(NActors::TEvents::TEvWakeup::EventType, CollectKeys)
hFunc(TEvBlobStorage::TEvVStatusResult, Handle)

// Main logic events
cFunc(TEvReplToken::EventType, ScheduleJobQuant)
hFunc(NActors::TEvents::TEvCompleted, Handle)
hFunc(TEvBalancingSendPartsOnMain, Handle)
Expand Down Expand Up @@ -323,6 +365,7 @@ namespace NBalancing {
CreateVDisksQueues();
It.SeekToFirst();
++Ctx->MonGroup.BalancingIterations();
SendCheckVDisksStatusRequests();
Schedule(TDuration::Seconds(10), new NActors::TEvents::TEvWakeup());
}
};
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/vdisk/balance/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace NKikimr {
TDuration RequestBlobsOnMainTimeout;
TDuration DeleteBatchTimeout;
TDuration EpochTimeout;

TDuration TimeToSleepIfNothingToDo;
};

struct TBalancingCtx {
Expand Down
16 changes: 10 additions & 6 deletions ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace NKikimr {
const TActorId NotifyId;
const TInstant Now;
const bool ReplDone;
const bool IsReadOnly;
unsigned Counter;
std::unique_ptr<TEvBlobStorage::TEvVStatusResult> Result;

Expand All @@ -32,7 +33,7 @@ namespace NKikimr {
const NKikimrBlobStorage::TEvVStatus &record = Ev->Get()->Record;
if (!SelfVDiskId.SameDisk(record.GetVDiskID())) {
Result = std::make_unique<TEvBlobStorage::TEvVStatusResult>(NKikimrProto::RACE, SelfVDiskId, false,
false, IncarnationGuid);
false, false, IncarnationGuid);
SetRacingGroupInfo(record, Result->Record, GroupInfo);
LOG_DEBUG(ctx, BS_VDISK_OTHER, VDISKP(VCtx->VDiskLogPrefix, "TEvVStatusResult Request# {%s} Response# {%s}",
SingleLineProto(record).data(), SingleLineProto(Result->Record).data()));
Expand All @@ -41,8 +42,8 @@ namespace NKikimr {
return;
}

Result = std::make_unique<TEvBlobStorage::TEvVStatusResult>(NKikimrProto::OK, SelfVDiskId, true, ReplDone,
IncarnationGuid);
Result = std::make_unique<TEvBlobStorage::TEvVStatusResult>(
NKikimrProto::OK, SelfVDiskId, true, ReplDone, IsReadOnly, IncarnationGuid);

const auto& oos = VCtx->GetOutOfSpaceState();
Result->Record.SetStatusFlags(oos.GetGlobalStatusFlags().Flags);
Expand Down Expand Up @@ -104,7 +105,8 @@ namespace NKikimr {
TEvBlobStorage::TEvVStatus::TPtr &ev,
const TActorId &notifyId,
const TInstant &now,
bool replDone)
bool replDone,
bool isReadOnly)
: TActorBootstrapped<TStatusRequestHandler>()
, VCtx(vctx)
, SkeletonId(skeletonId)
Expand All @@ -118,6 +120,7 @@ namespace NKikimr {
, NotifyId(notifyId)
, Now(now)
, ReplDone(replDone)
, IsReadOnly(isReadOnly)
, Counter(0)
{}
};
Expand All @@ -134,9 +137,10 @@ namespace NKikimr {
TEvBlobStorage::TEvVStatus::TPtr &ev,
const TActorId &notifyId,
const TInstant &now,
bool replDone) {
bool replDone,
bool isReadOnly) {
return new TStatusRequestHandler(vctx, skeletonId, syncerId, syncLogId, ifaceMonGroup, selfVDiskId,
incarnationGuid, groupInfo, ev, notifyId, now, replDone);
incarnationGuid, groupInfo, ev, notifyId, now, replDone, isReadOnly);
}

} // NKikimr
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/vdisk/common/blobstorage_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace NKikimr {
TEvBlobStorage::TEvVStatus::TPtr &ev,
const TActorId &notifyId,
const TInstant &now,
bool replDone);
bool replDone,
bool isReadOnly);

} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ namespace NKikimr {
TDuration BalancingRequestBlobsOnMainTimeout;
TDuration BalancingDeleteBatchTimeout;
TDuration BalancingEpochTimeout;
TDuration BalancingTimeToSleepIfNothingToDo;

///////////// COST METRICS SETTINGS ////////////////
bool UseCostTracker = true;
Expand Down
Loading

0 comments on commit f28c72c

Please sign in to comment.