Skip to content

Commit

Permalink
Refactor MetastoreError
Browse files Browse the repository at this point in the history
Additionally, move `MetastoreError` and `MetastoreResult` to
`quickwit-proto`.
  • Loading branch information
guilload committed Aug 17, 2023
1 parent 5b205cb commit 3f34bed
Show file tree
Hide file tree
Showing 59 changed files with 887 additions and 786 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_config::{SourceInputFormat, CLI_INGEST_SOURCE_ID};
use quickwit_metastore::{MetastoreError, MetastoreResolver, SplitState};
use quickwit_metastore::{MetastoreResolver, SplitState};
use quickwit_proto::metastore::{EntityKind, MetastoreError};
use serde_json::{json, Number, Value};
use tokio::time::{sleep, Duration};

Expand Down Expand Up @@ -98,7 +99,7 @@ async fn test_cmd_create() {

// Creating an existing index should fail.
let error = create_logs_index(&test_env).await.unwrap_err();
assert!(error.to_string().contains("already exists"),);
assert!(error.to_string().contains("already exist(s)"),);
}

#[tokio::test]
Expand Down Expand Up @@ -187,9 +188,9 @@ async fn test_cmd_ingest_on_non_existing_index() {

assert_eq!(
error.root_cause().downcast_ref::<MetastoreError>().unwrap(),
&MetastoreError::IndexesDoNotExist {
index_ids: vec!["index-does-not-exist".to_string()]
}
&MetastoreError::NotFound(EntityKind::Index {
index_id: "index-does-not-exist".to_string()
})
);
}

Expand Down
12 changes: 6 additions & 6 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use std::time::Duration;

use futures::Future;
use quickwit_common::{PrettySample, Progress};
use quickwit_metastore::{
ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
};
use quickwit_metastore::{ListSplitsQuery, Metastore, SplitInfo, SplitMetadata, SplitState};
use quickwit_proto::metastore::MetastoreError;
use quickwit_proto::IndexUid;
use quickwit_storage::{BulkDeleteError, Storage};
use thiserror::Error;
Expand Down Expand Up @@ -330,6 +329,7 @@ mod tests {
use quickwit_metastore::{
metastore_for_test, ListSplitsQuery, MockMetastore, SplitMetadata, SplitState,
};
use quickwit_proto::metastore::EntityKind;
use quickwit_proto::IndexUid;
use quickwit_storage::{
storage_for_test, BulkDeleteError, DeleteFailure, MockStorage, PutPayload,
Expand Down Expand Up @@ -652,9 +652,9 @@ mod tests {

let mut mock_metastore = MockMetastore::new();
mock_metastore.expect_delete_splits().return_once(|_, _| {
Err(MetastoreError::IndexesDoNotExist {
index_ids: vec![index_id.to_string()],
})
Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_id.to_string(),
}))
});
let metastore = Arc::new(mock_metastore);

