Skip to content

Commit

Permalink
chore: cleanup service loop
Browse files Browse the repository at this point in the history
  • Loading branch information
nhtyy committed May 2, 2024
1 parent abd0ab2 commit 3cb1f39
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 62 deletions.
50 changes: 19 additions & 31 deletions core/service-executor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::{Duration, Instant};

use dashmap::DashMap;
use fleek_crypto::ClientPublicKey;
use fn_sdk::ipc_types::{self, IpcMessage, IpcRequest};
use fn_sdk::ipc_types::{self, IpcMessage, IpcRequest, DELIMITER_SIZE};
use infusion::c;
use lightning_interfaces::infu_collection::Collection;
use lightning_interfaces::{ApplicationInterface, FetcherSocket, SyncQueryRunnerInterface};
Expand Down Expand Up @@ -213,9 +213,9 @@ async fn handle_stream<C: Collection>(
ctx: Arc<Context<C>>,
) -> Result<(), Box<dyn Error>> {
// incoming IpcRequests
let mut read_buffer = vec![0; 8];
// start with a buffer of 8 bytes to read the length delimiter
let mut read_buffer = vec![0; DELIMITER_SIZE];
let mut read_buffer_pos = 0;
let mut read_len = 0;

// outgoing IpcMessages
let mut write_buffer = Vec::<u8>::new();
Expand Down Expand Up @@ -290,9 +290,11 @@ async fn handle_stream<C: Collection>(
// these might take awhile so we want to get them kicked off asap, if theres multiple
// messages in the stream then this will handle them all before moving on
if ready.is_readable() {
let mut reading_len = true;

'read: loop {
// try to read the length delimiter first
while read_buffer_pos < 8 && read_len == 0 {
while read_buffer_pos < read_buffer.len() {
match stream.try_read(&mut read_buffer[read_buffer_pos..]) {
Ok(0) => {
tracing::warn!("Connection reset control loop");
Expand All @@ -310,46 +312,32 @@ async fn handle_stream<C: Collection>(
}
}

// if we have no message len already then we have the new length delimiter
if read_len == 0 {
read_len = usize::from_le_bytes(
// Check if were only reading the length delimiter
if reading_len {
let read_len = usize::from_le_bytes(
read_buffer[..8]
.try_into()
.expect("Can create len 4 array from read buffer slice"),
);

// here we just resize expected size buffer to the correct size and reset the
// position
read_buffer_pos = 0;
read_buffer.resize(read_len, 0);
}

// try to read the request
// downcast here should be safe on >= 32 bit machines
while read_buffer_pos < read_len {
match stream.try_read(&mut read_buffer[read_buffer_pos..]) {
Ok(0) => {
// connection reset
tracing::warn!("Connection reset control loop");
break 'outer;
},
Ok(n) => {
read_buffer_pos += n;
},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
break 'read;
},
Err(e) => {
return Err(e.into());
},
}
// make sure we dont hit this block again
reading_len = false;

// get more bytes
continue 'read;
}

let request = IpcRequest::decode(&read_buffer)?;

// we need to resize to 4 bytes or else the first part of the read loop may bring in
// too many bytes
read_buffer.resize(8, 0);
read_len = 0;
// reset the buffer back to 8 bytes for the next delimiter
read_buffer.resize(DELIMITER_SIZE, 0);
read_buffer_pos = 0;
reading_len = true;

if let Some(request_ctx) = request.request_ctx {
let ctx = ctx.clone();
Expand Down
39 changes: 12 additions & 27 deletions lib/sdk/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::sync::mpsc;

use crate::connection::ConnectionListener;
use crate::futures::future_callback;
use crate::ipc_types::{IpcMessage, IpcRequest, Request, Response};
use crate::ipc_types::{IpcMessage, IpcRequest, Request, Response, DELIMITER_SIZE};

static mut SENDER: Option<tokio::sync::mpsc::Sender<IpcRequest>> = None;
pub(crate) static mut IPC_PATH: Option<PathBuf> = None;
Expand Down Expand Up @@ -80,9 +80,8 @@ pub(crate) async fn spawn_service_loop_inner(
let mut write_buffer = Vec::<u8>::new();
let mut write_buffer_pos = 0_usize;
// IpcMessage
let mut read_buffer = vec![0; 8];
let mut read_buffer = vec![0; DELIMITER_SIZE];
let mut read_buffer_pos = 0;
let mut read_len = 0;

'outer: loop {
let ready = if write_buffer.is_empty() {
Expand Down Expand Up @@ -133,10 +132,12 @@ pub(crate) async fn spawn_service_loop_inner(
}

if ready.is_readable() {
let mut reading_len = true;

'read: loop {
// try to read the length from the buffer,
// if were not already trying to read a message
while read_buffer_pos < 8 && read_len == 0 {
while read_buffer_pos < read_buffer.len() {
match ipc_stream.try_read(&mut read_buffer[read_buffer_pos..]) {
// socket reset
Ok(0) => {
Expand All @@ -159,46 +160,30 @@ pub(crate) async fn spawn_service_loop_inner(

// if the len is 0 then we dont have any messages yet, therefore we jsut finished
// reading the len or else we would have broken out
if read_len == 0 {
if reading_len {
// assume were running on a 64bit machine
read_len = usize::from_le_bytes(
let read_len = usize::from_le_bytes(
read_buffer[..8]
.try_into()
.expect("can create len 8 buffer from read buffer"),
);

// were now using this as the postion in the message buffer
read_buffer_pos = 0;

// resize will not deallocate
read_buffer.resize(read_len, 0);
}
reading_len = false;

// this downcasting should be safe because its from a u32
// and no one should be running this on a < 32 bit machine
while read_buffer_pos < read_len {
match ipc_stream.try_read(&mut read_buffer[read_buffer_pos..]) {
Ok(0) => {
tracing::warn!("service control loop connection reset");
break 'outer;
},
Ok(n) => {
read_buffer_pos += n;
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
break 'read;
},
Err(e) => {
return Err(e.into());
},
}
continue 'read;
}

let message = IpcMessage::decode(&read_buffer)?;

// cleanup now that weve read the message
read_buffer_pos = 0;
read_len = 0;
read_buffer.resize(8, 0);
read_buffer.resize(DELIMITER_SIZE, 0);
reading_len = true;

handle_message(message);
}
Expand Down
16 changes: 12 additions & 4 deletions lib/sdk/src/ipc_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ pub enum IpcMessage {
response: Response,
},
}
/// The size of the length delimiter in bytes.
///
/// todo!(n) this should probably be reflected on the trait
pub const DELIMITER_SIZE: usize = 8;

/// Deduping this as is seems to be impossible because it would require IpcMessage (`T`)
/// To implemenet both Serialize<AllocSeralizer<_>> and Serialize<WriteSerializer<_>>
///
///
/// todo!(n)
impl LightningMessage for IpcMessage {
fn decode(buffer: &[u8]) -> anyhow::Result<Self> {
Expand All @@ -106,7 +109,10 @@ impl LightningMessage for IpcMessage {
let encoded = rkyv::to_bytes::<_, 256>(self)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

let len = encoded.len().to_le_bytes();
// we need to cast to an 8 byte uint incase a systems usize is not 8 bytes
// will probably never happen
let len = encoded.len() as u64;
let len = len.to_le_bytes();

writer.write_all(&len)?;
writer.write_all(&encoded)
Expand Down Expand Up @@ -137,15 +143,17 @@ impl LightningMessage for IpcRequest {
let encoded = rkyv::to_bytes::<_, 256>(self)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

let len = encoded.len().to_le_bytes();
// we need to cast to an 8 byte uint incase a systems usize is not 8 bytes
let len = encoded.len() as u64;
let len = len.to_le_bytes();

writer.write_all(&len)?;
writer.write_all(&encoded)
}
}

ReqRes! {
// we need an extra `meta` attribute to specify the derive macros otherwise it could be a multiple parse
// we need an extra `endmeta` derive macros otherwise it could be a multiple parse
// todo!(n) find out how to remove this,
// for some reasons adding these directly into the macro invocation doesn't work
meta: #[derive(IsVariant, Archive, Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)],
Expand Down

0 comments on commit 3cb1f39

Please sign in to comment.