From 7464ec90ef80761c35a0e385a02669b54c69a923 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 9 Feb 2024 17:06:01 +0100 Subject: [PATCH] add env var to disable delete tasks service (#4559) Co-authored-by: Adrien Guillo --- .../quickwit-janitor/src/janitor_service.rs | 10 +++++-- quickwit/quickwit-janitor/src/lib.rs | 29 ++++++++++++------- quickwit/quickwit-serve/src/lib.rs | 2 ++ 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/quickwit/quickwit-janitor/src/janitor_service.rs b/quickwit/quickwit-janitor/src/janitor_service.rs index 584e19a7903..91e821fad61 100644 --- a/quickwit/quickwit-janitor/src/janitor_service.rs +++ b/quickwit/quickwit-janitor/src/janitor_service.rs @@ -26,14 +26,14 @@ use serde_json::{json, Value as JsonValue}; use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor}; pub struct JanitorService { - delete_task_service_handle: ActorHandle, + delete_task_service_handle: Option>, garbage_collector_handle: ActorHandle, retention_policy_executor_handle: ActorHandle, } impl JanitorService { pub fn new( - delete_task_service_handle: ActorHandle, + delete_task_service_handle: Option>, garbage_collector_handle: ActorHandle, retention_policy_executor_handle: ActorHandle, ) -> Self { @@ -45,7 +45,11 @@ impl JanitorService { } fn is_healthy(&self) -> bool { - self.delete_task_service_handle.state() != ActorState::Failure + self.delete_task_service_handle + .as_ref() + .map_or(true, |delete_task_service_handle| { + delete_task_service_handle.state() != ActorState::Failure + }) && self.garbage_collector_handle.state() != ActorState::Failure && self.retention_policy_executor_handle.state() != ActorState::Failure } diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs index 70783d860db..213d0d5465d 100644 --- a/quickwit/quickwit-janitor/src/lib.rs +++ b/quickwit/quickwit-janitor/src/lib.rs @@ -51,6 +51,7 @@ pub async fn start_janitor_service( search_job_placer: SearchJobPlacer, storage_resolver: StorageResolver, event_broker: EventBroker, + run_delete_task_service: bool, ) -> anyhow::Result> { info!("starting janitor service"); let garbage_collector = GarbageCollector::new(metastore.clone(), storage_resolver.clone()); @@ -59,17 +60,23 @@ pub async fn start_janitor_service( let retention_policy_executor = RetentionPolicyExecutor::new(metastore.clone()); let (_, retention_policy_executor_handle) = universe.spawn_builder().spawn(retention_policy_executor); - let delete_task_service = DeleteTaskService::new( - metastore, - search_job_placer, - storage_resolver, - config.data_dir_path.clone(), - config.indexer_config.max_concurrent_split_uploads, - universe.get_or_spawn_one::(), - event_broker, - ) - .await?; - let (_, delete_task_service_handle) = universe.spawn_builder().spawn(delete_task_service); + let delete_task_service_handle = if run_delete_task_service { + let delete_task_service = DeleteTaskService::new( + metastore, + search_job_placer, + storage_resolver, + config.data_dir_path.clone(), + config.indexer_config.max_concurrent_split_uploads, + universe.get_or_spawn_one::(), + event_broker, + ) + .await?; + let (_, delete_task_service_handle) = universe.spawn_builder().spawn(delete_task_service); + Some(delete_task_service_handle) + } else { + tracing::warn!("delete task service is disabled: delete queries will not be processed"); + None + }; let janitor_service = JanitorService::new( delete_task_service_handle, diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index a2b551fc1d1..f7a5e851459 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -126,6 +126,7 @@ const READINESS_REPORTING_INTERVAL: Duration = if cfg!(any(test, feature = "test const METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY: &str = "QW_METASTORE_CLIENT_MAX_CONCURRENCY"; const DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY: usize = 6; +const DISABLE_DELETE_TASK_SERVICE_ENV_KEY: &str = "QW_DISABLE_DELETE_TASK_SERVICE"; fn get_metastore_client_max_concurrency() -> usize { std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok() @@ -514,6 +515,7 @@ pub async fn serve_quickwit( search_job_placer, storage_resolver.clone(), event_broker.clone(), + std::env::var(DISABLE_DELETE_TASK_SERVICE_ENV_KEY).is_err(), ) .await?; Some(janitor_service)