From a6b0ab7d12a2cd5f5298f9fa390ec2797f6af7b0 Mon Sep 17 00:00:00 2001
From: Dzmitry Kalabuk
Date: Tue, 30 Apr 2024 10:19:25 +0300
Subject: [PATCH] Return BadRequest for unsupported queries

---
 src/query/error.rs     |  15 +++++-
 src/query/processor.rs | 109 ++++++++++++++++++++++++-----------------
 src/transport/p2p.rs   |  15 ++++--
 3 files changed, 90 insertions(+), 49 deletions(-)

diff --git a/src/query/error.rs b/src/query/error.rs
index a5ca02c..65f67d6 100644
--- a/src/query/error.rs
+++ b/src/query/error.rs
@@ -8,16 +8,29 @@ pub enum QueryError {
     NoAllocation,
     #[error("Bad request: {0}")]
     BadRequest(String),
+    #[error("Service overloaded")]
+    ServiceOverloaded,
     #[error("Internal error")]
     Other(#[from] anyhow::Error),
 }
 
+impl From<datafusion::error::DataFusionError> for QueryError {
+    fn from(value: datafusion::error::DataFusionError) -> Self {
+        Self::Other(value.context("DataFusion error").into())
+    }
+}
+
 impl IntoResponse for QueryError {
     fn into_response(self) -> axum::response::Response {
         match self {
             s @ Self::NotFound => (StatusCode::NOT_FOUND, s.to_string()).into_response(),
-            s @ Self::NoAllocation => (StatusCode::TOO_MANY_REQUESTS, s.to_string()).into_response(),
+            s @ Self::NoAllocation => {
+                (StatusCode::TOO_MANY_REQUESTS, s.to_string()).into_response()
+            }
             s @ Self::BadRequest(_) => (StatusCode::BAD_REQUEST, s.to_string()).into_response(),
+            s @ Self::ServiceOverloaded => {
+                (StatusCode::SERVICE_UNAVAILABLE, s.to_string()).into_response()
+            }
             Self::Other(err) => (
                 StatusCode::INTERNAL_SERVER_ERROR,
                 format!("Couldn't execute query: {:?}", err),
diff --git a/src/query/processor.rs b/src/query/processor.rs
index 62cf952..2ce1844 100644
--- a/src/query/processor.rs
+++ b/src/query/processor.rs
@@ -1,7 +1,10 @@
 use std::{collections::HashSet, hash::Hash, pin::Pin};
 
-use super::eth::{BatchRequest, NetworkType};
-use anyhow::{ensure, Context, Result};
+use super::{
+    error::QueryError,
+    eth::{BatchRequest, NetworkType},
+};
+use anyhow::Context;
 use async_stream::try_stream;
 use datafusion::{
     arrow::{
@@ -28,27 +31,36 @@ pub(super) type QueryResult = Vec<JsonValue>;
 // - generalize this code
 
 #[instrument(skip_all)]
-pub async fn process_query(ctx: &SessionContext, query: BatchRequest) -> Result<QueryResult> {
-    anyhow::ensure!(
-        query.r#type == NetworkType::Eth,
-        "only eth queries are supported"
-    );
+pub async fn process_query(
+    ctx: &SessionContext,
+    query: BatchRequest,
+) -> Result<QueryResult, QueryError> {
+    if query.r#type != NetworkType::Eth {
+        return Err(QueryError::BadRequest(
+            "only eth queries are supported".to_owned(),
+        ));
+    }
     let (blocks, transactions, logs) = extract_data(ctx, &query).await?;
     let blocks = convert_to_json(blocks);
     let transactions = transactions.map(convert_to_json);
     let logs = logs.map(convert_to_json);
-    collect_result(build_response(&query, blocks, transactions, logs)).await
+    collect_result(build_response(&query, blocks, transactions, logs))
+        .await
+        .map_err(From::from)
 }
 
 #[instrument(skip_all)]
 async fn extract_data(
     ctx: &SessionContext,
     query: &BatchRequest,
-) -> Result<(
-    impl Stream<Item = Result<RecordBatch, DataFusionError>>,
-    Option<impl Stream<Item = Result<RecordBatch, DataFusionError>>>,
-    Option<impl Stream<Item = Result<RecordBatch, DataFusionError>>>,
-)> {
+) -> Result<
+    (
+        impl Stream<Item = Result<RecordBatch, DataFusionError>>,
+        Option<impl Stream<Item = Result<RecordBatch, DataFusionError>>>,
+        Option<impl Stream<Item = Result<RecordBatch, DataFusionError>>>,
+    ),
+    QueryError,
+> {
     let blocks = ctx.table("blocks").await?;
     let transactions = ctx.table("transactions").await?;
     let logs = ctx.table("logs").await?;
@@ -98,11 +110,16 @@
         }
         tx_filters.push(predicate);
 
-        ensure!(!tx_request.traces, "Traces queries are not supported yet");
-        ensure!(
-            !tx_request.state_diffs,
-            "State diffs queries are not supported yet"
-        );
+        if tx_request.traces {
+            return Err(QueryError::BadRequest(
+                "Traces queries are not supported yet".to_owned(),
+            ));
+        }
+        if tx_request.state_diffs {
+            return Err(QueryError::BadRequest(
+                "State diffs queries are not supported yet".to_owned(),
+            ));
+        }
     }
 
     for log_request in query.logs.as_ref().unwrap_or(&Vec::new()) {
@@ -119,15 +136,17 @@
         }
         logs_filters.push(predicate);
 
-        ensure!(
-            !log_request.transaction_traces,
-            "Traces queries are not supported yet"
-        );
+        if log_request.transaction_traces {
+            return Err(QueryError::BadRequest(
+                "Traces queries are not supported yet".to_owned(),
+            ));
+        }
+    }
+
+    if query.traces.is_some() {
+        return Err(QueryError::BadRequest(
+            "Traces queries are not supported yet".to_owned(),
+        ));
     }
-
-    ensure!(
-        query.traces.is_none(),
-        "Traces queries are not supported yet"
-    );
 
     let blocks = camel_case_columns(all_blocks)?.select_columns(&block_columns(query))?;
@@ -199,7 +218,8 @@ async fn extract_data(
             None => Ok(None),
         }
     });
-    let (blocks_result, tx_result, logs_result) = try_join!(blocks_future, tx_future, logs_future)?;
+    let (blocks_result, tx_result, logs_result) = try_join!(blocks_future, tx_future, logs_future)
+        .context("Subqueries execution panicked")?;
     Ok((blocks_result?, tx_result?, logs_result?))
 }
 
@@ -207,13 +227,13 @@
 #[instrument(skip_all)]
 fn convert_to_json(
     stream: impl Stream<Item = Result<RecordBatch, DataFusionError>>,
-) -> impl Stream<Item = Result<JsonMap<String, JsonValue>>> {
+) -> impl Stream<Item = Result<JsonMap<String, JsonValue>, DataFusionError>> {
     stream.flat_map(|record_batch| {
         let entries = match record_batch
             .and_then(|batch| record_batches_to_json_rows(&[&batch]).map_err(From::from))
         {
             Ok(vec) => vec.into_iter().map(Ok).collect_vec(),
-            Err(e) => vec![Err(e.into())],
+            Err(e) => vec![Err(e)],
         };
         futures::stream::iter(entries)
     })
@@ -221,24 +241,24 @@
 #[instrument(skip_all)]
 fn build_response<'l>(
     query: &'l BatchRequest,
-    headers: impl Stream<Item = Result<JsonMap<String, JsonValue>>> + Unpin + 'l,
-    transactions: Option<impl Stream<Item = Result<JsonMap<String, JsonValue>>> + 'l>,
-    logs: Option<impl Stream<Item = Result<JsonMap<String, JsonValue>>> + 'l>,
-) -> impl Stream<Item = Result<JsonValue>> + 'l {
+    headers: impl Stream<Item = Result<JsonMap<String, JsonValue>, DataFusionError>> + Unpin + 'l,
+    transactions: Option<impl Stream<Item = Result<JsonMap<String, JsonValue>, DataFusionError>> + 'l>,
+    logs: Option<impl Stream<Item = Result<JsonMap<String, JsonValue>, DataFusionError>> + 'l>,
+) -> impl Stream<Item = Result<JsonValue, QueryError>> + 'l {
     let include_tx = transactions.is_some();
     let include_logs = logs.is_some();
     let mut transactions = transactions.map(|stream| Box::pin(stream.peekable()));
     let mut logs = logs.map(|stream| Box::pin(stream.peekable()));
 
-    async fn block_rows<S: Stream<Item = Result<JsonMap<String, JsonValue>>>>(
+    async fn block_rows<S: Stream<Item = Result<JsonMap<String, JsonValue>, DataFusionError>>>(
         stream: &mut Option<Pin<Box<Peekable<S>>>>,
         block_number: u64,
-    ) -> Result<Vec<JsonMap<String, JsonValue>>>
-    {
+    ) -> Result<Vec<JsonMap<String, JsonValue>>, DataFusionError> {
         if let Some(stream) = stream.as_mut() {
             consume_while(stream.as_mut(), |tx: &JsonMap<String, JsonValue>| {
                 tx.get("blockNumber").unwrap().as_u64().unwrap() == block_number
-            }).await
+            })
+            .await
         } else {
             Ok(Vec::new())
         }
@@ -302,10 +322,10 @@
     }
 }
 
-async fn consume_while<T>(
-    mut stream: std::pin::Pin<&mut Peekable<impl Stream<Item = Result<T>>>>,
+async fn consume_while<T, E>(
+    mut stream: std::pin::Pin<&mut Peekable<impl Stream<Item = Result<T, E>>>>,
     f: impl Fn(&T) -> bool,
-) -> Result<Vec<T>> {
+) -> Result<Vec<T>, E> {
     let mut result = Vec::new();
     while let Some(item) = stream
         .as_mut()
@@ -326,7 +346,9 @@
 }
 
 #[instrument(skip_all)]
-async fn collect_result(stream: impl Stream<Item = Result<JsonValue>>) -> Result<QueryResult> {
+async fn collect_result(
+    stream: impl Stream<Item = Result<JsonValue, QueryError>>,
+) -> anyhow::Result<QueryResult> {
     let mut result = Vec::new();
     tokio::pin!(stream);
     while let Some(row) = stream.next().await {
@@ -339,7 +361,7 @@ async fn collect_result(stream: impl Stream<Item = Result<JsonValue>>) -> Result
 }
 
 #[instrument(skip_all)]
-fn camel_case_columns(df: DataFrame) -> Result<DataFrame> {
+fn camel_case_columns(df: DataFrame) -> Result<DataFrame, DataFusionError> {
     let columns = df
         .schema()
         .fields()
@@ -352,7 +374,6 @@ fn camel_case_columns(df: DataFrame) -> Result<DataFrame> {
             .map(|c| col(c.clone()).alias(RenameRule::CamelCase.apply_to_field(&c.name)))
            .collect_vec(),
     )
-    .map_err(Into::into)
 }
 
 fn block_columns(query: &BatchRequest) -> Vec<&str> {
@@ -424,7 +445,7 @@
     predicates.into_iter().reduce(or)
 }
 
-fn union_all(dataframes: Vec<DataFrame>) -> Result<Option<DataFrame>> {
+fn union_all(dataframes: Vec<DataFrame>) -> Result<Option<DataFrame>, DataFusionError> {
     let mut iter = dataframes.into_iter();
     if let Some(first) = iter.next() {
         let mut result = first;
diff --git a/src/transport/p2p.rs b/src/transport/p2p.rs
index 3fbbe41..f59a3fe 100644
--- a/src/transport/p2p.rs
+++ b/src/transport/p2p.rs
@@ -1,6 +1,6 @@
 use std::{env, time::Duration};
 
-use anyhow::{anyhow, Result};
+use anyhow::Result;
 use camino::Utf8PathBuf as PathBuf;
 use futures::{Stream, StreamExt};
 use lazy_static::lazy_static;
@@ -254,7 +254,8 @@
     ) -> std::result::Result<QueryResult, QueryError> {
         let (resp_tx, resp_rx) = oneshot::channel();
         if let (Some(dataset), Some(query_str)) = (&query.dataset, &query.query) {
-            let query = serde_json::from_str(query_str).map_err(anyhow::Error::from)?;
+            let query = serde_json::from_str(query_str)
+                .map_err(|e| QueryError::BadRequest(e.to_string()))?;
             match self.queries_tx.try_send(QueryTask {
                 dataset: dataset.clone(),
                 peer_id,
@@ -262,7 +263,7 @@
                 response_sender: resp_tx,
             }) {
                 Err(mpsc::error::TrySendError::Full(_)) => {
-                    Err(anyhow!("Service overloaded"))?;
+                    return Err(QueryError::ServiceOverloaded);
                 }
                 Err(mpsc::error::TrySendError::Closed(_)) => {
                     panic!("Query subscriber dropped");
@@ -294,6 +295,9 @@
             Err(e @ QueryError::NotFound) => query_result::Result::BadRequest(e.to_string()),
             Err(QueryError::NoAllocation) => query_result::Result::NoAllocation(()),
             Err(QueryError::BadRequest(e)) => query_result::Result::BadRequest(e),
+            Err(e @ QueryError::ServiceOverloaded) => {
+                query_result::Result::ServerError(e.to_string())
+            }
             Err(QueryError::Other(e)) => query_result::Result::ServerError(e.to_string()),
         };
         let envelope = subsquid_messages::Envelope {
@@ -359,6 +363,9 @@
             }),
             Err(e @ QueryError::NotFound) => query_executed::Result::BadRequest(e.to_string()),
             Err(QueryError::BadRequest(e)) => query_executed::Result::BadRequest(e.clone()),
+            Err(e @ QueryError::ServiceOverloaded) => {
+                query_executed::Result::ServerError(e.to_string())
+            }
             Err(QueryError::Other(e)) => query_executed::Result::ServerError(e.to_string()),
             Err(QueryError::NoAllocation) => panic!("Shouldn't send logs with NoAllocation error"),
         };
@@ -453,7 +460,7 @@ fn report_metrics(result: &std::result::Result<QueryResult, QueryError>) {
         Err(QueryError::NotFound | QueryError::NoAllocation | QueryError::BadRequest(_)) => {
             metrics::BAD_REQUEST.inc()
         }
-        Err(QueryError::Other(_)) => metrics::SERVER_ERROR.inc(),
+        Err(QueryError::Other(_) | QueryError::ServiceOverloaded) => metrics::SERVER_ERROR.inc(),
     };
 }
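
Taken together, the patch draws one boundary: malformed or unsupported queries become QueryError::BadRequest (HTTP 400, p2p BadRequest), a saturated task queue becomes ServiceOverloaded (HTTP 503, sent as ServerError in the p2p result messages), and only unexpected failures remain Other (HTTP 500). The standalone sketch below exercises that mapping in isolation. It assumes axum 0.7, thiserror 1 and anyhow 1 (crate versions are assumptions), and it trims QueryError to the variants this patch touches, so it is an illustration of the mapping rather than the crate's actual definition:

// Standalone sketch of the status mapping introduced above. The variant
// set is reduced to what this patch touches; names mirror the patch.
use axum::{http::StatusCode, response::IntoResponse};

#[derive(Debug, thiserror::Error)]
enum QueryError {
    #[error("Bad request: {0}")]
    BadRequest(String),
    #[error("Service overloaded")]
    ServiceOverloaded,
    #[error("Internal error")]
    Other(#[from] anyhow::Error),
}

impl IntoResponse for QueryError {
    fn into_response(self) -> axum::response::Response {
        let status = match &self {
            // Terminal client error: retrying the same query cannot help.
            Self::BadRequest(_) => StatusCode::BAD_REQUEST,
            // Transient condition: the worker's queue is full, retry later.
            Self::ServiceOverloaded => StatusCode::SERVICE_UNAVAILABLE,
            // Everything else stays an opaque server-side failure.
            Self::Other(_) => StatusCode::INTERNAL_SERVER_ERROR,
        };
        (status, self.to_string()).into_response()
    }
}

fn main() {
    // An unsupported query now surfaces as 400 instead of a generic 500:
    let err = QueryError::BadRequest("only eth queries are supported".to_owned());
    assert_eq!(err.into_response().status(), StatusCode::BAD_REQUEST);

    // A full task queue maps to 503 over HTTP, and to ServerError in the
    // p2p query_result message:
    assert_eq!(
        QueryError::ServiceOverloaded.into_response().status(),
        StatusCode::SERVICE_UNAVAILABLE
    );
}

The #[from] anyhow::Error arm plays the same role here as the new From<datafusion::error::DataFusionError> conversion in the patch: engine failures funnel into Other and stay on the 5xx path, while the hand-written validation checks pick the 4xx path explicitly.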