From 64797af09ae91789b9a1bf7e465c27ef93a9b6f9 Mon Sep 17 00:00:00 2001 From: robot-ydb-importer Date: Tue, 12 Dec 2023 20:17:18 +0300 Subject: [PATCH] YDB Import 489 --- .../dsproxy/dsproxy_blackboard.cpp | 116 ++--- .../blobstorage/dsproxy/dsproxy_blackboard.h | 7 +- .../blobstorage/dsproxy/dsproxy_get_impl.cpp | 25 +- .../core/blobstorage/dsproxy/dsproxy_put.cpp | 226 ++++++--- .../blobstorage/dsproxy/dsproxy_put_impl.cpp | 13 +- .../blobstorage/dsproxy/dsproxy_put_impl.h | 37 +- .../dsproxy/dsproxy_strategy_accelerate_put.h | 2 +- .../dsproxy_strategy_accelerate_put_m3dc.h | 2 +- .../dsproxy/dsproxy_strategy_base.cpp | 18 - .../dsproxy/dsproxy_strategy_base.h | 1 - .../dsproxy/dsproxy_strategy_restore.h | 136 +++-- .../blobstorage/dsproxy/ut/dsproxy_put_ut.cpp | 9 +- contrib/ydb/core/fq/libs/actors/run_actor.cpp | 96 ++-- .../actors/kafka_offset_fetch_actor.cpp | 92 ++-- .../actors/kafka_offset_fetch_actor.h | 17 +- contrib/ydb/core/kafka_proxy/kafka_events.h | 7 + .../ydb/core/kafka_proxy/ut/ut_protocol.cpp | 90 +++- .../provider/exec/yql_dq_exectransformer.cpp | 17 +- .../providers/dq/provider/yql_dq_gateway.cpp | 469 ++++++++++-------- 19 files changed, 810 insertions(+), 570 deletions(-) diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index 0de81447ee2..b7c8a9a51fd 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -25,12 +25,11 @@ void TBlobState::Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &info) Parts.resize(info.Type.TotalPartCount()); ui32 blobSubgroupSize = info.Type.BlobSubgroupSize(); Disks.resize(blobSubgroupSize); - TBlobStorageGroupInfo::TServiceIds vdisksSvc; - TBlobStorageGroupInfo::TVDiskIds vdisksId; - const ui32 hash = Id.Hash(); - info.PickSubgroup(hash, &vdisksId, &vdisksSvc); + TBlobStorageGroupInfo::TOrderNums nums; + info.GetTopology().PickSubgroup(Id.Hash(), nums); + Y_DEBUG_ABORT_UNLESS(nums.size() == blobSubgroupSize); for (ui32 i = 0; i < blobSubgroupSize; ++i) { - Disks[i].OrderNumber = info.GetOrderNumber(vdisksId[i]); + Disks[i].OrderNumber = nums[i]; Disks[i].DiskParts.resize(info.Type.TotalPartCount()); } IsChanged = true; @@ -51,7 +50,6 @@ void TBlobState::AddPartToPut(ui32 partIdx, TRope&& partData) { void TBlobState::MarkBlobReadyToPut(ui8 blobIdx) { Y_ABORT_UNLESS(WholeSituation == ESituation::Unknown || WholeSituation == ESituation::Present); - WholeSituation = ESituation::Present; BlobIdx = blobIdx; IsChanged = true; } @@ -179,7 +177,6 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); TDiskPart &diskPart = disk.DiskParts[partIdx]; - //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl; diskPart.Situation = ESituation::Lost; diskPart.Requested.Clear(); @@ -379,21 +376,7 @@ void TBlackboard::MoveBlobStateToDone(const TLogoBlobID &id) { Y_ABORT_UNLESS(bool(id)); Y_ABORT_UNLESS(id.PartId() == 0); Y_ABORT_UNLESS(id.BlobSize() != 0); - auto it = BlobStates.find(id); - if (it == BlobStates.end()) { - auto doneIt = DoneBlobStates.find(id); - const char *errorMsg = doneIt == DoneBlobStates.end() ? 
- "This blobId is not in BlobStates or in DoneBlobStates" : - "This blobId is already in DoneBlobStates"; - Y_VERIFY_S(false, errorMsg << " BlobId# " << id << " Blackboard# " << ToString()); - } else { - if (!it->second.IsDone) { - DoneCount++; - it->second.IsDone = true; - } - auto node = BlobStates.extract(it); - DoneBlobStates.insert(std::move(node)); - } + DoneBlobStates.insert(BlobStates.extract(id)); } void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) { @@ -431,32 +414,27 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) { state.AddErrorResponse(*Info, id, orderNumber); } -EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec *finished) { - IStrategy& temp = const_cast(s); // better UX - Y_ABORT_UNLESS(BlobStates.size()); +EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec& s, + TBatchedVec *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) { TString errorReason; - for (auto it = BlobStates.begin(); it != BlobStates.end(); ++it) { + for (auto it = BlobStates.begin(); it != BlobStates.end(); ) { auto& blob = it->second; - if (!blob.IsChanged) { + if (!std::exchange(blob.IsChanged, false)) { + ++it; continue; } - blob.IsChanged = false; - // recalculate blob outcome if it is not yet determined - switch (auto res = temp.Process(logCtx, blob, *Info, *this, GroupDiskRequests)) { - case EStrategyOutcome::IN_PROGRESS: - if (blob.IsDone) { - DoneCount--; - blob.IsDone = false; - } - break; - case EStrategyOutcome::ERROR: - if (IsAllRequestsTogether) { - return res; - } else { - blob.Status = NKikimrProto::ERROR; - if (finished) { - finished->push_back(&*it); + // recalculate blob outcome if it is not yet determined + NKikimrProto::EReplyStatus status = NKikimrProto::OK; + for (IStrategy *strategy : s) { + switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) { + case EStrategyOutcome::IN_PROGRESS: + status = NKikimrProto::UNKNOWN; + break; + + case EStrategyOutcome::ERROR: + if (IsAllRequestsTogether) { + return res; } if (errorReason) { errorReason += " && "; @@ -464,34 +442,44 @@ EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& } else { errorReason = res.ErrorReason; } - } - if (!blob.IsDone) { - DoneCount++; - blob.IsDone = true; - } - break; + status = NKikimrProto::ERROR; + break; - case EStrategyOutcome::DONE: - if (!IsAllRequestsTogether) { - blob.Status = NKikimrProto::OK; - if (finished) { - finished->push_back(&*it); + case EStrategyOutcome::DONE: + if (expired && !blob.HasWrittenQuorum(*Info, *expired)) { + blob.IsChanged = true; + status = NKikimrProto::UNKNOWN; } - } - if (!blob.IsDone) { - DoneCount++; - blob.IsDone = true; - } + break; + } + if (status != NKikimrProto::OK) { break; + } + } + if (status != NKikimrProto::UNKNOWN) { + const auto [doneIt, inserted, node] = DoneBlobStates.insert(BlobStates.extract(it++)); + Y_ABORT_UNLESS(inserted); + if (!IsAllRequestsTogether) { + blob.Status = status; + if (finished) { + finished->push_back(&*doneIt); + } + } + } else { + ++it; } } - const bool isDone = (DoneCount == (BlobStates.size() + DoneBlobStates.size())); - EStrategyOutcome outcome(isDone ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS); + EStrategyOutcome outcome(BlobStates.empty() ? 
EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS); outcome.ErrorReason = std::move(errorReason); return outcome; } +EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s, + TBatchedVec *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) { + return RunStrategies(logCtx, {const_cast(&s)}, finished, expired); +} + TBlobState& TBlackboard::GetState(const TLogoBlobID &id) { Y_ABORT_UNLESS(bool(id)); TLogoBlobID fullId = id.FullID(); @@ -607,13 +595,15 @@ TString TBlackboard::ToString() const { void TBlackboard::InvalidatePartStates(ui32 orderNumber) { const TVDiskID vdiskId = Info->GetVDiskId(orderNumber); for (auto& [id, state] : BlobStates) { - Y_ABORT_UNLESS(!state.IsDone); if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) { TBlobState::TDisk& disk = state.Disks[diskIdx]; for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) { TBlobState::TDiskPart& part = disk.DiskParts[partIdx]; - if (part.Situation == TBlobState::ESituation::Sent || part.Situation == TBlobState::ESituation::Present) { + if (part.Situation == TBlobState::ESituation::Present) { part.Situation = TBlobState::ESituation::Unknown; + if (state.WholeSituation == TBlobState::ESituation::Present) { + state.WholeSituation = TBlobState::ESituation::Unknown; + } state.IsChanged = true; } } diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index 26236c7b0c2..ddf083c69b8 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -84,7 +84,6 @@ struct TBlobState { ui8 BlobIdx; NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; bool IsChanged = false; - bool IsDone = false; std::vector> *ExtraBlockChecks = nullptr; NWilson::TSpan *Span = nullptr; bool Keep = false; @@ -196,7 +195,6 @@ struct TBlackboard { const NKikimrBlobStorage::EPutHandleClass PutHandleClass; const NKikimrBlobStorage::EGetHandleClass GetHandleClass; const bool IsAllRequestsTogether; - ui64 DoneCount = 0; TBlackboard(const TIntrusivePtr &info, const TIntrusivePtr &groupQueues, NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass, @@ -219,7 +217,10 @@ struct TBlackboard { void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber); void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber); void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep); - EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec *finished = nullptr); + EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec& strategies, + TBatchedVec *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr); + EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec *finished = nullptr, + const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr); TBlobState& GetState(const TLogoBlobID &id); ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex); void ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapIndex, ui32 responseIndex, NKikimrProto::EReplyStatus status); diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 75baf25592f..3bef93e13ab 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ 
b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -388,12 +388,14 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx, } EStrategyOutcome TGetImpl::RunBoldStrategy(TLogContext &logCtx) { - EStrategyOutcome outcome = Blackboard.RunStrategy(logCtx, TBoldStrategy(PhantomCheck)); - if (outcome == EStrategyOutcome::DONE && MustRestoreFirst) { - Blackboard.ChangeAll(); - outcome = Blackboard.RunStrategy(logCtx, TRestoreStrategy()); + TStackVec strategies; + TBoldStrategy s1(PhantomCheck); + strategies.push_back(&s1); + TRestoreStrategy s2; + if (MustRestoreFirst) { + strategies.push_back(&s2); } - return outcome; + return Blackboard.RunStrategies(logCtx, strategies); } EStrategyOutcome TGetImpl::RunMirror3dcStrategy(TLogContext &logCtx) { @@ -403,13 +405,14 @@ EStrategyOutcome TGetImpl::RunMirror3dcStrategy(TLogContext &logCtx) { } EStrategyOutcome TGetImpl::RunMirror3of4Strategy(TLogContext &logCtx) { - // run basic get strategy and, if blob restoration is required and we have successful get, restore the blob to full amount of parts - EStrategyOutcome outcome = Blackboard.RunStrategy(logCtx, TMirror3of4GetStrategy()); - if (outcome == EStrategyOutcome::DONE && MustRestoreFirst) { - Blackboard.ChangeAll(); - outcome = Blackboard.RunStrategy(logCtx, TPut3of4Strategy(TEvBlobStorage::TEvPut::TacticMaxThroughput)); + TStackVec strategies; + TMirror3of4GetStrategy s1; + strategies.push_back(&s1); + TPut3of4Strategy s2(TEvBlobStorage::TEvPut::TacticMaxThroughput); + if (MustRestoreFirst) { + strategies.push_back(&s2); } - return outcome; + return Blackboard.RunStrategies(logCtx, strategies); } EStrategyOutcome TGetImpl::RunStrategies(TLogContext &logCtx) { diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index 9c6a299346e..fc525b6f3b6 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -51,7 +51,12 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor> IncarnationRecords; @@ -96,34 +102,54 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor - void SendPuts(auto&& callback) { + void SendPutsImpl(auto&& callback, TPutImpl::TPutResultVec& putResults) { TDeque> v; - callback(v); + callback(v, putResults); UpdatePengingVDiskResponseCount(v); RequestsSent += v.size(); CountPuts(v); SendToQueues(v, TimeStatsEnabled); } + template + bool SendPuts(T&& callback) { + TPutImpl::TPutResultVec putResults; + if (IsMultiPutMode) { + if constexpr (VMultiPut) { + SendPutsImpl(std::forward(callback), putResults); + } else { + Y_ABORT(); + } + } else { + if constexpr (VPut) { + SendPutsImpl(std::forward(callback), putResults); + } else { + Y_ABORT(); + } + } + return ReplyAndDieWithLastResponse(putResults); + } + void Accelerate() { if (IsAccelerated) { return; } IsAccelerated = true; - auto callback = [this](auto& v) { + const TMonotonic now = TActivationContext::Monotonic(); + + auto callback = [&](auto& v, auto& /*putResults*/) { PutImpl.Accelerate(LogCtx, v); *(IsMultiPutMode ? 
Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size(); }; + SendPuts(callback); - if (IsMultiPutMode) { - SendPuts(callback); - } else { - SendPuts(callback); - } + IssueStatusForExpiredDisks(now); } void HandleIncarnation(TMonotonic timestamp, ui32 orderNumber, ui64 incarnationGuid) { + timestamp += TDuration::Seconds(15); // TODO: cooldown timeout + Y_ABORT_UNLESS(orderNumber < IncarnationRecords.size()); if (auto& record = IncarnationRecords[orderNumber]; !record) { record = TIncarnationRecord{ @@ -149,6 +175,57 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorExpirationTimestamp <= timestamp && + record->StatusIssueTimestamp == TMonotonic()) { + issue = true; + break; + } + } + if (!issue) { + return; + } + for (ui32 i = 0; i < IncarnationRecords.size(); ++i) { + if (auto& record = IncarnationRecords[i]; record && record->StatusIssueTimestamp == TMonotonic()) { + const TVDiskID vdiskId = Info->GetVDiskId(i); + A_LOG_INFO_S("BPP15", "sending TEvVStatus VDiskId# " << vdiskId); + SendToQueue(std::make_unique(vdiskId), i); + ++StatusMsgsSent; + record->StatusIssueTimestamp = timestamp; + } + } + } + + void Handle(TEvBlobStorage::TEvVStatusResult::TPtr& ev) { + A_LOG_INFO_S("BPP16", "received TEvVStatusResult " << ev->Get()->ToString()); + + ProcessReplyFromQueue(ev); + ++StatusResultMsgsReceived; + + const TMonotonic now = TActivationContext::Monotonic(); + + auto& record = ev->Get()->Record; + const ui32 orderNumber = ev->Cookie; + auto& incarnationRecord = IncarnationRecords[orderNumber]; + Y_ABORT_UNLESS(incarnationRecord); + const TMonotonic issue = std::exchange(incarnationRecord->StatusIssueTimestamp, TMonotonic()); + Y_ABORT_UNLESS(issue != TMonotonic()); + if (record.HasIncarnationGuid()) { + HandleIncarnation(issue, orderNumber, record.GetIncarnationGuid()); + } + + auto callback = [&](auto& v, auto& putResults) { + PutImpl.Step(LogCtx, v, putResults, CreateExpiredVDiskSet(now)); + }; + if (SendPuts(callback)) { + return; + } + + IssueStatusForExpiredDisks(now); + } + void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) { A_LOG_LOG_S(false, ev->Get()->Record.GetStatus() == NKikimrProto::OK ? 
NLog::PRI_DEBUG : NLog::PRI_NOTICE, "BPP01", "received " << ev->Get()->ToString() << " from# " << VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID())); @@ -156,6 +233,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGet()->Record.MutableTimestamps()->SetReceivedByDSProxyUs(GetCycleCountFast() / cyclesPerUs); const NKikimrBlobStorage::TEvVPutResult &record = ev->Get()->Record; @@ -184,7 +262,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGetOrderNumber(shortId), record.GetIncarnationGuid()); + // TODO: correct timestamp + HandleIncarnation(now, Info->GetOrderNumber(shortId), record.GetIncarnationGuid()); } LWPROBE(DSProxyVDiskRequestDuration, TEvBlobStorage::EvVPut, blob.BlobSize(), blob.TabletID(), @@ -201,38 +280,23 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor([&](auto& v) { - PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults); - }); - if (ReplyAndDieWithLastResponse(putResults)) { + auto callback = [&](auto& v, auto& putResults) { + PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults, CreateExpiredVDiskSet(now)); + }; + if (SendPuts(callback)) { return; } - Y_ABORT_UNLESS(RequestsSent > ResponsesReceived, "RequestsSent# %" PRIu64 " ResponsesReceived# %" PRIu64, - ui64(RequestsSent), ui64(ResponsesReceived)); - - if (!IsAccelerateScheduled && !IsAccelerated) { - if (WaitingVDiskCount == 1 && RequestsSent > 1) { - ui64 timeToAccelerateUs = PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000; - TDuration timeSinceStart = TActivationContext::Now() - StartTime; - if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) { - ui64 causeIdx = RootCauseTrack.RegisterAccelerate(); - Schedule(TDuration::MicroSeconds(timeToAccelerateUs - timeSinceStart.MicroSeconds()), - new TEvAccelerate(causeIdx)); - IsAccelerateScheduled = true; - } else { - Accelerate(); - } - } - } + AccelerateIfNeeded(); SanityCheck(); // May Die - } + IssueStatusForExpiredDisks(now); + } void Handle(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) { ProcessReplyFromQueue(ev); ResponsesReceived++; + const TMonotonic now = TActivationContext::Monotonic(); const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; ev->Get()->Record.MutableTimestamps()->SetReceivedByDSProxyUs(GetCycleCountFast() / cyclesPerUs); const NKikimrBlobStorage::TEvVMultiPutResult &record = ev->Get()->Record; @@ -246,7 +310,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGetOrderNumber(shortId), record.GetIncarnationGuid()); + // TODO: correct timestamp + HandleIncarnation(now, Info->GetOrderNumber(shortId), record.GetIncarnationGuid()); } Y_ABORT_UNLESS(record.HasCookie()); @@ -315,22 +380,24 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor([&](auto& v) { - PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults); - }); - if (ReplyAndDieWithLastResponse(putResults)) { + auto callback = [&](auto& v, auto& putResults) { + PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults, CreateExpiredVDiskSet(now)); + }; + if (SendPuts(callback)) { return; } - Y_ABORT_UNLESS(RequestsSent > ResponsesReceived, "RequestsSent# %" PRIu64 " ResponsesReceived# %" PRIu64 - " ResponsesSent# %" PRIu64 " BlobsCount# %" PRIu64 " TPutImpl# %s", ui64(RequestsSent), ui64(ResponsesReceived), - (ui64)ResponsesSent, (ui64)PutImpl.Blobs.size(), PutImpl.DumpFullState().data()); + AccelerateIfNeeded(); + SanityCheck(); // May Die + IssueStatusForExpiredDisks(now); + } + + void 
AccelerateIfNeeded() { if (!IsAccelerateScheduled && !IsAccelerated) { if (WaitingVDiskCount == 1 && RequestsSent > 1) { - ui64 timeToAccelerateUs = PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000; - TDuration timeSinceStart = TActivationContext::Now() - StartTime; + ui64 timeToAccelerateUs = Max(1, PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000); + TDuration timeSinceStart = TActivationContext::Monotonic() - StartTime; if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) { ui64 causeIdx = RootCauseTrack.RegisterAccelerate(); Schedule(TDuration::MicroSeconds(timeToAccelerateUs - timeSinceStart.MicroSeconds()), @@ -341,7 +408,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor; @@ -355,7 +421,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor &putResult, ui64 blobIdx) { + void SendReply(std::unique_ptr putResult, ui64 blobIdx) { NKikimrProto::EReplyStatus status = putResult->Status; A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPP21", "SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent << " PutImpl.Blobs.size# " << PutImpl.Blobs.size() << " Last# " << (ResponsesSent + 1 == PutImpl.Blobs.size() ? "true" : "false")); - const TDuration duration = TActivationContext::Now() - StartTime; + const TDuration duration = TActivationContext::Monotonic() - StartTime; TLogoBlobID blobId = putResult->Id; TLogoBlobID origBlobId = TLogoBlobID(blobId, 0); Mon->CountPutPesponseTime(Info->GetDeviceType(), HandleClass, PutImpl.Blobs[blobIdx].BufferSize, duration); @@ -470,7 +536,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGetTotalVDisksNum()) , Deadline(ev->Deadline) - , StartTime(now) , HandleClass(ev->HandleClass) , ReportedBytes(0) , TimeStatsEnabled(timeStatsEnabled) @@ -515,7 +580,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGetTotalVDisksNum()) , IsManyPuts(true) , Deadline(TInstant::Zero()) - , StartTime(now) , HandleClass(handleClass) , ReportedBytes(0) , TimeStatsEnabled(timeStatsEnabled) @@ -564,6 +628,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGroupID << " BlobIDs# " << BlobIdSequenceToString() << " Not answered in " - << (TActivationContext::Now() - StartTime).Seconds() << " seconds"); + << (TActivationContext::Monotonic() - StartTime) << " seconds"); if (TInstant::Now() > Deadline) { ErrorReason = "Deadline exceeded"; ReplyAndDie(NKikimrProto::DEADLINE); @@ -647,29 +714,72 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor(callback); - } else { - SendPuts(callback); - } + auto callback = [this](auto& v, auto& /*putResults*/) { PutImpl.GenerateInitialRequests(LogCtx, PartSets, v); }; + SendPuts(callback); + BootstrapInProgress = false; } else { TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvResume, 0, SelfId(), {}, nullptr, 0)); } SanityCheck(); } + void CheckRequests(ui32 type) { + auto dumpIncarnationRecords = [&] { + TStringStream s; + s << '{'; + for (ui32 i = 0; i < IncarnationRecords.size(); ++i) { + if (i) { + s << ' '; + } + s << i; + if (auto& record = IncarnationRecords[i]) { + s << '{'; + s << "IncarnationGuid# " << record->IncarnationGuid; + s << " ExpirationTimestamp# " << record->ExpirationTimestamp; + s << " StatusIssueTimestamp# " << record->StatusIssueTimestamp; + s << '}'; + } else { + s << ""; + } + } + s << '}'; + return s.Str(); + }; + const TMonotonic now = TActivationContext::Monotonic(); + Y_VERIFY_S(ResponsesSent == 
PutImpl.Blobs.size() + || BootstrapInProgress + || RequestsSent > ResponsesReceived + || StatusMsgsSent > StatusResultMsgsReceived, + "query stuck" + << " Type# 0x" << Sprintf("%08" PRIx32, type) + << " ResponsesSent# " << ResponsesSent + << " Blobs.size# " << PutImpl.Blobs.size() + << " BootstrapInProgress# " << BootstrapInProgress + << " RequestsSent# " << RequestsSent + << " ResponsesReceived# " << ResponsesReceived + << " StatusMsgsSent# " << StatusMsgsSent + << " StatusResultMsgsReceived# " << StatusResultMsgsReceived + << " Now# " << now + << " Passed# " << (now - StartTime) + << " ExpiredVDiskSet# " << CreateExpiredVDiskSet(now).ToString() + << " IncarnationRecords# " << dumpIncarnationRecords() + << " State# " << PutImpl.DumpFullState()); + } + STATEFN(StateWait) { if (ProcessEvent(ev)) { return; } - switch (ev->GetTypeRewrite()) { + const ui32 type = ev->GetTypeRewrite(); + switch (type) { + hFunc(TEvBlobStorage::TEvVStatusResult, Handle); hFunc(TEvBlobStorage::TEvVPutResult, Handle); hFunc(TEvBlobStorage::TEvVMultiPutResult, Handle); hFunc(TEvAccelerate, Handle); cFunc(TEvBlobStorage::EvResume, ResumeBootstrap); hFunc(TKikimrEvents::TEvWakeup, Handle); } + CheckRequests(type); } }; diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp index 1340829b573..0c473ff8b77 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp @@ -11,20 +11,21 @@ namespace NKikimr { using TPutResultVec = TPutImpl::TPutResultVec; -bool TPutImpl::RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults) { +bool TPutImpl::RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired) { switch (Info->Type.GetErasure()) { case TBlobStorageGroupType::ErasureMirror3dc: - return RunStrategy(logCtx, TPut3dcStrategy(Tactic, EnableRequestMod3x3ForMinLatecy), outPutResults); + return RunStrategy(logCtx, TPut3dcStrategy(Tactic, EnableRequestMod3x3ForMinLatecy), outPutResults, expired); case TBlobStorageGroupType::ErasureMirror3of4: - return RunStrategy(logCtx, TPut3of4Strategy(Tactic), outPutResults); + return RunStrategy(logCtx, TPut3of4Strategy(Tactic), outPutResults, expired); default: - return RunStrategy(logCtx, TRestoreStrategy(), outPutResults); + return RunStrategy(logCtx, TRestoreStrategy(), outPutResults, expired); } } -bool TPutImpl::RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults) { +bool TPutImpl::RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults, + const TBlobStorageGroupInfo::TGroupVDisks& expired) { TBatchedVec finished; - const EStrategyOutcome outcome = Blackboard.RunStrategy(logCtx, strategy, &finished); + const EStrategyOutcome outcome = Blackboard.RunStrategy(logCtx, strategy, &finished, &expired); if (finished) { PrepareReply(logCtx, outcome.ErrorReason, finished, outPutResults); return true; diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index a8b48e04e7f..056460780f0 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -189,11 +189,10 @@ class TPutImpl { } TPutResultVec putResults; - bool workDone = Step(logCtx, outVPuts, putResults); + Step(logCtx, outVPuts, putResults, {&Info->GetTopology()}); IsInitialized = true; 
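// Annotation (not part of the patch): `{&Info->GetTopology()}` in the Step() call
// above constructs an empty TBlobStorageGroupInfo::TGroupVDisks, so the initial
// pass treats no VDisk as expired. The actor side instead passes
// CreateExpiredVDiskSet(now); that helper's body is not shown in this patch, so
// the sketch below is an assumption, built only from the IncarnationRecords
// fields the patch does show (per-order-number ExpirationTimestamp):
//
//   TBlobStorageGroupInfo::TGroupVDisks CreateExpiredVDiskSet(TMonotonic now) const {
//       TBlobStorageGroupInfo::TGroupVDisks expired(&Info->GetTopology()); // starts empty
//       for (ui32 i = 0; i < IncarnationRecords.size(); ++i) {
//           const auto& record = IncarnationRecords[i];
//           if (record && record->ExpirationTimestamp <= now) {
//               // hypothetical API use: mark this disk's VDisk as expired
//               expired |= {&Info->GetTopology(), Info->GetVDiskId(i)};
//           }
//       }
//       return expired;
//   }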
Y_ABORT_UNLESS(!outVPuts.empty()); Y_ABORT_UNLESS(putResults.empty()); - Y_ABORT_UNLESS(workDone); } template @@ -321,7 +320,8 @@ class TPutImpl { template void OnVPutEventResult(TLogContext &logCtx, TActorId sender, TVPutEventResult &ev, - TDeque> &outVPutEvents, TPutResultVec &outPutResults) + TDeque> &outVPutEvents, TPutResultVec &outPutResults, + const TBlobStorageGroupInfo::TGroupVDisks& expired) { constexpr bool isVPut = std::is_same_v; constexpr bool isVMultiPut = std::is_same_v; @@ -388,14 +388,7 @@ class TPutImpl { return; } - Step(logCtx, outVPutEvents, outPutResults); - Y_VERIFY_S(DoneBlobs == Blobs.size() || requests > responses, - "No put result while" - << " Type# " << putType - << " DoneBlobs# " << DoneBlobs - << " requests# " << requests - << " responses# " << responses - << " Blackboard# " << Blackboard.ToString()); + Step(logCtx, outVPutEvents, outPutResults, expired); } void PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logCtx, TString errorReason, @@ -434,25 +427,19 @@ class TPutImpl { Blackboard.InvalidatePartStates(orderNumber); } -protected: - bool RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults); - bool RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults); - - // Returns true if there are additional requests to send template - bool Step(TLogContext &logCtx, TDeque> &outVPuts, - TPutResultVec &outPutResults) { - if (!RunStrategies(logCtx, outPutResults)) { - const ui32 numRequests = outVPuts.size(); - PrepareVPuts(logCtx, outVPuts); - return outVPuts.size() > numRequests; - } else { + void Step(TLogContext &logCtx, TDeque> &outVPuts, TPutResultVec &outPutResults, + const TBlobStorageGroupInfo::TGroupVDisks& expired) { + if (RunStrategies(logCtx, outPutResults, expired)) { Y_ABORT_UNLESS(outPutResults.size()); - PrepareVPuts(logCtx, outVPuts); - return false; } + PrepareVPuts(logCtx, outVPuts); } +protected: + bool RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired); + bool RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults, + const TBlobStorageGroupInfo::TGroupVDisks& expired); template void PrepareVPuts(TLogContext &logCtx, TDeque> &outVPutEvents) { diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h index 94367d6a3d6..313ec577bb5 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h @@ -37,7 +37,7 @@ class TAcceleratePutStrategy : public TStrategyBase { } } - return EStrategyOutcome::DONE; + return EStrategyOutcome::IN_PROGRESS; } }; diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h index 451980c4b81..2f2a431dac4 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h @@ -78,7 +78,7 @@ class TAcceleratePut3dcStrategy : public TStrategyBase { } } - return EStrategyOutcome::DONE; + return EStrategyOutcome::IN_PROGRESS; } }; diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp index 70aea83ffea..ba65fff8c92 100644 --- 
a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp @@ -193,24 +193,6 @@ void TStrategyBase::AddGetRequest(TLogContext &logCtx, TGroupDiskRequests &group disk.DiskParts[partIdx].Requested.Add(intervalSet); } -bool TStrategyBase::VerifyTheWholeSituation(TBlobState &state) { - switch (state.WholeSituation) { - case TBlobState::ESituation::Unknown: - Y_ABORT("Blob Id# %s whole situation Unknown", state.Id.ToString().c_str()); - case TBlobState::ESituation::Lost: - Y_ABORT("Blob Id# %s whole situation Lost", state.Id.ToString().c_str()); - case TBlobState::ESituation::Error: - Y_ABORT("Blob Id# %s whole situation Error", state.Id.ToString().c_str()); - case TBlobState::ESituation::Sent: - Y_ABORT("Blob Id# %s whole situation Sent", state.Id.ToString().c_str()); - case TBlobState::ESituation::Absent: - return true; - case TBlobState::ESituation::Present: - return false; - } - Y_ABORT("Blob Id# %s unexpected WholeSituation# %" PRIu32, state.Id.ToString().c_str(), (ui32)state.WholeSituation); -} - void TStrategyBase::PreparePartLayout(const TBlobState &state, const TBlobStorageGroupInfo &info, TBlobStorageGroupType::TPartLayout *layout, ui32 slowDiskIdx) { Y_ABORT_UNLESS(layout); diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h index 6366404d887..cb65d10108e 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h @@ -33,7 +33,6 @@ class TStrategyBase : public IStrategy { bool doVerify, TBlobState &state); void AddGetRequest(TLogContext &logCtx, TGroupDiskRequests &groupDiskRequests, TLogoBlobID &fullId, ui32 partIdx, TBlobState::TDisk &disk, TIntervalSet &intervalSet, const char *logMarker); - bool VerifyTheWholeSituation(TBlobState &state); void PreparePartLayout(const TBlobState &state, const TBlobStorageGroupInfo &info, TBlobStorageGroupType::TPartLayout *layout, ui32 slowDiskIdx); bool IsPutNeeded(const TBlobState &state, const TBlobStorageGroupType::TPartPlacement &partPlacement); diff --git a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h index 38009f29b56..51e63aff8e8 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h +++ b/contrib/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h @@ -11,76 +11,56 @@ namespace NKikimr { class TRestoreStrategy : public TStrategyBase { public: - void EvaluateRestoreLayout(TLogContext &logCtx, TBlobState &state, - const TBlobStorageGroupInfo &info, TBlobStorageGroupInfo::EBlobState *pessimisticState, + void EvaluateRestoreLayout(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info, TBlobStorageGroupInfo::EBlobState *optimisticState) { - Y_ABORT_UNLESS(pessimisticState); - Y_ABORT_UNLESS(optimisticState); const ui32 totalPartCount = info.Type.TotalPartCount(); ui32 errorDisks = 0; - ui32 unknownDisks = 0; - TSubgroupPartLayout presentLayout; TSubgroupPartLayout optimisticLayout; for (ui32 diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { TBlobState::TDisk &disk = state.Disks[diskIdx]; - bool isHandoff = (diskIdx >= totalPartCount); - ui32 beginPartIdx = (isHandoff ? 0 : diskIdx); - ui32 endPartIdx = (isHandoff ? totalPartCount : (diskIdx + 1)); + bool isHandoff = diskIdx >= totalPartCount; + ui32 beginPartIdx = isHandoff ? 0 : diskIdx; + ui32 endPartIdx = isHandoff ? 
totalPartCount : (diskIdx + 1); bool isErrorDisk = false; for (ui32 partIdx = beginPartIdx; partIdx < endPartIdx; ++partIdx) { - TBlobState::ESituation partSituation = disk.DiskParts[partIdx].Situation; - if (partSituation == TBlobState::ESituation::Error) { + if (disk.DiskParts[partIdx].Situation == TBlobState::ESituation::Error) { R_LOG_DEBUG_SX(logCtx, "BPG50", "Id# " << state.Id.ToString() - << " Restore disk# " << diskIdx << " part# " << partIdx << " error"); - if (!isErrorDisk) { - isErrorDisk = true; - errorDisks++; - } + << " restore disk# " << diskIdx + << " part# " << partIdx + << " error"); + isErrorDisk = true; + errorDisks++; + break; } } - bool isUnknownDisk = true; if (!isErrorDisk) { for (ui32 partIdx = beginPartIdx; partIdx < endPartIdx; ++partIdx) { - TBlobState::ESituation partSituation = disk.DiskParts[partIdx].Situation; - if (partSituation == TBlobState::ESituation::Absent) { - R_LOG_DEBUG_SX(logCtx, "BPG51", "Id# " << state.Id.ToString() - << " restore disk# " << diskIdx << " part# " << partIdx << " absent"); - optimisticLayout.AddItem(diskIdx, partIdx, info.Type); - isUnknownDisk = false; - } else if (partSituation == TBlobState::ESituation::Lost) { - R_LOG_DEBUG_SX(logCtx, "BPG63", "Id# " << state.Id.ToString() - << " restore disk# " << diskIdx << " part# " << partIdx << " lost"); - optimisticLayout.AddItem(diskIdx, partIdx, info.Type); - isUnknownDisk = false; - } else if (partSituation == TBlobState::ESituation::Present) { - R_LOG_DEBUG_SX(logCtx, "BPG52", "Id# " << state.Id.ToString() - << " restore disk# " << diskIdx << " part# " << partIdx << " present"); - presentLayout.AddItem(diskIdx, partIdx, info.Type); - optimisticLayout.AddItem(diskIdx, partIdx, info.Type); - isUnknownDisk = false; - } else if (partSituation == TBlobState::ESituation::Unknown) { - R_LOG_DEBUG_SX(logCtx, "BPG53", "Id# " << state.Id.ToString() - << " restore disk# " << diskIdx << " part# " << partIdx << " unknown"); - optimisticLayout.AddItem(diskIdx, partIdx, info.Type); - } else if (partSituation == TBlobState::ESituation::Sent) { - R_LOG_DEBUG_SX(logCtx, "BPG54", "Id# " << state.Id.ToString() - << " restore disk# " << diskIdx << " part# " << partIdx << " sent"); - optimisticLayout.AddItem(diskIdx, partIdx, info.Type); + const TBlobState::ESituation partSituation = disk.DiskParts[partIdx].Situation; + R_LOG_DEBUG_SX(logCtx, "BPG51", "Id# " << state.Id.ToString() + << " restore disk# " << diskIdx + << " part# " << partIdx + << " situation# " << TBlobState::SituationToString(partSituation)); + + switch (partSituation) { + case TBlobState::ESituation::Absent: + case TBlobState::ESituation::Lost: + case TBlobState::ESituation::Present: + case TBlobState::ESituation::Unknown: + case TBlobState::ESituation::Sent: + optimisticLayout.AddItem(diskIdx, partIdx, info.Type); + break; + + case TBlobState::ESituation::Error: + break; } } } - if (!isErrorDisk && isUnknownDisk) { - unknownDisks++; - } } - ui32 pessimisticReplicas = presentLayout.CountEffectiveReplicas(info.Type); - ui32 optimisticReplicas = optimisticLayout.CountEffectiveReplicas(info.Type); - *pessimisticState = info.BlobState(pessimisticReplicas, errorDisks + unknownDisks); + const ui32 optimisticReplicas = optimisticLayout.CountEffectiveReplicas(info.Type); *optimisticState = info.BlobState(optimisticReplicas, errorDisks); - R_LOG_DEBUG_SX(logCtx, "BPG55", "restore Id# " << state.Id.ToString() << " pessimisticReplicas# " << pessimisticReplicas - << " pessimisticState# " << TBlobStorageGroupInfo::BlobStateToString(*pessimisticState) 
+ R_LOG_DEBUG_SX(logCtx, "BPG55", "restore Id# " << state.Id.ToString() << " optimisticReplicas# " << optimisticReplicas << " optimisticState# " << TBlobStorageGroupInfo::BlobStateToString(*optimisticState)); } @@ -99,36 +79,50 @@ class TRestoreStrategy : public TStrategyBase { return std::nullopt; } - std::optional IgnoreFullPessimistic(TBlobStorageGroupInfo::EBlobState pessimisticState) { - switch (pessimisticState) { - case TBlobStorageGroupInfo::EBS_DISINTEGRATED: - case TBlobStorageGroupInfo::EBS_UNRECOVERABLE_FRAGMENTARY: - case TBlobStorageGroupInfo::EBS_RECOVERABLE_FRAGMENTARY: - case TBlobStorageGroupInfo::EBS_RECOVERABLE_DOUBTED: - break; - case TBlobStorageGroupInfo::EBS_FULL: - return EStrategyOutcome::DONE; // TODO(alexvru): validate behaviour + EStrategyOutcome Process(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info, + TBlackboard &blackboard, TGroupDiskRequests &groupDiskRequests) override { + // Check if the work is already done. + if (state.WholeSituation == TBlobState::ESituation::Absent) { + return EStrategyOutcome::DONE; // nothing to restore } - return std::nullopt; - } + // Check if the blob is present in required number of replicas. + TSubgroupPartLayout present, presentOrSent; + const ui32 totalPartCount = info.Type.TotalPartCount(); + for (ui32 diskIdx = 0, numDisks = state.Disks.size(); diskIdx < numDisks; ++diskIdx) { + const TBlobState::TDisk& disk = state.Disks[diskIdx]; + const bool isHandoff = diskIdx >= totalPartCount; + const ui32 beginPartIdx = isHandoff ? 0 : diskIdx; + const ui32 endPartIdx = isHandoff ? totalPartCount : diskIdx + 1; + for (ui32 partIdx = beginPartIdx; partIdx < endPartIdx; ++partIdx) { + switch (disk.DiskParts[partIdx].Situation) { + case TBlobState::ESituation::Present: + present.AddItem(diskIdx, partIdx, info.Type); + [[fallthrough]]; + case TBlobState::ESituation::Sent: + presentOrSent.AddItem(diskIdx, partIdx, info.Type); + break; + + default: + break; + } + } + } - EStrategyOutcome Process(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info, - TBlackboard &blackboard, TGroupDiskRequests &groupDiskRequests) override { - if (VerifyTheWholeSituation(state)) { - // TODO(alexvru): ensure this branch does not hit when there are not enough parts present! 
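// Annotation (a reading of this change, not the author's statement): in the
// layout split built above, Present parts are counted in both `present` and
// `presentOrSent` (via the [[fallthrough]]), while Sent parts reach only
// `presentOrSent`. The quorum check that follows can therefore report DONE only
// when acknowledged parts alone already form a full blob, and IN_PROGRESS when
// unacknowledged in-flight puts could still complete it.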
- return EStrategyOutcome::DONE; // blob is already marked as present + const auto& quorumChecker = info.GetQuorumChecker(); + const TBlobStorageGroupInfo::TSubgroupVDisks failed(&info.GetTopology()); + if (quorumChecker.GetBlobState(present, failed) == TBlobStorageGroupInfo::EBS_FULL) { + state.WholeSituation = TBlobState::ESituation::Present; + return EStrategyOutcome::DONE; + } else if (quorumChecker.GetBlobState(presentOrSent, failed) == TBlobStorageGroupInfo::EBS_FULL) { + return EStrategyOutcome::IN_PROGRESS; } // Look at the current layout and set the status if possible - TBlobStorageGroupInfo::EBlobState pessimisticState = TBlobStorageGroupInfo::EBS_DISINTEGRATED; TBlobStorageGroupInfo::EBlobState optimisticState = TBlobStorageGroupInfo::EBS_DISINTEGRATED; - EvaluateRestoreLayout(logCtx, state, info, &pessimisticState, &optimisticState); - + EvaluateRestoreLayout(logCtx, state, info, &optimisticState); if (auto res = SetErrorForUnrecoverableOptimistic(optimisticState)) { return *res; - } else if (auto res = IgnoreFullPessimistic(pessimisticState)) { - return *res; } // Find the slowest disk diff --git a/contrib/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp b/contrib/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp index 42894c3ae08..842a5b3ab20 100644 --- a/contrib/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp +++ b/contrib/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp @@ -119,7 +119,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) NKikimrProto::EReplyStatus status = group.OnVPut(vPut); vPutResult.MakeError(status, TString(), vPut.Record); - putImpl.OnVPutEventResult(logCtx, sender, vPutResult, nextVPuts, putResults); + putImpl.OnVPutEventResult(logCtx, sender, vPutResult, nextVPuts, putResults, {&group.GetInfo()->GetTopology()}); if (putResults.size()) { break; @@ -131,7 +131,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) auto& [_, result] = putResults.front(); UNIT_ASSERT(result->Status == NKikimrProto::OK); UNIT_ASSERT(result->Id == blobId); - UNIT_ASSERT(putImpl.GetHandoffPartsSent() == 7); + UNIT_ASSERT_VALUES_EQUAL(putImpl.GetHandoffPartsSent(), 2); } Y_UNIT_TEST(TestBlock42MaxPartCountOnHandoff) { @@ -275,7 +275,7 @@ struct TTestPutAllOk { TActorId sender; TDeque> vPuts2; - putImpl.OnVPutEventResult(LogCtx, sender, *vPutResults[resIdx], vPuts2, putResults); + putImpl.OnVPutEventResult(LogCtx, sender, *vPutResults[resIdx], vPuts2, putResults, &Group.GetInfo()->GetTopology()); if (putResults.size()) { break; } @@ -306,7 +306,8 @@ struct TTestPutAllOk { TActorId sender; TDeque> vMultiPuts2; - putImpl.OnVPutEventResult(LogCtx, sender, *vMultiPutResults[resIdx], vMultiPuts2, putResults); + putImpl.OnVPutEventResult(LogCtx, sender, *vMultiPutResults[resIdx], vMultiPuts2, putResults, + &Group.GetInfo()->GetTopology()); if (putResults.size() == BlobIds.size()) { break; } diff --git a/contrib/ydb/core/fq/libs/actors/run_actor.cpp b/contrib/ydb/core/fq/libs/actors/run_actor.cpp index 45c795ea5ee..1a4e85cc8f6 100644 --- a/contrib/ydb/core/fq/libs/actors/run_actor.cpp +++ b/contrib/ydb/core/fq/libs/actors/run_actor.cpp @@ -183,64 +183,68 @@ class TProgramRunnerActor : public NActors::TActorBootstrappedEnableResultPosition(); + Program = progFactory.Create("-stdin-", Sql, SessionId); + Program->EnableResultPosition(); - // parse phase - { - if (!Program->ParseSql(SqlSettings)) { - SendStatusAndDie(TProgram::TStatus::Error, "Failed to parse query"); - return; - } + // parse phase + { + if 
(!Program->ParseSql(SqlSettings)) { + SendStatusAndDie(TProgram::TStatus::Error, "Failed to parse query"); + return; + } - if (ExecuteMode == FederatedQuery::ExecuteMode::PARSE) { - SendStatusAndDie(TProgram::TStatus::Ok); - return; + if (ExecuteMode == FederatedQuery::ExecuteMode::PARSE) { + SendStatusAndDie(TProgram::TStatus::Ok); + return; + } } - } - // compile phase - { - if (!Program->Compile("")) { - SendStatusAndDie(TProgram::TStatus::Error, "Failed to compile query"); - return; + // compile phase + { + if (!Program->Compile("")) { + SendStatusAndDie(TProgram::TStatus::Error, "Failed to compile query"); + return; + } + + if (ExecuteMode == FederatedQuery::ExecuteMode::COMPILE) { + SendStatusAndDie(TProgram::TStatus::Ok); + return; + } } - if (ExecuteMode == FederatedQuery::ExecuteMode::COMPILE) { - SendStatusAndDie(TProgram::TStatus::Ok); + Compiled = true; + + // next phases can be async: optimize, validate, run + TProgram::TFutureStatus futureStatus; + switch (ExecuteMode) { + case FederatedQuery::ExecuteMode::EXPLAIN: + futureStatus = Program->OptimizeAsyncWithConfig("", TraceOptPipelineConfigurator); + break; + case FederatedQuery::ExecuteMode::VALIDATE: + futureStatus = Program->ValidateAsync(""); + break; + case FederatedQuery::ExecuteMode::RUN: + futureStatus = Program->RunAsyncWithConfig("", TraceOptPipelineConfigurator); + break; + default: + SendStatusAndDie(TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast(ExecuteMode)); return; } - } - Compiled = true; + futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { + actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); + }); - // next phases can be async: optimize, validate, run - TProgram::TFutureStatus futureStatus; - switch (ExecuteMode) { - case FederatedQuery::ExecuteMode::EXPLAIN: - futureStatus = Program->OptimizeAsyncWithConfig("", TraceOptPipelineConfigurator); - break; - case FederatedQuery::ExecuteMode::VALIDATE: - futureStatus = Program->ValidateAsync(""); - break; - case FederatedQuery::ExecuteMode::RUN: - futureStatus = Program->RunAsyncWithConfig("", TraceOptPipelineConfigurator); - break; - default: - SendStatusAndDie(TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast(ExecuteMode)); - return; + Become(&TProgramRunnerActor::StateFunc); + } catch (...) 
{ + SendStatusAndDie(TProgram::TStatus::Error, CurrentExceptionMessage()); } - - futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { - actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); - }); - - Become(&TProgramRunnerActor::StateFunc); } void SendStatusAndDie(NYql::TProgram::TStatus status, const TString& message = "") { diff --git a/contrib/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp b/contrib/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp index d3ee6c6e977..438a8db36d4 100644 --- a/contrib/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp +++ b/contrib/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp @@ -41,8 +41,8 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< std::shared_ptr> partitions) : TBase(request, requester) , TDescribeTopicActorImpl(ConsumerOffsetSettings(consumers, partitions)) - , Requester_(requester) - , TopicName_(request.Topic) + , Requester(requester) + , TopicName(request.Topic) { Y_UNUSED(requester); }; @@ -63,7 +63,6 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< } } - // Noop void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, @@ -72,6 +71,12 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< Y_UNUSED(errorCode); Y_UNUSED(status); Y_UNUSED(ctx); + + THolder response(new TEvKafka::TEvCommitedOffsetsResponse()); + response->TopicName = TopicName; + response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::ERROR; + Send(Requester, response.Release()); + Die(ctx); } void ApplyResponse(TTabletInfo& tabletInfo, @@ -86,7 +91,7 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< consumerToOffset[consumerResult.GetConsumer()] = consumerResult.GetCommitedOffset(); } } - (*PartitionIdToOffsets_)[partResult.GetPartition()] = consumerToOffset; + (*PartitionIdToOffsets)[partResult.GetPartition()] = consumerToOffset; } }; @@ -99,17 +104,24 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< Y_UNUSED(ev); }; - // Noop + // Should never be called bool ApplyResponse(NKikimr::TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override { Y_UNUSED(ctx); Y_UNUSED(ev); - return true; + Y_ABORT(); }; void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override { const auto& response = ev->Get()->Request.Get()->ResultSet.front(); - Y_ABORT_UNLESS(response.PQGroupInfo); + if (!response.PQGroupInfo) { + THolder response(new TEvKafka::TEvCommitedOffsetsResponse()); + response->TopicName = TopicName; + response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::UNKNOWN_TOPIC; + Send(Requester, response.Release()); + TActorBootstrapped::PassAway(); + return; + } const auto& pqDescr = response.PQGroupInfo->Description; ProcessTablets(pqDescr, ActorContext()); @@ -117,17 +129,18 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< void Reply(const TActorContext& ctx) override { THolder response(new TEvKafka::TEvCommitedOffsetsResponse()); - response->TopicName = TopicName_; - response->PartitionIdToOffsets = PartitionIdToOffsets_; - Send(Requester_, response.Release()); + response->TopicName = TopicName; + response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::OK; + 
response->PartitionIdToOffsets = PartitionIdToOffsets; + Send(Requester, response.Release()); Die(ctx); }; private: - TActorId Requester_; - TString TopicName_; - std::unordered_map PartitionIdToOffset_ {}; - std::shared_ptr>> PartitionIdToOffsets_ = std::make_shared>>(); + TActorId Requester; + TString TopicName; + std::unordered_map PartitionIdToOffset {}; + std::shared_ptr>> PartitionIdToOffsets = std::make_shared>>(); }; NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message) { @@ -143,7 +156,27 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse() TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics topic; TString topicName = requestTopic.Name.value(); topic.Name = topicName; - auto partitionsToOffsets = TopicToOffsets_[topicName]; + if (UnknownTopics.contains(topicName)) { + for (auto requestPartition: requestTopic.PartitionIndexes) { + TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; + partition.PartitionIndex = requestPartition; + partition.ErrorCode = UNKNOWN_TOPIC_OR_PARTITION; + topic.Partitions.push_back(partition); + } + group.Topics.push_back(topic); + continue; + } + if (ErroredTopics.contains(topicName)) { + for (auto requestPartition: requestTopic.PartitionIndexes) { + TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; + partition.PartitionIndex = requestPartition; + partition.ErrorCode = UNKNOWN_SERVER_ERROR; + topic.Partitions.push_back(partition); + } + group.Topics.push_back(topic); + continue; + } + auto partitionsToOffsets = TopicToOffsets[topicName]; for (auto requestPartition: requestTopic.PartitionIndexes) { TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; partition.PartitionIndex = requestPartition; @@ -169,12 +202,12 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse() partition.PartitionIndex = sourcePartition.PartitionIndex; partition.ErrorCode = sourcePartition.ErrorCode; } + response->Topics.push_back(topic); } } return response; } - void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { // If API level <= 7, Groups would be empty. 
In this case we convert message to level 8 and process it uniformely later if (Message->Groups.empty()) { @@ -196,7 +229,7 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { } } - for (const auto& topicToEntities : TopicToEntities_) { + for (const auto& topicToEntities : TopicToEntities) { NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest locationRequest{}; locationRequest.Topic = topicToEntities.first; locationRequest.Token = Context->UserToken->GetSerializedToken(); @@ -207,7 +240,7 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { SelfId(), topicToEntities.second.Partitions )); - InflyTopics_++; + InflyTopics++; } Become(&TKafkaOffsetFetchActor::StateWork); } @@ -215,27 +248,26 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { void TKafkaOffsetFetchActor::ExtractPartitions(const TString& group, const NKafka::TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics& topic) { TString topicName = topic.Name.value(); - if (!TopicToEntities_.contains(topicName)) { + if (!TopicToEntities.contains(topicName)) { TopicEntities newEntities; - TopicToEntities_[topicName] = newEntities; + TopicToEntities[topicName] = newEntities; } - TopicEntities& entities = TopicToEntities_[topicName]; + TopicEntities& entities = TopicToEntities[topicName]; entities.Consumers->insert(group); for (auto partition: topic.PartitionIndexes) { entities.Partitions->insert(partition); } }; -void TKafkaOffsetFetchActor::StateWork(TAutoPtr& ev) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKafka::TEvCommitedOffsetsResponse, Handle); - } -} - void TKafkaOffsetFetchActor::Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx) { - InflyTopics_--; - TopicToOffsets_[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets; - if (InflyTopics_ == 0) { + InflyTopics--; + TopicToOffsets[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets; + if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::ERROR) { + ErroredTopics.insert(ev->Get()->TopicName); + } else if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::UNKNOWN_TOPIC) { + UnknownTopics.insert(ev->Get()->TopicName); + } + if (InflyTopics == 0) { auto response = GetOffsetFetchResponse(); Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast(response->ErrorCode))); Die(ctx); diff --git a/contrib/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h b/contrib/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h index e5be327028f..79f1608cfb5 100644 --- a/contrib/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h +++ b/contrib/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h @@ -19,18 +19,27 @@ class TKafkaOffsetFetchActor: public NActors::TActorBootstrapped& ev); + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKafka::TEvCommitedOffsetsResponse, Handle); + } + } + void Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx); void ExtractPartitions(const TString& group, const NKafka::TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics& topic); TOffsetFetchResponseData::TPtr GetOffsetFetchResponse(); + void ReplyError(const TActorContext& ctx); private: const TContext::TPtr Context; const ui64 CorrelationId; const TMessagePtr Message; - std::unordered_map TopicToEntities_; - std::unordered_map>>> TopicToOffsets_; - ui32 InflyTopics_ = 0; + std::unordered_map TopicToEntities; + std::unordered_map>>> 
TopicToOffsets; + std::set UnknownTopics; + std::set ErroredTopics; + ui32 InflyTopics = 0; }; diff --git a/contrib/ydb/core/kafka_proxy/kafka_events.h b/contrib/ydb/core/kafka_proxy/kafka_events.h index c697a356a68..decab7ff036 100644 --- a/contrib/ydb/core/kafka_proxy/kafka_events.h +++ b/contrib/ydb/core/kafka_proxy/kafka_events.h @@ -210,10 +210,17 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase { + enum EStatus { + OK, + ERROR, + UNKNOWN_TOPIC, + }; + TEvCommitedOffsetsResponse() {} TString TopicName; + EStatus Status; std::shared_ptr>> PartitionIdToOffsets; }; diff --git a/contrib/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/contrib/ydb/core/kafka_proxy/ut/ut_protocol.cpp index db303996c76..44b07eb81ad 100644 --- a/contrib/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/contrib/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -1208,6 +1208,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { ui64 minActivePartitions = 10; TString consumerName = "consumer-0"; + TString consumer1Name = "consumer-1"; TString key = "record-key"; TString value = "record-value"; @@ -1221,6 +1222,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { .CreateTopic(topicName, NYdb::NTopic::TCreateTopicSettings() .BeginAddConsumer(consumerName).EndAddConsumer() + .BeginAddConsumer(consumer1Name).EndAddConsumer() .PartitioningSettings(minActivePartitions, 100)) .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); @@ -1270,6 +1272,36 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { static_cast(EKafkaErrors::NONE_ERROR)); } + { + // Commit offset for consumer-0 + auto settings = NTopic::TReadSessionSettings() + .AppendTopics(NTopic::TTopicReadSettings(topicName)) + .ConsumerName("consumer-0"); + auto topicReader = pqClient.CreateReadSession(settings); + + auto m = Read(topicReader); + UNIT_ASSERT_EQUAL(m.size(), 1); + + UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); + auto& m0 = m[0].GetMessages()[0]; + m0.Commit(); + } + + { + // Commit offset for consumer-1 + auto settings = NTopic::TReadSessionSettings() + .AppendTopics(NTopic::TTopicReadSettings(topicName)) + .ConsumerName("consumer-1"); + auto topicReader = pqClient.CreateReadSession(settings); + + auto m = Read(topicReader); + UNIT_ASSERT_EQUAL(m.size(), 1); + + UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); + auto& m0 = m[0].GetMessages()[0]; + m0.Commit(); + } + { // Check commited offset after produce std::map> topicsToPartions; @@ -1281,10 +1313,64 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4); auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; }); UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end()); - // This check faled one time under asan, commented until I figure out the exact reason. 
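// Annotation (inference from this patch, not stated by the author): the
// CommittedOffset == 1 assertion is re-enabled below presumably because the two
// read sessions added earlier in the test now commit the offset explicitly for
// consumer-0 and consumer-1, which should make the previously flaky check
// deterministic.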
diff --git a/contrib/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/contrib/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index db303996c76..44b07eb81ad 100644
--- a/contrib/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/contrib/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1208,6 +1208,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
         ui64 minActivePartitions = 10;
 
         TString consumerName = "consumer-0";
+        TString consumer1Name = "consumer-1";
 
         TString key = "record-key";
         TString value = "record-value";
@@ -1221,6 +1222,7 @@
             .CreateTopic(topicName,
                          NYdb::NTopic::TCreateTopicSettings()
                             .BeginAddConsumer(consumerName).EndAddConsumer()
+                            .BeginAddConsumer(consumer1Name).EndAddConsumer()
                             .PartitioningSettings(minActivePartitions, 100))
             .ExtractValueSync();
         UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
@@ -1270,6 +1272,36 @@
                 static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
         }
 
+        {
+            // Commit offset for consumer-0
+            auto settings = NTopic::TReadSessionSettings()
+                                .AppendTopics(NTopic::TTopicReadSettings(topicName))
+                                .ConsumerName("consumer-0");
+            auto topicReader = pqClient.CreateReadSession(settings);
+
+            auto m = Read(topicReader);
+            UNIT_ASSERT_EQUAL(m.size(), 1);
+
+            UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+            auto& m0 = m[0].GetMessages()[0];
+            m0.Commit();
+        }
+
+        {
+            // Commit offset for consumer-1
+            auto settings = NTopic::TReadSessionSettings()
+                                .AppendTopics(NTopic::TTopicReadSettings(topicName))
+                                .ConsumerName("consumer-1");
+            auto topicReader = pqClient.CreateReadSession(settings);
+
+            auto m = Read(topicReader);
+            UNIT_ASSERT_EQUAL(m.size(), 1);
+
+            UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+            auto& m0 = m[0].GetMessages()[0];
+            m0.Commit();
+        }
+
         {
             // Check commited offset after produce
             std::map<TString, std::vector<i32>> topicsToPartions;
@@ -1281,10 +1313,64 @@
             UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4);
             auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; });
             UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end());
-            // This check faled one time under asan, commented until I figure out the exact reason.
-            // UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1);
+            UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1);
         }
 
+        {
+            // Check with nonexistent topic
+            std::map<TString, std::vector<i32>> topicsToPartions;
+            topicsToPartions["nonexTopic"] = std::vector{0, 1};
+            auto msg = client.OffsetFetch(consumerName, topicsToPartions);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2);
+            for (const auto& partition : msg->Groups[0].Topics[0].Partitions) {
+                UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }
+
+        {
+            // Check with nonexistent consumer
+            std::map<TString, std::vector<i32>> topicsToPartions;
+            topicsToPartions[topicName] = std::vector{0, 1};
+            auto msg = client.OffsetFetch("nonexConsumer", topicsToPartions);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2);
+            for (const auto& partition : msg->Groups[0].Topics[0].Partitions) {
+                UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, RESOURCE_NOT_FOUND);
+            }
+        }
+
+        {
+            // Check with 2 consumers
+            TOffsetFetchRequestData request;
+
+            TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic;
+            topic.Name = topicName;
+            auto partitionIndexes = std::vector{0};
+            topic.PartitionIndexes = partitionIndexes;
+
+            TOffsetFetchRequestData::TOffsetFetchRequestGroup group0;
+            group0.GroupId = consumerName;
+            group0.Topics.push_back(topic);
+            request.Groups.push_back(group0);
+
+            TOffsetFetchRequestData::TOffsetFetchRequestGroup group1;
+            group1.GroupId = consumer1Name;
+            group1.Topics.push_back(topic);
+            request.Groups.push_back(group1);
+
+            auto msg = client.OffsetFetch(request);
+
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 2);
+            for (const auto& group: msg->Groups) {
+                UNIT_ASSERT_VALUES_EQUAL(group.Topics.size(), 1);
+                UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions.size(), 1);
+                UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].CommittedOffset, 1);
+                UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].ErrorCode, NONE_ERROR);
+            }
+        }
     } // Y_UNIT_TEST(OffsetFetchScenario)
 
     Y_UNIT_TEST(LoginWithApiKey) {
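The next hunk makes the DQ progress writer stateful so that it forwards a stage only when the stage actually changes, instead of re-sending identical progress events on every poll. The idea in isolation, with std::function standing in for IDqGateway::TDqProgressWriter:

#include <functional>
#include <iostream>
#include <memory>
#include <string>

using TProgressWriter = std::function<void(const std::string&)>;

// Sketch of the deduplication idea from the hunk below: capture the last
// reported stage in a shared_ptr so the stateful lambda stays copyable,
// and forward a stage only when it differs from the previous one.
TProgressWriter MakeDedupWriter(TProgressWriter inner) {
    return [inner = std::move(inner),
            current = std::make_shared<std::string>()](const std::string& stage) {
        if (*current != stage) {
            inner(stage);
            *current = stage;
        }
    };
}

int main() {
    auto writer = MakeDedupWriter([](const std::string& s) { std::cout << s << "\n"; });
    writer("Executing");
    writer("Executing");  // suppressed: same stage as before
    writer("Finishing");  // printed: stage changed
}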
diff --git a/contrib/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/contrib/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 631b0cc30fd..83e2de15908 100644
--- a/contrib/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/contrib/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -1429,14 +1429,17 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
     }
 
     IDqGateway::TDqProgressWriter MakeDqProgressWriter(const TPublicIds::TPtr& publicIds) const {
-        IDqGateway::TDqProgressWriter dqProgressWriter = [progressWriter = State->ProgressWriter, publicIds](const TString& stage) {
-            for (const auto& publicId : publicIds->AllPublicIds) {
-                auto p = TOperationProgress(TString(DqProviderName), publicId.first, TOperationProgress::EState::InProgress, stage);
-                if (publicId.second) {
-                    p.Counters.ConstructInPlace();
-                    p.Counters->Running = p.Counters->Total = publicId.second;
+        IDqGateway::TDqProgressWriter dqProgressWriter = [progressWriter = State->ProgressWriter, publicIds, current = std::make_shared<TString>()](const TString& stage) {
+            if (*current != stage) {
+                for (const auto& publicId : publicIds->AllPublicIds) {
+                    auto p = TOperationProgress(TString(DqProviderName), publicId.first, TOperationProgress::EState::InProgress, stage);
+                    if (publicId.second) {
+                        p.Counters.ConstructInPlace();
+                        p.Counters->Running = p.Counters->Total = publicId.second;
+                    }
+                    progressWriter(p);
                 }
-                progressWriter(p);
+                *current = stage;
             }
         };
         return dqProgressWriter;
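The gateway rewrite below introduces a TDqTaskScheduler whose Delay() completes a promise when a scheduled task fires, turning timer expiry into a composable future. A simplified model using std::promise and a per-call timer thread; the real scheduler multiplexes a single thread over a deadline queue, so a detached thread per call is only for illustration:

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

// Sketch of the Delay() idea: a timer completes a promise at its deadline,
// so callers chain continuations on the returned future instead of blocking.
std::future<void> Delay(std::chrono::milliseconds duration) {
    std::promise<void> promise;
    auto future = promise.get_future();
    std::thread([p = std::move(promise), duration]() mutable {
        std::this_thread::sleep_for(duration);  // wait until the deadline
        p.set_value();                          // wake every waiter on the future
    }).detach();
    return future;
}

int main() {
    auto f = Delay(std::chrono::milliseconds(100));
    f.wait();  // in the gateway this is a Subscribe/Apply continuation instead
    std::cout << "delayed work runs here\n";
}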
diff --git a/contrib/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/contrib/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
index 9d96952822a..0fb7d6dc89b 100644
--- a/contrib/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
+++ b/contrib/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
@@ -10,10 +10,13 @@
 #include 
 #include 
+
 #include 
 #include 
-#include 
+#include 
+#include 
+#include 
 #include 
@@ -21,36 +24,59 @@ namespace NYql {
 
 using namespace NThreading;
 
-class TDqGatewayImpl: public std::enable_shared_from_this<TDqGatewayImpl>
-{
+class TDqTaskScheduler : public TTaskScheduler {
+private:
+    struct TDelay: public TTaskScheduler::ITask {
+        TDelay(TPromise<void> p)
+            : Promise(std::move(p))
+        { }
+
+        TInstant Process() override {
+            Promise.SetValue();
+            return TInstant::Max();
+        }
+
+        TPromise<void> Promise;
+    };
+
+public:
+    TDqTaskScheduler()
+        : TTaskScheduler(1) // threads
+    {}
+
+    TFuture<void> Delay(TDuration duration) {
+        TPromise<void> promise = NewPromise();
+
+        auto future = promise.GetFuture();
+
+        if (!Add(MakeIntrusive<TDelay>(promise), TInstant::Now() + duration)) {
+            promise.SetException("cannot delay");
+        }
+
+        return future;
+    }
+};
+
+class TDqGatewaySession: public std::enable_shared_from_this<TDqGatewaySession> {
 public:
     using TResult = IDqGateway::TResult;
     using TDqProgressWriter = IDqGateway::TDqProgressWriter;
 
-    TDqGatewayImpl(const TString& host, int port, const TString& vanillaJobPath, const TString& vanillaJobMd5, TDuration timeout, TDuration requestTimeout)
-        : GrpcConf(TStringBuilder() << host << ":" << port, requestTimeout)
-        , GrpcClient(1)
-        , Service(GrpcClient.CreateGRpcServiceConnection<Yql::DqsProto::DqService>(GrpcConf))
-        , VanillaJobPath(vanillaJobPath)
-        , VanillaJobMd5(vanillaJobMd5)
-        , TaskScheduler(1)
-        , OpenSessionTimeout(timeout)
+    TDqGatewaySession(const TString& sessionId, TDqTaskScheduler& taskScheduler, NYdbGrpc::TServiceConnection<Yql::DqsProto::DqService>& service, TFuture<void>&& openSessionFuture)
+        : SessionId(sessionId)
+        , TaskScheduler(taskScheduler)
+        , Service(service)
+        , OpenSessionFuture(std::move(openSessionFuture))
     {
-        TaskScheduler.Start();
     }
 
-    TString GetVanillaJobPath() {
-        return VanillaJobPath;
-    }
-
-    TString GetVanillaJobMd5() {
-        return VanillaJobMd5;
+    const TString& GetSessionId() const {
+        return SessionId;
     }
 
     template <typename RespType>
-    void OnResponse(TPromise<TResult> promise, TString sessionId, NYdbGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap<TString, TString>& modulesMapping, bool alwaysFallback = false)
-    {
-        YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+    void OnResponse(TPromise<TResult> promise, NYdbGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap<TString, TString>& modulesMapping, bool alwaysFallback = false) {
+        YQL_LOG_CTX_ROOT_SESSION_SCOPE(SessionId);
         YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::callback";
 
         TResult result;
@@ -89,7 +115,7 @@
             auto& message = *queue.front();
             queue.pop_front();
             message.Setmessage(NBacktrace::Symbolize(message.Getmessage(), modulesMapping));
-            for (auto &subMsg : *message.Mutableissues()) {
+            for (auto& subMsg : *message.Mutableissues()) {
                 queue.push_back(&subMsg);
             }
         }
@@ -153,56 +179,34 @@
         promise.SetValue(result);
     }
 
-    TFuture<void> Delay(TDuration duration) {
-        TPromise<void> promise = NewPromise();
-
-        auto future = promise.GetFuture();
-
-        if (!TaskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant::Now() + duration)) {
-            promise.SetException("cannot delay");
-        }
-
-        return future;
-    }
-
     template <typename TResponse, typename TRequest, typename TStub>
     TFuture<TResult> WithRetry(
-        const TString& sessionId,
         const TRequest& queryPB,
         TStub stub,
         int retry,
        const TDqSettings::TPtr& settings,
-        const THashMap<TString, TString>& modulesMapping
+        const THashMap<TString, TString>& modulesMapping,
+        const TDqProgressWriter& progressWriter
    ) {
        auto backoff = TDuration::MilliSeconds(settings->RetryBackoffMs.Get().GetOrElse(1000));
        auto promise = NewPromise<TResult>();
        const auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default);
        const auto alwaysFallback = EFallbackPolicy::Always == fallbackPolicy;
        auto self = weak_from_this();
-        auto callback = [self, promise, sessionId, alwaysFallback, modulesMapping](NYdbGrpc::TGrpcStatus&& status, TResponse&& resp) mutable {
+        auto callback = [self, promise, sessionId = SessionId, alwaysFallback, modulesMapping](NYdbGrpc::TGrpcStatus&& status, TResponse&& resp) mutable {
            auto this_ = self.lock();
            if (!this_) {
-                YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
-                promise.SetException("Gateway was closed");
+                YQL_CLOG(DEBUG, ProviderDq) << "Session was closed: " << sessionId;
+                promise.SetException("Session was closed");
                return;
            }
-            this_->OnResponse(std::move(promise), std::move(sessionId), std::move(status), std::move(resp), modulesMapping, alwaysFallback);
+            this_->OnResponse(std::move(promise), std::move(status), std::move(resp), modulesMapping, alwaysFallback);
        };
 
-        Service->DoRequest(queryPB, callback, stub);
+        Service.DoRequest(queryPB, callback, stub);
 
-        {
-            TGuard<TMutex> lock(ProgressMutex);
-            auto i = RunningQueries.find(sessionId);
-            if (i != RunningQueries.end()) {
-                if (i->second.ProgressWriter) {
-                    ScheduleQueryStatusRequest(sessionId);
-                }
-            } else {
-                return MakeFuture(TResult());
-            }
-        }
+        ScheduleQueryStatusRequest(progressWriter);
 
        return promise.GetFuture().Apply([=](const TFuture<TResult>& result) {
            if (result.HasException()) {
@@ -215,31 +219,31 @@
                return result;
            }
 
-            return this_->Delay(backoff)
-                .Apply([=](const TFuture<void>& result) {
+            return this_->TaskScheduler.Delay(backoff)
+                .Apply([=, sessionId = this_->GetSessionId()](const TFuture<void>& result) {
                    auto this_ = self.lock();
                    try {
                        result.TryRethrow();
                        if (!this_) {
-                            YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
-                            throw std::runtime_error("Gateway was closed");
+                            YQL_CLOG(DEBUG, ProviderDq) << "Session was closed: " << sessionId;
+                            throw std::runtime_error("Session was closed");
                        }
                    } catch (...) {
                        return MakeErrorFuture<TResult>(std::current_exception());
                    }
-                    return this_->WithRetry<TResponse>(sessionId, queryPB, stub, retry - 1, settings, modulesMapping);
+                    return this_->WithRetry<TResponse>(queryPB, stub, retry - 1, settings, modulesMapping, progressWriter);
                });
        });
    }
 
    TFuture<TResult>
-    ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
+    ExecutePlan(NDqs::TPlan&& plan, const TVector<TString>& columns,
        const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
        const TDqSettings::TPtr& settings,
        const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, bool discard)
    {
-        YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+        YQL_LOG_CTX_ROOT_SESSION_SCOPE(SessionId);
 
        Yql::DqsProto::ExecuteGraphRequest queryPB;
        for (const auto& task : plan.Tasks) {
@@ -253,7 +257,7 @@
                YQL_ENSURE(!file.GetObjectId().empty());
            }
        }
-        queryPB.SetSession(sessionId);
+        queryPB.SetSession(SessionId);
        queryPB.SetResultType(plan.ResultType);
        queryPB.SetSourceId(plan.SourceID.NodeId()-1);
        for (const auto& column : columns) {
@@ -279,179 +283,197 @@
        int retry = settings->MaxRetries.Get().GetOrElse(5);
 
-        TFuture<void> sessionFuture;
-        {
-            TGuard<TMutex> lock(ProgressMutex);
-            auto it = RunningQueries.find(sessionId);
-            if (it == RunningQueries.end()) {
-                YQL_CLOG(DEBUG, ProviderDq) << "Session was closed: " << sessionId;
-                return MakeErrorFuture<TResult>(std::make_exception_ptr(std::runtime_error("Session was closed")));
-            }
-            it->second.ProgressWriter = progressWriter;
-            sessionFuture = it->second.OpenSessionFuture;
-        }
-
        YQL_CLOG(DEBUG, ProviderDq) << "Send query of size " << queryPB.ByteSizeLong();
 
        auto self = weak_from_this();
-        return sessionFuture.Apply([self, sessionId, queryPB, retry, settings, modulesMapping](const TFuture<void>& ) {
+        return OpenSessionFuture.Apply([self, sessionId = SessionId, queryPB, retry, settings, modulesMapping, progressWriter](const TFuture<void>& f) {
+            f.TryRethrow();
            auto this_ = self.lock();
            if (!this_) {
-                YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
-                return MakeErrorFuture<TResult>(std::make_exception_ptr(std::runtime_error("Gateway was closed")));
+                YQL_CLOG(DEBUG, ProviderDq) << "Session was closed: " << sessionId;
+                return MakeErrorFuture<TResult>(std::make_exception_ptr(std::runtime_error("Session was closed")));
            }
            return this_->WithRetry<Yql::DqsProto::ExecuteGraphResponse>(
-                sessionId,
                queryPB,
                &Yql::DqsProto::DqService::Stub::AsyncExecuteGraph,
                retry,
                settings,
-                modulesMapping);
+                modulesMapping,
+                progressWriter);
        });
    }
 
-    TFuture<void> OpenSession(const TString& sessionId, const TString& username) {
-        YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
-        YQL_CLOG(INFO, ProviderDq) << "OpenSession";
-        Yql::DqsProto::OpenSessionRequest request;
-        request.SetSession(sessionId);
-        request.SetUsername(username);
-
-        {
-            TGuard<TMutex> lock(ProgressMutex);
-            if (RunningQueries.find(sessionId) != RunningQueries.end()) {
-                return MakeFuture();
-            }
-        }
-
-        NYdbGrpc::TCallMeta meta;
-        meta.Timeout = OpenSessionTimeout;
+    TFuture<void> Close() {
+        Yql::DqsProto::CloseSessionRequest request;
+        request.SetSession(SessionId);
 
        auto promise = NewPromise();
-        auto self = weak_from_this();
-        auto callback = [self, promise, sessionId](NYdbGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable {
+        auto callback = [promise, sessionId = SessionId](NYdbGrpc::TGrpcStatus&& status, Yql::DqsProto::CloseSessionResponse&& resp) mutable {
            Y_UNUSED(resp);
            YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
-            auto this_ = self.lock();
-            if (!this_) {
-                YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
-                promise.SetException("Gateway was closed");
-                return;
-            }
            if (status.Ok()) {
-                YQL_CLOG(INFO, ProviderDq) << "OpenSession OK";
-                this_->SchedulePingSessionRequest(sessionId);
+                YQL_CLOG(DEBUG, ProviderDq) << "Async close session OK";
                promise.SetValue();
            } else {
-                YQL_CLOG(ERROR, ProviderDq) << "OpenSession error: " << status.Msg;
-                promise.SetException(status.Msg);
+                YQL_CLOG(ERROR, ProviderDq) << "Async close session error: " << status.GRpcStatusCode << ", message: " << status.Msg;
+                promise.SetException(TStringBuilder() << "Async close session error: " << status.GRpcStatusCode << ", message: " << status.Msg);
            }
        };
-        Service->DoRequest(
-            request, callback, &Yql::DqsProto::DqService::Stub::AsyncOpenSession, meta);
-
-        {
-            TGuard<TMutex> lock(ProgressMutex);
-            RunningQueries.emplace(sessionId, TSession {
-                std::optional<TDqProgressWriter>{},
-                "",
-                promise.GetFuture()
-            });
-        }
-
-        return MakeFuture();
-    }
-
-    TFuture<void> CloseSession(const TString& sessionId) {
-        Yql::DqsProto::CloseSessionRequest request;
-        request.SetSession(sessionId);
-
-        auto callback = [](NYdbGrpc::TGrpcStatus&& status, Yql::DqsProto::CloseSessionResponse&& resp) {
-            Y_UNUSED(resp);
-            Y_UNUSED(status);
-        };
-
-        {
-            TGuard<TMutex> lock(ProgressMutex);
-            RunningQueries.erase(sessionId);
-        }
-
-        Service->DoRequest(
+        Service.DoRequest(
            request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession);
-
-        return MakeFuture();
+        return promise.GetFuture();
    }
 
-    void OnRequestQueryStatus(const TString& sessionId, const TString& status, bool ok) {
-        TGuard<TMutex> lock(ProgressMutex);
-        TString stage;
-        TDqProgressWriter* dqProgressWriter = nullptr;
-        auto it = RunningQueries.find(sessionId);
-        if (it != RunningQueries.end() && ok) {
-            dqProgressWriter = it->second.ProgressWriter ? &*it->second.ProgressWriter:nullptr;
-            auto lastStatus = it->second.Status;
-            if (dqProgressWriter && lastStatus != status) {
-                stage = status;
-                it->second.Status = stage;
+    void OnRequestQueryStatus(const TDqProgressWriter& progressWriter, const TString& status, bool ok) {
+        if (ok) {
+            ScheduleQueryStatusRequest(progressWriter);
+            if (!status.empty()) {
+                progressWriter(status);
            }
-
-            ScheduleQueryStatusRequest(sessionId);
-        } else if (it != RunningQueries.end()) {
-            it->second.ProgressWriter = {};
-        }
-
-        if (!stage.empty() && dqProgressWriter) {
-            (*dqProgressWriter)(stage);
        }
    }
 
-    void RequestQueryStatus(const TString& sessionId) {
+    void RequestQueryStatus(const TDqProgressWriter& progressWriter) {
        Yql::DqsProto::QueryStatusRequest request;
-        request.SetSession(sessionId);
+        request.SetSession(SessionId);
 
        auto self = weak_from_this();
-        auto callback = [self, sessionId](NYdbGrpc::TGrpcStatus&& status, Yql::DqsProto::QueryStatusResponse&& resp) {
+        auto callback = [self, progressWriter](NYdbGrpc::TGrpcStatus&& status, Yql::DqsProto::QueryStatusResponse&& resp) {
            auto this_ = self.lock();
            if (!this_) {
                return;
            }
-            this_->OnRequestQueryStatus(sessionId, resp.GetStatus(), status.Ok());
+            this_->OnRequestQueryStatus(progressWriter, resp.GetStatus(), status.Ok());
        };
 
-        Service->DoRequest(
+        Service.DoRequest(
            request, callback, &Yql::DqsProto::DqService::Stub::AsyncQueryStatus, {}, nullptr);
    }
 
-    void StartQueryStatusRequest(const TString& sessionId, bool ok) {
-        TGuard<TMutex> lock(ProgressMutex);
-        auto it = RunningQueries.find(sessionId);
-        if (it != RunningQueries.end() && ok) {
-            RequestQueryStatus(sessionId);
-        } else if (it != RunningQueries.end()) {
-            it->second.ProgressWriter = {};
+    void ScheduleQueryStatusRequest(const TDqProgressWriter& progressWriter) {
+        auto self = weak_from_this();
+        TaskScheduler.Delay(TDuration::MilliSeconds(1000)).Subscribe([self, progressWriter](const TFuture<void>& f) {
+            auto this_ = self.lock();
+            if (!this_) {
+                return;
+            }
+
+            if (!f.HasException()) {
+                this_->RequestQueryStatus(progressWriter);
+            }
+        });
+    }
+
+private:
+    const TString SessionId;
+    TDqTaskScheduler& TaskScheduler;
+    NYdbGrpc::TServiceConnection<Yql::DqsProto::DqService>& Service;
+
+    TMutex ProgressMutex;
+
+    std::optional<TDqProgressWriter> ProgressWriter;
+    TString Status;
+    TFuture<void> OpenSessionFuture;
+};
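TDqGatewaySession above captures weak_from_this() in every gRPC callback, so a completion that outlives the session locks an empty weak_ptr and returns instead of touching a destroyed object. The same guard in miniature, using only the standard library:

#include <functional>
#include <iostream>
#include <memory>

// Minimal model of the weak_from_this() guard used by the session class:
// callbacks hold a weak_ptr, so they become no-ops once the owner dies.
class TSession : public std::enable_shared_from_this<TSession> {
public:
    std::function<void()> MakeCallback() {
        return [weak = weak_from_this()]() {
            auto self = weak.lock();  // try to revive the owner
            if (!self) {
                std::cout << "session already gone, dropping callback\n";
                return;
            }
            self->OnEvent();
        };
    }
    void OnEvent() { std::cout << "event handled\n"; }
};

int main() {
    std::function<void()> cb;
    {
        auto session = std::make_shared<TSession>();
        cb = session->MakeCallback();
        cb();  // owner alive: handled
    }          // session destroyed here
    cb();      // owner gone: callback is a safe no-op
}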
+
+class TDqGatewayImpl: public std::enable_shared_from_this<TDqGatewayImpl> {
+    using TResult = IDqGateway::TResult;
+    using TDqProgressWriter = IDqGateway::TDqProgressWriter;
+
+public:
+    TDqGatewayImpl(const TString& host, int port, TDuration timeout = TDuration::Minutes(60), TDuration requestTimeout = TDuration::Max())
+        : GrpcConf(TStringBuilder() << host << ":" << port, requestTimeout)
+        , GrpcClient(1)
+        , Service(GrpcClient.CreateGRpcServiceConnection<Yql::DqsProto::DqService>(GrpcConf))
+        , TaskScheduler()
+        , OpenSessionTimeout(timeout)
+    {
+        TaskScheduler.Start();
+    }
+
+    ~TDqGatewayImpl() {
+        Stop();
+    }
+
+    void Stop() {
+        decltype(Sessions) sessions;
+        with_lock (Mutex) {
+            sessions = std::move(Sessions);
+        }
+        for (auto& pair: sessions) {
+            try {
+                pair.second->Close().GetValueSync();
+            } catch (...) {
+                YQL_LOG_CTX_ROOT_SESSION_SCOPE(pair.first);
+                YQL_CLOG(ERROR, ProviderDq) << "Error closing session " << pair.first << ": " << CurrentExceptionMessage();
+            }
+        }
+        sessions.clear(); // Destroy session objects explicitly before stopping grpc
+        TaskScheduler.Stop();
+        try {
+            GrpcClient.Stop();
+        } catch (...) {
+            YQL_CLOG(ERROR, ProviderDq) << "Error while stopping GRPC client: " << CurrentExceptionMessage();
+        }
+    }
+
+    void DropSession(const TString& sessionId) {
+        with_lock (Mutex) {
+            Sessions.erase(sessionId);
+        }
+    }
 
-    void ScheduleQueryStatusRequest(const TString& sessionId) {
+    TFuture<void> OpenSession(const TString& sessionId, const TString& username) {
+        YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+        YQL_CLOG(INFO, ProviderDq) << "OpenSession";
+
+        auto promise = NewPromise();
+        std::shared_ptr<TDqGatewaySession> session = std::make_shared<TDqGatewaySession>(sessionId, TaskScheduler, *Service, promise.GetFuture());
+        with_lock (Mutex) {
+            if (!Sessions.emplace(sessionId, session).second) {
+                return MakeErrorFuture<void>(std::make_exception_ptr(yexception() << "Duplicate session id: " << sessionId));
+            }
+        }
+
+        Yql::DqsProto::OpenSessionRequest request;
+        request.SetSession(sessionId);
+        request.SetUsername(username);
+
+        NYdbGrpc::TCallMeta meta;
+        meta.Timeout = OpenSessionTimeout;
+
        auto self = weak_from_this();
-        Delay(TDuration::MilliSeconds(1000)).Subscribe([self, sessionId](TFuture<void> fut) {
+        auto callback = [self, promise, sessionId](NYdbGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable {
+            Y_UNUSED(resp);
+            YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
            auto this_ = self.lock();
            if (!this_) {
+                YQL_CLOG(ERROR, ProviderDq) << "Session was closed: " << sessionId;
+                promise.SetException("Session was closed");
                return;
            }
+            if (status.Ok()) {
+                YQL_CLOG(INFO, ProviderDq) << "OpenSession OK";
+                this_->SchedulePingSessionRequest(sessionId);
+                promise.SetValue();
+            } else {
+                YQL_CLOG(ERROR, ProviderDq) << "OpenSession error: " << status.Msg;
+                this_->DropSession(sessionId);
+                promise.SetException(status.Msg);
+            }
+        };
 
-            this_->StartQueryStatusRequest(sessionId, !fut.HasException());
-        });
+        Service->DoRequest(
+            request, callback, &Yql::DqsProto::DqService::Stub::AsyncOpenSession, meta);
+
+        return MakeFuture();
    }
 
    void SchedulePingSessionRequest(const TString& sessionId) {
        auto self = weak_from_this();
-        auto callback = [self, sessionId](
-            NYdbGrpc::TGrpcStatus&& status,
-            Yql::DqsProto::PingSessionResponse&&) mutable
-        {
+        auto callback = [self, sessionId] (NYdbGrpc::TGrpcStatus&& status, Yql::DqsProto::PingSessionResponse&&) mutable {
            auto this_ = self.lock();
            if (!this_) {
                return;
@@ -459,11 +481,12 @@
 
            if (status.GRpcStatusCode == grpc::INVALID_ARGUMENT) {
                YQL_CLOG(INFO, ProviderDq) << "Session closed " << sessionId;
+                this_->DropSession(sessionId);
            } else {
                this_->SchedulePingSessionRequest(sessionId);
            }
        };
-        Delay(TDuration::Seconds(10)).Subscribe([self, callback, sessionId](const TFuture<void>&) {
+        TaskScheduler.Delay(TDuration::Seconds(10)).Subscribe([self, callback, sessionId](const TFuture<void>&) {
            auto this_ = self.lock();
            if (!this_) {
                return;
@@ -479,21 +502,37 @@
        });
    }
 
-    struct TDelay: public TTaskScheduler::ITask {
-        TDelay(TPromise<void> p)
-            : Promise(std::move(p))
-        { }
-
-        TInstant Process() override {
-            Promise.SetValue();
-            return TInstant::Max();
+    TFuture<void> CloseSessionAsync(const TString& sessionId) {
+        std::shared_ptr<TDqGatewaySession> session;
+        with_lock (Mutex) {
+            auto it = Sessions.find(sessionId);
+            if (it != Sessions.end()) {
+                session = it->second;
+                Sessions.erase(it);
+            }
        }
+        if (session) {
+            return session->Close();
+        }
+        return MakeFuture();
+    }
 
-        TPromise<void> Promise;
-    };
-
-    void Stop() {
-        GrpcClient.Stop();
+    TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
+        const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
+        const TDqSettings::TPtr& settings,
+        const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
+        bool discard)
+    {
+        std::shared_ptr<TDqGatewaySession> session;
+        with_lock(Mutex) {
+            auto it = Sessions.find(sessionId);
+            if (it == Sessions.end()) {
+                YQL_CLOG(ERROR, ProviderDq) << "Session was closed: " << sessionId;
+                return MakeErrorFuture<TResult>(std::make_exception_ptr(std::runtime_error("Session was closed")));
+            }
+            session = it->second;
+        }
+        return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard);
    }
 
 private:
@@ -501,66 +540,58 @@
    NYdbGrpc::TGRpcClientLow GrpcClient;
    std::unique_ptr<NYdbGrpc::TServiceConnection<Yql::DqsProto::DqService>> Service;
 
-    TMutex ProgressMutex;
-    TMutex Mutex;
-
-    struct TSession {
-        std::optional<TDqProgressWriter> ProgressWriter;
-        TString Status;
-        TFuture<void> OpenSessionFuture;
-    };
-    THashMap<TString, TSession> RunningQueries;
-    TString VanillaJobPath;
-    TString VanillaJobMd5;
-
-    TTaskScheduler TaskScheduler;
+    TDqTaskScheduler TaskScheduler;
    const TDuration OpenSessionTimeout;
+
+    TMutex Mutex;
+    THashMap<TString, std::shared_ptr<TDqGatewaySession>> Sessions;
 };
 
 class TDqGateway: public IDqGateway {
 public:
    TDqGateway(const TString& host, int port, const TString& vanillaJobPath, const TString& vanillaJobMd5, TDuration timeout = TDuration::Minutes(60), TDuration requestTimeout = TDuration::Max())
-        : Impl(std::make_shared<TDqGatewayImpl>(host, port, vanillaJobPath, vanillaJobMd5, timeout, requestTimeout))
-    { }
-
-    ~TDqGateway()
+        : Impl(std::make_shared<TDqGatewayImpl>(host, port, timeout, requestTimeout))
+        , VanillaJobPath(vanillaJobPath)
+        , VanillaJobMd5(vanillaJobMd5)
    {
-        Stop();
+    }
+
+    ~TDqGateway() {
    }
 
    void Stop() override {
        Impl->Stop();
    }
 
-    TFuture<void> OpenSession(const TString& sessionId, const TString& username) override
-    {
+    TFuture<void> OpenSession(const TString& sessionId, const TString& username) override {
        return Impl->OpenSession(sessionId, username);
    }
 
-    TFuture<void> CloseSessionAsync(const TString& sessionId) override
-    {
-        return Impl->CloseSession(sessionId);
+    TFuture<void> CloseSessionAsync(const TString& sessionId) override {
        return Impl->CloseSessionAsync(sessionId);
    }
 
    TFuture<TResult> ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
-        const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
-        const TDqSettings::TPtr& settings,
-        const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
-        bool discard) override
+            const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
+            const TDqSettings::TPtr& settings,
+            const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
+            bool discard) override
    {
        return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard);
    }
 
    TString GetVanillaJobPath() override {
-        return Impl->GetVanillaJobPath();
+        return VanillaJobPath;
    }
 
    TString GetVanillaJobMd5() override {
-        return Impl->GetVanillaJobMd5();
+        return VanillaJobMd5;
    }
 
 private:
    std::shared_ptr<TDqGatewayImpl> Impl;
+    TString VanillaJobPath;
+    TString VanillaJobMd5;
 };
 
 TIntrusivePtr<IDqGateway> CreateDqGateway(const TString& host, int port) {