Commit
Ensuring we don't lose the first few position updates.
We had two problems here.
a) The ingester loads in an asynchronous fashion, as it needs to replay
the WAL entirely.
Subscribing to shard positions events is part of this initialization.
We could be missing all of the events received (via chitchat) before this
initialization completes.
b) We start the cluster first, and only start the ShardPositionsService
much later in the initialization process. We could possibly be
missing the keys that were shared in between.

Closes #4622
fulmicoton committed Feb 22, 2024
1 parent b724dc8 commit 84234d7
Showing 1 changed file with 34 additions and 12 deletions.
46 changes: 34 additions & 12 deletions quickwit/quickwit-serve/src/lib.rs
@@ -58,7 +58,7 @@ pub use format::BodyFormat;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use once_cell::sync::Lazy;
use quickwit_actors::{ActorExitStatus, Mailbox, Universe};
use quickwit_actors::{ActorExitStatus, Mailbox, SpawnContext, Universe};
use quickwit_cluster::{
start_cluster_service, Cluster, ClusterChange, ClusterMember, ListenerHandle,
};
@@ -314,6 +314,27 @@ async fn start_control_plane_if_needed(
}
}

fn start_shard_positions_service(
ingester_service_opt: Option<IngesterServiceClient>,
cluster: Cluster,
event_broker: EventBroker,
spawn_ctx: SpawnContext,
) {
// We spawn a task here because we need the ingester to be ready before spawning
// the `ShardPositionsService`. If we don't, all of the events we emit will be dismissed.
tokio::spawn(async move {
if let Some(ingester_service) = ingester_service_opt {
if wait_for_ingester_status(ingester_service, IngesterStatus::Ready)
.await
.is_err()
{
warn!("ingester failed to reach ready status");
}
}
ShardPositionsService::spawn(&spawn_ctx, event_broker, cluster);
});
}

pub async fn serve_quickwit(
node_config: NodeConfig,
runtimes_config: RuntimesConfig,
@@ -392,17 +413,6 @@ pub async fn serve_quickwit(
)
.await?;

// If one of the two following services is enabled, we need to enable the shard positions service:
// - the control plane: as it is in charge of cleaning up shards that have reached EOF.
// - the indexer: as it hosts ingesters, and ingesters use the shard positions to truncate
//   the queues associated with shards in mrecordlog, and publish indexers' progress to
//   chitchat.
if node_config.is_service_enabled(QuickwitService::Indexer)
|| node_config.is_service_enabled(QuickwitService::ControlPlane)
{
ShardPositionsService::spawn(universe.spawn_ctx(), event_broker.clone(), cluster.clone());
}

// Set up the "control plane proxy" for the metastore.
let metastore_through_control_plane = MetastoreServiceClient::new(ControlPlaneMetastore::new(
control_plane_service.clone(),
@@ -448,6 +458,17 @@
)
.await?;

if node_config.is_service_enabled(QuickwitService::Indexer)
|| node_config.is_service_enabled(QuickwitService::ControlPlane)
{
start_shard_positions_service(
ingester_service_opt.clone(),
cluster.clone(),
event_broker.clone(),
universe.spawn_ctx().clone(),
);
}

// Any node can serve index management requests (create/update/delete index, add/remove source,
// etc.), so we always instantiate an index manager.
let mut index_manager = IndexManager::new(
@@ -719,6 +740,7 @@ async fn setup_ingest_v2(
burst_limit,
..Default::default()
};

// Instantiate ingester.
let ingester_opt = if node_config.is_service_enabled(QuickwitService::Indexer) {
let wal_dir_path = node_config.data_dir_path.join("wal");
