Skip to content

Commit

Permalink
Add implementation for open_shards in the metastore
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalesh0406 committed Nov 19, 2023
1 parent e25b9b0 commit 7557458
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 7 deletions.
106 changes: 100 additions & 6 deletions quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use itertools::Itertools;
use quickwit_common::uri::Uri;
use quickwit_common::PrettySample;
use quickwit_config::{
validate_index_id_pattern, MetastoreBackend, MetastoreConfig, PostgresMetastoreConfig,
};
use quickwit_doc_mapper::tag_pruning::TagFilterAst;
use quickwit_proto::ingest::Shard;
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, DeleteIndexRequest, DeleteQuery, DeleteShardsRequest,
Expand All @@ -40,10 +42,11 @@ use quickwit_proto::metastore::{
ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse,
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, OpenShardsRequest,
OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
OpenShardsResponse, OpenShardsSubrequest, OpenShardsSubresponse, PublishSplitsRequest,
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest,
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
};
use quickwit_proto::types::IndexUid;
use quickwit_proto::types::{IndexUid, Position};
use sea_query::{
all, any, Asterisk, Cond, Expr, Func, Order, PostgresQueryBuilder, Query, SelectStatement,
};
Expand All @@ -56,7 +59,9 @@ use tracing::log::LevelFilter;
use tracing::{debug, error, info, instrument, warn};

use crate::checkpoint::IndexCheckpointDelta;
use crate::metastore::postgresql_model::{PgDeleteTask, PgIndex, PgSplit, Splits, ToTimestampFunc};
use crate::metastore::postgresql_model::{
PgDeleteTask, PgIndex, PgShardState, PgShards, PgSplit, Splits, ToTimestampFunc,
};
use crate::metastore::{instrument_metastore, FilterRange, PublishSplitsRequestExt};
use crate::{
AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt,
Expand Down Expand Up @@ -1291,9 +1296,23 @@ impl MetastoreService for PostgresqlMetastore {

async fn open_shards(
&mut self,
_request: OpenShardsRequest,
request: OpenShardsRequest,
) -> MetastoreResult<OpenShardsResponse> {
unimplemented!("`open_shards` is not implemented for PostgreSQL metastore")
let grouped_subrequests: HashMap<IndexUid, Vec<OpenShardsSubrequest>> = request
.subrequests
.into_iter()
.into_group_map_by(|subrequest| IndexUid::from(subrequest.index_uid.clone()));
let mut grouped_subresponses: Vec<Vec<OpenShardsSubresponse>> =
Vec::with_capacity(grouped_subrequests.len());

for (_index_uid, subrequests) in grouped_subrequests.into_iter() {
grouped_subresponses
.push(open_pgshards_by_index(&self.connection_pool, subrequests).await);
}

Ok(OpenShardsResponse {
subresponses: grouped_subresponses.concat(),
})
}

async fn acquire_shards(
Expand Down Expand Up @@ -1379,6 +1398,81 @@ fn tags_filter_expression_helper(tags: &TagFilterAst) -> Cond {
}
}

async fn open_pgshards_by_index(
connection_pool: &Pool<Postgres>,
subrequests: Vec<OpenShardsSubrequest>,
) -> Vec<OpenShardsSubresponse> {
let mut subresponses: Vec<OpenShardsSubresponse> = Vec::with_capacity(subrequests.len());

for subrequest in subrequests.into_iter() {
if let Ok(subresponse) = open_pgshard(connection_pool, subrequest).await {
subresponses.push(subresponse);
}
}

subresponses
}

async fn open_pgshard(
connection_pool: &Pool<Postgres>,
subrequest: OpenShardsSubrequest,
) -> MetastoreResult<OpenShardsSubresponse> {
let index_uid = subrequest.index_uid;

let pg_existing_shards: Option<PgShards> = sqlx::query_as::<_, PgShards>(
r#"
SELECT index_uid, source_id, shard_id FROM shards WHERE index_uid = $1 AND source_id = $2 AND shard_id = $3
"#
)
.bind(index_uid.to_string())
.bind(subrequest.source_id.to_string())
.bind(subrequest.next_shard_id as i64)
.fetch_optional(connection_pool)
.await?;

if pg_existing_shards.is_none() {
let shard_state: String = (PgShardState::Open).into();

sqlx::query(
r#"INSERT INTO shards (index_uid, source_id, shard_id, leader_id,
follower_id, shard_state, publish_position_inclusive, publish_token)
VALUES($1, $2, DEFAULT, $3, $4, $5, $6, $7)"#,
)
.bind(index_uid.to_string())
.bind(subrequest.source_id.to_string())
.bind(subrequest.leader_id.to_string())
.bind(subrequest.follower_id.clone())
.bind(shard_state)
.bind(Position::Beginning.as_str())
.bind(String::from(""))
.execute(connection_pool)
.await
.map_err(|err| convert_sqlx_err(IndexUid::from(index_uid.to_string()).index_id(), err))?;
}

let pg_last_opened_shard: PgShards = sqlx::query_as::<_, PgShards>(
r#"
SELECT * FROM shards WHERE index_uid = $1 AND source_id = $2 ORDER BY shard_id DESC
"#,
)
.bind(index_uid.to_string())
.bind(subrequest.source_id.to_string())
.fetch_one(connection_pool)
.await
.map_err(|err| convert_sqlx_err(IndexUid::from(index_uid.to_string()).index_id(), err))?;

let next_shard_id = pg_last_opened_shard.shard_id as u64 + 1;
let opened_shards: Vec<Shard> = vec![pg_last_opened_shard.into()];

Ok(OpenShardsSubresponse {
subrequest_id: subrequest.subrequest_id,
index_uid,
source_id: subrequest.source_id,
opened_shards,
next_shard_id,
})
}

/// Builds a SQL query that returns indexes which match at least one pattern in
/// `index_id_patterns`. For each pattern, we check if the pattern is valid and replace `*` by `%`
/// to build a SQL `LIKE` query.
Expand Down
65 changes: 64 additions & 1 deletion quickwit/quickwit-metastore/src/metastore/postgresql_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
use std::convert::TryInto;
use std::str::FromStr;

use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::{DeleteQuery, DeleteTask, MetastoreError, MetastoreResult};
use quickwit_proto::types::IndexUid;
use quickwit_proto::types::{IndexUid, ShardId, SourceId};
use sea_query::{Iden, Write};
use tracing::error;

Expand Down Expand Up @@ -210,3 +211,65 @@ impl TryInto<DeleteTask> for PgDeleteTask {
})
}
}

