diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 8fad26cde82..06c5cfbd6f8 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -52,6 +52,7 @@ use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType}; use sea_query_binder::SqlxBinder; use sqlx::{Acquire, Executor, Postgres, Transaction}; +use time::OffsetDateTime; use tracing::{debug, info, instrument, warn}; use super::error::convert_sqlx_err; @@ -255,6 +256,7 @@ async fn try_apply_delta_v2( shard_ids.push(shard_id.to_string()); new_positions.push(new_position.to_string()); } + sqlx::query( r#" UPDATE @@ -262,7 +264,7 @@ async fn try_apply_delta_v2( SET publish_position_inclusive = new_positions.position, shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END, - update_timestamp = CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + update_timestamp = $5 FROM UNNEST($3, $4) AS new_positions(shard_id, position) @@ -276,6 +278,8 @@ async fn try_apply_delta_v2( .bind(source_id) .bind(shard_ids) .bind(new_positions) + // Use a timestamp generated by the metastore node to avoid clock drift issues + .bind(OffsetDateTime::now_utc()) .execute(tx.as_mut()) .await?; Ok(()) @@ -1639,6 +1643,8 @@ async fn open_or_fetch_shard<'e>( .bind(&subrequest.follower_id) .bind(subrequest.doc_mapping_uid) .bind(&subrequest.publish_token) + // Use a timestamp generated by the metastore node to avoid clock drift issues + .bind(OffsetDateTime::now_utc()) .fetch_optional(executor.clone()) .await?; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql index b9bd5139504..7b4273e587d 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql @@ -1,5 +1,5 @@ INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token, update_timestamp) - VALUES ($1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP AT TIME ZONE 'UTC') + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING RETURNING