Skip to content

Commit

Permalink
chore(common): Remove need for cursors in NativeIO (#416)
Browse files Browse the repository at this point in the history
* chore(common): Remove need for cursors in `NativeIO`

Removes the need for tracking the cursor within files in `NativeIO` by
creating a bidirectional pipe rather than re-using the same files for
communication. This approach is much less bug-prone, and also allows for
some nice simplification within a hotpath.

* fix: use new bidirectional pipe logic in oracle tests

fix tests
  • Loading branch information
clabby authored Aug 5, 2024
1 parent b3a8d11 commit e069eb4
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 216 deletions.
24 changes: 21 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ clap = { version = "4.5.4", features = ["derive", "env"] }
serde = { version = "1.0.198", features = ["derive"] }
tracing-subscriber = "0.3.18"
command-fds = { version = "0.3", features = ["tokio"] }
tempfile = "3.10"
os_pipe = "1.2.1"
88 changes: 61 additions & 27 deletions bin/host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub mod fetcher;
pub mod kv;
pub mod preimage;
pub mod server;
pub mod types;
pub mod util;

pub use cli::{init_tracing_subscriber, HostCli};
Expand All @@ -24,13 +23,13 @@ use kona_preimage::{HintReader, OracleServer, PipeHandle};
use kv::KeyValueStore;
use std::{
io::{stderr, stdin, stdout},
os::fd::AsFd,
os::fd::{AsFd, AsRawFd},
panic::AssertUnwindSafe,
sync::Arc,
};
use tokio::{process::Command, sync::RwLock, task};
use tracing::{error, info};
use types::NativePipeFiles;
use util::Pipe;

/// Starts the [PreimageServer] in the primary thread. In this mode, the host program has been
/// invoked by the Fault Proof VM and the client program is running in the parent process.
Expand Down Expand Up @@ -78,7 +77,9 @@ pub async fn start_server(cfg: HostCli) -> Result<()> {
/// Starts the [PreimageServer] and the client program in separate threads. The client program is
/// ran natively in this mode.
pub async fn start_server_and_native_client(cfg: HostCli) -> Result<()> {
let (preimage_pipe, hint_pipe, files) = util::create_native_pipes()?;
let hint_pipe = util::bidirectional_pipe()?;
let preimage_pipe = util::bidirectional_pipe()?;

let kv_store = cfg.construct_kv_store();

let fetcher = if !cfg.is_offline() {
Expand Down Expand Up @@ -106,20 +107,24 @@ pub async fn start_server_and_native_client(cfg: HostCli) -> Result<()> {
};

// Create the server and start it.
let server_task =
task::spawn(start_native_preimage_server(kv_store, fetcher, preimage_pipe, hint_pipe));
let server_task = task::spawn(start_native_preimage_server(
kv_store,
fetcher,
hint_pipe.host,
preimage_pipe.host,
));

// Start the client program in a separate child process.
let program_task = task::spawn(start_native_client_program(cfg, files));
let program_task =
task::spawn(start_native_client_program(cfg, hint_pipe.client, preimage_pipe.client));

// Execute both tasks and wait for them to complete.
info!("Starting preimage server and client program.");
tokio::try_join!(
util::flatten_join_result(server_task),
util::flatten_join_result(program_task)
)
.map_err(|e| anyhow!(e))?;
info!("Preimage server and client program have joined.");
tokio::select!(
r = util::flatten_join_result(server_task) => r?,
r = util::flatten_join_result(program_task) => r?
);
info!(target: "kona_host", "Preimage server and client program have exited.");

Ok(())
}
Expand All @@ -129,14 +134,20 @@ pub async fn start_server_and_native_client(cfg: HostCli) -> Result<()> {
pub async fn start_native_preimage_server<KV>(
kv_store: Arc<RwLock<KV>>,
fetcher: Option<Arc<RwLock<Fetcher<KV>>>>,
preimage_pipe: PipeHandle,
hint_pipe: PipeHandle,
hint_pipe: Pipe,
preimage_pipe: Pipe,
) -> Result<()>
where
KV: KeyValueStore + Send + Sync + ?Sized + 'static,
{
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);
let hint_reader = HintReader::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.write.as_raw_fd() as usize),
));
let oracle_server = OracleServer::new(PipeHandle::new(
FileDescriptor::Wildcard(preimage_pipe.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(preimage_pipe.write.as_raw_fd() as usize),
));

let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher);
AssertUnwindSafe(server.start())
Expand All @@ -151,7 +162,6 @@ where
anyhow!("Preimage server exited with an error: {:?}", e)
})?;

info!("Preimage server has exited.");
Ok(())
}

