From 15f5887bb0e526260fa1f44a7ebca29b81b27480 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Wed, 20 Dec 2023 18:28:30 +0100 Subject: [PATCH] Implement Shard API for PostgreSQL metastore --- .../postgresql/14_update-shard-id.down.sql | 7 + .../postgresql/14_update-shard-id.up.sql | 7 + .../src/backward_compatibility_tests/mod.rs | 2 +- quickwit/quickwit-metastore/src/lib.rs | 6 +- .../file_backed_index/mod.rs | 26 +- .../file_backed_index/serialize.rs | 2 +- .../file_backed_index/shards.rs | 19 +- .../file_backed_metastore_factory.rs | 2 +- .../lazy_file_backed_index.rs | 0 .../mod.rs | 29 + .../store_operations.rs | 2 +- .../quickwit-metastore/src/metastore/mod.rs | 8 +- .../src/metastore/postgres/error.rs | 70 ++ .../src/metastore/postgres/factory.rs | 100 +++ .../src/metastore/postgres/migrator.rs | 45 + .../mod.rs} | 835 +++++++++--------- .../model.rs} | 67 +- .../postgres/queries/acquire_shards.sql | 9 + .../postgres/queries/delete_shards.sql | 10 + .../postgres/queries/fetch_shard.sql | 8 + .../postgres/queries/insert_shard.sql | 13 + .../postgres/queries/list_shards.sql | 11 + .../metastore/postgres/queries/open_shard.sql | 12 + .../src/metastore/postgres/split_stream.rs | 43 + .../src/metastore/postgres/utils.rs | 201 +++++ .../src/metastore_resolver.rs | 4 +- .../src/tests/list_splits.rs | 12 +- quickwit/quickwit-metastore/src/tests/mod.rs | 27 +- .../quickwit-metastore/src/tests/shard.rs | 720 ++++++++++++--- quickwit/quickwit-proto/src/types/mod.rs | 6 + 30 files changed, 1729 insertions(+), 574 deletions(-) create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.down.sql create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.up.sql rename quickwit/quickwit-metastore/src/metastore/{file_backed_metastore => file_backed}/file_backed_index/mod.rs (97%) rename quickwit/quickwit-metastore/src/metastore/{file_backed_metastore => file_backed}/file_backed_index/serialize.rs (98%) rename 
quickwit/quickwit-metastore/src/metastore/{file_backed_metastore => file_backed}/file_backed_index/shards.rs (96%) rename quickwit/quickwit-metastore/src/metastore/{file_backed_metastore => file_backed}/file_backed_metastore_factory.rs (98%) rename quickwit/quickwit-metastore/src/metastore/{file_backed_metastore => file_backed}/lazy_file_backed_index.rs (100%) rename quickwit/quickwit-metastore/src/metastore/{file_backed_metastore => file_backed}/mod.rs (98%) rename quickwit/quickwit-metastore/src/metastore/{file_backed_metastore => file_backed}/store_operations.rs (99%) create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/error.rs create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/factory.rs create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs rename quickwit/quickwit-metastore/src/metastore/{postgresql_metastore.rs => postgres/mod.rs} (76%) rename quickwit/quickwit-metastore/src/metastore/{postgresql_model.rs => postgres/model.rs} (81%) create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/acquire_shards.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/delete_shards.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/fetch_shard.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/insert_shard.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/list_shards.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/open_shard.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/utils.rs diff --git a/quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.down.sql new file mode 100644 index 00000000000..045d375fd39 --- /dev/null +++ 
b/quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.down.sql @@ -0,0 +1,7 @@ +ALTER TABLE shards + ALTER COLUMN shard_id TYPE BIGSERIAL, + ALTER COLUMN shard_id DROP NOT NULL, + ALTER COLUMN shard_state DROP DEFAULT, + ALTER COLUMN publish_position_inclusive DROP DEFAULT, + DROP CONSTRAINT shards_index_uid_fkey, + ADD CONSTRAINT shards_index_uid_fkey FOREIGN KEY (index_uid) REFERENCES indexes(index_uid) diff --git a/quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.up.sql new file mode 100644 index 00000000000..e6d1e2a290f --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.up.sql @@ -0,0 +1,7 @@ +ALTER TABLE shards + ALTER COLUMN shard_id TYPE VARCHAR(255), + ALTER COLUMN shard_id SET NOT NULL, + ALTER COLUMN shard_state SET DEFAULT 'open', + ALTER COLUMN publish_position_inclusive SET DEFAULT '', + DROP CONSTRAINT shards_index_uid_fkey, + ADD CONSTRAINT shards_index_uid_fkey FOREIGN KEY (index_uid) REFERENCES indexes(index_uid) ON DELETE CASCADE diff --git a/quickwit/quickwit-metastore/src/backward_compatibility_tests/mod.rs b/quickwit/quickwit-metastore/src/backward_compatibility_tests/mod.rs index 7f78688d428..9618e24d3df 100644 --- a/quickwit/quickwit-metastore/src/backward_compatibility_tests/mod.rs +++ b/quickwit/quickwit-metastore/src/backward_compatibility_tests/mod.rs @@ -25,7 +25,7 @@ use quickwit_config::TestableForRegression; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; -use crate::file_backed_metastore::file_backed_index::FileBackedIndex; +use crate::file_backed::file_backed_index::FileBackedIndex; use crate::{IndexMetadata, SplitMetadata}; /// In order to avoid confusion, we need to make sure that the diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index 17328938b95..7d93e9c1175 100644 --- 
a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -43,12 +43,12 @@ use std::ops::Range; pub use error::MetastoreResolverError; pub use metastore::control_plane_metastore::ControlPlaneMetastore; -pub use metastore::file_backed_metastore::FileBackedMetastore; +pub use metastore::file_backed::FileBackedMetastore; pub(crate) use metastore::index_metadata::serialize::{IndexMetadataV0_7, VersionedIndexMetadata}; #[cfg(feature = "postgres")] -pub use metastore::postgresql_metastore::PostgresqlMetastore; +pub use metastore::postgres::PostgresqlMetastore; pub use metastore::{ - file_backed_metastore, AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, + file_backed, AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs similarity index 97% rename from quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs rename to quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index ab88c1785e3..8154280cbcf 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -727,12 +727,34 @@ mod tests { use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_doc_mapper::{BinaryFormat, FieldMappingType}; - use quickwit_proto::types::IndexUid; + use quickwit_proto::ingest::Shard; + use quickwit_proto::metastore::ListShardsSubrequest; + use quickwit_proto::types::{IndexUid, SourceId}; use super::FileBackedIndex; - use 
crate::file_backed_metastore::file_backed_index::split_query_predicate; + use crate::file_backed::file_backed_index::split_query_predicate; use crate::{ListSplitsQuery, Split, SplitMetadata, SplitState}; + impl FileBackedIndex { + pub(crate) fn insert_shards(&mut self, source_id: &SourceId, shards: Vec) { + self.per_source_shards + .get_mut(source_id) + .unwrap() + .insert_shards(shards) + } + + pub(crate) fn list_all_shards(&self, source_id: &SourceId) -> Vec { + self.per_source_shards + .get(source_id) + .unwrap() + .list_shards(ListShardsSubrequest { + ..Default::default() + }) + .unwrap() + .shards + } + } + fn make_splits() -> [Split; 3] { [ Split { diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/serialize.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/serialize.rs similarity index 98% rename from quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/serialize.rs rename to quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/serialize.rs index 072fa53dd03..f59a99cbbff 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/serialize.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/serialize.rs @@ -27,7 +27,7 @@ use quickwit_proto::types::SourceId; use serde::{Deserialize, Serialize}; use super::shards::Shards; -use crate::file_backed_metastore::file_backed_index::FileBackedIndex; +use crate::file_backed::file_backed_index::FileBackedIndex; use crate::metastore::DeleteTask; use crate::{IndexMetadata, Split}; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs similarity index 96% rename from quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs rename to 
quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index 0160c750a18..5ffead0ee31 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -27,11 +27,11 @@ use quickwit_proto::metastore::{ ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardsSubrequest, OpenShardsSubresponse, }; -use quickwit_proto::types::{queue_id, IndexUid, Position, ShardId, SourceId}; +use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId}; use tracing::{info, warn}; use crate::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta}; -use crate::file_backed_metastore::MutationOccurred; +use crate::file_backed::MutationOccurred; // TODO: Rename `SourceShards` /// Manages the shards of a source. @@ -204,8 +204,8 @@ impl Shards { if let Entry::Occupied(entry) = self.shards.entry(shard_id.clone()) { let shard = entry.get(); if !force && !shard.publish_position_inclusive().is_eof() { - let message = format!("shard `{shard_id}` is not deletable"); - return Err(MetastoreError::InvalidArgument { message }); + warn!("shard `{shard_id}` is not deletable"); + continue; } info!( index_id=%self.index_uid.index_id(), @@ -236,7 +236,7 @@ impl Shards { pub(super) fn try_apply_delta( &mut self, checkpoint_delta: SourceCheckpointDelta, - publish_token: String, + publish_token: PublishToken, ) -> MetastoreResult> { if checkpoint_delta.is_empty() { return Ok(MutationOccurred::No(())); @@ -294,6 +294,15 @@ mod tests { use super::*; + impl Shards { + pub(crate) fn insert_shards(&mut self, shards: Vec) { + for shard in shards { + let shard_id = shard.shard_id().clone(); + self.shards.insert(shard_id, shard); + } + } + } + #[test] fn test_open_shards() { let index_uid: IndexUid = "test-index:0".into(); diff --git 
a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_metastore_factory.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs similarity index 98% rename from quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_metastore_factory.rs rename to quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs index a94830a80a0..7781822513e 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_metastore_factory.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs @@ -154,7 +154,7 @@ impl MetastoreFactory for FileBackedMetastoreFactory { mod tests { use std::time::Duration; - use crate::metastore::file_backed_metastore::file_backed_metastore_factory::extract_polling_interval_from_uri; + use crate::metastore::file_backed::file_backed_metastore_factory::extract_polling_interval_from_uri; #[test] fn test_extract_polling_interval_from_uri() { diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/lazy_file_backed_index.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs similarity index 100% rename from quickwit/quickwit-metastore/src/metastore/file_backed_metastore/lazy_file_backed_index.rs rename to quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs similarity index 98% rename from quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs rename to quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 23e6c8f44d8..6da4d2d6f13 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -980,7 +980,9 @@ mod tests { use futures::executor::block_on; 
use quickwit_common::uri::Protocol; use quickwit_config::IndexConfig; + use quickwit_proto::ingest::Shard; use quickwit_proto::metastore::{DeleteQuery, MetastoreError}; + use quickwit_proto::types::SourceId; use quickwit_query::query_ast::qast_helper; use quickwit_storage::{MockStorage, RamStorage, Storage, StorageErrorKind}; use rand::Rng; @@ -993,9 +995,36 @@ mod tests { }; use super::*; use crate::metastore::MetastoreServiceStreamSplitsExt; + use crate::tests::shard::ReadWriteShardsForTest; use crate::tests::DefaultForTest; use crate::{metastore_test_suite, IndexMetadata, ListSplitsQuery, SplitMetadata, SplitState}; + #[async_trait] + impl ReadWriteShardsForTest for FileBackedMetastore { + async fn insert_shards( + &mut self, + index_uid: &IndexUid, + source_id: &SourceId, + shards: Vec, + ) { + self.mutate(index_uid.clone(), |index| { + index.insert_shards(source_id, shards); + Ok(MutationOccurred::Yes(())) + }) + .await + .unwrap(); + } + + async fn list_all_shards(&self, index_uid: &IndexUid, source_id: &SourceId) -> Vec { + self.read(index_uid.clone(), |index| { + let shards = index.list_all_shards(source_id); + Ok(shards) + }) + .await + .unwrap() + } + } + metastore_test_suite!(crate::FileBackedMetastore); #[tokio::test] diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/store_operations.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/store_operations.rs similarity index 99% rename from quickwit/quickwit-metastore/src/metastore/file_backed_metastore/store_operations.rs rename to quickwit/quickwit-metastore/src/metastore/file_backed/store_operations.rs index c488e5ffca7..8518ee5fd25 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/store_operations.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/store_operations.rs @@ -27,7 +27,7 @@ use quickwit_storage::{Storage, StorageError, StorageErrorKind}; use serde::{Deserialize, Serialize}; use super::{IndexState, LazyFileBackedIndex}; 
-use crate::metastore::file_backed_metastore::file_backed_index::FileBackedIndex; +use crate::metastore::file_backed::file_backed_index::FileBackedIndex; /// Indexes states file managed by [`FileBackedMetastore`](crate::FileBackedMetastore). const INDEXES_STATES_FILENAME: &str = "indexes_states.json"; diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 73afdcc048d..8bba47e10eb 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -17,12 +17,10 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -pub mod file_backed_metastore; +pub mod file_backed; pub(crate) mod index_metadata; #[cfg(feature = "postgres")] -pub mod postgresql_metastore; -#[cfg(feature = "postgres")] -mod postgresql_model; +pub mod postgres; pub mod control_plane_metastore; @@ -49,7 +47,7 @@ use crate::checkpoint::IndexCheckpointDelta; use crate::{Split, SplitMetadata, SplitState}; /// Splits batch size returned by the stream splits API -const STREAM_SPLITS_CHUNK_SIZE: usize = 1_000; +const STREAM_SPLITS_CHUNK_SIZE: usize = 100; static METASTORE_METRICS_LAYER: Lazy> = Lazy::new(|| PrometheusMetricsLayer::new("quickwit_metastore", ["request"])); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/error.rs b/quickwit/quickwit-metastore/src/metastore/postgres/error.rs new file mode 100644 index 00000000000..3ea64753dad --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/error.rs @@ -0,0 +1,70 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. 
+// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use quickwit_proto::metastore::{EntityKind, MetastoreError}; +use sqlx::postgres::PgDatabaseError; +use tracing::error; + +// https://www.postgresql.org/docs/current/errcodes-appendix.html +mod pg_error_codes { + pub const FOREIGN_KEY_VIOLATION: &str = "23503"; + pub const UNIQUE_VIOLATION: &str = "23505"; +} + +pub(super) fn convert_sqlx_err(index_id: &str, sqlx_error: sqlx::Error) -> MetastoreError { + match &sqlx_error { + sqlx::Error::Database(boxed_db_error) => { + let pg_db_error = boxed_db_error.downcast_ref::(); + let pg_error_code = pg_db_error.code(); + let pg_error_table = pg_db_error.table(); + + match (pg_error_code, pg_error_table) { + (pg_error_codes::FOREIGN_KEY_VIOLATION, _) => { + MetastoreError::NotFound(EntityKind::Index { + index_id: index_id.to_string(), + }) + } + (pg_error_codes::UNIQUE_VIOLATION, Some(table)) if table.starts_with("indexes") => { + MetastoreError::AlreadyExists(EntityKind::Index { + index_id: index_id.to_string(), + }) + } + (pg_error_codes::UNIQUE_VIOLATION, _) => { + error!(error=?boxed_db_error, "postgresql-error"); + MetastoreError::Internal { + message: "unique key violation".to_string(), + cause: format!("DB error {boxed_db_error:?}"), + } + } + _ => { + error!(error=?boxed_db_error, "postgresql-error"); + MetastoreError::Db { + message: boxed_db_error.to_string(), + 
} + } + } + } + _ => { + error!(error=?sqlx_error, "an error has occurred in the database operation"); + MetastoreError::Db { + message: sqlx_error.to_string(), + } + } + } +} diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs b/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs new file mode 100644 index 00000000000..32f3fcb5da2 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs @@ -0,0 +1,100 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use quickwit_common::uri::Uri; +use quickwit_config::{MetastoreBackend, MetastoreConfig}; +use quickwit_proto::metastore::MetastoreServiceClient; +use tokio::sync::Mutex; +use tracing::debug; + +use crate::metastore::instrument_metastore; +use crate::{MetastoreFactory, MetastoreResolverError, PostgresqlMetastore}; + +#[derive(Clone, Default)] +pub struct PostgresqlMetastoreFactory { + // Under normal conditions of use, this cache will contain a single `Metastore`. + // + // In contrast to the file-backed metastore, we use a strong pointer here, so that the + // `Metastore` doesn't get dropped. 
This is done in order to keep the underlying connection + // pool to Postgres alive. + cache: Arc>>, +} + +impl PostgresqlMetastoreFactory { + async fn get_from_cache(&self, uri: &Uri) -> Option { + let cache_lock = self.cache.lock().await; + cache_lock.get(uri).map(MetastoreServiceClient::clone) + } + + /// If there is a valid entry in the cache to begin with, we trash the new + /// one and return the old one. + /// + /// This way we make sure that we keep only one instance associated + /// to the key `uri` outside of this struct. + async fn cache_metastore( + &self, + uri: Uri, + metastore: MetastoreServiceClient, + ) -> MetastoreServiceClient { + let mut cache_lock = self.cache.lock().await; + if let Some(metastore) = cache_lock.get(&uri) { + return metastore.clone(); + } + cache_lock.insert(uri, metastore.clone()); + metastore + } +} + +#[async_trait] +impl MetastoreFactory for PostgresqlMetastoreFactory { + fn backend(&self) -> MetastoreBackend { + MetastoreBackend::PostgreSQL + } + + async fn resolve( + &self, + metastore_config: &MetastoreConfig, + uri: &Uri, + ) -> Result { + if let Some(metastore) = self.get_from_cache(uri).await { + debug!("using metastore from cache"); + return Ok(metastore); + } + debug!("metastore not found in cache"); + let postgresql_metastore_config = metastore_config.as_postgres().ok_or_else(|| { + let message = format!( + "expected PostgreSQL metastore config, got `{:?}`", + metastore_config.backend() + ); + MetastoreResolverError::InvalidConfig(message) + })?; + let postgresql_metastore = PostgresqlMetastore::new(postgresql_metastore_config, uri) + .await + .map_err(MetastoreResolverError::Initialization)?; + let instrumented_metastore = instrument_metastore(postgresql_metastore); + let unique_metastore_for_uri = self + .cache_metastore(uri.clone(), instrumented_metastore) + .await; + Ok(unique_metastore_for_uri) + } +} diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs 
b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs new file mode 100644 index 00000000000..cba3dc4c15e --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs @@ -0,0 +1,45 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; +use sqlx::migrate::Migrator; +use sqlx::{Pool, Postgres}; +use tracing::{error, instrument}; + +static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql"); + +/// Initializes the database and runs the SQL migrations stored in the +/// `quickwit-metastore/migrations` directory. 
+#[instrument(skip_all)] +pub(super) async fn run_migrations(pool: &Pool) -> MetastoreResult<()> { + let tx = pool.begin().await?; + let migrate_result = MIGRATOR.run(pool).await; + + let Err(migrate_error) = migrate_result else { + tx.commit().await?; + return Ok(()); + }; + tx.rollback().await?; + error!(error=%migrate_error, "failed to run PostgreSQL migrations"); + + Err(MetastoreError::Internal { + message: "failed to run PostgreSQL migrations".to_string(), + cause: migrate_error.to_string(), + }) +} diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs similarity index 76% rename from quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs rename to quickwit/quickwit-metastore/src/metastore/postgres/mod.rs index 69efe0e12d9..b7b7ff2e4f0 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs @@ -17,114 +17,60 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-use std::collections::HashMap; -use std::fmt::{self, Display, Write}; -use std::ops::Bound; -use std::str::FromStr; -use std::sync::Arc; +mod error; +mod factory; +mod migrator; +mod model; +mod split_stream; +mod utils; + +use std::fmt::{self, Write}; use std::time::Duration; use async_trait::async_trait; -use futures::stream::BoxStream; use futures::StreamExt; -use ouroboros::self_referencing; use quickwit_common::uri::Uri; use quickwit_common::{PrettySample, ServiceStream}; -use quickwit_config::{ - validate_index_id_pattern, MetastoreBackend, MetastoreConfig, PostgresMetastoreConfig, -}; +use quickwit_config::{validate_index_id_pattern, PostgresMetastoreConfig, INGEST_V2_SOURCE_ID}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; +use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ - AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, - CreateIndexResponse, DeleteIndexRequest, DeleteQuery, DeleteShardsRequest, + AcquireShardsRequest, AcquireShardsResponse, AcquireShardsSubresponse, AddSourceRequest, + CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind, IndexMetadataRequest, IndexMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, - ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, - MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, - MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, - UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, -}; -use quickwit_proto::types::IndexUid; -use sea_query::{ - all, 
any, Asterisk, Cond, Expr, Func, Order, PostgresQueryBuilder, Query, SelectStatement, + ListShardsSubresponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, + MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, + MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, OpenShardsSubrequest, + OpenShardsSubresponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, + ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; +use quickwit_proto::types::{IndexUid, Position, PublishToken, SourceId}; +use sea_query::{all, Asterisk, Cond, Expr, PostgresQueryBuilder, Query}; use sea_query_binder::SqlxBinder; -use sqlx::migrate::Migrator; -use sqlx::postgres::{PgConnectOptions, PgDatabaseError, PgPoolOptions}; -use sqlx::{ConnectOptions, Pool, Postgres, Transaction}; -use tokio::sync::Mutex; -use tokio_stream::Stream; -use tracing::log::LevelFilter; -use tracing::{debug, error, info, instrument, warn}; - +use sqlx::{Executor, Pool, Postgres, Transaction}; +use tracing::{debug, info, instrument, warn}; + +use self::error::convert_sqlx_err; +pub use self::factory::PostgresqlMetastoreFactory; +use self::migrator::run_migrations; +use self::model::{PgDeleteTask, PgIndex, PgShard, PgSplit, Splits}; +use self::split_stream::SplitStream; +use self::utils::{append_query_filters, establish_connection}; use super::STREAM_SPLITS_CHUNK_SIZE; -use crate::checkpoint::IndexCheckpointDelta; -use crate::metastore::postgresql_model::{PgDeleteTask, PgIndex, PgSplit, Splits, ToTimestampFunc}; -use crate::metastore::{instrument_metastore, FilterRange, PublishSplitsRequestExt}; +use crate::checkpoint::{ + IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta, +}; +use crate::metastore::postgres::utils::split_maturity_timestamp; +use crate::metastore::PublishSplitsRequestExt; use crate::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, 
IndexMetadataResponseExt, - ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, - MetastoreFactory, MetastoreResolverError, MetastoreServiceExt, Split, SplitMaturity, - SplitMetadata, SplitState, StageSplitsRequestExt, + ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, + MetastoreServiceExt, Split, SplitState, StageSplitsRequestExt, }; -static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql"); - -// https://www.postgresql.org/docs/current/errcodes-appendix.html -mod pg_error_code { - pub const FOREIGN_KEY_VIOLATION: &str = "23503"; - pub const UNIQUE_VIOLATION: &str = "23505"; -} - -/// Establishes a connection to the given database URI. -async fn establish_connection( - connection_uri: &Uri, - min_connections: usize, - max_connections: usize, - acquire_timeout: Duration, - idle_timeout_opt: Option, - max_lifetime_opt: Option, -) -> MetastoreResult> { - let pool_options = PgPoolOptions::new() - .min_connections(min_connections as u32) - .max_connections(max_connections as u32) - .acquire_timeout(acquire_timeout) - .idle_timeout(idle_timeout_opt) - .max_lifetime(max_lifetime_opt); - let pg_connect_options: PgConnectOptions = - PgConnectOptions::from_str(connection_uri.as_str())?.log_statements(LevelFilter::Info); - pool_options - .connect_with(pg_connect_options) - .await - .map_err(|error| { - error!(connection_uri=%connection_uri, error=?error, "failed to establish connection to database"); - MetastoreError::Connection { - message: error.to_string(), - } - }) -} - -/// Initialize the database. -/// The sql used for the initialization is stored in quickwit-metastore/migrations directory. 
-#[instrument(skip_all)] -async fn run_postgres_migrations(pool: &Pool) -> MetastoreResult<()> { - let tx = pool.begin().await?; - let migration_res = MIGRATOR.run(pool).await; - if let Err(migration_err) = migration_res { - tx.rollback().await?; - error!(err=?migration_err, "Database migrations failed"); - return Err(MetastoreError::Internal { - message: "failed to run migration on Postgresql database".to_string(), - cause: migration_err.to_string(), - }); - } - tx.commit().await?; - Ok(()) -} - /// PostgreSQL metastore implementation. #[derive(Clone)] pub struct PostgresqlMetastore { @@ -160,7 +106,8 @@ impl PostgresqlMetastore { None, ) .await?; - run_postgres_migrations(&connection_pool).await?; + run_migrations(&connection_pool).await?; + Ok(PostgresqlMetastore { uri: connection_uri.clone(), connection_pool, @@ -204,7 +151,7 @@ where FOR UPDATE "#, ) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .fetch_optional(executor) .await .map_err(|error| MetastoreError::Db { @@ -227,182 +174,93 @@ async fn index_metadata( .index_metadata() } -/// Extends an existing SQL string with the generated filter range appended to the query. -/// -/// This method is **not** SQL injection proof and should not be used with user-defined values. 
-fn append_range_filters( - sql: &mut SelectStatement, - field_name: Splits, - filter_range: &FilterRange, - value_formatter: impl Fn(&V) -> Expr, -) { - if let Bound::Included(value) = &filter_range.start { - sql.cond_where(Expr::col(field_name).gte((value_formatter)(value))); - }; - - if let Bound::Excluded(value) = &filter_range.start { - sql.cond_where(Expr::col(field_name).gt((value_formatter)(value))); - }; - - if let Bound::Included(value) = &filter_range.end { - sql.cond_where(Expr::col(field_name).lte((value_formatter)(value))); - }; - - if let Bound::Excluded(value) = &filter_range.end { - sql.cond_where(Expr::col(field_name).lt((value_formatter)(value))); - }; -} - -fn append_query_filters(sql: &mut SelectStatement, query: &ListSplitsQuery) { - // Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list. - - let or_condition = query - .index_uids - .iter() - .fold(Cond::any(), |cond, index_uid| { - cond.add(Expr::col(Splits::IndexUid).eq(Expr::val(index_uid.to_string()))) - }); - sql.cond_where(or_condition); - - if !query.split_states.is_empty() { - sql.cond_where( - Expr::col(Splits::SplitState) - .is_in(query.split_states.iter().map(|val| val.to_string())), - ); - }; - - if let Some(tags) = query.tags.as_ref() { - sql.cond_where(tags_filter_expression_helper(tags)); - }; - - match query.time_range.start { - Bound::Included(v) => { - sql.cond_where(any![ - Expr::col(Splits::TimeRangeEnd).gte(v), - Expr::col(Splits::TimeRangeEnd).is_null() - ]); - } - Bound::Excluded(v) => { - sql.cond_where(any![ - Expr::col(Splits::TimeRangeEnd).gt(v), - Expr::col(Splits::TimeRangeEnd).is_null() - ]); - } - Bound::Unbounded => {} - }; - - match query.time_range.end { - Bound::Included(v) => { - sql.cond_where(any![ - Expr::col(Splits::TimeRangeStart).lte(v), - Expr::col(Splits::TimeRangeStart).is_null() - ]); - } - Bound::Excluded(v) => { - sql.cond_where(any![ - Expr::col(Splits::TimeRangeStart).lt(v), - Expr::col(Splits::TimeRangeStart).is_null() - ]); 
- } - Bound::Unbounded => {} - }; - - match &query.mature { - Bound::Included(evaluation_datetime) => { - sql.cond_where(any![ - Expr::col(Splits::MaturityTimestamp) - .eq(Func::cust(ToTimestampFunc).arg(Expr::val(0))), - Expr::col(Splits::MaturityTimestamp).lte( - Func::cust(ToTimestampFunc) - .arg(Expr::val(evaluation_datetime.unix_timestamp())) - ) - ]); - } - Bound::Excluded(evaluation_datetime) => { - sql.cond_where(Expr::col(Splits::MaturityTimestamp).gt( - Func::cust(ToTimestampFunc).arg(Expr::val(evaluation_datetime.unix_timestamp())), - )); - } - Bound::Unbounded => {} - }; - append_range_filters( - sql, - Splits::UpdateTimestamp, - &query.update_timestamp, - |&val| Expr::expr(Func::cust(ToTimestampFunc).arg(Expr::val(val))), - ); - append_range_filters( - sql, - Splits::CreateTimestamp, - &query.create_timestamp, - |&val| Expr::expr(Func::cust(ToTimestampFunc).arg(Expr::val(val))), - ); - append_range_filters(sql, Splits::DeleteOpstamp, &query.delete_opstamp, |&val| { - Expr::expr(val) - }); - - if let Some(limit) = query.limit { - sql.limit(limit as u64); - } +async fn try_apply_delta_v2( + tx: &mut Transaction<'_, Postgres>, + index_uid: &IndexUid, + source_id: &SourceId, + checkpoint_delta: SourceCheckpointDelta, + publish_token: PublishToken, +) -> MetastoreResult<()> { + let num_partitions = checkpoint_delta.num_partitions(); + let shard_ids: Vec = checkpoint_delta + .partitions() + .map(|partition_id| partition_id.to_string()) + .collect(); + + let shards: Vec<(String, String, Option)> = sqlx::query_as( + r#" + SELECT + shard_id, publish_position_inclusive, publish_token + FROM + shards + WHERE + index_uid = $1 + AND source_id = $2 + AND shard_id = ANY($3) + FOR UPDATE + "#, + ) + .bind(index_uid.as_str()) + .bind(source_id) + .bind(shard_ids) + .fetch_all(tx.as_mut()) + .await?; - if let Some(offset) = query.offset { - sql.order_by(Splits::SplitId, Order::Asc) - .offset(offset as u64); + if shards.len() != num_partitions { + let queue_id = 
format!("{index_uid}/{source_id}"); // FIXME + let entity_kind = EntityKind::Shard { queue_id }; + return Err(MetastoreError::NotFound(entity_kind)); } -} + let mut current_checkpoint = SourceCheckpoint::default(); -/// Returns the unix timestamp at which the split becomes mature. -/// If the split is mature (`SplitMaturity::Mature`), we return 0 -/// as we don't want the maturity to depend on datetime. -fn split_maturity_timestamp(split_metadata: &SplitMetadata) -> i64 { - match split_metadata.maturity { - SplitMaturity::Mature => 0, - SplitMaturity::Immature { maturation_period } => { - split_metadata.create_timestamp + maturation_period.as_secs() as i64 + for (shard_id, current_position, current_publish_token_opt) in shards { + if current_publish_token_opt.is_none() + || current_publish_token_opt.unwrap() != publish_token + { + let message = "failed to apply checkpoint delta: invalid publish token".to_string(); + return Err(MetastoreError::InvalidArgument { message }); } + let partition_id = PartitionId::from(shard_id); + let current_position = Position::from(current_position); + current_checkpoint.add_partition(partition_id, current_position); } -} + current_checkpoint + .try_apply_delta(checkpoint_delta) + .map_err(|error| MetastoreError::InvalidArgument { + message: error.to_string(), + })?; -fn convert_sqlx_err(index_id: &str, sqlx_err: sqlx::Error) -> MetastoreError { - match &sqlx_err { - sqlx::Error::Database(boxed_db_err) => { - let pg_db_error = boxed_db_err.downcast_ref::(); - let pg_error_code = pg_db_error.code(); - let pg_error_table = pg_db_error.table(); - - match (pg_error_code, pg_error_table) { - (pg_error_code::FOREIGN_KEY_VIOLATION, _) => { - MetastoreError::NotFound(EntityKind::Index { - index_id: index_id.to_string(), - }) - } - (pg_error_code::UNIQUE_VIOLATION, Some(table)) if table.starts_with("indexes") => { - MetastoreError::AlreadyExists(EntityKind::Index { - index_id: index_id.to_string(), - }) - } - (pg_error_code::UNIQUE_VIOLATION, 
_) => { - error!(pg_db_err=?boxed_db_err, "postgresql-error"); - MetastoreError::Internal { - message: "unique key violation".to_string(), - cause: format!("DB error {boxed_db_err:?}"), - } - } - _ => { - error!(pg_db_err=?boxed_db_err, "postgresql-error"); - MetastoreError::Db { - message: boxed_db_err.to_string(), - } - } - } - } - _ => { - error!(err=?sqlx_err, "an error has occurred in the database operation"); - MetastoreError::Db { - message: sqlx_err.to_string(), - } - } + let mut shard_ids = Vec::with_capacity(num_partitions); + let mut new_positions = Vec::with_capacity(num_partitions); + + for (partition_id, new_position) in current_checkpoint.iter() { + let shard_id = partition_id.to_string(); + shard_ids.push(shard_id.to_string()); + new_positions.push(new_position.to_string()); } + sqlx::query( + r#" + UPDATE + shards + SET + publish_position_inclusive = new_positions.position, + shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END + FROM + UNNEST($3, $4) + AS new_positions(shard_id, position) + WHERE + index_uid = $1 + AND source_id = $2 + AND shards.shard_id = new_positions.shard_id + "#, + ) + .bind(index_uid.as_str()) + .bind(source_id) + .bind(shard_ids) + .bind(new_positions) + .execute(tx.as_mut()) + .await?; + Ok(()) } /// This macro is used to systematically wrap the metastore @@ -463,7 +321,7 @@ where "#, ) .bind(index_metadata_json) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .execute(tx.as_mut()) .await?; if update_index_res.rows_affected() == 0 { @@ -529,7 +387,7 @@ impl MetastoreService for PostgresqlMetastore { .bind(&index_metadata_json) .execute(&self.connection_pool) .await - .map_err(|error| convert_sqlx_err(index_metadata.index_id(), error))?; + .map_err(|sqlx_error| convert_sqlx_err(index_metadata.index_id(), sqlx_error))?; Ok(CreateIndexResponse { index_uid: index_metadata.index_uid.to_string(), }) @@ -541,15 +399,20 @@ impl MetastoreService for PostgresqlMetastore { 
request: DeleteIndexRequest, ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid.into(); - let delete_res = sqlx::query("DELETE FROM indexes WHERE index_uid = $1") - .bind(index_uid.to_string()) + let delete_result = sqlx::query("DELETE FROM indexes WHERE index_uid = $1") + .bind(index_uid.as_str()) .execute(&self.connection_pool) .await?; - if delete_res.rows_affected() == 0 { + // FIXME: This is not idempotent. + if delete_result.rows_affected() == 0 { return Err(MetastoreError::NotFound(EntityKind::Index { index_id: index_uid.index_id().to_string(), })); } + info!( + index_id = index_uid.index_id(), + "deleted index successfully" + ); Ok(EmptyResponse {}) } @@ -610,7 +473,7 @@ impl MetastoreService for PostgresqlMetastore { $9 as index_uid FROM UNNEST($1, $2, $3, $4, $5, $6, $7) - as tr(split_id, time_range_start, time_range_end, tags_json, split_metadata_json, delete_opstamp, maturity_timestamp) + AS staged_splits (split_id, time_range_start, time_range_end, tags_json, split_metadata_json, delete_opstamp, maturity_timestamp) ON CONFLICT(split_id) DO UPDATE SET time_range_start = excluded.time_range_start, @@ -633,10 +496,10 @@ impl MetastoreService for PostgresqlMetastore { .bind(delete_opstamps) .bind(maturity_timestamps) .bind(SplitState::Staged.as_str()) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .fetch_all(tx.as_mut()) .await - .map_err(|error| convert_sqlx_err(index_uid.index_id(), error))?; + .map_err(|sqlx_error| convert_sqlx_err(index_uid.index_id(), sqlx_error))?; if upserted_split_ids.len() != split_ids.len() { let failed_split_ids: Vec = split_ids @@ -649,9 +512,10 @@ impl MetastoreService for PostgresqlMetastore { let message = "splits are not staged".to_string(); return Err(MetastoreError::FailedPrecondition { entity, message }); } - - debug!(index_id=%index_uid.index_id(), num_splits=split_ids.len(), "splits successfully staged"); - + info!( + index_id=%index_uid.index_id(), + "staged `{}` splits successfully", 
split_ids.len() + ); Ok(EmptyResponse {}) }) } @@ -666,6 +530,7 @@ impl MetastoreService for PostgresqlMetastore { let index_uid: IndexUid = request.index_uid.into(); let staged_split_ids = request.staged_split_ids; let replaced_split_ids = request.replaced_split_ids; + run_with_tx!(self.connection_pool, tx, { let mut index_metadata = index_metadata(tx, index_uid.index_id()).await?; if index_metadata.index_uid != index_uid { @@ -675,17 +540,36 @@ impl MetastoreService for PostgresqlMetastore { } if let Some(checkpoint_delta) = checkpoint_delta_opt { let source_id = checkpoint_delta.source_id.clone(); - index_metadata - .checkpoint - .try_apply_delta(checkpoint_delta) - .map_err(|error| { - let entity = EntityKind::CheckpointDelta { - index_id: index_uid.index_id().to_string(), - source_id, - }; - let message = error.to_string(); - MetastoreError::FailedPrecondition { entity, message } + + if source_id == INGEST_V2_SOURCE_ID { + let publish_token = request.publish_token_opt.ok_or_else(|| { + let message = format!( + "publish token is required for publishing splits for source \ + `{source_id}`" + ); + MetastoreError::InvalidArgument { message } })?; + try_apply_delta_v2( + tx, + &index_uid, + &source_id, + checkpoint_delta.source_delta, + publish_token, + ) + .await?; + } else { + index_metadata + .checkpoint + .try_apply_delta(checkpoint_delta) + .map_err(|error| { + let entity = EntityKind::CheckpointDelta { + index_id: index_uid.index_id().to_string(), + source_id, + }; + let message = error.to_string(); + MetastoreError::FailedPrecondition { entity, message } + })?; + } } let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| { MetastoreError::JsonSerializeError { @@ -768,13 +652,13 @@ impl MetastoreService for PostgresqlMetastore { not_marked_split_ids, ): (i64, i64, Vec, Vec, Vec) = sqlx::query_as(PUBLISH_SPLITS_QUERY) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .bind(index_metadata_json) .bind(staged_split_ids) 
.bind(replaced_split_ids) .fetch_one(tx.as_mut()) .await - .map_err(|error| convert_sqlx_err(index_uid.index_id(), error))?; + .map_err(|sqlx_error| convert_sqlx_err(index_uid.index_id(), sqlx_error))?; if !not_found_split_ids.is_empty() { return Err(MetastoreError::NotFound(EntityKind::Splits { @@ -797,7 +681,7 @@ impl MetastoreService for PostgresqlMetastore { } info!( index_id=%index_uid.index_id(), - "Published {} splits and marked {} splits for deletion successfully.", + "published {} splits and marked {} for deletion successfully", num_published_splits, num_marked_splits ); Ok(EmptyResponse {}) @@ -810,26 +694,25 @@ impl MetastoreService for PostgresqlMetastore { request: ListSplitsRequest, ) -> MetastoreResult> { let query = request.deserialize_list_splits_query()?; - let mut sql = Query::select(); - sql.column(Asterisk).from(Splits::Table); - append_query_filters(&mut sql, &query); + let mut sql_builder = Query::select(); + sql_builder.column(Asterisk).from(Splits::Table); + append_query_filters(&mut sql_builder, &query); - let (sql, values) = sql.build_sqlx(PostgresQueryBuilder); - let split_stream = SplitStream::new( + let (sql, values) = sql_builder.build_sqlx(PostgresQueryBuilder); + let pg_split_stream = SplitStream::new( self.connection_pool.clone(), sql, |connection_pool: &Pool, sql: &String| { sqlx::query_as_with::<_, PgSplit, _>(sql, values).fetch(connection_pool) }, ); - - let mapped_split_stream = - split_stream + let split_stream = + pg_split_stream .chunks(STREAM_SPLITS_CHUNK_SIZE) - .map(|pg_splits_res| { - let mut splits = Vec::with_capacity(pg_splits_res.len()); - for pg_split_res in pg_splits_res { - let pg_split = match pg_split_res { + .map(|pg_splits_results| { + let mut splits = Vec::with_capacity(pg_splits_results.len()); + for pg_split_result in pg_splits_results { + let pg_split = match pg_split_result { Ok(pg_split) => pg_split, Err(error) => { return Err(MetastoreError::Internal { @@ -842,7 +725,7 @@ impl MetastoreService for 
PostgresqlMetastore { Ok(split) => split, Err(error) => { return Err(MetastoreError::Internal { - message: "failed to convert `PgSplit` into `Split`".to_string(), + message: "failed to convert `PgSplit` to `Split`".to_string(), cause: error.to_string(), }) } @@ -851,7 +734,7 @@ impl MetastoreService for PostgresqlMetastore { } ListSplitsResponse::try_from_splits(splits) }); - let service_stream = ServiceStream::new(Box::pin(mapped_split_stream)); + let service_stream = ServiceStream::new(Box::pin(split_stream)); Ok(service_stream) } @@ -899,11 +782,11 @@ impl MetastoreService for PostgresqlMetastore { "#; let (num_found_splits, num_marked_splits, not_found_split_ids): (i64, i64, Vec) = sqlx::query_as(MARK_SPLITS_FOR_DELETION_QUERY) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .bind(split_ids.clone()) .fetch_one(&self.connection_pool) .await - .map_err(|error| convert_sqlx_err(index_uid.index_id(), error))?; + .map_err(|sqlx_error| convert_sqlx_err(index_uid.index_id(), sqlx_error))?; if num_found_splits == 0 && index_opt(&self.connection_pool, index_uid.index_id()) @@ -982,11 +865,11 @@ impl MetastoreService for PostgresqlMetastore { Vec, Vec, ) = sqlx::query_as(DELETE_SPLITS_QUERY) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .bind(split_ids) .fetch_one(&self.connection_pool) .await - .map_err(|error| convert_sqlx_err(index_uid.index_id(), error))?; + .map_err(|sqlx_error| convert_sqlx_err(index_uid.index_id(), sqlx_error))?; if num_found_splits == 0 && index_opt_for_uid(&self.connection_pool, index_uid.clone()) @@ -1088,11 +971,24 @@ impl MetastoreService for PostgresqlMetastore { request: DeleteSourceRequest, ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid.into(); + let source_id = request.source_id.clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata(tx, index_uid, |index_metadata| { - index_metadata.delete_source(&request.source_id) + mutate_index_metadata(tx, index_uid.clone(), 
|index_metadata| { + index_metadata.delete_source(&source_id) }) .await?; + sqlx::query( + r#" + DELETE FROM shards + WHERE + index_uid = $1 + AND source_id = $2 + "#, + ) + .bind(index_uid.as_str()) + .bind(source_id) + .execute(tx.as_mut()) + .await?; Ok(()) })?; Ok(EmptyResponse {}) @@ -1125,7 +1021,7 @@ impl MetastoreService for PostgresqlMetastore { SELECT COALESCE(MAX(opstamp), 0) FROM delete_tasks WHERE index_uid = $1 - "#, + "#, ) .bind(request.index_uid) .fetch_one(&self.connection_pool) @@ -1152,8 +1048,8 @@ impl MetastoreService for PostgresqlMetastore { let (create_timestamp, opstamp): (sqlx::types::time::PrimitiveDateTime, i64) = sqlx::query_as( r#" - INSERT INTO delete_tasks (index_uid, delete_query_json) VALUES ($1, $2) - RETURNING create_timestamp, opstamp + INSERT INTO delete_tasks (index_uid, delete_query_json) VALUES ($1, $2) + RETURNING create_timestamp, opstamp "#, ) .bind(delete_query.index_uid.to_string()) @@ -1185,7 +1081,7 @@ impl MetastoreService for PostgresqlMetastore { if split_ids.is_empty() { return Ok(UpdateSplitsDeleteOpstampResponse {}); } - let update_res = sqlx::query( + let update_result = sqlx::query( r#" UPDATE splits SET @@ -1198,16 +1094,16 @@ impl MetastoreService for PostgresqlMetastore { WHERE index_uid = $2 AND split_id = ANY($3) - "#, + "#, ) .bind(request.delete_opstamp as i64) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .bind(split_ids) .execute(&self.connection_pool) .await?; // If no splits were updated, maybe the index does not exist in the first place? - if update_res.rows_affected() == 0 + if update_result.rows_affected() == 0 && index_opt_for_uid(&self.connection_pool, index_uid.clone()) .await? 
.is_none() @@ -1234,14 +1130,14 @@ impl MetastoreService for PostgresqlMetastore { AND opstamp > $2 "#, ) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .bind(request.opstamp_start as i64) .fetch_all(&self.connection_pool) .await?; - let delete_tasks = pg_delete_tasks + let delete_tasks: Vec = pg_delete_tasks .into_iter() .map(|pg_delete_task| pg_delete_task.try_into()) - .collect::>>()?; + .collect::>()?; Ok(ListDeleteTasksResponse { delete_tasks }) } @@ -1254,7 +1150,7 @@ impl MetastoreService for PostgresqlMetastore { request: ListStaleSplitsRequest, ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid.into(); - let pg_stale_splits: Vec = sqlx::query_as::<_, PgSplit>( + let stale_pg_splits: Vec = sqlx::query_as::<_, PgSplit>( r#" SELECT * FROM splits @@ -1265,76 +1161,186 @@ impl MetastoreService for PostgresqlMetastore { AND (maturity_timestamp = to_timestamp(0) OR (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') >= maturity_timestamp) ORDER BY delete_opstamp ASC, publish_timestamp ASC LIMIT $4 - "#, + "#, ) - .bind(index_uid.to_string()) + .bind(index_uid.as_str()) .bind(request.delete_opstamp as i64) .bind(SplitState::Published.as_str()) .bind(request.num_splits as i64) .fetch_all(&self.connection_pool) .await?; - let splits = pg_stale_splits + let stale_splits: Vec = stale_pg_splits .into_iter() .map(|pg_split| pg_split.try_into()) - .collect::>>()?; - let response = ListSplitsResponse::try_from_splits(splits)?; + .collect::>()?; + let response = ListSplitsResponse::try_from_splits(stale_splits)?; Ok(response) } async fn open_shards( &mut self, - _request: OpenShardsRequest, + request: OpenShardsRequest, ) -> MetastoreResult { - unimplemented!("`open_shards` is not implemented for PostgreSQL metastore") + let mut subresponses = Vec::with_capacity(request.subrequests.len()); + + for subrequest in request.subrequests { + let shard: Shard = open_or_fetch_shard(&self.connection_pool, &subrequest).await?; + + 
subresponses.push(OpenShardsSubresponse { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + opened_shards: vec![shard], + }); + } + Ok(OpenShardsResponse { subresponses }) } async fn acquire_shards( &mut self, - _request: AcquireShardsRequest, + request: AcquireShardsRequest, ) -> MetastoreResult { - unimplemented!("`close_shards` is not implemented for PostgreSQL metastore") + const ACQUIRE_SHARDS_QUERY: &str = include_str!("queries/acquire_shards.sql"); + + let mut subresponses = Vec::with_capacity(request.subrequests.len()); + + for subrequest in request.subrequests { + let shard_ids: Vec<&str> = subrequest + .shard_ids + .iter() + .map(|shard_id| shard_id.as_str()) + .collect(); + let pg_shards: Vec = sqlx::query_as(ACQUIRE_SHARDS_QUERY) + .bind(&subrequest.index_uid) + .bind(&subrequest.source_id) + .bind(shard_ids) + .bind(subrequest.publish_token) + .fetch_all(&self.connection_pool) + .await?; + + let acquired_shards = pg_shards + .into_iter() + .map(|pg_shard| pg_shard.into()) + .collect(); + + subresponses.push(AcquireShardsSubresponse { + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + acquired_shards, + }); + } + Ok(AcquireShardsResponse { subresponses }) } async fn list_shards( &mut self, - _request: ListShardsRequest, + request: ListShardsRequest, ) -> MetastoreResult { - unimplemented!("`list_shards` is not implemented for PostgreSQL metastore") + const LIST_SHARDS_QUERY: &str = include_str!("queries/list_shards.sql"); + + let mut subresponses = Vec::with_capacity(request.subrequests.len()); + + for subrequest in request.subrequests { + let shard_state: Option<&'static str> = match subrequest.shard_state() { + ShardState::Unspecified => None, + ShardState::Open => Some("open"), + ShardState::Closed => Some("closed"), + ShardState::Unavailable => Some("unavailable"), + }; + let pg_shards: Vec = sqlx::query_as(LIST_SHARDS_QUERY) + .bind(&subrequest.index_uid) + 
.bind(&subrequest.source_id) + .bind(shard_state) + .fetch_all(&self.connection_pool) + .await?; + + let shards = pg_shards + .into_iter() + .map(|pg_shard| pg_shard.into()) + .collect(); + + subresponses.push(ListShardsSubresponse { + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shards, + }); + } + Ok(ListShardsResponse { subresponses }) } async fn delete_shards( &mut self, - _request: DeleteShardsRequest, + request: DeleteShardsRequest, ) -> MetastoreResult { - unimplemented!("`delete_shards` is not implemented for PostgreSQL metastore") + const DELETE_SHARDS_QUERY: &str = include_str!("queries/delete_shards.sql"); + + for subrequest in request.subrequests { + let shard_ids: Vec<&str> = subrequest + .shard_ids + .iter() + .map(|shard_id| shard_id.as_str()) + .collect(); + + sqlx::query(DELETE_SHARDS_QUERY) + .bind(&subrequest.index_uid) + .bind(&subrequest.source_id) + .bind(shard_ids) + .bind(request.force) + .execute(&self.connection_pool) + .await?; + } + Ok(DeleteShardsResponse {}) } } -impl MetastoreServiceExt for PostgresqlMetastore {} +async fn open_or_fetch_shard<'e>( + executor: impl Executor<'e, Database = Postgres> + Clone, + subrequest: &OpenShardsSubrequest, +) -> MetastoreResult { + const OPEN_SHARDS_QUERY: &str = include_str!("queries/open_shard.sql"); + + let pg_shard_opt: Option = sqlx::query_as(OPEN_SHARDS_QUERY) + .bind(&subrequest.index_uid) + .bind(&subrequest.source_id) + .bind(subrequest.shard_id().as_str()) + .bind(&subrequest.leader_id) + .bind(&subrequest.follower_id) + .fetch_optional(executor.clone()) + .await?; -#[self_referencing] -struct SplitStream { - connection_pool: Pool, - sql: String, - #[borrows(connection_pool, sql)] - #[covariant] - inner: BoxStream<'this, Result>, -} + if let Some(pg_shard) = pg_shard_opt { + let shard: Shard = pg_shard.into(); + info!( + index_id=%shard.index_uid, + source_id=%shard.source_id, + shard_id=%shard.shard_id(), + leader_id=%shard.leader_id, + 
follower_id=?shard.follower_id, + "opened shard" + ); + return Ok(shard); + } + const FETCH_SHARD_QUERY: &str = include_str!("queries/fetch_shard.sql"); -impl Stream for SplitStream { - type Item = Result; + let pg_shard_opt: Option = sqlx::query_as(FETCH_SHARD_QUERY) + .bind(&subrequest.index_uid) + .bind(&subrequest.source_id) + .bind(subrequest.shard_id().as_str()) + .fetch_optional(executor) + .await?; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - SplitStream::with_inner_mut(&mut self, |this| { - std::pin::Pin::new(&mut this.as_mut()).poll_next(cx) - }) + if let Some(pg_shard) = pg_shard_opt { + return Ok(pg_shard.into()); } + Err(MetastoreError::NotFound(EntityKind::Source { + index_id: subrequest.index_uid.clone(), + source_id: subrequest.source_id.clone(), + })) } +impl MetastoreServiceExt for PostgresqlMetastore {} + // We use dollar-quoted strings in Postgresql. // // In order to ensure that we do not risk SQL injection, @@ -1401,9 +1407,6 @@ fn build_index_id_patterns_sql_query(index_id_patterns: &[String]) -> anyhow::Re if index_id_patterns.is_empty() { anyhow::bail!("The list of index id patterns may not be empty."); } - if index_id_patterns == ["*"] { - return Ok("SELECT * FROM indexes".to_string()); - } if index_id_patterns.iter().any(|pattern| pattern == "*") { return Ok("SELECT * FROM indexes".to_string()); } @@ -1427,75 +1430,6 @@ fn build_index_id_patterns_sql_query(index_id_patterns: &[String]) -> anyhow::Re } /// A postgres metastore factory -#[derive(Clone, Default)] -pub struct PostgresqlMetastoreFactory { - // In a normal run, this cache will contain a single Metastore. - // - // In contrast to the file backe metastore, we use a strong pointer here, so that Metastore - // doesn't get dropped. This is done in order to keep the underlying connection pool to - // postgres alive. 
- cache: Arc>>, -} - -impl PostgresqlMetastoreFactory { - async fn get_from_cache(&self, uri: &Uri) -> Option { - let cache_lock = self.cache.lock().await; - cache_lock.get(uri).map(MetastoreServiceClient::clone) - } - - /// If there is a valid entry in the cache to begin with, we trash the new - /// one and return the old one. - /// - /// This way we make sure that we keep only one instance associated - /// to the key `uri` outside of this struct. - async fn cache_metastore( - &self, - uri: Uri, - metastore: MetastoreServiceClient, - ) -> MetastoreServiceClient { - let mut cache_lock = self.cache.lock().await; - if let Some(metastore) = cache_lock.get(&uri) { - return metastore.clone(); - } - cache_lock.insert(uri, metastore.clone()); - metastore - } -} - -#[async_trait] -impl MetastoreFactory for PostgresqlMetastoreFactory { - fn backend(&self) -> MetastoreBackend { - MetastoreBackend::PostgreSQL - } - - async fn resolve( - &self, - metastore_config: &MetastoreConfig, - uri: &Uri, - ) -> Result { - if let Some(metastore) = self.get_from_cache(uri).await { - debug!("using metastore from cache"); - return Ok(metastore); - } - debug!("metastore not found in cache"); - let postgresql_metastore_config = metastore_config.as_postgres().ok_or_else(|| { - let message = format!( - "expected PostgreSQL metastore config, got `{:?}`", - metastore_config.backend() - ); - MetastoreResolverError::InvalidConfig(message) - })?; - let postgresql_metastore = PostgresqlMetastore::new(postgresql_metastore_config, uri) - .await - .map_err(MetastoreResolverError::Initialization)?; - let instrumented_metastore = instrument_metastore(postgresql_metastore); - let unique_metastore_for_uri = self - .cache_metastore(uri.clone(), instrumented_metastore) - .await; - Ok(unique_metastore_for_uri) - } -} - #[cfg(test)] #[async_trait] impl crate::tests::DefaultForTest for PostgresqlMetastore { @@ -1514,30 +1448,83 @@ impl crate::tests::DefaultForTest for PostgresqlMetastore { // unit tests running 
(= number of test-threads). dotenv::dotenv().ok(); let uri: Uri = std::env::var("QW_TEST_DATABASE_URL") - .expect("Environment variable `QW_TEST_DATABASE_URL` should be set.") + .expect("environment variable `QW_TEST_DATABASE_URL` should be set") .parse() - .expect("Environment variable `QW_TEST_DATABASE_URL` should be a valid URI."); + .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI"); PostgresqlMetastore::new(&PostgresMetastoreConfig::default(), &uri) .await - .expect("Failed to initialize test PostgreSQL metastore.") + .expect("failed to initialize PostgreSQL metastore test") } } #[cfg(test)] mod tests { + use async_trait::async_trait; use quickwit_common::uri::Protocol; use quickwit_doc_mapper::tag_pruning::{no_tag, tag, TagFilterAst}; + use quickwit_proto::ingest::Shard; use quickwit_proto::metastore::MetastoreService; - use quickwit_proto::types::IndexUid; + use quickwit_proto::types::{IndexUid, SourceId}; use sea_query::{all, any, Asterisk, Cond, Expr, PostgresQueryBuilder, Query}; use time::OffsetDateTime; + use super::model::PgShard; use super::{append_query_filters, tags_filter_expression_helper, PostgresqlMetastore}; - use crate::metastore::postgresql_metastore::build_index_id_patterns_sql_query; - use crate::metastore::postgresql_model::Splits; + use crate::metastore::postgres::build_index_id_patterns_sql_query; + use crate::metastore::postgres::model::Splits; + use crate::tests::shard::ReadWriteShardsForTest; use crate::tests::DefaultForTest; use crate::{metastore_test_suite, ListSplitsQuery, SplitState}; + #[async_trait] + impl ReadWriteShardsForTest for PostgresqlMetastore { + async fn insert_shards( + &mut self, + index_uid: &IndexUid, + source_id: &SourceId, + shards: Vec, + ) { + const INSERT_SHARD_QUERY: &str = include_str!("queries/insert_shard.sql"); + + for shard in shards { + sqlx::query(INSERT_SHARD_QUERY) + .bind(index_uid.as_str()) + .bind(source_id) + .bind(shard.shard_id().as_str()) + 
.bind(shard.shard_state().as_json_str_name()) + .bind(&shard.leader_id) + .bind(&shard.follower_id) + .bind(&shard.publish_position_inclusive().to_string()) + .bind(&shard.publish_token) + .execute(&self.connection_pool) + .await + .unwrap(); + } + } + + async fn list_all_shards(&self, index_uid: &IndexUid, source_id: &SourceId) -> Vec { + let pg_shards: Vec = sqlx::query_as( + r#" + SELECT * + FROM shards + WHERE + index_uid = $1 + AND source_id = $2 + "#, + ) + .bind(index_uid.as_str()) + .bind(source_id.as_str()) + .fetch_all(&self.connection_pool) + .await + .unwrap(); + + pg_shards + .into_iter() + .map(|pg_shard| pg_shard.into()) + .collect() + } + } + metastore_test_suite!(crate::PostgresqlMetastore); #[tokio::test] diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs similarity index 81% rename from quickwit/quickwit-metastore/src/metastore/postgresql_model.rs rename to quickwit/quickwit-metastore/src/metastore/postgres/model.rs index 4666097c6fe..e8110d84268 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -20,8 +20,9 @@ use std::convert::TryInto; use std::str::FromStr; +use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{DeleteQuery, DeleteTask, MetastoreError, MetastoreResult}; -use quickwit_proto::types::IndexUid; +use quickwit_proto::types::{IndexUid, ShardId, SourceId}; use sea_query::{Iden, Write}; use tracing::error; @@ -210,3 +211,67 @@ impl TryInto for PgDeleteTask { }) } } + +#[derive(Iden, Clone, Copy)] +#[allow(dead_code)] +pub enum Shards { + Table, + IndexUid, + SourceId, + ShardId, + ShardState, + LeaderId, + FollowerId, + PublishPositionInclusive, + PublishToken, +} + +#[derive(sqlx::Type, PartialEq, Debug)] +#[sqlx(type_name = "SHARD_STATE", rename_all = "snake_case")] +pub enum PgShardState { + Unspecified, + Open, + Unavailable, + 
Closed, +} + +impl From for ShardState { + fn from(pg_shard_state: PgShardState) -> Self { + match pg_shard_state { + PgShardState::Unspecified => ShardState::Unspecified, + PgShardState::Open => ShardState::Open, + PgShardState::Unavailable => ShardState::Unavailable, + PgShardState::Closed => ShardState::Closed, + } + } +} + +#[derive(sqlx::FromRow, Debug)] +pub struct PgShard { + #[sqlx(try_from = "String")] + pub index_uid: IndexUid, + #[sqlx(try_from = "String")] + pub source_id: SourceId, + #[sqlx(try_from = "String")] + pub shard_id: ShardId, + pub leader_id: String, + pub follower_id: Option, + pub shard_state: PgShardState, + pub publish_position_inclusive: String, + pub publish_token: Option, +} + +impl From for Shard { + fn from(pg_shard: PgShard) -> Self { + Shard { + index_uid: pg_shard.index_uid.into(), + source_id: pg_shard.source_id, + shard_id: Some(pg_shard.shard_id), + shard_state: ShardState::from(pg_shard.shard_state) as i32, + leader_id: pg_shard.leader_id, + follower_id: pg_shard.follower_id, + publish_position_inclusive: Some(pg_shard.publish_position_inclusive.into()), + publish_token: pg_shard.publish_token, + } + } +} diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/acquire_shards.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/acquire_shards.sql new file mode 100644 index 00000000000..2421a4436c8 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/acquire_shards.sql @@ -0,0 +1,9 @@ +UPDATE + shards +SET + publish_token = $4 +WHERE + index_uid = $1 + AND source_id = $2 + AND shard_id = ANY($3) +RETURNING * diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/delete_shards.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/delete_shards.sql new file mode 100644 index 00000000000..f5290c0fc38 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/delete_shards.sql @@ -0,0 +1,10 @@ +DELETE FROM + shards +WHERE + 
index_uid = $1 + AND source_id = $2 + AND shard_id = ANY($3) + AND ( + $4 = TRUE + OR publish_position_inclusive LIKE '~%' + ) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/fetch_shard.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/fetch_shard.sql new file mode 100644 index 00000000000..591a67acb50 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/fetch_shard.sql @@ -0,0 +1,8 @@ +SELECT + * +FROM + shards +WHERE + index_uid = $1 + AND source_id = $2 + AND shard_id = $3 diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/insert_shard.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/insert_shard.sql new file mode 100644 index 00000000000..7d08622f96b --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/insert_shard.sql @@ -0,0 +1,13 @@ +INSERT INTO + shards ( + index_uid, + source_id, + shard_id, + shard_state, + leader_id, + follower_id, + publish_position_inclusive, + publish_token + ) +VALUES + ($1, $2, $3, CAST($4 as SHARD_STATE), $5, $6, $7, $8) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/list_shards.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/list_shards.sql new file mode 100644 index 00000000000..2faf0a806a7 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/list_shards.sql @@ -0,0 +1,11 @@ +SELECT + * +FROM + shards +WHERE + index_uid = $1 + AND source_id = $2 + AND ( + $3 IS NULL + OR shard_state = CAST($3 AS SHARD_STATE) + ) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/open_shard.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/open_shard.sql new file mode 100644 index 00000000000..f19a26de211 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/open_shard.sql @@ -0,0 +1,12 @@ +INSERT INTO + shards ( + index_uid, + source_id, + shard_id, + leader_id, + follower_id + ) +VALUES + 
($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING +RETURNING + * diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs b/quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs new file mode 100644 index 00000000000..3891389416f --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs @@ -0,0 +1,43 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::stream::BoxStream; +use ouroboros::self_referencing; +use sqlx::{Pool, Postgres}; +use tokio_stream::Stream; + +#[self_referencing(pub_extras)] +pub struct SplitStream { + connection_pool: Pool, + sql: String, + #[borrows(connection_pool, sql)] + #[covariant] + inner: BoxStream<'this, Result>, +} + +impl Stream for SplitStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + SplitStream::with_inner_mut(&mut self, |this| Pin::new(&mut this.as_mut()).poll_next(cx)) + } +} diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs new file mode 100644 index 00000000000..3c33358d59e --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -0,0 +1,201 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use std::fmt::Display; +use std::ops::Bound; +use std::str::FromStr; +use std::time::Duration; + +use quickwit_common::uri::Uri; +use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; +use sea_query::{any, Cond, Expr, Func, Order, SelectStatement}; +use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; +use sqlx::{ConnectOptions, Pool, Postgres}; +use tracing::error; +use tracing::log::LevelFilter; + +use super::model::{Splits, ToTimestampFunc}; +use super::tags_filter_expression_helper; +use crate::metastore::FilterRange; +use crate::{ListSplitsQuery, SplitMaturity, SplitMetadata}; + +/// Establishes a connection to the given database URI. +pub(super) async fn establish_connection( + connection_uri: &Uri, + min_connections: usize, + max_connections: usize, + acquire_timeout: Duration, + idle_timeout_opt: Option, + max_lifetime_opt: Option, +) -> MetastoreResult> { + let pool_options = PgPoolOptions::new() + .min_connections(min_connections as u32) + .max_connections(max_connections as u32) + .acquire_timeout(acquire_timeout) + .idle_timeout(idle_timeout_opt) + .max_lifetime(max_lifetime_opt); + let connect_options: PgConnectOptions = PgConnectOptions::from_str(connection_uri.as_str())? + .application_name("quickwit-metastore") + .log_statements(LevelFilter::Info); + pool_options + .connect_with(connect_options) + .await + .map_err(|error| { + error!(connection_uri=%connection_uri, error=?error, "failed to establish connection to database"); + MetastoreError::Connection { + message: error.to_string(), + } + }) +} + +/// Extends an existing SQL string with the generated filter range appended to the query. +/// +/// This method is **not** SQL injection proof and should not be used with user-defined values. 
+pub(super) fn append_range_filters( + sql: &mut SelectStatement, + field_name: Splits, + filter_range: &FilterRange, + value_formatter: impl Fn(&V) -> Expr, +) { + if let Bound::Included(value) = &filter_range.start { + sql.cond_where(Expr::col(field_name).gte((value_formatter)(value))); + }; + + if let Bound::Excluded(value) = &filter_range.start { + sql.cond_where(Expr::col(field_name).gt((value_formatter)(value))); + }; + + if let Bound::Included(value) = &filter_range.end { + sql.cond_where(Expr::col(field_name).lte((value_formatter)(value))); + }; + + if let Bound::Excluded(value) = &filter_range.end { + sql.cond_where(Expr::col(field_name).lt((value_formatter)(value))); + }; +} + +pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplitsQuery) { + // Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list. + + let or_condition = query + .index_uids + .iter() + .fold(Cond::any(), |cond, index_uid| { + cond.add(Expr::col(Splits::IndexUid).eq(Expr::val(index_uid.to_string()))) + }); + sql.cond_where(or_condition); + + if !query.split_states.is_empty() { + sql.cond_where( + Expr::col(Splits::SplitState) + .is_in(query.split_states.iter().map(|val| val.to_string())), + ); + }; + + if let Some(tags) = query.tags.as_ref() { + sql.cond_where(tags_filter_expression_helper(tags)); + }; + + match query.time_range.start { + Bound::Included(v) => { + sql.cond_where(any![ + Expr::col(Splits::TimeRangeEnd).gte(v), + Expr::col(Splits::TimeRangeEnd).is_null() + ]); + } + Bound::Excluded(v) => { + sql.cond_where(any![ + Expr::col(Splits::TimeRangeEnd).gt(v), + Expr::col(Splits::TimeRangeEnd).is_null() + ]); + } + Bound::Unbounded => {} + }; + + match query.time_range.end { + Bound::Included(v) => { + sql.cond_where(any![ + Expr::col(Splits::TimeRangeStart).lte(v), + Expr::col(Splits::TimeRangeStart).is_null() + ]); + } + Bound::Excluded(v) => { + sql.cond_where(any![ + Expr::col(Splits::TimeRangeStart).lt(v), + 
Expr::col(Splits::TimeRangeStart).is_null() + ]); + } + Bound::Unbounded => {} + }; + + match &query.mature { + Bound::Included(evaluation_datetime) => { + sql.cond_where(any![ + Expr::col(Splits::MaturityTimestamp) + .eq(Func::cust(ToTimestampFunc).arg(Expr::val(0))), + Expr::col(Splits::MaturityTimestamp).lte( + Func::cust(ToTimestampFunc) + .arg(Expr::val(evaluation_datetime.unix_timestamp())) + ) + ]); + } + Bound::Excluded(evaluation_datetime) => { + sql.cond_where(Expr::col(Splits::MaturityTimestamp).gt( + Func::cust(ToTimestampFunc).arg(Expr::val(evaluation_datetime.unix_timestamp())), + )); + } + Bound::Unbounded => {} + }; + append_range_filters( + sql, + Splits::UpdateTimestamp, + &query.update_timestamp, + |&val| Expr::expr(Func::cust(ToTimestampFunc).arg(Expr::val(val))), + ); + append_range_filters( + sql, + Splits::CreateTimestamp, + &query.create_timestamp, + |&val| Expr::expr(Func::cust(ToTimestampFunc).arg(Expr::val(val))), + ); + append_range_filters(sql, Splits::DeleteOpstamp, &query.delete_opstamp, |&val| { + Expr::expr(val) + }); + + if let Some(limit) = query.limit { + sql.limit(limit as u64); + } + + if let Some(offset) = query.offset { + sql.order_by(Splits::SplitId, Order::Asc) + .offset(offset as u64); + } +} + +/// Returns the unix timestamp at which the split becomes mature. +/// If the split is mature (`SplitMaturity::Mature`), we return 0 +/// as we don't want the maturity to depend on datetime. 
+pub(super) fn split_maturity_timestamp(split_metadata: &SplitMetadata) -> i64 { + match split_metadata.maturity { + SplitMaturity::Mature => 0, + SplitMaturity::Immature { maturation_period } => { + split_metadata.create_timestamp + maturation_period.as_secs() as i64 + } + } +} diff --git a/quickwit/quickwit-metastore/src/metastore_resolver.rs b/quickwit/quickwit-metastore/src/metastore_resolver.rs index fc4b7f995bf..6d02387a5a2 100644 --- a/quickwit/quickwit-metastore/src/metastore_resolver.rs +++ b/quickwit/quickwit-metastore/src/metastore_resolver.rs @@ -28,9 +28,9 @@ use quickwit_config::{MetastoreBackend, MetastoreConfig, MetastoreConfigs}; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_storage::StorageResolver; -use crate::metastore::file_backed_metastore::FileBackedMetastoreFactory; +use crate::metastore::file_backed::FileBackedMetastoreFactory; #[cfg(feature = "postgres")] -use crate::metastore::postgresql_metastore::PostgresqlMetastoreFactory; +use crate::metastore::postgres::PostgresqlMetastoreFactory; use crate::{MetastoreFactory, MetastoreResolverError}; type FactoryAndConfig = (Box, MetastoreConfig); diff --git a/quickwit/quickwit-metastore/src/tests/list_splits.rs b/quickwit/quickwit-metastore/src/tests/list_splits.rs index 069b2f694fa..71cf5709cb8 100644 --- a/quickwit/quickwit-metastore/src/tests/list_splits.rs +++ b/quickwit/quickwit-metastore/src/tests/list_splits.rs @@ -182,8 +182,8 @@ pub async fn test_metastore_stream_splits5}"); + for split_idx in 1..1001 { + let split_id = format!("{index_id}--split-{split_idx:0>4}"); let split_metadata = SplitMetadata { split_id: split_id.clone(), index_uid: index_uid.clone(), @@ -191,7 +191,7 @@ pub async fn test_metastore_stream_splits 0 && idx % 1000 == 0 { + if split_idx > 0 && split_idx % 100 == 0 { let staged_split_ids: Vec = split_metadatas_to_create .iter() .map(|split_metadata| split_metadata.split_id.clone()) @@ -226,14 +226,14 @@ pub async fn test_metastore_stream_splits> {} 
- async fn create_channel(client: tokio::io::DuplexStream) -> anyhow::Result { use http::Uri; use quickwit_proto::tonic::transport::Endpoint; @@ -105,13 +102,13 @@ async fn create_channel(client: tokio::io::DuplexStream) -> anyhow::Result, - > -); +// crate::metastore_test_suite!( +// quickwit_proto::metastore::MetastoreServiceGrpcClientAdapter< +// quickwit_proto::metastore::metastore_service_grpc_client::MetastoreServiceGrpcClient< +// quickwit_proto::tonic::transport::Channel, +// >, +// > +// ); fn collect_split_ids(splits: &[Split]) -> Vec<&str> { splits @@ -389,6 +386,16 @@ macro_rules! metastore_test_suite { async fn test_metastore_delete_shards() { $crate::tests::shard::test_metastore_delete_shards::<$metastore_type>().await; } + + #[tokio::test] + async fn test_metastore_apply_checkpoint_delta_v2_single_shard() { + $crate::tests::shard::test_metastore_apply_checkpoint_delta_v2_single_shard::<$metastore_type>().await; + } + + #[tokio::test] + async fn test_metastore_apply_checkpoint_delta_v2_multi_shards() { + $crate::tests::shard::test_metastore_apply_checkpoint_delta_v2_multi_shards::<$metastore_type>().await; + } } }; } diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index 9cb0b6e2e54..a1ed474e7f9 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -17,47 +17,38 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
+use async_trait::async_trait; use quickwit_common::rand::append_random_suffix; use quickwit_config::{IndexConfig, SourceConfig}; +use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ - AddSourceRequest, CreateIndexRequest, EntityKind, MetastoreError, MetastoreService, - OpenShardsRequest, OpenShardsSubrequest, + AcquireShardsRequest, AcquireShardsSubrequest, AddSourceRequest, CreateIndexRequest, + DeleteShardsRequest, DeleteShardsSubrequest, EntityKind, ListShardsRequest, + ListShardsSubrequest, MetastoreError, MetastoreService, OpenShardsRequest, + OpenShardsSubrequest, PublishSplitsRequest, }; -use quickwit_proto::types::{IndexUid, ShardId, SourceId}; +use quickwit_proto::types::{IndexUid, Position, ShardId, SourceId}; use super::DefaultForTest; +use crate::checkpoint::{IndexCheckpointDelta, PartitionId, SourceCheckpointDelta}; use crate::tests::cleanup_index; -use crate::{AddSourceRequestExt, CreateIndexRequestExt, FileBackedMetastore, MetastoreServiceExt}; - -// TODO: Remove when `PostgresqlMetastore` implements Shard API. 
-pub trait RunTests { - fn run_open_shards_test() -> bool { - true - } - - fn run_other_tests() -> bool { - true - } -} - -impl RunTests for FileBackedMetastore {} - -#[cfg(feature = "postgres")] -impl RunTests for crate::PostgresqlMetastore { - fn run_other_tests() -> bool { - false - } +use crate::{AddSourceRequestExt, CreateIndexRequestExt, MetastoreServiceExt}; + +#[async_trait] +pub trait ReadWriteShardsForTest { + async fn insert_shards( + &mut self, + index_uid: &IndexUid, + source_id: &SourceId, + shards: Vec, + ); - fn run_open_shards_test() -> bool { - false - } + async fn list_all_shards(&self, index_uid: &IndexUid, source_id: &SourceId) -> Vec; } struct TestIndex { index_uid: IndexUid, - _index_config: IndexConfig, - _source_id: SourceId, - source_config: SourceConfig, + source_id: SourceId, } impl TestIndex { @@ -82,19 +73,14 @@ impl TestIndex { Self { index_uid, - _index_config: index_config, - _source_id: source_config.source_id.clone(), - source_config, + source_id: source_config.source_id, } } } pub async fn test_metastore_open_shards< - MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests, + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + ReadWriteShardsForTest, >() { - if !MetastoreUnderTest::run_open_shards_test() { - return; - } let mut metastore = MetastoreUnderTest::default_for_test().await; let test_index = TestIndex::create_index_with_source( @@ -112,166 +98,676 @@ pub async fn test_metastore_open_shards< assert!(open_shards_response.subresponses.is_empty()); // Test index not found. 
- let open_shards_request = OpenShardsRequest { - subrequests: vec![OpenShardsSubrequest { - index_uid: "index-does-not-exist:0".to_string(), - source_id: test_index.source_config.source_id.clone(), - leader_id: "test-ingester-foo".to_string(), - ..Default::default() - }], - }; - let error = metastore - .open_shards(open_shards_request) - .await - .unwrap_err(); - assert!( - matches!(error, MetastoreError::NotFound(EntityKind::Index { index_id }) if index_id == "index-does-not-exist") - ); - - // Test source not found. - let open_shards_request = OpenShardsRequest { - subrequests: vec![OpenShardsSubrequest { - index_uid: test_index.index_uid.clone().into(), - source_id: "source-does-not-exist".to_string(), - leader_id: "test-ingester-foo".to_string(), - ..Default::default() - }], - }; - let error = metastore - .open_shards(open_shards_request) - .await - .unwrap_err(); - assert!( - matches!(error, MetastoreError::NotFound(EntityKind::Source { source_id, ..}) if source_id == "source-does-not-exist") - ); + // let open_shards_request = OpenShardsRequest { + // subrequests: vec![OpenShardsSubrequest { + // index_uid: "index-does-not-exist:0".to_string(), + // source_id: test_index.source_id.clone(), + // leader_id: "test-ingester-foo".to_string(), + // ..Default::default() + // }], + // }; + // let error = metastore + // .open_shards(open_shards_request) + // .await + // .unwrap_err(); + // assert!( + // matches!(error, MetastoreError::NotFound(EntityKind::Index { index_id }) if index_id == + // "index-does-not-exist") ); + + // // Test source not found. 
+ // let open_shards_request = OpenShardsRequest { + // subrequests: vec![OpenShardsSubrequest { + // index_uid: test_index.index_uid.clone().into(), + // source_id: "source-does-not-exist".to_string(), + // leader_id: "test-ingester-foo".to_string(), + // ..Default::default() + // }], + // }; + // let error = metastore + // .open_shards(open_shards_request) + // .await + // .unwrap_err(); + // assert!( + // matches!(error, MetastoreError::NotFound(EntityKind::Source { source_id, ..}) if + // source_id == "source-does-not-exist") ); // Test open shard #1. let open_shards_request = OpenShardsRequest { subrequests: vec![OpenShardsSubrequest { + subrequest_id: 0, index_uid: test_index.index_uid.clone().into(), - source_id: test_index.source_config.source_id.clone(), + source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(1)), leader_id: "test-ingester-foo".to_string(), - ..Default::default() + follower_id: Some("test-ingester-bar".to_string()), }], }; let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap(); assert_eq!(open_shards_response.subresponses.len(), 1); let subresponse = &open_shards_response.subresponses[0]; - assert_eq!(subresponse.index_uid, test_index.index_uid.as_str()); - assert_eq!(subresponse.source_id, test_index.source_config.source_id); + assert_eq!(subresponse.index_uid, test_index.index_uid); + assert_eq!(subresponse.source_id, test_index.source_id); assert_eq!(subresponse.opened_shards.len(), 1); let shard = &subresponse.opened_shards[0]; - assert_eq!(shard.index_uid, test_index.index_uid.as_str()); - assert_eq!(shard.source_id, test_index.source_config.source_id); + assert_eq!(shard.index_uid, test_index.index_uid); + assert_eq!(shard.source_id, test_index.source_id); assert_eq!(shard.shard_id(), ShardId::from(1)); + assert_eq!(shard.shard_state(), ShardState::Open); assert_eq!(shard.leader_id, "test-ingester-foo"); + assert_eq!(shard.follower_id(), "test-ingester-bar"); + 
assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + assert!(shard.publish_token.is_none()); // Test open shard #1 is idempotent. let open_shards_request = OpenShardsRequest { subrequests: vec![OpenShardsSubrequest { + subrequest_id: 0, index_uid: test_index.index_uid.clone().into(), - source_id: test_index.source_config.source_id.clone(), + source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(1)), - leader_id: "test-ingester-bar".to_string(), - ..Default::default() + leader_id: "test-ingester-foo".to_string(), + follower_id: Some("test-ingester-bar".to_string()), }], }; let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap(); assert_eq!(open_shards_response.subresponses.len(), 1); let subresponse = &open_shards_response.subresponses[0]; + assert_eq!(subresponse.index_uid, test_index.index_uid); + assert_eq!(subresponse.source_id, test_index.source_id); assert_eq!(subresponse.opened_shards.len(), 1); let shard = &subresponse.opened_shards[0]; + assert_eq!(shard.index_uid, test_index.index_uid); + assert_eq!(shard.source_id, test_index.source_id); assert_eq!(shard.shard_id(), ShardId::from(1)); + assert_eq!(shard.shard_state(), ShardState::Open); assert_eq!(shard.leader_id, "test-ingester-foo"); + assert_eq!(shard.follower_id(), "test-ingester-bar"); + assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + assert!(shard.publish_token.is_none()); - // Test open shard #2. 
- let open_shards_request = OpenShardsRequest { - subrequests: vec![OpenShardsSubrequest { + cleanup_index(&mut metastore, test_index.index_uid).await; +} + +pub async fn test_metastore_acquire_shards< + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + ReadWriteShardsForTest, +>() { + let mut metastore = MetastoreUnderTest::default_for_test().await; + + let test_index = TestIndex::create_index_with_source( + &mut metastore, + "test-acquire-shards", + SourceConfig::ingest_v2_default(), + ) + .await; + + let shards = vec![ + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Closed as i32, + leader_id: "test-ingester-foo".to_string(), + follower_id: Some("test-ingester-bar".to_string()), + publish_position_inclusive: Some(Position::Beginning), + publish_token: Some("test-publish-token-foo".to_string()), + }, + Shard { index_uid: test_index.index_uid.clone().into(), - source_id: test_index.source_config.source_id.clone(), + source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(2)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-bar".to_string(), + follower_id: Some("test-ingester-qux".to_string()), + publish_position_inclusive: Some(Position::Beginning), + publish_token: Some("test-publish-token-bar".to_string()), + }, + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(3)), + shard_state: ShardState::Open as i32, leader_id: "test-ingester-qux".to_string(), - ..Default::default() + follower_id: Some("test-ingester-baz".to_string()), + publish_position_inclusive: Some(Position::Beginning), + publish_token: None, + }, + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(4)), + shard_state: ShardState::Open as i32, + leader_id: 
"test-ingester-baz".to_string(), + follower_id: Some("test-ingester-tux".to_string()), + publish_position_inclusive: Some(Position::Beginning), + publish_token: None, + }, + ]; + metastore + .insert_shards(&test_index.index_uid, &test_index.source_id, shards) + .await; + + // Test acquire shards. + let acquires_shards_request = AcquireShardsRequest { + subrequests: vec![AcquireShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(1), ShardId::from(2), ShardId::from(3)], + publish_token: "test-publish-token-foo".to_string(), }], }; - let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap(); - assert_eq!(open_shards_response.subresponses.len(), 1); + let acquire_shards_response = metastore + .acquire_shards(acquires_shards_request) + .await + .unwrap(); + assert_eq!(acquire_shards_response.subresponses.len(), 1); - let subresponse = &open_shards_response.subresponses[0]; - assert_eq!(subresponse.index_uid, test_index.index_uid.as_str()); - assert_eq!(subresponse.source_id, test_index.source_config.source_id); - assert_eq!(subresponse.opened_shards.len(), 1); + let mut subresponses = acquire_shards_response.subresponses; + assert_eq!(subresponses[0].index_uid, test_index.index_uid); + assert_eq!(subresponses[0].source_id, test_index.source_id); + assert_eq!(subresponses[0].acquired_shards.len(), 3); - let shard = &subresponse.opened_shards[0]; - assert_eq!(shard.index_uid, test_index.index_uid.as_str()); - assert_eq!(shard.source_id, test_index.source_config.source_id); + subresponses[0] + .acquired_shards + .sort_unstable_by(|left, right| left.shard_id.cmp(&right.shard_id)); + + let shard = &subresponses[0].acquired_shards[0]; + assert_eq!(shard.index_uid, test_index.index_uid); + assert_eq!(shard.source_id, test_index.source_id); + assert_eq!(shard.shard_id(), ShardId::from(1)); + assert_eq!(shard.shard_state(), ShardState::Closed); + 
assert_eq!(shard.leader_id, "test-ingester-foo"); + assert_eq!(shard.follower_id(), "test-ingester-bar"); + assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + assert_eq!(shard.publish_token(), "test-publish-token-foo"); + + let shard = &subresponses[0].acquired_shards[1]; + assert_eq!(shard.index_uid, test_index.index_uid); + assert_eq!(shard.source_id, test_index.source_id); assert_eq!(shard.shard_id(), ShardId::from(2)); + assert_eq!(shard.shard_state(), ShardState::Open); + assert_eq!(shard.leader_id, "test-ingester-bar"); + assert_eq!(shard.follower_id(), "test-ingester-qux"); + assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + assert_eq!(shard.publish_token(), "test-publish-token-foo"); + + let shard = &subresponses[0].acquired_shards[2]; + assert_eq!(shard.index_uid, test_index.index_uid); + assert_eq!(shard.source_id, test_index.source_id); + assert_eq!(shard.shard_id(), ShardId::from(3)); + assert_eq!(shard.shard_state(), ShardState::Open); assert_eq!(shard.leader_id, "test-ingester-qux"); + assert_eq!(shard.follower_id(), "test-ingester-baz"); + assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + assert_eq!(shard.publish_token(), "test-publish-token-foo"); cleanup_index(&mut metastore, test_index.index_uid).await; } -pub async fn test_metastore_acquire_shards< - MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests, +pub async fn test_metastore_list_shards< + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + ReadWriteShardsForTest, >() { - if !MetastoreUnderTest::run_other_tests() { - return; - } let mut metastore = MetastoreUnderTest::default_for_test().await; let test_index = TestIndex::create_index_with_source( &mut metastore, - "test-acquire-shards", + "test-list-shards", SourceConfig::ingest_v2_default(), ) .await; - // TODO + let shards = vec![ + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: 
test_index.source_id.clone(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-foo".to_string(), + follower_id: Some("test-ingester-bar".to_string()), + publish_position_inclusive: Some(Position::Beginning), + publish_token: Some("test-publish-token-foo".to_string()), + }, + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(2)), + shard_state: ShardState::Closed as i32, + leader_id: "test-ingester-bar".to_string(), + follower_id: Some("test-ingester-qux".to_string()), + publish_position_inclusive: Some(Position::Beginning), + publish_token: Some("test-publish-token-bar".to_string()), + }, + ]; + metastore + .insert_shards(&test_index.index_uid, &test_index.source_id, shards) + .await; + + // Test list shards. + let list_shards_request = ListShardsRequest { + subrequests: vec![ListShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_state: None, + }], + }; + let list_shards_response = metastore.list_shards(list_shards_request).await.unwrap(); + assert_eq!(list_shards_response.subresponses.len(), 1); + + let mut subresponses = list_shards_response.subresponses; + assert_eq!(subresponses[0].index_uid, test_index.index_uid); + assert_eq!(subresponses[0].source_id, test_index.source_id); + assert_eq!(subresponses[0].shards.len(), 2); + + subresponses[0] + .shards + .sort_unstable_by(|left, right| left.shard_id.cmp(&right.shard_id)); + + let shard = &subresponses[0].shards[0]; + assert_eq!(shard.index_uid, test_index.index_uid); + assert_eq!(shard.source_id, test_index.source_id); + assert_eq!(shard.shard_id(), ShardId::from(1)); + assert_eq!(shard.shard_state(), ShardState::Open); + assert_eq!(shard.leader_id, "test-ingester-foo"); + assert_eq!(shard.follower_id(), "test-ingester-bar"); + assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + 
assert_eq!(shard.publish_token(), "test-publish-token-foo"); + + let shard = &subresponses[0].shards[1]; + assert_eq!(shard.index_uid, test_index.index_uid); + assert_eq!(shard.source_id, test_index.source_id); + assert_eq!(shard.shard_id(), ShardId::from(2)); + assert_eq!(shard.shard_state(), ShardState::Closed); + assert_eq!(shard.leader_id, "test-ingester-bar"); + assert_eq!(shard.follower_id(), "test-ingester-qux"); + assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + assert_eq!(shard.publish_token(), "test-publish-token-bar"); + + // Test list shards with shard state filter. + let list_shards_request = ListShardsRequest { + subrequests: vec![ListShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_state: Some(ShardState::Open as i32), + }], + }; + let list_shards_response = metastore.list_shards(list_shards_request).await.unwrap(); + assert_eq!(list_shards_response.subresponses.len(), 1); + assert_eq!(list_shards_response.subresponses[0].shards.len(), 1); + + let shard = &list_shards_response.subresponses[0].shards[0]; + assert_eq!(shard.shard_id(), ShardId::from(1)); + assert_eq!(shard.shard_state(), ShardState::Open); + + let list_shards_request = ListShardsRequest { + subrequests: vec![ListShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_state: Some(ShardState::Unavailable as i32), + }], + }; + let list_shards_response = metastore.list_shards(list_shards_request).await.unwrap(); + assert_eq!(list_shards_response.subresponses.len(), 1); cleanup_index(&mut metastore, test_index.index_uid).await; } -pub async fn test_metastore_list_shards< - MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests, +pub async fn test_metastore_delete_shards< + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + ReadWriteShardsForTest, >() { - if 
!MetastoreUnderTest::run_other_tests() { - return; - } let mut metastore = MetastoreUnderTest::default_for_test().await; let test_index = TestIndex::create_index_with_source( &mut metastore, - "test-open-shards", + "test-delete-shards", SourceConfig::ingest_v2_default(), ) .await; - // TODO + let shards = vec![ + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::Beginning), + ..Default::default() + }, + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(2)), + shard_state: ShardState::Closed as i32, + publish_position_inclusive: Some(Position::Beginning), + ..Default::default() + }, + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(3)), + shard_state: ShardState::Closed as i32, + publish_position_inclusive: Some(Position::Eof(None)), + ..Default::default() + }, + ]; + metastore + .insert_shards(&test_index.index_uid, &test_index.source_id, shards) + .await; + + // Attempt to delete shards #1, #2, #3, and #4. 
+ let delete_index_request = DeleteShardsRequest { + subrequests: vec![DeleteShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_ids: vec![ + ShardId::from(1), + ShardId::from(2), + ShardId::from(3), + ShardId::from(4), + ], + }], + force: false, + }; + metastore.delete_shards(delete_index_request).await.unwrap(); + + // Only shard #3 is deleted: its publish position is EOF, while #1 and #2 are not fully published and #4 does not exist. + let mut all_shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(all_shards.len(), 2); + + all_shards.sort_unstable_by(|left, right| left.shard_id.cmp(&right.shard_id)); + + assert_eq!(all_shards[0].shard_id(), ShardId::from(1)); + assert_eq!(all_shards[1].shard_id(), ShardId::from(2)); + + // Force-delete the remaining shards #1 and #2 (and the already deleted/nonexistent #3 and #4). + let delete_index_request = DeleteShardsRequest { + subrequests: vec![DeleteShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_ids: vec![ + ShardId::from(1), + ShardId::from(2), + ShardId::from(3), + ShardId::from(4), + ], + }], + force: true, + }; + metastore.delete_shards(delete_index_request).await.unwrap(); + + let all_shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(all_shards.len(), 0); cleanup_index(&mut metastore, test_index.index_uid).await; } -pub async fn test_metastore_delete_shards< - MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests, +pub async fn test_metastore_apply_checkpoint_delta_v2_single_shard< + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + ReadWriteShardsForTest, >() { - if !MetastoreUnderTest::run_other_tests() { - return; - } let mut metastore = MetastoreUnderTest::default_for_test().await; let test_index = TestIndex::create_index_with_source( &mut metastore, - "test-open-shards", + "test-delete-shards", SourceConfig::ingest_v2_default(), ) .await; - // TODO + let mut source_delta 
= SourceCheckpointDelta::default(); + source_delta + .record_partition_delta( + PartitionId::from(0u64), + Position::Beginning, + Position::offset(0u64), + ) + .unwrap(); + let index_checkpoint_delta = IndexCheckpointDelta { + source_id: test_index.source_id.clone(), + source_delta, + }; + let index_checkpoint_delta_json = serde_json::to_string(&index_checkpoint_delta).unwrap(); + let publish_splits_request = PublishSplitsRequest { + index_uid: test_index.index_uid.clone().into(), + staged_split_ids: Vec::new(), + replaced_split_ids: Vec::new(), + index_checkpoint_delta_json_opt: Some(index_checkpoint_delta_json), + publish_token_opt: Some("test-publish-token-foo".to_string()), + }; + let error = metastore + .publish_splits(publish_splits_request) + .await + .unwrap_err(); + assert!(matches!( + error, + MetastoreError::NotFound(EntityKind::Shard { .. }) + )); + + let shards = vec![Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(0)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::Beginning), + publish_token: Some("test-publish-token-bar".to_string()), + ..Default::default() + }]; + metastore + .insert_shards(&test_index.index_uid, &test_index.source_id, shards) + .await; + + let index_checkpoint_delta_json = serde_json::to_string(&index_checkpoint_delta).unwrap(); + let publish_splits_request = PublishSplitsRequest { + index_uid: test_index.index_uid.clone().into(), + staged_split_ids: Vec::new(), + replaced_split_ids: Vec::new(), + index_checkpoint_delta_json_opt: Some(index_checkpoint_delta_json), + publish_token_opt: Some("test-publish-token-foo".to_string()), + }; + let error = metastore + .publish_splits(publish_splits_request.clone()) + .await + .unwrap_err(); + assert!( + matches!(error, MetastoreError::InvalidArgument { message } if message.contains("token")) + ); + + let index_checkpoint_delta_json = 
serde_json::to_string(&index_checkpoint_delta).unwrap(); + let publish_splits_request = PublishSplitsRequest { + index_uid: test_index.index_uid.clone().into(), + staged_split_ids: Vec::new(), + replaced_split_ids: Vec::new(), + index_checkpoint_delta_json_opt: Some(index_checkpoint_delta_json), + publish_token_opt: Some("test-publish-token-bar".to_string()), + }; + metastore + .publish_splits(publish_splits_request.clone()) + .await + .unwrap(); + + let shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(shards.len(), 1); + assert_eq!(shards[0].shard_state(), ShardState::Open); + assert_eq!( + shards[0].publish_position_inclusive(), + Position::offset(0u64) + ); + + let index_checkpoint_delta_json = serde_json::to_string(&index_checkpoint_delta).unwrap(); + let publish_splits_request = PublishSplitsRequest { + index_uid: test_index.index_uid.clone().into(), + staged_split_ids: Vec::new(), + replaced_split_ids: Vec::new(), + index_checkpoint_delta_json_opt: Some(index_checkpoint_delta_json), + publish_token_opt: Some("test-publish-token-bar".to_string()), + }; + let error = metastore + .publish_splits(publish_splits_request.clone()) + .await + .unwrap_err(); + assert!( + matches!(error, MetastoreError::InvalidArgument { message } if message.contains("checkpoint")) + ); + + let mut source_delta = SourceCheckpointDelta::default(); + source_delta + .record_partition_delta( + PartitionId::from(0u64), + Position::offset(0u64), + Position::eof(1u64), + ) + .unwrap(); + let index_checkpoint_delta = IndexCheckpointDelta { + source_id: test_index.source_id.clone(), + source_delta, + }; + let index_checkpoint_delta_json = serde_json::to_string(&index_checkpoint_delta).unwrap(); + let publish_splits_request = PublishSplitsRequest { + index_uid: test_index.index_uid.clone().into(), + staged_split_ids: Vec::new(), + replaced_split_ids: Vec::new(), + index_checkpoint_delta_json_opt: Some(index_checkpoint_delta_json), + 
publish_token_opt: Some("test-publish-token-bar".to_string()), + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(shards.len(), 1); + assert_eq!(shards[0].shard_state(), ShardState::Closed); + assert_eq!(shards[0].publish_position_inclusive(), Position::eof(1u64)); + cleanup_index(&mut metastore, test_index.index_uid).await; +} + +pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + ReadWriteShardsForTest, +>() { + let mut metastore = MetastoreUnderTest::default_for_test().await; + + let test_index = TestIndex::create_index_with_source( + &mut metastore, + "test-delete-shards", + SourceConfig::ingest_v2_default(), + ) + .await; + + let shards = vec![ + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(0)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::offset(0u64)), + publish_token: Some("test-publish-token-foo".to_string()), + ..Default::default() + }, + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::offset(1u64)), + publish_token: Some("test-publish-token-foo".to_string()), + ..Default::default() + }, + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(2)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::offset(2u64)), + publish_token: Some("test-publish-token-foo".to_string()), + ..Default::default() + }, + Shard { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_id.clone(), + shard_id: 
Some(ShardId::from(3)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::offset(3u64)), + publish_token: Some("test-publish-token-bar".to_string()), + ..Default::default() + }, + ]; + metastore + .insert_shards(&test_index.index_uid, &test_index.source_id, shards) + .await; + + let mut source_delta = SourceCheckpointDelta::default(); + source_delta + .record_partition_delta( + PartitionId::from(0u64), + Position::offset(0u64), + Position::offset(10u64), + ) + .unwrap(); + source_delta + .record_partition_delta( + PartitionId::from(1u64), + Position::offset(1u64), + Position::offset(11u64), + ) + .unwrap(); + source_delta + .record_partition_delta( + PartitionId::from(2u64), + Position::offset(2u64), + Position::eof(12u64), + ) + .unwrap(); + let index_checkpoint_delta = IndexCheckpointDelta { + source_id: test_index.source_id.clone(), + source_delta, + }; + let index_checkpoint_delta_json = serde_json::to_string(&index_checkpoint_delta).unwrap(); + let publish_splits_request = PublishSplitsRequest { + index_uid: test_index.index_uid.clone().into(), + staged_split_ids: Vec::new(), + replaced_split_ids: Vec::new(), + index_checkpoint_delta_json_opt: Some(index_checkpoint_delta_json), + publish_token_opt: Some("test-publish-token-foo".to_string()), + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mut shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(shards.len(), 4); + + shards.sort_unstable_by(|left, right| left.shard_id.cmp(&right.shard_id)); + + let shard = &shards[0]; + assert_eq!(shard.shard_id(), ShardId::from(0)); + assert_eq!(shard.shard_state(), ShardState::Open); + assert_eq!(shard.publish_position_inclusive(), Position::offset(10u64)); + + let shard = &shards[1]; + assert_eq!(shard.shard_id(), ShardId::from(1)); + assert_eq!(shard.shard_state(), ShardState::Open); + assert_eq!(shard.publish_position_inclusive(), 
Position::offset(11u64)); + + let shard = &shards[2]; + assert_eq!(shard.shard_id(), ShardId::from(2)); + assert_eq!(shard.shard_state(), ShardState::Closed); + assert_eq!(shard.publish_position_inclusive(), Position::eof(12u64)); + + let shard = &shards[3]; + assert_eq!(shard.shard_id(), ShardId::from(3)); + assert_eq!(shard.shard_state(), ShardState::Open); + assert_eq!(shard.publish_position_inclusive(), Position::offset(3u64)); cleanup_index(&mut metastore, test_index.index_uid).await; } diff --git a/quickwit/quickwit-proto/src/types/mod.rs b/quickwit/quickwit-proto/src/types/mod.rs index b5adc7aef7a..20853814cf6 100644 --- a/quickwit/quickwit-proto/src/types/mod.rs +++ b/quickwit/quickwit-proto/src/types/mod.rs @@ -191,6 +191,12 @@ impl PartialEq for IndexUid { } } +impl PartialEq<IndexUid> for String { + fn eq(&self, other: &IndexUid) -> bool { + *self == other.0 + } +} + /// It can however appear only once in a given index. /// In itself, `SourceId` is not unique, but the pair `(IndexUid, SourceId)` is. #[derive(PartialEq, Eq, Debug, PartialOrd, Ord, Hash, Clone)]