Skip to content

Commit

Permalink
Implement Shard API for PostgreSQL metastore
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jan 17, 2024
1 parent 9d0a223 commit 3616d6c
Show file tree
Hide file tree
Showing 30 changed files with 1,727 additions and 572 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Switches `shards.shard_id` back to an integer column, drops the NOT NULL
-- constraint and the column defaults, and recreates `shards_index_uid_fkey`
-- without `ON DELETE CASCADE`.
ALTER TABLE shards
    -- `BIGSERIAL` is a pseudo-type that is only accepted in column definitions;
    -- `ALTER COLUMN ... TYPE BIGSERIAL` fails with `type "bigserial" does not exist`.
    -- A character column also has no automatic cast to BIGINT, hence the explicit `USING`.
    -- NOTE(review): this does not (re)attach a sequence-backed default to `shard_id`;
    -- confirm whether one is required after the rollback.
    ALTER COLUMN shard_id TYPE BIGINT USING shard_id::BIGINT,
    ALTER COLUMN shard_id DROP NOT NULL,
    ALTER COLUMN shard_state DROP DEFAULT,
    ALTER COLUMN publish_position_inclusive DROP DEFAULT,
    DROP CONSTRAINT shards_index_uid_fkey,
    ADD CONSTRAINT shards_index_uid_fkey FOREIGN KEY (index_uid) REFERENCES indexes(index_uid)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Alters the `shards` table:
--   * `shard_id` becomes a `VARCHAR(255) NOT NULL` column;
--   * `shard_state` defaults to 'open' and `publish_position_inclusive` defaults to '';
--   * `shards_index_uid_fkey` is recreated with `ON DELETE CASCADE`, so that deleting a row
--     from `indexes` also deletes the `shards` rows that reference it.
ALTER TABLE shards
ALTER COLUMN shard_id TYPE VARCHAR(255),
ALTER COLUMN shard_id SET NOT NULL,
ALTER COLUMN shard_state SET DEFAULT 'open',
ALTER COLUMN publish_position_inclusive SET DEFAULT '',
DROP CONSTRAINT shards_index_uid_fkey,
ADD CONSTRAINT shards_index_uid_fkey FOREIGN KEY (index_uid) REFERENCES indexes(index_uid) ON DELETE CASCADE
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use quickwit_config::TestableForRegression;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

use crate::file_backed_metastore::file_backed_index::FileBackedIndex;
use crate::file_backed::file_backed_index::FileBackedIndex;
use crate::{IndexMetadata, SplitMetadata};

/// In order to avoid confusion, we need to make sure that the
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-metastore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ use std::ops::Range;

