Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic kills running engine in query-engine-tests #4499

Merged
merged 11 commits into from
Nov 29, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ indexmap = { version = "1.0", features = ["serde-1"] }
query-engine-metrics = {path = "../../metrics"}
quaint.workspace = true
jsonrpc-core = "17"
futures = "0.3"

# Only this version is vetted, upgrade only after going through the code,
# as this is a small crate with little user base.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod external_process;
use super::*;
use external_process::*;
use serde::de::DeserializeOwned;
use std::{collections::HashMap, sync::atomic::AtomicU64};
use std::sync::atomic::AtomicU64;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

pub(crate) async fn executor_process_request<T: DeserializeOwned>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use super::*;
use futures::lock::Mutex;
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use std::{fmt::Display, io::Write as _, sync::atomic::Ordering};
use std::{
fmt::Display,
io::Write as _,
sync::{atomic::Ordering, Arc},
};
use tokio::sync::{mpsc, oneshot};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down Expand Up @@ -29,6 +34,17 @@ fn exit_with_message(status_code: i32, message: &str) -> ! {
}

impl ExecutorProcess {
fn spawn() -> ExecutorProcess {
match std::thread::spawn(ExecutorProcess::new).join() {
Ok(Ok(process)) => process,
Ok(Err(err)) => exit_with_message(1, &format!("Failed to start node process. Details: {err}")),
Err(err) => {
let err = err.downcast_ref::<String>().map(ToOwned::to_owned).unwrap_or_default();
exit_with_message(1, &format!("Panic while trying to start node process.\nDetails: {err}"))
}
}
}

fn new() -> Result<ExecutorProcess> {
let (sender, receiver) = mpsc::channel::<ReqImpl>(300);

Expand Down Expand Up @@ -81,15 +97,34 @@ impl ExecutorProcess {
}
}

pub(super) static EXTERNAL_PROCESS: Lazy<ExecutorProcess> =
Lazy::new(|| match std::thread::spawn(ExecutorProcess::new).join() {
Ok(Ok(process)) => process,
Ok(Err(err)) => exit_with_message(1, &format!("Failed to start node process. Details: {err}")),
Err(err) => {
let err = err.downcast_ref::<String>().map(ToOwned::to_owned).unwrap_or_default();
exit_with_message(1, &format!("Panic while trying to start node process.\nDetails: {err}"))
/// Wraps an ExecutorProcess allowing for restarting it.
///
/// A node process can die for a number of reasons, being one that any `panic!` occurring in Rust
/// asynchronous code are translated to an abort trap by wasm-bindgen, which kills the node process.
#[derive(Clone)]
pub(crate) struct RestartableExecutorProcess {
p: Arc<Mutex<ExecutorProcess>>,
miguelff marked this conversation as resolved.
Show resolved Hide resolved
miguelff marked this conversation as resolved.
Show resolved Hide resolved
}

impl RestartableExecutorProcess {
fn new() -> Self {
Self {
p: Arc::new(Mutex::new(ExecutorProcess::spawn())),
}
});
}

async fn restart(&self) {
let mut p = self.p.lock().await;
_ = std::mem::replace(&mut *p, ExecutorProcess::spawn());
}

pub(crate) async fn request<T: DeserializeOwned>(&self, method: &str, params: serde_json::Value) -> Result<T> {
let p = self.p.lock().await;
p.request(method, params).await
}
}

pub(super) static EXTERNAL_PROCESS: Lazy<RestartableExecutorProcess> = Lazy::new(|| RestartableExecutorProcess::new());

type ReqImpl = (
jsonrpc_core::MethodCall,
Expand Down Expand Up @@ -122,8 +157,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {

let mut stdout = BufReader::new(process.stdout.unwrap()).lines();
let mut stdin = process.stdin.unwrap();
let mut pending_requests: HashMap<jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>> =
HashMap::new();
let mut last_pending_request: Option<(jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>)> = None;

loop {
tokio::select! {
Expand All @@ -137,7 +171,11 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
{
match serde_json::from_str::<jsonrpc_core::Output>(&line) {
Ok(response) => {
let sender = pending_requests.remove(response.id()).unwrap();
let (id, sender) = last_pending_request.take().expect("got a response from the external process, but there was no pending request");
if &id != response.id() {
unreachable!("got a response from the external process, but the id didn't match. Are you running with cargo tests with `--test-threads=1`");
}

match response {
jsonrpc_core::Output::Success(success) => {
// The other end may be dropped if the whole
Expand All @@ -159,7 +197,12 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
}
Ok(None) => // end of the stream
{
exit_with_message(1, "child node process stdout closed")
tracing::error!("Error when reading from child node process. Process might have exited. Restarting...");
if let Some((_, sender)) = last_pending_request.take() {
let _ = sender.send(Err(Box::new(jsonrpc_core::types::Error::new(jsonrpc_core::ErrorCode::ServerError(500))))).unwrap();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might need to adjust this, Given the last few lines of the test report
output.txt

 params=[] result="success" item_type="query" is_query=true duration_ms=8
thread 'new::interactive_tx::itx_isolation::spacing_doesnt_matter' panicked at query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs:290:35:
called `Result::unwrap()` on an `Err` value: External(Error { code: ServerError(500), message: "Server error", data: None })


failures:
    new::interactive_tx::interactive_tx::basic_commit_workflow
    new::interactive_tx::interactive_tx::basic_rollback_workflow
    new::interactive_tx::interactive_tx::batch_queries_failure
    new::interactive_tx::interactive_tx::batch_queries_rollback
    new::interactive_tx::interactive_tx::batch_queries_success
    new::interactive_tx::interactive_tx::commit_after_rollback
    new::interactive_tx::interactive_tx::double_commit
    new::interactive_tx::interactive_tx::double_rollback
    new::interactive_tx::interactive_tx::multiple_tx
    new::interactive_tx::interactive_tx::no_auto_rollback
    new::interactive_tx::interactive_tx::raw_queries
    new::interactive_tx::interactive_tx::rollback_after_commit
    new::interactive_tx::interactive_tx::tx_expiration_cycle
    new::interactive_tx::interactive_tx::tx_expiration_failure_cycle
    new::interactive_tx::interactive_tx::write_conflict
    new::interactive_tx::itx_isolation::basic_serializable
    new::interactive_tx::itx_isolation::casing_doesnt_matter
    new::interactive_tx::itx_isolation::invalid_isolation
    new::interactive_tx::itx_isolation::spacing_doesnt_matter

test result: FAILED. 1 passed; 19 failed; 0 ignored; 0 measured; 1563 filtered out; finished in 4.18s

error: test failed, to rerun pass `-p query-engine-tests --test query_engine_tests

}
EXTERNAL_PROCESS.restart().await;
break;
}
Err(err) => // log it
{
Expand All @@ -174,7 +217,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
exit_with_message(1, "The json-rpc client channel was closed");
}
Some((request, response_sender)) => {
pending_requests.insert(request.id.clone(), response_sender);
last_pending_request = Some((request.id.clone(), response_sender));
let mut req = serde_json::to_vec(&request).unwrap();
req.push(b'\n');
stdin.write_all(&req).await.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions query-engine/query-engine-wasm/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ else
BUILD_PROFILE="--dev"
fi

# Check if wasm-pack is installed
if ! command -v wasm-pack &> /dev/null
then
echo "wasm-pack could not be found, installing now..."
# Install wasm-pack
curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
fi
wasm-pack build $BUILD_PROFILE --target $OUT_TARGET

sed -i '' 's/name = "query_engine"/name = "query_engine_wasm"/g' Cargo.toml
Expand Down
Loading