Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: replace actor heartbeat duration with emit timeout as the loop i… #4112

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ use serde_json::{json, Value as JsonValue};
use tokio::time;
use tracing::{debug, info, warn};

use super::SourceActor;
use super::{SourceActor, BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT};
use crate::actors::DocProcessor;
use crate::source::{BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};

const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000;
const DEFAULT_MAX_MESSAGES_PER_PULL: i32 = 1_000;

pub struct GcpPubSubSourceFactory;
Expand Down Expand Up @@ -168,7 +167,7 @@ impl Source for GcpPubSubSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch: BatchBuilder = BatchBuilder::default();
let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);
// TODO: ensure we ACK the message after being commit: at least once
// TODO: ensure we increase_ack_deadline for the items
Expand Down
5 changes: 2 additions & 3 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ use ulid::Ulid;

use super::{
Assignment, BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
};
use crate::actors::DocProcessor;
use crate::models::{NewPublishLock, NewPublishToken, PublishLock};

const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 });

pub struct IngestSourceFactory;

#[async_trait]
Expand Down Expand Up @@ -309,7 +308,7 @@ impl Source for IngestSource {
Ok(Ok(fetch_payload)) => {
self.process_fetch_response(&mut batch_builder, fetch_payload)?;

if batch_builder.num_bytes >= 5 * 1024 * 1024 {
if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT {
break;
}
}
Expand Down
20 changes: 5 additions & 15 deletions quickwit/quickwit-indexing/src/source/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,10 @@ use tracing::{debug, info, warn};

use crate::actors::DocProcessor;
use crate::models::{NewPublishLock, PublishLock};
use crate::source::{BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};

/// Number of bytes after which we cut a new batch.
///
/// We try to emit chewable batches for the indexer.
/// One batch = one message to the indexer actor.
///
/// If batches are too large:
/// - we might not be able to observe the state of the indexer for 5 seconds.
/// - we will be needlessly occupying resident memory in the mailbox.
/// - we will not have a precise control of the timeout before commit.
///
/// 5MB seems like a good one size fits all value.
const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000;
use crate::source::{
BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
};

type GroupId = String;

Expand Down Expand Up @@ -486,7 +476,7 @@ impl Source for KafkaSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch = BatchBuilder::default();
let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down
11 changes: 6 additions & 5 deletions quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ use super::shard_consumer::{ShardConsumer, ShardConsumerHandle, ShardConsumerMes
use crate::actors::DocProcessor;
use crate::models::RawDocBatch;
use crate::source::kinesis::helpers::get_kinesis_client;
use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};

const TARGET_BATCH_NUM_BYTES: u64 = 5_000_000;
use crate::source::{
Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory, BATCH_NUM_BYTES_LIMIT,
EMIT_BATCHES_TIMEOUT,
};

type ShardId = String;

Expand Down Expand Up @@ -215,7 +216,7 @@ impl Source for KinesisSource {
let mut docs = Vec::new();
let mut checkpoint_delta = SourceCheckpointDelta::default();

let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down Expand Up @@ -278,7 +279,7 @@ impl Source for KinesisSource {
).context("failed to record partition delta")?;
}
}
if batch_num_bytes >= TARGET_BATCH_NUM_BYTES {
if batch_num_bytes >= BATCH_NUM_BYTES_LIMIT {
break;
}
}
Expand Down
16 changes: 16 additions & 0 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use bytesize::ByteSize;
pub use file_source::{FileSource, FileSourceFactory};
#[cfg(feature = "gcp-pubsub")]
pub use gcp_pubsub_source::{GcpPubSubSource, GcpPubSubSourceFactory};
Expand Down Expand Up @@ -109,6 +110,21 @@ use crate::models::RawDocBatch;
use crate::source::ingest::IngestSourceFactory;
use crate::source::ingest_api_source::IngestApiSourceFactory;

/// Number of bytes after which we cut a new batch.
///
/// We try to emit chewable batches for the indexer.
/// One batch = one message to the indexer actor.
///
/// If batches are too large:
/// - we might not be able to observe the state of the indexer for 5 seconds.
/// - we will be needlessly occupying resident memory in the mailbox.
/// - we will not have a precise control of the timeout before commit.
///
/// 5MB seems like a good one size fits all value.
const BATCH_NUM_BYTES_LIMIT: u64 = ByteSize::mib(5).as_u64();

const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 });

/// Runtime configuration used during execution of a source actor.
pub struct SourceRuntimeArgs {
pub pipeline_id: IndexingPipelineId,
Expand Down
16 changes: 2 additions & 14 deletions quickwit/quickwit-indexing/src/source/pulsar_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,9 @@ use tracing::{debug, info, warn};
use crate::actors::DocProcessor;
use crate::source::{
BatchBuilder, Source, SourceActor, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
};

/// Number of bytes after which we cut a new batch.
///
/// We try to emit chewable batches for the indexer.
/// One batch = one message to the indexer actor.
///
/// If batches are too large:
/// - we might not be able to observe the state of the indexer for 5 seconds.
/// - we will be needlessly occupying resident memory in the mailbox.
/// - we will not have a precise control of the timeout before commit.
///
/// 5MB seems like a good one size fits all value.
const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000;

type PulsarConsumer = Consumer<PulsarMessage, TokioExecutor>;

pub struct PulsarSourceFactory;
Expand Down Expand Up @@ -225,7 +213,7 @@ impl Source for PulsarSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch = BatchBuilder::default();
let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down