pub use error::MetastoreResolverError;
pub use metastore::control_plane_metastore::ControlPlaneMetastore;
pub use metastore::file_backed_metastore::FileBackedMetastore;
pub use metastore::file_backed::FileBackedMetastore;
pub(crate) use metastore::index_metadata::serialize::{IndexMetadataV0_7, VersionedIndexMetadata};
#[cfg(feature = "postgres")]
pub use metastore::postgresql_metastore::PostgresqlMetastore;
pub use metastore::postgres::PostgresqlMetastore;
pub use metastore::{
file_backed_metastore, AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata,
file_backed, AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata,
IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery,
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,12 +727,34 @@ mod tests {

use quickwit_doc_mapper::tag_pruning::TagFilterAst;
use quickwit_doc_mapper::{BinaryFormat, FieldMappingType};
use quickwit_proto::types::IndexUid;
use quickwit_proto::ingest::Shard;
use quickwit_proto::metastore::ListShardsSubrequest;
use quickwit_proto::types::{IndexUid, SourceId};

use super::FileBackedIndex;
use crate::file_backed_metastore::file_backed_index::split_query_predicate;
use crate::file_backed::file_backed_index::split_query_predicate;
use crate::{ListSplitsQuery, Split, SplitMetadata, SplitState};

impl FileBackedIndex {
pub(crate) fn insert_shards(&mut self, source_id: &SourceId, shards: Vec<Shard>) {
self.per_source_shards
.get_mut(source_id)
.unwrap()
.insert_shards(shards)
}

pub(crate) fn list_all_shards(&self, source_id: &SourceId) -> Vec<Shard> {
self.per_source_shards
.get(source_id)
.unwrap()
.list_shards(ListShardsSubrequest {
..Default::default()
})
.unwrap()
.shards
}
}

fn make_splits() -> [Split; 3] {
[
Split {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_proto::types::SourceId;
use serde::{Deserialize, Serialize};

use super::shards::Shards;
use crate::file_backed_metastore::file_backed_index::FileBackedIndex;
use crate::file_backed::file_backed_index::FileBackedIndex;
use crate::metastore::DeleteTask;
use crate::{IndexMetadata, Split};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ use quickwit_proto::metastore::{
ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult,
OpenShardsSubrequest, OpenShardsSubresponse,
};
use quickwit_proto::types::{queue_id, IndexUid, Position, ShardId, SourceId};
use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId};
use tracing::{info, warn};

use crate::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta};
use crate::file_backed_metastore::MutationOccurred;
use crate::file_backed::MutationOccurred;

// TODO: Rename `SourceShards`
/// Manages the shards of a source.
Expand Down Expand Up @@ -204,8 +204,8 @@ impl Shards {
if let Entry::Occupied(entry) = self.shards.entry(shard_id.clone()) {
let shard = entry.get();
if !force && !shard.publish_position_inclusive().is_eof() {
let message = format!("shard `{shard_id}` is not deletable");
return Err(MetastoreError::InvalidArgument { message });
warn!("shard `{shard_id}` is not deletable");
continue;
}
info!(
index_id=%self.index_uid.index_id(),
Expand Down Expand Up @@ -236,7 +236,7 @@ impl Shards {
pub(super) fn try_apply_delta(
&mut self,
checkpoint_delta: SourceCheckpointDelta,
publish_token: String,
publish_token: PublishToken,
) -> MetastoreResult<MutationOccurred<()>> {
if checkpoint_delta.is_empty() {
return Ok(MutationOccurred::No(()));
Expand Down Expand Up @@ -294,6 +294,15 @@ mod tests {

use super::*;

impl Shards {
    /// Test helper: stores every shard of `shards` under its own shard ID.
    /// A shard whose ID is already present replaces the existing entry.
    pub(crate) fn insert_shards(&mut self, shards: Vec<Shard>) {
        let keyed_shards = shards
            .into_iter()
            .map(|shard| (shard.shard_id().clone(), shard));
        self.shards.extend(keyed_shards);
    }
}

#[test]
fn test_open_shards() {
let index_uid: IndexUid = "test-index:0".into();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl MetastoreFactory for FileBackedMetastoreFactory {
mod tests {
use std::time::Duration;

use crate::metastore::file_backed_metastore::file_backed_metastore_factory::extract_polling_interval_from_uri;
use crate::metastore::file_backed::file_backed_metastore_factory::extract_polling_interval_from_uri;

#[test]
fn test_extract_polling_interval_from_uri() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,9 @@ mod tests {
use futures::executor::block_on;
use quickwit_common::uri::Protocol;
use quickwit_config::IndexConfig;
use quickwit_proto::ingest::Shard;
use quickwit_proto::metastore::{DeleteQuery, MetastoreError};
use quickwit_proto::types::SourceId;
use quickwit_query::query_ast::qast_helper;
use quickwit_storage::{MockStorage, RamStorage, Storage, StorageErrorKind};
use rand::Rng;
Expand All @@ -993,9 +995,36 @@ mod tests {
};
use super::*;
use crate::metastore::MetastoreServiceStreamSplitsExt;
use crate::tests::shard::ReadWriteShardsForTest;
use crate::tests::DefaultForTest;
use crate::{metastore_test_suite, IndexMetadata, ListSplitsQuery, SplitMetadata, SplitState};

#[async_trait]
impl ReadWriteShardsForTest for FileBackedMetastore {
    /// Inserts `shards` for `source_id` into the index identified by `index_uid`,
    /// reporting the change as a mutation.
    ///
    /// Panics if the mutation fails.
    async fn insert_shards(
        &mut self,
        index_uid: &IndexUid,
        source_id: &SourceId,
        shards: Vec<Shard>,
    ) {
        let mutation_result = self
            .mutate(index_uid.clone(), |index| {
                index.insert_shards(source_id, shards);
                Ok(MutationOccurred::Yes(()))
            })
            .await;
        mutation_result.unwrap();
    }

    /// Returns all the shards of `source_id` in the index identified by `index_uid`.
    ///
    /// Panics if the read fails.
    async fn list_all_shards(&self, index_uid: &IndexUid, source_id: &SourceId) -> Vec<Shard> {
        let read_result = self
            .read(index_uid.clone(), |index| {
                Ok(index.list_all_shards(source_id))
            })
            .await;
        read_result.unwrap()
    }
}

metastore_test_suite!(crate::FileBackedMetastore);

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_storage::{Storage, StorageError, StorageErrorKind};
use serde::{Deserialize, Serialize};

use super::{IndexState, LazyFileBackedIndex};
use crate::metastore::file_backed_metastore::file_backed_index::FileBackedIndex;
use crate::metastore::file_backed::file_backed_index::FileBackedIndex;

/// Indexes states file managed by [`FileBackedMetastore`](crate::FileBackedMetastore).
const INDEXES_STATES_FILENAME: &str = "indexes_states.json";
Expand Down
8 changes: 3 additions & 5 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

pub mod file_backed_metastore;
pub mod file_backed;
pub(crate) mod index_metadata;
#[cfg(feature = "postgres")]
pub mod postgresql_metastore;
#[cfg(feature = "postgres")]
mod postgresql_model;
pub mod postgres;

pub mod control_plane_metastore;

Expand All @@ -49,7 +47,7 @@ use crate::checkpoint::IndexCheckpointDelta;
use crate::{Split, SplitMetadata, SplitState};

/// Splits batch size returned by the stream splits API
const STREAM_SPLITS_CHUNK_SIZE: usize = 1_000;
const STREAM_SPLITS_CHUNK_SIZE: usize = 100;

static METASTORE_METRICS_LAYER: Lazy<PrometheusMetricsLayer<1>> =
Lazy::new(|| PrometheusMetricsLayer::new("quickwit_metastore", ["request"]));
Expand Down
70 changes: 70 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use quickwit_proto::metastore::{EntityKind, MetastoreError};
use sqlx::postgres::PgDatabaseError;
use tracing::error;

/// PostgreSQL error codes (SQLSTATE values) that get a dedicated treatment when
/// converting database errors.
///
/// See <https://www.postgresql.org/docs/current/errcodes-appendix.html>.
mod pg_error_codes {
    /// `foreign_key_violation`
    pub const FOREIGN_KEY_VIOLATION: &'static str = "23503";

    /// `unique_violation`
    pub const UNIQUE_VIOLATION: &'static str = "23505";
}

pub(super) fn convert_sqlx_err(index_id: &str, sqlx_error: sqlx::Error) -> MetastoreError {
match &sqlx_error {
sqlx::Error::Database(boxed_db_error) => {
let pg_db_error = boxed_db_error.downcast_ref::<PgDatabaseError>();
let pg_error_code = pg_db_error.code();
let pg_error_table = pg_db_error.table();

match (pg_error_code, pg_error_table) {
(pg_error_codes::FOREIGN_KEY_VIOLATION, _) => {
MetastoreError::NotFound(EntityKind::Index {
index_id: index_id.to_string(),
})
}
(pg_error_codes::UNIQUE_VIOLATION, Some(table)) if table.starts_with("indexes") => {
MetastoreError::AlreadyExists(EntityKind::Index {
index_id: index_id.to_string(),
})
}
(pg_error_codes::UNIQUE_VIOLATION, _) => {
error!(error=?boxed_db_error, "postgresql-error");
MetastoreError::Internal {
message: "unique key violation".to_string(),
cause: format!("DB error {boxed_db_error:?}"),
}
}
_ => {
error!(error=?boxed_db_error, "postgresql-error");
MetastoreError::Db {
message: boxed_db_error.to_string(),
}
}
}
}
_ => {
error!(error=?sqlx_error, "an error has occurred in the database operation");
MetastoreError::Db {
message: sqlx_error.to_string(),
}
}
}
}
100 changes: 100 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/factory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{MetastoreBackend, MetastoreConfig};
use quickwit_proto::metastore::MetastoreServiceClient;
use tokio::sync::Mutex;
use tracing::debug;

use crate::metastore::instrument_metastore;
use crate::{MetastoreFactory, MetastoreResolverError, PostgresqlMetastore};

/// Metastore factory for the PostgreSQL backend.
///
/// Resolved metastore clients are cached per [`Uri`]; clones of the factory share
/// the same cache because it lives behind an `Arc`.
#[derive(Clone, Default)]
pub struct PostgresqlMetastoreFactory {
// Under normal conditions of use, this cache will contain a single `Metastore`.
//
// In contrast to the file-backed metastore, we use a strong pointer here, so that the
// `Metastore` doesn't get dropped. This is done in order to keep the underlying connection
// pool to Postgres alive.
cache: Arc<Mutex<HashMap<Uri, MetastoreServiceClient>>>,
}

impl PostgresqlMetastoreFactory {
async fn get_from_cache(&self, uri: &Uri) -> Option<MetastoreServiceClient> {
let cache_lock = self.cache.lock().await;
cache_lock.get(uri).map(MetastoreServiceClient::clone)
}

/// If there is a valid entry in the cache to begin with, we trash the new
/// one and return the old one.
///
/// This way we make sure that we keep only one instance associated
/// to the key `uri` outside of this struct.
async fn cache_metastore(
&self,
uri: Uri,
metastore: MetastoreServiceClient,
) -> MetastoreServiceClient {
let mut cache_lock = self.cache.lock().await;
if let Some(metastore) = cache_lock.get(&uri) {
return metastore.clone();
}
cache_lock.insert(uri, metastore.clone());
metastore
}
}

#[async_trait]
impl MetastoreFactory for PostgresqlMetastoreFactory {
    /// This factory serves the PostgreSQL backend.
    fn backend(&self) -> MetastoreBackend {
        MetastoreBackend::PostgreSQL
    }

    /// Returns the metastore client for `uri`, creating and caching it on first use.
    ///
    /// # Errors
    ///
    /// - `MetastoreResolverError::InvalidConfig` if `metastore_config` is not a
    ///   PostgreSQL metastore config;
    /// - `MetastoreResolverError::Initialization` if the PostgreSQL metastore
    ///   cannot be created.
    async fn resolve(
        &self,
        metastore_config: &MetastoreConfig,
        uri: &Uri,
    ) -> Result<MetastoreServiceClient, MetastoreResolverError> {
        match self.get_from_cache(uri).await {
            Some(cached_metastore) => {
                debug!("using metastore from cache");
                return Ok(cached_metastore);
            }
            None => {
                debug!("metastore not found in cache");
            }
        }
        let pg_config = match metastore_config.as_postgres() {
            Some(pg_config) => pg_config,
            None => {
                let message = format!(
                    "expected PostgreSQL metastore config, got `{:?}`",
                    metastore_config.backend()
                );
                return Err(MetastoreResolverError::InvalidConfig(message));
            }
        };
        let new_metastore = PostgresqlMetastore::new(pg_config, uri)
            .await
            .map_err(MetastoreResolverError::Initialization)?;
        // Another caller may have cached a client for this URI in the meantime:
        // `cache_metastore` returns whichever client is associated with the URI.
        let resolved_metastore = self
            .cache_metastore(uri.clone(), instrument_metastore(new_metastore))
            .await;
        Ok(resolved_metastore)
    }
}
Loading

0 comments on commit 3616d6c

Please sign in to comment.