Skip to content

Commit

Permalink
add delete_indexes to index_management
Browse files Browse the repository at this point in the history
add delete indexes by index id patterns
  • Loading branch information
JerryKwan committed Sep 29, 2023
1 parent 23df2e5 commit 1b5b02d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 33 deletions.
43 changes: 42 additions & 1 deletion quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use futures::future::try_join_all;
use itertools::Itertools;
use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, Metastore, SplitInfo, SplitMetadata, SplitState,
IndexMetadata, ListIndexesQuery, ListSplitsQuery, Metastore, SplitInfo, SplitMetadata,
SplitState,
};
use quickwit_proto::metastore::{EntityKind, MetastoreError};
use quickwit_proto::{IndexUid, ServiceError, ServiceErrorCode};
Expand Down Expand Up @@ -193,6 +196,44 @@ impl IndexService {
Ok(deleted_splits)
}

/// Deletes the indexes specified with `index_id_patterns`.
/// This is a wrapper of delete_index, and support index delete with index pattern
///
/// * `index_id_patterns` - The target index Id patterns.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
pub async fn delete_indexes(
&self,
index_id_patterns: Vec<String>,
dry_run: bool,
) -> Result<Vec<SplitInfo>, IndexServiceError> {
// get all index_ids by index_id_patterns
let indexes_metadata = self
.metastore()
.list_indexes_metadatas(ListIndexesQuery::IndexIdPatterns(index_id_patterns.clone()))
.await?;
// if indexes_metadata.is_empty() {
// return Err(IndexServiceError::Internal(format!(
// "can not find index using: {:?}",
// index_id_patterns
// )));
// }
let index_ids = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_id())
.collect_vec();
info!(index_ids = ?index_ids);

// setup delete index tasks
let mut delete_index_tasks = Vec::new();
for index_id in index_ids {
delete_index_tasks.push(async move {
info!("delete_index:{}", index_id);
self.delete_index(index_id, dry_run).await
})
}
let delete_index_responses: Vec<Vec<SplitInfo>> = try_join_all(delete_index_tasks).await?;
Ok(delete_index_responses.concat())
}
/// Detect all dangling splits and associated files from the index and removes them.
///
/// * `index_id` - The target index Id.
Expand Down
35 changes: 3 additions & 32 deletions quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
use std::sync::Arc;

use bytes::Bytes;
use futures::future::try_join_all;
use hyper::header::CONTENT_TYPE;
use itertools::Itertools;
use quickwit_common::uri::Uri;
use quickwit_config::{
load_source_config_from_user_config, ConfigFormat, NodeConfig, SourceConfig, SourceParams,
Expand Down Expand Up @@ -511,36 +509,9 @@ async fn delete_index(
index_service: IndexService,
) -> Result<Vec<SplitInfo>, IndexServiceError> {
info!(index_id_patterns = ?index_id_patterns, dry_run = delete_index_query_param.dry_run, "delete_index");
// get all index_ids by index_id_patterns
let indexes_metadata = index_service
.metastore()
.list_indexes_metadatas(ListIndexesQuery::IndexIdPatterns(index_id_patterns.clone()))
.await?;
if indexes_metadata.is_empty() {
return Err(IndexServiceError::Internal(format!(
"can not find index using: {:?}",
index_id_patterns
)));
}
let index_ids = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_id())
.collect_vec();
info!(index_ids = ?index_ids);
let index_service_arc = Arc::new(index_service);

let mut delete_index_tasks = Vec::new();
for index_id in index_ids {
let index_service_clone = index_service_arc.clone();
delete_index_tasks.push(async move {
info!("delete_index:{}", index_id);
index_service_clone
.delete_index(index_id, delete_index_query_param.dry_run)
.await
})
}
let delete_index_responses: Vec<Vec<SplitInfo>> = try_join_all(delete_index_tasks).await?;
Ok(delete_index_responses.concat())
index_service
.delete_indexes(index_id_patterns, delete_index_query_param.dry_run)
.await
}

fn create_source_handler(
Expand Down

0 comments on commit 1b5b02d

Please sign in to comment.