Skip to content

Commit

Permalink
Added logs to debug stage/publish errors (#5167)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Jun 26, 2024
1 parent ae2227a commit de66092
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 2 deletions.
10 changes: 9 additions & 1 deletion quickwit/quickwit-indexing/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
new_counter_vec, new_gauge, new_gauge_vec, IntCounterVec, IntGauge, IntGaugeVec,
new_counter, new_counter_vec, new_gauge, new_gauge_vec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec
};

pub struct IndexerMetrics {
Expand All @@ -31,6 +31,8 @@ pub struct IndexerMetrics {
pub ongoing_merge_operations: IntGauge,
pub pending_merge_operations: IntGauge,
pub pending_merge_bytes: IntGauge,
// We use a lazy counter, as most users do not use Kafka.
pub kafka_rebalance_total: Lazy<IntCounter>,
}

impl Default for IndexerMetrics {
Expand Down Expand Up @@ -91,6 +93,12 @@ impl Default for IndexerMetrics {
"indexing",
&[],
),
kafka_rebalance_total: Lazy::new(|| new_counter(
"kafka_rebalance_total",
"Number of kafka rebalances",
"indexing",
&[]
)),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/source/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ macro_rules! return_if_err {
/// <https://docs.confluent.io/2.0.0/clients/librdkafka/classRdKafka_1_1RebalanceCb.html>
impl ConsumerContext for RdKafkaContext {
fn pre_rebalance(&self, rebalance: &Rebalance) {
crate::metrics::INDEXER_METRICS.kafka_rebalance_total.inc();
quickwit_common::rate_limited_info!(limit_per_min=3, topic=self.topic, "rebalance");
if let Rebalance::Revoke(tpl) = rebalance {
let partitions = collect_partitions(tpl, &self.topic);
debug!(partitions=?partitions, "revoke partitions");
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-metastore/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ impl SourceCheckpoint {
) -> Result<(), IncompatibleCheckpointDelta> {
self.check_compatibility(&delta)?;
debug!(delta=?delta, checkpoint=?self, "applying delta to checkpoint");

for (partition_id, partition_position) in delta.per_partition {
self.per_partition
.insert(partition_id, partition_position.to);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ impl FileBackedIndex {
.checkpoint
.try_apply_delta(checkpoint_delta)
.map_err(|error| {
quickwit_common::rate_limited_error!(limit_per_min=6, index=self.index_id(), "failed to apply checkpoint delta");
let entity = EntityKind::CheckpointDelta {
index_id: self.index_id().to_string(),
source_id,
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-proto/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,25 @@ impl ServiceError for MetastoreError {

impl GrpcServiceError for MetastoreError {
fn new_internal(message: String) -> Self {
quickwit_common::rate_limited_error!(limit_per_min=6, message=%message.as_str(), "metastore error: internal");
Self::Internal {
message,
cause: "".to_string(),
}
}

fn new_timeout(message: String) -> Self {
quickwit_common::rate_limited_error!(limit_per_min=6, message=%message.as_str(), "metastore error: timeout");
Self::Timeout(message)
}

fn new_too_many_requests() -> Self {
quickwit_common::rate_limited_error!(limit_per_min=6, "metastore error: too many requests");
Self::TooManyRequests
}

fn new_unavailable(message: String) -> Self {
quickwit_common::rate_limited_error!(limit_per_min=6, message=%message.as_str(), "metastore error: unavailable metastore");
Self::Unavailable(message)
}
}
Expand Down

0 comments on commit de66092

Please sign in to comment.