Skip to content

Commit

Permalink
chore: apply CR recommendations, remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
etolbakov authored Nov 18, 2023
1 parent 08a7f9c commit 804fdd9
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 61 deletions.
88 changes: 45 additions & 43 deletions quickwit/quickwit-serve/src/elastic_search_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,15 @@ use bytes::Bytes;
use hyper::StatusCode;
use quickwit_ingest::{
CommitType, DocBatchBuilder, IngestRequest, IngestResponse, IngestService, IngestServiceClient,
IngestServiceError,
};
use quickwit_proto::{ServiceError, ServiceErrorCode};
use thiserror::Error;
use warp::{Filter, Rejection};

use crate::elastic_search_api::filter::{elastic_bulk_filter, elastic_index_bulk_filter};
use crate::elastic_search_api::make_elastic_api_response;
use crate::elastic_search_api::model::{BulkAction, ElasticIngestOptions, ElasticSearchError};
use crate::format::extract_format_from_qs;
use crate::ingest_api::lines;
use crate::json_api_response::JsonApiResponse;
use crate::{with_arg, BodyFormat};

#[derive(Error, Debug)]
pub enum IngestRestApiError {
#[error("failed to parse action `{0}`")]
BulkInvalidAction(String),
#[error("failed to parse source `{0}`")]
BulkInvalidSource(String),
#[error(transparent)]
IngestApi(#[from] IngestServiceError),
}

impl ServiceError for IngestRestApiError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::BulkInvalidAction(_) => ServiceErrorCode::BadRequest,
Self::BulkInvalidSource(_) => ServiceErrorCode::BadRequest,
Self::IngestApi(ingest_api_error) => ingest_api_error.error_code(),
}
}
}
use crate::with_arg;

