Skip to content

Commit

Permalink
fix(preimage): Improve error differentiation in preimage servers (#535)
Browse files Browse the repository at this point in the history
* chore(preimage): Improve error differentiation in preimage servers

* test unblock on failure for hintrouter

* assert err

* fix bad utf8 unblocking
  • Loading branch information
clabby authored Sep 18, 2024
1 parent 83becd6 commit 5c2d272
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 39 deletions.
48 changes: 30 additions & 18 deletions bin/host/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use crate::{
},
};
use anyhow::{anyhow, Result};
use kona_preimage::{HintReaderServer, HintRouter, PreimageFetcher, PreimageOracleServer};
use kona_preimage::{
HintReaderServer, HintRouter, PreimageFetcher, PreimageOracleServer, PreimageServerError,
};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::error;

/// The [PreimageServer] is responsible for waiting for incoming preimage requests and
/// serving them to the client.
Expand Down Expand Up @@ -67,8 +70,6 @@ where
s = server => s.map_err(|e| anyhow!(e))?,
h = hint_router => h.map_err(|e| anyhow!(e))?,
}

Ok(())
}

/// Starts the oracle server, which waits for incoming preimage requests and serves them to the
Expand All @@ -77,49 +78,60 @@ where
kv_store: Arc<RwLock<KV>>,
fetcher: Option<Arc<RwLock<Fetcher<KV>>>>,
oracle_server: P,
) {
) -> Result<()> {
#[inline(always)]
async fn do_loop<F, P>(fetcher: &F, server: &P)
async fn do_loop<F, P>(fetcher: &F, server: &P) -> Result<()>
where
F: PreimageFetcher + Send + Sync,
P: PreimageOracleServer,
{
loop {
// Break the loop on any error. An error in this path indicates a closed pipe.
if server.next_preimage_request(fetcher).await.is_err() {
break;
match server.next_preimage_request(fetcher).await {
Ok(_) => (),
Err(PreimageServerError::BrokenPipe(_)) => return Ok(()),
Err(PreimageServerError::Other(e)) => {
error!("Failed to serve preimage request: {e:?}");
return Err(e);
}
}
}
}

if let Some(fetcher) = fetcher.as_ref() {
do_loop(&OnlinePreimageFetcher::new(Arc::clone(fetcher)), &oracle_server).await;
do_loop(&OnlinePreimageFetcher::new(Arc::clone(fetcher)), &oracle_server).await
} else {
do_loop(&OfflinePreimageFetcher::new(Arc::clone(&kv_store)), &oracle_server).await;
};
do_loop(&OfflinePreimageFetcher::new(Arc::clone(&kv_store)), &oracle_server).await
}
}

