Skip to content

Commit

Permalink
YDB Import 489
Browse files Browse the repository at this point in the history
  • Loading branch information
robot-ydb-importer committed Dec 12, 2023
1 parent 9aa3f8b commit 64797af
Show file tree
Hide file tree
Showing 19 changed files with 810 additions and 570 deletions.
116 changes: 53 additions & 63 deletions contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ void TBlobState::Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &info)
Parts.resize(info.Type.TotalPartCount());
ui32 blobSubgroupSize = info.Type.BlobSubgroupSize();
Disks.resize(blobSubgroupSize);
TBlobStorageGroupInfo::TServiceIds vdisksSvc;
TBlobStorageGroupInfo::TVDiskIds vdisksId;
const ui32 hash = Id.Hash();
info.PickSubgroup(hash, &vdisksId, &vdisksSvc);
TBlobStorageGroupInfo::TOrderNums nums;
info.GetTopology().PickSubgroup(Id.Hash(), nums);
Y_DEBUG_ABORT_UNLESS(nums.size() == blobSubgroupSize);
for (ui32 i = 0; i < blobSubgroupSize; ++i) {
Disks[i].OrderNumber = info.GetOrderNumber(vdisksId[i]);
Disks[i].OrderNumber = nums[i];
Disks[i].DiskParts.resize(info.Type.TotalPartCount());
}
IsChanged = true;
Expand All @@ -51,7 +50,6 @@ void TBlobState::AddPartToPut(ui32 partIdx, TRope&& partData) {

void TBlobState::MarkBlobReadyToPut(ui8 blobIdx) {
Y_ABORT_UNLESS(WholeSituation == ESituation::Unknown || WholeSituation == ESituation::Present);
WholeSituation = ESituation::Present;
BlobIdx = blobIdx;
IsChanged = true;
}
Expand Down Expand Up @@ -179,7 +177,6 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog

Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
TDiskPart &diskPart = disk.DiskParts[partIdx];
//Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
diskPart.Situation = ESituation::Lost;
diskPart.Requested.Clear();

Expand Down Expand Up @@ -379,21 +376,7 @@ void TBlackboard::MoveBlobStateToDone(const TLogoBlobID &id) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() == 0);
Y_ABORT_UNLESS(id.BlobSize() != 0);
auto it = BlobStates.find(id);
if (it == BlobStates.end()) {
auto doneIt = DoneBlobStates.find(id);
const char *errorMsg = doneIt == DoneBlobStates.end() ?
"This blobId is not in BlobStates or in DoneBlobStates" :
"This blobId is already in DoneBlobStates";
Y_VERIFY_S(false, errorMsg << " BlobId# " << id << " Blackboard# " << ToString());
} else {
if (!it->second.IsDone) {
DoneCount++;
it->second.IsDone = true;
}
auto node = BlobStates.extract(it);
DoneBlobStates.insert(std::move(node));
}
DoneBlobStates.insert(BlobStates.extract(id));
}

void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) {
Expand Down Expand Up @@ -431,67 +414,72 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
state.AddErrorResponse(*Info, id, orderNumber);
}

EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished) {
IStrategy& temp = const_cast<IStrategy&>(s); // better UX
Y_ABORT_UNLESS(BlobStates.size());
EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec<IStrategy*, 1>& s,
TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
TString errorReason;
for (auto it = BlobStates.begin(); it != BlobStates.end(); ++it) {
for (auto it = BlobStates.begin(); it != BlobStates.end(); ) {
auto& blob = it->second;
if (!blob.IsChanged) {
if (!std::exchange(blob.IsChanged, false)) {
++it;
continue;
}
blob.IsChanged = false;
// recalculate blob outcome if it is not yet determined
switch (auto res = temp.Process(logCtx, blob, *Info, *this, GroupDiskRequests)) {
case EStrategyOutcome::IN_PROGRESS:
if (blob.IsDone) {
DoneCount--;
blob.IsDone = false;
}
break;

case EStrategyOutcome::ERROR:
if (IsAllRequestsTogether) {
return res;
} else {
blob.Status = NKikimrProto::ERROR;
if (finished) {
finished->push_back(&*it);
// recalculate blob outcome if it is not yet determined
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
for (IStrategy *strategy : s) {
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) {
case EStrategyOutcome::IN_PROGRESS:
status = NKikimrProto::UNKNOWN;
break;

case EStrategyOutcome::ERROR:
if (IsAllRequestsTogether) {
return res;
}
if (errorReason) {
errorReason += " && ";
errorReason += res.ErrorReason;
} else {
errorReason = res.ErrorReason;
}
}
if (!blob.IsDone) {
DoneCount++;
blob.IsDone = true;
}
break;
status = NKikimrProto::ERROR;
break;

case EStrategyOutcome::DONE:
if (!IsAllRequestsTogether) {
blob.Status = NKikimrProto::OK;
if (finished) {
finished->push_back(&*it);
case EStrategyOutcome::DONE:
if (expired && !blob.HasWrittenQuorum(*Info, *expired)) {
blob.IsChanged = true;
status = NKikimrProto::UNKNOWN;
}
}
if (!blob.IsDone) {
DoneCount++;
blob.IsDone = true;
}
break;
}
if (status != NKikimrProto::OK) {
break;
}
}
if (status != NKikimrProto::UNKNOWN) {
const auto [doneIt, inserted, node] = DoneBlobStates.insert(BlobStates.extract(it++));
Y_ABORT_UNLESS(inserted);
if (!IsAllRequestsTogether) {
blob.Status = status;
if (finished) {
finished->push_back(&*doneIt);
}
}
} else {
++it;
}
}

const bool isDone = (DoneCount == (BlobStates.size() + DoneBlobStates.size()));
EStrategyOutcome outcome(isDone ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS);
EStrategyOutcome outcome(BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS);
outcome.ErrorReason = std::move(errorReason);
return outcome;
}

EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s,
TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, finished, expired);
}

TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
Y_ABORT_UNLESS(bool(id));
TLogoBlobID fullId = id.FullID();
Expand Down Expand Up @@ -607,13 +595,15 @@ TString TBlackboard::ToString() const {
void TBlackboard::InvalidatePartStates(ui32 orderNumber) {
const TVDiskID vdiskId = Info->GetVDiskId(orderNumber);
for (auto& [id, state] : BlobStates) {
Y_ABORT_UNLESS(!state.IsDone);
if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) {
TBlobState::TDisk& disk = state.Disks[diskIdx];
for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
TBlobState::TDiskPart& part = disk.DiskParts[partIdx];
if (part.Situation == TBlobState::ESituation::Sent || part.Situation == TBlobState::ESituation::Present) {
if (part.Situation == TBlobState::ESituation::Present) {
part.Situation = TBlobState::ESituation::Unknown;
if (state.WholeSituation == TBlobState::ESituation::Present) {
state.WholeSituation = TBlobState::ESituation::Unknown;
}
state.IsChanged = true;
}
}
Expand Down
7 changes: 4 additions & 3 deletions contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ struct TBlobState {
ui8 BlobIdx;
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
bool IsChanged = false;
bool IsDone = false;
std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks = nullptr;
NWilson::TSpan *Span = nullptr;
bool Keep = false;
Expand Down Expand Up @@ -196,7 +195,6 @@ struct TBlackboard {
const NKikimrBlobStorage::EPutHandleClass PutHandleClass;
const NKikimrBlobStorage::EGetHandleClass GetHandleClass;
const bool IsAllRequestsTogether;
ui64 DoneCount = 0;

TBlackboard(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues,
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass,
Expand All @@ -219,7 +217,10 @@ struct TBlackboard {
void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr);
EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec<IStrategy*, 1>& strategies,
TBatchedVec<TBlobStates::value_type*> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr,
const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
TBlobState& GetState(const TLogoBlobID &id);
ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex);
void ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapIndex, ui32 responseIndex, NKikimrProto::EReplyStatus status);
Expand Down
25 changes: 14 additions & 11 deletions contrib/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,14 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx,
}

EStrategyOutcome TGetImpl::RunBoldStrategy(TLogContext &logCtx) {
EStrategyOutcome outcome = Blackboard.RunStrategy(logCtx, TBoldStrategy(PhantomCheck));
if (outcome == EStrategyOutcome::DONE && MustRestoreFirst) {
Blackboard.ChangeAll();
outcome = Blackboard.RunStrategy(logCtx, TRestoreStrategy());
TStackVec<IStrategy*, 1> strategies;
TBoldStrategy s1(PhantomCheck);
strategies.push_back(&s1);
TRestoreStrategy s2;
if (MustRestoreFirst) {
strategies.push_back(&s2);
}
return outcome;
return Blackboard.RunStrategies(logCtx, strategies);
}

EStrategyOutcome TGetImpl::RunMirror3dcStrategy(TLogContext &logCtx) {
Expand All @@ -403,13 +405,14 @@ EStrategyOutcome TGetImpl::RunMirror3dcStrategy(TLogContext &logCtx) {
}

EStrategyOutcome TGetImpl::RunMirror3of4Strategy(TLogContext &logCtx) {
// run basic get strategy and, if blob restoration is required and we have successful get, restore the blob to full amount of parts
EStrategyOutcome outcome = Blackboard.RunStrategy(logCtx, TMirror3of4GetStrategy());
if (outcome == EStrategyOutcome::DONE && MustRestoreFirst) {
Blackboard.ChangeAll();
outcome = Blackboard.RunStrategy(logCtx, TPut3of4Strategy(TEvBlobStorage::TEvPut::TacticMaxThroughput));
TStackVec<IStrategy*, 1> strategies;
TMirror3of4GetStrategy s1;
strategies.push_back(&s1);
TPut3of4Strategy s2(TEvBlobStorage::TEvPut::TacticMaxThroughput);
if (MustRestoreFirst) {
strategies.push_back(&s2);
}
return outcome;
return Blackboard.RunStrategies(logCtx, strategies);
}

EStrategyOutcome TGetImpl::RunStrategies(TLogContext &logCtx) {
Expand Down
Loading

0 comments on commit 64797af

Please sign in to comment.