From 880c1a3dc2adaec3044fbe10fa2a3e38e4f97231 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Thu, 28 Mar 2024 00:19:10 +0900
Subject: [PATCH] Initializing positions

---
 .../src/ingest_v2/broadcast.rs                |   2 +-
 .../quickwit-ingest/src/ingest_v2/idle.rs     |   3 +-
 .../quickwit-ingest/src/ingest_v2/ingester.rs |  27 ++-
 .../quickwit-ingest/src/ingest_v2/state.rs    | 160 ++++++++++++++----
 .../quickwit-ingest/src/shard_positions.rs    |   5 +-
 quickwit/quickwit-serve/src/lib.rs            |   5 +-
 6 files changed, 151 insertions(+), 51 deletions(-)

diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
index 19264cf3de3..43107d34b49 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
@@ -466,7 +466,7 @@ mod tests {
         let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)
             .await
             .unwrap();
-        let (_temp_dir, state) = IngesterState::for_test().await;
+        let (_temp_dir, state, _shard_positions_inbox) = IngesterState::for_test().await;
         let weak_state = state.weak();
         let task = BroadcastLocalShardsTask {
             cluster,
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
index 464d99e16e0..2835c5cce01 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
@@ -88,10 +88,11 @@ mod tests {
     use super::*;
     use crate::ingest_v2::models::IngesterShard;
     use crate::ingest_v2::state::IngesterState;
+    use crate::shard_positions;
 
     #[tokio::test]
     async fn test_close_idle_shards_run() {
-        let (_temp_dir, state) = IngesterState::for_test().await;
+        let (_temp_dir, state, _shard_positions) = IngesterState::for_test().await;
         let weak_state = state.weak();
         let idle_shard_timeout = Duration::from_millis(200);
         let join_handle = CloseIdleShardsTask::spawn(weak_state, idle_shard_timeout);
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index a9c32ea9ab6..b1a4d3e8e18 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -98,7 +98,6 @@ pub(super) const PERSIST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "failpoints")) {
     Duration::from_secs(6)
 };
 
-
 #[derive(Clone)]
 pub struct Ingester {
     self_node_id: NodeId,
@@ -136,7 +135,12 @@ impl Ingester {
         event_broker: EventBroker,
     ) -> IngestV2Result<Self> {
         let self_node_id: NodeId = cluster.self_node_id().into();
-        let state = IngesterState::load(wal_dir_path, rate_limiter_settings, event_broker);
+        let state = IngesterState::load(
+            wal_dir_path,
+            rate_limiter_settings,
+            shard_positions_service,
+            event_broker,
+        );
 
         let weak_state = state.weak();
         BroadcastLocalShardsTask::spawn(cluster, weak_state.clone());
@@ -1072,7 +1076,6 @@ impl IngesterService for Ingester {
     }
 }
 
-
 pub async fn wait_for_ingester_status(
     mut ingester: IngesterServiceClient,
     status: IngesterStatus,
@@ -1173,7 +1176,6 @@ mod tests {
             .expect_advise_reset_shards()
             .returning(|_| Ok(AdviseResetShardsResponse::default()));
         let control_plane = ControlPlaneServiceClient::from(mock_control_plane);
-
         Self {
             node_id: "test-ingester".into(),
             control_plane,
@@ -1188,7 +1190,6 @@ mod tests {
             idle_shard_timeout: DEFAULT_IDLE_SHARD_TIMEOUT,
             event_broker: EventBroker::default(),
         }
-
     }
 }
 
@@ -1269,7 +1270,6 @@ mod tests {
             self.idle_shard_timeout,
             shard_positions_tx,
             self.event_broker,
-
         )
         .await
         .unwrap();
@@ -1305,7 +1305,8 @@ mod tests {
         let event_broker = EventBroker::default();
         let (ingester_ctx, ingester) = IngesterForTest::default()
             .with_event_broker(event_broker.clone())
-            .build().await;
+            .build()
+            .await;
 
         let mut state_guard = ingester.state.lock_fully().await.unwrap();
         let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
@@ -1367,9 +1368,16 @@ mod tests {
 
         drop(state_guard);
 
+        let (shard_positions_tx, shard_positions_rx) =
+            Universe::new().create_test_mailbox::<ShardPositionsService>();
         ingester
             .state
-            .init(ingester_ctx.tempdir.path(), RateLimiterSettings::default(), event_broker)
+            .init(
+                ingester_ctx.tempdir.path(),
+                RateLimiterSettings::default(),
+                shard_positions_tx,
+                event_broker,
+            )
             .await;
 
         let state_guard = ingester.state.lock_fully().await.unwrap();
@@ -2888,7 +2896,8 @@ mod tests {
         let event_broker = EventBroker::default();
         let (_ingester_ctx, ingester) = IngesterForTest::default()
             .with_event_broker(event_broker.clone())
-            .build().await;
+            .build()
+            .await;
 
         let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
         let shard_01 = Shard {
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs
index 53bb08db3b8..b778ea05e96 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs
@@ -26,6 +26,7 @@ use std::time::{Duration, Instant};
 
 use async_trait::async_trait;
 use mrecordlog::error::{DeleteQueueError, TruncateError};
+use quickwit_actors::Mailbox;
 use quickwit_common::pretty::PrettyDisplay;
 use quickwit_common::pubsub::{EventBroker, EventSubscriber};
 use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
@@ -33,7 +34,7 @@ use quickwit_proto::control_plane::AdviseResetShardsResponse;
 use quickwit_proto::indexing::ShardPositionsUpdate;
 use quickwit_proto::ingest::ingester::IngesterStatus;
 use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, ShardState};
-use quickwit_proto::types::{queue_id, Position, QueueId};
+use quickwit_proto::types::{queue_id, split_queue_id, Position, QueueId, SourceUid};
 use tokio::sync::{watch, Mutex, MutexGuard, RwLock, RwLockMappedWriteGuard, RwLockWriteGuard};
 use tracing::{error, info, warn};
 
@@ -42,6 +43,7 @@ use super::rate_meter::RateMeter;
 use super::replication::{ReplicationStreamTaskHandle, ReplicationTaskHandle};
 use crate::ingest_v2::mrecordlog_utils::{force_delete_queue, queue_position_range};
 use crate::mrecordlog_async::MultiRecordLogAsync;
+use crate::shard_positions::{GetPosition, ShardPositionsService};
 use crate::{with_lock_metrics, FollowerId, LeaderId};
 
 /// Stores the state of the ingester and attempts to prevent deadlocks by exposing an API that
@@ -102,13 +104,25 @@ impl IngesterState {
         }
     }
 
-    pub fn load(wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings, event_broker: EventBroker) -> Self {
+    pub fn load(
+        wal_dir_path: &Path,
+        rate_limiter_settings: RateLimiterSettings,
+        shard_positions_service: Mailbox<ShardPositionsService>,
+        event_broker: EventBroker,
+    ) -> Self {
         let state = Self::new();
         let state_clone = state.clone();
         let wal_dir_path = wal_dir_path.to_path_buf();
 
         let init_future = async move {
-            state_clone.init(&wal_dir_path, rate_limiter_settings, event_broker).await;
+            state_clone
+                .init(
+                    &wal_dir_path,
+                    rate_limiter_settings,
+                    shard_positions_service,
+                    event_broker,
+                )
+                .await;
         };
         tokio::spawn(init_future);
 
@@ -116,10 +130,23 @@ impl IngesterState {
     }
 
     #[cfg(test)]
-    pub async fn for_test() -> (tempfile::TempDir, Self) {
+    pub async fn for_test() -> (
+        tempfile::TempDir,
+        Self,
+        quickwit_actors::Inbox<ShardPositionsService>,
+    ) {
+        use quickwit_actors::Universe;
+
         let temp_dir = tempfile::tempdir().unwrap();
         let event_broker = EventBroker::default();
-        let mut state = IngesterState::load(temp_dir.path(), RateLimiterSettings::default(), event_broker);
+        let (shard_positions_tx, shard_positions_rx) =
+            Universe::default().create_test_mailbox::<ShardPositionsService>();
+        let mut state = IngesterState::load(
+            temp_dir.path(),
+            RateLimiterSettings::default(),
+            shard_positions_tx,
+            event_broker,
+        );
 
         state
             .status_rx
@@ -127,13 +154,19 @@ impl IngesterState {
             .await
             .unwrap();
 
-        (temp_dir, state)
+        (temp_dir, state, shard_positions_rx)
     }
 
     /// Initializes the internal state of the ingester. It loads the local WAL, then lists all its
     /// queues. Empty queues are deleted, while non-empty queues are recovered. However, the
    /// corresponding shards are closed and become read-only.
-    pub async fn init(&self, wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings, event_broker: EventBroker) {
+    pub async fn init(
+        &self,
+        wal_dir_path: &Path,
+        rate_limiter_settings: RateLimiterSettings,
+        shard_positions_service: Mailbox<ShardPositionsService>,
+        event_broker: EventBroker,
+    ) {
         let mut inner_guard = self.inner.lock().await;
         let mut mrecordlog_guard = self.mrecordlog.write().await;
 
@@ -218,6 +251,46 @@ impl IngesterState {
         event_broker
             .subscribe_without_timeout::<ShardPositionsUpdate>(self.weak())
             .forever();
+
+        let queue_ids: Vec<QueueId> = inner_guard.shards.keys().cloned().collect();
+
+        // Explicitly drop inner_guard early. We don't want to block the ingester use at this point.
+        drop(inner_guard);
+
+        let Some(ingester_state) = self.weak().upgrade() else {
+            error!("ingester state not present after initialization");
+            return;
+        };
+        for queue_id in queue_ids {
+            let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
+                warn!(
+                    "queue_id `{queue_id}` could not be parsed into (index_uid, source_id, \
+                     shard_id)"
+                );
+                continue;
+            };
+            let get_position = GetPosition {
+                source_uid: SourceUid {
+                    index_uid,
+                    source_id,
+                },
+                shard_id,
+            };
+            let Ok(shard_position) = shard_positions_service.ask(get_position).await else {
+                error!("failed to get shard position for `{queue_id}`");
+                continue;
+            };
+            let Some(shard_position) = shard_position else {
+                // The shard is not known by the shard positions service at this point.
+                // This is perfectly normal.
+                continue;
+            };
+            let Ok(mut state) = ingester_state.lock_fully().await else {
+                error!("failed to lock the ingester state");
+                continue;
+            };
+            apply_position_update(&mut state, &queue_id, &shard_position).await;
+        }
     }
 
     pub async fn wait_for_ready(&mut self) {
@@ -311,34 +384,6 @@ impl DerefMut for PartiallyLockedIngesterState<'_> {
     }
 }
 
-
-#[async_trait]
-impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
-    async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) {
-        let Some(state) = self.upgrade() else {
-            warn!("ingester state update failed");
-            return;
-        };
-        let Ok(mut state_guard) =
-            with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
-        else {
-            error!("failed to lock the ingester state");
-            return;
-        };
-        let index_uid = shard_positions_update.source_uid.index_uid;
-        let source_id = shard_positions_update.source_uid.source_id;
-
-        for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
-            let queue_id = queue_id(&index_uid, &source_id, &shard_id);
-            if shard_position.is_eof() {
-                state_guard.delete_shard(&queue_id).await;
-            } else if !shard_position.is_beginning() {
-                state_guard.truncate_shard(&queue_id, &shard_position).await;
-            }
-        }
-    }
-}
-
 pub(super) struct FullyLockedIngesterState<'a> {
     pub inner: MutexGuard<'a, InnerIngesterState>,
     pub mrecordlog: RwLockMappedWriteGuard<'a, MultiRecordLogAsync>,
@@ -459,6 +504,45 @@ impl WeakIngesterState {
     }
 }
 
+async fn apply_position_update(
+    state: &mut FullyLockedIngesterState<'_>,
+    queue_id: &QueueId,
+    shard_position: &Position,
+) {
+    if state.shards.get(queue_id).is_none() {
+        return;
+    }
+    if shard_position.is_eof() {
+        state.delete_shard(queue_id).await;
+    } else if !shard_position.is_beginning() {
+        state.truncate_shard(queue_id, shard_position).await;
+    }
+}
+
+#[async_trait]
+impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
+    async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) {
+        let SourceUid {
+            index_uid,
+            source_id,
+        } = shard_positions_update.source_uid;
+        let Some(state) = self.upgrade() else {
+            warn!("ingester state update failed");
+            return;
+        };
+        let Ok(mut state_guard) =
+            with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
+        else {
+            error!("failed to lock the ingester state");
+            return;
+        };
+        for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
+            let queue_id = queue_id(&index_uid, &source_id, &shard_id);
+            apply_position_update(&mut state_guard, &queue_id, &shard_position).await;
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use tokio::time::timeout;
@@ -499,7 +583,11 @@ mod tests {
         let temp_dir = tempfile::tempdir().unwrap();
 
         state
-            .init(temp_dir.path(), RateLimiterSettings::default(), EventBroker::default())
+            .init(
+                temp_dir.path(),
+                RateLimiterSettings::default(),
+                EventBroker::default(),
+            )
             .await;
 
         timeout(Duration::from_millis(100), state.wait_for_ready())
diff --git a/quickwit/quickwit-ingest/src/shard_positions.rs b/quickwit/quickwit-ingest/src/shard_positions.rs
index bdd205c5bbe..40999c5b44e 100644
--- a/quickwit/quickwit-ingest/src/shard_positions.rs
+++ b/quickwit/quickwit-ingest/src/shard_positions.rs
@@ -35,9 +35,10 @@ use tracing::{debug, error, info, warn};
 const SHARD_POSITIONS_PREFIX: &str = "indexer.shard_positions:";
 
 /// Message to request for the position of a given shard.
+#[derive(Debug)]
 pub struct GetPosition {
-    source_uid: SourceUid,
-    shard_id: ShardId,
+    pub source_uid: SourceUid,
+    pub shard_id: ShardId,
 }
 
 /// This event is an internal detail of the `ShardPositionsService`.
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 8b13bfbaa17..3ba35b6e1b0 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -488,7 +488,7 @@ pub async fn serve_quickwit(
         &event_broker,
         control_plane_service.clone(),
         ingester_pool,
-        &universe
+        &universe,
     )
     .await
     .context("failed to start ingest v2 service")?;
@@ -777,7 +777,8 @@ async fn setup_ingest_v2(
     let ingester_opt = if node_config.is_service_enabled(QuickwitService::Indexer) {
         let wal_dir_path = node_config.data_dir_path.join("wal");
         fs::create_dir_all(&wal_dir_path)?;
-        let shard_positions_service: Mailbox<ShardPositionsService> = universe.get_one::<ShardPositionsService>()
+        let shard_positions_service: Mailbox<ShardPositionsService> = universe
+            .get_one::<ShardPositionsService>()
             .expect("the shard positions service should be spawned before the ingester service");
         let idle_shard_timeout = get_idle_shard_timeout();
         let ingester = Ingester::try_new(
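
Editor's note: the sketch below is not part of the commit. It is a minimal, self-contained illustration of the catch-up flow that the new `IngesterState::init` code performs above: for every queue recovered from the WAL, ask the shard positions service where indexers already are, then delete the shard (EOF), truncate it, or leave it untouched. The names used here (`PositionsClient`, `LocalShardStore`, `Position`) are simplified stand-ins invented for this sketch, not the actual Quickwit types.

// Hypothetical, simplified stand-ins for illustration only; not the Quickwit APIs.
use std::collections::HashMap;

enum Position {
    Beginning,
    Offset(u64),
    Eof,
}

trait PositionsClient {
    /// Returns the last published position for a queue, if the service knows about it.
    fn get_position(&self, queue_id: &str) -> Option<Position>;
}

struct LocalShardStore {
    /// queue_id -> record offsets still present in the local WAL.
    shards: HashMap<String, Vec<u64>>,
}

impl LocalShardStore {
    fn delete_shard(&mut self, queue_id: &str) {
        self.shards.remove(queue_id);
    }

    fn truncate_shard(&mut self, queue_id: &str, up_to: u64) {
        if let Some(records) = self.shards.get_mut(queue_id) {
            records.retain(|&offset| offset > up_to);
        }
    }

    /// Mirrors the flow added by the patch: for every queue recovered from the WAL, ask the
    /// positions service how far indexers have progressed, then garbage-collect what was
    /// already processed.
    fn catch_up(&mut self, positions: &impl PositionsClient) {
        let queue_ids: Vec<String> = self.shards.keys().cloned().collect();
        for queue_id in queue_ids {
            match positions.get_position(&queue_id) {
                // The positions service has never heard of this shard, or nothing was
                // processed yet: leave the queue untouched.
                None | Some(Position::Beginning) => {}
                // Everything was processed: the whole queue can be deleted.
                Some(Position::Eof) => self.delete_shard(&queue_id),
                // Drop only the records at or below the published offset.
                Some(Position::Offset(offset)) => self.truncate_shard(&queue_id, offset),
            }
        }
    }
}

In the patch itself, this decision tree lives in `apply_position_update` and runs both at initialization (driven by `GetPosition` requests to the `ShardPositionsService`) and on subsequent `ShardPositionsUpdate` events, always under the fully locked ingester state.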