Skip to content

Commit

Permalink
Return BadRequest for unsupported queries
Browse files Browse the repository at this point in the history
  • Loading branch information
kalabukdima committed Apr 30, 2024
1 parent 12cbd15 commit a6b0ab7
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 49 deletions.
15 changes: 14 additions & 1 deletion src/query/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@ pub enum QueryError {
NoAllocation,
#[error("Bad request: {0}")]
BadRequest(String),
#[error("Service overloaded")]
ServiceOverloaded,
#[error("Internal error")]
Other(#[from] anyhow::Error),
}

impl From<datafusion::error::DataFusionError> for QueryError {
fn from(value: datafusion::error::DataFusionError) -> Self {
Self::Other(value.context("DataFusion error").into())
}
}

impl IntoResponse for QueryError {
fn into_response(self) -> axum::response::Response {
match self {
s @ Self::NotFound => (StatusCode::NOT_FOUND, s.to_string()).into_response(),
s @ Self::NoAllocation => (StatusCode::TOO_MANY_REQUESTS, s.to_string()).into_response(),
s @ Self::NoAllocation => {
(StatusCode::TOO_MANY_REQUESTS, s.to_string()).into_response()
}
s @ Self::BadRequest(_) => (StatusCode::BAD_REQUEST, s.to_string()).into_response(),
s @ Self::ServiceOverloaded => {
(StatusCode::SERVICE_UNAVAILABLE, s.to_string()).into_response()
}
Self::Other(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Couldn't execute query: {:?}", err),
Expand Down
109 changes: 65 additions & 44 deletions src/query/processor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{collections::HashSet, hash::Hash, pin::Pin};

use super::eth::{BatchRequest, NetworkType};
use anyhow::{ensure, Context, Result};
use super::{
error::QueryError,
eth::{BatchRequest, NetworkType},
};
use anyhow::Context;
use async_stream::try_stream;
use datafusion::{
arrow::{
Expand All @@ -28,27 +31,36 @@ pub(super) type QueryResult = Vec<Value>;
// - generalize this code

#[instrument(skip_all)]
pub async fn process_query(ctx: &SessionContext, query: BatchRequest) -> Result<QueryResult> {
anyhow::ensure!(
query.r#type == NetworkType::Eth,
"only eth queries are supported"
);
pub async fn process_query(
ctx: &SessionContext,
query: BatchRequest,
) -> Result<QueryResult, QueryError> {
if query.r#type != NetworkType::Eth {
return Err(QueryError::BadRequest(
"only eth queries are supported".to_owned(),
));
}
let (blocks, transactions, logs) = extract_data(ctx, &query).await?;
let blocks = convert_to_json(blocks);
let transactions = transactions.map(convert_to_json);
let logs = logs.map(convert_to_json);
collect_result(build_response(&query, blocks, transactions, logs)).await
collect_result(build_response(&query, blocks, transactions, logs))
.await
.map_err(From::from)
}

#[instrument(skip_all)]
async fn extract_data(
ctx: &SessionContext,
query: &BatchRequest,
) -> Result<(
impl Stream<Item = Result<RecordBatch, DataFusionError>>,
Option<impl Stream<Item = Result<RecordBatch, DataFusionError>>>,
Option<impl Stream<Item = Result<RecordBatch, DataFusionError>>>,
)> {
) -> Result<
(
impl Stream<Item = Result<RecordBatch, DataFusionError>>,
Option<impl Stream<Item = Result<RecordBatch, DataFusionError>>>,
Option<impl Stream<Item = Result<RecordBatch, DataFusionError>>>,
),
QueryError,
> {
let blocks = ctx.table("blocks").await?;
let transactions = ctx.table("transactions").await?;
let logs = ctx.table("logs").await?;
Expand Down Expand Up @@ -98,11 +110,16 @@ async fn extract_data(
}
tx_filters.push(predicate);

ensure!(!tx_request.traces, "Traces queries are not supported yet");
ensure!(
!tx_request.state_diffs,
"State diffs queries are not supported yet"
);
if tx_request.traces {
return Err(QueryError::BadRequest(
"Traces queries are not supported yet".to_owned(),
));
}
if tx_request.state_diffs {
return Err(QueryError::BadRequest(
"State diffs queries are not supported yet".to_owned(),
));
}
}

for log_request in query.logs.as_ref().unwrap_or(&Vec::new()) {
Expand All @@ -119,15 +136,17 @@ async fn extract_data(
}
logs_filters.push(predicate);

ensure!(
!log_request.transaction_traces,
"Traces queries are not supported yet"
);
if log_request.transaction_traces {
return Err(QueryError::BadRequest(
"Traces queries are not supported yet".to_owned(),
));
}
}
if query.traces.is_some() {
return Err(QueryError::BadRequest(
"Traces queries are not supported yet".to_owned(),
));
}
ensure!(
query.traces.is_none(),
"Traces queries are not supported yet"
);

let blocks = camel_case_columns(all_blocks)?.select_columns(&block_columns(query))?;

Expand Down Expand Up @@ -199,46 +218,47 @@ async fn extract_data(
None => Ok(None),
}
});
let (blocks_result, tx_result, logs_result) = try_join!(blocks_future, tx_future, logs_future)?;
let (blocks_result, tx_result, logs_result) = try_join!(blocks_future, tx_future, logs_future)
.context("Subqueries execution panicked")?;

Ok((blocks_result?, tx_result?, logs_result?))
}

#[instrument(skip_all)]
fn convert_to_json(
stream: impl Stream<Item = Result<RecordBatch, DataFusionError>>,
) -> impl Stream<Item = Result<JsonMap<String, Value>>> {
) -> impl Stream<Item = Result<JsonMap<String, Value>, DataFusionError>> {
stream.flat_map(|record_batch| {
let entries = match record_batch
.and_then(|batch| record_batches_to_json_rows(&[&batch]).map_err(From::from))
{
Ok(vec) => vec.into_iter().map(Ok).collect_vec(),
Err(e) => vec![Err(e.into())],
Err(e) => vec![Err(e)],
};
futures::stream::iter(entries)
})
}
#[instrument(skip_all)]
fn build_response<'l>(
query: &'l BatchRequest,
headers: impl Stream<Item = Result<JsonMap<String, Value>>> + Unpin + 'l,
transactions: Option<impl Stream<Item = Result<JsonMap<String, Value>>> + 'l>,
logs: Option<impl Stream<Item = Result<JsonMap<String, Value>>> + 'l>,
) -> impl Stream<Item = Result<Value>> + 'l {
headers: impl Stream<Item = Result<JsonMap<String, Value>, DataFusionError>> + Unpin + 'l,
transactions: Option<impl Stream<Item = Result<JsonMap<String, Value>, DataFusionError>> + 'l>,
logs: Option<impl Stream<Item = Result<JsonMap<String, Value>, DataFusionError>> + 'l>,
) -> impl Stream<Item = anyhow::Result<Value>> + 'l {
let include_tx = transactions.is_some();
let include_logs = logs.is_some();
let mut transactions = transactions.map(|stream| Box::pin(stream.peekable()));
let mut logs = logs.map(|stream| Box::pin(stream.peekable()));

async fn block_rows<S: Stream<Item = Result<JsonMap<String, Value>>>>(
async fn block_rows<S: Stream<Item = Result<JsonMap<String, Value>, DataFusionError>>>(
stream: &mut Option<Pin<Box<Peekable<S>>>>,
block_number: u64,
) -> Result<Vec<JsonMap<String, Value>>>
{
) -> Result<Vec<JsonMap<String, Value>>, DataFusionError> {
if let Some(stream) = stream.as_mut() {
consume_while(stream.as_mut(), |tx: &JsonMap<String, Value>| {
tx.get("blockNumber").unwrap().as_u64().unwrap() == block_number
}).await
})
.await
} else {
Ok(Vec::new())
}
Expand Down Expand Up @@ -302,10 +322,10 @@ fn build_response<'l>(
}
}

