First stab at bugfix.

We no longer evict from the ingester pool a node that experienced a timeout or a transport error.

No code path would re-add an evicted node to the ingester pool, and that same pool is also used by the ingest source, which resulted in bug #4336.

Instead, we simply treat these errors like any other transient error and rely on the round-robin logic to avoid picking the same node over and over again.
fulmicoton committed Jan 2, 2024
1 parent 929534e commit f8ffbb1
Showing 5 changed files with 52 additions and 47 deletions.
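
As a rough, self-contained illustration of the round-robin idea the commit message relies on (all names below are hypothetical, not taken from the Quickwit codebase): successive attempts rotate through the known shard leaders, so a node that just timed out is simply not picked again on the next attempt, and it never needs to be evicted from the pool.

use std::sync::atomic::{AtomicUsize, Ordering};

struct ShardTable {
    /// Leader node id of each open shard for a given source.
    leaders: Vec<String>,
    round_robin_idx: AtomicUsize,
}

impl ShardTable {
    /// Picks the next leader in round-robin order.
    fn next_leader(&self) -> Option<&str> {
        if self.leaders.is_empty() {
            return None;
        }
        let idx = self.round_robin_idx.fetch_add(1, Ordering::Relaxed) % self.leaders.len();
        Some(self.leaders[idx].as_str())
    }
}

fn main() {
    let table = ShardTable {
        leaders: vec!["node-1".into(), "node-2".into(), "node-3".into()],
        round_robin_idx: AtomicUsize::new(0),
    };
    // Even if "node-1" just returned a timeout or a transport error, the next two
    // attempts go to "node-2" and "node-3"; "node-1" stays in the pool and is tried
    // again once the rotation comes back around.
    for _ in 0..3 {
        println!("next attempt -> {}", table.next_leader().unwrap());
    }
}
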
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/tower/pool.rs
@@ -178,7 +178,7 @@ where
}

/// Removes a value from the pool.
pub fn remove(&self, key: &K) {
fn remove(&self, key: &K) {
self.pool
.write()
.expect("lock should not be poisoned")
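
Making `remove` private is what enforces the fix at the API level: code outside the pool module, such as the ingest router, can no longer evict a node. Below is a hedged sketch of that shape (a simplified stand-in, not the actual quickwit-common Pool API):

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::RwLock;

pub struct Pool<K, V> {
    inner: RwLock<HashMap<K, V>>,
}

impl<K: Eq + Hash, V> Pool<K, V> {
    /// Adding members stays public: membership updates keep flowing in.
    pub fn insert(&self, key: K, value: V) {
        self.inner
            .write()
            .expect("lock should not be poisoned")
            .insert(key, value);
    }

    /// Removes a value from the pool. Now private, so eviction can only happen from
    /// inside the pool module itself (presumably driven by cluster membership
    /// changes), never from error-handling paths elsewhere.
    fn remove(&self, key: &K) {
        self.inner
            .write()
            .expect("lock should not be poisoned")
            .remove(key);
    }
}
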
19 changes: 6 additions & 13 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Expand Up @@ -109,7 +109,6 @@ impl IngestRouter {

pub fn subscribe(&self, event_broker: &EventBroker) {
let weak_router_state = WeakRouterState(Arc::downgrade(&self.state));

event_broker
.subscribe::<LocalShardsUpdate>(weak_router_state.clone())
.forever();
@@ -138,8 +137,8 @@ impl IngestRouter {
if !state_guard.routing_table.has_open_shards(
&subrequest.index_id,
&subrequest.source_id,
&mut closed_shards,
ingester_pool,
&mut closed_shards,
&mut unavailable_leaders,
) {
let subrequest = GetOrCreateOpenShardsSubrequest {
@@ -153,16 +152,10 @@
drop(state_guard);

if !closed_shards.is_empty() {
info!(
"reporting {} closed shard(s) to control plane",
closed_shards.len()
)
info!(closed_shards=?closed_shards, "reporting closed shard(s) to control plane");
}
if !unavailable_leaders.is_empty() {
info!(
"reporting {} unavailable leader(s) to control plane",
unavailable_leaders.len()
);
info!(unavailable_leaders=?unavailable_leaders, "reporting unavailable leader(s) to control plane");
}
GetOrCreateOpenShardsRequest {
subrequests: get_open_shards_subrequests,
@@ -267,9 +260,10 @@ impl IngestRouter {
for subrequest_id in persist_summary.subrequest_ids {
workbench.record_no_shards_available(subrequest_id);
}
self.ingester_pool.remove(&persist_summary.leader_id);
}
_ => {
IngestV2Error::TooManyRequests
| IngestV2Error::Internal(_)
| IngestV2Error::ShardNotFound { .. } => {
for subrequest_id in persist_summary.subrequest_ids {
workbench.record_internal_error(
subrequest_id,
@@ -394,7 +388,6 @@ impl IngestRouter {
) -> IngestV2Result<IngestResponseV2> {
let commit_type = ingest_request.commit_type();
let mut workbench = IngestWorkbench::new(ingest_request.subrequests, max_num_attempts);

while !workbench.is_complete() {
workbench.new_attempt();
self.batch_persist(&mut workbench, commit_type).await;
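
Putting the router change together, the persist-error handling now looks roughly like the following sketch. The types are stand-ins, and the exact arm for `Timeout`/`Transport` is inferred from the hunk above and from the commit message, so treat it as an approximation rather than the committed code:

#[derive(Debug)]
enum IngestV2Error {
    Timeout,
    Transport(String),
    TooManyRequests,
    Internal(String),
    ShardNotFound { shard_id: u64 },
}

#[derive(Default)]
struct IngestWorkbench {
    no_shards_available: Vec<u32>,
    internal_errors: Vec<(u32, String)>,
}

impl IngestWorkbench {
    fn record_no_shards_available(&mut self, subrequest_id: u32) {
        self.no_shards_available.push(subrequest_id);
    }
    fn record_internal_error(&mut self, subrequest_id: u32, message: String) {
        self.internal_errors.push((subrequest_id, message));
    }
}

fn handle_persist_error(
    workbench: &mut IngestWorkbench,
    error: &IngestV2Error,
    subrequest_ids: &[u32],
) {
    match error {
        // Node-level failures: mark the subrequests as "no shards available" so the
        // next attempt retries them elsewhere via round-robin. Crucially, the leader
        // is no longer removed from the ingester pool here.
        IngestV2Error::Timeout | IngestV2Error::Transport(_) => {
            for &subrequest_id in subrequest_ids {
                workbench.record_no_shards_available(subrequest_id);
            }
        }
        // Request-level failures are recorded as internal errors on each subrequest.
        IngestV2Error::TooManyRequests
        | IngestV2Error::Internal(_)
        | IngestV2Error::ShardNotFound { .. } => {
            for &subrequest_id in subrequest_ids {
                workbench.record_internal_error(subrequest_id, format!("{error:?}"));
            }
        }
    }
}
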
69 changes: 37 additions & 32 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
@@ -111,21 +111,21 @@ impl RoutingTableEntry {
/// unavailable ingesters encountered along the way.
pub fn has_open_shards(
&self,
closed_shard_ids: &mut Vec<ShardId>,
ingester_pool: &IngesterPool,
closed_shard_ids: &mut Vec<ShardId>,
unavailable_leaders: &mut HashSet<NodeId>,
) -> bool {
for shards in [&self.local_shards, &self.remote_shards] {
for shard in shards {
if shard.shard_state.is_closed() {
closed_shard_ids.push(shard.shard_id);
} else if shard.shard_state.is_open() {
if ingester_pool.contains_key(&shard.leader_id) {
return true;
} else {
let leader_id: NodeId = shard.leader_id.clone();
unavailable_leaders.insert(leader_id);
}
let shards = self.local_shards.iter().chain(self.remote_shards.iter());
for shard in shards {
if shard.shard_state.is_closed() {
closed_shard_ids.push(shard.shard_id);
} else if shard.shard_state.is_open() {
if ingester_pool.contains_key(&shard.leader_id) {
return true;
} else {
// TODO should we change the state of the shard?
let leader_id: NodeId = shard.leader_id.clone();
unavailable_leaders.insert(leader_id);
}
}
}
@@ -328,31 +328,36 @@ impl RoutingTable {
self.table.get(&key)
}

/// Returns `true` if the router already knows about a shard for a given source that has
/// an available `leader`.
///
/// If this function returns false, it populates the set of unavailable leaders and closed
/// shards. These will be joined to the GetOrCreate shard request emitted to the control
/// plane.
pub fn has_open_shards(
&self,
index_id: impl Into<IndexId>,
source_id: impl Into<SourceId>,
closed_shards: &mut Vec<ShardIds>,
ingester_pool: &IngesterPool,
closed_shards: &mut Vec<ShardIds>,
unavailable_leaders: &mut HashSet<NodeId>,
) -> bool {
if let Some(entry) = self.find_entry(index_id, source_id) {
let mut closed_shard_ids: Vec<ShardId> = Vec::new();

let result =
entry.has_open_shards(&mut closed_shard_ids, ingester_pool, unavailable_leaders);

if !closed_shard_ids.is_empty() {
closed_shards.push(ShardIds {
index_uid: entry.index_uid.clone().into(),
source_id: entry.source_id.clone(),
shard_ids: closed_shard_ids,
});
}
result
} else {
false
let Some(entry) = self.find_entry(index_id, source_id) else {
return false;
};
let mut closed_shard_ids: Vec<ShardId> = Vec::new();

let result =
entry.has_open_shards(ingester_pool, &mut closed_shard_ids, unavailable_leaders);

if !closed_shard_ids.is_empty() {
closed_shards.push(ShardIds {
index_uid: entry.index_uid.clone().into(),
source_id: entry.source_id.clone(),
shard_ids: closed_shard_ids,
});
}
result
}

/// Replaces the routing table entry for the source with the provided shards.
@@ -530,8 +535,8 @@ mod tests {
let mut unavailable_leaders = HashSet::new();

assert!(!table_entry.has_open_shards(
&mut closed_shard_ids,
&ingester_pool,
&mut closed_shard_ids,
&mut unavailable_leaders
));
assert!(closed_shard_ids.is_empty());
@@ -570,8 +575,8 @@
remote_round_robin_idx: AtomicUsize::default(),
};
assert!(table_entry.has_open_shards(
&mut closed_shard_ids,
&ingester_pool,
&mut closed_shard_ids,
&mut unavailable_leaders
));
assert_eq!(closed_shard_ids.len(), 1);
@@ -611,8 +616,8 @@
remote_round_robin_idx: AtomicUsize::default(),
};
assert!(table_entry.has_open_shards(
&mut closed_shard_ids,
&ingester_pool,
&mut closed_shard_ids,
&mut unavailable_leaders
));
assert_eq!(closed_shard_ids.len(), 1);
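
The core of the routing_table.rs refactor above is the shard-classification loop: local and remote shards are scanned as one chained iterator (local first, so local shards are still preferred), closed shards and unavailable leaders are collected for reporting, and the function returns true as soon as it finds an open shard whose leader is in the pool. A simplified, self-contained sketch of that logic (stand-in types, not the actual Quickwit structs):

use std::collections::HashSet;

#[derive(Clone, Copy)]
enum ShardState {
    Open,
    Closed,
}

struct Shard {
    shard_id: u64,
    leader_id: String,
    shard_state: ShardState,
}

fn has_open_shards(
    local_shards: &[Shard],
    remote_shards: &[Shard],
    // Stand-in for `IngesterPool::contains_key`: the set of currently known ingesters.
    available_leaders: &HashSet<String>,
    closed_shard_ids: &mut Vec<u64>,
    unavailable_leaders: &mut HashSet<String>,
) -> bool {
    for shard in local_shards.iter().chain(remote_shards.iter()) {
        match shard.shard_state {
            ShardState::Closed => closed_shard_ids.push(shard.shard_id),
            ShardState::Open => {
                if available_leaders.contains(&shard.leader_id) {
                    return true;
                }
                // The leader is unknown to the pool: report it to the control plane
                // instead of silently skipping the shard.
                unavailable_leaders.insert(shard.leader_id.clone());
            }
        }
    }
    false
}
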
8 changes: 7 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
@@ -65,6 +65,8 @@ impl IngestWorkbench {
self.num_attempts += 1;
}

/// Returns true if all subrequests were successful or if the number of
/// attempts has been exhausted.
pub fn is_complete(&self) -> bool {
self.num_successes >= self.subworkbenches.len()
|| self.num_attempts >= self.max_num_attempts
@@ -262,11 +264,15 @@ impl IngestSubworkbench {
self.persist_success_opt.is_none() && self.last_failure_is_transient()
}

/// Returns `false` if and only if the last attempt suggests that retrying will fail,
/// e.g.:
/// - the index does not exist
/// - the source does not exist.
fn last_failure_is_transient(&self) -> bool {
match self.last_failure_opt {
Some(SubworkbenchFailure::IndexNotFound) => false,
Some(SubworkbenchFailure::SourceNotFound) => false,
Some(SubworkbenchFailure::Internal(_)) => false,
Some(SubworkbenchFailure::Internal(_)) => true,
Some(SubworkbenchFailure::NoShardsAvailable) => true,
Some(SubworkbenchFailure::Persist(_)) => true,
None => true,
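
The behavioral change in this file is the `Internal(_)` arm: such failures were previously treated as permanent and now count as transient, so the workbench keeps retrying them until `max_num_attempts` is exhausted. A compact, hedged sketch of the classification (payload types simplified):

#[derive(Debug)]
enum SubworkbenchFailure {
    IndexNotFound,
    SourceNotFound,
    Internal(String),
    NoShardsAvailable,
    Persist(String),
}

fn last_failure_is_transient(last_failure_opt: Option<&SubworkbenchFailure>) -> bool {
    match last_failure_opt {
        // Retrying cannot help: the index or source simply does not exist.
        Some(SubworkbenchFailure::IndexNotFound) => false,
        Some(SubworkbenchFailure::SourceNotFound) => false,
        // Previously `false`; internal errors are now retried on the next attempt.
        Some(SubworkbenchFailure::Internal(_)) => true,
        Some(SubworkbenchFailure::NoShardsAvailable) => true,
        Some(SubworkbenchFailure::Persist(_)) => true,
        None => true,
    }
}
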
1 change: 1 addition & 0 deletions quickwit/quickwit-proto/src/ingest/mod.rs
@@ -43,6 +43,7 @@ pub enum IngestV2Error {
ShardNotFound { shard_id: ShardId },
#[error("request timed out")]
Timeout,
// This error is provoked by the semaphore located on the router.
#[error("too many requests")]
TooManyRequests,
// TODO: Merge `Transport` and `IngesterUnavailable` into a single `Unavailable` error.
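
For context on the new comment: `TooManyRequests` is returned when the router's own concurrency limit is hit, not by the ingester itself. A hedged sketch of that kind of back-pressure, using a plain atomic counter as a stand-in for the actual semaphore (the limit value is purely illustrative):

use std::sync::atomic::{AtomicUsize, Ordering};

const MAX_IN_FLIGHT_REQUESTS: usize = 256; // illustrative value, not Quickwit's

struct RequestLimiter {
    in_flight: AtomicUsize,
}

impl RequestLimiter {
    /// Returns an error when the limit is already reached, mirroring the
    /// `TooManyRequests` behavior described above.
    fn try_acquire(&self) -> Result<(), &'static str> {
        let previous = self.in_flight.fetch_add(1, Ordering::SeqCst);
        if previous >= MAX_IN_FLIGHT_REQUESTS {
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            return Err("too many requests");
        }
        Ok(())
    }

    /// Must be called once the request completes, successfully or not.
    fn release(&self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}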