/// Starts the hint router, which waits for incoming hints and routes them to the appropriate
/// handler.
async fn start_hint_router(hint_reader: H, fetcher: Option<Arc<RwLock<Fetcher<KV>>>>) {
async fn start_hint_router(
hint_reader: H,
fetcher: Option<Arc<RwLock<Fetcher<KV>>>>,
) -> Result<()> {
#[inline(always)]
async fn do_loop<R, H>(router: &R, server: &H)
async fn do_loop<R, H>(router: &R, server: &H) -> Result<()>
where
R: HintRouter + Send + Sync,
H: HintReaderServer,
{
loop {
// Break the loop on any error. An error in this path indicates a closed pipe.
if server.next_hint(router).await.is_err() {
break;
match server.next_hint(router).await {
Ok(_) => (),
Err(PreimageServerError::BrokenPipe(_)) => return Ok(()),
Err(PreimageServerError::Other(e)) => {
error!("Failed to serve preimage request: {e:?}");
return Err(e);
}
}
}
}

if let Some(fetcher) = fetcher {
do_loop(&OnlineHintRouter::new(Arc::clone(&fetcher)), &hint_reader).await;
do_loop(&OnlineHintRouter::new(Arc::clone(&fetcher)), &hint_reader).await
} else {
do_loop(&OfflineHintRouter, &hint_reader).await;
do_loop(&OfflineHintRouter, &hint_reader).await
}
}
}
108 changes: 98 additions & 10 deletions crates/preimage/src/hint.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
traits::{HintRouter, HintWriterClient},
traits::{HintRouter, HintWriterClient, PreimageServerError},
HintReaderServer, PipeHandle,
};
use alloc::{boxed::Box, string::String, vec};
Expand Down Expand Up @@ -65,34 +65,46 @@ impl HintReader {

#[async_trait]
impl HintReaderServer for HintReader {
async fn next_hint<R>(&self, hint_router: &R) -> Result<()>
async fn next_hint<R>(&self, hint_router: &R) -> Result<(), PreimageServerError>
where
R: HintRouter + Send + Sync,
{
// Read the length of the raw hint payload.
let mut len_buf = [0u8; 4];
self.pipe_handle.read_exact(&mut len_buf).await?;
self.pipe_handle.read_exact(&mut len_buf).await.map_err(PreimageServerError::BrokenPipe)?;
let len = u32::from_be_bytes(len_buf);

// Read the raw hint payload.
let mut raw_payload = vec![0u8; len as usize];
self.pipe_handle.read_exact(raw_payload.as_mut_slice()).await?;
let payload = String::from_utf8(raw_payload)
.map_err(|e| anyhow::anyhow!("Failed to decode hint payload: {e}"))?;
self.pipe_handle
.read_exact(raw_payload.as_mut_slice())
.await
.map_err(PreimageServerError::BrokenPipe)?;
let payload = match String::from_utf8(raw_payload) {
Ok(p) => p,
Err(e) => {
// Write back on error to prevent blocking the client.
self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?;

return Err(PreimageServerError::Other(anyhow::anyhow!(
"Failed to decode hint payload: {e}"
)));
}
};

trace!(target: "hint_reader", "Successfully read hint: \"{payload}\"");

// Route the hint
if let Err(e) = hint_router.route_hint(payload).await {
// Write back on error to prevent blocking the client.
self.pipe_handle.write(&[0x00]).await?;
self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?;

error!("Failed to route hint: {e}");
anyhow::bail!("Failed to rout hint: {e}");
return Err(PreimageServerError::Other(e));
}

// Write back an acknowledgement to the client to unblock their process.
self.pipe_handle.write(&[0x00]).await?;
self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?;

trace!(target: "hint_reader", "Successfully routed and acknowledged hint");

Expand All @@ -106,7 +118,7 @@ mod test {

use super::*;
use crate::test_utils::bidirectional_pipe;
use alloc::{sync::Arc, vec::Vec};
use alloc::{string::ToString, sync::Arc, vec::Vec};
use kona_common::FileDescriptor;
use std::os::fd::AsRawFd;
use tokio::sync::Mutex;
Expand All @@ -123,6 +135,82 @@ mod test {
}
}

struct TestFailRouter;

#[async_trait]
impl HintRouter for TestFailRouter {
async fn route_hint(&self, _hint: String) -> Result<()> {
anyhow::bail!("Failed to route hint")
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_unblock_on_bad_utf8() {
let mock_data = [0xf0, 0x90, 0x28, 0xbc];

let hint_pipe = bidirectional_pipe().unwrap();

let client = tokio::task::spawn(async move {
let hint_writer = HintWriter::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.client.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.client.write.as_raw_fd() as usize),
));

#[allow(invalid_from_utf8_unchecked)]
hint_writer.write(unsafe { alloc::str::from_utf8_unchecked(&mock_data) }).await
});
let host = tokio::task::spawn(async move {
let router = TestRouter { incoming_hints: Default::default() };

let hint_reader = HintReader::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.host.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.host.write.as_raw_fd() as usize),
));
hint_reader.next_hint(&router).await
});

let (c, h) = tokio::join!(client, host);
c.unwrap().unwrap();
assert!(h.unwrap().is_err_and(|e| {
let PreimageServerError::Other(e) = e else {
return false;
};
e.to_string().contains("Failed to decode hint payload")
}));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_unblock_on_fetch_failure() {
const MOCK_DATA: &str = "test-hint 0xfacade";

let hint_pipe = bidirectional_pipe().unwrap();

let client = tokio::task::spawn(async move {
let hint_writer = HintWriter::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.client.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.client.write.as_raw_fd() as usize),
));

hint_writer.write(MOCK_DATA).await
});
let host = tokio::task::spawn(async move {
let hint_reader = HintReader::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.host.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.host.write.as_raw_fd() as usize),
));
hint_reader.next_hint(&TestFailRouter).await
});

