Skip to content

Commit

Permalink
refactor(hydro_deploy)!: replace async-channel with `tokio::sync::m…
Browse files Browse the repository at this point in the history
…psc::unbounded_channel` (#1356)

Depends on #1339

We could make the publicly facing `stdout`, `stderr` APIs return `impl Stream<Output = String>` in the future, maybe
  • Loading branch information
MingweiSamuel committed Jul 18, 2024
1 parent 141eae1 commit e9a87eb
Show file tree
Hide file tree
Showing 18 changed files with 91 additions and 112 deletions.
17 changes: 2 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion hydro_deploy/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ description = "Hydro Deploy"

[dependencies]
anyhow = { version = "1.0.69", features = [ "backtrace" ] }
async-channel = "1.8.0"
async-process = "1.6.0"
async-recursion = "1.0"
async-ssh2-lite = { version = "0.4.2", features = [ "tokio" ] }
Expand All @@ -30,4 +29,5 @@ serde_json = "1"
shell-escape = "0.1.5"
tempfile = "3.3.0"
tokio = { version = "1.16", features = [ "full" ] }
tokio-stream = { version = "0.1.15", default-features = false }
tokio-util = { version = "0.7.7", features=[ "compat" ] }
4 changes: 2 additions & 2 deletions hydro_deploy/core/src/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ mod tests {

deployment.deploy().await.unwrap();

let stdout = service.try_read().unwrap().stdout();
let mut stdout = service.try_read().unwrap().stdout();

deployment.start().await.unwrap();

assert_eq!(stdout.recv().await.unwrap(), "hello!");

assert!(stdout.recv().await.is_err());
assert!(stdout.recv().await.is_none());
}
}
16 changes: 5 additions & 11 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Result};
use async_channel::Receiver;
use async_trait::async_trait;
use futures_core::Future;
use hydroflow_cli_integration::{InitConfig, ServerPort};
use serde::Serialize;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, RwLock};

use super::build::{build_crate_memoized, BuildError, BuildOutput, BuildParams};
use super::ports::{self, HydroflowPortConfig, HydroflowSink, SourcePath};
Expand Down Expand Up @@ -139,11 +138,11 @@ impl HydroflowCrateService {
}
}

pub fn stdout(&self) -> Receiver<String> {
pub fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
self.launched_binary.as_ref().unwrap().stdout()
}

pub fn stderr(&self) -> Receiver<String> {
pub fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
self.launched_binary.as_ref().unwrap().stderr()
}

