diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 49e8c06a878..b7299c56ec1 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -48,7 +48,7 @@ use quickwit_proto::ingest::ingester::{
     TruncateShardsResponse,
 };
 use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState};
-use quickwit_proto::types::{queue_id, NodeId, Position, QueueId};
+use quickwit_proto::types::{queue_id, NodeId, Position, QueueId, SourceId};
 use tokio::sync::watch;
 use tracing::{debug, error, info, warn};
 
@@ -385,7 +385,10 @@ impl Ingester {
         }
         let mut persist_successes = Vec::with_capacity(persist_request.subrequests.len());
         let mut persist_failures = Vec::new();
-        let mut replicate_subrequests: HashMap<NodeId, Vec<ReplicateSubrequest>> = HashMap::new();
+        let mut replicate_subrequests: HashMap<NodeId, Vec<(ReplicateSubrequest, QueueId)>> =
+            HashMap::new();
+        let mut local_persist_subrequests: Vec<LocalPersistSubrequest> =
+            Vec::with_capacity(persist_request.subrequests.len());
 
         // Keep track of the shards that need to be closed following an IO error.
         let mut shards_to_close: HashSet<QueueId> = HashSet::new();
@@ -420,174 +423,276 @@ impl Ingester {
             };
             return Ok(persist_response);
         }
-        for subrequest in persist_request.subrequests {
-            let queue_id = subrequest.queue_id();
-            let Some(shard) = state_guard.shards.get_mut(&queue_id) else {
-                let persist_failure = PersistFailure {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    reason: PersistFailureReason::ShardNotFound as i32,
-                };
-                persist_failures.push(persist_failure);
-                continue;
-            };
-            if shard.shard_state.is_closed() {
-                let persist_failure = PersistFailure {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    reason: PersistFailureReason::ShardClosed as i32,
-                };
-                persist_failures.push(persist_failure);
-                continue;
-            }
-            let follower_id_opt = shard.follower_id_opt().cloned();
-            let from_position_exclusive = shard.replication_position_inclusive.clone();
-
-            let doc_batch = match subrequest.doc_batch {
-                Some(doc_batch) if !doc_batch.is_empty() => doc_batch,
-                _ => {
-                    warn!("received empty persist request");
+        // first verify if we would locally accept each subrequest
+        {
+            let mut sum_of_requested_capacity = bytesize::ByteSize::b(0);
+            for subrequest in persist_request.subrequests {
+                let queue_id = subrequest.queue_id();
 
-                    let persist_success = PersistSuccess {
+                let Some(shard) = state_guard.shards.get_mut(&queue_id) else {
+                    let persist_failure = PersistFailure {
                         subrequest_id: subrequest.subrequest_id,
                         index_uid: subrequest.index_uid,
                         source_id: subrequest.source_id,
                         shard_id: subrequest.shard_id,
-                        replication_position_inclusive: Some(
-                            shard.replication_position_inclusive.clone(),
-                        ),
+                        reason: PersistFailureReason::ShardNotFound as i32,
                     };
-                    persist_successes.push(persist_success);
+                    persist_failures.push(persist_failure);
                     continue;
-                }
-            };
-            let requested_capacity = estimate_size(&doc_batch);
-
-            let current_usage = match check_enough_capacity(
-                &state_guard.mrecordlog,
-                self.disk_capacity,
-                self.memory_capacity,
-                requested_capacity,
-            ) {
-                Ok(usage) => usage,
-                Err(error) => {
-                    warn!(
-                        "failed to persist records to ingester `{}`: {error}",
-                        self.self_node_id
-                    );
+                };
+                if shard.shard_state.is_closed() {
                     let persist_failure = PersistFailure {
                         subrequest_id: subrequest.subrequest_id,
                         index_uid: subrequest.index_uid,
                         source_id: subrequest.source_id,
                         shard_id: subrequest.shard_id,
-                        reason: PersistFailureReason::ResourceExhausted as i32,
+                        reason: PersistFailureReason::ShardClosed as i32,
                     };
                     persist_failures.push(persist_failure);
                     continue;
                 }
-            };
-            let (rate_limiter, rate_meter) = state_guard
-                .rate_trackers
-                .get_mut(&queue_id)
-                .expect("rate limiter should be initialized");
-
-            if !rate_limiter.acquire_bytes(requested_capacity) {
-                debug!("failed to persist records to shard `{queue_id}`: rate limited");
-                let persist_failure = PersistFailure {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    reason: PersistFailureReason::RateLimited as i32,
+                let follower_id_opt = shard.follower_id_opt().cloned();
+                let from_position_exclusive = shard.replication_position_inclusive.clone();
+
+                let doc_batch = match subrequest.doc_batch {
+                    Some(doc_batch) if !doc_batch.is_empty() => doc_batch,
+                    _ => {
+                        warn!("received empty persist request");
+
+                        let persist_success = PersistSuccess {
+                            subrequest_id: subrequest.subrequest_id,
+                            index_uid: subrequest.index_uid,
+                            source_id: subrequest.source_id,
+                            shard_id: subrequest.shard_id,
+                            replication_position_inclusive: Some(
+                                shard.replication_position_inclusive.clone(),
+                            ),
+                        };
+                        persist_successes.push(persist_success);
+                        continue;
+                    }
+                };
+                let requested_capacity = estimate_size(&doc_batch);
+
+                match check_enough_capacity(
+                    &state_guard.mrecordlog,
+                    self.disk_capacity,
+                    self.memory_capacity,
+                    requested_capacity + sum_of_requested_capacity,
+                ) {
+                    Ok(_usage) => (),
+                    Err(error) => {
+                        warn!(
+                            "failed to persist records to ingester `{}`: {error}",
+                            self.self_node_id
+                        );
+                        let persist_failure = PersistFailure {
+                            subrequest_id: subrequest.subrequest_id,
+                            index_uid: subrequest.index_uid,
+                            source_id: subrequest.source_id,
+                            shard_id: subrequest.shard_id,
+                            reason: PersistFailureReason::ResourceExhausted as i32,
+                        };
+                        persist_failures.push(persist_failure);
+                        continue;
+                    }
                 };
-                persist_failures.push(persist_failure);
-                continue;
-            }
-            let batch_num_bytes = doc_batch.num_bytes() as u64;
-            let batch_num_docs = doc_batch.num_docs() as u64;
-            rate_meter.update(batch_num_bytes);
+                let (rate_limiter, rate_meter) = state_guard
+                    .rate_trackers
+                    .get_mut(&queue_id)
+                    .expect("rate limiter should be initialized");
 
-            let append_result = append_non_empty_doc_batch(
-                &mut state_guard.mrecordlog,
-                &queue_id,
-                &doc_batch,
-                force_commit,
-            )
-            .await;
+                if !rate_limiter.acquire_bytes(requested_capacity) {
+                    debug!("failed to persist records to shard `{queue_id}`: rate limited");
 
-            let current_position_inclusive = match append_result {
-                Ok(current_position_inclusive) => current_position_inclusive,
-                Err(append_error) => {
-                    let reason = match &append_error {
-                        AppendDocBatchError::Io(io_error) => {
-                            error!("failed to persist records to shard `{queue_id}`: {io_error}");
-                            shards_to_close.insert(queue_id);
-                            PersistFailureReason::ShardClosed
-                        }
-                        AppendDocBatchError::QueueNotFound(_) => {
-                            error!(
-                                "failed to persist records to shard `{queue_id}`: WAL queue not \
-                                 found"
-                            );
-                            shards_to_delete.insert(queue_id);
-                            PersistFailureReason::ShardNotFound
-                        }
-                    };
                     let persist_failure = PersistFailure {
                         subrequest_id: subrequest.subrequest_id,
                         index_uid: subrequest.index_uid,
                         source_id: subrequest.source_id,
                         shard_id: subrequest.shard_id,
-                        reason: reason as i32,
+                        reason: PersistFailureReason::RateLimited as i32,
                     };
                     persist_failures.push(persist_failure);
                     continue;
                 }
-            };
-            // It's more precise the compute the new usage from the current usage + the requested
-            // capacity than from continuously summing up the requested capacities, which are
-            // approximations.
-            let new_disk_usage = current_usage.disk + requested_capacity;
-            let new_memory_usage = current_usage.memory + requested_capacity;
-
-            INGEST_V2_METRICS
-                .wal_disk_usage_bytes
-                .set(new_disk_usage.as_u64() as i64);
-            INGEST_V2_METRICS
-                .wal_memory_usage_bytes
-                .set(new_memory_usage.as_u64() as i64);
-
-            INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
-            INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);
-
-            state_guard
-                .shards
-                .get_mut(&queue_id)
-                .expect("primary shard should exist")
-                .set_replication_position_inclusive(current_position_inclusive.clone());
-
-            if let Some(follower_id) = follower_id_opt {
-                let replicate_subrequest = ReplicateSubrequest {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    from_position_exclusive: Some(from_position_exclusive),
-                    to_position_inclusive: Some(current_position_inclusive),
-                    doc_batch: Some(doc_batch),
+
+                let batch_num_bytes = doc_batch.num_bytes() as u64;
+                rate_meter.update(batch_num_bytes);
+                sum_of_requested_capacity += requested_capacity;
+
+                if let Some(follower_id) = follower_id_opt {
+                    let replicate_subrequest = ReplicateSubrequest {
+                        subrequest_id: subrequest.subrequest_id,
+                        index_uid: subrequest.index_uid,
+                        source_id: subrequest.source_id,
+                        shard_id: subrequest.shard_id,
+                        from_position_exclusive: Some(from_position_exclusive),
+                        doc_batch: Some(doc_batch),
+                    };
+                    replicate_subrequests
+                        .entry(follower_id)
+                        .or_default()
+                        .push((replicate_subrequest, queue_id));
+                } else {
+                    local_persist_subrequests.push(LocalPersistSubrequest {
+                        queue_id,
+                        subrequest_id: subrequest.subrequest_id,
+                        index_uid: subrequest.index_uid,
+                        source_id: subrequest.source_id,
+                        shard_id: subrequest.shard_id,
+                        doc_batch,
+                        expected_position_inclusive: None,
+                    })
+                }
+            }
+        }
+
+        // replicate to the follower
+        {
+            let mut replicate_futures = FuturesUnordered::new();
+            let mut doc_batch_map = HashMap::new();
+
+            for (follower_id, subrequests_with_queue_id) in replicate_subrequests {
+                let replication_client = state_guard
+                    .replication_streams
+                    .get(&follower_id)
+                    .expect("replication stream should be initialized")
+                    .replication_client();
+                let leader_id = self.self_node_id.clone();
+                let mut subrequests = Vec::with_capacity(subrequests_with_queue_id.len());
+                for (subrequest, queue_id) in subrequests_with_queue_id {
+                    let doc_batch = subrequest
+                        .doc_batch
+                        .clone()
+                        .expect("we already verified doc is present and not empty");
+                    doc_batch_map.insert(subrequest.subrequest_id, (doc_batch, queue_id));
+                    subrequests.push(subrequest);
+                }
+                let replicate_future =
+                    replication_client.replicate(leader_id, follower_id, subrequests, commit_type);
+                replicate_futures.push(replicate_future);
+            }
+
+            while let Some(replication_result) = replicate_futures.next().await {
+                let replicate_response = match replication_result {
+                    Ok(replicate_response) => replicate_response,
+                    Err(_) => {
+                        // TODO: Handle replication error:
+                        // 1. Close and evict all the shards hosted by the follower.
+                        // 2. Close and evict the replication client.
+                        // 3. Return `PersistFailureReason::ShardClosed` to router.
+                        continue;
+                    }
                 };
-                replicate_subrequests
-                    .entry(follower_id)
-                    .or_default()
-                    .push(replicate_subrequest);
-            } else {
+                for replicate_success in replicate_response.successes {
+                    let (doc_batch, queue_id) = doc_batch_map
+                        .remove(&replicate_success.subrequest_id)
+                        .expect("expected known subrequest id");
+                    let local_persist_subrequest = LocalPersistSubrequest {
+                        queue_id,
+                        subrequest_id: replicate_success.subrequest_id,
+                        index_uid: replicate_success.index_uid,
+                        source_id: replicate_success.source_id,
+                        shard_id: replicate_success.shard_id,
+                        doc_batch,
+                        expected_position_inclusive: replicate_success
+                            .replication_position_inclusive,
+                    };
+                    local_persist_subrequests.push(local_persist_subrequest);
+                }
+                for replicate_failure in replicate_response.failures {
+                    // TODO: If the replica shard is closed, close the primary shard if it is not
+                    // already.
+                    let persist_failure_reason = match replicate_failure.reason() {
+                        ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
+                        ReplicateFailureReason::ShardNotFound => {
+                            PersistFailureReason::ShardNotFound
+                        }
+                        ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
+                        ReplicateFailureReason::ResourceExhausted => {
+                            PersistFailureReason::ResourceExhausted
+                        }
+                    };
+                    let persist_failure = PersistFailure {
+                        subrequest_id: replicate_failure.subrequest_id,
+                        index_uid: replicate_failure.index_uid,
+                        source_id: replicate_failure.source_id,
+                        shard_id: replicate_failure.shard_id,
+                        reason: persist_failure_reason as i32,
+                    };
+                    persist_failures.push(persist_failure);
+                }
+            }
+        }
+
+        // finally write locally
+        {
+            for subrequest in local_persist_subrequests {
+                let queue_id = subrequest.queue_id;
+
+                let append_result = append_non_empty_doc_batch(
+                    &mut state_guard.mrecordlog,
+                    &queue_id,
+                    &subrequest.doc_batch,
+                    force_commit,
+                )
+                .await;
+
+                let current_position_inclusive = match append_result {
+                    Ok(current_position_inclusive) => current_position_inclusive,
+                    Err(append_error) => {
+                        let reason = match &append_error {
+                            AppendDocBatchError::Io(io_error) => {
+                                error!(
+                                    "failed to persist records to shard `{queue_id}`: {io_error}"
+                                );
+                                shards_to_close.insert(queue_id);
+                                PersistFailureReason::ShardClosed
+                            }
+                            AppendDocBatchError::QueueNotFound(_) => {
+                                error!(
+                                    "failed to persist records to shard `{queue_id}`: WAL queue \
+                                     not found"
+                                );
+                                shards_to_delete.insert(queue_id);
+                                PersistFailureReason::ShardNotFound
+                            }
+                        };
+                        let persist_failure = PersistFailure {
+                            subrequest_id: subrequest.subrequest_id,
+                            index_uid: subrequest.index_uid,
+                            source_id: subrequest.source_id,
+                            shard_id: subrequest.shard_id,
+                            reason: reason as i32,
+                        };
+                        persist_failures.push(persist_failure);
+                        continue;
+                    }
                 };
+
+                if let Some(expected_position_inclusive) = subrequest.expected_position_inclusive {
+                    if expected_position_inclusive != current_position_inclusive {
+                        return Err(IngestV2Error::Internal(format!(
+                            "bad replica position: expected {expected_position_inclusive:?}, got \
+                             {current_position_inclusive:?}"
+                        )));
+                    }
+                }
+
+                state_guard
+                    .shards
+                    .get_mut(&queue_id)
+                    .expect("primary shard should exist")
+                    .set_replication_position_inclusive(current_position_inclusive.clone());
+
+                let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64;
+                let batch_num_docs = subrequest.doc_batch.num_docs() as u64;
+                INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
+                INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);
+
+                let persist_success = PersistSuccess {
                     subrequest_id: subrequest.subrequest_id,
                     index_uid: subrequest.index_uid,
@@ -598,6 +703,7 @@ impl Ingester {
                 persist_successes.push(persist_success);
             }
         }
+
         if !shards_to_close.is_empty() {
             for queue_id in &shards_to_close {
                 let shard = state_guard
@@ -620,76 +726,16 @@ impl Ingester {
             }
             info!("deleted {} dangling shard(s)", shards_to_delete.len());
         }
-        if replicate_subrequests.is_empty() {
-            let leader_id = self.self_node_id.to_string();
-            let persist_response = PersistResponse {
-                leader_id,
-                successes: persist_successes,
-                failures: persist_failures,
-            };
-            return Ok(persist_response);
-        }
-        let mut replicate_futures = FuturesUnordered::new();
-        for (follower_id, subrequests) in replicate_subrequests {
-            let replication_client = state_guard
-                .replication_streams
-                .get(&follower_id)
-                .expect("replication stream should be initialized")
-                .replication_client();
-            let leader_id = self.self_node_id.clone();
-            let replicate_future =
-                replication_client.replicate(leader_id, follower_id, subrequests, commit_type);
-            replicate_futures.push(replicate_future);
-        }
-        // Drop the write lock AFTER pushing the replicate request into the replication client
-        // channel to ensure that sequential writes in mrecordlog turn into sequential replicate
-        // requests in the same order.
+        INGEST_V2_METRICS
+            .wal_disk_usage_bytes
+            .set(state_guard.mrecordlog.disk_usage() as i64);
+        INGEST_V2_METRICS
+            .wal_memory_usage_bytes
+            .set(state_guard.mrecordlog.memory_usage() as i64);
+
         drop(state_guard);
 
-        while let Some(replication_result) = replicate_futures.next().await {
-            let replicate_response = match replication_result {
-                Ok(replicate_response) => replicate_response,
-                Err(_) => {
-                    // TODO: Handle replication error:
-                    // 1. Close and evict all the shards hosted by the follower.
-                    // 2. Close and evict the replication client.
-                    // 3. Return `PersistFailureReason::ShardClosed` to router.
-                    continue;
-                }
-            };
-            for replicate_success in replicate_response.successes {
-                let persist_success = PersistSuccess {
-                    subrequest_id: replicate_success.subrequest_id,
-                    index_uid: replicate_success.index_uid,
-                    source_id: replicate_success.source_id,
-                    shard_id: replicate_success.shard_id,
-                    replication_position_inclusive: replicate_success
-                        .replication_position_inclusive,
-                };
-                persist_successes.push(persist_success);
-            }
-            for replicate_failure in replicate_response.failures {
-                // TODO: If the replica shard is closed, close the primary shard if it is not
-                // already.
-                let persist_failure_reason = match replicate_failure.reason() {
-                    ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
-                    ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound,
-                    ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
-                    ReplicateFailureReason::ResourceExhausted => {
-                        PersistFailureReason::ResourceExhausted
-                    }
-                };
-                let persist_failure = PersistFailure {
-                    subrequest_id: replicate_failure.subrequest_id,
-                    index_uid: replicate_failure.index_uid,
-                    source_id: replicate_failure.source_id,
-                    shard_id: replicate_failure.shard_id,
-                    reason: persist_failure_reason as i32,
-                };
-                persist_failures.push(persist_failure);
-            }
-        }
         let leader_id = self.self_node_id.to_string();
         let persist_response = PersistResponse {
             leader_id,
@@ -1110,6 +1156,16 @@ pub async fn wait_for_ingester_decommission(ingester_opt: Option<IngesterService
 }
 
+struct LocalPersistSubrequest {
+    queue_id: QueueId,
+    subrequest_id: u32,
+    index_uid: String,
+    source_id: SourceId,
+    shard_id: Option<quickwit_proto::types::ShardId>,
+    doc_batch: quickwit_proto::ingest::DocBatchV2,
+    expected_position_inclusive: Option<Position>,
+}
+
 #[cfg(test)]
 mod tests {
     #![allow(clippy::mutable_key_type)]
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
index 112c6f6b474..c72876ca3ed 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -541,7 +541,6 @@ impl ReplicationTask {
         for subrequest in replicate_request.subrequests {
             let queue_id = subrequest.queue_id();
             let from_position_exclusive = subrequest.from_position_exclusive().clone();
-            let to_position_inclusive = subrequest.to_position_inclusive().clone();
 
             let Some(shard) = state_guard.shards.get(&queue_id) else {
                 let replicate_failure = ReplicateFailure {
@@ -652,12 +651,6 @@ impl ReplicationTask {
                 .replicated_num_docs_total
                 .inc_by(batch_num_docs);
 
-            if current_position_inclusive != to_position_inclusive {
-                return Err(IngestV2Error::Internal(format!(
-                    "bad replica position: expected {to_position_inclusive:?}, got \
-                     {current_position_inclusive:?}"
-                )));
-            }
             let replica_shard = state_guard
                 .shards
                 .get_mut(&queue_id)
@@ -868,14 +861,21 @@ mod tests {
         let replicate_successes = replicate_request
             .subrequests
             .iter()
-            .map(|subrequest| ReplicateSuccess {
-                subrequest_id: subrequest.subrequest_id,
-                index_uid: subrequest.index_uid.clone(),
-                source_id: subrequest.source_id.clone(),
-                shard_id: subrequest.shard_id.clone(),
-                replication_position_inclusive: Some(
-                    subrequest.to_position_inclusive().clone(),
-                ),
+            .map(|subrequest| {
+                let batch_len = subrequest.doc_batch.as_ref().unwrap().num_docs();
+                let replication_position_inclusive = subrequest
+                    .from_position_exclusive()
+                    .as_usize()
+                    .map_or(batch_len - 1, |pos| pos + batch_len);
+                ReplicateSuccess {
+                    subrequest_id: subrequest.subrequest_id,
+                    index_uid: subrequest.index_uid.clone(),
+                    source_id: subrequest.source_id.clone(),
+                    shard_id: subrequest.shard_id.clone(),
+                    replication_position_inclusive: Some(Position::offset(
+                        replication_position_inclusive,
+                    )),
+                }
             })
             .collect::<Vec<_>>();
@@ -903,7 +903,6 @@ mod tests {
                 shard_id: Some(ShardId::from(1)),
                 doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
                 from_position_exclusive: Some(Position::Beginning),
-                to_position_inclusive: Some(Position::offset(0u64)),
             },
             ReplicateSubrequest {
                 subrequest_id: 1,
@@ -912,7 +911,6 @@ mod tests {
                 shard_id: Some(ShardId::from(2)),
                 doc_batch: Some(DocBatchV2::for_test(["test-doc-bar", "test-doc-baz"])),
                 from_position_exclusive: Some(Position::Beginning),
-                to_position_inclusive: Some(Position::offset(1u64)),
             },
             ReplicateSubrequest {
                 subrequest_id: 2,
@@ -921,7 +919,6 @@ mod tests {
                 shard_id: Some(ShardId::from(1)),
                 doc_batch: Some(DocBatchV2::for_test(["test-qux", "test-doc-tux"])),
                 from_position_exclusive: Some(Position::offset(0u64)),
-                to_position_inclusive: Some(Position::offset(2u64)),
             },
         ];
         let replicate_response = replication_stream_task_handle
@@ -1142,7 +1139,6 @@ mod tests {
                     shard_id: Some(ShardId::from(1)),
                     doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
                     from_position_exclusive: Some(Position::Beginning),
-                    to_position_inclusive: Some(Position::offset(0u64)),
                 },
                 ReplicateSubrequest {
                     subrequest_id: 1,
@@ -1151,7 +1147,6 @@ mod tests {
                     shard_id: Some(ShardId::from(2)),
                     doc_batch: Some(DocBatchV2::for_test(["test-doc-bar", "test-doc-baz"])),
                     from_position_exclusive: Some(Position::Beginning),
-                    to_position_inclusive: Some(Position::offset(1u64)),
                 },
                 ReplicateSubrequest {
                     subrequest_id: 2,
@@ -1160,7 +1155,6 @@ mod tests {
                     shard_id: Some(ShardId::from(1)),
                     doc_batch: Some(DocBatchV2::for_test(["test-doc-qux", "test-doc-tux"])),
                     from_position_exclusive: Some(Position::Beginning),
-                    to_position_inclusive: Some(Position::offset(1u64)),
                 },
             ],
             replication_seqno: 3,
@@ -1236,7 +1230,6 @@ mod tests {
                 shard_id: Some(ShardId::from(1)),
                 doc_batch: Some(DocBatchV2::for_test(["test-doc-moo"])),
                 from_position_exclusive: Some(Position::offset(0u64)),
-                to_position_inclusive: Some(Position::offset(1u64)),
             }],
             replication_seqno: 4,
         };
@@ -1322,7 +1315,6 @@ mod tests {
                 shard_id: Some(ShardId::from(1)),
                 doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
                 from_position_exclusive: Position::offset(0u64).into(),
-                to_position_inclusive: Some(Position::offset(1u64)),
             }],
             replication_seqno: 0,
         };
@@ -1399,7 +1391,6 @@ mod tests {
                 shard_id: Some(ShardId::from(1)),
                 doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
                 from_position_exclusive: Some(Position::Beginning),
-                to_position_inclusive: Some(Position::offset(0u64)),
             }],
             replication_seqno: 0,
         };
diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
index 694a32be1b6..7bbbc09d8e2 100644
--- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
@@ -167,8 +167,7 @@ message ReplicateSubrequest {
   string source_id = 3;
   quickwit.ingest.ShardId shard_id = 4;
   quickwit.ingest.Position from_position_exclusive = 5;
-  quickwit.ingest.Position to_position_inclusive = 6;
-  ingest.DocBatchV2 doc_batch = 7;
+  ingest.DocBatchV2 doc_batch = 6;
 }
 
 message ReplicateResponse {
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
index 43bd2fe7fb8..1c26d72bcb9 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
@@ -198,8 +198,6 @@ pub struct ReplicateSubrequest {
     #[prost(message, optional, tag = "5")]
     pub from_position_exclusive: ::core::option::Option<crate::types::Position>,
     #[prost(message, optional, tag = "6")]
-    pub to_position_inclusive: ::core::option::Option<crate::types::Position>,
-    #[prost(message, optional, tag = "7")]
     pub doc_batch: ::core::option::Option<super::DocBatchV2>,
 }
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs
index f63859620dd..8982179161d 100644
--- a/quickwit/quickwit-proto/src/ingest/ingester.rs
+++ b/quickwit/quickwit-proto/src/ingest/ingester.rs
@@ -227,12 +227,6 @@ impl ReplicateSubrequest {
             .as_ref()
             .expect("`from_position_exclusive` should be a required field")
     }
-
-    pub fn to_position_inclusive(&self) -> &Position {
-        self.to_position_inclusive
-            .as_ref()
-            .expect("`to_position_inclusive` should be a required field")
-    }
 }
 
 impl ReplicateSuccess {
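
Note on the new persist flow (not part of the patch): persistence now runs in three passes over the subrequests: validate and rate-limit everything up front, replicate to followers, then perform the local WAL writes. Because the validation pass writes nothing yet, each capacity check has to add the bytes already accepted earlier in the same request, which is what `requested_capacity + sum_of_requested_capacity` does. A minimal standalone sketch of that accumulation (hypothetical `accept_batches` helper; it ignores the separate disk and memory budgets that the real `check_enough_capacity` tracks):

```rust
// Illustrative sketch, not Quickwit code: pre-checking a batch of requests
// against a shared capacity budget before any of them is written.
fn accept_batches(capacity: u64, batches: &[u64]) -> Vec<bool> {
    // Plays the role of `sum_of_requested_capacity` in the patch.
    let mut accepted_so_far = 0u64;
    batches
        .iter()
        .map(|&requested| {
            // Mirrors `check_enough_capacity(..., requested_capacity + sum_of_requested_capacity)`.
            if accepted_so_far + requested <= capacity {
                // Mirrors `sum_of_requested_capacity += requested_capacity`.
                accepted_so_far += requested;
                true
            } else {
                // Rejected (`ResourceExhausted`); a smaller later batch may still fit.
                false
            }
        })
        .collect()
}

fn main() {
    // With a 10-byte budget and batches of 4, 5, and 3 bytes, the third no longer fits.
    assert_eq!(accept_batches(10, &[4, 5, 3]), vec![true, true, false]);
    println!("capacity pre-check ok");
}
```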
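Likewise, with `to_position_inclusive` removed from `ReplicateSubrequest`, both sides now derive the expected position from the starting position plus the number of appended documents, as the updated test helper does with `from_position_exclusive().as_usize().map_or(batch_len - 1, |pos| pos + batch_len)`; the leader then compares the follower's answer against its own append result via `expected_position_inclusive`. A self-contained sketch of that arithmetic (hypothetical function; `None` stands in for `Position::Beginning`):

```rust
/// Derives the inclusive replication position reached after appending
/// `batch_num_docs` documents starting after `from_position_exclusive`.
fn expected_replication_position(
    from_position_exclusive: Option<usize>, // `None` == `Position::Beginning`
    batch_num_docs: usize,
) -> usize {
    match from_position_exclusive {
        // From the beginning, N docs occupy offsets 0..=N-1.
        None => batch_num_docs - 1,
        // Otherwise, N docs advance the inclusive position by N.
        Some(pos) => pos + batch_num_docs,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_expected_replication_position() {
        // Matches the `to_position_inclusive` values deleted from the tests above.
        assert_eq!(expected_replication_position(None, 1), 0);
        assert_eq!(expected_replication_position(None, 2), 1);
        assert_eq!(expected_replication_position(Some(0), 2), 2);
    }
}
```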