let (c, h) = tokio::join!(client, host);
c.unwrap().unwrap();
assert!(h.unwrap().is_err_and(|e| {
let PreimageServerError::Other(e) = e else {
return false;
};
e.to_string() == "Failed to route hint"
}));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_hint_client_and_host() {
const MOCK_DATA: &str = "test-hint 0xfacade";
Expand Down
2 changes: 1 addition & 1 deletion crates/preimage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use pipe::PipeHandle;
mod traits;
pub use traits::{
CommsClient, HintReaderServer, HintRouter, HintWriterClient, PreimageFetcher,
PreimageOracleClient, PreimageOracleServer,
PreimageOracleClient, PreimageOracleServer, PreimageServerError,
};

#[cfg(test)]
Expand Down
19 changes: 11 additions & 8 deletions crates/preimage/src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
traits::PreimageFetcher, PipeHandle, PreimageKey, PreimageOracleClient, PreimageOracleServer,
traits::{PreimageFetcher, PreimageServerError},
PipeHandle, PreimageKey, PreimageOracleClient, PreimageOracleServer,
};
use alloc::{boxed::Box, vec::Vec};
use anyhow::{bail, Result};
Expand Down Expand Up @@ -99,27 +100,27 @@ impl OracleServer {

#[async_trait::async_trait]
impl PreimageOracleServer for OracleServer {
async fn next_preimage_request<F>(&self, fetcher: &F) -> Result<()>
async fn next_preimage_request<F>(&self, fetcher: &F) -> Result<(), PreimageServerError>
where
F: PreimageFetcher + Send + Sync,
{
// Read the preimage request from the client, and throw early if there isn't is any.
let mut buf = [0u8; 32];
self.pipe_handle.read_exact(&mut buf).await?;
let preimage_key = PreimageKey::try_from(buf)?;
self.pipe_handle.read_exact(&mut buf).await.map_err(PreimageServerError::BrokenPipe)?;
let preimage_key = PreimageKey::try_from(buf).map_err(PreimageServerError::Other)?;

trace!(target: "oracle_server", "Fetching preimage for key {preimage_key}");

// Fetch the preimage value from the preimage getter.
let value = fetcher.get_preimage(preimage_key).await?;
let value = fetcher.get_preimage(preimage_key).await.map_err(PreimageServerError::Other)?;

// Write the length as a big-endian u64 followed by the data.
let data = [(value.len() as u64).to_be_bytes().as_ref(), value.as_ref()]
.into_iter()
.flatten()
.copied()
.collect::<Vec<_>>();
self.pipe_handle.write(data.as_slice()).await?;
self.pipe_handle.write(data.as_slice()).await.map_err(PreimageServerError::BrokenPipe)?;

trace!(target: "oracle_server", "Successfully wrote preimage data for key {preimage_key}");

Expand Down Expand Up @@ -188,8 +189,10 @@ mod test {
let test_fetcher = TestFetcher { preimages: Arc::clone(&preimages) };

loop {
if oracle_server.next_preimage_request(&test_fetcher).await.is_err() {
break;
match oracle_server.next_preimage_request(&test_fetcher).await {
Err(PreimageServerError::BrokenPipe(_)) => break,
Err(PreimageServerError::Other(e)) => panic!("Unexpected error: {:?}", e),
Ok(_) => {}
}
}
});
Expand Down
14 changes: 12 additions & 2 deletions crates/preimage/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ pub trait CommsClient: PreimageOracleClient + Clone + HintWriterClient {}
// Implement the super trait for any type that satisfies the bounds
impl<T: PreimageOracleClient + Clone + HintWriterClient> CommsClient for T {}

/// A [PreimageServerError] is an enum that differentiates pipe-related errors from other errors
/// in the [PreimageOracleServer] and [HintReaderServer] implementations.
#[derive(Debug)]
pub enum PreimageServerError {
/// The pipe has been broken.
BrokenPipe(anyhow::Error),
/// Other errors.
Other(anyhow::Error),
}

/// A [PreimageOracleServer] is a high-level interface to accept read requests from the client and
/// write the preimage data to the client pipe.
#[async_trait]
Expand All @@ -52,7 +62,7 @@ pub trait PreimageOracleServer {
/// # Returns
/// - `Ok(())` if the data was successfully written into the client pipe.
/// - `Err(_)` if the data could not be written to the client.
async fn next_preimage_request<F>(&self, get_preimage: &F) -> Result<()>
async fn next_preimage_request<F>(&self, get_preimage: &F) -> Result<(), PreimageServerError>
where
F: PreimageFetcher + Send + Sync;
}
Expand All @@ -67,7 +77,7 @@ pub trait HintReaderServer {
/// - `Ok(())` if the hint was received and the client was notified of the host's
/// acknowledgement.
/// - `Err(_)` if the hint was not received correctly.
async fn next_hint<R>(&self, route_hint: &R) -> Result<()>
async fn next_hint<R>(&self, route_hint: &R) -> Result<(), PreimageServerError>
where
R: HintRouter + Send + Sync;
}
Expand Down

0 comments on commit 5c2d272

Please sign in to comment.