Skip to content

Commit

Permalink
Add parameter to configure content limit for ingest api (#3905)
Browse files Browse the repository at this point in the history
* Add parameter to configure content limit for ingest api

* Add parameter to configure content limit for ingest api
  • Loading branch information
kamalesh0406 authored Oct 6, 2023
1 parent 5699660 commit af573be
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 15 deletions.
2 changes: 2 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub struct IngestApiConfig {
pub max_queue_memory_usage: Byte,
pub max_queue_disk_usage: Byte,
pub replication_factor: usize,
pub content_length_limit: u64,
}

impl Default for IngestApiConfig {
Expand All @@ -186,6 +187,7 @@ impl Default for IngestApiConfig {
max_queue_memory_usage: Byte::from_bytes(2 * 1024 * 1024 * 1024), /* 2 GiB // TODO maybe we want more? */
max_queue_disk_usage: Byte::from_bytes(4 * 1024 * 1024 * 1024), /* 4 GiB // TODO maybe we want more? */
replication_factor: 1,
content_length_limit: 10 * 1024 * 1024, // 10 MB
}
}
}
Expand Down
68 changes: 54 additions & 14 deletions quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use bytes::{Buf, BufMut, Bytes, BytesMut};
use quickwit_config::INGEST_SOURCE_ID;
use quickwit_config::{IngestApiConfig, INGEST_SOURCE_ID};
use quickwit_ingest::{
CommitType, DocBatchBuilder, FetchResponse, IngestRequest, IngestResponse, IngestService,
IngestServiceClient, IngestServiceError, TailRequest,
Expand Down Expand Up @@ -56,8 +56,6 @@ struct InvalidUtf8;

impl warp::reject::Reject for InvalidUtf8 {}

const CONTENT_LENGTH_LIMIT: u64 = 10 * 1024 * 1024; // 10MiB

#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
struct IngestOptions {
#[serde(alias = "commit")]
Expand All @@ -68,17 +66,21 @@ struct IngestOptions {
pub(crate) fn ingest_api_handlers(
ingest_router: IngestRouterServiceClient,
ingest_service: IngestServiceClient,
config: IngestApiConfig,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
ingest_handler(ingest_service.clone())
ingest_handler(ingest_service.clone(), config.clone())
.or(tail_handler(ingest_service))
.or(ingest_v2_handler(ingest_router))
.or(ingest_v2_handler(ingest_router, config))
}

fn ingest_filter(
config: IngestApiConfig,
) -> impl Filter<Extract = (String, Bytes, IngestOptions), Error = Rejection> + Clone {
warp::path!(String / "ingest")
.and(warp::post())
.and(warp::body::content_length_limit(CONTENT_LENGTH_LIMIT))
.and(warp::body::content_length_limit(
config.content_length_limit,
))
.and(warp::body::bytes())
.and(serde_qs::warp::query::<IngestOptions>(
serde_qs::Config::default(),
Expand All @@ -87,18 +89,22 @@ fn ingest_filter(

fn ingest_handler(
ingest_service: IngestServiceClient,
config: IngestApiConfig,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
ingest_filter()
ingest_filter(config)
.and(with_arg(ingest_service))
.then(ingest)
.map(|result| make_json_api_response(result, BodyFormat::default()))
}

fn ingest_v2_filter(
config: IngestApiConfig,
) -> impl Filter<Extract = (String, Bytes, IngestOptions), Error = Rejection> + Clone {
warp::path!(String / "ingest-v2")
.and(warp::post())
.and(warp::body::content_length_limit(CONTENT_LENGTH_LIMIT))
.and(warp::body::content_length_limit(
config.content_length_limit,
))
.and(warp::body::bytes())
.and(serde_qs::warp::query::<IngestOptions>(
serde_qs::Config::default(),
Expand All @@ -107,8 +113,9 @@ fn ingest_v2_filter(

fn ingest_v2_handler(
ingest_router: IngestRouterServiceClient,
config: IngestApiConfig,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
ingest_v2_filter()
ingest_v2_filter(config)
.and(with_arg(ingest_router))
.then(ingest_v2)
.and(with_arg(BodyFormat::default()))
Expand Down Expand Up @@ -267,7 +274,8 @@ pub(crate) mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service);
let ingest_api_handlers =
ingest_api_handlers(ingest_router, ingest_service, IngestApiConfig::default());
let resp = warp::test::request()
.path("/my-index/ingest")
.method("POST")
Expand Down Expand Up @@ -302,7 +310,8 @@ pub(crate) mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service);
let ingest_api_handlers =
ingest_api_handlers(ingest_router, ingest_service, IngestApiConfig::default());
let payload = r#"
{"id": 1, "message": "push"}
{"id": 2, "message": "push"}
Expand All @@ -329,7 +338,8 @@ pub(crate) mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &config).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service);
let ingest_api_handlers =
ingest_api_handlers(ingest_router, ingest_service, IngestApiConfig::default());
let resp = warp::test::request()
.path("/my-index/ingest")
.method("POST")
Expand All @@ -341,12 +351,38 @@ pub(crate) mod tests {
universe.assert_quit().await;
}

#[tokio::test]
async fn test_ingest_api_return_413_if_above_content_limit() {
let config = IngestApiConfig {
content_length_limit: 1,
..Default::default()
};
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers =
ingest_api_handlers(ingest_router, ingest_service, config.clone());
let resp = warp::test::request()
.path("/my-index/ingest")
.method("POST")
.json(&true)
.body(r#"{"id": 1, "message": "push"}"#)
.reply(&ingest_api_handlers)
.await;
assert_eq!(resp.status(), 413);
universe.assert_quit().await;
}

#[tokio::test]
async fn test_ingest_api_blocks_when_wait_is_specified() {
let (universe, _temp_dir, ingest_service_client, ingest_service_mailbox) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service_client);
let ingest_api_handlers = ingest_api_handlers(
ingest_router,
ingest_service_client,
IngestApiConfig::default(),
);
let handle = tokio::spawn(async move {
let resp = warp::test::request()
.path("/my-index/ingest?commit=wait_for")
Expand Down Expand Up @@ -391,7 +427,11 @@ pub(crate) mod tests {
let (universe, _temp_dir, ingest_service_client, ingest_service_mailbox) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service_client);
let ingest_api_handlers = ingest_api_handlers(
ingest_router,
ingest_service_client,
IngestApiConfig::default(),
);
let handle = tokio::spawn(async move {
let resp = warp::test::request()
.path("/my-index/ingest?commit=force")
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ pub(crate) async fn start_rest_server(
.or(search_stream_handler(
quickwit_services.search_service.clone(),
))
.or(ingest_api_handlers(ingest_router, ingest_service.clone()))
.or(ingest_api_handlers(
ingest_router,
ingest_service.clone(),
quickwit_services.node_config.ingest_api_config.clone(),
))
.or(index_management_handlers(
quickwit_services.index_manager.clone(),
quickwit_services.node_config.clone(),
Expand Down

0 comments on commit af573be

Please sign in to comment.