From 4fe298bcdaa934021fad991610757e9d42ee5c80 Mon Sep 17 00:00:00 2001 From: Nikita Lapkov <5737185+laplab@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:01:36 +0100 Subject: [PATCH] feat: add header to specify query timeout (#4988) Co-authored-by: Alexander Polanco Co-authored-by: Alberto Schiabel --- query-engine/core/src/error.rs | 29 +++++++++++ .../src/interactive_transactions/error.rs | 29 ----------- query-engine/core/src/lib.rs | 4 +- query-engine/query-engine/src/server/mod.rs | 48 +++++++++++++++++-- 4 files changed, 74 insertions(+), 36 deletions(-) diff --git a/query-engine/core/src/error.rs b/query-engine/core/src/error.rs index 3a3803bf0d67..b067a325a4a5 100644 --- a/query-engine/core/src/error.rs +++ b/query-engine/core/src/error.rs @@ -4,6 +4,8 @@ use query_structure::DomainError; use thiserror::Error; use user_facing_errors::UnknownError; +use crate::response_ir::{Item, Map}; + #[derive(Debug, Error)] #[error( "Error converting field \"{field}\" of expected non-nullable type \"{expected_type}\", found incompatible value of \"{found}\"." 
@@ -62,6 +64,9 @@ pub enum CoreError { #[error("Error in batch request {request_idx}: {error}")] BatchError { request_idx: usize, error: Box<CoreError> }, + + #[error("Query timed out")] + QueryTimeout, } impl CoreError { @@ -227,3 +232,27 @@ impl From<CoreError> for user_facing_errors::Error { } } } + +#[derive(Debug, serde::Serialize, PartialEq)] +pub struct ExtendedUserFacingError { + #[serde(flatten)] + user_facing_error: user_facing_errors::Error, + + #[serde(skip_serializing_if = "indexmap::IndexMap::is_empty")] + extensions: Map, +} + +impl ExtendedUserFacingError { + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + self.extensions.entry(key).or_insert(Item::Json(val)); + } +} + +impl From<CoreError> for ExtendedUserFacingError { + fn from(error: CoreError) -> Self { + ExtendedUserFacingError { + user_facing_error: error.into(), + extensions: Default::default(), + } + } +} diff --git a/query-engine/core/src/interactive_transactions/error.rs b/query-engine/core/src/interactive_transactions/error.rs index 8189e2ce7420..146d69f103b5 100644 --- a/query-engine/core/src/interactive_transactions/error.rs +++ b/query-engine/core/src/interactive_transactions/error.rs @@ -1,10 +1,5 @@ use thiserror::Error; -use crate::{ - response_ir::{Item, Map}, - CoreError, -}; - #[derive(Debug, Error, PartialEq)] pub enum TransactionError { #[error("Unable to start a transaction in the given time.")] @@ -22,27 +17,3 @@ pub enum TransactionError { #[error("Unexpected response: {reason}.")] Unknown { reason: String }, } - -#[derive(Debug, serde::Serialize, PartialEq)] -pub struct ExtendedTransactionUserFacingError { - #[serde(flatten)] - user_facing_error: user_facing_errors::Error, - - #[serde(skip_serializing_if = "indexmap::IndexMap::is_empty")] - extensions: Map, -} - -impl ExtendedTransactionUserFacingError { - pub fn set_extension(&mut self, key: String, val: serde_json::Value) { - self.extensions.entry(key).or_insert(Item::Json(val)); - } -} - -impl From<CoreError> for
ExtendedTransactionUserFacingError { - fn from(error: CoreError) -> Self { - ExtendedTransactionUserFacingError { - user_facing_error: error.into(), - extensions: Default::default(), - } - } -} diff --git a/query-engine/core/src/lib.rs b/query-engine/core/src/lib.rs index bf993d6bce18..ae6a437bc083 100644 --- a/query-engine/core/src/lib.rs +++ b/query-engine/core/src/lib.rs @@ -14,9 +14,9 @@ pub mod telemetry; pub use self::telemetry::*; pub use self::{ - error::{CoreError, FieldConversionError}, + error::{CoreError, ExtendedUserFacingError, FieldConversionError}, executor::{QueryExecutor, TransactionOptions}, - interactive_transactions::{ExtendedTransactionUserFacingError, TransactionError, TxId}, + interactive_transactions::{TransactionError, TxId}, query_document::*, }; diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index 0b6ef2245ad5..47adb732c7b5 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -7,14 +7,14 @@ use opentelemetry::trace::TraceContextExt; use opentelemetry::{global, propagation::Extractor}; use query_core::helpers::*; use query_core::telemetry::capturing::TxTraceExt; -use query_core::{telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId}; +use query_core::{telemetry, ExtendedUserFacingError, TransactionOptions, TxId}; use request_handlers::{dmmf, render_graphql_schema, RequestBody, RequestHandler}; use serde::Serialize; use serde_json::json; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use tracing::{field, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -116,6 +116,8 @@ async fn request_handler(cx: Arc, req: Request) -> Result, req: Request) -> Result { let handler = RequestHandler::new(cx.executor(), cx.query_schema(), cx.engine_protocol()); let mut result = handler.handle(body, tx_id, 
traceparent).instrument(span).await; - if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + if let telemetry::capturing::Capturer::Enabled(capturer) = capture_config { let telemetry = capturer.fetch_captures().await; if let Some(telemetry) = telemetry { result.set_extension("traces".to_owned(), json!(telemetry.traces)); @@ -202,7 +205,32 @@ async fn request_handler(cx: Arc, req: Request) -> Result tokio::time::sleep(timeout).await, + // Never return if timeout isn't set. + None => std::future::pending().await, + } + }; + + tokio::select! { + _ = query_timeout_fut => { + let captured_telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.fetch_captures().await + } else { + None + }; + + // Note: this relies on the fact that client will rollback the transaction after the + // error. If the client continues using this transaction (and later commits it), data + // corruption might happen because some write queries (but not all of them) might be + // already executed by the database before the timeout is fired. + Ok(err_to_http_resp(query_core::CoreError::QueryTimeout, captured_telemetry)) + } + result = work => { + result + } + } } /// Expose the GraphQL playground if enabled. @@ -454,11 +482,13 @@ fn err_to_http_resp( query_core::TransactionError::Unknown { reason: _ } => StatusCode::INTERNAL_SERVER_ERROR, }, + query_core::CoreError::QueryTimeout => StatusCode::REQUEST_TIMEOUT, + // All other errors are treated as 500s, most of these paths should never be hit, only connector errors may occur. 
_ => StatusCode::INTERNAL_SERVER_ERROR, }; - let mut err: ExtendedTransactionUserFacingError = err.into(); + let mut err: ExtendedUserFacingError = err.into(); if let Some(telemetry) = captured_telemetry { err.set_extension("traces".to_owned(), json!(telemetry.traces)); err.set_extension("logs".to_owned(), json!(telemetry.logs)); @@ -513,6 +543,14 @@ fn transaction_id(headers: &HeaderMap) -> Option<TxId> { .map(TxId::from) } +fn query_timeout(headers: &HeaderMap) -> Option<Duration> { + headers + .get("X-query-timeout") + .and_then(|h| h.to_str().ok()) + .and_then(|value| value.parse::<u64>().ok()) + .map(Duration::from_millis) +} + /// If the client sends us a trace and span id, extracting a new context if the /// headers are set. If not, returns current context. fn get_parent_span_context(headers: &HeaderMap) -> opentelemetry::Context {