Skip to content

Commit

Permalink
Make IO errors paintful explicit in logrotate generator
Browse files Browse the repository at this point in the history
In my downstack PR I discovered that in fact we have some IO error happening
in practice although exactly which one who can say. This PR attempts to say.
I've gone through and made every error very explicit and where possible added
the paths being acted on.

Signed-off-by: Brian L. Troutwine <[email protected]>
  • Loading branch information
blt committed Oct 3, 2024
1 parent ec1eaf5 commit c3f1183
Showing 1 changed file with 148 additions and 49 deletions.
197 changes: 148 additions & 49 deletions lading/src/generator/file_gen/logrotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,61 @@ use lading_payload::block::{self, Block};

use super::General;

/// An enum to allow us to determine what operation caused an IO errror as the
/// default error message lacks detail.
#[derive(Debug, Clone, Copy)]
pub enum IoOp {
/// Operation for `fs::try_exists`
TryExists,
/// Operation for `fs::remove_file`
RemoveFile,
/// Operation for `fs::create_dir_all`
CreateDirAll,
/// Operation for `fs::OpenOptions` etc
Open,
}

#[derive(thiserror::Error, Debug)]
/// Errors produced by [`FileGen`].
pub enum Error {
/// Wrapper around [`std::io::Error`].
#[error("Io error: {0}")]
Io(#[from] ::std::io::Error),
#[error("IO error [{path}]: {err}")]
Io {
/// The path being operated on
path: PathBuf,
/// The operation
operation: IoOp,
/// The error
err: std::io::Error,
},
/// Error for `fs::rename` operation
#[error("Rename error [{from} -> {to}]: {err}")]
IoRename {
/// The path being moved from
from: PathBuf,
/// The path being moved to
to: PathBuf,
/// The actual error
err: std::io::Error,
},
/// Error for `fp.write_all` operation
#[error("Write all bytes error: {err}")]
IoWriteAll {
/// The actual error
err: std::io::Error,
},
/// Error for thread spawning
#[error("Unable to spawn thread: {err}")]
IoThreadSpawn {
/// The actual error
err: std::io::Error,
},
/// Error for `fs::flush` operation
#[error("Flush error: {err}")]
IoFlush {
/// The error, actual
err: std::io::Error,
},
/// Creation of payload blocks failed.
#[error("Block creation error: {0}")]
Block(#[from] block::Error),
Expand Down Expand Up @@ -274,23 +323,34 @@ impl Child {

// SAFETY: By construction there is at least one name present.
let parent: &Path = self.names[0].parent().ok_or(Error::NameWithNoParent)?;
fs::create_dir_all(parent).await?;
let mut fp = BufWriter::with_capacity(
fs::create_dir_all(parent).await.map_err(|err| Error::Io {
path: PathBuf::from(parent),
operation: IoOp::CreateDirAll,
err,
})?;
let mut fp: BufWriter<fs::File> = BufWriter::with_capacity(
bytes_per_second,
fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&self.names[0])
.await?,
.await
.map_err(|err| Error::Io {
path: PathBuf::from(parent),
operation: IoOp::Open,
err,
})?,
);

// Move the block_cache into an OS thread, exposing a channel between it
// and this async context.
let block_cache = self.block_cache;
let (snd, rcv) = mpsc::channel(1024);
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;
thread::Builder::new()
.spawn(|| block_cache.spin(snd))
.map_err(|err| Error::IoThreadSpawn { err })?;

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
Expand All @@ -302,51 +362,12 @@ impl Child {

tokio::select! {
_ = self.throttle.wait_for(total_bytes) => {
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
let total_bytes = u64::from(total_bytes.get());

{
fp.write_all(&blk.bytes).await?;
counter!("bytes_written").increment(total_bytes);
total_bytes_written += total_bytes;
}


if total_bytes_written > maximum_bytes_per_log {
fp.flush().await?;
drop(fp);

// Delete the last name file, if it exists. Move all files to their next highest.
if fs::try_exists(&last_name).await? {
fs::remove_file(&last_name).await?;
}
if total_names > 1 {
// If there's only one name this rotation is k8s
// default style and we've just dropped the only
// named log file.
for i in (0..total_names-1).rev() {
let from = &self.names[i];
let to = &self.names[i+1];
fs::rename(from, to).await?;
}
}

// Open a new fp to `path`, replacing `fp`. Any holders of the
// file pointer still have it but the file no longer has a name.
fp = BufWriter::with_capacity(
bytes_per_second,
fs::OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(&self.names[0])
.await?,
);
total_bytes_written = 0;
}
write_bytes(&mut rcv, &mut fp, &mut total_bytes_written, bytes_per_second,
total_names, total_bytes, maximum_bytes_per_log,
&self.names, last_name).await?;
}
() = &mut shutdown_wait => {
fp.flush().await?;
fp.flush().await.map_err(|err| Error::IoFlush { err })?;
drop(fp);

info!("shutdown signal received");
Expand All @@ -357,3 +378,81 @@ impl Child {
}
}
}

#[allow(clippy::too_many_arguments)]
async fn write_bytes(
rcv: &mut PeekableReceiver<Block>,
fp: &mut BufWriter<fs::File>,
total_bytes_written: &mut u64,
bytes_per_second: usize,
total_names: usize,
total_bytes: NonZeroU32,
maximum_bytes_per_log: u64,
names: &[PathBuf],
last_name: &Path,
) -> Result<(), Error> {
let blk = rcv
.next()
.await
.expect("failed to advance through the blocks"); // actually advance through the blocks
let total_bytes = u64::from(total_bytes.get());

{
fp.write_all(&blk.bytes)
.await
.map_err(|err| Error::IoWriteAll { err })?;
counter!("bytes_written").increment(total_bytes);
*total_bytes_written += total_bytes;
}

if *total_bytes_written > maximum_bytes_per_log {
fp.flush().await.map_err(|err| Error::IoFlush { err })?;

// Delete the last name file, if it exists. Move all files to their next highest.
if fs::try_exists(&last_name).await.map_err(|err| Error::Io {
path: PathBuf::from(last_name),
operation: IoOp::TryExists,
err,
})? {
fs::remove_file(&last_name).await.map_err(|err| Error::Io {
path: PathBuf::from(last_name),
operation: IoOp::RemoveFile,
err,
})?;
}
if total_names > 1 {
// If there's only one name this rotation is k8s
// default style and we've just dropped the only
// named log file.
for i in (0..total_names - 1).rev() {
let from = &names[i];
let to = &names[i + 1];
fs::rename(from, to).await.map_err(|err| Error::IoRename {
from: PathBuf::from(from),
to: PathBuf::from(to),
err,
})?;
}
}

// Open a new fp to `path`, replacing `fp`. Any holders of the
// file pointer still have it but the file no longer has a name.
*fp = BufWriter::with_capacity(
bytes_per_second,
fs::OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(&names[0])
.await
.map_err(|err| Error::Io {
path: PathBuf::from(&names[0]),
operation: IoOp::Open,
err,
})?,
);
*total_bytes_written = 0;
}

Ok(())
}

0 comments on commit c3f1183

Please sign in to comment.