diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 70bd98f15b0c..d48435d3c9d5 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -71,7 +71,7 @@ class TKqpWorkloadService : public TActorBootstrapped { (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem }), IEventHandle::FlagTrackDelivery); - CpuQuotaManager = std::make_unique(ActorContext(), Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager")); + CpuQuotaManager = std::make_unique(Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager")); EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools(); EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless(); @@ -556,7 +556,7 @@ class TKqpWorkloadService : public TActorBootstrapped { return &databaseIt->second; } LOG_I("Creating new database state for id " << databaseId); - return &DatabaseToState.insert({databaseId, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second; + return &DatabaseToState.insert({databaseId, TDatabaseState{.SelfId = SelfId(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second; } TPoolState* GetOrCreatePoolState(const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) { @@ -568,7 +568,7 @@ class TKqpWorkloadService : public TActorBootstrapped { LOG_I("Creating new handler for pool " << poolKey); const auto poolHandler = Register(CreatePoolHandlerActor(databaseId, poolId, poolConfig, EnableResourcePoolsCounters ? Counters.Counters : MakeIntrusive())); - const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second; + const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler}}).first->second; Counters.ActivePools->Inc(); ScheduleIdleCheck(); diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 5fd22dbb032b..a2950bae88e6 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -16,7 +16,7 @@ constexpr TDuration IDLE_DURATION = TDuration::Seconds(60); struct TDatabaseState { - NActors::TActorContext ActorContext; + TActorId SelfId; bool& EnabledResourcePoolsOnServerless; std::vector PendingRequersts = {}; @@ -33,7 +33,7 @@ struct TDatabaseState { const TString& poolId = ev->Get()->PoolId; auto& subscribers = PendingSubscriptions[poolId]; if (subscribers.empty()) { - ActorContext.Register(CreatePoolFetcherActor(ActorContext.SelfID, ev->Get()->DatabaseId, poolId, nullptr)); + TActivationContext::Register(CreatePoolFetcherActor(SelfId, ev->Get()->DatabaseId, poolId, nullptr)); } subscribers.emplace(ev->Sender); @@ -45,7 +45,7 @@ struct TDatabaseState { PendingRequersts.emplace_back(std::move(ev)); if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) { - ActorContext.Register(CreateDatabaseFetcherActor(ActorContext.SelfID, DatabaseIdToDatabase(databaseId))); + TActivationContext::Register(CreateDatabaseFetcherActor(SelfId, DatabaseIdToDatabase(databaseId))); } else if (!DatabaseUnsupported) { StartPendingRequests(); } else { @@ -61,11 +61,11 @@ struct TDatabaseState { } if (ev->Get()->Status == Ydb::StatusIds::SUCCESS && poolHandler) { - ActorContext.Send(poolHandler, new TEvPrivate::TEvUpdatePoolSubscription(ev->Get()->PathId, subscribers)); + TActivationContext::Send(poolHandler, std::make_unique(ev->Get()->PathId, subscribers)); } else { const TString& databaseId = ev->Get()->DatabaseId; for (const auto& subscriber : subscribers) { - ActorContext.Send(subscriber, new TEvUpdatePoolInfo(databaseId, poolId, std::nullopt, std::nullopt)); + TActivationContext::Send(subscriber, std::make_unique(databaseId, poolId, std::nullopt, std::nullopt)); } } subscribers.clear(); @@ -79,7 +79,7 @@ struct TDatabaseState { } if (Serverless != ev->Get()->Serverless) { - ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvKqp::TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->DatabaseId, ev->Get()->Serverless)); + TActivationContext::Send(MakeKqpProxyID(SelfId.NodeId()), std::make_unique(ev->Get()->Database, ev->Get()->DatabaseId, ev->Get()->Serverless)); } LastUpdateTime = TInstant::Now(); @@ -103,17 +103,17 @@ struct TDatabaseState { } for (auto& ev : PendingRequersts) { - ActorContext.Register(CreatePoolResolverActor(std::move(ev), HasDefaultPool)); + TActivationContext::Register(CreatePoolResolverActor(std::move(ev), HasDefaultPool)); } PendingRequersts.clear(); } void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { for (const auto& ev : PendingRequersts) { - RemovePendingSession(ev->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) { - ActorContext.Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, {NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")})); + RemovePendingSession(ev->Get()->SessionId, [actorSystem = TActivationContext::ActorSystem()](TEvCleanupRequest::TPtr event) { + actorSystem->Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, NYql::TIssues{NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")})); }); - ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues)); + TActivationContext::Send(ev->Sender, std::make_unique(status, TString{}, NResourcePool::TPoolSettings{}, issues)); } PendingRequersts.clear(); } @@ -121,7 +121,6 @@ struct TDatabaseState { struct TPoolState { NActors::TActorId PoolHandler; - NActors::TActorContext ActorContext; std::queue PendingRequests = {}; bool WaitingInitialization = false; @@ -137,7 +136,7 @@ struct TPoolState { return; } - ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false)); + TActivationContext::Send(PoolHandler, std::make_unique(false)); PreviousPoolHandlers.insert(PoolHandler); PoolHandler = *NewPoolHandler; NewPoolHandler = std::nullopt; @@ -151,7 +150,7 @@ struct TPoolState { PlaceRequestRunning = true; InFlightRequests++; - ActorContext.Send(PendingRequests.front()->Forward(PoolHandler)); + TActivationContext::Send(PendingRequests.front()->Forward(PoolHandler)); PendingRequests.pop(); } @@ -163,31 +162,29 @@ struct TPoolState { void DoCleanupRequest(TEvCleanupRequest::TPtr event) { for (const auto& poolHandler : PreviousPoolHandlers) { - ActorContext.Send(poolHandler, new TEvCleanupRequest( + TActivationContext::Send(poolHandler, std::make_unique( event->Get()->DatabaseId, event->Get()->SessionId, event->Get()->PoolId, event->Get()->Duration, event->Get()->CpuConsumed )); } - ActorContext.Send(event->Forward(PoolHandler)); + TActivationContext::Send(event->Forward(PoolHandler)); } }; struct TCpuQuotaManagerState { TCpuQuotaManager CpuQuotaManager; - NActors::TActorContext ActorContext; bool CpuLoadRequestRunning = false; TInstant CpuLoadRequestTime = TInstant::Zero(); - TCpuQuotaManagerState(NActors::TActorContext actorContext, NMonitoring::TDynamicCounterPtr subComponent) + TCpuQuotaManagerState(NMonitoring::TDynamicCounterPtr subComponent) : CpuQuotaManager(TDuration::Seconds(1), TDuration::Seconds(10), IDLE_DURATION, 0.1, true, 0, subComponent) - , ActorContext(actorContext) {} void RequestCpuQuota(TActorId poolHandler, double maxClusterLoad, ui64 coockie) { auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad); bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS; - ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie); + TActivationContext::Send(poolHandler, std::make_unique(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie); // Schedule notification if (!quotaAccepted) { @@ -238,7 +235,7 @@ struct TCpuQuotaManagerState { } for (const TActorId& poolHandler : poolHandlers) { - ActorContext.Send(poolHandler, new TEvPrivate::TEvRefreshPoolState()); + TActivationContext::Send(poolHandler, std::make_unique()); HandlersLimits.erase(poolHandler); } PendingHandlers.erase(PendingHandlers.begin());