Change of semantics of ShardPositionsUpdate (#4504)
It now only contains the shards that received an update, and the event itself is only emitted if at least one shard received an update.

Closes #4500
fulmicoton authored Feb 4, 2024
1 parent fa3f99b commit b630f9b
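
For orientation, here is a minimal, self-contained sketch of the event shape and emission rule this commit introduces. `ShardId`, `Position`, and `SourceUid` are simplified stand-ins for the real Quickwit types, and `maybe_emit` is a hypothetical helper, not code from this repository.

```rust
// Simplified stand-ins for the real Quickwit types.
type ShardId = u64;
type SourceUid = String;

#[derive(Debug, Clone, PartialEq, Eq)]
enum Position {
    Beginning,
    Offset(u64),
    Eof(u64),
}

/// New semantics: only the shards whose position actually advanced are listed.
#[derive(Debug)]
struct ShardPositionsUpdate {
    source_uid: SourceUid,
    updated_shard_positions: Vec<(ShardId, Position)>,
}

/// The event is only built (and therefore only published) if at least one
/// shard received an update.
fn maybe_emit(
    source_uid: SourceUid,
    updated: Vec<(ShardId, Position)>,
) -> Option<ShardPositionsUpdate> {
    if updated.is_empty() {
        return None;
    }
    Some(ShardPositionsUpdate {
        source_uid,
        updated_shard_positions: updated,
    })
}

fn main() {
    // No shard advanced: no event.
    assert!(maybe_emit("index:source".to_string(), Vec::new()).is_none());
    // One shard advanced: the event lists only that shard.
    let update = maybe_emit(
        "index:source".to_string(),
        vec![(17, Position::Offset(1_000))],
    )
    .unwrap();
    assert_eq!(update.updated_shard_positions.len(), 1);
    println!("{update:?}");
}
```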
Showing 6 changed files with 57 additions and 69 deletions.
8 changes: 4 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -343,7 +343,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
.collect();
// let's identify the shards that have reached EOF but have not yet been removed.
let shard_ids_to_close: Vec<ShardId> = shard_positions_update
- .shard_positions
+ .updated_shard_positions
.into_iter()
.filter(|(shard_id, position)| position.is_eof() && known_shard_ids.contains(shard_id))
.map(|(shard_id, _position)| shard_id)
@@ -1306,7 +1306,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid: source_uid.clone(),
- shard_positions: vec![(ShardId::from(17), Position::offset(1_000u64))],
+ updated_shard_positions: vec![(ShardId::from(17), Position::offset(1_000u64))],
})
.await
.unwrap();
@@ -1330,7 +1330,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid,
- shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
+ updated_shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
})
.await
.unwrap();
@@ -1432,7 +1432,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid: source_uid.clone(),
- shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
+ updated_shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
})
.await
.unwrap();
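
As a standalone illustration of the selection logic in the control-plane hunk above — close only the shards that reached EOF and are still known — here is a small sketch with plain integers standing in for `ShardId` and a boolean standing in for the EOF check. It mirrors the filter shown in the diff but is not the actual control-plane code.

```rust
use std::collections::HashSet;

/// Returns the shards that have reached EOF but are still known to the
/// control plane, i.e. the shards that should now be closed.
fn shard_ids_to_close(
    known_shard_ids: &HashSet<u64>,
    updated_shard_positions: Vec<(u64, bool)>, // (shard_id, position.is_eof())
) -> Vec<u64> {
    updated_shard_positions
        .into_iter()
        .filter(|(shard_id, is_eof)| *is_eof && known_shard_ids.contains(shard_id))
        .map(|(shard_id, _is_eof)| shard_id)
        .collect()
}

fn main() {
    let known_shard_ids: HashSet<u64> = [17, 18].into_iter().collect();
    // Shard 17 reached EOF and is known: close it.
    // Shard 18 is known but not at EOF, and shard 42 is unknown: ignore both.
    let to_close = shard_ids_to_close(&known_shard_ids, vec![(17, true), (18, false), (42, true)]);
    assert_eq!(to_close, vec![17]);
}
```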
101 changes: 42 additions & 59 deletions quickwit/quickwit-indexing/src/models/shard_positions.rs
@@ -17,7 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

- use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;

@@ -176,9 +175,9 @@ impl Handler<ClusterShardPositionsUpdate> for ShardPositionsService {
source_uid,
shard_positions,
} = update;
- let was_updated = self.apply_update(&source_uid, shard_positions);
- if was_updated {
- self.publish_shard_updates_to_event_broker(source_uid);
+ let updated_shard_positions = self.apply_update(&source_uid, shard_positions);
+ if !updated_shard_positions.is_empty() {
+ self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions);
}
Ok(())
}
@@ -192,17 +191,19 @@ impl Handler<LocalShardPositionsUpdate> for ShardPositionsService {
&mut self,
update: LocalShardPositionsUpdate,
_ctx: &ActorContext<Self>,
- ) -> Result<Self::Reply, ActorExitStatus> {
+ ) -> Result<(), ActorExitStatus> {
let LocalShardPositionsUpdate {
source_uid,
shard_positions,
} = update;
- let was_updated = self.apply_update(&source_uid, shard_positions);
- if was_updated {
- self.publish_positions_into_chitchat(source_uid.clone())
- .await;
- self.publish_shard_updates_to_event_broker(source_uid);
+ let updated_shard_positions: Vec<(ShardId, Position)> =
+ self.apply_update(&source_uid, shard_positions);
+ if updated_shard_positions.is_empty() {
+ return Ok(());
}
+ self.publish_positions_into_chitchat(source_uid.clone())
+ .await;
+ self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions);
Ok(())
}
}
@@ -223,51 +224,48 @@ impl ShardPositionsService {
.await;
}

- fn publish_shard_updates_to_event_broker(&self, source_uid: SourceUid) {
- let Some(shard_positions_map) = self.shard_positions_per_source.get(&source_uid) else {
- return;
- };
- let shard_positions: Vec<(ShardId, Position)> = shard_positions_map
- .iter()
- .map(|(shard_id, position)| (shard_id.clone(), position.clone()))
- .collect();
+ fn publish_shard_updates_to_event_broker(
+ &self,
+ source_uid: SourceUid,
+ shard_positions: Vec<(ShardId, Position)>,
+ ) {
self.event_broker.publish(ShardPositionsUpdate {
source_uid,
- shard_positions,
+ updated_shard_positions: shard_positions,
});
}

/// Updates the internal model holding the last position per shard, and
- /// returns true if at least one of the publish position was updated.
+ /// returns the list of shards that were updated.
fn apply_update(
&mut self,
source_uid: &SourceUid,
published_positions_per_shard: Vec<(ShardId, Position)>,
- ) -> bool {
+ ) -> Vec<(ShardId, Position)> {
if published_positions_per_shard.is_empty() {
warn!("received an empty publish shard positions update");
- return false;
+ return Vec::new();
}
- let mut was_modified = false;
let current_shard_positions = self
.shard_positions_per_source
.entry(source_uid.clone())
.or_default();
- for (shard, new_position) in published_positions_per_shard {
- match current_shard_positions.entry(shard) {
- Entry::Occupied(mut occupied) => {
- if *occupied.get() < new_position {
- occupied.insert(new_position);
- was_modified = true;
- }
- }
- Entry::Vacant(vacant) => {
- was_modified = true;
- vacant.insert(new_position.clone());
- }
- }

+ let updated_positions_per_shard = published_positions_per_shard
+ .into_iter()
+ .filter(|(shard, new_position)| {
+ let Some(position) = current_shard_positions.get(shard) else {
+ return true;
+ };
+ new_position > position
+ })
+ .collect::<Vec<_>>();

+ for (shard, position) in updated_positions_per_shard.iter() {
+ current_shard_positions.insert(shard.clone(), position.clone());
}
- was_modified

+ updated_positions_per_shard
}
}

@@ -380,26 +378,17 @@ mod tests {
for _ in 0..4 {
let update = rx1.recv().await.unwrap();
assert_eq!(update.source_uid, source_uid);
- updates1.push(update.shard_positions);
+ updates1.push(update.updated_shard_positions);
}

// The updates as seen from the first node.
assert_eq!(
updates1,
vec![
vec![(ShardId::from(1), Position::Beginning)],
- vec![
- (ShardId::from(1), Position::Beginning),
- (ShardId::from(2), Position::offset(10u64))
- ],
- vec![
- (ShardId::from(1), Position::offset(10u64)),
- (ShardId::from(2), Position::offset(10u64)),
- ],
- vec![
- (ShardId::from(1), Position::offset(10u64)),
- (ShardId::from(2), Position::offset(12u64)),
- ],
+ vec![(ShardId::from(2), Position::offset(10u64))],
+ vec![(ShardId::from(1), Position::offset(10u64)),],
+ vec![(ShardId::from(2), Position::offset(12u64)),],
]
);

@@ -408,21 +397,15 @@ mod tests {
for _ in 0..4 {
let update = rx2.recv().await.unwrap();
assert_eq!(update.source_uid, source_uid);
- updates2.push(update.shard_positions);
+ updates2.push(update.updated_shard_positions);
}
assert_eq!(
updates2,
vec![
vec![(ShardId::from(2), Position::offset(10u64))],
vec![(ShardId::from(2), Position::offset(12u64))],
- vec![
- (ShardId::from(1), Position::Beginning),
- (ShardId::from(2), Position::offset(12u64))
- ],
- vec![
- (ShardId::from(1), Position::offset(10u64)),
- (ShardId::from(2), Position::offset(12u64))
- ],
+ vec![(ShardId::from(1), Position::Beginning),],
+ vec![(ShardId::from(1), Position::offset(10u64)),],
]
);

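
To make the new `apply_update` contract concrete — keep only the positions that strictly advance the recorded state, and publish an event only when that list is non-empty — here is a minimal self-contained sketch using plain integers for shard ids and offsets. It follows the filter-then-insert shape of the hunk above but is an illustration, not the actual implementation.

```rust
use std::collections::BTreeMap;

/// Applies the published positions to the local model and returns only the
/// (shard, position) pairs that actually advanced.
fn apply_update(
    current_shard_positions: &mut BTreeMap<u64, u64>,
    published_positions_per_shard: Vec<(u64, u64)>,
) -> Vec<(u64, u64)> {
    let updated_positions_per_shard: Vec<(u64, u64)> = published_positions_per_shard
        .into_iter()
        .filter(|(shard, new_position)| {
            // Keep shards that are unknown, or whose position strictly advances.
            current_shard_positions
                .get(shard)
                .map_or(true, |position| new_position > position)
        })
        .collect();
    for (shard, position) in &updated_positions_per_shard {
        current_shard_positions.insert(*shard, *position);
    }
    updated_positions_per_shard
}

fn main() {
    let mut current = BTreeMap::new();
    // Both shards are new, so both are reported (and an event would be emitted).
    assert_eq!(
        apply_update(&mut current, vec![(1, 10), (2, 10)]),
        vec![(1, 10), (2, 10)]
    );
    // Only shard 2 advances, so the event would list shard 2 only.
    assert_eq!(apply_update(&mut current, vec![(1, 10), (2, 12)]), vec![(2, 12)]);
    // No progress at all: nothing is reported, and no event would be emitted.
    assert!(apply_update(&mut current, vec![(1, 10), (2, 12)]).is_empty());
}
```

This is also why the test expectations above shrink to single-shard vectors: each received event now lists only the shards that moved.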
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -1113,7 +1113,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
let index_uid = shard_positions_update.source_uid.index_uid;
let source_id = shard_positions_update.source_uid.source_id;

- for (shard_id, shard_position) in shard_positions_update.shard_positions {
+ for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
let queue_id = queue_id(index_uid.as_str(), &source_id, &shard_id);

if shard_position.is_eof() {
@@ -2786,7 +2786,7 @@ mod tests {
index_uid: "test-index:0".into(),
source_id: "test-source".to_string(),
},
- shard_positions: vec![
+ updated_shard_positions: vec![
(ShardId::from(1), Position::offset(0u64)),
(ShardId::from(2), Position::eof(0u64)),
(ShardId::from(1337), Position::offset(1337u64)),
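
The ingester hunk above iterates only the updated shards and treats EOF specially. A hedged sketch of that consumption pattern follows; the `Action` enum, the queue-id format, and the truncate branch are illustrative assumptions, not the ingester's actual API.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Position {
    Offset(u64),
    Eof(u64),
}

impl Position {
    fn is_eof(&self) -> bool {
        matches!(self, Position::Eof(_))
    }
}

/// Hypothetical per-queue action derived from a shard position update.
#[derive(Debug, PartialEq)]
enum Action {
    DeleteQueue(String),
    TruncateUpTo(String, Position),
}

// Illustrative queue-id format; the real helper lives in the ingest crate.
fn queue_id(index_uid: &str, source_id: &str, shard_id: u64) -> String {
    format!("{index_uid}/{source_id}/{shard_id}")
}

/// For each updated shard, delete its queue if the shard reached EOF,
/// otherwise truncate the queue up to the published position.
fn plan_actions(
    index_uid: &str,
    source_id: &str,
    updated_shard_positions: Vec<(u64, Position)>,
) -> Vec<Action> {
    updated_shard_positions
        .into_iter()
        .map(|(shard_id, shard_position)| {
            let queue_id = queue_id(index_uid, source_id, shard_id);
            if shard_position.is_eof() {
                Action::DeleteQueue(queue_id)
            } else {
                Action::TruncateUpTo(queue_id, shard_position)
            }
        })
        .collect()
}

fn main() {
    let actions = plan_actions(
        "test-index:0",
        "test-source",
        vec![(1, Position::Offset(0)), (2, Position::Eof(0))],
    );
    assert_eq!(actions.len(), 2);
    assert_eq!(actions[1], Action::DeleteQueue("test-index:0/test-source/2".to_string()));
    println!("{actions:?}");
}
```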
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -488,7 +488,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakRouterState {
};
let mut deleted_shard_ids: Vec<ShardId> = Vec::new();

- for (shard_id, shard_position) in shard_positions_update.shard_positions {
+ for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
if shard_position.is_eof() {
deleted_shard_ids.push(shard_id);
}
@@ -1514,7 +1514,7 @@ mod tests {
index_uid: "test-index-0:0".into(),
source_id: "test-source".to_string(),
},
- shard_positions: vec![(ShardId::from(1), Position::eof(0u64))],
+ updated_shard_positions: vec![(ShardId::from(1), Position::eof(0u64))],
};
event_broker.publish(shard_positions_update);

5 changes: 5 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/state.rs
@@ -185,6 +185,11 @@ impl FullyLockedIngesterState<'_> {
/// Deletes the shard identified by `queue_id` from the ingester state. It removes the
/// mrecordlog queue first and then removes the associated in-memory shard and rate trackers.
pub async fn delete_shard(&mut self, queue_id: &QueueId) {
+ // This if-statement is here to avoid a needless log.
+ if !self.inner.shards.contains_key(queue_id) {
+ // No need to do anything. This queue is not on this ingester.
+ return;
+ }
match self.mrecordlog.delete_queue(queue_id).await {
Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => {
self.shards.remove(queue_id);
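
Finally, a small sketch of the guard added in the `state.rs` hunk: deletion work is skipped entirely for queues this ingester does not host, so the missing-queue path never runs or logs. The types here are simplified stand-ins for the ingester state, not the real `FullyLockedIngesterState`.

```rust
use std::collections::{HashMap, HashSet};

type QueueId = String;

/// Simplified stand-in for the ingester state: in-memory shards plus their queues.
struct IngesterStateSketch {
    shards: HashMap<QueueId, ()>,
    queues: HashSet<QueueId>, // stand-in for the mrecordlog queues
}

impl IngesterStateSketch {
    fn delete_shard(&mut self, queue_id: &QueueId) {
        // Early return: this queue is not hosted on this ingester, so there is
        // nothing to delete and no reason to emit a log.
        if !self.shards.contains_key(queue_id) {
            return;
        }
        // Delete the queue first (tolerating an already-missing queue), then
        // drop the associated in-memory shard state.
        self.queues.remove(queue_id);
        self.shards.remove(queue_id);
    }
}

fn main() {
    let mut state = IngesterStateSketch {
        shards: HashMap::from([("index:source:1".to_string(), ())]),
        queues: HashSet::from(["index:source:1".to_string()]),
    };
    // Unknown queue: the guard returns early and nothing is touched.
    state.delete_shard(&"index:source:2".to_string());
    assert_eq!(state.shards.len(), 1);
    // Hosted queue: both the queue and the in-memory shard state are removed.
    state.delete_shard(&"index:source:1".to_string());
    assert!(state.shards.is_empty() && state.queues.is_empty());
}
```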
4 changes: 2 additions & 2 deletions quickwit/quickwit-proto/src/indexing/mod.rs
@@ -331,8 +331,8 @@ impl From<CpuCapacity> for CpuCapacityForSerialization {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardPositionsUpdate {
pub source_uid: SourceUid,
- // All of the shards known are listed here, regardless of whether they were updated or not.
- pub shard_positions: Vec<(ShardId, Position)>,
+ // Only shards that received an update are listed here.
+ pub updated_shard_positions: Vec<(ShardId, Position)>,
}

impl Event for ShardPositionsUpdate {}
