Skip to content

Commit

Permalink
Handle index/source not found and timeout/transport errors
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Nov 1, 2023
1 parent 6d22ae0 commit 6999fb2
Show file tree
Hide file tree
Showing 26 changed files with 1,137 additions and 449 deletions.
10 changes: 6 additions & 4 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
pub fn as_grpc_service(&self) -> #grpc_server_package_name::#grpc_server_name<#grpc_server_adapter_name> {
let adapter = #grpc_server_adapter_name::new(self.clone());
#grpc_server_package_name::#grpc_server_name::new(adapter)
.max_decoding_message_size(10 * 1024 * 1024)
.max_encoding_message_size(10 * 1024 * 1024)
}

pub fn from_channel(addr: std::net::SocketAddr, channel: tonic::transport::Channel) -> Self
Expand All @@ -585,7 +587,10 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
pub fn from_balance_channel(balance_channel: quickwit_common::tower::BalanceChannel<std::net::SocketAddr>) -> #client_name
{
let connection_keys_watcher = balance_channel.connection_keys_watcher();
let adapter = #grpc_client_adapter_name::new(#grpc_client_package_name::#grpc_client_name::new(balance_channel), connection_keys_watcher);
let client = #grpc_client_package_name::#grpc_client_name::new(balance_channel)
.max_decoding_message_size(10 * 1024 * 1024)
.max_encoding_message_size(10 * 1024 * 1024);
let adapter = #grpc_client_adapter_name::new(client, connection_keys_watcher);
Self::new(adapter)
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Default for IngestApiConfig {
max_queue_memory_usage: Byte::from_bytes(2 * 1024 * 1024 * 1024), /* 2 GiB // TODO maybe we want more? */
max_queue_disk_usage: Byte::from_bytes(4 * 1024 * 1024 * 1024), /* 4 GiB // TODO maybe we want more? */
replication_factor: 1,
content_length_limit: 10 * 1024 * 1024, // 10 MB
content_length_limit: 10 * 1024 * 1024, // 10 MiB
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ mod tests {

#[tokio::test]
async fn test_control_plane_add_source() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();

let cluster_id = "test-cluster".to_string();
Expand Down Expand Up @@ -564,7 +563,6 @@ mod tests {

#[tokio::test]
async fn test_control_plane_toggle_source() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();

let cluster_id = "test-cluster".to_string();
Expand Down Expand Up @@ -743,6 +741,7 @@ mod tests {
);
let get_open_shards_request = GetOrCreateOpenShardsRequest {
subrequests: vec![GetOrCreateOpenShardsSubrequest {
subrequest_id: 0,
index_id: "test-index".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
}],
Expand All @@ -753,9 +752,10 @@ mod tests {
.ask_for_res(get_open_shards_request)
.await
.unwrap();
assert_eq!(get_open_shards_response.subresponses.len(), 1);
assert_eq!(get_open_shards_response.successes.len(), 1);
assert_eq!(get_open_shards_response.failures.len(), 0);

let subresponse = &get_open_shards_response.subresponses[0];
let subresponse = &get_open_shards_response.successes[0];
assert_eq!(subresponse.index_uid, "test-index:0");
assert_eq!(subresponse.source_id, INGEST_SOURCE_ID);
assert_eq!(subresponse.open_shards.len(), 1);
Expand Down
137 changes: 87 additions & 50 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ use itertools::Itertools;
use quickwit_common::{PrettySample, Progress};
use quickwit_ingest::IngesterPool;
use quickwit_proto::control_plane::{
ClosedShards, ControlPlaneError, ControlPlaneResult, GetOpenShardsSubresponse,
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse,
ClosedShards, ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsFailure,
GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess,
};
use quickwit_proto::ingest::ingester::{IngesterService, PingRequest};
use quickwit_proto::ingest::{IngestV2Error, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{
EntityKind, MetastoreError, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId};
use rand::seq::SliceRandom;
use tokio::time::timeout;
Expand Down Expand Up @@ -232,9 +231,6 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> ControlPlaneResult<GetOrCreateOpenShardsResponse> {
let mut get_open_shards_subresponses =
Vec::with_capacity(get_open_shards_request.subrequests.len());

self.handle_closed_shards(get_open_shards_request.closed_shards, model);

let mut unavailable_leaders: FnvHashSet<NodeId> = get_open_shards_request
Expand All @@ -245,37 +241,44 @@ impl IngestController {

self.handle_unavailable_leaders(&unavailable_leaders, model);

let num_subrequests = get_open_shards_request.subrequests.len();
let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests);
let mut get_or_create_open_shards_failures = Vec::new();
let mut open_shards_subrequests = Vec::new();

for get_open_shards_subrequest in get_open_shards_request.subrequests {
let index_uid = model
.index_uid(&get_open_shards_subrequest.index_id)
.ok_or_else(|| {
MetastoreError::NotFound(EntityKind::Index {
index_id: get_open_shards_subrequest.index_id.clone(),
})
})?;

let (open_shards, next_shard_id) = model
.find_open_shards(
&index_uid,
&get_open_shards_subrequest.source_id,
&unavailable_leaders,
)
.ok_or_else(|| {
MetastoreError::NotFound(EntityKind::Source {
index_id: get_open_shards_subrequest.index_id.clone(),
source_id: get_open_shards_subrequest.source_id.clone(),
})
})?;

let Some(index_uid) = model.index_uid(&get_open_shards_subrequest.index_id) else {
let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_id: get_open_shards_subrequest.index_id,
source_id: get_open_shards_subrequest.source_id,
reason: GetOrCreateOpenShardsFailureReason::IndexNotFound as i32,
};
get_or_create_open_shards_failures.push(get_or_create_open_shards_failure);
continue;
};
let Some((open_shards, next_shard_id)) = model.find_open_shards(
&index_uid,
&get_open_shards_subrequest.source_id,
&unavailable_leaders,
) else {
let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_id: get_open_shards_subrequest.index_id,
source_id: get_open_shards_subrequest.source_id,
reason: GetOrCreateOpenShardsFailureReason::SourceNotFound as i32,
};
get_or_create_open_shards_failures.push(get_or_create_open_shards_failure);
continue;
};
if !open_shards.is_empty() {
let get_open_shards_subresponse = GetOpenShardsSubresponse {
let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_uid: index_uid.into(),
source_id: get_open_shards_subrequest.source_id,
open_shards,
};
get_open_shards_subresponses.push(get_open_shards_subresponse);
get_or_create_open_shards_successes.push(get_or_create_open_shards_success);
} else {
// TODO: Find leaders in batches.
// TODO: Round-robin leader-follower pairs or choose according to load.
Expand All @@ -286,6 +289,7 @@ impl IngestController {
ControlPlaneError::Unavailable("no available ingester".to_string())
})?;
let open_shards_subrequest = metastore::OpenShardsSubrequest {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_uid: index_uid.into(),
source_id: get_open_shards_subrequest.source_id,
leader_id: leader_id.into(),
Expand Down Expand Up @@ -315,17 +319,19 @@ impl IngestController {
if let Some((open_shards, _next_shard_id)) =
model.find_open_shards(&index_uid, &source_id, &unavailable_leaders)
{
let get_open_shards_subresponse = GetOpenShardsSubresponse {
let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess {
subrequest_id: open_shards_subresponse.subrequest_id,
index_uid: index_uid.into(),
source_id,
source_id: open_shards_subresponse.source_id,
open_shards,
};
get_open_shards_subresponses.push(get_open_shards_subresponse);
get_or_create_open_shards_successes.push(get_or_create_open_shards_success);
}
}
}
Ok(GetOrCreateOpenShardsResponse {
subresponses: get_open_shards_subresponses,
successes: get_or_create_open_shards_successes,
failures: get_or_create_open_shards_failures,
})
}
}
Expand Down Expand Up @@ -575,6 +581,7 @@ mod tests {
assert_eq!(&request.subrequests[0].source_id, source_id);

let subresponses = vec![metastore::OpenShardsSubresponse {
subrequest_id: 1,
index_uid: index_uid_1.clone().into(),
source_id: source_id.to_string(),
opened_shards: vec![Shard {
Expand Down Expand Up @@ -657,17 +664,30 @@ mod tests {
.await
.unwrap();

assert_eq!(response.subresponses.len(), 0);
assert_eq!(response.successes.len(), 0);
assert_eq!(response.failures.len(), 0);

let subrequests = vec![
GetOrCreateOpenShardsSubrequest {
subrequest_id: 0,
index_id: "test-index-0".to_string(),
source_id: source_id.to_string(),
},
GetOrCreateOpenShardsSubrequest {
subrequest_id: 1,
index_id: "test-index-1".to_string(),
source_id: source_id.to_string(),
},
GetOrCreateOpenShardsSubrequest {
subrequest_id: 2,
index_id: "index-not-found".to_string(),
source_id: "source-not-found".to_string(),
},
GetOrCreateOpenShardsSubrequest {
subrequest_id: 3,
index_id: "test-index-0".to_string(),
source_id: "source-not-found".to_string(),
},
];
let closed_shards = Vec::new();
let unavailable_leaders = vec!["test-ingester-0".to_string()];
Expand All @@ -681,24 +701,41 @@ mod tests {
.await
.unwrap();

assert_eq!(response.subresponses.len(), 2);

assert_eq!(response.subresponses[0].index_uid, index_uid_0.as_str());
assert_eq!(response.subresponses[0].source_id, source_id);
assert_eq!(response.subresponses[0].open_shards.len(), 1);
assert_eq!(response.subresponses[0].open_shards[0].shard_id, 2);
assert_eq!(response.successes.len(), 2);
assert_eq!(response.failures.len(), 2);

let success = &response.successes[0];
assert_eq!(success.subrequest_id, 0);
assert_eq!(success.index_uid, index_uid_0.as_str());
assert_eq!(success.source_id, source_id);
assert_eq!(success.open_shards.len(), 1);
assert_eq!(success.open_shards[0].shard_id, 2);
assert_eq!(success.open_shards[0].leader_id, "test-ingester-1");

let success = &response.successes[1];
assert_eq!(success.subrequest_id, 1);
assert_eq!(success.index_uid, index_uid_1.as_str());
assert_eq!(success.source_id, source_id);
assert_eq!(success.open_shards.len(), 1);
assert_eq!(success.open_shards[0].shard_id, 1);
assert_eq!(success.open_shards[0].leader_id, "test-ingester-2");

let failure = &response.failures[0];
assert_eq!(failure.subrequest_id, 2);
assert_eq!(failure.index_id, "index-not-found");
assert_eq!(failure.source_id, "source-not-found");
assert_eq!(
response.subresponses[0].open_shards[0].leader_id,
"test-ingester-1"
failure.reason(),
GetOrCreateOpenShardsFailureReason::IndexNotFound
);

assert_eq!(&response.subresponses[1].index_uid, index_uid_1.as_str());
assert_eq!(response.subresponses[1].source_id, source_id);
assert_eq!(response.subresponses[1].open_shards.len(), 1);
assert_eq!(response.subresponses[1].open_shards[0].shard_id, 1);
let failure = &response.failures[1];
assert_eq!(failure.subrequest_id, 3);
assert_eq!(failure.index_id, index_id_0);
assert_eq!(failure.source_id, "source-not-found");
assert_eq!(
response.subresponses[1].open_shards[0].leader_id,
"test-ingester-2"
failure.reason(),
GetOrCreateOpenShardsFailureReason::SourceNotFound
);

assert_eq!(model.observable_state().num_shards, 2);
Expand Down
3 changes: 0 additions & 3 deletions quickwit/quickwit-control-plane/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ async fn start_control_plane(

#[tokio::test]
async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() {
quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
let cluster =
create_cluster_for_test(Vec::new(), &["indexer", "control_plane"], &transport, true)
Expand Down Expand Up @@ -251,7 +250,6 @@ async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() {

#[tokio::test]
async fn test_scheduler_scheduling_no_indexer() {
quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true)
.await
Expand Down Expand Up @@ -288,7 +286,6 @@ async fn test_scheduler_scheduling_no_indexer() {

#[tokio::test]
async fn test_scheduler_scheduling_multiple_indexers() {
quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true)
.await
Expand Down
9 changes: 8 additions & 1 deletion quickwit/quickwit-ingest/src/codegen/ingest_service.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl FetchTask {
fetch_range=?self.fetch_range,
"spawning fetch task"
);
let mut has_drained_queue = true;
let mut has_drained_queue = false;
let mut has_reached_eof = false;
let mut num_records_total = 0;

Expand Down Expand Up @@ -645,12 +645,6 @@ mod tests {
.unwrap();
drop(state_guard);

timeout(Duration::from_millis(50), fetch_stream.next())
.await
.unwrap_err();

new_records_tx.send(()).unwrap();

let fetch_response = timeout(Duration::from_millis(50), fetch_stream.next())
.await
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl Ingester {
.next()
.await
.expect("TODO")
.expect("")
.expect("TODO")
.into_open_response()
.expect("first message should be an open response");

Expand Down
Loading

0 comments on commit 6999fb2

Please sign in to comment.