Skip to content

Commit

Permalink
Unrevert with changes
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff committed Dec 4, 2023
1 parent 248210d commit 70fd058
Showing 1 changed file with 49 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,42 @@ impl Display for ExecutorProcessDiedError {

impl StdError for ExecutorProcessDiedError {}

struct PendingRequests {
map: HashMap<jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>>,
last_id: Option<jsonrpc_core::Id>,
}

impl PendingRequests {
fn new() -> Self {
Self {
map: HashMap::new(),
last_id: None,
}
}

fn insert(&mut self, id: jsonrpc_core::Id, sender: oneshot::Sender<Result<serde_json::value::Value>>) {
self.map.insert(id.clone(), sender);
self.last_id = Some(id);
}

fn respond(&mut self, id: &jsonrpc_core::Id, response: Result<serde_json::value::Value>) {
self.map
.remove(id)
.expect("no sender for response")
.send(response)
.unwrap();
}

fn respond_to_last(&mut self, response: Result<serde_json::value::Value>) {
let last_id = self
.last_id
.as_ref()
.expect("Expected last response to exist")
.to_owned();
self.respond(&last_id, response);
}
}

pub(super) static EXTERNAL_PROCESS: Lazy<RestartableExecutorProcess> = Lazy::new(RestartableExecutorProcess::new);

type ReqImpl = (
Expand Down Expand Up @@ -173,8 +209,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {

let mut stdout = BufReader::new(process.stdout.unwrap()).lines();
let mut stdin = process.stdin.unwrap();
let mut pending_requests: HashMap<jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>> =
HashMap::new();
let mut pending_requests = PendingRequests::new();

loop {
tokio::select! {
Expand All @@ -187,20 +222,20 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
Ok(Some(line)) => // new response
{
match serde_json::from_str::<jsonrpc_core::Output>(&line) {
Ok(response) => {
let sender = pending_requests.remove(response.id()).unwrap();
match response {
Ok(ref response) => {
let res: Result<serde_json::value::Value> = match response {
jsonrpc_core::Output::Success(success) => {
// The other end may be dropped if the whole
// request future was dropped and not polled to
// completion, so we ignore send errors here.
_ = sender.send(Ok(success.result));
Ok(success.result.clone())
}
jsonrpc_core::Output::Failure(err) => {
tracing::error!("error response from jsonrpc: {err:?}");
_ = sender.send(Err(Box::new(err.error)));
Err(Box::new(err.error.clone()))
}
}
};
pending_requests.respond(response.id(), res)
}
Err(err) => {
tracing::error!(%err, "error when decoding response from child node process. Response was: `{}`", &line);
Expand All @@ -210,7 +245,11 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
}
Ok(None) => // end of the stream
{
exit_with_message(1, "child node process stdout closed")
tracing::error!("Error when reading from child node process. Process might have exited. Restarting...");

pending_requests.respond_to_last(Err(Box::new(ExecutorProcessDiedError)));
EXTERNAL_PROCESS.restart().await;
break;
}
Err(err) => // log it
{
Expand All @@ -226,6 +265,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
}
Some((request, response_sender)) => {
pending_requests.insert(request.id.clone(), response_sender);

let mut req = serde_json::to_vec(&request).unwrap();
req.push(b'\n');
stdin.write_all(&req).await.unwrap();
Expand Down

0 comments on commit 70fd058

Please sign in to comment.