Skip to content

Commit

Permalink
Issue on query per list shards request instead of one per subrequest
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored and fulmicoton committed May 9, 2024
1 parent 3e105f0 commit f98ee93
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 145 deletions.
4 changes: 2 additions & 2 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ reqwest = { version = "0.11", default-features = false, features = [
"rustls-tls",
] }
rust-embed = "6.8.1"
sea-query = { version = "0" }
sea-query-binder = { version = "0", features = [
sea-query = { version = "0.30" }
sea-query-binder = { version = "0.5", features = [
"runtime-tokio-rustls",
"sqlx-postgres",
] }
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,12 +787,12 @@ impl MetastoreService for FileBackedMetastore {
};
// We must group the subrequests by `index_uid` to mutate each index only once, since each
// mutation triggers an IO.
let grouped_subrequests: HashMap<IndexUid, Vec<OpenShardSubrequest>> = request
let per_index_uid_subrequests: HashMap<IndexUid, Vec<OpenShardSubrequest>> = request
.subrequests
.into_iter()
.into_group_map_by(|subrequest| subrequest.index_uid().clone());

for (index_uid, subrequests) in grouped_subrequests {
for (index_uid, subrequests) in per_index_uid_subrequests {
let subresponses = self
.mutate(&index_uid, |index| index.open_shards(subrequests))
.await?;
Expand Down
108 changes: 75 additions & 33 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::fmt::{self, Write};

use async_trait::async_trait;
Expand Down Expand Up @@ -47,6 +48,8 @@ use quickwit_proto::metastore::{
};
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, SourceId};
use sea_query::{Asterisk, PostgresQueryBuilder, Query};
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId};
use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType};
use sea_query_binder::SqlxBinder;
use sqlx::{Acquire, Executor, Postgres, Transaction};
use tracing::{debug, info, instrument, warn};
Expand All @@ -61,6 +64,7 @@ use crate::checkpoint::{
IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta,
};
use crate::file_backed::MutationOccurred;
use crate::metastore::postgres::model::Shards;
use crate::metastore::postgres::utils::split_maturity_timestamp;
use crate::metastore::{PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE};
use crate::{
Expand Down Expand Up @@ -723,17 +727,17 @@ impl MetastoreService for PostgresqlMetastore {
&mut self,
request: ListSplitsRequest,
) -> MetastoreResult<MetastoreServiceStream<ListSplitsResponse>> {
let query = request.deserialize_list_splits_query()?;
let mut sql_builder = Query::select();
sql_builder.column(Asterisk).from(Splits::Table);
append_query_filters(&mut sql_builder, &query);
let list_splits_query = request.deserialize_list_splits_query()?;
let mut sql_query_builder = Query::select();
sql_query_builder.column(Asterisk).from(Splits::Table);
append_query_filters(&mut sql_query_builder, &list_splits_query);

let (sql, values) = sql_builder.build_sqlx(PostgresQueryBuilder);
let (sql_query, values) = sql_query_builder.build_sqlx(PostgresQueryBuilder);
let pg_split_stream = SplitStream::new(
self.connection_pool.clone(),
sql,
|connection_pool: &TrackedPool<Postgres>, sql: &String| {
sqlx::query_as_with::<_, PgSplit, _>(sql, values).fetch(connection_pool)
sql_query,
|connection_pool: &TrackedPool<Postgres>, sql_query: &String| {
sqlx::query_as_with::<_, PgSplit, _>(sql_query, values).fetch(connection_pool)
},
);
let split_stream =
Expand Down Expand Up @@ -1257,35 +1261,73 @@ impl MetastoreService for PostgresqlMetastore {
&mut self,
request: ListShardsRequest,
) -> MetastoreResult<ListShardsResponse> {
const LIST_SHARDS_QUERY: &str = include_str!("queries/shards/list.sql");
if request.subrequests.is_empty() {
return Ok(Default::default());
}
let mut sql_query_builder = Query::select();

let mut subresponses = Vec::with_capacity(request.subrequests.len());
for (idx, subrequest) in request.subrequests.iter().enumerate() {
let mut sql_subquery_builder = Query::select();

for subrequest in request.subrequests {
let shard_state: Option<&'static str> = match subrequest.shard_state() {
ShardState::Unspecified => None,
ShardState::Open => Some("open"),
ShardState::Closed => Some("closed"),
ShardState::Unavailable => Some("unavailable"),
};
let pg_shards: Vec<PgShard> = sqlx::query_as(LIST_SHARDS_QUERY)
.bind(subrequest.index_uid().to_string())
.bind(&subrequest.source_id)
.bind(shard_state)
.fetch_all(&self.connection_pool)
.await?;
sql_subquery_builder
.column(Asterisk)
.from(Shards::Table)
.and_where(Expr::col(Shards::IndexUid).eq(subrequest.index_uid().to_string()))
.and_where(Expr::col(Shards::SourceId).eq(&subrequest.source_id));

let shard_state = subrequest.shard_state();

let shards = pg_shards
if shard_state != ShardState::Unspecified {
let shard_state_str = shard_state.as_json_str_name();
let shard_state_alias = Alias::new("SHARD_STATE");
let cast_expr = Func::cast_as(shard_state_str, shard_state_alias);
sql_subquery_builder.and_where(Expr::col(Shards::ShardState).eq(cast_expr));
}
if idx == 0 {
sql_query_builder = sql_subquery_builder;
} else {
sql_query_builder.union(UnionType::All, sql_subquery_builder);
}
}
let (sql_query, values) = sql_query_builder.build_sqlx(PostgresQueryBuilder);

let pg_shards: Vec<PgShard> = sqlx::query_as_with::<_, PgShard, _>(&sql_query, values)
.fetch_all(&self.connection_pool)
.await?;

let mut per_source_subresponses: HashMap<(IndexUid, SourceId), ListShardsSubresponse> =
request
.subrequests
.into_iter()
.map(|pg_shard| pg_shard.into())
.map(|subrequest| {
let index_uid = subrequest.index_uid().clone();
let source_id = subrequest.source_id.clone();
(
(index_uid, source_id),
ListShardsSubresponse {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shards: Vec::new(),
},
)
})
.collect();

subresponses.push(ListShardsSubresponse {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shards,
});
for pg_shard in pg_shards {
let shard: Shard = pg_shard.into();
let source_key = (shard.index_uid().clone(), shard.source_id.clone());

let Some(subresponse) = per_source_subresponses.get_mut(&source_key) else {
warn!(
index_uid=%shard.index_uid(),
source_id=%shard.source_id,
"could not find source in subresponses: this should never happen, please report"
);
continue;
};
subresponse.shards.push(shard);
}
let subresponses = per_source_subresponses.into_values().collect();
Ok(ListShardsResponse { subresponses })
}

Expand Down Expand Up @@ -1472,7 +1514,7 @@ async fn open_or_fetch_shard<'e>(
const OPEN_SHARDS_QUERY: &str = include_str!("queries/shards/open.sql");

let pg_shard_opt: Option<PgShard> = sqlx::query_as(OPEN_SHARDS_QUERY)
.bind(subrequest.index_uid().to_string())
.bind(subrequest.index_uid())
.bind(&subrequest.source_id)
.bind(subrequest.shard_id().as_str())
.bind(&subrequest.leader_id)
Expand All @@ -1483,7 +1525,7 @@ async fn open_or_fetch_shard<'e>(
if let Some(pg_shard) = pg_shard_opt {
let shard: Shard = pg_shard.into();
info!(
index_id=%shard.index_uid(),
index_uid=%shard.index_uid(),
source_id=%shard.source_id,
shard_id=%shard.shard_id(),
leader_id=%shard.leader_id,
Expand All @@ -1495,7 +1537,7 @@ async fn open_or_fetch_shard<'e>(
const FETCH_SHARD_QUERY: &str = include_str!("queries/shards/fetch.sql");

let pg_shard_opt: Option<PgShard> = sqlx::query_as(FETCH_SHARD_QUERY)
.bind(subrequest.index_uid().to_string())
.bind(subrequest.index_uid())
.bind(&subrequest.source_id)
.bind(subrequest.shard_id().as_str())
.fetch_optional(executor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ pub(super) struct PgShard {
impl From<PgShard> for Shard {
fn from(pg_shard: PgShard) -> Self {
Shard {
index_uid: pg_shard.index_uid.into(),
index_uid: Some(pg_shard.index_uid),
source_id: pg_shard.source_id,
shard_id: Some(pg_shard.shard_id),
shard_state: ShardState::from(pg_shard.shard_state) as i32,
Expand Down

This file was deleted.

Loading

0 comments on commit f98ee93

Please sign in to comment.