#[derive(sqlx::Type)]
#[sqlx(type_name = "SHARD_STATE")]
pub enum PgShardState {
Unspecified,
Open,
Unavailable,
Closed,
}

impl From<PgShardState> for String {
fn from(val: PgShardState) -> Self {
match val {
PgShardState::Unspecified => String::from("unspecified"),
PgShardState::Open => String::from("open"),
PgShardState::Unavailable => String::from("unavailable"),
PgShardState::Closed => String::from("closed"),
}
}
}

impl From<PgShardState> for ShardState {
fn from(val: PgShardState) -> Self {
match val {
PgShardState::Unspecified => ShardState::Unspecified,
PgShardState::Open => ShardState::Open,
PgShardState::Unavailable => ShardState::Unavailable,
PgShardState::Closed => ShardState::Closed,
}
}
}

#[derive(sqlx::FromRow)]
pub struct PgShards {
#[sqlx(try_from = "String")]
pub index_uid: IndexUid,
#[sqlx(try_from = "String")]
pub source_id: SourceId,
#[sqlx(try_from = "i64")]
pub shard_id: ShardId,
pub leader_id: String,
pub follower_id: Option<String>,
pub shard_state: PgShardState,
pub publish_position_inclusive: String,
pub publish_token: Option<String>,
}

impl From<PgShards> for Shard {
fn from(val: PgShards) -> Self {
let shard_state: ShardState = val.shard_state.into();
Shard {
index_uid: val.index_uid.clone().into(),
source_id: val.source_id.clone(),
shard_id: val.shard_id,
leader_id: val.leader_id.clone(),
follower_id: val.follower_id.clone(),
shard_state: shard_state.into(),
publish_position_inclusive: Some(val.publish_position_inclusive.clone().into()),
publish_token: val.publish_token.clone(),
}
}
}

0 comments on commit 7557458

Please sign in to comment.