Expand Down Expand Up @@ -246,10 +245,7 @@ impl Service for HydroflowCrateService {
// request stdout before sending config so we don't miss the "ready" response
let stdout_receiver = binary.cli_stdout();

binary
.stdin()
.send(format!("{formatted_bind_config}\n"))
.await?;
binary.stdin().send(format!("{formatted_bind_config}\n"))?;

let ready_line = ProgressTracker::leaf(
"waiting for ready".to_string(),
Expand Down Expand Up @@ -290,7 +286,6 @@ impl Service for HydroflowCrateService {
.unwrap()
.stdin()
.send(format!("start: {formatted_defns}\n"))
.await
.unwrap();

let start_ack_line = ProgressTracker::leaf(
Expand All @@ -311,8 +306,7 @@ impl Service for HydroflowCrateService {
.as_ref()
.unwrap()
.stdin()
.send("stop\n".to_string())
.await?;
.send("stop\n".to_string())?;

self.launched_binary.as_mut().unwrap().wait().await;

Expand Down
10 changes: 5 additions & 5 deletions hydro_deploy/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use hydroflow_cli_integration::ServerBindConfig;

Expand All @@ -29,6 +28,7 @@ pub use hydroflow_crate::HydroflowCrate;

pub mod custom_service;
pub use custom_service::CustomService;
use tokio::sync::{mpsc, oneshot};

use crate::hydroflow_crate::build::BuildOutput;

Expand Down Expand Up @@ -72,16 +72,16 @@ pub struct ResourceResult {

#[async_trait]
pub trait LaunchedBinary: Send + Sync {
fn stdin(&self) -> Sender<String>;
fn stdin(&self) -> mpsc::UnboundedSender<String>;

/// Provides a oneshot channel for the CLI to handshake with the binary,
/// with the guarantee that as long as the CLI is holding on
/// to a handle, none of the messages will also be broadcast
/// to the user-facing [`LaunchedBinary::stdout`] channel.
fn cli_stdout(&self) -> tokio::sync::oneshot::Receiver<String>;
fn cli_stdout(&self) -> oneshot::Receiver<String>;

fn stdout(&self) -> Receiver<String>;
fn stderr(&self) -> Receiver<String>;
fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
fn stderr(&self) -> mpsc::UnboundedReceiver<String>;

fn exit_code(&self) -> Option<i32>;

Expand Down
30 changes: 15 additions & 15 deletions hydro_deploy/core/src/localhost/launched_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
use std::os::unix::process::ExitStatusExt;
use std::sync::{Arc, Mutex};

use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt};
use futures::{AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot};

use crate::util::prioritized_broadcast;
use crate::LaunchedBinary;

pub struct LaunchedLocalhostBinary {
child: Mutex<async_process::Child>,
stdin_sender: Sender<String>,
stdout_cli_receivers: Arc<Mutex<Option<tokio::sync::oneshot::Sender<String>>>>,
stdout_receivers: Arc<Mutex<Vec<Sender<String>>>>,
stderr_receivers: Arc<Mutex<Vec<Sender<String>>>>,
stdin_sender: mpsc::UnboundedSender<String>,
stdout_cli_receivers: Arc<Mutex<Option<oneshot::Sender<String>>>>,
stdout_receivers: Arc<Mutex<Vec<mpsc::UnboundedSender<String>>>>,
stderr_receivers: Arc<Mutex<Vec<mpsc::UnboundedSender<String>>>>,
}

#[cfg(unix)]
Expand All @@ -39,10 +39,10 @@ impl Drop for LaunchedLocalhostBinary {

impl LaunchedLocalhostBinary {
pub fn new(mut child: async_process::Child, id: String) -> Self {
let (stdin_sender, mut stdin_receiver) = async_channel::unbounded::<String>();
let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
let mut stdin = child.stdin.take().unwrap();
tokio::spawn(async move {
while let Some(line) = stdin_receiver.next().await {
while let Some(line) = stdin_receiver.recv().await {
if stdin.write_all(line.as_bytes()).await.is_err() {
break;
}
Expand Down Expand Up @@ -73,32 +73,32 @@ impl LaunchedLocalhostBinary {

#[async_trait]
impl LaunchedBinary for LaunchedLocalhostBinary {
fn stdin(&self) -> Sender<String> {
fn stdin(&self) -> mpsc::UnboundedSender<String> {
self.stdin_sender.clone()
}

fn cli_stdout(&self) -> tokio::sync::oneshot::Receiver<String> {
fn cli_stdout(&self) -> oneshot::Receiver<String> {
let mut receivers = self.stdout_cli_receivers.lock().unwrap();

if receivers.is_some() {
panic!("Only one CLI stdout receiver is allowed at a time");
}

let (sender, receiver) = tokio::sync::oneshot::channel::<String>();
let (sender, receiver) = oneshot::channel::<String>();
*receivers = Some(sender);
receiver
}

fn stdout(&self) -> Receiver<String> {
fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
let mut receivers = self.stdout_receivers.lock().unwrap();
let (sender, receiver) = async_channel::unbounded::<String>();
let (sender, receiver) = mpsc::unbounded_channel::<String>();
receivers.push(sender);
receiver
}

fn stderr(&self) -> Receiver<String> {
fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
let mut receivers = self.stderr_receivers.lock().unwrap();
let (sender, receiver) = async_channel::unbounded::<String>();
let (sender, receiver) = mpsc::unbounded_channel::<String>();
receivers.push(sender);
receiver
}
Expand Down
30 changes: 15 additions & 15 deletions hydro_deploy/core/src/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::{Context, Result};
use async_channel::{Receiver, Sender};
use async_ssh2_lite::ssh2::ErrorCode;
use async_ssh2_lite::{AsyncChannel, AsyncSession, Error, SessionConfiguration};
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt};
use futures::{AsyncBufReadExt, AsyncWriteExt};
use hydroflow_cli_integration::ServerBindConfig;
use nanoid::nanoid;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, oneshot};

use super::progress::ProgressTracker;
use super::util::async_retry;
Expand All @@ -26,40 +26,40 @@ struct LaunchedSshBinary {
_resource_result: Arc<ResourceResult>,
session: Option<AsyncSession<TcpStream>>,
channel: AsyncChannel<TcpStream>,
stdin_sender: Sender<String>,
stdout_receivers: Arc<Mutex<Vec<Sender<String>>>>,
stdout_cli_receivers: Arc<Mutex<Option<tokio::sync::oneshot::Sender<String>>>>,
stderr_receivers: Arc<Mutex<Vec<Sender<String>>>>,
stdin_sender: mpsc::UnboundedSender<String>,
stdout_receivers: Arc<Mutex<Vec<mpsc::UnboundedSender<String>>>>,
stdout_cli_receivers: Arc<Mutex<Option<oneshot::Sender<String>>>>,
stderr_receivers: Arc<Mutex<Vec<mpsc::UnboundedSender<String>>>>,
}

#[async_trait]
impl LaunchedBinary for LaunchedSshBinary {
fn stdin(&self) -> Sender<String> {
fn stdin(&self) -> mpsc::UnboundedSender<String> {
self.stdin_sender.clone()
}

fn cli_stdout(&self) -> tokio::sync::oneshot::Receiver<String> {
fn cli_stdout(&self) -> oneshot::Receiver<String> {
let mut receivers = self.stdout_cli_receivers.lock().unwrap();

if receivers.is_some() {
panic!("Only one CLI stdout receiver is allowed at a time");
}

let (sender, receiver) = tokio::sync::oneshot::channel::<String>();
let (sender, receiver) = oneshot::channel::<String>();
*receivers = Some(sender);
receiver
}

fn stdout(&self) -> Receiver<String> {
fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
let mut receivers = self.stdout_receivers.lock().unwrap();
let (sender, receiver) = async_channel::unbounded::<String>();
let (sender, receiver) = mpsc::unbounded_channel::<String>();
receivers.push(sender);
receiver
}

fn stderr(&self) -> Receiver<String> {
fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
let mut receivers = self.stderr_receivers.lock().unwrap();
let (sender, receiver) = async_channel::unbounded::<String>();
let (sender, receiver) = mpsc::unbounded_channel::<String>();
receivers.push(sender);
receiver
}
Expand Down Expand Up @@ -317,10 +317,10 @@ impl<T: LaunchedSshHost> LaunchedHost for T {
)
.await?;

let (stdin_sender, mut stdin_receiver) = async_channel::unbounded::<String>();
let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
let mut stdin = channel.stream(0); // stream 0 is stdout/stdin, we use it for stdin
tokio::spawn(async move {
while let Some(line) = stdin_receiver.next().await {
while let Some(line) = stdin_receiver.recv().await {
if stdin.write_all(line.as_bytes()).await.is_err() {
break;
}
Expand Down
Loading

0 comments on commit e9a87eb

Please sign in to comment.