Expand Down
35 changes: 18 additions & 17 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
IndexMetadata, ListSplitsQuery, Metastore, SplitInfo, SplitMetadata, SplitState,
};
use quickwit_proto::metastore::{EntityKind, MetastoreError};
use quickwit_proto::{IndexUid, ServiceError, ServiceErrorCode};
use quickwit_storage::{StorageResolver, StorageResolverError};
use thiserror::Error;
Expand All @@ -40,11 +41,11 @@ use crate::garbage_collection::{
#[derive(Error, Debug)]
pub enum IndexServiceError {
#[error("Failed to resolve the storage `{0}`.")]
StorageError(#[from] StorageResolverError),
Storage(#[from] StorageResolverError),
#[error("Metastore error `{0}`.")]
MetastoreError(#[from] MetastoreError),
Metastore(#[from] MetastoreError),
#[error("Split deletion error `{0}`.")]
SplitDeletionError(#[from] DeleteSplitsError),
SplitDeletion(#[from] DeleteSplitsError),
#[error("Invalid config: {0:#}.")]
InvalidConfig(anyhow::Error),
#[error("Invalid identifier: {0}.")]
Expand All @@ -58,13 +59,13 @@ pub enum IndexServiceError {
impl ServiceError for IndexServiceError {
fn status_code(&self) -> ServiceErrorCode {
match self {
Self::StorageError(_) => ServiceErrorCode::Internal,
Self::MetastoreError(error) => error.status_code(),
Self::SplitDeletionError(_) => ServiceErrorCode::Internal,
Self::Internal(_) => ServiceErrorCode::Internal,
Self::InvalidConfig(_) => ServiceErrorCode::BadRequest,
Self::InvalidIdentifier(_) => ServiceErrorCode::BadRequest,
Self::Metastore(error) => error.status_code(),
Self::OperationNotAllowed(_) => ServiceErrorCode::MethodNotAllowed,
Self::Internal(_) => ServiceErrorCode::Internal,
Self::SplitDeletion(_) => ServiceErrorCode::Internal,
Self::Storage(_) => ServiceErrorCode::Internal,
}
}
}
Expand Down Expand Up @@ -102,10 +103,10 @@ impl IndexService {
if overwrite {
match self.delete_index(&index_config.index_id, false).await {
Ok(_)
| Err(IndexServiceError::MetastoreError(MetastoreError::IndexesDoNotExist {
index_ids: _,
})) => {
// Ignore IndexesDoNotExist error.
| Err(IndexServiceError::Metastore(MetastoreError::NotFound(
EntityKind::Index { .. },
))) => {
// Ignore index not found error.
}
Err(error) => {
return Err(error);
Expand Down Expand Up @@ -320,9 +321,9 @@ impl IndexService {
.sources
.get(source_id)
.ok_or_else(|| {
IndexServiceError::MetastoreError(MetastoreError::SourceDoesNotExist {
IndexServiceError::Metastore(MetastoreError::NotFound(EntityKind::Source {
source_id: source_id.to_string(),
})
}))
})?
.clone();

Expand Down Expand Up @@ -380,11 +381,11 @@ mod tests {
.create_index(index_config.clone(), false)
.await
.unwrap_err();
let IndexServiceError::MetastoreError(inner_error) = error else {
let IndexServiceError::Metastore(inner_error) = error else {
panic!("Expected `MetastoreError` variant, got {:?}", error)
};
assert!(
matches!(inner_error, MetastoreError::IndexAlreadyExists { index_id } if index_id == index_metadata_0.index_id())
matches!(inner_error, MetastoreError::AlreadyExists(EntityKind::Index { index_id }) if index_id == index_metadata_0.index_id())
);

let index_metadata_1 = index_service
Expand Down Expand Up @@ -442,7 +443,7 @@ mod tests {
.await
.unwrap_err();
assert!(
matches!(error, MetastoreError::IndexesDoNotExist { index_ids } if index_ids == vec![index_uid.index_id().to_string()])
matches!(error, MetastoreError::NotFound(EntityKind::Index { index_id }) if index_id == index_uid.index_id())
);
assert!(!storage.exists(split_path).await.unwrap());
}
Expand Down
10 changes: 6 additions & 4 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
use quickwit_config::{IndexingSettings, SourceConfig};
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{Metastore, MetastoreError};
use quickwit_metastore::Metastore;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::MetastoreError;
use quickwit_storage::Storage;
use tokio::join;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -504,7 +505,7 @@ impl Handler<Spawn> for IndexingPipeline {
}
self.previous_generations_statistics.num_spawn_attempts = 1 + spawn.retry_count;
if let Err(spawn_error) = self.spawn_pipeline(ctx).await {
if let Some(MetastoreError::IndexesDoNotExist { .. }) =
if let Some(MetastoreError::NotFound { .. }) =
spawn_error.downcast_ref::<MetastoreError>()
{
info!(error = ?spawn_error, "Could not spawn pipeline, index might have been deleted.");
Expand Down Expand Up @@ -550,7 +551,8 @@ mod tests {
use quickwit_actors::{Command, Universe};
use quickwit_config::{IndexingSettings, SourceInputFormat, SourceParams, VoidSourceParams};
use quickwit_doc_mapper::{default_doc_mapper_for_test, DefaultDocMapper};
use quickwit_metastore::{IndexMetadata, MetastoreError, MockMetastore};
use quickwit_metastore::{IndexMetadata, MockMetastore};
use quickwit_proto::metastore::MetastoreError;
use quickwit_proto::IndexUid;
use quickwit_storage::RamStorage;

Expand Down Expand Up @@ -583,7 +585,7 @@ mod tests {
return Ok(index_metadata);
}
num_fails -= 1;
Err(MetastoreError::ConnectionError {
Err(MetastoreError::Connection {
message: "MetastoreError Alarm".to_string(),
})
});
Expand Down
16 changes: 7 additions & 9 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use quickwit_common::io::IoControls;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitState};
use quickwit_metastore::{ListSplitsQuery, Metastore, SplitState};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::MetastoreError;
use time::OffsetDateTime;
use tokio::join;
use tracing::{debug, error, info, instrument};
Expand Down Expand Up @@ -428,7 +429,7 @@ impl Handler<Spawn> for MergePipeline {
}
self.previous_generations_statistics.num_spawn_attempts = 1 + spawn.retry_count;
if let Err(spawn_error) = self.spawn_pipeline(ctx).await {
if let Some(MetastoreError::IndexesDoNotExist { .. }) =
if let Some(MetastoreError::NotFound { .. }) =
spawn_error.downcast_ref::<MetastoreError>()
{
info!(error = ?spawn_error, "Could not spawn pipeline, index might have been deleted.");
Expand Down Expand Up @@ -491,17 +492,14 @@ mod tests {
.expect_list_splits()
.times(1)
.returning(move |list_split_query| {
assert_eq!(list_split_query.index_uids, vec![index_uid.clone()]);
assert_eq!(list_split_query.index_uids, &[index_uid.clone()]);
assert_eq!(
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
);
match list_split_query.mature {
Bound::Excluded(_) => {}
_ => {
panic!("Expected excluded bound.");
}
}
let Bound::Excluded(_) = list_split_query.mature else {
panic!("Expected excluded bound.");
};
Ok(Vec::new())
});
let universe = Universe::with_accelerated_time();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ mod tests {
.returning(move |_: LeafSearchRequest| {
if leaf_search_num_failures > 0 {
leaf_search_num_failures -= 1;
return Err(SearchError::InternalError("leaf search error".to_string()));
return Err(SearchError::Internal("leaf search error".to_string()));
}
Ok(LeafSearchResponse {
num_hits: 1,
Expand Down
6 changes: 2 additions & 4 deletions quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ use quickwit_common::uri::Uri;
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_indexing::actors::MergeSplitDownloader;
use quickwit_indexing::merge_policy::MergeOperation;
use quickwit_metastore::{
split_tag_filter, split_time_range_filter, Metastore, MetastoreResult, Split,
};
use quickwit_proto::metastore::DeleteTask;
use quickwit_metastore::{split_tag_filter, split_time_range_filter, Metastore, Split};
use quickwit_proto::metastore::{DeleteTask, MetastoreResult};
use quickwit_proto::search::SearchRequest;
use quickwit_proto::IndexUid;
use quickwit_search::{jobs_to_leaf_requests, IndexMetasForLeafSearch, SearchJob, SearchJobPlacer};
Expand Down
11 changes: 5 additions & 6 deletions quickwit/quickwit-janitor/src/actors/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ mod tests {
use quickwit_actors::Universe;
use quickwit_common::shared_consts::DELETION_GRACE_PERIOD;
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, MetastoreError, MockMetastore, Split, SplitMetadata,
SplitState,
IndexMetadata, ListSplitsQuery, MockMetastore, Split, SplitMetadata, SplitState,
};
use quickwit_proto::metastore::MetastoreError;
use quickwit_storage::MockStorage;
use time::OffsetDateTime;

Expand Down Expand Up @@ -479,7 +479,7 @@ mod tests {
.expect_list_indexes_metadatas()
.times(4)
.returning(move |_list_indexes_query: ListIndexesQuery| {
Err(MetastoreError::DbError {
Err(MetastoreError::Db {
message: "Fail to list indexes.".to_string(),
})
});
Expand Down Expand Up @@ -552,11 +552,10 @@ mod tests {
assert!(["test-index-1", "test-index-2"].contains(&query.index_uids[0].index_id()));

if query.index_uids[0].index_id() == "test-index-2" {
return Err(MetastoreError::DbError {
return Err(MetastoreError::Db {
message: "fail to delete".to_string(),
});
}

let splits = match query.split_states[0] {
SplitState::Staged => make_splits(&["a"], SplitState::Staged),
SplitState::MarkedForDeletion => {
Expand Down Expand Up @@ -649,7 +648,7 @@ mod tests {
// instead this should simply get logged and return the list of splits
// which have successfully been deleted.
if index_uid.index_id() == "test-index-2" {
Err(MetastoreError::DbError {
Err(MetastoreError::Db {
message: "fail to delete".to_string(),
})
} else {
Expand Down
14 changes: 7 additions & 7 deletions quickwit/quickwit-janitor/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use quickwit_metastore::MetastoreError;
use quickwit_proto::metastore::MetastoreError;
use quickwit_proto::{ServiceError, ServiceErrorCode};
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand All @@ -28,18 +28,18 @@ use thiserror::Error;
pub enum JanitorError {
#[error("Invalid delete query: `{0}`.")]
InvalidDeleteQuery(String),
#[error("Internal error: {0}")]
InternalError(String),
#[error("Metastore error `{0}`.")]
MetastoreError(#[from] MetastoreError),
#[error("Internal error: `{0}`")]
Internal(String),
#[error("Metastore error: `{0}`.")]
Metastore(#[from] MetastoreError),
}

impl ServiceError for JanitorError {
fn status_code(&self) -> ServiceErrorCode {
match self {
JanitorError::InvalidDeleteQuery(_) => ServiceErrorCode::BadRequest,
JanitorError::InternalError(_) => ServiceErrorCode::Internal,
JanitorError::MetastoreError(error) => error.status_code(),
JanitorError::Internal(_) => ServiceErrorCode::Internal,
JanitorError::Metastore(error) => error.status_code(),
}
}
}
13 changes: 7 additions & 6 deletions quickwit/quickwit-metastore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ documentation = "https://quickwit.io/docs/"
anyhow = { workspace = true }
async-trait = { workspace = true }
byte-unit = { workspace = true }
sqlx = { workspace = true, optional = true }
futures = { workspace = true }
http = { workspace = true }
itertools = { workspace = true }
Expand All @@ -24,15 +23,16 @@ regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
sqlx = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
utoipa = { workspace = true }
ulid = { workspace = true, features = ["serde"] }
utoipa = { workspace = true }

quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
Expand All @@ -47,15 +47,16 @@ futures = { workspace = true }
md5 = { workspace = true }
mockall = { workspace = true }
rand = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }
tracing-subscriber = { workspace = true }

quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
quickwit-doc-mapper = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true, features = ["testsuite"] }

[features]
testsuite = ["mockall", "tempfile", "quickwit-config/testsuite"]
ci-test = []
postgres = ["sqlx"]
azure = ["quickwit-storage/azure"]
ci-test = []
postgres = ["quickwit-proto/postgres", "sqlx"]
testsuite = ["mockall", "tempfile", "quickwit-config/testsuite"]
Loading

0 comments on commit 3f34bed

Please sign in to comment.