From e0432e14d72aa3361b29a328d0bdf56f9ec76237 Mon Sep 17 00:00:00 2001 From: Jens Alfke Date: Thu, 12 Oct 2023 15:10:49 -0700 Subject: [PATCH] More docs --- docs/Coroutine Types.md | 63 +++++++++++++++++++++++++++++++++++- docs/PubSub.md | 71 +++++++++++++++++++++++++++++++++++++++++ docs/README.md | 4 ++- 3 files changed, 136 insertions(+), 2 deletions(-) create mode 100644 docs/PubSub.md diff --git a/docs/Coroutine Types.md b/docs/Coroutine Types.md index e00e42a..74a4f66 100644 --- a/docs/Coroutine Types.md +++ b/docs/Coroutine Types.md @@ -10,7 +10,68 @@ A future-returning coroutine can also throw an exception; if so, the exception w The most common thing to do with a `Future` is to `co_await` it; this blocks the current coroutine until the Future’s result is available, and returns it as a value of type `T`. It’s also possible the Future’s result is an error; if so, the error will be thrown as an exception. (There are ways to avoid this and examine the result as an Error instead.) -It’s possible for non-coroutine code to receive or create Futures. More on that elsewhere. +### The `ASYNC` macro + +Future is such a common return type, there's a macro to highlight it: +```c++ + ASYNC readString(); +``` + +`ASYNC` is short for `[[nodiscard]] Future`. The annotation makes it an error to ignore the return value; otherwise it's easy to forget to `co_await` it. + +### Creating Futures without coroutines + +It’s possible for non-coroutine code to create, return and even await Futures. In fact this is the main way to bridge between coroutine and non-coroutine functions. + +> Note: Returning `Future` doesn't make a function a coroutine. It's only a coroutine if it uses `co_await`, `co_yield` or `co_return`. + +Most simply, you can create a `Future` that already has a value or an error simply by constructing it with one, like `Future(17)` or `Future(CroutonError::Unimplemented)`. As these are implicit constructors, if the function returns a Future you can just return the value/error: + +```c++ + Future answer() {return 6 * 7;} + Future fancyThing() {return CroutonError::Unimplemented;} +``` + +The interesting case is if you _don't_ have the value yet. In that case you create a `FutureProvider` first and hang onto it (it’s a reference, a `shared_ptr`.) You construct a `Future` from it and return that. Later, you call `setResult` or `setError` on the provider to give the Future a value. + +```c++ +Future longCalculation(double n) { + FutureProvider provider = Future::provider(); + longCalculationWithCallback(n, [provider] (double answer) { + provider->setResult(answer); // When result arrives, store it in the Future + }); + return Future(provider); // For now, return the empty Future +} +``` + +### Awaiting a Future + +What about the other direction: you call a function that returns a Future, but you’re not in a coroutine and can’t use `co_await`? + +In that case you usually use a callback. `Future::then()` takes a lambda that will be called when the Future’s value is available, and passed the value. + +```c++ +Future answerF = longCalculation(123.456); +answerF.then([=](Result answer) { cout << answer.value() << endl; }); +``` + +A `then` callback can even return a new value, which will become the value of a new Future: + +```c++ +Future longCalculationAsString(double n) { + Future answer = longCalculation(n).then([=](Result answer) { + return std::to_string(answer.value()); + }); + return answer; +}); +``` + +In the above example, what happens is: + +1. `longCalculationAsString` calls `longCalculation`, which returns an `empty Future`. +2. `longCalculationAsString` returns an empty `Future`. +3. `longCalculation` finishes, and the lambda ls called. +4. The lambda’s return value is stored in the `Future`. ## 2. `Generator` diff --git a/docs/PubSub.md b/docs/PubSub.md new file mode 100644 index 0000000..5708bc4 --- /dev/null +++ b/docs/PubSub.md @@ -0,0 +1,71 @@ +# Publishers and Subscribers + +Crouton includes a small _functional reactive programming_ framework based on the notions of **Publishers** and **Subscribers**. + +* `Publisher` is an interface describing an object that can, on demand, create `ISeries` objects. +* `ISeries` is an interface describing an awaitable object producing type `Result`, with the contract that it will produce zero or more instances of `T`, ending with either an `Error` or `noerror`. (`Generator` is a specific implementation of `ISeries`.) +* A `Subscriber` is an object that gets an `ISeries` from a `Publisher` and asynchronously consumes its items. +* A `Connector` is both a `Publisher` and a `Subscriber`: it consumes items and in response generates items, which may be of a different type. + +These interfaces are modular units that can be combined to produce data flows with a Publisher at the start, zero or more Connectors, ending in a Subscriber. For example: + +> `WebSocket` ➜ `Filter` ➜ `Transform` ➜ `FileWriter` + +This connects to a WebSocket server, picks out matching WebSocket messages, transforms them (perhaps into human-readable strings) and then writes them to a file. The test case "Stream Publisher" in `test_pubsub.cc` is very similar to this. + +## Premade Publishers & Subscribers + +There are a number of implementations of Publisher, Subscriber and Connector that you can plug together. + +* **Publishers**: + * `Emitter` is constructed with a list of items which it stores in a `std::vector`. When a Subscriber connects, the Emitter sends it all of the items. +* **Connectors**: + * `BaseConnector` simply routes items unchanged from its publisher to its subscriber (it only supports one subscriber.) It’s intended for subclassing. + * `Buffer` also routes items unchanged, but it has a fixed-capacity internal queue of items. If the queue fills up, it stops reading from the publisher until the subscriber catches up. This is useful for flow control. + * `Filter` takes a boolean-valued function and calls it on every item; it passes along only items for which the function returns true. + * `Transformer` takes a function that converts each item into a different item, which could be a different type. The converted items are passed to the subscriber. + * `Timeout` passes through items from the publisher, except that if the first item takes longer than a given interval to arrive, it sends the subscriber a `CroutonError::Timeout` error and stops. +* **Subscribers**: + * `Collector` is the opposite of `Emitter`: it just collects the items into a `vector` which can be examined afterwards. + * `CollectorFn` instead takes a function and calls it on every item it receives. + +## Creating Workflows + +The easiest way to connect publishers and subscribers is with the `|` operator, as though you were in a shell. Here’s an example taken directly from `test_pubsub.cc`: + +```c++ +auto collect = AnyPublisher("README.md") + | LineSplitter{} + | Contains("Crouton") + | Collector{}; +collect.start(); +``` + +## Publisher + +`Publisher` is a simple interface: it has one method, `publish()`, that returns a `unique_ptr` to an `ISeries`. This method is called by the `Subscriber` connected to the Publisher, to start the action. + +To implement a Publisher, just subclass `Publisher` and override the `publish()` method. + +Alternatively, you can create a Publisher from an `AsyncQueue` or `IStream` — or anything else that has a `generate` method returning an `ISeries` — by using the `AnyPublisher` template. For example, the class `AnyPublisher` is a subclass of `FileStream` that also implements `Publisher`. + +## Subscriber + +`Subscriber` is a bit more complex because it does more of the work. + +First, it’s given a shared reference to a Publisher, either in its constructor or by a call to the `subscribeTo()` method. + +Then its `start()` method is called; this calls the Publisher to get an `ISeries` and passes that series to the `run()` method. + +The `run()` method is a `Task` coroutine, so it can run indefinitely. It’s a loop that awaits an item from the series and processes it, and stops once it gets an EOF or Error. + +You can implement a Publisher by subclassing `Publisher` and either + +* overriding `run` to implement the whole loop yourself, or +* overriding `handle(T)`, and optionally `handleEnd(Error)`, which receive individual items. + +## Connector + +The abstract class `Connector` simply subclasses both `Subscriber` and `Publisher`. + +A more useful base class is `BaseConnector`, which uses a `SeriesProducer` to output a series; its `produce(Result)` method sends the next result to the subscriber. You can extend this class by overriding `run`. diff --git a/docs/README.md b/docs/README.md index d76dcac..bfa842b 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,5 +1,7 @@ # Crouton Documentation +(in progress...) + * **[Introduction](Introduction.md)** * **Core** * Fundamental Types @@ -10,7 +12,7 @@ * [Awaitable Types](Awaitable Types.md) * Scheduling and Event Loops * Logging - * Publish And Subscribe + * [Publish And Subscribe](PubSub.md) * **I/O and Networking** * Filesystem operations * Streams