Skip to content

Commit

Permalink
Return action responses in same order as request
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 17, 2024
1 parent b56483b commit 5f7fa4f
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ElasticDocId = String;

#[derive(Debug)]
struct DocHandle {
doc_position: usize,
doc_uid: DocUid,
es_doc_id: Option<ElasticDocId>,
// Whether the document failed to parse. When the struct is instantiated, this value is set to
Expand All @@ -94,7 +95,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let mut ingest_request_builder = IngestRequestV2Builder::default();
let mut lines = lines(&body.content).enumerate();
let mut per_subrequest_doc_handles: HashMap<u32, Vec<DocHandle>> = HashMap::new();

let mut action_count = 0;
while let Some((line_no, line)) = lines.next() {
let action = serde_json::from_slice::<BulkAction>(line).map_err(|error| {
ElasticsearchError::new(
Expand Down Expand Up @@ -128,10 +129,12 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let (subrequest_id, doc_uid) = ingest_request_builder.add_doc(index_id, doc);

let doc_handle = DocHandle {
doc_position: action_count,
doc_uid,
es_doc_id: meta.es_doc_id,
is_parse_failure: false,
};
action_count += 1;
per_subrequest_doc_handles
.entry(subrequest_id)
.or_default()
Expand All @@ -148,15 +151,21 @@ pub(crate) async fn elastic_bulk_ingest_v2(
rate_limited_error!(limit_per_min=6, err=?err, "router error");
err
})?;
make_elastic_bulk_response_v2(ingest_response, per_subrequest_doc_handles, now)
make_elastic_bulk_response_v2(
ingest_response,
per_subrequest_doc_handles,
now,
action_count,
)
}

fn make_elastic_bulk_response_v2(
ingest_response_v2: IngestResponseV2,
mut per_subrequest_doc_handles: HashMap<u32, Vec<DocHandle>>,
now: Instant,
action_count: usize,
) -> Result<ElasticBulkResponse, ElasticsearchError> {
let mut actions: Vec<ElasticBulkAction> = Vec::new();
let mut positioned_actions: Vec<(usize, ElasticBulkAction)> = Vec::with_capacity(action_count);
let mut errors = false;

// Populate the items for each `IngestSuccess` subresponse. They may be partially successful and
Expand All @@ -181,8 +190,6 @@ fn make_elastic_bulk_response_v2(
for parse_failure in success.parse_failures {
errors = true;

// Since the generated doc UIDs are monotonically increasing, and inserted in order, we
// can find doc handles using binary search.
let failed_doc_uid = parse_failure.doc_uid();
let doc_handle_idx = doc_handles
.binary_search_by_key(&failed_doc_uid, |doc_handle| doc_handle.doc_uid)
Expand Down Expand Up @@ -212,7 +219,7 @@ fn make_elastic_bulk_response_v2(
error: Some(error),
};
let action = ElasticBulkAction::Index(item);
actions.push(action);
positioned_actions.push((doc_handle.doc_position, action));
}
// Populate the remaining successful items.
for mut doc_handle in doc_handles {
Expand All @@ -226,7 +233,7 @@ fn make_elastic_bulk_response_v2(
error: None,
};
let action = ElasticBulkAction::Index(item);
actions.push(action);
positioned_actions.push((doc_handle.doc_position, action));
}
}
// Repeat the operation for each `IngestFailure` subresponse.
Expand Down Expand Up @@ -293,13 +300,25 @@ fn make_elastic_bulk_response_v2(
error: Some(error),
};
let action = ElasticBulkAction::Index(item);
actions.push(action);
positioned_actions.push((doc_handle.doc_position, action));
}
}
assert!(
per_subrequest_doc_handles.is_empty(),
"doc handles should be empty"
);

assert_eq!(
positioned_actions.len(),
action_count,
"request and response action count should match"
);
positioned_actions.sort_unstable_by_key(|(idx, _)| *idx);
let actions = positioned_actions
.into_iter()
.map(|(_, action)| action)
.collect();

let took_millis = now.elapsed().as_millis() as u64;

let bulk_response = ElasticBulkResponse {
Expand Down Expand Up @@ -685,6 +704,7 @@ mod tests {
IngestResponseV2::default(),
HashMap::new(),
Instant::now(),
0,
)
.unwrap();

Expand Down Expand Up @@ -717,11 +737,13 @@ mod tests {
0,
vec![
DocHandle {
doc_position: 0,
doc_uid: DocUid::for_test(0),
es_doc_id: Some("0".to_string()),
is_parse_failure: false,
},
DocHandle {
doc_position: 1,
doc_uid: DocUid::for_test(1),
es_doc_id: Some("1".to_string()),
is_parse_failure: false,
Expand All @@ -731,26 +753,24 @@ mod tests {
(
1,
vec![DocHandle {
doc_position: 2,
doc_uid: DocUid::for_test(2),
es_doc_id: Some("2".to_string()),
is_parse_failure: false,
}],
),
]);
let mut response = make_elastic_bulk_response_v2(
let response = make_elastic_bulk_response_v2(
ingest_response_v2,
per_request_doc_handles,
Instant::now(),
3,
)
.unwrap();

assert!(response.errors);
assert_eq!(response.actions.len(), 3);

response
.actions
.sort_unstable_by(|left, right| left.es_doc_id().cmp(&right.es_doc_id()));

assert_eq!(response.actions[0].index_id(), "test-index-foo");
assert_eq!(response.actions[0].es_doc_id(), Some("0"));
assert_eq!(response.actions[0].status(), StatusCode::CREATED);
Expand Down

0 comments on commit 5f7fa4f

Please sign in to comment.