Ensuring we don't lose the first few position updates.
We had two problems here.
a) The ingester loads in an asynchronous fashion, as it needs to replay
the WAL entirely.
Subscribing to shard position events is part of this initialization.
We could be missing all of the events received (via chitchat) before the initialization.
b) We start the cluster first. Long after, during the initialization
process, we start the ShardPositionsService. We could be missing the keys
that were shared in between.

Closes #4622
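
To make the fix for problem (a) concrete, here is a minimal, self-contained sketch of the replay pattern this commit adds to `ShardPositionsService::initialize`: register the live listener first, then walk the keys already present under the shard-positions prefix, relying on the idempotence of position updates to make duplicates harmless. The `KvState` and `PositionSubscriber` types below are hypothetical stand-ins for the chitchat state and the service mailbox; only `SHARD_POSITIONS_PREFIX` comes from the actual code, and the key/value payloads are placeholders.

use std::collections::BTreeMap;

/// Hypothetical stand-in for the chitchat key-value state (in Quickwit this
/// lives behind the cluster's chitchat mutex).
struct KvState {
    entries: BTreeMap<String, String>,
}

/// Hypothetical stand-in for the `ShardPositionsService` mailbox.
struct PositionSubscriber {
    received: Vec<(String, String)>,
}

impl PositionSubscriber {
    fn on_update(&mut self, key: &str, value: &str) {
        // Position updates are idempotent: replaying an entry that was also
        // delivered live is harmless.
        self.received.push((key.to_string(), value.to_string()));
    }
}

const SHARD_POSITIONS_PREFIX: &str = "indexer.shard_positions:";

/// Replays the key-value pairs that were already gossiped before the
/// subscriber existed. In the real code this runs right after the live
/// listener is registered in `ShardPositionsService::initialize`.
fn replay_existing_positions(state: &KvState, subscriber: &mut PositionSubscriber) {
    for (key, value) in &state.entries {
        if let Some(stripped) = key.strip_prefix(SHARD_POSITIONS_PREFIX) {
            subscriber.on_update(stripped, value);
        }
    }
}

fn main() {
    let mut state = KvState { entries: BTreeMap::new() };
    // An update that was gossiped before the service subscribed.
    state.entries.insert(
        format!("{}example-index:example-source", SHARD_POSITIONS_PREFIX),
        "opaque-shard-positions-payload".to_string(),
    );
    let mut subscriber = PositionSubscriber { received: Vec::new() };
    replay_existing_positions(&state, &mut subscriber);
    assert_eq!(subscriber.received.len(), 1);
    println!("replayed {} pre-existing update(s)", subscriber.received.len());
}

Problem (b) is handled separately: `quickwit-serve` now only spawns the `ShardPositionsService` once the ingester reports `Ready` (see the `start_shard_positions_service` helper in the diff below), so the replayed events are not dismissed for lack of a subscriber.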
fulmicoton committed Feb 26, 2024
1 parent 6337ac0 commit 7a72a6a
Showing 3 changed files with 152 additions and 71 deletions.
172 changes: 113 additions & 59 deletions quickwit/quickwit-indexing/src/models/shard_positions.rs
@@ -19,16 +19,17 @@

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::time::{Duration, Instant};

use anyhow::Context;
use async_trait::async_trait;
use fnv::FnvHashMap;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, SpawnContext};
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, SpawnContext};
use quickwit_cluster::{Cluster, ListenerHandle};
use quickwit_common::pubsub::{Event, EventBroker};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::types::{Position, ShardId, SourceUid};
use tracing::{error, warn};
use tracing::{error, info, warn};

/// Prefix used in chitchat to publish the shard positions.
const SHARD_POSITIONS_PREFIX: &str = "indexer.shard_positions:";
@@ -106,31 +107,76 @@ fn parse_shard_positions_from_kv(
})
}

fn push_position_update(
shard_positions_service_mailbox: &Mailbox<ShardPositionsService>,
key: &str,
value: &str,
) {
let shard_positions = match parse_shard_positions_from_kv(key, value) {
Ok(shard_positions) => shard_positions,
Err(error) => {
error!(key=key, value=value, error=%error, "failed to parse shard positions from cluster kv");
return;
}
};
if shard_positions_service_mailbox
.try_send_message(shard_positions)
.is_err()
{
error!("failed to send shard positions to the shard positions service");
}
}

#[async_trait]
impl Actor for ShardPositionsService {
type ObservableState = ();
fn observable_state(&self) {}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
let mailbox = ctx.mailbox().clone();

self.cluster_listener_handle_opt = Some(
self.cluster
.subscribe(SHARD_POSITIONS_PREFIX, move |event| {
let shard_positions= match parse_shard_positions_from_kv(event.key, event.value) {
Ok(shard_positions) => {
shard_positions
}
Err(error) => {
error!(key=event.key, value=event.value, error=%error, "failed to parse shard positions from cluster kv");
return;
}
};
if mailbox.try_send_message(shard_positions).is_err() {
error!("failed to send shard positions to the shard positions service");
}
push_position_update(&mailbox, event.key, event.value);
})
.await
.await,
);

// We are now listening to new updates. However, the cluster was started earlier.
// It might have already received shard updates from other nodes.
//
// Let's also sync our `ShardPositionsService` with the current state of the cluster.
// Shard position updates are trivially idempotent, so we can just replay all the events
// without worrying about duplicates.

let now = Instant::now();
let chitchat = self.cluster.chitchat().await;
let chitchat_lock = chitchat.lock().await;
let mut num_keys = 0;
for node_state in chitchat_lock.node_states().values() {
for (key, versioned_value) in node_state.iter_prefix(SHARD_POSITIONS_PREFIX) {
let key_stripped = key.strip_prefix(SHARD_POSITIONS_PREFIX).unwrap();
push_position_update(ctx.mailbox(), key_stripped, &versioned_value.value);
}
num_keys += 1;
// It is tempting to yield here, but we are holding the chitchat lock.
// Let's just log the amount of time it takes for the moment.
}
let elapsed = now.elapsed();
if elapsed > Duration::from_millis(300) {
warn!(
"initializing shard positions took longer than expected: ({:?})ms ({} keys)",
elapsed.as_millis(),
num_keys
);
} else {
info!(
"initializing shard positions took ({:?})ms ({} keys)",
elapsed.as_millis(),
num_keys
);
}
Ok(())
}
}
@@ -286,19 +332,59 @@ mod tests {
quickwit_common::setup_logging_for_tests();

let transport = ChannelTransport::default();

let universe1 = Universe::with_accelerated_time();
let universe2 = Universe::with_accelerated_time();
let cluster1 = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)

let event_broker1 = EventBroker::default();
let event_broker2 = EventBroker::default();

let (tx1, mut rx1) = tokio::sync::mpsc::unbounded_channel::<ShardPositionsUpdate>();
let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel::<ShardPositionsUpdate>();

event_broker1
.subscribe(move |update: ShardPositionsUpdate| {
tx1.send(update).unwrap();
})
.forever();

event_broker2
.subscribe(move |update: ShardPositionsUpdate| {
tx2.send(update).unwrap();
})
.forever();

let index_uid = IndexUid::new_with_random_ulid("index-test");
let source_id = "test-source".to_string();
let source_uid = SourceUid {
index_uid,
source_id,
};

let cluster1 = create_cluster_for_test(vec![], &["indexer", "metastore"], &transport, true)
.await
.unwrap();
ShardPositionsService::spawn(
universe1.spawn_ctx(),
event_broker1.clone(),
cluster1.clone(),
);

// One of the events is published before cluster formation.
event_broker1.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(ShardId::from(20), Position::offset(100u64))],
));

let cluster2 = create_cluster_for_test(
vec![cluster1.gossip_listen_addr.to_string()],
&["indexer", "metastore"],
&["indexer"],
&transport,
true,
)
.await
.unwrap();

cluster1
.wait_for_ready_members(|members| members.len() == 2, Duration::from_secs(5))
.await
@@ -308,49 +394,16 @@
.await
.unwrap();

let event_broker1 = EventBroker::default();
let event_broker2 = EventBroker::default();
ShardPositionsService::spawn(
universe1.spawn_ctx(),
event_broker1.clone(),
cluster1.clone(),
);
ShardPositionsService::spawn(
universe2.spawn_ctx(),
event_broker2.clone(),
cluster2.clone(),
);

let index_uid = IndexUid::new_with_random_ulid("index-test");
let source_id = "test-source".to_string();
let source_uid = SourceUid {
index_uid,
source_id,
};

let (tx1, mut rx1) = tokio::sync::mpsc::unbounded_channel::<ShardPositionsUpdate>();
let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel::<ShardPositionsUpdate>();

event_broker1
.subscribe(move |update: ShardPositionsUpdate| {
tx1.send(update).unwrap();
})
.forever();

event_broker2
.subscribe(move |update: ShardPositionsUpdate| {
tx2.send(update).unwrap();
})
.forever();

// ----------------------
// One of the nodes publishes a given shard position update.
// This is done using a `LocalShardPositionsUpdate`.

event_broker1.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(ShardId::from(1), Position::Beginning)],
));
event_broker1.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(ShardId::from(2), Position::offset(10u64))],
@@ -386,27 +439,28 @@ mod tests {
assert_eq!(
updates1,
vec![
vec![(ShardId::from(1), Position::Beginning)],
vec![(ShardId::from(2), Position::offset(10u64))],
vec![(ShardId::from(1), Position::offset(10u64)),],
vec![(ShardId::from(2), Position::offset(12u64)),],
vec![(ShardId::from(20), Position::offset(100u64))],
vec![(ShardId::from(2u64), Position::offset(10u64))],
vec![(ShardId::from(1u64), Position::offset(10u64)),],
vec![(ShardId::from(2u64), Position::offset(12u64)),],
]
);

// The updates as seen from the second node.
let mut updates2: Vec<Vec<(ShardId, Position)>> = Vec::new();
for _ in 0..4 {
for _ in 0..5 {
let update = rx2.recv().await.unwrap();
assert_eq!(update.source_uid, source_uid);
updates2.push(update.updated_shard_positions);
}
assert_eq!(
updates2,
vec![
vec![(ShardId::from(2), Position::offset(10u64))],
vec![(ShardId::from(2), Position::offset(12u64))],
vec![(ShardId::from(1), Position::Beginning),],
vec![(ShardId::from(1), Position::offset(10u64)),],
vec![(ShardId::from(20u64), Position::offset(100u64))],
vec![(ShardId::from(2u64), Position::offset(10u64))],
vec![(ShardId::from(2u64), Position::offset(12u64))],
vec![(ShardId::from(1u64), Position::Beginning)],
vec![(ShardId::from(1u64), Position::offset(10u64))]
]
);

1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -396,6 +396,7 @@ impl Ingester {
pub fn subscribe(&self, event_broker: &EventBroker) {
let weak_ingester_state = self.state.weak();
// This subscription is the one in charge of truncating the mrecordlog.
info!("subscribing ingester to shard positions updates");
event_broker
.subscribe_without_timeout::<ShardPositionsUpdate>(weak_ingester_state)
.forever();
50 changes: 38 additions & 12 deletions quickwit/quickwit-serve/src/lib.rs
@@ -58,7 +58,7 @@ pub use format::BodyFormat;
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
use quickwit_actors::{ActorExitStatus, Mailbox, Universe};
use quickwit_actors::{ActorExitStatus, Mailbox, SpawnContext, Universe};
use quickwit_cluster::{
start_cluster_service, Cluster, ClusterChange, ClusterChangeStream, ClusterMember,
ListenerHandle,
@@ -314,6 +314,27 @@
}
}

fn start_shard_positions_service(
ingester_service_opt: Option<IngesterServiceClient>,
cluster: Cluster,
event_broker: EventBroker,
spawn_ctx: SpawnContext,
) {
// We spawn a task here because we need the ingester to be ready before spawning the
// `ShardPositionsService`. If we don't wait, all of the events we emit will be dismissed.
tokio::spawn(async move {
if let Some(ingester_service) = ingester_service_opt {
if wait_for_ingester_status(ingester_service, IngesterStatus::Ready)
.await
.is_err()
{
warn!("ingester failed to reach ready status");
}
}
ShardPositionsService::spawn(&spawn_ctx, event_broker, cluster);
});
}

pub async fn serve_quickwit(
node_config: NodeConfig,
runtimes_config: RuntimesConfig,
@@ -392,17 +413,6 @@ pub async fn serve_quickwit(
)
.await?;

// If one of the two following services is enabled, we need to enable the shard positions service:
// - the control plane: as it is in charge of cleaning up shards that reach EOF.
// - the indexer: as it hosts ingesters, and ingesters use the shard positions to truncate
//   the queues associated with shards in mrecordlog, and to publish the indexers' progress to
//   chitchat.
if node_config.is_service_enabled(QuickwitService::Indexer)
|| node_config.is_service_enabled(QuickwitService::ControlPlane)
{
ShardPositionsService::spawn(universe.spawn_ctx(), event_broker.clone(), cluster.clone());
}

// Set up the "control plane proxy" for the metastore.
let metastore_through_control_plane = MetastoreServiceClient::new(ControlPlaneMetastore::new(
control_plane_service.clone(),
@@ -447,6 +457,17 @@
)
.await?;

if node_config.is_service_enabled(QuickwitService::Indexer)
|| node_config.is_service_enabled(QuickwitService::ControlPlane)
{
start_shard_positions_service(
ingester_service_opt.clone(),
cluster.clone(),
event_broker.clone(),
universe.spawn_ctx().clone(),
);
}

// Any node can serve index management requests (create/update/delete index, add/remove source,
// etc.), so we always instantiate an index manager.
let mut index_manager = IndexManager::new(
@@ -715,6 +736,7 @@ async fn setup_ingest_v2(
burst_limit,
..Default::default()
};

// Instantiate ingester.
let ingester_opt = if node_config.is_service_enabled(QuickwitService::Indexer) {
let wal_dir_path = node_config.data_dir_path.join("wal");
@@ -731,6 +753,10 @@
)
.await?;
ingester.subscribe(event_broker);
// We will now receive all new shard position update events from chitchat.
// Unfortunately, at this point chitchat is already running.
//
// We need to make sure the existing positions are loaded too.
Some(ingester)
} else {
None
