Initializing positions
fulmicoton committed Mar 27, 2024
1 parent 4f19652 commit 880c1a3
Showing 6 changed files with 151 additions and 51 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
@@ -466,7 +466,7 @@ mod tests {
let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)
.await
.unwrap();
let (_temp_dir, state) = IngesterState::for_test().await;
let (_temp_dir, state, _shard_positions_inbox) = IngesterState::for_test().await;
let weak_state = state.weak();
let task = BroadcastLocalShardsTask {
cluster,
3 changes: 2 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/idle.rs
@@ -88,10 +88,11 @@ mod tests {
use super::*;
use crate::ingest_v2::models::IngesterShard;
use crate::ingest_v2::state::IngesterState;
use crate::shard_positions;

#[tokio::test]
async fn test_close_idle_shards_run() {
let (_temp_dir, state) = IngesterState::for_test().await;
let (_temp_dir, state, _shard_positions) = IngesterState::for_test().await;
let weak_state = state.weak();
let idle_shard_timeout = Duration::from_millis(200);
let join_handle = CloseIdleShardsTask::spawn(weak_state, idle_shard_timeout);
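
The broadcast.rs and idle.rs test changes only track the new signature of `IngesterState::for_test()`, which now also returns the test inbox of the `ShardPositionsService` (see the state.rs diff below). A minimal sketch of the new calling convention, with illustrative binding names and the rest of the test body assumed:

// Sketch: the test helper now yields a third element, the ShardPositionsService inbox.
// It can be ignored, as in the updated tests, or inspected to assert on the messages
// the ingester sends while initializing.
let (_temp_dir, state, _shard_positions_inbox) = IngesterState::for_test().await;
let weak_state = state.weak();
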
27 changes: 18 additions & 9 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -98,7 +98,6 @@ pub(super) const PERSIST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature =
Duration::from_secs(6)
};


#[derive(Clone)]
pub struct Ingester {
self_node_id: NodeId,
@@ -136,7 +135,12 @@ impl Ingester {
event_broker: EventBroker,
) -> IngestV2Result<Self> {
let self_node_id: NodeId = cluster.self_node_id().into();
let state = IngesterState::load(wal_dir_path, rate_limiter_settings, event_broker);
let state = IngesterState::load(
wal_dir_path,
rate_limiter_settings,
shard_positions_service,
event_broker,
);

let weak_state = state.weak();
BroadcastLocalShardsTask::spawn(cluster, weak_state.clone());
@@ -1072,7 +1076,6 @@ impl IngesterService for Ingester {
}
}


pub async fn wait_for_ingester_status(
mut ingester: IngesterServiceClient,
status: IngesterStatus,
@@ -1173,7 +1176,6 @@ mod tests {
.expect_advise_reset_shards()
.returning(|_| Ok(AdviseResetShardsResponse::default()));
let control_plane = ControlPlaneServiceClient::from(mock_control_plane);

Self {
node_id: "test-ingester".into(),
control_plane,
@@ -1188,7 +1190,6 @@
idle_shard_timeout: DEFAULT_IDLE_SHARD_TIMEOUT,
event_broker: EventBroker::default(),
}

}
}

@@ -1269,7 +1270,6 @@ mod tests {
self.idle_shard_timeout,
shard_positions_tx,
self.event_broker,

)
.await
.unwrap();
@@ -1305,7 +1305,8 @@ mod tests {
let event_broker = EventBroker::default();
let (ingester_ctx, ingester) = IngesterForTest::default()
.with_event_broker(event_broker.clone())
.build().await;
.build()
.await;
let mut state_guard = ingester.state.lock_fully().await.unwrap();

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
@@ -1367,9 +1368,16 @@ mod tests {

drop(state_guard);

let (shard_positions_tx, shard_positions_rx) =
Universe::new().create_test_mailbox::<ShardPositionsService>();
ingester
.state
.init(ingester_ctx.tempdir.path(), RateLimiterSettings::default(), event_broker)
.init(
ingester_ctx.tempdir.path(),
RateLimiterSettings::default(),
shard_positions_tx,
event_broker,
)
.await;

let state_guard = ingester.state.lock_fully().await.unwrap();
@@ -2888,7 +2896,8 @@ mod tests {
let event_broker = EventBroker::default();
let (_ingester_ctx, ingester) = IngesterForTest::default()
.with_event_broker(event_broker.clone())
.build().await;
.build()
.await;

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let shard_01 = Shard {
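
Within ingester.rs, the constructor and the tests now thread a `Mailbox<ShardPositionsService>` into the state. A sketch of the test-side pattern introduced above, assembled from the hunks; the surrounding test fixture (`ingester`, `ingester_ctx`, `event_broker`) is assumed and binding names are illustrative:

// Sketch: tests create a throwaway mailbox for the ShardPositionsService and pass it to
// IngesterState::init, which now takes it in addition to the event broker.
let (shard_positions_tx, _shard_positions_rx) =
    Universe::new().create_test_mailbox::<ShardPositionsService>();
ingester
    .state
    .init(
        ingester_ctx.tempdir.path(),
        RateLimiterSettings::default(),
        shard_positions_tx,
        event_broker,
    )
    .await;
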
160 changes: 124 additions & 36 deletions quickwit/quickwit-ingest/src/ingest_v2/state.rs
@@ -26,14 +26,15 @@ use std::time::{Duration, Instant};

use async_trait::async_trait;
use mrecordlog::error::{DeleteQueueError, TruncateError};
use quickwit_actors::Mailbox;
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
use quickwit_proto::control_plane::AdviseResetShardsResponse;
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, ShardState};
use quickwit_proto::types::{queue_id, Position, QueueId};
use quickwit_proto::types::{queue_id, split_queue_id, Position, QueueId, SourceUid};
use tokio::sync::{watch, Mutex, MutexGuard, RwLock, RwLockMappedWriteGuard, RwLockWriteGuard};
use tracing::{error, info, warn};

@@ -42,6 +43,7 @@ use super::rate_meter::RateMeter;
use super::replication::{ReplicationStreamTaskHandle, ReplicationTaskHandle};
use crate::ingest_v2::mrecordlog_utils::{force_delete_queue, queue_position_range};
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::shard_positions::{GetPosition, ShardPositionsService};
use crate::{with_lock_metrics, FollowerId, LeaderId};

/// Stores the state of the ingester and attempts to prevent deadlocks by exposing an API that
@@ -102,38 +104,69 @@ impl IngesterState {
}
}

pub fn load(wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings, event_broker: EventBroker) -> Self {
pub fn load(
wal_dir_path: &Path,
rate_limiter_settings: RateLimiterSettings,
shard_positions_service: Mailbox<ShardPositionsService>,
event_broker: EventBroker,
) -> Self {
let state = Self::new();
let state_clone = state.clone();
let wal_dir_path = wal_dir_path.to_path_buf();

let init_future = async move {
state_clone.init(&wal_dir_path, rate_limiter_settings, event_broker).await;
state_clone
.init(
&wal_dir_path,
rate_limiter_settings,
shard_positions_service,
event_broker,
)
.await;
};
tokio::spawn(init_future);

state
}

#[cfg(test)]
pub async fn for_test() -> (tempfile::TempDir, Self) {
pub async fn for_test() -> (
tempfile::TempDir,
Self,
quickwit_actors::Inbox<ShardPositionsService>,
) {
use quickwit_actors::Universe;

let temp_dir = tempfile::tempdir().unwrap();
let event_broker = EventBroker::default();
let mut state = IngesterState::load(temp_dir.path(), RateLimiterSettings::default(), event_broker);
let (shard_positions_tx, shard_positions_rx) =
Universe::default().create_test_mailbox::<ShardPositionsService>();
let mut state = IngesterState::load(
temp_dir.path(),
RateLimiterSettings::default(),
shard_positions_tx,
event_broker,
);

state
.status_rx
.wait_for(|status| *status == IngesterStatus::Ready)
.await
.unwrap();

(temp_dir, state)
(temp_dir, state, shard_positions_rx)
}

/// Initializes the internal state of the ingester. It loads the local WAL, then lists all its
/// queues. Empty queues are deleted, while non-empty queues are recovered. However, the
/// corresponding shards are closed and become read-only.
pub async fn init(&self, wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings, event_broker: EventBroker) {
pub async fn init(
&self,
wal_dir_path: &Path,
rate_limiter_settings: RateLimiterSettings,
shard_positions_service: Mailbox<ShardPositionsService>,
event_broker: EventBroker,
) {
let mut inner_guard = self.inner.lock().await;
let mut mrecordlog_guard = self.mrecordlog.write().await;

@@ -218,6 +251,46 @@ impl IngesterState {
event_broker
.subscribe_without_timeout::<ShardPositionsUpdate>(self.weak())
.forever();

let queue_ids: Vec<QueueId> = inner_guard.shards.keys().cloned().collect();

// Explicitly drop inner_guard early. We don't want to block the ingester use at this point.
drop(inner_guard);

let Some(ingester_state) = self.weak().upgrade() else {
error!("ingester state not present after initialization");
return;
};
for queue_id in queue_ids {
let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
warn!(
"queue_id `{queue_id}` could not be parsed into (index_uid, source_id, \
shard_id)"
);
continue;
};
let get_position = GetPosition {
source_uid: SourceUid {
index_uid,
source_id,
},
shard_id,
};
let Ok(shard_position) = shard_positions_service.ask(get_position).await else {
error!("failed to get shard position for `{queue_id}`");
continue;
};
let Some(shard_position) = shard_position else {
// The shard is not known by the shard positions service at this point.
// This is perfectly normal.
continue;
};
let Ok(mut state) = ingester_state.lock_fully().await else {
error!("failed to lock the ingester state");
continue;
};
apply_position_update(&mut state, &queue_id, &shard_position).await;
}
}

pub async fn wait_for_ready(&mut self) {
@@ -311,34 +384,6 @@ impl DerefMut for PartiallyLockedIngesterState<'_> {
}
}


#[async_trait]
impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) {
let Some(state) = self.upgrade() else {
warn!("ingester state update failed");
return;
};
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
else {
error!("failed to lock the ingester state");
return;
};
let index_uid = shard_positions_update.source_uid.index_uid;
let source_id = shard_positions_update.source_uid.source_id;

for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
let queue_id = queue_id(&index_uid, &source_id, &shard_id);
if shard_position.is_eof() {
state_guard.delete_shard(&queue_id).await;
} else if !shard_position.is_beginning() {
state_guard.truncate_shard(&queue_id, &shard_position).await;
}
}
}
}

pub(super) struct FullyLockedIngesterState<'a> {
pub inner: MutexGuard<'a, InnerIngesterState>,
pub mrecordlog: RwLockMappedWriteGuard<'a, MultiRecordLogAsync>,
@@ -459,6 +504,45 @@ impl WeakIngesterState {
}
}

async fn apply_position_update(
state: &mut FullyLockedIngesterState<'_>,
queue_id: &QueueId,
shard_position: &Position,
) {
if state.shards.get(queue_id).is_none() {
return;
}
if shard_position.is_eof() {
state.delete_shard(queue_id).await;
} else if !shard_position.is_beginning() {
state.truncate_shard(queue_id, shard_position).await;
}
}

#[async_trait]
impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) {
let SourceUid {
index_uid,
source_id,
} = shard_positions_update.source_uid;
let Some(state) = self.upgrade() else {
warn!("ingester state update failed");
return;
};
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
else {
error!("failed to lock the ingester state");
return;
};
for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
let queue_id = queue_id(&index_uid, &source_id, &shard_id);
apply_position_update(&mut state_guard, &queue_id, &shard_position).await;
}
}
}

#[cfg(test)]
mod tests {
use tokio::time::timeout;
@@ -499,7 +583,11 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();

state
.init(temp_dir.path(), RateLimiterSettings::default(), EventBroker::default())
.init(
temp_dir.path(),
RateLimiterSettings::default(),
EventBroker::default(),
)
.await;

timeout(Duration::from_millis(100), state.wait_for_ready())
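
Most of the new behavior lives at the end of `IngesterState::init`: once the WAL has been recovered, the ingester asks the `ShardPositionsService` for the last known position of every recovered queue and applies it, and the delete/truncate logic shared with the `ShardPositionsUpdate` subscriber is factored into `apply_position_update`. A condensed sketch of that back-fill loop, using only the types and calls visible in the diff (logging and error handling trimmed):

// Condensed sketch of the position back-fill at the end of `init`.
for queue_id in queue_ids {
    // A queue id encodes (index_uid, source_id, shard_id).
    let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
        continue;
    };
    let get_position = GetPosition {
        source_uid: SourceUid { index_uid, source_id },
        shard_id,
    };
    // The service replies with Ok(None) when it does not know the shard yet,
    // which is expected for freshly opened shards.
    if let Ok(Some(position)) = shard_positions_service.ask(get_position).await {
        if let Ok(mut state) = ingester_state.lock_fully().await {
            // EOF deletes the shard; any other non-beginning position truncates it.
            apply_position_update(&mut state, &queue_id, &position).await;
        }
    }
}
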
5 changes: 3 additions & 2 deletions quickwit/quickwit-ingest/src/shard_positions.rs
@@ -35,9 +35,10 @@ use tracing::{debug, error, info, warn};
const SHARD_POSITIONS_PREFIX: &str = "indexer.shard_positions:";

/// Message to request for the position of a given shard.
#[derive(Debug)]
pub struct GetPosition {
source_uid: SourceUid,
shard_id: ShardId,
pub source_uid: SourceUid,
pub shard_id: ShardId,
}

/// This event is an internal detail of the `ShardPositionsService`.
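
The shard_positions.rs change is visibility only: `GetPosition` derives `Debug` and exposes its fields so the ingester state (previous file) can construct the request itself. A minimal construction sketch, assuming the caller already holds the identifiers:

// Sketch: GetPosition can now be built outside shard_positions.rs and sent with `ask`.
let get_position = GetPosition {
    source_uid: SourceUid { index_uid, source_id },
    shard_id,
};
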
5 changes: 3 additions & 2 deletions quickwit/quickwit-serve/src/lib.rs
@@ -488,7 +488,7 @@ pub async fn serve_quickwit(
&event_broker,
control_plane_service.clone(),
ingester_pool,
&universe
&universe,
)
.await
.context("failed to start ingest v2 service")?;
@@ -777,7 +777,8 @@ async fn setup_ingest_v2(
let ingester_opt = if node_config.is_service_enabled(QuickwitService::Indexer) {
let wal_dir_path = node_config.data_dir_path.join("wal");
fs::create_dir_all(&wal_dir_path)?;
let shard_positions_service: Mailbox<ShardPositionsService> = universe.get_one::<ShardPositionsService>()
let shard_positions_service: Mailbox<ShardPositionsService> = universe
.get_one::<ShardPositionsService>()
.expect("the shard positions service should be spawned before the ingester service");
let idle_shard_timeout = get_idle_shard_timeout();
let ingester = Ingester::try_new(
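
On the serve side, the substantive change is resolving the `ShardPositionsService` mailbox from the actor universe before constructing the ingester; the `&universe,` hunk is only a formatting fix. A sketch of that lookup as it appears in `setup_ingest_v2`, assuming the service was spawned earlier during startup (as the expect message requires):

// Sketch: the mailbox is looked up from the universe and later passed to Ingester::try_new.
let shard_positions_service: Mailbox<ShardPositionsService> = universe
    .get_one::<ShardPositionsService>()
    .expect("the shard positions service should be spawned before the ingester service");
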
