Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented custom errors types for Calloop #66

Merged
merged 3 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
altered in a breaking way. MSRV is bumped to 1.49.
- **Breaking:** `generic::Fd` adapter is removed, as since that rust version `RawFd` implements
`AsRawFd`, allowing it to be used directly in `Generic`.
- **Breaking:** The `EventSource` trait has a new associated type `Error`. This determines the type
of the error variant returned by `EventSource::process_events()`. It must be convertible into
`Box<dyn std::error::Error + Sync + Send>`.
- **Breaking:** All library-provided event sources now have their own error types for the
associated `Error` type on the `EventSource` trait.
- **Breaking:** Many API functions now use Calloop's own error type (`calloop::Error`) instead of
`std::io::Error` as the error variants of their returned results.

## 0.9.2 -- 2021-12-27

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ nix = "0.23"
futures-util = { version = "0.3.5", optional = true, default-features = false, features = ["std"]}
futures-io = { version = "0.3.5", optional = true }
slotmap = "1.0"
thiserror = "1.0"

[dev-dependencies]
futures = "0.3.5"
Expand Down
1 change: 1 addition & 0 deletions doc/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [Ping](ch02-03-ping.md)
- [Channels](ch02-04-channels.md)
- [Unix Signals](ch02-05-signals.md)
- [Error handling](ch02-06-errors.md)
- [I need async/await!](ch03-00-async-await.md)
- [Run async code](ch03-01-run-async-code.md)
- [Async IO types](ch03-02-async-io-types.md)
Expand Down
11 changes: 6 additions & 5 deletions doc/src/adapt_io_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use calloop::EventLoop;

// ANCHOR: use_futures_io_traits
// futures = "0.3"
use futures::io::{AsyncReadExt, AsyncWriteExt};
// ANCHOR_END: use_futures_io_traits

