From e9a87eb71444d558f03a056a1bda037aa72de335 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Wed, 17 Jul 2024 19:18:08 -0700 Subject: [PATCH] refactor(hydro_deploy)!: replace `async-channel` with `tokio::sync::mpsc::unbounded_channel` (#1356) Depends on #1339 We could make the publicly facing `stdout`, `stderr` APIs return `impl Stream` in the future, maybe --- Cargo.lock | 17 +------ hydro_deploy/core/Cargo.toml | 2 +- hydro_deploy/core/src/hydroflow_crate/mod.rs | 4 +- .../core/src/hydroflow_crate/service.rs | 16 +++---- hydro_deploy/core/src/lib.rs | 10 ++--- .../core/src/localhost/launched_binary.rs | 30 ++++++------- hydro_deploy/core/src/ssh.rs | 30 ++++++------- hydro_deploy/core/src/util.rs | 45 +++++++++---------- hydro_deploy/hydro_cli/Cargo.toml | 1 - hydro_deploy/hydro_cli/src/lib.rs | 19 ++++---- .../hydroflow_plus_cli_integration/Cargo.toml | 3 +- .../src/deploy.rs | 5 +-- .../src/cluster/many_to_many.rs | 2 +- .../src/cluster/simple_cluster.rs | 4 +- .../src/distributed/first_ten.rs | 2 +- .../src/distributed/networked.rs | 4 +- template/hydroflow_plus/flow/Cargo.toml | 1 + .../flow/src/first_ten_distributed.rs | 8 +++- 18 files changed, 91 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b58fed869806..c075672b3818 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -136,17 +136,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - [[package]] name = "async-channel" version = "2.1.1" @@ -407,7 +396,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" dependencies = [ - "async-channel 2.1.1", + "async-channel", "async-lock 3.2.0", "async-task", "fastrand 2.0.1", @@ -1350,7 +1339,6 @@ name = "hydro_cli" version = "0.7.0" dependencies = [ "anyhow", - "async-channel 1.9.0", "async-ssh2-lite", "bytes", "clap", @@ -1384,7 +1372,6 @@ name = "hydro_deploy" version = "0.7.0" dependencies = [ "anyhow", - "async-channel 1.9.0", "async-process", "async-recursion", "async-ssh2-lite", @@ -1405,6 +1392,7 @@ dependencies = [ "shell-escape", "tempfile", "tokio", + "tokio-stream", "tokio-util", ] @@ -1556,7 +1544,6 @@ dependencies = [ name = "hydroflow_plus_cli_integration" version = "0.7.0" dependencies = [ - "async-channel 1.9.0", "hydro_deploy", "hydroflow_plus", "serde", diff --git a/hydro_deploy/core/Cargo.toml b/hydro_deploy/core/Cargo.toml index 09356bd03b36..cfd4bce5a886 100644 --- a/hydro_deploy/core/Cargo.toml +++ b/hydro_deploy/core/Cargo.toml @@ -9,7 +9,6 @@ description = "Hydro Deploy" [dependencies] anyhow = { version = "1.0.69", features = [ "backtrace" ] } -async-channel = "1.8.0" async-process = "1.6.0" async-recursion = "1.0" async-ssh2-lite = { version = "0.4.2", features = [ "tokio" ] } @@ -30,4 +29,5 @@ serde_json = "1" shell-escape = "0.1.5" tempfile = "3.3.0" tokio = { version = "1.16", features = [ "full" ] } +tokio-stream = { version = "0.1.15", default-features = false } tokio-util = { version = "0.7.7", features=[ "compat" ] } diff --git a/hydro_deploy/core/src/hydroflow_crate/mod.rs b/hydro_deploy/core/src/hydroflow_crate/mod.rs index 7edc1a5316cd..4e2639e4c4d7 100644 --- a/hydro_deploy/core/src/hydroflow_crate/mod.rs +++ b/hydro_deploy/core/src/hydroflow_crate/mod.rs @@ -150,12 +150,12 @@ mod tests { deployment.deploy().await.unwrap(); - let stdout = service.try_read().unwrap().stdout(); + let mut stdout = service.try_read().unwrap().stdout(); deployment.start().await.unwrap(); assert_eq!(stdout.recv().await.unwrap(), "hello!"); - assert!(stdout.recv().await.is_err()); + assert!(stdout.recv().await.is_none()); } } diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index 76dbe24d75e7..eb3df054c500 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -4,12 +4,11 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{bail, Result}; -use async_channel::Receiver; use async_trait::async_trait; use futures_core::Future; use hydroflow_cli_integration::{InitConfig, ServerPort}; use serde::Serialize; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, RwLock}; use super::build::{build_crate_memoized, BuildError, BuildOutput, BuildParams}; use super::ports::{self, HydroflowPortConfig, HydroflowSink, SourcePath}; @@ -139,11 +138,11 @@ impl HydroflowCrateService { } } - pub fn stdout(&self) -> Receiver { + pub fn stdout(&self) -> mpsc::UnboundedReceiver { self.launched_binary.as_ref().unwrap().stdout() } - pub fn stderr(&self) -> Receiver { + pub fn stderr(&self) -> mpsc::UnboundedReceiver { self.launched_binary.as_ref().unwrap().stderr() } @@ -246,10 +245,7 @@ impl Service for HydroflowCrateService { // request stdout before sending config so we don't miss the "ready" response let stdout_receiver = binary.cli_stdout(); - binary - .stdin() - .send(format!("{formatted_bind_config}\n")) - .await?; + binary.stdin().send(format!("{formatted_bind_config}\n"))?; let ready_line = ProgressTracker::leaf( "waiting for ready".to_string(), @@ -290,7 +286,6 @@ impl Service for HydroflowCrateService { .unwrap() .stdin() .send(format!("start: {formatted_defns}\n")) - .await .unwrap(); let start_ack_line = ProgressTracker::leaf( @@ -311,8 +306,7 @@ impl Service for HydroflowCrateService { .as_ref() .unwrap() .stdin() - .send("stop\n".to_string()) - .await?; + .send("stop\n".to_string())?; self.launched_binary.as_mut().unwrap().wait().await; diff --git a/hydro_deploy/core/src/lib.rs b/hydro_deploy/core/src/lib.rs index e144bd54c15c..627a15ec88ce 100644 --- a/hydro_deploy/core/src/lib.rs +++ b/hydro_deploy/core/src/lib.rs @@ -4,7 +4,6 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::Result; -use async_channel::{Receiver, Sender}; use async_trait::async_trait; use hydroflow_cli_integration::ServerBindConfig; @@ -29,6 +28,7 @@ pub use hydroflow_crate::HydroflowCrate; pub mod custom_service; pub use custom_service::CustomService; +use tokio::sync::{mpsc, oneshot}; use crate::hydroflow_crate::build::BuildOutput; @@ -72,16 +72,16 @@ pub struct ResourceResult { #[async_trait] pub trait LaunchedBinary: Send + Sync { - fn stdin(&self) -> Sender; + fn stdin(&self) -> mpsc::UnboundedSender; /// Provides a oneshot channel for the CLI to handshake with the binary, /// with the guarantee that as long as the CLI is holding on /// to a handle, none of the messages will also be broadcast /// to the user-facing [`LaunchedBinary::stdout`] channel. - fn cli_stdout(&self) -> tokio::sync::oneshot::Receiver; + fn cli_stdout(&self) -> oneshot::Receiver; - fn stdout(&self) -> Receiver; - fn stderr(&self) -> Receiver; + fn stdout(&self) -> mpsc::UnboundedReceiver; + fn stderr(&self) -> mpsc::UnboundedReceiver; fn exit_code(&self) -> Option; diff --git a/hydro_deploy/core/src/localhost/launched_binary.rs b/hydro_deploy/core/src/localhost/launched_binary.rs index 1a3f4efcef11..31d9caff5442 100644 --- a/hydro_deploy/core/src/localhost/launched_binary.rs +++ b/hydro_deploy/core/src/localhost/launched_binary.rs @@ -2,20 +2,20 @@ use std::os::unix::process::ExitStatusExt; use std::sync::{Arc, Mutex}; -use async_channel::{Receiver, Sender}; use async_trait::async_trait; use futures::io::BufReader; -use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt}; +use futures::{AsyncBufReadExt, AsyncWriteExt}; +use tokio::sync::{mpsc, oneshot}; use crate::util::prioritized_broadcast; use crate::LaunchedBinary; pub struct LaunchedLocalhostBinary { child: Mutex, - stdin_sender: Sender, - stdout_cli_receivers: Arc>>>, - stdout_receivers: Arc>>>, - stderr_receivers: Arc>>>, + stdin_sender: mpsc::UnboundedSender, + stdout_cli_receivers: Arc>>>, + stdout_receivers: Arc>>>, + stderr_receivers: Arc>>>, } #[cfg(unix)] @@ -39,10 +39,10 @@ impl Drop for LaunchedLocalhostBinary { impl LaunchedLocalhostBinary { pub fn new(mut child: async_process::Child, id: String) -> Self { - let (stdin_sender, mut stdin_receiver) = async_channel::unbounded::(); + let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::(); let mut stdin = child.stdin.take().unwrap(); tokio::spawn(async move { - while let Some(line) = stdin_receiver.next().await { + while let Some(line) = stdin_receiver.recv().await { if stdin.write_all(line.as_bytes()).await.is_err() { break; } @@ -73,32 +73,32 @@ impl LaunchedLocalhostBinary { #[async_trait] impl LaunchedBinary for LaunchedLocalhostBinary { - fn stdin(&self) -> Sender { + fn stdin(&self) -> mpsc::UnboundedSender { self.stdin_sender.clone() } - fn cli_stdout(&self) -> tokio::sync::oneshot::Receiver { + fn cli_stdout(&self) -> oneshot::Receiver { let mut receivers = self.stdout_cli_receivers.lock().unwrap(); if receivers.is_some() { panic!("Only one CLI stdout receiver is allowed at a time"); } - let (sender, receiver) = tokio::sync::oneshot::channel::(); + let (sender, receiver) = oneshot::channel::(); *receivers = Some(sender); receiver } - fn stdout(&self) -> Receiver { + fn stdout(&self) -> mpsc::UnboundedReceiver { let mut receivers = self.stdout_receivers.lock().unwrap(); - let (sender, receiver) = async_channel::unbounded::(); + let (sender, receiver) = mpsc::unbounded_channel::(); receivers.push(sender); receiver } - fn stderr(&self) -> Receiver { + fn stderr(&self) -> mpsc::UnboundedReceiver { let mut receivers = self.stderr_receivers.lock().unwrap(); - let (sender, receiver) = async_channel::unbounded::(); + let (sender, receiver) = mpsc::unbounded_channel::(); receivers.push(sender); receiver } diff --git a/hydro_deploy/core/src/ssh.rs b/hydro_deploy/core/src/ssh.rs index 30cadbabed4d..e037b084e809 100644 --- a/hydro_deploy/core/src/ssh.rs +++ b/hydro_deploy/core/src/ssh.rs @@ -6,15 +6,15 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{Context, Result}; -use async_channel::{Receiver, Sender}; use async_ssh2_lite::ssh2::ErrorCode; use async_ssh2_lite::{AsyncChannel, AsyncSession, Error, SessionConfiguration}; use async_trait::async_trait; use futures::io::BufReader; -use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt}; +use futures::{AsyncBufReadExt, AsyncWriteExt}; use hydroflow_cli_integration::ServerBindConfig; use nanoid::nanoid; use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{mpsc, oneshot}; use super::progress::ProgressTracker; use super::util::async_retry; @@ -26,40 +26,40 @@ struct LaunchedSshBinary { _resource_result: Arc, session: Option>, channel: AsyncChannel, - stdin_sender: Sender, - stdout_receivers: Arc>>>, - stdout_cli_receivers: Arc>>>, - stderr_receivers: Arc>>>, + stdin_sender: mpsc::UnboundedSender, + stdout_receivers: Arc>>>, + stdout_cli_receivers: Arc>>>, + stderr_receivers: Arc>>>, } #[async_trait] impl LaunchedBinary for LaunchedSshBinary { - fn stdin(&self) -> Sender { + fn stdin(&self) -> mpsc::UnboundedSender { self.stdin_sender.clone() } - fn cli_stdout(&self) -> tokio::sync::oneshot::Receiver { + fn cli_stdout(&self) -> oneshot::Receiver { let mut receivers = self.stdout_cli_receivers.lock().unwrap(); if receivers.is_some() { panic!("Only one CLI stdout receiver is allowed at a time"); } - let (sender, receiver) = tokio::sync::oneshot::channel::(); + let (sender, receiver) = oneshot::channel::(); *receivers = Some(sender); receiver } - fn stdout(&self) -> Receiver { + fn stdout(&self) -> mpsc::UnboundedReceiver { let mut receivers = self.stdout_receivers.lock().unwrap(); - let (sender, receiver) = async_channel::unbounded::(); + let (sender, receiver) = mpsc::unbounded_channel::(); receivers.push(sender); receiver } - fn stderr(&self) -> Receiver { + fn stderr(&self) -> mpsc::UnboundedReceiver { let mut receivers = self.stderr_receivers.lock().unwrap(); - let (sender, receiver) = async_channel::unbounded::(); + let (sender, receiver) = mpsc::unbounded_channel::(); receivers.push(sender); receiver } @@ -317,10 +317,10 @@ impl LaunchedHost for T { ) .await?; - let (stdin_sender, mut stdin_receiver) = async_channel::unbounded::(); + let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::(); let mut stdin = channel.stream(0); // stream 0 is stdout/stdin, we use it for stdin tokio::spawn(async move { - while let Some(line) = stdin_receiver.next().await { + while let Some(line) = stdin_receiver.recv().await { if stdin.write_all(line.as_bytes()).await.is_err() { break; } diff --git a/hydro_deploy/core/src/util.rs b/hydro_deploy/core/src/util.rs index 2e869c0ee773..ba47ffc464a0 100644 --- a/hydro_deploy/core/src/util.rs +++ b/hydro_deploy/core/src/util.rs @@ -3,10 +3,9 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::Result; -use async_channel::Sender; -use futures::future::join_all; use futures::{Future, StreamExt}; use futures_core::Stream; +use tokio::sync::{mpsc, oneshot}; pub async fn async_retry>>( mut thunk: impl FnMut() -> F, @@ -26,16 +25,16 @@ pub async fn async_retry>>( } type PriorityBroadcacst = ( - Arc>>>, - Arc>>>, + Arc>>>, + Arc>>>, ); pub fn prioritized_broadcast> + Send + Unpin + 'static>( mut lines: T, default: impl Fn(String) + Send + 'static, ) -> PriorityBroadcacst { - let priority_receivers = Arc::new(Mutex::new(None::>)); - let receivers = Arc::new(Mutex::new(Vec::>::new())); + let priority_receivers = Arc::new(Mutex::new(None::>)); + let receivers = Arc::new(Mutex::new(Vec::>::new())); let weak_priority_receivers = Arc::downgrade(&priority_receivers); let weak_receivers = Arc::downgrade(&receivers); @@ -58,19 +57,13 @@ pub fn prioritized_broadcast> + Send + Unpin } if let Some(receivers) = weak_receivers.upgrade() { - let send_all = { - let mut receivers = receivers.lock().unwrap(); - receivers.retain(|receiver| !receiver.is_closed()); - join_all(receivers.iter().map(|receiver| { - // Create a future which doesn't need to hold the `receivers` lock. - let receiver = receiver.clone(); - let line = &line; - async move { receiver.send(line.clone()).await } - })) - // Do not `.await` while holding onto the `std::sync::Mutex` `receivers` lock. - }; + let mut receivers = receivers.lock().unwrap(); + receivers.retain(|receiver| !receiver.is_closed()); - let successful_send = send_all.await.into_iter().any(|result| result.is_ok()); + let mut successful_send = false; + for receiver in receivers.iter() { + successful_send |= receiver.send(line.clone()).is_ok(); + } if !successful_send { (default)(line); } @@ -95,21 +88,23 @@ pub fn prioritized_broadcast> + Send + Unpin #[cfg(test)] mod test { - use futures::StreamExt; + use tokio_stream::wrappers::UnboundedReceiverStream; + + use super::*; #[tokio::test] async fn broadcast_listeners_close_when_source_does() { - let (tx, rx) = async_channel::unbounded::<_>(); - let (_, receivers) = super::prioritized_broadcast(rx, |_| {}); + let (tx, rx) = mpsc::unbounded_channel(); + let (_, receivers) = prioritized_broadcast(UnboundedReceiverStream::new(rx), |_| {}); - let (tx2, mut rx2) = async_channel::unbounded::<_>(); + let (tx2, mut rx2) = mpsc::unbounded_channel(); receivers.lock().unwrap().push(tx2); - tx.send(Ok("hello".to_string())).await.unwrap(); - assert_eq!(rx2.next().await, Some("hello".to_string())); + tx.send(Ok("hello".to_string())).unwrap(); + assert_eq!(rx2.recv().await, Some("hello".to_string())); - let wait_again = tokio::spawn(async move { rx2.next().await }); + let wait_again = tokio::spawn(async move { rx2.recv().await }); drop(tx); diff --git a/hydro_deploy/hydro_cli/Cargo.toml b/hydro_deploy/hydro_cli/Cargo.toml index dee40e52f528..8568143e0242 100644 --- a/hydro_deploy/hydro_cli/Cargo.toml +++ b/hydro_deploy/hydro_cli/Cargo.toml @@ -21,7 +21,6 @@ pyo3 = { version = "0.20", features = ["abi3-py37"] } pyo3-asyncio = { version = "0.20", features = ["attributes", "tokio-runtime"] } pythonize = "0.20" futures = "0.3.26" -async-channel = "1.8.0" bytes = "1.1.0" hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.5.2" } diff --git a/hydro_deploy/hydro_cli/src/lib.rs b/hydro_deploy/hydro_cli/src/lib.rs index 2531026b34d2..c2c3e8cffa4c 100644 --- a/hydro_deploy/hydro_cli/src/lib.rs +++ b/hydro_deploy/hydro_cli/src/lib.rs @@ -9,7 +9,6 @@ use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, OnceLock}; -use async_channel::Receiver; use bytes::Bytes; use futures::{Future, SinkExt, StreamExt}; use hydroflow_cli_integration::{ @@ -22,7 +21,7 @@ use pyo3::{create_exception, wrap_pymodule}; use pyo3_asyncio::TaskLocals; use pythonize::pythonize; use tokio::sync::oneshot::Sender; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; mod cli; use hydro_deploy as core; @@ -476,7 +475,7 @@ impl Service { #[pyclass] struct PyReceiver { - receiver: Arc>, + receiver: Arc>>, } #[pymethods] @@ -486,13 +485,15 @@ impl PyReceiver { } fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> { - let my_receiver = self.receiver.clone(); + let receiver = self.receiver.clone(); Some( interruptible_future_to_py(py, async move { - let underlying = my_receiver.recv(); - underlying + receiver + .lock() .await - .map_err(|_| PyStopAsyncIteration::new_err(())) + .recv() + .await + .ok_or_else(|| PyStopAsyncIteration::new_err(())) }) .unwrap(), ) @@ -569,7 +570,7 @@ impl HydroflowCrate { interruptible_future_to_py(py, async move { let underlying = underlying.read().await; Ok(PyReceiver { - receiver: Arc::new(underlying.stdout()), + receiver: Arc::new(Mutex::new(underlying.stdout())), }) }) } @@ -579,7 +580,7 @@ impl HydroflowCrate { interruptible_future_to_py(py, async move { let underlying = underlying.read().await; Ok(PyReceiver { - receiver: Arc::new(underlying.stderr()), + receiver: Arc::new(Mutex::new(underlying.stderr())), }) }) } diff --git a/hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml b/hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml index 0ad9f0694d89..b116a1192a63 100644 --- a/hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml +++ b/hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml @@ -9,7 +9,7 @@ description = "Library for working with hydro_deploy and hydroflow_plus" [features] default = [] -deploy = [ "hydro_deploy", "async-channel" ] +deploy = [ "hydro_deploy" ] [dependencies] stageleft = { path = "../../stageleft", version = "^0.3.0" } @@ -19,7 +19,6 @@ tokio = { version = "1.16", features = [ "full" ] } serde = { version = "1", features = [ "derive" ] } hydro_deploy = { path = "../core", version = "^0.7.0", optional = true } -async-channel = { version = "1.8.0", optional = true } # added to workaround `cargo smart-release` https://github.com/Byron/cargo-smart-release/issues/16 stageleft_tool = { path = "../../stageleft_tool", version = "^0.2.0", optional = true } diff --git a/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs b/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs index efd1d115f08b..597eed65f666 100644 --- a/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs +++ b/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; -use async_channel::Receiver; use hydro_deploy::custom_service::CustomClientPort; use hydro_deploy::hydroflow_crate::ports::{ DemuxSink, HydroflowSink, HydroflowSource, TaggedSource, @@ -55,12 +54,12 @@ pub trait DeployCrateWrapper { } #[allow(async_fn_in_trait)] - async fn stdout(&self) -> Receiver { + async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver { self.underlying().read().await.stdout() } #[allow(async_fn_in_trait)] - async fn stderr(&self) -> Receiver { + async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver { self.underlying().read().await.stderr() } } diff --git a/hydroflow_plus_test/src/cluster/many_to_many.rs b/hydroflow_plus_test/src/cluster/many_to_many.rs index 8a41c594a8f4..fc5b31e279c6 100644 --- a/hydroflow_plus_test/src/cluster/many_to_many.rs +++ b/hydroflow_plus_test/src/cluster/many_to_many.rs @@ -70,7 +70,7 @@ mod tests { deployment.start().await.unwrap(); - for node_stdout in cluster_stdouts { + for mut node_stdout in cluster_stdouts { let mut node_outs = vec![]; for _i in 0..4 { node_outs.push(node_stdout.recv().await.unwrap()); diff --git a/hydroflow_plus_test/src/cluster/simple_cluster.rs b/hydroflow_plus_test/src/cluster/simple_cluster.rs index 5e13532cedd2..8c533c504ee3 100644 --- a/hydroflow_plus_test/src/cluster/simple_cluster.rs +++ b/hydroflow_plus_test/src/cluster/simple_cluster.rs @@ -85,13 +85,13 @@ mod tests { deployment.deploy().await.unwrap(); - let node_stdout = node.stdout().await; + let mut node_stdout = node.stdout().await; let cluster_stdouts = futures::future::join_all(cluster.members.iter().map(|node| node.stdout())).await; deployment.start().await.unwrap(); - for (i, stdout) in cluster_stdouts.into_iter().enumerate() { + for (i, mut stdout) in cluster_stdouts.into_iter().enumerate() { for j in 0..5 { assert_eq!( stdout.recv().await.unwrap(), diff --git a/hydroflow_plus_test/src/distributed/first_ten.rs b/hydroflow_plus_test/src/distributed/first_ten.rs index c7fb7fc57b8c..f4ba715c9bd8 100644 --- a/hydroflow_plus_test/src/distributed/first_ten.rs +++ b/hydroflow_plus_test/src/distributed/first_ten.rs @@ -67,7 +67,7 @@ mod tests { deployment.deploy().await.unwrap(); - let second_node_stdout = second_node.stdout().await; + let mut second_node_stdout = second_node.stdout().await; deployment.start().await.unwrap(); diff --git a/hydroflow_plus_test/src/distributed/networked.rs b/hydroflow_plus_test/src/distributed/networked.rs index 32fe3da05776..8247516225bf 100644 --- a/hydroflow_plus_test/src/distributed/networked.rs +++ b/hydroflow_plus_test/src/distributed/networked.rs @@ -109,10 +109,10 @@ mod tests { deployment.deploy().await.unwrap(); let mut conn_to_zero = port_to_zero.connect().await.into_sink(); - let node_one_stdout = io.process_one.stdout().await; + let mut node_one_stdout = io.process_one.stdout().await; let mut conn_to_cluster = ports_to_cluster[0].connect().await.into_sink(); - let cluster_stdout = io.cluster.members[0].stdout().await; + let mut cluster_stdout = io.cluster.members[0].stdout().await; deployment.start().await.unwrap(); diff --git a/template/hydroflow_plus/flow/Cargo.toml b/template/hydroflow_plus/flow/Cargo.toml index 4d856fab3478..4992d3a5d14f 100644 --- a/template/hydroflow_plus/flow/Cargo.toml +++ b/template/hydroflow_plus/flow/Cargo.toml @@ -23,3 +23,4 @@ hydro_deploy = { git = "{{ hydroflow_git | default: 'https://github.com/hydro-pr hydroflow_plus_cli_integration = { git = "{{ hydroflow_git | default: 'https://github.com/hydro-project/hydroflow.git' }}", branch = "{{ hydroflow_branch | default: 'main' }}", features = [ "deploy", ] } +tokio-stream = "0.1.15" diff --git a/template/hydroflow_plus/flow/src/first_ten_distributed.rs b/template/hydroflow_plus/flow/src/first_ten_distributed.rs index 5ea73aa14b17..616635b520dc 100644 --- a/template/hydroflow_plus/flow/src/first_ten_distributed.rs +++ b/template/hydroflow_plus/flow/src/first_ten_distributed.rs @@ -36,6 +36,7 @@ mod tests { use hydro_deploy::{Deployment, HydroflowCrate}; use hydroflow_plus::futures::StreamExt; use hydroflow_plus_cli_integration::{DeployCrateWrapper, DeployProcessSpec}; + use tokio_stream::wrappers::UnboundedReceiverStream; #[tokio::test] async fn first_ten_distributed() { @@ -56,12 +57,15 @@ mod tests { deployment.deploy().await.unwrap(); - let second_process_stdout = second_process.stdout().await; + let mut second_process_stdout = second_process.stdout().await; deployment.start().await.unwrap(); assert_eq!( - second_process_stdout.take(10).collect::>().await, + UnboundedReceiverStream::new(second_process_stdout) + .take(10) + .collect::>() + .await, vec!["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"] ); }