Skip to content

Commit

Permalink
Revert "Consider ingesters returning ResourceExhausted temporarily un…
Browse files Browse the repository at this point in the history
…available (#5155)" (#5191)

This reverts commit de2e150.
  • Loading branch information
fulmicoton authored Jul 5, 2024
1 parent 622a12f commit d1022d6
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 292 deletions.
26 changes: 2 additions & 24 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,17 +920,6 @@ impl Ingester {
if state_guard.status() != IngesterStatus::Ready {
return Err(IngestV2Error::Internal("node decommissioned".to_string()));
}
// If the WAL disk usage is too high, we reject the request.
let wal_usage = state_guard.mrecordlog.resource_usage();
report_wal_usage(wal_usage);

let disk_used = wal_usage.disk_used_bytes as u64;

if disk_used >= self.disk_capacity.as_u64() * 95 / 100 {
return Err(IngestV2Error::Internal(
"WAL disk usage too high".to_string(),
));
}
let mut successes = Vec::with_capacity(init_shards_request.subrequests.len());
let mut failures = Vec::new();
let now = Instant::now();
Expand Down Expand Up @@ -1692,18 +1681,15 @@ mod tests {
doc_mapping_json,
}],
};
let response = ingester
.init_shards(init_shards_request.clone())
.await
.unwrap();
let response = ingester.init_shards(init_shards_request).await.unwrap();
assert_eq!(response.successes.len(), 1);
assert_eq!(response.failures.len(), 0);

let init_shard_success = &response.successes[0];
assert_eq!(init_shard_success.subrequest_id, 0);
assert_eq!(init_shard_success.shard, Some(shard));

let mut state_guard = ingester.state.lock_fully().await.unwrap();
let state_guard = ingester.state.lock_fully().await.unwrap();

let queue_id = queue_id(&index_uid, "test-source", &ShardId::from(1));
let shard = state_guard.shards.get(&queue_id).unwrap();
Expand All @@ -1714,14 +1700,6 @@ mod tests {

assert!(state_guard.rate_trackers.contains_key(&queue_id));
assert!(state_guard.mrecordlog.queue_exists(&queue_id));

state_guard.set_status(IngesterStatus::Decommissioned);
drop(state_guard);

let error = ingester.init_shards(init_shards_request).await.unwrap_err();
assert!(
matches!(error, IngestV2Error::Internal(message) if message.contains("decommissioned"))
);
}

#[tokio::test]
Expand Down
Loading

0 comments on commit d1022d6

Please sign in to comment.