Skip to content

Commit

Permalink
update shard_table position on ShardPositionsUpdate (#4639)
Browse files Browse the repository at this point in the history
* update shard_table position on ShardPositionsUpdate

* add test

* add test for getting position from metastore on startup

* Update quickwit/quickwit-control-plane/src/control_plane.rs

---------

Co-authored-by: Paul Masurel <[email protected]>
Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
3 people authored Feb 28, 2024
1 parent 060c577 commit 5f321eb
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 10 deletions.
118 changes: 108 additions & 10 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,20 +346,23 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
) -> Result<(), ActorExitStatus> {
let Some(shard_entries) = self
.model
.get_shards_for_source(&shard_positions_update.source_uid)
.get_shards_for_source_mut(&shard_positions_update.source_uid)
else {
// The source no longer exists.
return Ok(());
};
// let's identify the shard that have reached EOF but have not yet been removed.
let shard_ids_to_close: Vec<ShardId> = shard_positions_update
.updated_shard_positions
.into_iter()
.filter(|(shard_id, position)| {
position.is_eof() && shard_entries.contains_key(shard_id)
})
.map(|(shard_id, _position)| shard_id)
.collect();

let mut shard_ids_to_close = Vec::new();
for (shard_id, position) in shard_positions_update.updated_shard_positions {
if let Some(shard) = shard_entries.get_mut(&shard_id) {
shard.publish_position_inclusive =
Some(shard.publish_position_inclusive().max(&position).clone());
if position.is_eof() {
// identify shards that have reached EOF but have not yet been removed.
shard_ids_to_close.push(shard_id);
}
}
}
if shard_ids_to_close.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -1412,6 +1415,7 @@ mod tests {
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: Some(ShardId::from(17)),
leader_id: "test_node".to_string(),
publish_position_inclusive: Some(Position::Beginning),
..Default::default()
};
shard.set_shard_state(ShardState::Open);
Expand Down Expand Up @@ -1467,6 +1471,17 @@ mod tests {
.unwrap();
assert_eq!(indexing_tasks.len(), 1);
assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]);
let control_plane_state = control_plane_mailbox
.ask(GetDebugStateRequest {})
.await
.unwrap()
.unwrap();
let shard_state = &control_plane_state.shard_table[0].shards[0];
assert_eq!(shard_state.shard_id(), ShardId::from(17));
assert_eq!(
shard_state.publish_position_inclusive(),
Position::offset(1_000u64)
);
let _ = client_inbox.drain_for_test();

universe.sleep(Duration::from_secs(30)).await;
Expand Down Expand Up @@ -1499,6 +1514,89 @@ mod tests {
universe.assert_quit().await;
}

#[tokio::test]
async fn test_fill_shard_table_position_from_metastore_on_startup() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();
let node_id = NodeId::new("control-plane-node".to_string());
let indexer_pool = IndexerPool::default();
let (client_mailbox, _client_inbox) = universe.create_test_mailbox();
let client = IndexingServiceClient::from_mailbox::<IndexingService>(client_mailbox);
let indexer_node_info = IndexerNodeInfo {
client,
indexing_tasks: Vec::new(),
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
};
indexer_pool.insert("indexer-node-1".to_string(), indexer_node_info);
let ingester_pool = IngesterPool::default();
let mut mock_metastore = MetastoreServiceClient::mock();

let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
let mut source = SourceConfig::ingest_v2();
source.enabled = true;
index_0.add_source(source.clone()).unwrap();

let index_0_clone = index_0.clone();
mock_metastore.expect_list_indexes_metadata().return_once(
move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
index_0_clone.clone()
])
.unwrap())
},
);

let mut shard = Shard {
index_uid: Some(index_0.index_uid.clone()),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: Some(ShardId::from(17)),
leader_id: "test_node".to_string(),
publish_position_inclusive: Some(Position::Offset(1234u64.into())),
..Default::default()
};
shard.set_shard_state(ShardState::Open);

let index_uid_clone = index_0.index_uid.clone();
mock_metastore.expect_list_shards().return_once(
move |_list_shards_request: ListShardsRequest| {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: Some(index_uid_clone),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![shard],
}],
};
Ok(list_shards_resp)
},
);

let cluster_config = ClusterConfig::for_test();
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn(
&universe,
cluster_config,
node_id,
cluster_change_stream_factory,
indexer_pool,
ingester_pool,
MetastoreServiceClient::from(mock_metastore),
);

let control_plane_state = control_plane_mailbox
.ask(GetDebugStateRequest {})
.await
.unwrap()
.unwrap();
let shard_state = &control_plane_state.shard_table[0].shards[0];
assert_eq!(shard_state.shard_id(), ShardId::from(17));
assert_eq!(
shard_state.publish_position_inclusive(),
Position::offset(1234u64)
);
universe.assert_quit().await;
}

#[tokio::test]
async fn test_delete_non_existing_shard() {
quickwit_common::setup_logging_for_tests();
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,14 @@ impl ControlPlaneModel {
self.shard_table.get_shards(source_uid)
}

/// Lists the shards of a given source. Returns `None` if the source does not exist.
pub fn get_shards_for_source_mut(
&mut self,
source_uid: &SourceUid,
) -> Option<&mut FnvHashMap<ShardId, ShardEntry>> {
self.shard_table.get_shards_mut(source_uid)
}

/// Inserts the shards that have just been opened by calling `open_shards` on the metastore.
pub fn insert_shards(
&mut self,
Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ impl ShardTable {
.map(|table_entry| &table_entry.shard_entries)
}

/// Lists the shards of a given source. Returns `None` if the source does not exist.
pub fn get_shards_mut(
&mut self,
source_uid: &SourceUid,
) -> Option<&mut FnvHashMap<ShardId, ShardEntry>> {
self.table_entries
.get_mut(source_uid)
.map(|table_entry| &mut table_entry.shard_entries)
}

/// Inserts the shards into the shard table.
pub fn insert_shards(
&mut self,
Expand Down

0 comments on commit 5f321eb

Please sign in to comment.