Skip to content

Commit

Permalink
Allocate new shards uniformly (#4720)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Mar 14, 2024
1 parent 7c444b3 commit 6153d96
Show file tree
Hide file tree
Showing 13 changed files with 393 additions and 686 deletions.
15 changes: 11 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,9 @@ mod tests {
use quickwit_metastore::{
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
};
use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest;
use quickwit_proto::control_plane::{
GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsSubrequest,
};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, CpuCapacity, IndexingServiceClient};
use quickwit_proto::ingest::ingester::{IngesterServiceClient, RetainShardsResponse};
use quickwit_proto::ingest::{Shard, ShardState};
Expand Down Expand Up @@ -1998,7 +2000,7 @@ mod tests {
MetastoreServiceClient::from(mock_metastore),
);

let error = control_plane_mailbox
let response = control_plane_mailbox
.ask(GetOrCreateOpenShardsRequest {
subrequests: vec![GetOrCreateOpenShardsSubrequest {
subrequest_id: 0,
Expand All @@ -2010,8 +2012,13 @@ mod tests {
})
.await
.unwrap()
.unwrap_err();
assert!(matches!(error, ControlPlaneError::Unavailable { .. }));
.unwrap();
assert!(response.successes.is_empty());
assert_eq!(response.failures.len(), 1);
assert!(matches!(
response.failures[0].reason(),
GetOrCreateOpenShardsFailureReason::NoIngestersAvailable
));

let control_plane_state = control_plane_mailbox.ask(Observe).await.unwrap();
assert_eq!(control_plane_state.num_indexes, 1);
Expand Down
737 changes: 355 additions & 382 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ impl ControlPlaneModel {
.set_shards_as_unavailable(unavailable_leaders);
}

#[cfg(test)]
pub(crate) fn all_shards(&self) -> impl Iterator<Item = &ShardEntry> + '_ {
self.shard_table.all_shards()
}
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ impl ShardTable {
self.check_invariant();
}

#[cfg(test)]
pub(crate) fn all_shards(&self) -> impl Iterator<Item = &ShardEntry> + '_ {
self.table_entries
.values()
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ impl From<io::Error> for IngestServiceError {
impl From<IngestV2Error> for IngestServiceError {
fn from(error: IngestV2Error) -> Self {
match error {
IngestV2Error::IngesterUnavailable { .. }
| IngestV2Error::Timeout(_)
| IngestV2Error::Unavailable(_) => IngestServiceError::Unavailable,
IngestV2Error::Timeout(_) | IngestV2Error::Unavailable(_) => {
IngestServiceError::Unavailable
}
IngestV2Error::Internal(message) => IngestServiceError::Internal(message),
IngestV2Error::ShardNotFound { .. } => {
IngestServiceError::Internal("shard not found".to_string())
Expand Down
10 changes: 5 additions & 5 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,19 +448,19 @@ async fn fault_tolerant_fetch_stream(
index_uid=%index_uid,
source_id=%source_id,
shard_id=%shard_id,
"ingester `{ingester_id}` is not available: failing over to ingester `{failover_ingester_id}`"
"ingester `{ingester_id}` is unavailable: failing over to ingester `{failover_ingester_id}`"
);
} else {
error!(
client_id=%client_id,
index_uid=%index_uid,
source_id=%source_id,
shard_id=%shard_id,
"ingester `{ingester_id}` is not available: closing fetch stream"
"ingester `{ingester_id}` is unavailable: closing fetch stream"
);
let ingest_error = IngestV2Error::IngesterUnavailable {
ingester_id: ingester_id.clone(),
};
let message =
format!("ingester `{ingester_id}` is unavailable: closing fetch stream");
let ingest_error = IngestV2Error::Unavailable(message);
// Attempt to send the error to the consumer in a best-effort manner before
// returning.
let fetch_stream_error = FetchStreamError {
Expand Down
46 changes: 7 additions & 39 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ use quickwit_proto::ingest::ingester::{
IngesterServiceStream, IngesterStatus, InitShardsRequest, InitShardsResponse,
ObservationMessage, OpenFetchStreamRequest, OpenObservationStreamRequest,
OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure,
PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, PingRequest,
PingResponse, ReplicateFailureReason, ReplicateSubrequest, RetainShardsForSource,
RetainShardsRequest, RetainShardsResponse, SynReplicationMessage, TruncateShardsRequest,
TruncateShardsResponse,
PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, ReplicateFailureReason,
ReplicateSubrequest, RetainShardsForSource, RetainShardsRequest, RetainShardsResponse,
SynReplicationMessage, TruncateShardsRequest, TruncateShardsResponse,
};
use quickwit_proto::ingest::{
CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardIds, ShardState,
Expand Down Expand Up @@ -372,12 +371,10 @@ impl Ingester {
.try_send(open_message)
.expect("channel should be open and have capacity");

let mut ingester =
self.ingester_pool
.get(&follower_id)
.ok_or(IngestV2Error::IngesterUnavailable {
ingester_id: follower_id.clone(),
})?;
let mut ingester = self.ingester_pool.get(&follower_id).ok_or_else(|| {
let message = format!("ingester `{follower_id}` is unavailable");
IngestV2Error::Unavailable(message)
})?;
let mut ack_replication_stream = ingester
.open_replication_stream(syn_replication_stream)
.await?;
Expand Down Expand Up @@ -948,31 +945,6 @@ impl Ingester {
Ok(CloseShardsResponse {})
}

async fn ping_inner(&mut self, ping_request: PingRequest) -> IngestV2Result<PingResponse> {
let state_guard = self.state.lock_partially().await?;

if state_guard.status() != IngesterStatus::Ready {
return Err(IngestV2Error::Internal("node decommissioned".to_string()));
}
if ping_request.leader_id != self.self_node_id {
let ping_response = PingResponse {};
return Ok(ping_response);
};
let Some(follower_id) = &ping_request.follower_id else {
let ping_response = PingResponse {};
return Ok(ping_response);
};
let follower_id: NodeId = follower_id.clone().into();
let mut ingester = self.ingester_pool.get(&follower_id).ok_or({
IngestV2Error::IngesterUnavailable {
ingester_id: follower_id,
}
})?;
ingester.ping(ping_request).await?;
let ping_response = PingResponse {};
Ok(ping_response)
}

async fn decommission_inner(
&mut self,
_decommission_request: DecommissionRequest,
Expand Down Expand Up @@ -1077,10 +1049,6 @@ impl IngesterService for Ingester {
self.close_shards_inner(close_shards_request).await
}

async fn ping(&mut self, ping_request: PingRequest) -> IngestV2Result<PingResponse> {
self.ping_inner(ping_request).await
}

async fn decommission(
&mut self,
decommission_request: DecommissionRequest,
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ impl IngestWorkbench {
GetOrCreateOpenShardsFailureReason::SourceNotFound => {
SubworkbenchFailure::SourceNotFound
}
GetOrCreateOpenShardsFailureReason::NoIngestersAvailable => {
SubworkbenchFailure::NoShardsAvailable
}
GetOrCreateOpenShardsFailureReason::Unspecified => {
warn!(
"failure reason for subrequest `{}` is unspecified",
Expand Down Expand Up @@ -145,7 +148,7 @@ impl IngestWorkbench {
// `NotFound`, and `TooManyRequests`: in reality, we should never have to handle these cases
// here.
match persist_error {
IngestV2Error::IngesterUnavailable { .. } | IngestV2Error::Unavailable(_) => {
IngestV2Error::Unavailable(_) => {
self.unavailable_leaders.insert(persist_summary.leader_id);

for subrequest_id in persist_summary.subrequest_ids {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ enum GetOrCreateOpenShardsFailureReason {
GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_UNSPECIFIED = 0;
GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_INDEX_NOT_FOUND = 1;
GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_SOURCE_NOT_FOUND = 2;
GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_NO_INGESTERS_AVAILABLE = 3;
}

message GetOrCreateOpenShardsFailure {
Expand Down
12 changes: 0 additions & 12 deletions quickwit/quickwit-proto/protos/quickwit/ingester.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ service IngesterService {
// Closes a set of shards. This RPC is called by the control plane.
rpc CloseShards(CloseShardsRequest) returns (CloseShardsResponse);

// Pings an ingester to check if it is ready to host shards and serve requests.
rpc Ping(PingRequest) returns (PingResponse);

// Decommissions the ingester.
rpc Decommission(DecommissionRequest) returns (DecommissionResponse);

}

message RetainShardsForSource {
Expand Down Expand Up @@ -265,14 +261,6 @@ message CloseShardsRequest {
message CloseShardsResponse {
}

message PingRequest {
string leader_id = 1;
optional string follower_id = 2;
}

message PingResponse {
}

message DecommissionRequest {
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6153d96

Please sign in to comment.