Expand All @@ -15,10 +14,12 @@ fn main() -> std::io::Result<()> {
let mut event_loop = EventLoop::try_new()?;
let handle = event_loop.handle();

handle.insert_source(exec, |evt, _metadata, _shared| {
// Print the value of the async block ie. the return value.
println!("Async block ended with: {}", evt);
})?;
handle
.insert_source(exec, |evt, _metadata, _shared| {
// Print the value of the async block ie. the return value.
println!("Async block ended with: {}", evt);
})
.map_err(|e| e.error)?;
// ANCHOR_END: decl_loop

// ANCHOR: decl_io
Expand Down
10 changes: 6 additions & 4 deletions doc/src/async_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ fn main() -> std::io::Result<()> {
let mut event_loop = EventLoop::try_new()?;
let handle = event_loop.handle();

handle.insert_source(exec, |evt, _metadata, _shared| {
// Print the value of the async block ie. the return value.
println!("Async block ended with: {}", evt);
})?;
handle
.insert_source(exec, |evt, _metadata, _shared| {
// Print the value of the async block ie. the return value.
println!("Async block ended with: {}", evt);
})
.map_err(|e| e.error)?;
// ANCHOR_END: decl_loop

// ANCHOR: decl_async
Expand Down
36 changes: 36 additions & 0 deletions doc/src/ch02-06-errors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Error handling in Calloop

## Overview

Most error handling crates/guides/documentation for Rust focus on one of two situations:

- Creating errors that an API can propagate out to a user of the API, or
- Making your library deal nicely with the `Result`s from closure or trait methods that it might call

Calloop has to do both of these things. It needs to provide a library user with errors that work well with `?` and common error-handling idioms in their own code, and it needs to handle errors from the callbacks you give to `process_events()` or `insert_source()`. It *also* needs to provide some flexibility in the `EventSource` trait, which is used both for internal event sources and by users of the library.

Because of this, error handling in Calloop leans more towards having separate error types for different concerns. This may mean that there is some extra conversion code in places like returning results from `process_events()`, or in callbacks that use other libraries. However, we try to make it smoother to do these conversions, and to make sure information isn't lost in doing so.

If your crate already has some form of structured error handling, Calloop's error types should pose no problem to integrate into this. All of Calloop's errors implement `std::error::Error` and can be manipulated the same as any other error types.

The place where this becomes the most complex is in the `process_events()` method on the `EventSource` trait.

## The Error type on the EventSource trait

The `EventSource` trait contains an associated type named `Error`, which forms part of the return type from `process_events()`. This type must be convertible into `Box<dyn std::error::Error + Sync + Send>`, which means you can use:

- Your own error type that implements `std::error::Error`
- A structured error type created with [*Thiserror*](https://crates.io/crates/thiserror)
- `Box<dyn std::error::Error + Sync + Send>`
- A flexible string-based error type such as [*Anyhow's*](https://crates.io/crates/anyhow) `anyhow::Error`

As a rule, if you implement `EventSource` you should try to split your errors into two different categories:

- Errors that make sense as a kind of event. These should be a part of the `Event` associated type eg. as an enum or `Result`.
- Errors that mean your event source simply cannot process more events. These should form the `Error` associated type.

For an example, take Calloop's channel type, [`calloop::channel::Channel`](api/calloop/channel/struct.Channel.html). When the sending end is dropped, no more messages can be received after that point. But this is not returned as an error when calling `process_events()`, because you still want to (and can!) receive messages sent before that point that might still be in the queue. Hence the events received by the callback for this source can be `Msg(e)` or `Closed`.

However, if the internal ping source produces an error, there is no way for the sending end of the channel to notify the receiver. It is impossible to process more events on this event source, and the caller needs to decide how to recover from this situation. Hence this is returned as a `ChannelError` from `process_events()`.

Another example might be an event source that represents a running subprocess. If the subprocess exits with a non-zero status code, or the executable can't be found, those don't mean that events can no longer be processed. They can be provided to the caller through the callback. But if the lower level sources being used to run (eg. an asynchronous executor or subprocess file descriptor) fail to work as expected, `process_events()` should return an error.
11 changes: 7 additions & 4 deletions doc/src/ch04-02-creating-our-source-part-1-our-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ So at a minimum, our type needs to contain these:
pub struct ZeroMQSource
{
// Calloop components.
socket_source: calloop::generic::Generic<calloop::generic::Fd>,
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
mpsc_receiver: calloop::channel::Channel<?>,
wake_ping_receiver: calloop::ping::PingSource,
}
Expand All @@ -26,7 +26,7 @@ What else do we need? If the `PingSource` is there to wake up the loop manually,
pub struct ZeroMQSource
{
// Calloop components.
socket_source: calloop::generic::Generic<calloop::generic::Fd>,
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
mpsc_receiver: calloop::channel::Channel<?>,
wake_ping_receiver: calloop::ping::PingSource,

Expand Down Expand Up @@ -59,14 +59,16 @@ where
> Remember that it's not just `Vec<T>` and other sequence types that implement `IntoIterator` — `Option<T>` implements it too! There is also `std::iter::Once<T>`. So if a user of our API wants to enforce that all "multi"-part messages actually contain exactly one part, they can use this API with `T` being, say, `std::iter::Once<zmq::Message>` (or even just `[zmq::Message; 1]` in Rust 2021 edition).

## Associated types
The `EventSource` trait has three associated types:
The `EventSource` trait has four associated types:

- `Event` - when an event is generated that our caller cares about (ie. not some internal thing), this is the data we provide to their callback. This will be another sequence of messages, but because we're constructing it we can be more opinionated about the type and use the return type of `zmq::Socket::recv_multipart()` which is `Vec<Vec<u8>>`.

- `Metadata` - this is a more persistent kind of data, perhaps the underlying file descriptor or socket, or maybe some stateful object that the callback can manipulate. It is passed by exclusive reference to the `Metadata` type. In our case we don't use this, so it's `()`.

- `Ret` - this is the return type of the callback that's called on an event. Usually this will be a `Result` of some sort; in our case it's `std::io::Result<()>` just to signal whether some underlying operation failed or not.

- `Error` - this is the error type returned by `process_events()` (not the user callback!). Having this as an associated type allows you to have more control over error propagation in nested event sources. We will use [Anyhow](https://crates.io/crates/anyhow), which is like a more fully-features `Box<dyn Error>`. It allows you to add context to any other error with a `context()` method.

So together these are:

```rust,noplayground
Expand All @@ -78,10 +80,11 @@ where
type Event = Vec<Vec<u8>>;
type Metadata = ();
type Ret = io::Result<()>;
type Error = anyhow::Error;
// ...
}
```

----

I have saved one surprise for later to emphasise some important principles, but for now, let's move on to defining some methods!
I have saved one surprise for later to emphasise some important principles, but for now, let's move on to defining some methods!
12 changes: 6 additions & 6 deletions doc/src/ch04-03-creating-our-source-part-2-setup-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ pub fn from_socket(socket: zmq::Socket) -> io::Result<(Self, calloop::channel::S

Calloop's event sources have a kind of life cycle, starting with *registration*. When you add an event source to the event loop, under the hood the source will *register* itself with the loop. Under certain circumstances a source will need to re-register itself. And finally there is the *unregister* action when an event source is removed from the loop. These are expressed via the `calloop::EventSource` methods:

- `fn register(&mut self, poll: &mut calloop::Poll, token_factory: &mut calloop::TokenFactory) -> std::io::Result<()>`
- `fn reregister(&mut self, poll: &mut calloop::Poll, token_factory: &mut calloop::TokenFactory) -> std::io::Result<()>`
- `fn unregister(&mut self, poll: &mut calloop::Poll) -> std::io::Result<()>`
- `fn register(&mut self, poll: &mut calloop::Poll, token_factory: &mut calloop::TokenFactory) -> calloop::Result<()>`
- `fn reregister(&mut self, poll: &mut calloop::Poll, token_factory: &mut calloop::TokenFactory) -> calloop::Result<()>`
- `fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()>`

The first two methods take a *token factory*, which is a way for Calloop to keep track of why your source was woken up. When we get to actually processing events, you'll see how this works. But for now, all you need to do is recursively pass the token factory into whatever sources your own event source is composed of. This includes other composed sources, which will pass the token factory into *their* sources, and so on.

Expand All @@ -51,7 +51,7 @@ fn register(
&mut self,
poll: &mut calloop::Poll,
token_factory: &mut calloop::TokenFactory
) -> io::Result<()>
) -> calloop::Result<()>
{
self.socket_source.register(poll, token_factory)?;
self.mpsc_receiver.register(poll, token_factory)?;
Expand All @@ -65,7 +65,7 @@ fn reregister(
&mut self,
poll: &mut calloop::Poll,
token_factory: &mut calloop::TokenFactory
) -> io::Result<()>
) -> calloop::Result<()>
{
self.socket_source.reregister(poll, token_factory)?;
self.mpsc_receiver.reregister(poll, token_factory)?;
Expand All @@ -77,7 +77,7 @@ fn reregister(
}


fn unregister(&mut self, poll: &mut calloop::Poll)-> std::io::Result<()> {
fn unregister(&mut self, poll: &mut calloop::Poll)-> calloop::Result<()> {
self.socket_source.unregister(poll)?;
self.mpsc_receiver.unregister(poll)?;
self.wake_ping_receiver.unregister(poll)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn process_events<F>(
readiness: calloop::Readiness,
token: calloop::Token,
mut callback: F,
) -> io::Result<calloop::PostAction>
) -> Result<calloop::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
```
Expand All @@ -31,6 +31,8 @@ Implementing `process_events()` for a type that contains various Calloop sources

If we were woken up because of the ping source, then the ping source's `process_events()` will see that the token matches its own, and call the callback (possibly multiple times). If we were woken up because a message was sent through the MPSC channel, then the channel's `process_events()` will match on the token instead and call the callback for every message waiting. The zsocket is a little different, and we'll go over that in detail.

For error handling we're using [Anyhow](https://crates.io/crates/anyhow), hence the `context()` calls on each fallible operation. These just add a message to any error that might appear in a traceback.

So a first draft of our code might look like:

```rust,noplayground
Expand All @@ -39,36 +41,47 @@ fn process_events<F>(
readiness: calloop::Readiness,
token: calloop::Token,
mut callback: F,
) -> io::Result<calloop::PostAction>
) -> Result<calloop::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
// Runs if we were woken up on startup/registration.
self.wake_ping_receiver
.process_events(readiness, token, |_, _| {})?;
.process_events(readiness, token, |_, _| {})
.context("Failed after registration")?;

// Runs if we received a message over the MPSC channel.
self.mpsc_receiver
.process_events(readiness, token, |evt, _| {
// 'evt' could be a message or a "sending end closed"
// notification. We don't care about the latter.
if let calloop::channel::Event::Msg(msg) = evt {
self.socket.send_multipart(msg, 0)?;
self.socket
.send_multipart(msg, 0)
.context("Failed to send message")?;
}
})?;

// Runs if the zsocket became read/write-able.
self.socket
.process_events(readiness, token, |_, _| {
let events = self.socket.get_events()?;
let events =
self.socket
.get_events()
.context("Failed to read ZeroMQ events")?;

if events.contains(zmq::POLLOUT) {
// Wait, what do we do here?
}

if events.contains(zmq::POLLIN) {
let messages = self.socket.recv_multipart(0)?;
callback(messages, &mut ())?;
let messages =
self.socket
.recv_multipart(0)
.context("Failed to receive message")?;

callback(messages, &mut ())
.context("Error in event callback")?;
}
})?;

Expand All @@ -90,7 +103,9 @@ Thirdly, we commit one of the worst sins you can commit in an event-loop-based s
self.mpsc_receiver
.process_events(readiness, token, |evt, _| {
if let calloop::channel::Event::Msg(msg) = evt {
self.socket.send_multipart(msg, 0)?;
self.socket
.send_multipart(msg, 0)
.context("Failed to send message")?;
}
})?;
```
Expand All @@ -108,7 +123,7 @@ where
T::Item: Into<zmq::Message>,
{
// Calloop components.
socket_source: calloop::generic::Generic<calloop::generic::Fd>,
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
mpsc_receiver: calloop::channel::Channel<T>,
wake_ping_receiver: calloop::ping::PingSource,

Expand Down Expand Up @@ -141,21 +156,29 @@ And our "zsocket is writeable" code becomes:
```rust,noplayground
self.socket
.process_events(readiness, token, |_, _| {
let events = self.socket.get_events()?;
let events = self
.socket
.get_events()
.context("Failed to read ZeroMQ events")?;

if events.contains(zmq::POLLOUT) {
if let Some(parts) = self.outbox.pop_front() {
self.socket
.send_multipart(parts, 0)?;
.send_multipart(parts, 0)
.context("Failed to send message")?;
}
}

if events.contains(zmq::POLLIN) {
let messages = self.socket.recv_multipart(0)?;
callback(messages, &mut ())?;
let messages =
self.socket
.recv_multipart(0)
.context("Failed to receive message")?;
callback(messages, &mut ())
.context("Error in event callback")?;
}
})?;

```

So we've not only solved problem #3, we've also figured out #2, which suggests we're on the right track. But we still have (at least) that first issue to sort out.
So we've not only solved problem #3, we've also figured out #2, which suggests we're on the right track. But we still have (at least) that first issue to sort out.
Loading