Bugfix shard position initialization #4802

Closed · wants to merge 5 commits
Changes from all commits
3 changes: 0 additions & 3 deletions quickwit/quickwit-indexing/src/models/mod.rs
@@ -30,7 +30,6 @@ mod processed_doc;
mod publish_lock;
mod publisher_message;
mod raw_doc_batch;
mod shard_positions;
mod split_attrs;

pub use indexed_split::{
@@ -50,8 +49,6 @@ pub use publish_lock::{NewPublishLock, PublishLock};
pub use publisher_message::SplitsUpdate;
use quickwit_proto::types::PublishToken;
pub use raw_doc_batch::RawDocBatch;
pub(crate) use shard_positions::LocalShardPositionsUpdate;
pub use shard_positions::ShardPositionsService;
pub use split_attrs::{create_split_metadata, SplitAttrs};

#[derive(Debug)]
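The shard_positions module leaves quickwit-indexing's models: LocalShardPositionsUpdate is now defined in quickwit_proto::indexing, and ShardPositionsService is owned by quickwit-ingest, where later hunks reference it as crate::shard_positions::ShardPositionsService. A before/after import sketch, using only paths visible in this diff:

```rust
// Before (quickwit-indexing/src/source/ingest/mod.rs): everything came from models.
use crate::models::{LocalShardPositionsUpdate, NewPublishLock, NewPublishToken, PublishLock};

// After: the update event comes from quickwit-proto, the rest stays in models.
use quickwit_proto::indexing::LocalShardPositionsUpdate;
use crate::models::{NewPublishLock, NewPublishToken, PublishLock};

// ShardPositionsService itself now lives in quickwit-ingest and is imported
// there as `crate::shard_positions::ShardPositionsService` (see ingester.rs below).
```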
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -33,6 +33,7 @@ use quickwit_ingest::{
decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::indexing::LocalShardPositionsUpdate;
use quickwit_proto::ingest::ingester::{
fetch_message, FetchEof, FetchPayload, IngesterService, TruncateShardsRequest,
TruncateShardsSubrequest,
@@ -55,7 +56,7 @@ use super::{
BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
};
use crate::actors::DocProcessor;
use crate::models::{LocalShardPositionsUpdate, NewPublishLock, NewPublishToken, PublishLock};
use crate::models::{NewPublishLock, NewPublishToken, PublishLock};

pub struct IngestSourceFactory;

2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
@@ -466,7 +466,7 @@ mod tests {
let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)
.await
.unwrap();
let (_temp_dir, state) = IngesterState::for_test().await;
let (_temp_dir, state, _shard_positions_inbox) = IngesterState::for_test().await;
let weak_state = state.weak();
let task = BroadcastLocalShardsTask {
cluster,
3 changes: 2 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/idle.rs
@@ -88,10 +88,11 @@ mod tests {
use super::*;
use crate::ingest_v2::models::IngesterShard;
use crate::ingest_v2::state::IngesterState;
use crate::shard_positions;

#[tokio::test]
async fn test_close_idle_shards_run() {
let (_temp_dir, state) = IngesterState::for_test().await;
let (_temp_dir, state, _shard_positions) = IngesterState::for_test().await;
let weak_state = state.weak();
let idle_shard_timeout = Duration::from_millis(200);
let join_handle = CloseIdleShardsTask::spawn(weak_state, idle_shard_timeout);
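In both test modules above, `IngesterState::for_test()` now returns a third element, which appears to be the inbox side of a ShardPositionsService test mailbox (the ingester tests below build one with `Universe::new().create_test_mailbox()`). A minimal sketch of how a test could use it; treating `drain_for_test` as the usual quickwit_actors inbox helper is an assumption:

```rust
// Sketch only: binding the inbox instead of discarding it lets a test assert
// which messages the ingester state forwarded to the ShardPositionsService.
let (_temp_dir, state, shard_positions_inbox) = IngesterState::for_test().await;

// ... exercise `state` ...

// `drain_for_test()` empties the test inbox; expecting zero messages here is
// also just an illustrative assumption.
let forwarded_messages = shard_positions_inbox.drain_for_test();
assert!(forwarded_messages.is_empty());
```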
127 changes: 64 additions & 63 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -30,17 +30,18 @@ use bytesize::ByteSize;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mrecordlog::error::CreateQueueError;
use quickwit_actors::Mailbox;
use quickwit_cluster::Cluster;
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
use quickwit_common::tower::Pool;
use quickwit_common::{rate_limited_warn, ServiceStream};
use quickwit_config::IngestApiConfig;
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::ingest::ingester::{
AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, DecommissionRequest,
DecommissionResponse, FetchMessage, IngesterService, IngesterServiceClient,
@@ -74,11 +75,12 @@ use super::replication::{
ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask,
SYN_REPLICATION_STREAM_CAPACITY,
};
use super::state::{IngesterState, InnerIngesterState, WeakIngesterState};
use super::state::{IngesterState, InnerIngesterState};
use super::IngesterPool;
use crate::ingest_v2::metrics::report_wal_usage;
use crate::metrics::INGEST_METRICS;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::shard_positions::ShardPositionsService;
use crate::{estimate_size, with_lock_metrics, FollowerId};

/// Minimum interval between two reset shards operations.
@@ -126,28 +128,40 @@ impl Ingester {
control_plane: ControlPlaneServiceClient,
ingester_pool: Pool<NodeId, IngesterServiceClient>,
wal_dir_path: &Path,
disk_capacity: ByteSize,
memory_capacity: ByteSize,
ingest_api_config: &IngestApiConfig,
rate_limiter_settings: RateLimiterSettings,
replication_factor: usize,
idle_shard_timeout: Duration,
shard_positions_service: Mailbox<ShardPositionsService>,
event_broker: EventBroker,
) -> IngestV2Result<Self> {
let self_node_id: NodeId = cluster.self_node_id().into();
let state = IngesterState::load(wal_dir_path, rate_limiter_settings);
let state = IngesterState::load(
wal_dir_path,
rate_limiter_settings,
shard_positions_service,
event_broker,
);

let weak_state = state.weak();
BroadcastLocalShardsTask::spawn(cluster, weak_state.clone());
CloseIdleShardsTask::spawn(weak_state, idle_shard_timeout);

let IngestApiConfig {
max_queue_memory_usage,
max_queue_disk_usage,
replication_factor,
..
} = ingest_api_config;

let ingester = Self {
self_node_id,
control_plane,
ingester_pool,
state,
disk_capacity,
memory_capacity,
disk_capacity: *max_queue_disk_usage,
memory_capacity: *max_queue_memory_usage,
rate_limiter_settings,
replication_factor,
replication_factor: *replication_factor,
reset_shards_permits: Arc::new(Semaphore::new(1)),
};
ingester.background_reset_shards();
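With this change, `Ingester::try_new` takes the whole `IngestApiConfig` (reading `max_queue_disk_usage`, `max_queue_memory_usage`, and `replication_factor` out of it), plus the `ShardPositionsService` mailbox and the `EventBroker`, instead of individual capacity and replication arguments. A hedged call-site sketch; the argument order and config fields come from this diff, while the wrapper function and the availability of the surrounding types in scope are illustrative assumptions:

```rust
// Minimal call-site sketch for the new constructor signature (assumes the
// ingester module's types and DEFAULT_IDLE_SHARD_TIMEOUT are in scope).
async fn spawn_ingester(
    cluster: Cluster,
    control_plane: ControlPlaneServiceClient,
    ingester_pool: IngesterPool,
    wal_dir_path: &Path,
    ingest_api_config: &IngestApiConfig,
    shard_positions_service: Mailbox<ShardPositionsService>,
    event_broker: EventBroker,
) -> IngestV2Result<Ingester> {
    Ingester::try_new(
        cluster,
        control_plane,
        ingester_pool,
        wal_dir_path,
        ingest_api_config,          // replaces the old disk_capacity / memory_capacity / replication_factor args
        RateLimiterSettings::default(),
        DEFAULT_IDLE_SHARD_TIMEOUT, // same timeout the tests pass
        shard_positions_service,    // Mailbox<ShardPositionsService>, new in this PR
        event_broker,               // consumed at construction; no separate subscribe() call anymore
    )
    .await
}
```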
@@ -401,15 +415,6 @@ impl Ingester {
Ok(replication_client)
}

pub fn subscribe(&self, event_broker: &EventBroker) {
let weak_ingester_state = self.state.weak();
// This subscription is the one in charge of truncating the mrecordlog.
info!("subscribing ingester to shard positions updates");
event_broker
.subscribe_without_timeout::<ShardPositionsUpdate>(weak_ingester_state)
.forever();
}

async fn persist_inner(
&mut self,
persist_request: PersistRequest,
@@ -1071,33 +1076,6 @@ impl IngesterService for Ingester {
}
}

#[async_trait]
impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) {
let Some(state) = self.upgrade() else {
warn!("ingester state update failed");
return;
};
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
else {
error!("failed to lock the ingester state");
return;
};
let index_uid = shard_positions_update.source_uid.index_uid;
let source_id = shard_positions_update.source_uid.source_id;

for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
let queue_id = queue_id(&index_uid, &source_id, &shard_id);
if shard_position.is_eof() {
state_guard.delete_shard(&queue_id).await;
} else if !shard_position.is_beginning() {
state_guard.truncate_shard(&queue_id, &shard_position).await;
}
}
}
}

pub async fn wait_for_ingester_status(
mut ingester: IngesterServiceClient,
status: IngesterStatus,
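For readers skimming the removed subscriber above: per updated shard it deleted the queue on an EOF position, truncated it for any position past the beginning, and left shards still at the beginning untouched. The same rule, restated with comments (no new behavior, names exactly as in the deleted code):

```rust
for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
    let queue_id = queue_id(&index_uid, &source_id, &shard_id);
    if shard_position.is_eof() {
        // The shard is fully indexed and published: drop its WAL queue entirely.
        state_guard.delete_shard(&queue_id).await;
    } else if !shard_position.is_beginning() {
        // Otherwise, truncate the queue up to the last published position.
        state_guard.truncate_shard(&queue_id, &shard_position).await;
    }
    // Positions still at the beginning require no action.
}
```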
@@ -1156,11 +1134,13 @@ mod tests {
use std::sync::atomic::{AtomicU16, Ordering};

use bytes::Bytes;
use quickwit_actors::Universe;
use quickwit_cluster::{create_cluster_for_test_with_id, ChannelTransport};
use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX;
use quickwit_common::tower::ConstantRate;
use quickwit_config::service::QuickwitService;
use quickwit_proto::control_plane::{AdviseResetShardsResponse, MockControlPlaneService};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::ingest::ingester::{
IngesterServiceGrpcServer, IngesterServiceGrpcServerAdapter, PersistSubrequest,
TruncateShardsSubrequest,
@@ -1183,11 +1163,10 @@
node_id: NodeId,
control_plane: ControlPlaneServiceClient,
ingester_pool: IngesterPool,
disk_capacity: ByteSize,
memory_capacity: ByteSize,
rate_limiter_settings: RateLimiterSettings,
replication_factor: usize,
ingest_api_config: IngestApiConfig,
idle_shard_timeout: Duration,
event_broker: EventBroker,
}

impl Default for IngesterForTest {
@@ -1197,16 +1176,19 @@
.expect_advise_reset_shards()
.returning(|_| Ok(AdviseResetShardsResponse::default()));
let control_plane = ControlPlaneServiceClient::from(mock_control_plane);

Self {
node_id: "test-ingester".into(),
control_plane,
ingester_pool: IngesterPool::default(),
disk_capacity: ByteSize::mb(256),
memory_capacity: ByteSize::mb(1),
ingest_api_config: IngestApiConfig {
max_queue_memory_usage: ByteSize::mb(1),
max_queue_disk_usage: ByteSize::mb(256),
replication_factor: 1,
content_length_limit: ByteSize::mb(5),
},
rate_limiter_settings: RateLimiterSettings::default(),
replication_factor: 1,
idle_shard_timeout: DEFAULT_IDLE_SHARD_TIMEOUT,
event_broker: EventBroker::default(),
}
}
}
@@ -1216,7 +1198,6 @@
self.node_id = node_id.into();
self
}

pub fn with_control_plane(mut self, control_plane: ControlPlaneServiceClient) -> Self {
self.control_plane = control_plane;
self
@@ -1228,7 +1209,7 @@
}

pub fn with_disk_capacity(mut self, disk_capacity: ByteSize) -> Self {
self.disk_capacity = disk_capacity;
self.ingest_api_config.max_queue_disk_usage = disk_capacity;
self
}

@@ -1241,7 +1222,7 @@
}

pub fn with_replication(mut self) -> Self {
self.replication_factor = 2;
self.ingest_api_config.replication_factor = 2;
self
}

Expand All @@ -1250,6 +1231,11 @@ mod tests {
self
}

pub fn with_event_broker(mut self, event_broker: EventBroker) -> Self {
self.event_broker = event_broker;
self
}

pub async fn build(self) -> (IngesterContext, Ingester) {
static GOSSIP_ADVERTISE_PORT_SEQUENCE: AtomicU16 = AtomicU16::new(1u16);

@@ -1272,16 +1258,18 @@
.await
.unwrap();

let (shard_positions_tx, _shard_positions_rx) = Universe::new().create_test_mailbox();

let ingester = Ingester::try_new(
cluster.clone(),
self.control_plane.clone(),
self.ingester_pool.clone(),
wal_dir_path,
self.disk_capacity,
self.memory_capacity,
&self.ingest_api_config,
self.rate_limiter_settings,
self.replication_factor,
self.idle_shard_timeout,
shard_positions_tx,
self.event_broker,
)
.await
.unwrap();
@@ -1314,7 +1302,11 @@

#[tokio::test]
async fn test_ingester_init() {
let (ingester_ctx, ingester) = IngesterForTest::default().build().await;
let event_broker = EventBroker::default();
let (ingester_ctx, ingester) = IngesterForTest::default()
.with_event_broker(event_broker.clone())
.build()
.await;
let mut state_guard = ingester.state.lock_fully().await.unwrap();

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
@@ -1376,9 +1368,16 @@

drop(state_guard);

let (shard_positions_tx, shard_positions_rx) =
Universe::new().create_test_mailbox::<ShardPositionsService>();
ingester
.state
.init(ingester_ctx.tempdir.path(), RateLimiterSettings::default())
.init(
ingester_ctx.tempdir.path(),
RateLimiterSettings::default(),
shard_positions_tx,
event_broker,
)
.await;

let state_guard = ingester.state.lock_fully().await.unwrap();
@@ -2894,9 +2893,11 @@

#[tokio::test]
async fn test_ingester_truncate_on_shard_positions_update() {
let (_ingester_ctx, ingester) = IngesterForTest::default().build().await;
let event_broker = EventBroker::default();
ingester.subscribe(&event_broker);
let (_ingester_ctx, ingester) = IngesterForTest::default()
.with_event_broker(event_broker.clone())
.build()
.await;

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let shard_01 = Shard {
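In `test_ingester_truncate_on_shard_positions_update`, the explicit `ingester.subscribe(&event_broker)` call disappears: the broker goes in through `with_event_broker`, and the ingester wires up its shard-position handling during construction. A hedged sketch of how such a test can then drive truncation purely through the broker; the field names come from the removed subscriber, while the concrete constructors (`SourceUid { .. }`, `ShardId::from`, `Position::offset`) and the sleep-based settling are assumptions:

```rust
// Publish a position update on the same broker the ingester was built with,
// then give the background subscription a moment to apply it. Constructors
// and field types here are assumptions based on the removed subscriber code.
let shard_positions_update = ShardPositionsUpdate {
    source_uid: SourceUid {
        index_uid: index_uid.clone(),
        source_id: "test-source".to_string(),
    },
    updated_shard_positions: vec![(ShardId::from(1u64), Position::offset(11u64))],
};
event_broker.publish(shard_positions_update);

tokio::time::sleep(Duration::from_millis(100)).await;
// ... then lock the state and assert that the shard's queue was truncated ...
```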