Expand All @@ -167,20 +177,45 @@ where
/// ## Returns
/// - `Ok(())` if the client program exits successfully.
/// - `Err(_)` if the client program exits with a non-zero status.
pub async fn start_native_client_program(cfg: HostCli, files: NativePipeFiles) -> Result<()> {
pub async fn start_native_client_program(
cfg: HostCli,
hint_pipe: Pipe,
preimage_pipe: Pipe,
) -> Result<()> {
// Map the file descriptors to the standard streams and the preimage oracle and hint
// reader's special file descriptors.
let mut command =
Command::new(cfg.exec.ok_or_else(|| anyhow!("No client program binary path specified."))?);
command
.fd_mappings(vec![
FdMapping { parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), child_fd: 0 },
FdMapping { parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(), child_fd: 1 },
FdMapping { parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(), child_fd: 2 },
FdMapping { parent_fd: files.hint_writ.into(), child_fd: 3 },
FdMapping { parent_fd: files.hint_read.into(), child_fd: 4 },
FdMapping { parent_fd: files.preimage_writ.into(), child_fd: 5 },
FdMapping { parent_fd: files.preimage_read.into(), child_fd: 6 },
FdMapping {
parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(),
child_fd: FileDescriptor::StdIn.into(),
},
FdMapping {
parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(),
child_fd: FileDescriptor::StdOut.into(),
},
FdMapping {
parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(),
child_fd: FileDescriptor::StdErr.into(),
},
FdMapping {
parent_fd: hint_pipe.read.into(),
child_fd: FileDescriptor::HintRead.into(),
},
FdMapping {
parent_fd: hint_pipe.write.into(),
child_fd: FileDescriptor::HintWrite.into(),
},
FdMapping {
parent_fd: preimage_pipe.read.into(),
child_fd: FileDescriptor::PreimageRead.into(),
},
FdMapping {
parent_fd: preimage_pipe.write.into(),
child_fd: FileDescriptor::PreimageWrite.into(),
},
])
.expect("No errors may occur when mapping file descriptors.");

Expand All @@ -198,6 +233,5 @@ pub async fn start_native_client_program(cfg: HostCli, files: NativePipeFiles) -
return Err(anyhow!("Client program exited with a non-zero status."));
}

info!(target: "client_program", "Client program has exited.");
Ok(())
}
16 changes: 0 additions & 16 deletions bin/host/src/types.rs

This file was deleted.

71 changes: 26 additions & 45 deletions bin/host/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
//! Contains utility functions and helpers for the host program.

use crate::types::NativePipeFiles;
use alloy_primitives::{hex, Bytes};
use alloy_provider::ReqwestProvider;
use alloy_rpc_client::RpcClient;
use alloy_transport_http::Http;
use anyhow::{anyhow, Result};
use kona_client::HintType;
use kona_common::FileDescriptor;
use kona_preimage::PipeHandle;
use os_pipe::{PipeReader, PipeWriter};
use reqwest::Client;
use std::{fs::File, os::fd::AsRawFd};
use tokio::task::JoinHandle;

/// A bidirectional pipe, with a client and host end.
#[derive(Debug)]
pub struct BidirectionalPipe {
pub(crate) client: Pipe,
pub(crate) host: Pipe,
}

/// A single-direction pipe, with a read and write end.
#[derive(Debug)]
pub struct Pipe {
pub(crate) read: PipeReader,
pub(crate) write: PipeWriter,
}

/// Creates a [BidirectionalPipe] instance.
pub fn bidirectional_pipe() -> Result<BidirectionalPipe> {
let (ar, bw) = os_pipe::pipe().map_err(|e| anyhow!("Failed to create pipe: {e}"))?;
let (br, aw) = os_pipe::pipe().map_err(|e| anyhow!("Failed to create pipe: {e}"))?;

Ok(BidirectionalPipe {
client: Pipe { read: ar, write: aw },
host: Pipe { read: br, write: bw },
})
}

/// Parses a hint from a string.
///
/// Hints are of the format `<hint_type> <hint_data>`, where `<hint_type>` is a string that
Expand All @@ -31,47 +53,6 @@ pub(crate) fn parse_hint(s: &str) -> Result<(HintType, Bytes)> {
Ok((hint_type, hint_data))
}

/// Creates two temporary files that are connected by a pipe.
pub(crate) fn create_temp_files() -> Result<(File, File)> {
let (read, write) = (
tempfile::tempfile().map_err(|e| anyhow!(e))?,
tempfile::tempfile().map_err(|e| anyhow!(e))?,
);
Ok((read, write))
}

/// Create a pair of pipes for the preimage oracle and hint reader. Also returns the files that are
/// used to create the pipes, which must be kept alive until the pipes are closed.
pub(crate) fn create_native_pipes() -> Result<(PipeHandle, PipeHandle, NativePipeFiles)> {
let (po_reader, po_writer) = create_temp_files()?;
let (hint_reader, hint_writer) = create_temp_files()?;
let preimage_pipe = PipeHandle::new(
FileDescriptor::Wildcard(
po_reader.as_raw_fd().try_into().map_err(|e| anyhow!("Failed to get raw FD: {e}"))?,
),
FileDescriptor::Wildcard(
po_writer.as_raw_fd().try_into().map_err(|e| anyhow!("Failed to get raw FD: {e}"))?,
),
);
let hint_pipe = PipeHandle::new(
FileDescriptor::Wildcard(
hint_reader.as_raw_fd().try_into().map_err(|e| anyhow!("Failed to get raw FD: {e}"))?,
),
FileDescriptor::Wildcard(
hint_writer.as_raw_fd().try_into().map_err(|e| anyhow!("Failed to get raw FD: {e}"))?,
),
);

let files = NativePipeFiles {
preimage_read: po_reader,
preimage_writ: po_writer,
hint_read: hint_reader,
hint_writ: hint_writer,
};

Ok((preimage_pipe, hint_pipe, files))
}

/// Returns an HTTP provider for the given URL.
pub(crate) fn http_provider(url: &str) -> ReqwestProvider {
let url = url.parse().unwrap();
Expand Down
1 change: 0 additions & 1 deletion crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ homepage.workspace = true
# workspace
anyhow.workspace = true
cfg-if.workspace = true
spin.workspace = true

# external
linked_list_allocator = "0.10.5"
Loading

0 comments on commit e069eb4

Please sign in to comment.