/// POST `_elastic/_bulk`
pub fn es_compat_bulk_handler(
Expand Down Expand Up @@ -82,17 +59,6 @@ pub fn es_compat_index_bulk_handler(
.map(make_elastic_api_response)
}

fn make_elastic_api_response(
elasticsearch_result: Result<IngestResponse, ElasticSearchError>,
format: BodyFormat,
) -> JsonApiResponse {
let status_code = match &elasticsearch_result {
Ok(_) => StatusCode::OK,
Err(err) => err.status,
};
JsonApiResponse::new(&elasticsearch_result, status_code, &format)
}

async fn elastic_ingest_bulk(
index: Option<String>,
body: Bytes,
Expand All @@ -104,12 +70,16 @@ async fn elastic_ingest_bulk(

while let Some((line_number, line)) = lines.next() {
let action = serde_json::from_slice::<BulkAction>(line).map_err(|error| {
ElasticSearchError::bad_request(format!(
"Malformed action/metadata line [#{line_number}]. Details: `{error}`"
))
ElasticSearchError::new(
StatusCode::BAD_REQUEST,
format!("Malformed action/metadata line [#{line_number}]. Details: `{error}`"),
)
})?;
let (_, source) = lines.next().ok_or_else(|| {
ElasticSearchError::bad_request("expected source for the action".to_string())
ElasticSearchError::new(
StatusCode::BAD_REQUEST,
"expected source for the action".to_string(),
)
})?;
// when ingesting on /my-index/_bulk, if _index: is set to something else than my-index,
// ES honors it and create the doc in the requested index. That is, `my-index` is a default
Expand All @@ -118,9 +88,10 @@ async fn elastic_ingest_bulk(
.into_index()
.or_else(|| index.clone())
.ok_or_else(|| {
ElasticSearchError::bad_request(format!(
"missing required field: `_index` in the line [#{line_number}]."
))
ElasticSearchError::new(
StatusCode::BAD_REQUEST,
format!("missing required field: `_index` in the line [#{line_number}]."),
)
})?;
let doc_batch_builder = doc_batch_builders
.entry(index_id.clone())
Expand All @@ -146,13 +117,15 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

use hyper::StatusCode;
use quickwit_config::{IngestApiConfig, NodeConfig};
use quickwit_ingest::{
FetchRequest, IngestResponse, IngestServiceClient, SuggestTruncateRequest,
};
use quickwit_search::MockSearchService;

use crate::elastic_search_api::elastic_api_handlers;
use crate::elastic_search_api::model::ElasticSearchError;
use crate::ingest_api::setup_ingest_service;

#[tokio::test]
Expand Down Expand Up @@ -203,6 +176,29 @@ mod tests {
universe.assert_quit().await;
}

#[tokio::test]
async fn test_bulk_api_returns_200_if_payload_has_blank_lines() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = "
{\"create\": {\"_index\": \"my-index-1\", \"_id\": \"1674834324802805760\"}}
\u{20}\u{20}\u{20}\u{20}\n
{\"_line\": {\"message\": \"hello-world\"}}";
let resp = warp::test::request()
.path("/_elastic/_bulk")
.method("POST")
.body(payload)
.reply(&elastic_api_handlers)
.await;
assert_eq!(resp.status(), 200);
let ingest_response: IngestResponse = serde_json::from_slice(resp.body()).unwrap();
assert_eq!(ingest_response.num_docs_for_processing, 1);
universe.assert_quit().await;
}

#[tokio::test]
async fn test_bulk_index_api_returns_200() {
let config = Arc::new(NodeConfig::for_test());
Expand Down Expand Up @@ -398,5 +394,11 @@ mod tests {
.reply(&elastic_api_handlers)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
assert_eq!(es_error.status, StatusCode::BAD_REQUEST);
assert_eq!(
es_error.error.reason.unwrap(),
"Malformed action/metadata line [#0]. Details: `expected value at line 1 column 57`"
);
}
}
17 changes: 16 additions & 1 deletion quickwit/quickwit-serve/src/elastic_search_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;

use bulk::{es_compat_bulk_handler, es_compat_index_bulk_handler};
pub use filter::ElasticCompatibleApi;
use hyper::StatusCode;
use quickwit_config::NodeConfig;
use quickwit_ingest::IngestServiceClient;
use quickwit_search::SearchService;
Expand All @@ -36,7 +37,9 @@ use rest_handler::{
use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};

use crate::BuildInfo;
use crate::elastic_search_api::model::ElasticSearchError;
use crate::json_api_response::JsonApiResponse;
use crate::{BodyFormat, BuildInfo};

/// Setup Elasticsearch API handlers
///
Expand Down Expand Up @@ -86,6 +89,18 @@ impl From<i64> for TrackTotalHits {
}
}

fn make_elastic_api_response<T: serde::Serialize>(
elasticsearch_result: Result<T, ElasticSearchError>,
format: BodyFormat,
) -> JsonApiResponse {
let status_code = match &elasticsearch_result {
Ok(_) => StatusCode::OK,
Err(err) => err.status,
};

JsonApiResponse::new(&elasticsearch_result, status_code, &format)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-serve/src/elastic_search_api/model/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ pub struct ElasticSearchError {
}

impl ElasticSearchError {
pub fn bad_request(reason_string: String) -> Self {
pub fn new(status: StatusCode, reason_string: String) -> Self {
ElasticSearchError {
status: StatusCode::BAD_REQUEST,
status,
error: ErrorCause {
reason: Some(reason_string),
caused_by: None,
Expand Down
18 changes: 4 additions & 14 deletions quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::model::{
ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams,
};
use super::TrackTotalHits;
use super::{make_elastic_api_response, TrackTotalHits};
use crate::format::BodyFormat;
use crate::json_api_response::{make_json_api_response, ApiError, JsonApiResponse};
use crate::{with_arg, BuildInfo};
Expand Down Expand Up @@ -98,7 +98,7 @@ pub fn es_compat_index_search_handler(
elastic_index_search_filter()
.and(with_arg(search_service))
.then(es_compat_index_search)
.map(make_elastic_api_response)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET or POST _elastic/_search/scroll
Expand All @@ -108,7 +108,7 @@ pub fn es_compat_scroll_handler(
elastic_scroll_filter()
.and(with_arg(search_service))
.then(es_scroll)
.map(make_elastic_api_response)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// POST _elastic/_search
Expand Down Expand Up @@ -383,7 +383,7 @@ async fn es_compat_index_multi_search(
build_request_for_es_api(index_ids_patterns, search_query_params, search_body)?;
search_requests.push(es_request);
}
// TODO: forced to do weird referencing to work arround https://github.com/rust-lang/rust/issues/100905
// TODO: forced to do weird referencing to work around https://github.com/rust-lang/rust/issues/100905
// otherwise append_shard_doc is captured by ref, and we get lifetime issues
let futures = search_requests
.into_iter()
Expand Down Expand Up @@ -474,16 +474,6 @@ fn convert_to_es_search_response(
}
}

fn make_elastic_api_response(
elasticsearch_result: Result<ElasticSearchResponse, ElasticSearchError>,
) -> JsonApiResponse {
let status_code = match &elasticsearch_result {
Ok(_) => StatusCode::OK,
Err(err) => err.status,
};
JsonApiResponse::new(&elasticsearch_result, status_code, &BodyFormat::default())
}

pub(crate) fn str_lines(body: &str) -> impl Iterator<Item = &str> {
body.lines()
.map(|line| line.trim())
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ async fn tail_endpoint(

pub(crate) fn lines(body: &Bytes) -> impl Iterator<Item = &[u8]> {
body.split(|byte| byte == &b'\n')
.filter(|line| !line.iter().all(|&b| b == b' '))
.filter(|line| !line.iter().all(|&b| b.is_ascii_whitespace()))
.filter(|line| !line.is_empty())
}

Expand Down

0 comments on commit 804fdd9

Please sign in to comment.