Add update timestamp to Shard model #5332

Merged 2 commits on Aug 27, 2024
7 changes: 3 additions & 4 deletions docs/internals/backward-compatibility.md
@@ -1,7 +1,7 @@
# Backward compatibility in Quickwit.

If you are reading this, chances are you want to make a change to one of the resources
of quickwit's meta/config:
of Quickwit's meta/config:

- QuickwitConfig
@@ -19,7 +19,7 @@ Quickwit currently manages backward compatibility of all of these resources but
This document describes how to handle a change, how to test such a change,
and how to spot potential regressions.

# How do I update `{IndexMetadata, SplitMetadata, FileBackedIndex, SourceConfig, IndexConfig}`?
## How do I update `{IndexMetadata, SplitMetadata, FileBackedIndex, SourceConfig, IndexConfig}`?

There are two types of upgrades.

@@ -45,6 +45,7 @@ non-regression.

When introducing such a change:
- modify your model with the help of the attributes above.
- modify the example for the model by editing its `TestableForRegression` trait implementation.
- commit the 2 files that were updated by build.rs
- eyeball the diff on the `.expected.json` that failed, and send it with your PR.
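
Purely as an illustration (not part of this PR or the Quickwit codebase), a minimal sketch of the serde side of such a change, assuming a serde-based model; the struct, field, and test names are hypothetical:

```rust
use serde::{Deserialize, Serialize};

// Hypothetical resource standing in for the real models in quickwit-config / quickwit-metastore.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SampleResource {
    pub name: String,
    // Newly added field: `#[serde(default)]` keeps payloads written before
    // the field existed deserializable.
    #[serde(default)]
    pub update_timestamp: i64,
}

#[cfg(test)]
mod tests {
    use super::SampleResource;

    #[test]
    fn old_payload_still_deserializes() {
        // Serialized before the field was introduced.
        let old_json = r#"{"name": "my-index"}"#;
        let resource: SampleResource = serde_json::from_str(old_json).unwrap();
        assert_eq!(resource.update_timestamp, 0);
    }
}
```

The regression fixtures regenerated by `build.rs` then pick up the new field, which is what the `.expected.json` diff in the last step surfaces.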

@@ -121,5 +122,3 @@ most recent version.

The unit test will start making sense in future updates thanks to the update phase
described in the previous section.


4 changes: 4 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -1924,6 +1924,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
}],
}],
};
@@ -2054,6 +2055,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
}],
}],
};
@@ -2342,6 +2344,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
}),
}],
};
@@ -2495,6 +2498,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
}),
}],
};
@@ -793,6 +793,7 @@ impl IngestController {
doc_mapping_uid: Some(doc_mapping_uid),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 0, // assigned later by the metastore
};
let init_shard_subrequest = InitShardSubrequest {
subrequest_id: subrequest_id as u32,
@@ -2136,6 +2137,7 @@ mod tests {
doc_mapping_uid: subrequest.doc_mapping_uid,
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
};
let response = OpenShardsResponse {
subresponses: vec![OpenShardSubresponse {
9 changes: 9 additions & 0 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -729,6 +729,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(10u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
@@ -752,6 +753,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
@@ -776,6 +778,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
@@ -787,6 +790,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(12u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
@@ -1078,6 +1082,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::eof(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
@@ -1089,6 +1094,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning.as_eof()),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
@@ -1221,6 +1227,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
@@ -1232,6 +1239,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::eof(22u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
@@ -1575,6 +1583,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
@@ -194,6 +194,7 @@ pub mod shared_state_for_tests {
doc_mapping_uid: sub_req.doc_mapping_uid,
publish_position_inclusive: Some(position),
shard_state: ShardState::Open as i32,
update_timestamp: 1724158996,
}),
}
})
@@ -233,6 +234,7 @@ pub mod shared_state_for_tests {
doc_mapping_uid: None,
publish_position_inclusive: Some(position),
shard_state: ShardState::Open as i32,
update_timestamp: 1724158996,
}
})
.collect();
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -1687,6 +1687,7 @@ mod tests {
doc_mapping_uid: Some(doc_mapping_uid),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
};
let init_shards_request = InitShardsRequest {
subrequests: vec![InitShardSubrequest {
@@ -0,0 +1,2 @@
ALTER TABLE shards
DROP COLUMN IF EXISTS update_timestamp;
@@ -0,0 +1,4 @@
ALTER TABLE shards
-- We prefer a fixed value here because it makes tests simpler.
-- Very few users use the shard API in versions <0.9 anyway.
ADD COLUMN IF NOT EXISTS update_timestamp TIMESTAMP NOT NULL DEFAULT '2024-01-01 00:00:00+00';
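
As an illustrative sanity check only (not part of the PR), the backfill default above maps to one deterministic unix timestamp once it flows through the `PrimitiveDateTime` to `Shard` conversion added later in this diff; a tiny sketch with the `time` crate:

```rust
use time::{Date, Month, PrimitiveDateTime, Time};

fn main() {
    // '2024-01-01 00:00:00+00', the default backfilled into pre-existing rows.
    let backfill_default = PrimitiveDateTime::new(
        Date::from_calendar_date(2024, Month::January, 1).unwrap(),
        Time::MIDNIGHT,
    );
    // Old shards therefore surface update_timestamp == 1_704_067_200 through the model conversion.
    assert_eq!(backfill_default.assume_utc().unix_timestamp(), 1_704_067_200);
}
```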
@@ -108,6 +108,7 @@ impl quickwit_config::TestableForRegression for FileBackedIndex {
follower_id: Some("follower-ingester".to_string()),
doc_mapping_uid: Some(DocMappingUid::for_test(1)),
publish_position_inclusive: Some(Position::Beginning),
update_timestamp: 1724240908,
..Default::default()
};
let shards = Shards::from_shards_vec(index_uid.clone(), source_id.clone(), vec![shard]);
@@ -29,6 +29,7 @@ use quickwit_proto::metastore::{
OpenShardSubrequest, OpenShardSubresponse,
};
use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId};
use time::OffsetDateTime;
use tracing::{info, warn};

use crate::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta};
@@ -132,6 +133,7 @@ impl Shards {
doc_mapping_uid: subrequest.doc_mapping_uid,
publish_position_inclusive: Some(Position::Beginning),
publish_token: subrequest.publish_token.clone(),
update_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
};
mutation_occurred = true;
entry.insert(shard.clone());
@@ -288,6 +290,7 @@ impl Shards {
shard.shard_state = ShardState::Closed as i32;
}
shard.publish_position_inclusive = Some(publish_position_inclusive);
shard.update_timestamp = OffsetDateTime::now_utc().unix_timestamp();
}
Ok(MutationOccurred::Yes(()))
}
44 changes: 36 additions & 8 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
@@ -52,6 +52,7 @@ use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId,
use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType};
use sea_query_binder::SqlxBinder;
use sqlx::{Acquire, Executor, Postgres, Transaction};
use time::OffsetDateTime;
use tracing::{debug, info, instrument, warn};

use super::error::convert_sqlx_err;
@@ -255,13 +256,15 @@ async fn try_apply_delta_v2(
shard_ids.push(shard_id.to_string());
new_positions.push(new_position.to_string());
}

sqlx::query(
r#"
UPDATE
shards
SET
publish_position_inclusive = new_positions.position,
shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END
shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END,
update_timestamp = $5
FROM
UNNEST($3, $4)
AS new_positions(shard_id, position)
@@ -275,6 +278,8 @@ async fn try_apply_delta_v2(
.bind(source_id)
.bind(shard_ids)
.bind(new_positions)
// Use a timestamp generated by the metastore node to avoid clock drift issues
.bind(OffsetDateTime::now_utc())
.execute(tx.as_mut())
.await?;
Ok(())
@@ -1638,6 +1643,8 @@ async fn open_or_fetch_shard<'e>(
.bind(&subrequest.follower_id)
.bind(subrequest.doc_mapping_uid)
.bind(&subrequest.publish_token)
// Use a timestamp generated by the metastore node to avoid clock drift issues
.bind(OffsetDateTime::now_utc())
.fetch_optional(executor.clone())
.await?;

@@ -1794,16 +1801,37 @@ mod tests {
const INSERT_SHARD_QUERY: &str = include_str!("queries/shards/insert.sql");

for shard in shards {
assert_eq!(&shard.source_id, source_id);
assert_eq!(shard.index_uid(), index_uid);
// explicit destructuring to ensure new fields are properly handled
let Shard {
doc_mapping_uid,
follower_id,
index_uid,
leader_id,
publish_position_inclusive,
publish_token,
shard_id,
shard_state,
source_id,
update_timestamp,
} = shard;
let shard_state_name = ShardState::from_i32(shard_state)
.unwrap()
.as_json_str_name();
let update_timestamp = OffsetDateTime::from_unix_timestamp(update_timestamp)
.expect("Bad timestamp format");
sqlx::query(INSERT_SHARD_QUERY)
.bind(index_uid)
.bind(source_id)
.bind(shard.shard_id())
.bind(shard.shard_state().as_json_str_name())
.bind(&shard.leader_id)
.bind(&shard.follower_id)
.bind(shard.doc_mapping_uid)
.bind(&shard.publish_position_inclusive().to_string())
.bind(&shard.publish_token)
.bind(shard_id.unwrap())
.bind(shard_state_name)
.bind(leader_id)
.bind(follower_id)
.bind(doc_mapping_uid)
.bind(publish_position_inclusive.unwrap().to_string())
.bind(publish_token)
.bind(update_timestamp)
.execute(&self.connection_pool)
.await
.unwrap();
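
The `explicit destructuring` comment above is worth spelling out: because the pattern names every field of `Shard` and omits a `..` catch-all, adding a field to the message turns this helper into a compile error until the new field is bound, which is how tests are forced to account for new columns. A minimal, self-contained illustration (the struct here is hypothetical, not Quickwit's `Shard`):

```rust
// Hypothetical two-field struct standing in for the real `Shard` message.
struct Shard {
    shard_id: u64,
    update_timestamp: i64,
}

fn bind_all_fields(shard: Shard) {
    // Exhaustive pattern, no `..`: if `Shard` grows a new field, this line
    // stops compiling until the field is explicitly handled.
    let Shard { shard_id, update_timestamp } = shard;
    println!("shard {shard_id} updated at {update_timestamp}");
}

fn main() {
    bind_all_fields(Shard { shard_id: 1, update_timestamp: 1_724_158_996 });
}
```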
2 changes: 2 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/model.rs
@@ -263,6 +263,7 @@ pub(super) struct PgShard {
pub doc_mapping_uid: DocMappingUid,
pub publish_position_inclusive: String,
pub publish_token: Option<String>,
pub update_timestamp: sqlx::types::time::PrimitiveDateTime,
}

impl From<PgShard> for Shard {
@@ -277,6 +278,7 @@ impl From<PgShard> for Shard {
doc_mapping_uid: Some(pg_shard.doc_mapping_uid),
publish_position_inclusive: Some(pg_shard.publish_position_inclusive.into()),
publish_token: pg_shard.publish_token,
update_timestamp: pg_shard.update_timestamp.assume_utc().unix_timestamp(),
}
}
}
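
A small aside on the timestamp conversions this model relies on, shown in isolation as an illustrative sketch (not Quickwit code): Postgres hands back a `PrimitiveDateTime` for the `TIMESTAMP` column; since the column stores UTC, `assume_utc().unix_timestamp()` yields the `i64` carried on the `Shard` message, and `OffsetDateTime::from_unix_timestamp` goes the other way when the test helper binds the value back into an INSERT.

```rust
use time::{Duration, OffsetDateTime, PrimitiveDateTime};

fn main() {
    // What sqlx decodes for a TIMESTAMP (without time zone) column.
    let now_utc = OffsetDateTime::now_utc();
    let from_db = PrimitiveDateTime::new(now_utc.date(), now_utc.time());

    // DB -> model: the column stores UTC, so assuming UTC is safe.
    let unix_secs: i64 = from_db.assume_utc().unix_timestamp();

    // Model -> DB: rebuild an OffsetDateTime to bind it back into an INSERT.
    let restored = OffsetDateTime::from_unix_timestamp(unix_secs).expect("valid unix timestamp");

    // Only sub-second precision is lost in the round trip.
    assert!(from_db.assume_utc() - restored < Duration::SECOND);
}
```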
@@ -1,2 +1,2 @@
INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token)
VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9)
INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token, update_timestamp)
VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9, $10)
@@ -1,5 +1,5 @@
INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token, update_timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT
DO NOTHING
RETURNING