Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
getting closer to end to end run

External executors are getting an identifier
  • Loading branch information
tomhoule committed Sep 1, 2023
1 parent 1bc57fa commit e42f399
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ model Opp {
runner,
r#"{groupByApp(by: [categories], orderBy: { categories: "desc" }) { _count { slug } categories }}"#
);
dbg!(&result);
assert_eq!(result, "{\"data\":{\"groupByApp\":[{\"_count\":{\"slug\":1},\"categories\":[\"messaging\",\"payment\"]},{\"_count\":{\"slug\":2},\"categories\":[\"calendar\",\"other\"]},{\"_count\":{\"slug\":1},\"categories\":[]}]}}");

let result = run_query!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub(crate) async fn executor_process_request<T: DeserializeOwned>(
method: &str,
params: serde_json::Value,
) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
dbg!("here, starting");
NODE_PROCESS.0.request(method, params).await
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ExecutorProcess {
},
init_sender,
))
.unwrap();
.expect("Failed to blocking send the initialize method");

let config_json = init_receiver
.blocking_recv()
Expand All @@ -72,6 +72,7 @@ impl ExecutorProcess {

/// Convenient façade. Allocates more than necessary, but this is only for testing.
pub(crate) async fn request<T: DeserializeOwned>(&self, method: &str, params: serde_json::Value) -> Result<T> {
dbg!("here");
let (sender, receiver) = oneshot::channel();
let params = if let serde_json::Value::Object(params) = params {
params
Expand All @@ -87,6 +88,8 @@ impl ExecutorProcess {

self.task_handle.send((method_call, sender)).await?;
let raw_response = receiver.await?;
dbg!(&raw_response);
tracing::debug!(%raw_response);
let response = serde_json::from_value(raw_response)?;
Ok(response)
}
Expand Down Expand Up @@ -117,71 +120,81 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
Some(env_var) => env_var,
None => exit_with_message(1, "start_rpc_thread() error: NODE_TEST_EXECUTOR env var is not defined"),
};
eprintln!("Spawning test executor process at `{env_var}`");
let process = Command::new(env_var)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(|err| GenericError(format!("Failed to spawn node executor process.\nDetails: {err}\n")))?;

tokio::spawn(async move {
let mut stdout = BufReader::new(process.stdout.unwrap()).lines();
let mut stdin = process.stdin.unwrap();
let mut pending_requests: HashMap<jsonrpc_core::Id, oneshot::Sender<serde_json::value::Value>> = HashMap::new();

loop {
tokio::select! {
line = stdout.next_line() => {
match line {
Ok(Some(line)) => // new response
{
let response: jsonrpc_core::Output = match serde_json::from_str(&line) {
Ok(response) => response,
Err(err) => // log it
{
tracing::error!(%err, "Error when decoding response from child node process");
continue
}
};

let sender = pending_requests.remove(response.id()).unwrap();
match response {
jsonrpc_core::Output::Success(success) => {
sender.send(success.result).unwrap();
}
jsonrpc_core::Output::Failure(err) => {
panic!("error response from jsonrpc: {err:?}")
tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap()
.block_on(async move {
eprintln!("Spawning test executor process at `{env_var}`");
let process = match Command::new(env_var)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
{
Ok(process) => process,
Err(err) => exit_with_message(1, &format!("Failed to spawn the executor process.\nDetails: {err}\n")),
};
// .map_err(|err| GenericError(format!("Failed to spawn node executor process.\nDetails: {err}\n")))?;

let mut stdout = BufReader::new(process.stdout.unwrap()).lines();
let mut stdin = process.stdin.unwrap();
let mut pending_requests: HashMap<jsonrpc_core::Id, oneshot::Sender<serde_json::value::Value>> =
HashMap::new();

loop {
tokio::select! {
line = stdout.next_line() => {
match line {
Ok(Some(line)) => // new response
{
let response: jsonrpc_core::Output = match serde_json::from_str(&line) {
Ok(response) => response,
Err(err) => // log it
{
tracing::error!(%err, "Error when decoding response from child node process");
continue
}
};

let sender = pending_requests.remove(response.id()).unwrap();
match response {
jsonrpc_core::Output::Success(success) => {
sender.send(success.result).unwrap();
}
jsonrpc_core::Output::Failure(err) => {
panic!("error response from jsonrpc: {err:?}")
}
}
}

}
Ok(None) => // end of the stream
{
exit_with_message(1, "child node process stdout closed")
}
Err(err) => // log it
{
tracing::error!(%err, "Error when reading from child node process");
}
Ok(None) => // end of the stream
{
exit_with_message(1, "child node process stdout closed")
}
Err(err) => // log it
{
tracing::error!(%err, "Error when reading from child node process");
}
}
}
}
request = receiver.recv() => {
match request {
None => // channel closed
{
exit_with_message(1, "The json-rpc client channel was closed");
}
Some((request, response_sender)) => {
pending_requests.insert(request.id.clone(), response_sender);
let mut req = serde_json::to_vec(&request).unwrap();
req.push(b'\n');
stdin.write_all(&req).await.unwrap();
request = receiver.recv() => {
match request {
None => // channel closed
{
exit_with_message(1, "The json-rpc client channel was closed");
}
Some((request, response_sender)) => {
pending_requests.insert(request.id.clone(), response_sender);
let mut req = serde_json::to_vec(&request).unwrap();
req.push(b'\n');
stdin.write_all(&req).await.unwrap();
}
}
}
}
}
}
});
});

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
struct SimpleGqlResponse {
data: serde_json::Value,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(default)]
errors: Vec<GQLError>,
#[serde(skip_serializing_if = "Option::is_none")]
extensions: Option<serde_json::Value>,
Expand All @@ -14,6 +15,7 @@ struct SimpleGqlResponse {
struct SimpleGqlBatchResponse {
batch_result: Vec<SimpleGqlResponse>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(default)]
errors: Vec<GQLError>,
#[serde(skip_serializing_if = "Option::is_none")]
extensions: Option<serde_json::Value>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@ use request_handlers::{
RequestBody, RequestHandler,
};
use serde_json::json;
use std::{env, sync::Arc};
use std::{
env,
sync::{atomic::AtomicUsize, Arc},
};

pub type TxResult = Result<(), user_facing_errors::Error>;

pub(crate) type Executor = Box<dyn QueryExecutor + Send + Sync>;

pub enum RunnerExecutor {
Builtin(Executor),
External,
External(usize),
}

impl RunnerExecutor {
fn new_external() -> RunnerExecutor {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
RunnerExecutor::External(COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
}

/// Direct engine runner.
Expand Down Expand Up @@ -62,7 +72,7 @@ impl Runner {
let url = data_source.load_url(|key| env::var(key).ok()).unwrap();

let executor = match crate::NODE_TEST_EXECUTOR.as_ref() {
Some(_) => RunnerExecutor::External,
Some(_) => RunnerExecutor::new_external(),
None => RunnerExecutor::Builtin(
request_handlers::load_executor(
ConnectorMode::Rust,
Expand Down Expand Up @@ -96,7 +106,9 @@ impl Runner {

let executor = match &self.executor {
RunnerExecutor::Builtin(e) => e,
RunnerExecutor::External => return Ok(executor_process_request("query", json!({ "query": query })).await?),
RunnerExecutor::External(_) => {
return Ok(executor_process_request("query", json!({ "query": query })).await?)
}
};

tracing::debug!("Querying: {}", query.clone().green());
Expand Down Expand Up @@ -146,7 +158,7 @@ impl Runner {

let executor = match &self.executor {
RunnerExecutor::Builtin(e) => e,
RunnerExecutor::External => {
RunnerExecutor::External(_) => {
return Ok(executor_process_request("queryJson", json!({ "query": query })).await?)
}
};
Expand Down Expand Up @@ -189,7 +201,7 @@ impl Runner {
isolation_level: Option<String>,
) -> TestResult<crate::QueryResult> {
let executor = match &self.executor {
RunnerExecutor::External => todo!(),
RunnerExecutor::External(_) => todo!(),
RunnerExecutor::Builtin(e) => e,
};

Expand All @@ -214,7 +226,7 @@ impl Runner {
isolation_level: Option<String>,
) -> TestResult<crate::QueryResult> {
let executor = match &self.executor {
RunnerExecutor::External => todo!(),
RunnerExecutor::External(_) => todo!(),
RunnerExecutor::Builtin(e) => e,
};

Expand Down Expand Up @@ -262,7 +274,7 @@ impl Runner {
) -> TestResult<TxId> {
let executor = match &self.executor {
RunnerExecutor::Builtin(e) => e,
RunnerExecutor::External => todo!(),
RunnerExecutor::External(_) => todo!(),
};

let tx_opts = TransactionOptions::new(max_acquisition_millis, valid_for_millis, isolation_level);
Expand All @@ -276,7 +288,7 @@ impl Runner {
pub async fn commit_tx(&self, tx_id: TxId) -> TestResult<TxResult> {
let executor = match &self.executor {
RunnerExecutor::Builtin(e) => e,
RunnerExecutor::External => todo!(),
RunnerExecutor::External(_) => todo!(),
};
let res = executor.commit_tx(tx_id).await;

Expand All @@ -290,7 +302,7 @@ impl Runner {
pub async fn rollback_tx(&self, tx_id: TxId) -> TestResult<TxResult> {
let executor = match &self.executor {
RunnerExecutor::Builtin(e) => e,
RunnerExecutor::External => todo!(),
RunnerExecutor::External(_) => todo!(),
};
let res = executor.rollback_tx(tx_id).await;

Expand Down
5 changes: 3 additions & 2 deletions start.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

node ./query-engine/js-connectors/js/connector-test-kit-executor/dist/index.mjs
SOURCE=$(dirname ${BASH_SOURCE[0]})
node "${SOURCE}/query-engine/js-connectors/js/connector-test-kit-executor/dist/index.mjs"

0 comments on commit e42f399

Please sign in to comment.