From c3f1183efbf1757c1adc0ab812e26d6edc4d57e5 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Wed, 2 Oct 2024 11:55:45 -0700 Subject: [PATCH] Make IO errors paintful explicit in logrotate generator In my downstack PR I discovered that in fact we have some IO error happening in practice although exactly which one who can say. This PR attempts to say. I've gone through and made every error very explicit and where possible added the paths being acted on. Signed-off-by: Brian L. Troutwine --- lading/src/generator/file_gen/logrotate.rs | 197 ++++++++++++++++----- 1 file changed, 148 insertions(+), 49 deletions(-) diff --git a/lading/src/generator/file_gen/logrotate.rs b/lading/src/generator/file_gen/logrotate.rs index 9fc3fbd3e..9ce42f8f1 100644 --- a/lading/src/generator/file_gen/logrotate.rs +++ b/lading/src/generator/file_gen/logrotate.rs @@ -39,12 +39,61 @@ use lading_payload::block::{self, Block}; use super::General; +/// An enum to allow us to determine what operation caused an IO errror as the +/// default error message lacks detail. +#[derive(Debug, Clone, Copy)] +pub enum IoOp { + /// Operation for `fs::try_exists` + TryExists, + /// Operation for `fs::remove_file` + RemoveFile, + /// Operation for `fs::create_dir_all` + CreateDirAll, + /// Operation for `fs::OpenOptions` etc + Open, +} + #[derive(thiserror::Error, Debug)] /// Errors produced by [`FileGen`]. pub enum Error { /// Wrapper around [`std::io::Error`]. - #[error("Io error: {0}")] - Io(#[from] ::std::io::Error), + #[error("IO error [{path}]: {err}")] + Io { + /// The path being operated on + path: PathBuf, + /// The operation + operation: IoOp, + /// The error + err: std::io::Error, + }, + /// Error for `fs::rename` operation + #[error("Rename error [{from} -> {to}]: {err}")] + IoRename { + /// The path being moved from + from: PathBuf, + /// The path being moved to + to: PathBuf, + /// The actual error + err: std::io::Error, + }, + /// Error for `fp.write_all` operation + #[error("Write all bytes error: {err}")] + IoWriteAll { + /// The actual error + err: std::io::Error, + }, + /// Error for thread spawning + #[error("Unable to spawn thread: {err}")] + IoThreadSpawn { + /// The actual error + err: std::io::Error, + }, + /// Error for `fs::flush` operation + #[error("Flush error: {err}")] + IoFlush { + /// The error, actual + err: std::io::Error, + }, /// Creation of payload blocks failed. #[error("Block creation error: {0}")] Block(#[from] block::Error), @@ -274,15 +323,24 @@ impl Child { // SAFETY: By construction there is at least one name present. let parent: &Path = self.names[0].parent().ok_or(Error::NameWithNoParent)?; - fs::create_dir_all(parent).await?; - let mut fp = BufWriter::with_capacity( + fs::create_dir_all(parent).await.map_err(|err| Error::Io { + path: PathBuf::from(parent), + operation: IoOp::CreateDirAll, + err, + })?; + let mut fp: BufWriter = BufWriter::with_capacity( bytes_per_second, fs::OpenOptions::new() .create(true) .truncate(true) .write(true) .open(&self.names[0]) - .await?, + .await + .map_err(|err| Error::Io { + path: PathBuf::from(parent), + operation: IoOp::Open, + err, + })?, ); // Move the block_cache into an OS thread, exposing a channel between it @@ -290,7 +348,9 @@ impl Child { let block_cache = self.block_cache; let (snd, rcv) = mpsc::channel(1024); let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); - thread::Builder::new().spawn(|| block_cache.spin(snd))?; + thread::Builder::new() + .spawn(|| block_cache.spin(snd)) + .map_err(|err| Error::IoThreadSpawn { err })?; let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); @@ -302,51 +362,12 @@ impl Child { tokio::select! { _ = self.throttle.wait_for(total_bytes) => { - let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks - let total_bytes = u64::from(total_bytes.get()); - - { - fp.write_all(&blk.bytes).await?; - counter!("bytes_written").increment(total_bytes); - total_bytes_written += total_bytes; - } - - - if total_bytes_written > maximum_bytes_per_log { - fp.flush().await?; - drop(fp); - - // Delete the last name file, if it exists. Move all files to their next highest. - if fs::try_exists(&last_name).await? { - fs::remove_file(&last_name).await?; - } - if total_names > 1 { - // If there's only one name this rotation is k8s - // default style and we've just dropped the only - // named log file. - for i in (0..total_names-1).rev() { - let from = &self.names[i]; - let to = &self.names[i+1]; - fs::rename(from, to).await?; - } - } - - // Open a new fp to `path`, replacing `fp`. Any holders of the - // file pointer still have it but the file no longer has a name. - fp = BufWriter::with_capacity( - bytes_per_second, - fs::OpenOptions::new() - .create(true) - .truncate(false) - .write(true) - .open(&self.names[0]) - .await?, - ); - total_bytes_written = 0; - } + write_bytes(&mut rcv, &mut fp, &mut total_bytes_written, bytes_per_second, + total_names, total_bytes, maximum_bytes_per_log, + &self.names, last_name).await?; } () = &mut shutdown_wait => { - fp.flush().await?; + fp.flush().await.map_err(|err| Error::IoFlush { err })?; drop(fp); info!("shutdown signal received"); @@ -357,3 +378,81 @@ impl Child { } } } + +#[allow(clippy::too_many_arguments)] +async fn write_bytes( + rcv: &mut PeekableReceiver, + fp: &mut BufWriter, + total_bytes_written: &mut u64, + bytes_per_second: usize, + total_names: usize, + total_bytes: NonZeroU32, + maximum_bytes_per_log: u64, + names: &[PathBuf], + last_name: &Path, +) -> Result<(), Error> { + let blk = rcv + .next() + .await + .expect("failed to advance through the blocks"); // actually advance through the blocks + let total_bytes = u64::from(total_bytes.get()); + + { + fp.write_all(&blk.bytes) + .await + .map_err(|err| Error::IoWriteAll { err })?; + counter!("bytes_written").increment(total_bytes); + *total_bytes_written += total_bytes; + } + + if *total_bytes_written > maximum_bytes_per_log { + fp.flush().await.map_err(|err| Error::IoFlush { err })?; + + // Delete the last name file, if it exists. Move all files to their next highest. + if fs::try_exists(&last_name).await.map_err(|err| Error::Io { + path: PathBuf::from(last_name), + operation: IoOp::TryExists, + err, + })? { + fs::remove_file(&last_name).await.map_err(|err| Error::Io { + path: PathBuf::from(last_name), + operation: IoOp::RemoveFile, + err, + })?; + } + if total_names > 1 { + // If there's only one name this rotation is k8s + // default style and we've just dropped the only + // named log file. + for i in (0..total_names - 1).rev() { + let from = &names[i]; + let to = &names[i + 1]; + fs::rename(from, to).await.map_err(|err| Error::IoRename { + from: PathBuf::from(from), + to: PathBuf::from(to), + err, + })?; + } + } + + // Open a new fp to `path`, replacing `fp`. Any holders of the + // file pointer still have it but the file no longer has a name. + *fp = BufWriter::with_capacity( + bytes_per_second, + fs::OpenOptions::new() + .create(true) + .truncate(false) + .write(true) + .open(&names[0]) + .await + .map_err(|err| Error::Io { + path: PathBuf::from(&names[0]), + operation: IoOp::Open, + err, + })?, + ); + *total_bytes_written = 0; + } + + Ok(()) +}