async fn consume_while<T>(
mut stream: std::pin::Pin<&mut Peekable<impl Stream<Item = Result<T>>>>,
async fn consume_while<T, E>(
mut stream: std::pin::Pin<&mut Peekable<impl Stream<Item = Result<T, E>>>>,
f: impl Fn(&T) -> bool,
) -> Result<Vec<T>> {
) -> Result<Vec<T>, E> {
let mut result = Vec::new();
while let Some(item) = stream
.as_mut()
Expand All @@ -326,7 +346,9 @@ async fn consume_while<T>(
}

#[instrument(skip_all)]
async fn collect_result(stream: impl Stream<Item = Result<Value>>) -> Result<QueryResult> {
async fn collect_result(
stream: impl Stream<Item = anyhow::Result<Value>>,
) -> anyhow::Result<QueryResult> {
let mut result = Vec::new();
tokio::pin!(stream);
while let Some(row) = stream.next().await {
Expand All @@ -339,7 +361,7 @@ async fn collect_result(stream: impl Stream<Item = Result<Value>>) -> Result<Que
Ok(result)
}

fn camel_case_columns(df: DataFrame) -> Result<DataFrame> {
fn camel_case_columns(df: DataFrame) -> Result<DataFrame, DataFusionError> {
let columns = df
.schema()
.fields()
Expand All @@ -352,7 +374,6 @@ fn camel_case_columns(df: DataFrame) -> Result<DataFrame> {
.map(|c| col(c.clone()).alias(RenameRule::CamelCase.apply_to_field(&c.name)))
.collect_vec(),
)
.map_err(Into::into)
}

fn block_columns(query: &BatchRequest) -> Vec<&str> {
Expand Down Expand Up @@ -424,7 +445,7 @@ fn any_of(predicates: Vec<Expr>) -> Option<Expr> {
predicates.into_iter().reduce(or)
}

fn union_all(dataframes: Vec<DataFrame>) -> Result<Option<DataFrame>> {
fn union_all(dataframes: Vec<DataFrame>) -> Result<Option<DataFrame>, DataFusionError> {
let mut iter = dataframes.into_iter();
if let Some(first) = iter.next() {
let mut result = first;
Expand Down
15 changes: 11 additions & 4 deletions src/transport/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{env, time::Duration};

use anyhow::{anyhow, Result};
use anyhow::Result;
use camino::Utf8PathBuf as PathBuf;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
Expand Down Expand Up @@ -254,15 +254,16 @@ impl<MsgStream: Stream<Item = Message>> P2PTransport<MsgStream> {
) -> std::result::Result<QueryResult, QueryError> {
let (resp_tx, resp_rx) = oneshot::channel();
if let (Some(dataset), Some(query_str)) = (&query.dataset, &query.query) {
let query = serde_json::from_str(query_str).map_err(anyhow::Error::from)?;
let query = serde_json::from_str(query_str)
.map_err(|e| QueryError::BadRequest(e.to_string()))?;
match self.queries_tx.try_send(QueryTask {
dataset: dataset.clone(),
peer_id,
query,
response_sender: resp_tx,
}) {
Err(mpsc::error::TrySendError::Full(_)) => {
Err(anyhow!("Service overloaded"))?;
return Err(QueryError::ServiceOverloaded);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
panic!("Query subscriber dropped");
Expand Down Expand Up @@ -294,6 +295,9 @@ impl<MsgStream: Stream<Item = Message>> P2PTransport<MsgStream> {
Err(e @ QueryError::NotFound) => query_result::Result::BadRequest(e.to_string()),
Err(QueryError::NoAllocation) => query_result::Result::NoAllocation(()),
Err(QueryError::BadRequest(e)) => query_result::Result::BadRequest(e),
Err(e @ QueryError::ServiceOverloaded) => {
query_result::Result::ServerError(e.to_string())
}
Err(QueryError::Other(e)) => query_result::Result::ServerError(e.to_string()),
};
let envelope = subsquid_messages::Envelope {
Expand Down Expand Up @@ -359,6 +363,9 @@ impl<MsgStream: Stream<Item = Message>> P2PTransport<MsgStream> {
}),
Err(e @ QueryError::NotFound) => query_executed::Result::BadRequest(e.to_string()),
Err(QueryError::BadRequest(e)) => query_executed::Result::BadRequest(e.clone()),
Err(e @ QueryError::ServiceOverloaded) => {
query_executed::Result::ServerError(e.to_string())
}
Err(QueryError::Other(e)) => query_executed::Result::ServerError(e.to_string()),
Err(QueryError::NoAllocation) => panic!("Shouldn't send logs with NoAllocation error"),
};
Expand Down Expand Up @@ -453,7 +460,7 @@ fn report_metrics(result: &std::result::Result<QueryResult, QueryError>) {
Err(QueryError::NotFound | QueryError::NoAllocation | QueryError::BadRequest(_)) => {
metrics::BAD_REQUEST.inc()
}
Err(QueryError::Other(_)) => metrics::SERVER_ERROR.inc(),
Err(QueryError::Other(_) | QueryError::ServiceOverloaded) => metrics::SERVER_ERROR.inc(),
};
}

Expand Down

0 comments on commit a6b0ab7

Please sign in to comment.