From e6f602e691f08b336567ea04d683e52194f9674a Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Tue, 8 Oct 2024 12:45:24 +0300 Subject: [PATCH] YQ-3560 RowDispatcher: local mode to use in dqrun (#10072) --- .../libs/config/protos/row_dispatcher.proto | 2 +- ydb/core/fq/libs/init/init.cpp | 1 - .../libs/row_dispatcher/leader_election.cpp | 4 ++++ .../fq/libs/row_dispatcher/row_dispatcher.cpp | 6 ----- .../fq/libs/row_dispatcher/row_dispatcher.h | 1 - .../row_dispatcher/row_dispatcher_service.cpp | 2 -- .../row_dispatcher/row_dispatcher_service.h | 1 - .../row_dispatcher/ut/leader_election_ut.cpp | 23 +++++++++++++++---- .../row_dispatcher/ut/row_dispatcher_ut.cpp | 2 -- .../dummy/yql_pq_file_topic_client.cpp | 6 ++++- 10 files changed, 28 insertions(+), 20 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index 10ca10285ea0..26e4ecbfc7b4 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -11,6 +11,7 @@ import "ydb/core/fq/libs/config/protos/storage.proto"; message TRowDispatcherCoordinatorConfig { TYdbStorageConfig Database = 1; string CoordinationNodePath = 2; + bool LocalMode = 3; // Use only local row_dispatcher. } message TRowDispatcherConfig { bool Enabled = 1; @@ -19,5 +20,4 @@ message TRowDispatcherConfig { uint64 MaxSessionUsedMemory = 4; bool WithoutConsumer = 5; TRowDispatcherCoordinatorConfig Coordinator = 6; - } diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 21258d33960e..53de65df1242 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -192,7 +192,6 @@ void Init( if (protoConfig.GetRowDispatcher().GetEnabled()) { auto rowDispatcher = NFq::NewRowDispatcherService( protoConfig.GetRowDispatcher(), - protoConfig.GetCommon(), NKikimr::CreateYdbCredentialsProviderFactory, yqSharedResources, credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp index 6817cfc292c0..5f945ddc9f38 100644 --- a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp +++ b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp @@ -222,6 +222,10 @@ void TLeaderElection::Bootstrap() { Become(&TLeaderElection::StateFunc); LogPrefix = "TLeaderElection " + SelfId().ToString() + " "; LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString()); + if (Config.GetLocalMode()) { + TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId)); + return; + } ProcessState(); } diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 3d327385cf0c..f1d8b93d9aeb 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -108,7 +108,6 @@ class TRowDispatcher : public TActorBootstrapped { NConfig::TRowDispatcherConfig Config; - NConfig::TCommonConfig CommonConfig; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; TYqSharedResources::TPtr YqSharedResources; TMaybe CoordinatorActorId; @@ -171,7 +170,6 @@ class TRowDispatcher : public TActorBootstrapped { public: explicit TRowDispatcher( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -234,7 +232,6 @@ class TRowDispatcher : public TActorBootstrapped { TRowDispatcher::TRowDispatcher( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -242,7 +239,6 @@ TRowDispatcher::TRowDispatcher( const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory, const ::NMonitoring::TDynamicCounterPtr& counters) : Config(config) - , CommonConfig(commonConfig) , CredentialsProviderFactory(credentialsProviderFactory) , YqSharedResources(yqSharedResources) , CredentialsFactory(credentialsFactory) @@ -586,7 +582,6 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintState::TPtr&) { std::unique_ptr NewRowDispatcher( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -596,7 +591,6 @@ std::unique_ptr NewRowDispatcher( { return std::unique_ptr(new TRowDispatcher( config, - commonConfig, credentialsProviderFactory, yqSharedResources, credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h index 54c3b1521afd..ff71aab8bd6a 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h @@ -16,7 +16,6 @@ namespace NFq { std::unique_ptr NewRowDispatcher( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp index 1300f419d7de..e314da700134 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp @@ -11,7 +11,6 @@ using namespace NActors; std::unique_ptr NewRowDispatcherService( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -20,7 +19,6 @@ std::unique_ptr NewRowDispatcherService( { return NewRowDispatcher( config, - commonConfig, credentialsProviderFactory, yqSharedResources, credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h index ef8a9f29099d..c3ee492c0665 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h @@ -16,7 +16,6 @@ namespace NFq { std::unique_ptr NewRowDispatcherService( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp index 93ccaa8c151e..bdef4408327e 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp @@ -23,15 +23,18 @@ class TFixture : public NUnitTest::TBaseFixture { Runtime.Initialize(app->Unwrap()); Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG); auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory; - auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive())); + YqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive())); RowDispatcher = Runtime.AllocateEdgeActor(); Coordinator1 = Runtime.AllocateEdgeActor(); Coordinator2 = Runtime.AllocateEdgeActor(); Coordinator3 = Runtime.AllocateEdgeActor(); + } + void Init(bool localMode = false) { NConfig::TRowDispatcherCoordinatorConfig config; config.SetCoordinationNodePath("row_dispatcher"); + config.SetLocalMode(localMode); auto& database = *config.MutableDatabase(); database.SetEndpoint(GetEnv("YDB_ENDPOINT")); database.SetDatabase(GetEnv("YDB_DATABASE")); @@ -42,7 +45,7 @@ class TFixture : public NUnitTest::TBaseFixture { Coordinator1, config, NKikimr::CreateYdbCredentialsProviderFactory, - yqSharedResources, + YqSharedResources, "/tenant", MakeIntrusive() ).release()); @@ -52,7 +55,7 @@ class TFixture : public NUnitTest::TBaseFixture { Coordinator2, config, NKikimr::CreateYdbCredentialsProviderFactory, - yqSharedResources, + YqSharedResources, "/tenant", MakeIntrusive() ).release()); @@ -62,7 +65,7 @@ class TFixture : public NUnitTest::TBaseFixture { Coordinator3, config, NKikimr::CreateYdbCredentialsProviderFactory, - yqSharedResources, + YqSharedResources, "/tenant", MakeIntrusive() ).release()); @@ -95,10 +98,12 @@ class TFixture : public NUnitTest::TBaseFixture { NActors::TActorId Coordinator2; NActors::TActorId Coordinator3; NActors::TActorId LeaderDetector; + TYqSharedResources::TPtr YqSharedResources; }; Y_UNIT_TEST_SUITE(LeaderElectionTests) { Y_UNIT_TEST_F(Test1, TFixture) { + Init(); auto coordinatorId1 = ExpectCoordinatorChanged(); auto coordinatorId2 = ExpectCoordinatorChanged(); @@ -134,7 +139,15 @@ Y_UNIT_TEST_SUITE(LeaderElectionTests) { auto coordinatorId6 = ExpectCoordinatorChanged(); UNIT_ASSERT(coordinatorId6 != coordinatorId4); } -} + Y_UNIT_TEST_F(TestLocalMode, TFixture) { + Init(true); + auto coordinatorId1 = ExpectCoordinatorChanged(); + auto coordinatorId2 = ExpectCoordinatorChanged(); + auto coordinatorId3 = ExpectCoordinatorChanged(); + TSet set {coordinatorId1, coordinatorId2, coordinatorId3}; + UNIT_ASSERT(set.size() == 3); + } } +} diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index f5641e815539..550c35447acc 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -61,7 +61,6 @@ class TFixture : public NUnitTest::TBaseFixture { database.SetDatabase("YDB_DATABASE"); database.SetToken(""); - NConfig::TCommonConfig commonConfig; auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory; auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive())); @@ -74,7 +73,6 @@ class TFixture : public NUnitTest::TBaseFixture { RowDispatcher = Runtime.Register(NewRowDispatcher( config, - commonConfig, NKikimr::CreateYdbCredentialsProviderFactory, yqSharedResources, credentialsFactory, diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp index c882575dd16d..55f284b8f865 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp @@ -78,7 +78,7 @@ class TBlockingEQueue { class TFileTopicReadSession : public NYdb::NTopic::IReadSession { -constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100); +constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); public: TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = ""): @@ -182,10 +182,14 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100); TString rawMsg; TVector msgs; size_t size = 0; + ui64 maxBatchRowSize = 100; while (size_t read = fi.ReadLine(rawMsg)) { msgs.emplace_back(MakeNextMessage(rawMsg)); MsgOffset_++; + if (!maxBatchRowSize--) { + break; + } size += rawMsg.size(); } if (!msgs.empty()) {