From e802e628e714a36ddc9901372081c9c7c2d29315 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 21 Aug 2024 14:08:36 +0200 Subject: [PATCH] Add update timestamp to shard model --- docs/internals/backward-compatibility.md | 7 ++-- .../src/control_plane.rs | 4 ++ .../src/ingest/ingest_controller.rs | 5 ++- .../src/source/ingest/mod.rs | 9 +++++ .../src/source/queue_sources/shared_state.rs | 2 + .../quickwit-ingest/src/ingest_v2/ingester.rs | 1 + ..._add-shard-update-timestamp-field.down.sql | 2 + ...21_add-shard-update-timestamp-field.up.sql | 4 ++ .../file_backed/file_backed_index/mod.rs | 1 + .../file_backed/file_backed_index/shards.rs | 3 ++ .../src/metastore/postgres/metastore.rs | 38 +++++++++++++++---- .../src/metastore/postgres/model.rs | 2 + .../postgres/queries/shards/insert.sql | 4 +- .../postgres/queries/shards/open.sql | 4 +- .../quickwit-metastore/src/tests/shard.rs | 28 ++++++++++++++ .../file-backed-index/v0.7.expected.json | 3 +- .../file-backed-index/v0.8.expected.json | 3 +- .../file-backed-index/v0.9.expected.json | 3 +- .../test-data/file-backed-index/v0.9.json | 3 +- quickwit/quickwit-proto/build.rs | 4 ++ .../protos/quickwit/ingest.proto | 3 ++ .../src/codegen/quickwit/quickwit.ingest.rs | 4 ++ quickwit/quickwit-proto/src/lib.rs | 7 ++++ 23 files changed, 123 insertions(+), 21 deletions(-) create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-update-timestamp-field.down.sql create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/21_add-shard-update-timestamp-field.up.sql diff --git a/docs/internals/backward-compatibility.md b/docs/internals/backward-compatibility.md index ff8bf5ed13c..4c8101fd81e 100644 --- a/docs/internals/backward-compatibility.md +++ b/docs/internals/backward-compatibility.md @@ -1,7 +1,7 @@ # Backward compatibility in Quickwit. 
If you are reading this, chances are you want to make a change to one of the resource -of quickwit's meta/config: +of Quickwit's meta/config: User edited: - QuickwitConfig @@ -19,7 +19,7 @@ Quickwit currently manages backward compatibility of all of these resources but This document describes how to handle a change, and how to make test such a change, and spot eventual regression. -# How do I update `{IndexMetadata, SplitMetadata, FileBackedIndex, SourceConfig, IndexConfig}`? +## How do I update `{IndexMetadata, SplitMetadata, FileBackedIndex, SourceConfig, IndexConfig}`? There are two types of upgrades. @@ -45,6 +45,7 @@ non-regression. When introducing such a change: - modify your model with the help of the attributes above. +- modify the example for the model by editing its `TestableForRegression` trait implementation. - commit the 2 files that were updated by build.rs - eyeball the diff on the `.expected.json` that failed, and send it with your PR. @@ -121,5 +122,3 @@ most recent version. The unit test will start making sense in future updates thanks to the update phase described in the previous section. 
- - diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 26b6a00b7be..e04def73ccd 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -1924,6 +1924,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: None, publish_token: None, + update_timestamp: 1724158996, }], }], }; @@ -2054,6 +2055,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: None, publish_token: None, + update_timestamp: 1724158996, }], }], }; @@ -2342,6 +2344,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: None, + update_timestamp: 1724158996, }), }], }; @@ -2495,6 +2498,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: None, + update_timestamp: 1724158996, }), }], }; diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 8636cca7379..6782a906c48 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -789,10 +789,12 @@ impl IngestController { shard_id: Some(shard_id), leader_id: leader_id.to_string(), follower_id: follower_id_opt.as_ref().map(ToString::to_string), - shard_state: ShardState::Open as i32, doc_mapping_uid: Some(doc_mapping_uid), + // TODO: these fields are not used by init_shard() + shard_state: ShardState::Open as i32, publish_position_inclusive: Some(Position::Beginning), publish_token: None, + update_timestamp: 0, }; let init_shard_subrequest = InitShardSubrequest { subrequest_id: subrequest_id as u32, @@ -2136,6 +2138,7 @@ mod tests { doc_mapping_uid: subrequest.doc_mapping_uid, 
publish_position_inclusive: Some(Position::Beginning), publish_token: None, + update_timestamp: 1724158996, }; let response = OpenShardsResponse { subresponses: vec![OpenShardSubresponse { diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 76aafab3344..3a73fc52205 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -729,6 +729,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(10u64)), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }], }; Ok(response) @@ -752,6 +753,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }], }; Ok(response) @@ -776,6 +778,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }, Shard { leader_id: "test-ingester-0".to_string(), @@ -787,6 +790,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(12u64)), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }, ], }; @@ -1078,6 +1082,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::eof(11u64)), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }, Shard { leader_id: "test-ingester-0".to_string(), @@ -1089,6 +1094,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning.as_eof()), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }, ], }; @@ -1221,6 +1227,7 @@ mod tests { 
doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }, Shard { leader_id: "test-ingester-0".to_string(), @@ -1232,6 +1239,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::eof(22u64)), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }, ], }; @@ -1575,6 +1583,7 @@ mod tests { doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, }], }; Ok(response) diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs index cdd0ade05b3..04be2806282 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs @@ -194,6 +194,7 @@ pub mod shared_state_for_tests { doc_mapping_uid: sub_req.doc_mapping_uid, publish_position_inclusive: Some(position), shard_state: ShardState::Open as i32, + update_timestamp: 1724158996, }), } }) @@ -233,6 +234,7 @@ pub mod shared_state_for_tests { doc_mapping_uid: None, publish_position_inclusive: Some(position), shard_state: ShardState::Open as i32, + update_timestamp: 1724158996, } }) .collect(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 1fae0c3b829..fb9a9980c8b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -1687,6 +1687,7 @@ mod tests { doc_mapping_uid: Some(doc_mapping_uid), publish_position_inclusive: None, publish_token: None, + update_timestamp: 1724158996, }; let init_shards_request = InitShardsRequest { subrequests: vec![InitShardSubrequest { diff --git 
a/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-update-timestamp-field.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-update-timestamp-field.down.sql new file mode 100644 index 00000000000..2bdbc180281 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-update-timestamp-field.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE shards + DROP COLUMN IF EXISTS update_timestamp; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/21_add-shard-update-timestamp-field.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/21_add-shard-update-timestamp-field.up.sql new file mode 100644 index 00000000000..2c091dbcda4 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/21_add-shard-update-timestamp-field.up.sql @@ -0,0 +1,4 @@ +ALTER TABLE shards + -- We prefer a fixed value here because it makes tests simpler. + -- Very few users use the shard API in versions <0.9 anyway. + ADD COLUMN IF NOT EXISTS update_timestamp TIMESTAMP NOT NULL DEFAULT '2024-01-01 00:00:00+00'; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index d7c75e9a6f3..d34a6ea781b 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -108,6 +108,7 @@ impl quickwit_config::TestableForRegression for FileBackedIndex { follower_id: Some("follower-ingester".to_string()), doc_mapping_uid: Some(DocMappingUid::for_test(1)), publish_position_inclusive: Some(Position::Beginning), + update_timestamp: 1724240908, ..Default::default() }; let shards = Shards::from_shards_vec(index_uid.clone(), source_id.clone(), vec![shard]); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs
b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index c9246b1aacf..eb7bb3d013b 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -29,6 +29,7 @@ use quickwit_proto::metastore::{ OpenShardSubrequest, OpenShardSubresponse, }; use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId}; +use time::OffsetDateTime; use tracing::{info, warn}; use crate::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta}; @@ -132,6 +133,7 @@ impl Shards { doc_mapping_uid: subrequest.doc_mapping_uid, publish_position_inclusive: Some(Position::Beginning), publish_token: subrequest.publish_token.clone(), + update_timestamp: OffsetDateTime::now_utc().unix_timestamp(), }; mutation_occurred = true; entry.insert(shard.clone()); @@ -288,6 +290,7 @@ impl Shards { shard.shard_state = ShardState::Closed as i32; } shard.publish_position_inclusive = Some(publish_position_inclusive); + shard.update_timestamp = OffsetDateTime::now_utc().unix_timestamp(); } Ok(MutationOccurred::Yes(())) } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 7d916fcd29f..8fad26cde82 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -261,7 +261,8 @@ async fn try_apply_delta_v2( shards SET publish_position_inclusive = new_positions.position, - shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END + shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END, + update_timestamp = CURRENT_TIMESTAMP AT TIME ZONE 'UTC' FROM UNNEST($3, $4) AS new_positions(shard_id, position) @@ -1794,16 +1795,37 @@ mod tests { const INSERT_SHARD_QUERY: 
&str = include_str!("queries/shards/insert.sql"); for shard in shards { + assert_eq!(&shard.source_id, source_id); + assert_eq!(shard.index_uid(), index_uid); + // explicit destructuring to ensure new fields are properly handled + let Shard { + doc_mapping_uid, + follower_id, + index_uid, + leader_id, + publish_position_inclusive, + publish_token, + shard_id, + shard_state, + source_id, + update_timestamp, + } = shard; + let shard_state_name = ShardState::from_i32(shard_state) + .unwrap() + .as_json_str_name(); + let update_timestamp = OffsetDateTime::from_unix_timestamp(update_timestamp) + .expect("Bad timestamp format"); sqlx::query(INSERT_SHARD_QUERY) .bind(index_uid) .bind(source_id) - .bind(shard.shard_id()) - .bind(shard.shard_state().as_json_str_name()) - .bind(&shard.leader_id) - .bind(&shard.follower_id) - .bind(shard.doc_mapping_uid) - .bind(&shard.publish_position_inclusive().to_string()) - .bind(&shard.publish_token) + .bind(shard_id.unwrap()) + .bind(shard_state_name) + .bind(leader_id) + .bind(follower_id) + .bind(doc_mapping_uid) + .bind(publish_position_inclusive.unwrap().to_string()) + .bind(publish_token) + .bind(update_timestamp) .execute(&self.connection_pool) .await .unwrap(); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs index 1b8124f30e5..c745b364643 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -263,6 +263,7 @@ pub(super) struct PgShard { pub doc_mapping_uid: DocMappingUid, pub publish_position_inclusive: String, pub publish_token: Option<String>, + pub update_timestamp: sqlx::types::time::PrimitiveDateTime, } impl From<PgShard> for Shard { @@ -277,6 +278,7 @@ impl From<PgShard> for Shard { doc_mapping_uid: Some(pg_shard.doc_mapping_uid), publish_position_inclusive: Some(pg_shard.publish_position_inclusive.into()), publish_token: pg_shard.publish_token, + update_timestamp:
pg_shard.update_timestamp.assume_utc().unix_timestamp(), } } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql index 9981580a405..a535d85e7d8 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql @@ -1,2 +1,2 @@ -INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token) - VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9) +INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token, update_timestamp) + VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9, $10) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql index bd9e9240688..b9bd5139504 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql @@ -1,5 +1,5 @@ -INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token) - VALUES ($1, $2, $3, $4, $5, $6, $7) +INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token, update_timestamp) + VALUES ($1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP AT TIME ZONE 'UTC') ON CONFLICT DO NOTHING RETURNING diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index ed3327774ec..8b53d3901dd 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use 
quickwit_common::rand::append_random_suffix; use quickwit_config::{IndexConfig, SourceConfig}; +use quickwit_proto::compatibility_shard_update_timestamp; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ AcquireShardsRequest, AddSourceRequest, CreateIndexRequest, DeleteShardsRequest, EntityKind, @@ -152,6 +153,9 @@ pub async fn test_metastore_open_shards< assert_eq!(shard.follower_id(), "test-ingester-bar"); assert_eq!(shard.doc_mapping_uid(), DocMappingUid::default(),); assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + let shard_ts = shard.update_timestamp; + assert_ne!(shard_ts, compatibility_shard_update_timestamp()); + assert_ne!(shard_ts, 0); assert!(shard.publish_token.is_none()); // Test open shard #1 is idempotent. @@ -181,6 +185,7 @@ pub async fn test_metastore_open_shards< assert_eq!(shard.leader_id, "test-ingester-foo"); assert_eq!(shard.follower_id(), "test-ingester-bar"); assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + assert_eq!(shard.update_timestamp, shard_ts); assert!(shard.publish_token.is_none()); // Test open shard #2. 
@@ -238,6 +243,7 @@ pub async fn test_metastore_acquire_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-foo".to_string()), + update_timestamp: 1724158996, }, Shard { index_uid: Some(test_index.index_uid.clone()), @@ -249,6 +255,7 @@ pub async fn test_metastore_acquire_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), + update_timestamp: 1724158996, }, Shard { index_uid: Some(test_index.index_uid.clone()), @@ -260,6 +267,7 @@ pub async fn test_metastore_acquire_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: None, + update_timestamp: 1724158996, }, Shard { index_uid: Some(test_index.index_uid.clone()), @@ -271,6 +279,7 @@ pub async fn test_metastore_acquire_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: None, + update_timestamp: 1724158996, }, ]; metastore @@ -362,6 +371,7 @@ pub async fn test_metastore_list_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-foo".to_string()), + update_timestamp: 1724158996, }, Shard { index_uid: Some(test_index.index_uid.clone()), @@ -373,6 +383,7 @@ pub async fn test_metastore_list_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), + update_timestamp: 1724158997, }, ]; metastore @@ -421,6 +432,7 @@ pub async fn test_metastore_list_shards< assert_eq!(shard.follower_id(), "test-ingester-bar"); assert_eq!(shard.publish_position_inclusive(), Position::Beginning); assert_eq!(shard.publish_token(), "test-publish-token-foo"); + 
assert_eq!(shard.update_timestamp, 1724158996); let shard = &subresponse.shards[1]; assert_eq!(shard.index_uid(), &test_index.index_uid); @@ -431,6 +443,7 @@ pub async fn test_metastore_list_shards< assert_eq!(shard.follower_id(), "test-ingester-qux"); assert_eq!(shard.publish_position_inclusive(), Position::Beginning); assert_eq!(shard.publish_token(), "test-publish-token-bar"); + assert_eq!(shard.update_timestamp, 1724158997); } // Test list shards with shard state filter. @@ -639,6 +652,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_single_shard< MetastoreError::NotFound(EntityKind::Shard { .. }) )); + let dummy_create_timestamp = 1; let shards = vec![Shard { index_uid: Some(test_index.index_uid.clone()), source_id: test_index.source_id.clone(), @@ -647,6 +661,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_single_shard< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), + update_timestamp: dummy_create_timestamp, ..Default::default() }]; metastore @@ -691,6 +706,10 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_single_shard< shards[0].publish_position_inclusive(), Position::offset(0u64) ); + assert!( + shards[0].update_timestamp > dummy_create_timestamp, + "shard timestamp was not updated" + ); let index_checkpoint_delta_json = serde_json::to_string(&index_checkpoint_delta).unwrap(); let publish_splits_request = PublishSplitsRequest { @@ -754,6 +773,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< ) .await; + let dummy_create_timestamp = 1; let shards = vec![ Shard { index_uid: Some(test_index.index_uid.clone()), @@ -763,6 +783,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(0u64)), publish_token: Some("test-publish-token-foo".to_string()), + update_timestamp: 
dummy_create_timestamp, ..Default::default() }, Shard { @@ -773,6 +794,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(1u64)), publish_token: Some("test-publish-token-foo".to_string()), + update_timestamp: dummy_create_timestamp, ..Default::default() }, Shard { @@ -783,6 +805,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(2u64)), publish_token: Some("test-publish-token-foo".to_string()), + update_timestamp: dummy_create_timestamp, ..Default::default() }, Shard { @@ -793,6 +816,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(3u64)), publish_token: Some("test-publish-token-bar".to_string()), + update_timestamp: dummy_create_timestamp, ..Default::default() }, ]; @@ -850,21 +874,25 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< assert_eq!(shard.shard_id(), ShardId::from(0)); assert_eq!(shard.shard_state(), ShardState::Open); assert_eq!(shard.publish_position_inclusive(), Position::offset(10u64)); + assert!(shard.update_timestamp > dummy_create_timestamp); let shard = &shards[1]; assert_eq!(shard.shard_id(), ShardId::from(1)); assert_eq!(shard.shard_state(), ShardState::Open); assert_eq!(shard.publish_position_inclusive(), Position::offset(11u64)); + assert!(shard.update_timestamp > dummy_create_timestamp); let shard = &shards[2]; assert_eq!(shard.shard_id(), ShardId::from(2)); assert_eq!(shard.shard_state(), ShardState::Closed); assert_eq!(shard.publish_position_inclusive(), Position::eof(12u64)); + assert!(shard.update_timestamp > dummy_create_timestamp); let shard = &shards[3]; assert_eq!(shard.shard_id(), ShardId::from(3)); assert_eq!(shard.shard_state(), ShardState::Open); 
assert_eq!(shard.publish_position_inclusive(), Position::offset(3u64)); + assert_eq!(shard.update_timestamp, dummy_create_timestamp); cleanup_index(&mut metastore, test_index.index_uid).await; } diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json index 6a45c2858ea..0e520f0ce26 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json @@ -179,7 +179,8 @@ "follower_id": "follower-ingester", "shard_state": 1, "publish_position_inclusive": "", - "doc_mapping_uid": "00000000000000000000000000" + "doc_mapping_uid": "00000000000000000000000000", + "update_timestamp": 1704067200 } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json index 6a45c2858ea..0e520f0ce26 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json @@ -179,7 +179,8 @@ "follower_id": "follower-ingester", "shard_state": 1, "publish_position_inclusive": "", - "doc_mapping_uid": "00000000000000000000000000" + "doc_mapping_uid": "00000000000000000000000000", + "update_timestamp": 1704067200 } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json index 8694ae89469..a9d233c72dd 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json @@ -179,7 +179,8 @@ "follower_id": "follower-ingester", "shard_state": 1, "publish_position_inclusive": "", - "doc_mapping_uid": "00000000000000000000000001" + "doc_mapping_uid": "00000000000000000000000001", + 
"update_timestamp": 1724240908 } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json index 8694ae89469..a9d233c72dd 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json @@ -179,7 +179,8 @@ "follower_id": "follower-ingester", "shard_state": 1, "publish_position_inclusive": "", - "doc_mapping_uid": "00000000000000000000000001" + "doc_mapping_uid": "00000000000000000000000001", + "update_timestamp": 1724240908 } ] }, diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs index ca28372a8f5..2a93593e84c 100644 --- a/quickwit/quickwit-proto/build.rs +++ b/quickwit/quickwit-proto/build.rs @@ -158,6 +158,10 @@ fn main() -> Result<(), Box> { .field_attribute( "Shard.replication_position_inclusive", "#[serde(default, skip_serializing_if = \"Option::is_none\")]", + ) + .field_attribute( + "Shard.update_timestamp", + "#[serde(default = \"super::compatibility_shard_update_timestamp\")]", ); Codegen::builder() diff --git a/quickwit/quickwit-proto/protos/quickwit/ingest.proto b/quickwit/quickwit-proto/protos/quickwit/ingest.proto index fd546646bae..404bbf7c660 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingest.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingest.proto @@ -98,6 +98,9 @@ message Shard { // The UID of the index doc mapping when the shard was created. quickwit.common.DocMappingUid doc_mapping_uid = 11; + + // Time when the shard was last updated + int64 update_timestamp = 12; } // A group of shards belonging to the same index and source. 
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs index c6efc801307..9b071d20db4 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs @@ -67,6 +67,10 @@ pub struct Shard { /// The UID of the index doc mapping when the shard was created. #[prost(message, optional, tag = "11")] pub doc_mapping_uid: ::core::option::Option, + /// Time when the shard was last updated + #[prost(int64, tag = "12")] + #[serde(default = "super::compatibility_shard_update_timestamp")] + pub update_timestamp: i64, } /// A group of shards belonging to the same index and source. #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index 0458e993c5b..93333e6718e 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -242,3 +242,10 @@ impl search::SortOrder { } impl quickwit_common::pubsub::Event for ReportSplitsRequest {} + +/// Shard update_timestamp to use when reading file metastores <0.9 +pub fn compatibility_shard_update_timestamp() -> i64 { + // We prefer a fixed value here because it makes backward compatibility tests + // simpler. Very few users use the shard API in versions <0.9 anyway. + 1704067200 // 2024-01-01T00:00:00Z +}