Skip to content

Commit

Permalink
IStream can now be used as a Publisher.
Browse files Browse the repository at this point in the history
  • Loading branch information
snej committed Oct 5, 2023
1 parent 63e4450 commit d81a2be
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 85 deletions.
3 changes: 2 additions & 1 deletion include/Awaitable.hh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ namespace crouton {
/** A type of Awaitable (plus ISelectable) that guarantees to produce:
- zero or more `T`s, then
- either empty/`noerror` (completion) or an `Error` (failure.)
`Generator` is the canonical example. */
`Generator` is the canonical example.
`Future` doesn't implement this, but the wrapper class `FutureSeries` does. */
template <typename T>
class ISeries : public IAwaitable<Result<T>>, public ISelectable { };
}
169 changes: 86 additions & 83 deletions include/PubSub.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,12 @@

namespace crouton::ps {

/** Type-erasing wrapper around any Series implementation. */
template <typename T>
class AnySeries final : public ISeries<T> {
public:
/// Constructs an `AnySeries<T>` by moving a `Series<T>` rvalue.
template <std::derived_from<ISeries<T>> Impl>
AnySeries(Impl&& impl)
:_impl(std::make_unique<Impl>(std::forward<Impl>(impl)))
{ }

bool await_ready() override {return _impl->await_ready();}
coro_handle await_suspend(coro_handle cur) override {return _impl->await_suspend(cur);}
Result<T> await_resume() override {return _impl->await_resume();}
void onReady(OnReadyFn fn) override {return _impl->onReady(std::move(fn));}

private:
template <class U> friend class Subscriber;
AnySeries() = default;
explicit operator bool() const {return _impl != nullptr;}

std::unique_ptr<ISeries<T>> _impl;
// Note: it's not possible to implement this with std::any, bc Series is not copyable
};
template <typename T> using SeriesRef = std::unique_ptr<ISeries<T>>;

template <typename T, std::derived_from<ISeries<T>> Impl>
SeriesRef<T> mkref(Impl &&impl) {return std::make_unique<Impl>(std::forward<Impl>(impl));}
template <typename T>
SeriesRef<T> mkref(SeriesRef<T>&& ref) {return ref;}


/** A `Publisher<T>` asynchronously provides `Series` of `T` items to `Subscriber`s.
Expand All @@ -56,11 +38,30 @@ namespace crouton::ps {
/// @note If this is called a second time, when the first Series has already produced
/// items, the second Series may or may not include the already-produced items.
/// @warning Some implementations don't support multiple subscribers. Check the docs.
virtual AnySeries<T> generate() = 0;
virtual SeriesRef<T> publish() = 0;
};



/** Concept matching classes with a `generate()` method returning a subclass of `ISeries<T>`.
Examples are `AsyncQueue`, and `IStream`'s many implementations. */
template <class Gen, typename T>
concept GeneratorFactory = requires (Gen gen) {
{ gen.generate() } -> std::derived_from<ISeries<T>>;
};


/** Utility class that subclases anything with a `generate()` method returning an `ISeries`
-- such as AsyncQueue or IStream -- and makes it implement Publisher.*/
template <typename T, GeneratorFactory<T> Gen>
class AnyPublisher : public Gen, public Publisher<T> {
public:
using Gen::Gen;
SeriesRef<T> publish() override {return mkref<T>(Gen::generate());}
};



/** A `Subscriber<T>` asynchronously receives a series of `T` items from a `Publisher`.
Many Subscriber implementations are also Publishers (see `Connector`), allowing chains
or pipelines to be created. */
Expand All @@ -73,7 +74,7 @@ namespace crouton::ps {
/// Constructs a Subscriber connected to a Publisher.
explicit Subscriber(std::shared_ptr<Publisher<T>> pub) {subscribeTo(std::move(pub));}

explicit Subscriber(AnySeries<T> series) {_series = std::move(series);}
explicit Subscriber(SeriesRef<T> series) {_series = std::move(series);}

/// Connects the subscriber to a Publisher.
virtual void subscribeTo(std::shared_ptr<Publisher<T>> pub) {
Expand All @@ -90,10 +91,10 @@ namespace crouton::ps {
/// @note You only have to call this on the last Subscriber in a series.
virtual void start() {
if (!_task) {
AnySeries<T> series = std::move(_series);
SeriesRef<T> series = std::move(_series);
if (!series) {
assert(_publisher);
series = _publisher->generate();
series = _publisher->publish();
}
_task.emplace(run(std::move(series)));
}
Expand All @@ -118,9 +119,9 @@ namespace crouton::ps {
/// You can override this if you want more control over the lifecycle.
/// @warning You must override either this method or `handle(T)`.
/// @warning If you override, you are responsible for calling `handleEnd` when finishing.
virtual Task run(AnySeries<T> series) {
virtual Task run(SeriesRef<T> series) {
while (true) {
Result<T> result = AWAIT series;
Result<T> result = AWAIT *series;
if (result.ok()) {
AWAIT handle(std::move(result).value());
} else {
Expand All @@ -132,7 +133,7 @@ namespace crouton::ps {

/// Abstract method that handles an item received from the Publisher.
/// @warning You must override either this method or `receive`.
virtualASYNC<void> handle(T) {return CroutonError::LogicError;}
virtualASYNC<void> handle(T) {return CroutonError::Unimplemented;}

/// Handles the final Error/noerror item from the publisher.
/// Default implementation sets the `error` property; make sure to call through.
Expand All @@ -143,7 +144,7 @@ namespace crouton::ps {
Subscriber& operator=(Subscriber const&) = delete;

std::shared_ptr<Publisher<T>> _publisher; // My Publisher, if given
AnySeries<T> _series; // My Series, if given
SeriesRef<T> _series; // My Series, if given
std::optional<Task> _task; // The `receive` coroutine that reads series
Error _error; // Error received from the publisher
};
Expand All @@ -153,7 +154,7 @@ namespace crouton::ps {
/** Simple base class implementing both Publisher and Subscriber.
These are used as intermediate links in data-flow chains.
@note Subclasses' `generate()` implementations should call `this->start()`. */
@note Subclasses' `publish()` implementations should call `this->start()`. */
template <typename In, typename Out = In>
class Connector : public Subscriber<In>, public Publisher<Out> {
public:
Expand Down Expand Up @@ -247,10 +248,10 @@ namespace crouton::ps {
/// Sets an error to return at the end.
void endWithError(Error err) {_error = err;}

AnySeries<T> generate() override {return _generate();}
SeriesRef<T> publish() override {return mkref<T>(_publish());}

private:
Generator<T> _generate() {
Generator<T> _publish() {
for (T& item : _items)
YIELD item;
if (_error)
Expand All @@ -262,35 +263,6 @@ namespace crouton::ps {



/** A trivial subclass of AsyncQueue that implements Publisher;
useful for creating a Publisher from a non-PubSub source of events.
Call `push` to enqueue items which will then be delivered to Subscribers.
@warning This currently only supports a single Subscriber. */
template <typename T>
class QueuePublisher : public Publisher<T>, public AsyncQueue<T> {
public:
using super = AsyncQueue<T>;
using super::AsyncQueue;

AnySeries<T> generate() override {return super::generate();}
};


/** A trivial subclass of BoundedAsyncQueue that implements Publisher.
This is like QueuePublisher except it has a limited capacity.
Call `asyncPush` to enqueue items; if the queue is full it waits until there's room.
@warning This currently only supports a single Subscriber. */
template <typename T>
class BoundedQueuePublisher : public Publisher<T>, public BoundedAsyncQueue<T> {
public:
using super = BoundedAsyncQueue<T>;
using super::BoundedAsyncQueue;

AnySeries<T> generate() override {return super::generate();}
};



#pragma mark - UTILITY CONNECTORS


Expand All @@ -301,18 +273,18 @@ namespace crouton::ps {
public:
using Connector<T>::Connector;

AnySeries<T> generate() override {
SeriesRef<T> publish() override {
this->start();
if (_subscriberCount == 0)
_readyForItem.notify();
++_subscriberCount;
return _generate();
return mkref<T>(_publish());
}

protected:
Task run(AnySeries<T> series) override {
Task run(SeriesRef<T> series) override {
do {
Result<T> nextItem = AWAIT series;
Result<T> nextItem = AWAIT *series;
AWAIT produce(std::move(nextItem));
} while (!_eof);
}
Expand All @@ -333,7 +305,7 @@ namespace crouton::ps {
RETURN noerror;
}

Generator<T> _generate() {
Generator<T> _publish() {
DEFER {--_subscriberCount;};
do {
while (_usingItemCount == 0) {
Expand Down Expand Up @@ -365,44 +337,58 @@ namespace crouton::ps {
public:
explicit Buffer(size_t queueSize) :_queue(queueSize) { }

AnySeries<T> generate() override {this->start(); return _queue.generate();}
SeriesRef<T> publish() override {this->start(); return mkref<T>(_queue.generate());}

protected:
// (exposed by subclass Filter)
using Predicate = std::function<bool(T const&)>;
Buffer(size_t queueSize, Predicate p) :_queue(queueSize), _predicate(std::move(p)) { }
virtual bool filter(T const& item) {return true;}

private:
Task run(AnySeries<T> series) override {
Task run(SeriesRef<T> series) override {
bool eof, closed = false;
do {
Result<T> item = AWAIT series;
Result<T> item = AWAIT *series;
eof = !item.ok();
if (eof)
this->handleEnd(item.error());
if (eof || !_predicate || _predicate(item.value()))
if (eof || filter(item.value()))
closed = ! AWAIT _queue.asyncPush(std::move(item));
} while (!eof && !closed);
_queue.closeWhenEmpty();
}

BoundedAsyncQueue<T> _queue;
Predicate _predicate;
};



/** A Connector that passes on only the items that satisfy a predicate function.
There are two ways to provide the function:
1. Pass it to the constructor as a `std::function`.
2. Subclass, and override the `filter` method.
@warning This currently only supports a single Subscriber. */
template <typename T>
class Filter : public Buffer<T> {
public:
explicit Filter(std::function<bool(T const&)> p) :Buffer<T>(1, std::move(p)) { }
using Predicate = std::function<bool(T const&)>;

explicit Filter(Predicate p = {})
:Buffer<T>(1)
,_predicate(std::move(p))
{ }

protected:
bool filter(T const& item) override {return _predicate(item);}
private:
Predicate _predicate;
};



/** A Connector that reads items, transforms them through a function, and re-publishes them.
There are two ways to provide the function:
1. Pass it to the constructor as a `std::function`.
2. Subclass, and override either of the `transform` methods.
@note The function may end the series early by returning an Error or `noerror`.
But it may not extend the series by returning a T when it gets an EOF.
@warning This currently only supports a single Subscriber. */
Expand All @@ -411,23 +397,41 @@ namespace crouton::ps {
public:
using XformFn = std::function<Result<Out>(Result<In>)>;

explicit Transformer(size_t queueSize = 1)
:_queue(queueSize)
{ }

explicit Transformer(XformFn xform, size_t queueSize = 1)
:_queue(queueSize)
,_xform(std::move(xform))
{ }

AnySeries<Out> generate() override {
SeriesRef<Out> publish() override {
this->start();
return _queue.generate();
return mkref<Out>(_queue.generate());
}

protected:
Task run(AnySeries<In> series) override {
virtual Result<Out> transform(Result<In> item) {
if (_xform)
return _xform(std::move(item));
else if (item.ok())
return transform(std::move(item).value());
else
return item.error();
}

virtual Out transform(In item) {
Error(CroutonError::Unimplemented).raise();
}

private:
Task run(SeriesRef<In> series) override {
bool eof, closed;
do {
Result<In> item = AWAIT series;
Result<In> item = AWAIT *series;
bool inEof = !item.ok();
Result<Out> out = _xform(std::move(item));
Result<Out> out = transform(std::move(item));
eof = !out.ok();
assert(!inEof || eof); // EOF input has to produce EOF output
if (eof)
Expand All @@ -437,7 +441,6 @@ namespace crouton::ps {
_queue.closeWhenEmpty();
}

private:
BoundedAsyncQueue<Out> _queue;
XformFn _xform;
};
Expand All @@ -450,7 +453,7 @@ namespace crouton::ps {
public:
explicit Timeout(double secs) :_timeout(secs) { }
protected:
Task run(AnySeries<T> series) override {
Task run(SeriesRef<T> series) override {
{
// Wait for a first item, or the timeout:
Future<void> timeout = Timer::sleep(_timeout);
Expand Down
9 changes: 8 additions & 1 deletion include/io/IStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
#pragma once
#include "Bytes.hh"
#include "Future.hh"
#include "Generator.hh"

#include <initializer_list>

namespace crouton::io {

/** Abstract interface of an asynchronous bidirectional stream.
/** Abstract base class of an asynchronous bidirectional stream.
It has concrete read/write methods, which are merely conveniences that call the
abstract ones.
@warning Re-entrant reads or writes are not allowed: no read call may be issued until the
Expand Down Expand Up @@ -82,6 +83,12 @@ namespace crouton::io {
/// Reads until EOF.
ASYNC<string> readAll() {return readString(SIZE_MAX);}

/// Returns a `Generator` that produces chunks of data read from the stream.
/// This is a wrapper around `readNoCopy()` that makes `IStream` satisfy the concept
/// `GeneratorFactory`, which in turn allows it to be used as a Publisher.
/// @note The stream is opened first, if necessary.
virtual Generator<string> generate();

//---- Writing:

/// Writes all the bytes.
Expand Down
2 changes: 2 additions & 0 deletions src/io/FileStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ namespace crouton::io {

FileStream::FileStream(int fd) :_fd(fd) { }
FileStream::~FileStream() {_close();}
FileStream::FileStream(FileStream&& fs) = default;
FileStream& FileStream::operator=(FileStream&& fs) = default;


Future<void> FileStream::open() {
Expand Down
Loading

0 comments on commit d81a2be

Please sign in to comment.