From 1b5b02df62a664c54bf49b06bfdd0043e70b3f75 Mon Sep 17 00:00:00 2001 From: Jerry Date: Fri, 29 Sep 2023 21:41:57 +0800 Subject: [PATCH] add delete_indexes to index_management add delete indexes by index id patterns --- .../quickwit-index-management/src/index.rs | 43 ++++++++++++++++++- .../src/index_api/rest_handler.rs | 35 ++------------- 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 7d06d3982ce..d9ac677e112 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -21,11 +21,14 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; +use futures::future::try_join_all; +use itertools::Itertools; use quickwit_common::fs::{empty_dir, get_cache_directory_path}; use quickwit_config::{validate_identifier, IndexConfig, SourceConfig}; use quickwit_indexing::check_source_connectivity; use quickwit_metastore::{ - IndexMetadata, ListSplitsQuery, Metastore, SplitInfo, SplitMetadata, SplitState, + IndexMetadata, ListIndexesQuery, ListSplitsQuery, Metastore, SplitInfo, SplitMetadata, + SplitState, }; use quickwit_proto::metastore::{EntityKind, MetastoreError}; use quickwit_proto::{IndexUid, ServiceError, ServiceErrorCode}; @@ -193,6 +196,44 @@ impl IndexService { Ok(deleted_splits) } + /// Deletes the indexes specified with `index_id_patterns`. + /// This is a wrapper of delete_index, and support index delete with index pattern + /// + /// * `index_id_patterns` - The target index Id patterns. + /// * `dry_run` - Should this only return a list of affected files without performing deletion. + pub async fn delete_indexes( + &self, + index_id_patterns: Vec, + dry_run: bool, + ) -> Result, IndexServiceError> { + // get all index_ids by index_id_patterns + let indexes_metadata = self + .metastore() + .list_indexes_metadatas(ListIndexesQuery::IndexIdPatterns(index_id_patterns.clone())) + .await?; + // if indexes_metadata.is_empty() { + // return Err(IndexServiceError::Internal(format!( + // "can not find index using: {:?}", + // index_id_patterns + // ))); + // } + let index_ids = indexes_metadata + .iter() + .map(|index_metadata| index_metadata.index_id()) + .collect_vec(); + info!(index_ids = ?index_ids); + + // setup delete index tasks + let mut delete_index_tasks = Vec::new(); + for index_id in index_ids { + delete_index_tasks.push(async move { + info!("delete_index:{}", index_id); + self.delete_index(index_id, dry_run).await + }) + } + let delete_index_responses: Vec> = try_join_all(delete_index_tasks).await?; + Ok(delete_index_responses.concat()) + } /// Detect all dangling splits and associated files from the index and removes them. /// /// * `index_id` - The target index Id. diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index a8d45262c54..9f4ced84292 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -20,9 +20,7 @@ use std::sync::Arc; use bytes::Bytes; -use futures::future::try_join_all; use hyper::header::CONTENT_TYPE; -use itertools::Itertools; use quickwit_common::uri::Uri; use quickwit_config::{ load_source_config_from_user_config, ConfigFormat, NodeConfig, SourceConfig, SourceParams, @@ -511,36 +509,9 @@ async fn delete_index( index_service: IndexService, ) -> Result, IndexServiceError> { info!(index_id_patterns = ?index_id_patterns, dry_run = delete_index_query_param.dry_run, "delete_index"); - // get all index_ids by index_id_patterns - let indexes_metadata = index_service - .metastore() - .list_indexes_metadatas(ListIndexesQuery::IndexIdPatterns(index_id_patterns.clone())) - .await?; - if indexes_metadata.is_empty() { - return Err(IndexServiceError::Internal(format!( - "can not find index using: {:?}", - index_id_patterns - ))); - } - let index_ids = indexes_metadata - .iter() - .map(|index_metadata| index_metadata.index_id()) - .collect_vec(); - info!(index_ids = ?index_ids); - let index_service_arc = Arc::new(index_service); - - let mut delete_index_tasks = Vec::new(); - for index_id in index_ids { - let index_service_clone = index_service_arc.clone(); - delete_index_tasks.push(async move { - info!("delete_index:{}", index_id); - index_service_clone - .delete_index(index_id, delete_index_query_param.dry_run) - .await - }) - } - let delete_index_responses: Vec> = try_join_all(delete_index_tasks).await?; - Ok(delete_index_responses.concat()) + index_service + .delete_indexes(index_id_patterns, delete_index_query_param.dry_run) + .await } fn create_source_handler(