From e069eb4178af1a2a7ea1f772f3f35c240d178534 Mon Sep 17 00:00:00 2001 From: clabby Date: Sun, 4 Aug 2024 21:41:54 -0400 Subject: [PATCH] chore(common): Remove need for cursors in `NativeIO` (#416) * chore(common): Remove need for cursors in `NativeIO` Removes the need for tracking the cursor within files in `NativeIO` by creating a bidirectional pipe rather than re-using the same files for communication. This approach is much less bug-prone, and also allows for some nice simplification within a hotpath. * fix: use new bidirectional pipe logic in oracle tests fix tests --- Cargo.lock | 24 +++++++-- bin/host/Cargo.toml | 2 +- bin/host/src/lib.rs | 88 +++++++++++++++++++++---------- bin/host/src/types.rs | 16 ------ bin/host/src/util.rs | 71 +++++++++---------------- crates/common/Cargo.toml | 1 - crates/common/src/io.rs | 42 ++------------- crates/common/src/types.rs | 6 +++ crates/preimage/Cargo.toml | 2 +- crates/preimage/src/hint.rs | 60 ++++++++------------- crates/preimage/src/lib.rs | 3 ++ crates/preimage/src/oracle.rs | 59 +++++---------------- crates/preimage/src/test_utils.rs | 29 ++++++++++ 13 files changed, 187 insertions(+), 216 deletions(-) delete mode 100644 bin/host/src/types.rs create mode 100644 crates/preimage/src/test_utils.rs diff --git a/Cargo.lock b/Cargo.lock index eb65a741..3c76441c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2173,7 +2173,6 @@ dependencies = [ "anyhow", "cfg-if", "linked_list_allocator", - "spin", ] [[package]] @@ -2271,10 +2270,10 @@ dependencies = [ "kona-derive", "kona-mpt", "kona-preimage", + "os_pipe", "reqwest", "revm", "serde", - "tempfile", "tokio", "tracing", "tracing-subscriber", @@ -2333,9 +2332,9 @@ dependencies = [ "async-trait", "cfg-if", "kona-common", + "os_pipe", "rkyv", "serde", - "tempfile", "tokio", "tracing", ] @@ -2760,6 +2759,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_pipe" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ffd2b0a5634335b135d5728d84c5e0fd726954b87111f7506a61c502280d982" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "overload" version = "0.1.1" @@ -4758,6 +4767,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.5" diff --git a/bin/host/Cargo.toml b/bin/host/Cargo.toml index de7e6c97..6230ba8d 100644 --- a/bin/host/Cargo.toml +++ b/bin/host/Cargo.toml @@ -38,4 +38,4 @@ clap = { version = "4.5.4", features = ["derive", "env"] } serde = { version = "1.0.198", features = ["derive"] } tracing-subscriber = "0.3.18" command-fds = { version = "0.3", features = ["tokio"] } -tempfile = "3.10" +os_pipe = "1.2.1" diff --git a/bin/host/src/lib.rs b/bin/host/src/lib.rs index eb8bc561..6419106d 100644 --- a/bin/host/src/lib.rs +++ b/bin/host/src/lib.rs @@ -8,7 +8,6 @@ pub mod fetcher; pub mod kv; pub mod preimage; pub mod server; -pub mod types; pub mod util; pub use cli::{init_tracing_subscriber, HostCli}; @@ -24,13 +23,13 @@ use kona_preimage::{HintReader, OracleServer, PipeHandle}; use kv::KeyValueStore; use std::{ io::{stderr, stdin, stdout}, - os::fd::AsFd, + os::fd::{AsFd, AsRawFd}, panic::AssertUnwindSafe, sync::Arc, }; use tokio::{process::Command, sync::RwLock, task}; use tracing::{error, info}; -use types::NativePipeFiles; +use util::Pipe; /// Starts the [PreimageServer] in the primary thread. In this mode, the host program has been /// invoked by the Fault Proof VM and the client program is running in the parent process. @@ -78,7 +77,9 @@ pub async fn start_server(cfg: HostCli) -> Result<()> { /// Starts the [PreimageServer] and the client program in separate threads. The client program is /// ran natively in this mode. pub async fn start_server_and_native_client(cfg: HostCli) -> Result<()> { - let (preimage_pipe, hint_pipe, files) = util::create_native_pipes()?; + let hint_pipe = util::bidirectional_pipe()?; + let preimage_pipe = util::bidirectional_pipe()?; + let kv_store = cfg.construct_kv_store(); let fetcher = if !cfg.is_offline() { @@ -106,20 +107,24 @@ pub async fn start_server_and_native_client(cfg: HostCli) -> Result<()> { }; // Create the server and start it. - let server_task = - task::spawn(start_native_preimage_server(kv_store, fetcher, preimage_pipe, hint_pipe)); + let server_task = task::spawn(start_native_preimage_server( + kv_store, + fetcher, + hint_pipe.host, + preimage_pipe.host, + )); // Start the client program in a separate child process. - let program_task = task::spawn(start_native_client_program(cfg, files)); + let program_task = + task::spawn(start_native_client_program(cfg, hint_pipe.client, preimage_pipe.client)); // Execute both tasks and wait for them to complete. info!("Starting preimage server and client program."); - tokio::try_join!( - util::flatten_join_result(server_task), - util::flatten_join_result(program_task) - ) - .map_err(|e| anyhow!(e))?; - info!("Preimage server and client program have joined."); + tokio::select!( + r = util::flatten_join_result(server_task) => r?, + r = util::flatten_join_result(program_task) => r? + ); + info!(target: "kona_host", "Preimage server and client program have exited."); Ok(()) } @@ -129,14 +134,20 @@ pub async fn start_server_and_native_client(cfg: HostCli) -> Result<()> { pub async fn start_native_preimage_server( kv_store: Arc>, fetcher: Option>>>, - preimage_pipe: PipeHandle, - hint_pipe: PipeHandle, + hint_pipe: Pipe, + preimage_pipe: Pipe, ) -> Result<()> where KV: KeyValueStore + Send + Sync + ?Sized + 'static, { - let oracle_server = OracleServer::new(preimage_pipe); - let hint_reader = HintReader::new(hint_pipe); + let hint_reader = HintReader::new(PipeHandle::new( + FileDescriptor::Wildcard(hint_pipe.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(hint_pipe.write.as_raw_fd() as usize), + )); + let oracle_server = OracleServer::new(PipeHandle::new( + FileDescriptor::Wildcard(preimage_pipe.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(preimage_pipe.write.as_raw_fd() as usize), + )); let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher); AssertUnwindSafe(server.start()) @@ -151,7 +162,6 @@ where anyhow!("Preimage server exited with an error: {:?}", e) })?; - info!("Preimage server has exited."); Ok(()) } @@ -167,20 +177,45 @@ where /// ## Returns /// - `Ok(())` if the client program exits successfully. /// - `Err(_)` if the client program exits with a non-zero status. -pub async fn start_native_client_program(cfg: HostCli, files: NativePipeFiles) -> Result<()> { +pub async fn start_native_client_program( + cfg: HostCli, + hint_pipe: Pipe, + preimage_pipe: Pipe, +) -> Result<()> { // Map the file descriptors to the standard streams and the preimage oracle and hint // reader's special file descriptors. let mut command = Command::new(cfg.exec.ok_or_else(|| anyhow!("No client program binary path specified."))?); command .fd_mappings(vec![ - FdMapping { parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), child_fd: 0 }, - FdMapping { parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(), child_fd: 1 }, - FdMapping { parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(), child_fd: 2 }, - FdMapping { parent_fd: files.hint_writ.into(), child_fd: 3 }, - FdMapping { parent_fd: files.hint_read.into(), child_fd: 4 }, - FdMapping { parent_fd: files.preimage_writ.into(), child_fd: 5 }, - FdMapping { parent_fd: files.preimage_read.into(), child_fd: 6 }, + FdMapping { + parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), + child_fd: FileDescriptor::StdIn.into(), + }, + FdMapping { + parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(), + child_fd: FileDescriptor::StdOut.into(), + }, + FdMapping { + parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(), + child_fd: FileDescriptor::StdErr.into(), + }, + FdMapping { + parent_fd: hint_pipe.read.into(), + child_fd: FileDescriptor::HintRead.into(), + }, + FdMapping { + parent_fd: hint_pipe.write.into(), + child_fd: FileDescriptor::HintWrite.into(), + }, + FdMapping { + parent_fd: preimage_pipe.read.into(), + child_fd: FileDescriptor::PreimageRead.into(), + }, + FdMapping { + parent_fd: preimage_pipe.write.into(), + child_fd: FileDescriptor::PreimageWrite.into(), + }, ]) .expect("No errors may occur when mapping file descriptors."); @@ -198,6 +233,5 @@ pub async fn start_native_client_program(cfg: HostCli, files: NativePipeFiles) - return Err(anyhow!("Client program exited with a non-zero status.")); } - info!(target: "client_program", "Client program has exited."); Ok(()) } diff --git a/bin/host/src/types.rs b/bin/host/src/types.rs deleted file mode 100644 index aafd0d86..00000000 --- a/bin/host/src/types.rs +++ /dev/null @@ -1,16 +0,0 @@ -//! This module contains the types used in the host program. - -use std::fs::File; - -/// Represents the files that are used to communicate with the native client. -#[derive(Debug)] -pub struct NativePipeFiles { - /// The file that the preimage oracle reads from. - pub preimage_read: File, - /// The file that the preimage oracle writes to. - pub preimage_writ: File, - /// The file that the hint reader reads from. - pub hint_read: File, - /// The file that the hint reader writes to. - pub hint_writ: File, -} diff --git a/bin/host/src/util.rs b/bin/host/src/util.rs index ed62c0f4..355a93fa 100644 --- a/bin/host/src/util.rs +++ b/bin/host/src/util.rs @@ -1,18 +1,40 @@ //! Contains utility functions and helpers for the host program. -use crate::types::NativePipeFiles; use alloy_primitives::{hex, Bytes}; use alloy_provider::ReqwestProvider; use alloy_rpc_client::RpcClient; use alloy_transport_http::Http; use anyhow::{anyhow, Result}; use kona_client::HintType; -use kona_common::FileDescriptor; -use kona_preimage::PipeHandle; +use os_pipe::{PipeReader, PipeWriter}; use reqwest::Client; -use std::{fs::File, os::fd::AsRawFd}; use tokio::task::JoinHandle; +/// A bidirectional pipe, with a client and host end. +#[derive(Debug)] +pub struct BidirectionalPipe { + pub(crate) client: Pipe, + pub(crate) host: Pipe, +} + +/// A single-direction pipe, with a read and write end. +#[derive(Debug)] +pub struct Pipe { + pub(crate) read: PipeReader, + pub(crate) write: PipeWriter, +} + +/// Creates a [BidirectionalPipe] instance. +pub fn bidirectional_pipe() -> Result { + let (ar, bw) = os_pipe::pipe().map_err(|e| anyhow!("Failed to create pipe: {e}"))?; + let (br, aw) = os_pipe::pipe().map_err(|e| anyhow!("Failed to create pipe: {e}"))?; + + Ok(BidirectionalPipe { + client: Pipe { read: ar, write: aw }, + host: Pipe { read: br, write: bw }, + }) +} + /// Parses a hint from a string. /// /// Hints are of the format ` `, where `` is a string that @@ -31,47 +53,6 @@ pub(crate) fn parse_hint(s: &str) -> Result<(HintType, Bytes)> { Ok((hint_type, hint_data)) } -/// Creates two temporary files that are connected by a pipe. -pub(crate) fn create_temp_files() -> Result<(File, File)> { - let (read, write) = ( - tempfile::tempfile().map_err(|e| anyhow!(e))?, - tempfile::tempfile().map_err(|e| anyhow!(e))?, - ); - Ok((read, write)) -} - -/// Create a pair of pipes for the preimage oracle and hint reader. Also returns the files that are -/// used to create the pipes, which must be kept alive until the pipes are closed. -pub(crate) fn create_native_pipes() -> Result<(PipeHandle, PipeHandle, NativePipeFiles)> { - let (po_reader, po_writer) = create_temp_files()?; - let (hint_reader, hint_writer) = create_temp_files()?; - let preimage_pipe = PipeHandle::new( - FileDescriptor::Wildcard( - po_reader.as_raw_fd().try_into().map_err(|e| anyhow!("Failed to get raw FD: {e}"))?, - ), - FileDescriptor::Wildcard( - po_writer.as_raw_fd().try_into().map_err(|e| anyhow!("Failed to get raw FD: {e}"))?, - ), - ); - let hint_pipe = PipeHandle::new( - FileDescriptor::Wildcard( - hint_reader.as_raw_fd().try_into().map_err(|e| anyhow!("Failed to get raw FD: {e}"))?, - ), - FileDescriptor::Wildcard( - hint_writer.as_raw_fd().try_into().map_err(|e| anyhow!("Failed to get raw FD: {e}"))?, - ), - ); - - let files = NativePipeFiles { - preimage_read: po_reader, - preimage_writ: po_writer, - hint_read: hint_reader, - hint_writ: hint_writer, - }; - - Ok((preimage_pipe, hint_pipe, files)) -} - /// Returns an HTTP provider for the given URL. pub(crate) fn http_provider(url: &str) -> ReqwestProvider { let url = url.parse().unwrap(); diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index e69d1455..892028d4 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -12,7 +12,6 @@ homepage.workspace = true # workspace anyhow.workspace = true cfg-if.workspace = true -spin.workspace = true # external linked_list_allocator = "0.10.5" diff --git a/crates/common/src/io.rs b/crates/common/src/io.rs index 19481227..aad1ce13 100644 --- a/crates/common/src/io.rs +++ b/crates/common/src/io.rs @@ -63,19 +63,12 @@ mod native_io { use crate::{io::FileDescriptor, traits::BasicKernelInterface}; use anyhow::{anyhow, Result}; - use spin::{Lazy, Mutex}; use std::{ - collections::HashMap, fs::File, - io::{Read, Seek, SeekFrom, Write}, + io::{Read, Write}, os::fd::FromRawFd, }; - static READ_CURSOR: Lazy>> = - Lazy::new(|| Mutex::new(HashMap::new())); - static WRITE_CURSOR: Lazy>> = - Lazy::new(|| Mutex::new(HashMap::new())); - /// Mock IO implementation for native tests. #[derive(Debug)] pub struct NativeIO; @@ -85,18 +78,9 @@ mod native_io { let raw_fd: usize = fd.into(); let mut file = unsafe { File::from_raw_fd(raw_fd as i32) }; - let mut cursor_entry_lock = WRITE_CURSOR.lock(); - let cursor_entry = cursor_entry_lock.entry(raw_fd).or_insert(0); - - // Reset the cursor back to before the data we just wrote for the reader's consumption. - // This is a best-effort operation, and may not work for all file descriptors. - let _ = file.seek(SeekFrom::Start(*cursor_entry as u64)); - file.write_all(buf) .map_err(|e| anyhow!("Error writing to buffer to file descriptor: {e}"))?; - *cursor_entry += buf.len(); - std::mem::forget(file); Ok(buf.len()) @@ -106,28 +90,8 @@ mod native_io { let raw_fd: usize = fd.into(); let mut file = unsafe { File::from_raw_fd(raw_fd as i32) }; - // If the file is seekable, we need to seek to the cursor position before reading. - // If not, it is likely a pipe or a socket. Read as normal. - let n = if file.stream_position().is_ok() { - let mut cursor_entry_lock = READ_CURSOR.lock(); - let cursor_entry = cursor_entry_lock.entry(raw_fd).or_insert(0); - - file.seek(SeekFrom::Start(*cursor_entry as u64)).map_err(|e| { - anyhow!( - "Error seeking to cursor position {cursor_entry} in file descriptor: {e}" - ) - })?; - - let n = file - .read(buf) - .map_err(|e| anyhow!("Error reading from file descriptor: {e}"))?; - - *cursor_entry += n; - - n - } else { - file.read(buf).map_err(|e| anyhow!("Error reading from file descriptor: {e}"))? - }; + let n = + file.read(buf).map_err(|e| anyhow!("Error reading from file descriptor: {e}"))?; std::mem::forget(file); diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index 6a5e4315..dc1bd9ac 100644 --- a/crates/common/src/types.rs +++ b/crates/common/src/types.rs @@ -35,3 +35,9 @@ impl From for usize { } } } + +impl From for i32 { + fn from(fd: FileDescriptor) -> Self { + usize::from(fd) as i32 + } +} diff --git a/crates/preimage/Cargo.toml b/crates/preimage/Cargo.toml index ce6f745a..b36d0be2 100644 --- a/crates/preimage/Cargo.toml +++ b/crates/preimage/Cargo.toml @@ -23,7 +23,7 @@ serde = { version = "1.0.203", features = ["derive"], optional = true } [dev-dependencies] tokio = { version = "1.38.0", features = ["full"] } -tempfile = "3.10.1" +os_pipe = "1.2.1" [features] default = [] diff --git a/crates/preimage/src/hint.rs b/crates/preimage/src/hint.rs index ebafaa59..e996393e 100644 --- a/crates/preimage/src/hint.rs +++ b/crates/preimage/src/hint.rs @@ -105,39 +105,12 @@ mod test { extern crate std; use super::*; + use crate::test_utils::bidirectional_pipe; use alloc::{sync::Arc, vec::Vec}; use kona_common::FileDescriptor; - use std::{fs::File, os::fd::AsRawFd}; - use tempfile::tempfile; + use std::os::fd::AsRawFd; use tokio::sync::Mutex; - /// Test struct containing the [HintReader] and [HintWriter]. The [File]s are stored in this - /// struct so that they are not dropped until the end of the test. - #[derive(Debug)] - struct ClientAndHost { - hint_writer: HintWriter, - hint_reader: HintReader, - _read_file: File, - _write_file: File, - } - - /// Helper for creating a new [HintReader] and [HintWriter] for testing. The file channel is - /// over two temporary files. - fn client_and_host() -> ClientAndHost { - let (read_file, write_file) = (tempfile().unwrap(), tempfile().unwrap()); - let (read_fd, write_fd) = ( - FileDescriptor::Wildcard(read_file.as_raw_fd().try_into().unwrap()), - FileDescriptor::Wildcard(write_file.as_raw_fd().try_into().unwrap()), - ); - let client_handle = PipeHandle::new(read_fd, write_fd); - let host_handle = PipeHandle::new(write_fd, read_fd); - - let hint_writer = HintWriter::new(client_handle); - let hint_reader = HintReader::new(host_handle); - - ClientAndHost { hint_writer, hint_reader, _read_file: read_file, _write_file: write_file } - } - struct TestRouter { incoming_hints: Arc>>, } @@ -154,24 +127,35 @@ mod test { async fn test_hint_client_and_host() { const MOCK_DATA: &str = "test-hint 0xfacade"; - let sys = client_and_host(); - let (hint_writer, hint_reader) = (sys.hint_writer, sys.hint_reader); let incoming_hints = Arc::new(Mutex::new(Vec::new())); + let hint_pipe = bidirectional_pipe().unwrap(); + + let client = tokio::task::spawn(async move { + let hint_writer = HintWriter::new(PipeHandle::new( + FileDescriptor::Wildcard(hint_pipe.client.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(hint_pipe.client.write.as_raw_fd() as usize), + )); - let client = tokio::task::spawn(async move { hint_writer.write(MOCK_DATA).await }); + hint_writer.write(MOCK_DATA).await + }); let host = tokio::task::spawn({ let incoming_hints_ref = Arc::clone(&incoming_hints); async move { let router = TestRouter { incoming_hints: incoming_hints_ref }; - hint_reader.next_hint(&router).await.unwrap(); - let mut hints = incoming_hints.lock().await; - assert_eq!(hints.len(), 1); - hints.remove(0) + let hint_reader = HintReader::new(PipeHandle::new( + FileDescriptor::Wildcard(hint_pipe.host.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(hint_pipe.host.write.as_raw_fd() as usize), + )); + hint_reader.next_hint(&router).await.unwrap(); } }); - let (_, h) = tokio::join!(client, host); - assert_eq!(h.unwrap(), MOCK_DATA); + let _ = tokio::join!(client, host); + let mut hints = incoming_hints.lock().await; + + assert_eq!(hints.len(), 1); + let h = hints.remove(0); + assert_eq!(h, MOCK_DATA); } } diff --git a/crates/preimage/src/lib.rs b/crates/preimage/src/lib.rs index 77e9ff5a..781669b7 100644 --- a/crates/preimage/src/lib.rs +++ b/crates/preimage/src/lib.rs @@ -23,3 +23,6 @@ pub use traits::{ CommsClient, HintReaderServer, HintRouter, HintWriterClient, PreimageFetcher, PreimageOracleClient, PreimageOracleServer, }; + +#[cfg(test)] +mod test_utils; diff --git a/crates/preimage/src/oracle.rs b/crates/preimage/src/oracle.rs index c0fb485e..f8f9d21b 100644 --- a/crates/preimage/src/oracle.rs +++ b/crates/preimage/src/oracle.rs @@ -132,48 +132,14 @@ mod test { extern crate std; use super::*; - use crate::PreimageKeyType; + use crate::{test_utils::bidirectional_pipe, PreimageKeyType}; use alloc::sync::Arc; use alloy_primitives::keccak256; use anyhow::anyhow; use kona_common::FileDescriptor; - use std::{collections::HashMap, fs::File, os::fd::AsRawFd}; - use tempfile::tempfile; + use std::{collections::HashMap, os::fd::AsRawFd}; use tokio::sync::Mutex; - /// Test struct containing the [OracleReader] and a [OracleServer] for the host, plus the open - /// [File]s. The [File]s are stored in this struct so that they are not dropped until the - /// end of the test. - #[derive(Debug)] - struct ClientAndHost { - oracle_reader: OracleReader, - oracle_server: OracleServer, - _read_file: File, - _write_file: File, - } - - /// Helper for creating a new [OracleReader] and [OracleServer] for testing. The file channel is - /// over two temporary files. - fn client_and_host() -> ClientAndHost { - let (read_file, write_file) = (tempfile().unwrap(), tempfile().unwrap()); - let (read_fd, write_fd) = ( - FileDescriptor::Wildcard(read_file.as_raw_fd().try_into().unwrap()), - FileDescriptor::Wildcard(write_file.as_raw_fd().try_into().unwrap()), - ); - let client_handle = PipeHandle::new(read_fd, write_fd); - let host_handle = PipeHandle::new(write_fd, read_fd); - - let oracle_reader = OracleReader::new(client_handle); - let oracle_server = OracleServer::new(host_handle); - - ClientAndHost { - oracle_reader, - oracle_server, - _read_file: read_file, - _write_file: write_file, - } - } - struct TestFetcher { preimages: Arc>>>, } @@ -202,20 +168,23 @@ mod test { Arc::new(Mutex::new(preimages)) }; - let sys = client_and_host(); - let (oracle_reader, oracle_server) = (sys.oracle_reader, sys.oracle_server); + let preimage_pipe = bidirectional_pipe().unwrap(); let client = tokio::task::spawn(async move { + let oracle_reader = OracleReader::new(PipeHandle::new( + FileDescriptor::Wildcard(preimage_pipe.client.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(preimage_pipe.client.write.as_raw_fd() as usize), + )); let contents_a = oracle_reader.get(key_a).await.unwrap(); let contents_b = oracle_reader.get(key_b).await.unwrap(); - // Drop the file descriptors to close the pipe, stopping the host's blocking loop on - // waiting for client requests. - drop(sys); - (contents_a, contents_b) }); - let host = tokio::task::spawn(async move { + tokio::task::spawn(async move { + let oracle_server = OracleServer::new(PipeHandle::new( + FileDescriptor::Wildcard(preimage_pipe.host.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(preimage_pipe.host.write.as_raw_fd() as usize), + )); let test_fetcher = TestFetcher { preimages: Arc::clone(&preimages) }; loop { @@ -225,8 +194,8 @@ mod test { } }); - let (client, _) = tokio::join!(client, host); - let (contents_a, contents_b) = client.unwrap(); + let (c,) = tokio::join!(client); + let (contents_a, contents_b) = c.unwrap(); assert_eq!(contents_a, MOCK_DATA_A); assert_eq!(contents_b, MOCK_DATA_B); } diff --git a/crates/preimage/src/test_utils.rs b/crates/preimage/src/test_utils.rs new file mode 100644 index 00000000..c11d48bd --- /dev/null +++ b/crates/preimage/src/test_utils.rs @@ -0,0 +1,29 @@ +//! Test utilities for the `kona-preimage` crate. + +use anyhow::{anyhow, Result}; +use os_pipe::{PipeReader, PipeWriter}; + +/// A bidirectional pipe, with a client and host end. +#[derive(Debug)] +pub(crate) struct BidirectionalPipe { + pub(crate) client: Pipe, + pub(crate) host: Pipe, +} + +/// A single-direction pipe, with a read and write end. +#[derive(Debug)] +pub(crate) struct Pipe { + pub(crate) read: PipeReader, + pub(crate) write: PipeWriter, +} + +/// Creates a [BidirectionalPipe] instance. +pub(crate) fn bidirectional_pipe() -> Result { + let (ar, bw) = os_pipe::pipe().map_err(|e| anyhow!("Failed to create pipe: {e}"))?; + let (br, aw) = os_pipe::pipe().map_err(|e| anyhow!("Failed to create pipe: {e}"))?; + + Ok(BidirectionalPipe { + client: Pipe { read: ar, write: aw }, + host: Pipe { read: br, write: bw }, + }) +}