Handle index/source not found and timeout/transport errors #4061

Merged 1 commit on Nov 1, 2023
10 changes: 6 additions & 4 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default.

7 changes: 6 additions & 1 deletion quickwit/quickwit-codegen/src/codegen.rs
@@ -573,6 +573,8 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
pub fn as_grpc_service(&self) -> #grpc_server_package_name::#grpc_server_name<#grpc_server_adapter_name> {
let adapter = #grpc_server_adapter_name::new(self.clone());
#grpc_server_package_name::#grpc_server_name::new(adapter)
.max_decoding_message_size(10 * 1024 * 1024)
.max_encoding_message_size(10 * 1024 * 1024)
}

pub fn from_channel(addr: std::net::SocketAddr, channel: tonic::transport::Channel) -> Self
@@ -585,7 +587,10 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
pub fn from_balance_channel(balance_channel: quickwit_common::tower::BalanceChannel<std::net::SocketAddr>) -> #client_name
{
let connection_keys_watcher = balance_channel.connection_keys_watcher();
let adapter = #grpc_client_adapter_name::new(#grpc_client_package_name::#grpc_client_name::new(balance_channel), connection_keys_watcher);
let client = #grpc_client_package_name::#grpc_client_name::new(balance_channel)
.max_decoding_message_size(10 * 1024 * 1024)
.max_encoding_message_size(10 * 1024 * 1024);
let adapter = #grpc_client_adapter_name::new(client, connection_keys_watcher);
Self::new(adapter)
}

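The two hunks above make the generated gRPC plumbing accept larger payloads: both the server wrapper returned by as_grpc_service() and the client built in from_balance_channel() raise tonic's message-size limits to 10 MiB (the tonic default for decoding is 4 MiB). A minimal sketch of the same builder calls on ordinary tonic-generated types, assuming a hypothetical Hello service produced by tonic-build; HelloGrpcServer, HelloGrpcClient, and HelloGrpcServerAdapter are placeholder names, not Quickwit's actual generated types:

use tonic::transport::Channel;

// 10 MiB, matching the limit hard-coded by the codegen change above.
const MAX_GRPC_MESSAGE_SIZE: usize = 10 * 1024 * 1024;

// Wrap a service implementation into a tonic server that accepts and emits
// messages of up to 10 MiB.
fn as_grpc_service(adapter: HelloGrpcServerAdapter) -> HelloGrpcServer<HelloGrpcServerAdapter> {
    HelloGrpcServer::new(adapter)
        .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE)
        .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE)
}

// Build a client over an existing channel with the same limits.
fn grpc_client(channel: Channel) -> HelloGrpcClient<Channel> {
    HelloGrpcClient::new(channel)
        .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE)
        .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE)
}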
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
@@ -187,7 +187,7 @@ impl Default for IngestApiConfig {
max_queue_memory_usage: Byte::from_bytes(2 * 1024 * 1024 * 1024), /* 2 GiB // TODO maybe we want more? */
max_queue_disk_usage: Byte::from_bytes(4 * 1024 * 1024 * 1024), /* 4 GiB // TODO maybe we want more? */
replication_factor: 1,
content_length_limit: 10 * 1024 * 1024, // 10 MB
content_length_limit: 10 * 1024 * 1024, // 10 MiB
}
}
}
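Worth noting: the value itself was already 10 * 1024 * 1024 bytes, so only the comment was wrong (10 MiB, not 10 MB), and the corrected figure lines up with the 10 MiB gRPC message-size limit introduced in the codegen change above, presumably so that an ingest payload at the content-length limit still fits into a single gRPC message.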
8 changes: 4 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -507,7 +507,6 @@ mod tests {

#[tokio::test]
async fn test_control_plane_add_source() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();

let cluster_id = "test-cluster".to_string();
@@ -564,7 +563,6 @@ mod tests {

#[tokio::test]
async fn test_control_plane_toggle_source() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();

let cluster_id = "test-cluster".to_string();
@@ -743,6 +741,7 @@ mod tests {
);
let get_open_shards_request = GetOrCreateOpenShardsRequest {
subrequests: vec![GetOrCreateOpenShardsSubrequest {
subrequest_id: 0,
index_id: "test-index".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
}],
@@ -753,9 +752,10 @@
.ask_for_res(get_open_shards_request)
.await
.unwrap();
assert_eq!(get_open_shards_response.subresponses.len(), 1);
assert_eq!(get_open_shards_response.successes.len(), 1);
assert_eq!(get_open_shards_response.failures.len(), 0);

let subresponse = &get_open_shards_response.subresponses[0];
let subresponse = &get_open_shards_response.successes[0];
assert_eq!(subresponse.index_uid, "test-index:0");
assert_eq!(subresponse.source_id, INGEST_SOURCE_ID);
assert_eq!(subresponse.open_shards.len(), 1);
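The assertions above reflect the reshaped control-plane API: instead of a flat subresponses list (and a hard MetastoreError::NotFound when an index or source is missing), GetOrCreateOpenShardsResponse now carries per-subrequest successes and failures. A hedged sketch of the prost-generated shapes implied by the fields used in this PR; field types, ordering, and derives are assumptions, and Shard stands for the existing quickwit_proto::ingest::Shard message:

pub struct GetOrCreateOpenShardsResponse {
    pub successes: Vec<GetOrCreateOpenShardsSuccess>,
    pub failures: Vec<GetOrCreateOpenShardsFailure>,
}

pub struct GetOrCreateOpenShardsSuccess {
    pub subrequest_id: u32,
    pub index_uid: String,
    pub source_id: String,
    pub open_shards: Vec<Shard>,
}

pub struct GetOrCreateOpenShardsFailure {
    pub subrequest_id: u32,
    pub index_id: String,
    pub source_id: String,
    // Stored as an i32 on the wire; written as `GetOrCreateOpenShardsFailureReason as i32`
    // and read back through the prost-generated `reason()` accessor.
    pub reason: i32,
}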
137 changes: 87 additions & 50 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -25,15 +25,14 @@ use itertools::Itertools;
use quickwit_common::{PrettySample, Progress};
use quickwit_ingest::IngesterPool;
use quickwit_proto::control_plane::{
ClosedShards, ControlPlaneError, ControlPlaneResult, GetOpenShardsSubresponse,
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse,
ClosedShards, ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsFailure,
GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess,
};
use quickwit_proto::ingest::ingester::{IngesterService, PingRequest};
use quickwit_proto::ingest::{IngestV2Error, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{
EntityKind, MetastoreError, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId};
use rand::seq::SliceRandom;
use tokio::time::timeout;
@@ -232,9 +231,6 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> ControlPlaneResult<GetOrCreateOpenShardsResponse> {
let mut get_open_shards_subresponses =
Vec::with_capacity(get_open_shards_request.subrequests.len());

self.handle_closed_shards(get_open_shards_request.closed_shards, model);

let mut unavailable_leaders: FnvHashSet<NodeId> = get_open_shards_request
@@ -245,37 +241,44 @@

self.handle_unavailable_leaders(&unavailable_leaders, model);

let num_subrequests = get_open_shards_request.subrequests.len();
let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests);
let mut get_or_create_open_shards_failures = Vec::new();
let mut open_shards_subrequests = Vec::new();

for get_open_shards_subrequest in get_open_shards_request.subrequests {
let index_uid = model
.index_uid(&get_open_shards_subrequest.index_id)
.ok_or_else(|| {
MetastoreError::NotFound(EntityKind::Index {
index_id: get_open_shards_subrequest.index_id.clone(),
})
})?;

let (open_shards, next_shard_id) = model
.find_open_shards(
&index_uid,
&get_open_shards_subrequest.source_id,
&unavailable_leaders,
)
.ok_or_else(|| {
MetastoreError::NotFound(EntityKind::Source {
index_id: get_open_shards_subrequest.index_id.clone(),
source_id: get_open_shards_subrequest.source_id.clone(),
})
})?;

let Some(index_uid) = model.index_uid(&get_open_shards_subrequest.index_id) else {
let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_id: get_open_shards_subrequest.index_id,
source_id: get_open_shards_subrequest.source_id,
reason: GetOrCreateOpenShardsFailureReason::IndexNotFound as i32,
};
get_or_create_open_shards_failures.push(get_or_create_open_shards_failure);
continue;
};
let Some((open_shards, next_shard_id)) = model.find_open_shards(
&index_uid,
&get_open_shards_subrequest.source_id,
&unavailable_leaders,
) else {
let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_id: get_open_shards_subrequest.index_id,
source_id: get_open_shards_subrequest.source_id,
reason: GetOrCreateOpenShardsFailureReason::SourceNotFound as i32,
};
get_or_create_open_shards_failures.push(get_or_create_open_shards_failure);
continue;
};
if !open_shards.is_empty() {
let get_open_shards_subresponse = GetOpenShardsSubresponse {
let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_uid: index_uid.into(),
source_id: get_open_shards_subrequest.source_id,
open_shards,
};
get_open_shards_subresponses.push(get_open_shards_subresponse);
get_or_create_open_shards_successes.push(get_or_create_open_shards_success);
} else {
// TODO: Find leaders in batches.
// TODO: Round-robin leader-follower pairs or choose according to load.
@@ -286,6 +289,7 @@
ControlPlaneError::Unavailable("no available ingester".to_string())
})?;
let open_shards_subrequest = metastore::OpenShardsSubrequest {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_uid: index_uid.into(),
source_id: get_open_shards_subrequest.source_id,
leader_id: leader_id.into(),
@@ -315,17 +319,19 @@
if let Some((open_shards, _next_shard_id)) =
model.find_open_shards(&index_uid, &source_id, &unavailable_leaders)
{
let get_open_shards_subresponse = GetOpenShardsSubresponse {
let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess {
subrequest_id: open_shards_subresponse.subrequest_id,
index_uid: index_uid.into(),
source_id,
source_id: open_shards_subresponse.source_id,
open_shards,
};
get_open_shards_subresponses.push(get_open_shards_subresponse);
get_or_create_open_shards_successes.push(get_or_create_open_shards_success);
}
}
}
Ok(GetOrCreateOpenShardsResponse {
subresponses: get_open_shards_subresponses,
successes: get_or_create_open_shards_successes,
failures: get_or_create_open_shards_failures,
})
}
}
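The two let-else arms above are the heart of the change: a missing index or source now produces a GetOrCreateOpenShardsFailure entry (IndexNotFound / SourceNotFound) for that subrequest instead of failing the entire batched request with a metastore error, so one bad index no longer sinks the other subrequests. A hedged sketch of how a caller such as the ingest router might consume the response, reusing the shapes sketched earlier; handle_success and handle_failure are hypothetical helpers:

fn process_response(response: GetOrCreateOpenShardsResponse) {
    for success in &response.successes {
        // Route the matching ingest subrequest to one of `success.open_shards`.
        handle_success(success);
    }
    for failure in &response.failures {
        // prost stores the enum as an i32 and generates a `reason()` accessor that
        // falls back to the enum's default variant for unknown values.
        match failure.reason() {
            GetOrCreateOpenShardsFailureReason::IndexNotFound => {
                handle_failure(failure, "index not found");
            }
            GetOrCreateOpenShardsFailureReason::SourceNotFound => {
                handle_failure(failure, "source not found");
            }
            _ => handle_failure(failure, "unspecified failure"),
        }
    }
}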
@@ -575,6 +581,7 @@ mod tests {
assert_eq!(&request.subrequests[0].source_id, source_id);

let subresponses = vec![metastore::OpenShardsSubresponse {
subrequest_id: 1,
index_uid: index_uid_1.clone().into(),
source_id: source_id.to_string(),
opened_shards: vec![Shard {
Expand Down Expand Up @@ -657,17 +664,30 @@ mod tests {
.await
.unwrap();

assert_eq!(response.subresponses.len(), 0);
assert_eq!(response.successes.len(), 0);
assert_eq!(response.failures.len(), 0);

let subrequests = vec![
GetOrCreateOpenShardsSubrequest {
subrequest_id: 0,
index_id: "test-index-0".to_string(),
source_id: source_id.to_string(),
},
GetOrCreateOpenShardsSubrequest {
subrequest_id: 1,
index_id: "test-index-1".to_string(),
source_id: source_id.to_string(),
},
GetOrCreateOpenShardsSubrequest {
subrequest_id: 2,
index_id: "index-not-found".to_string(),
source_id: "source-not-found".to_string(),
},
GetOrCreateOpenShardsSubrequest {
subrequest_id: 3,
index_id: "test-index-0".to_string(),
source_id: "source-not-found".to_string(),
},
];
let closed_shards = Vec::new();
let unavailable_leaders = vec!["test-ingester-0".to_string()];
Expand All @@ -681,24 +701,41 @@ mod tests {
.await
.unwrap();

assert_eq!(response.subresponses.len(), 2);

assert_eq!(response.subresponses[0].index_uid, index_uid_0.as_str());
assert_eq!(response.subresponses[0].source_id, source_id);
assert_eq!(response.subresponses[0].open_shards.len(), 1);
assert_eq!(response.subresponses[0].open_shards[0].shard_id, 2);
assert_eq!(response.successes.len(), 2);
assert_eq!(response.failures.len(), 2);

let success = &response.successes[0];
assert_eq!(success.subrequest_id, 0);
assert_eq!(success.index_uid, index_uid_0.as_str());
assert_eq!(success.source_id, source_id);
assert_eq!(success.open_shards.len(), 1);
assert_eq!(success.open_shards[0].shard_id, 2);
assert_eq!(success.open_shards[0].leader_id, "test-ingester-1");

let success = &response.successes[1];
assert_eq!(success.subrequest_id, 1);
assert_eq!(success.index_uid, index_uid_1.as_str());
assert_eq!(success.source_id, source_id);
assert_eq!(success.open_shards.len(), 1);
assert_eq!(success.open_shards[0].shard_id, 1);
assert_eq!(success.open_shards[0].leader_id, "test-ingester-2");

let failure = &response.failures[0];
assert_eq!(failure.subrequest_id, 2);
assert_eq!(failure.index_id, "index-not-found");
assert_eq!(failure.source_id, "source-not-found");
assert_eq!(
response.subresponses[0].open_shards[0].leader_id,
"test-ingester-1"
failure.reason(),
GetOrCreateOpenShardsFailureReason::IndexNotFound
);

assert_eq!(&response.subresponses[1].index_uid, index_uid_1.as_str());
assert_eq!(response.subresponses[1].source_id, source_id);
assert_eq!(response.subresponses[1].open_shards.len(), 1);
assert_eq!(response.subresponses[1].open_shards[0].shard_id, 1);
let failure = &response.failures[1];
assert_eq!(failure.subrequest_id, 3);
assert_eq!(failure.index_id, index_id_0);
assert_eq!(failure.source_id, "source-not-found");
assert_eq!(
response.subresponses[1].open_shards[0].leader_id,
"test-ingester-2"
failure.reason(),
GetOrCreateOpenShardsFailureReason::SourceNotFound
);

assert_eq!(model.observable_state().num_shards, 2);
3 changes: 0 additions & 3 deletions quickwit/quickwit-control-plane/src/tests.rs
@@ -156,7 +156,6 @@ async fn start_control_plane(

#[tokio::test]
async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() {
quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
let cluster =
create_cluster_for_test(Vec::new(), &["indexer", "control_plane"], &transport, true)
@@ -251,7 +250,6 @@ async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() {

#[tokio::test]
async fn test_scheduler_scheduling_no_indexer() {
quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true)
.await
@@ -288,7 +286,6 @@ async fn test_scheduler_scheduling_no_indexer() {

#[tokio::test]
async fn test_scheduler_scheduling_multiple_indexers() {
quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true)
.await
9 changes: 8 additions & 1 deletion quickwit/quickwit-ingest/src/codegen/ingest_service.rs

Some generated files are not rendered by default.

8 changes: 1 addition & 7 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -117,7 +117,7 @@ impl FetchTask {
fetch_range=?self.fetch_range,
"spawning fetch task"
);
let mut has_drained_queue = true;
let mut has_drained_queue = false;
let mut has_reached_eof = false;
let mut num_records_total = 0;

@@ -645,12 +645,6 @@ mod tests {
.unwrap();
drop(state_guard);

timeout(Duration::from_millis(50), fetch_stream.next())
.await
.unwrap_err();

new_records_tx.send(()).unwrap();

let fetch_response = timeout(Duration::from_millis(50), fetch_stream.next())
.await
.unwrap()
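The one-line flip from true to false above changes the fetch task's first iteration: starting with has_drained_queue = true made the task wait for a new-records notification before its first read, which is why the deleted test lines had to time out and then ping new_records_tx before anything was served; starting with false lets records already sitting in the queue be served immediately. A self-contained toy sketch of that loop shape, assuming a tokio::sync::watch channel as the notification mechanism (this is not Quickwit's FetchTask, just an illustration):

// Requires the `tokio` crate with the "macros", "rt-multi-thread", and "sync" features.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One record is already buffered before the fetch task starts.
    let queue = Arc::new(Mutex::new(VecDeque::from(vec!["already-buffered-record"])));
    let (new_records_tx, mut new_records_rx) = watch::channel(());

    let task_queue = Arc::clone(&queue);
    let fetch_task = tokio::spawn(async move {
        // Starting at `true` would block on the notification below and never serve
        // the record that is already in the queue.
        let mut has_drained_queue = false;
        loop {
            if has_drained_queue {
                if new_records_rx.changed().await.is_err() {
                    break; // Producer dropped: no more records will ever arrive.
                }
                has_drained_queue = false;
            }
            // Drain whatever is currently buffered, then go back to waiting.
            while let Some(record) = task_queue.lock().unwrap().pop_front() {
                println!("fetched: {record}");
            }
            has_drained_queue = true;
        }
    });

    // Produce one more record, notify the task, then close the channel to stop it.
    queue.lock().unwrap().push_back("new-record");
    new_records_tx.send(()).unwrap();
    drop(new_records_tx);
    fetch_task.await.unwrap();
}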
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -211,7 +211,7 @@ impl Ingester {
.next()
.await
.expect("TODO")
.expect("")
.expect("TODO")
.into_open_response()
.expect("first message should be an open response");
