Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix future reduction with non copyable types #378

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 83 additions & 117 deletions stlab/concurrency/future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <stlab/concurrency/config.hpp>
#include <stlab/concurrency/executor_base.hpp>
#include <stlab/concurrency/immediate_executor.hpp>
#include <stlab/concurrency/optional.hpp>
#include <stlab/concurrency/task.hpp>
#include <stlab/concurrency/traits.hpp>
Expand All @@ -35,7 +36,6 @@
#define STLAB_FUTURE_COROUTINES_SUPPORT() 1
#include <experimental/coroutine>
#include <stlab/concurrency/default_executor.hpp>
#include <stlab/concurrency/immediate_executor.hpp>
#endif
#endif

Expand Down Expand Up @@ -201,6 +201,29 @@ using packaged_task_from_signature_t = typename packaged_task_from_signature<T>:

/**************************************************************************************************/

template <typename R>
R get_ready(future<R>&& f) {
assert(f.is_ready());
return *std::move(f).get_try();
}

template <typename R>
R get_ready(future<R> const& f) {
assert(f.is_ready());
return *f.get_try();
}

struct forward_future_content {
template <typename F>
auto operator()(F&& f) const {
return get_ready(std::forward<F>(f));
}
void operator()(future<void>&&) const {}
void operator()(future<void> const&) const {}
};

/**************************************************************************************************/

template <typename>
struct reduced_;

Expand All @@ -225,6 +248,16 @@ using reduced_t = typename reduced_<T>::type;
template <typename T>
struct reduction_helper;

template <typename R>
R&& reduce(R&& r) {
return std::forward<R>(r);
}

template <typename R>
auto reduce(future<future<R>>&& r) -> future<R> {
return std::move(r).then(stlab::immediate_executor, forward_future_content{});
}

/**************************************************************************************************/

template <typename T, typename = void>
Expand Down Expand Up @@ -318,7 +351,7 @@ struct shared_base<T, enable_if_copyable<T>> : std::enable_shared_from_this<shar
}
if (ready) executor(std::move(p.first));

return reduce(std::move(p.second));
return detail::reduce(std::move(p.second));
}

template <typename F>
Expand All @@ -328,9 +361,10 @@ struct shared_base<T, enable_if_copyable<T>> : std::enable_shared_from_this<shar

template <typename E, typename F>
auto then_r(bool unique, E&& executor, F&& f) {
return recover_r(unique, std::forward<E>(executor), [_f = std::forward<F>(f)](auto&& x) mutable {
return std::move(_f)(std::move(*(std::forward<decltype(x)>(x).get_try())));
});
return recover_r(unique, std::forward<E>(executor),
[_f = std::forward<F>(f)](auto&& x) mutable {
return std::move(_f)(detail::get_ready(std::forward<decltype(x)>(x)));
});
}

template <typename F>
Expand All @@ -355,24 +389,14 @@ struct shared_base<T, enable_if_copyable<T>> : std::enable_shared_from_this<shar
}
if (ready) executor(std::move(p.first));

return reduce(std::move(p.second));
return detail::reduce(std::move(p.second));
}

void _detach() {
std::unique_lock<std::mutex> lock(_mutex);
if (!_ready) _then.emplace_back([](auto&&) {}, [_p = this->shared_from_this()] {});
}

template <typename R>
auto reduce(R&& r) {
return std::forward<R>(r);
}

auto reduce(future<future<void>>&& r) -> future<void>;

template <typename R>
auto reduce(future<future<R>>&& r) -> future<R>;

void set_exception(std::exception_ptr error) {
_exception = std::move(error);
then_t then;
Expand Down Expand Up @@ -458,10 +482,10 @@ struct shared_base<T, enable_if_not_copyable<T>> : std::enable_shared_from_this<

template <typename E, typename F>
auto then_r(bool unique, E&& executor, F&& f) {
return recover_r(
unique, std::forward<E>(executor), [_f = std::forward<F>(f)](auto&& x) mutable {
return std::move(_f)(std::move(*std::forward<decltype(x)>(x).get_try()));
});
return recover_r(unique, std::forward<E>(executor),
[_f = std::forward<F>(f)](auto&& x) mutable {
return std::move(_f)(detail::get_ready(std::forward<decltype(x)>(x)));
});
}

template <typename F>
Expand All @@ -486,24 +510,14 @@ struct shared_base<T, enable_if_not_copyable<T>> : std::enable_shared_from_this<
}
if (ready) executor(std::move(p.first));

return reduce(std::move(p.second));
return detail::reduce(std::move(p.second));
}

void _detach() {
std::unique_lock<std::mutex> lock(_mutex);
if (!_ready) _then = then_t([](auto&&){}, [_p = this->shared_from_this()] {});
}

template <typename R>
auto reduce(R&& r) {
return std::forward<R>(r);
}

template <typename R>
auto reduce(future<future<R>>&& r) -> future<R>;

auto reduce(future<future<void>>&& r)->future<void>;

void set_exception(std::exception_ptr error) {
_exception = std::move(error);
then_t then;
Expand All @@ -520,8 +534,6 @@ struct shared_base<T, enable_if_not_copyable<T>> : std::enable_shared_from_this<

bool is_ready() const { return _ready; }

auto get_try() -> stlab::optional<T> { return get_try_r(true); }

auto get_try_r(bool) -> stlab::optional<T> {
bool ready = false;
{
Expand Down Expand Up @@ -598,16 +610,6 @@ struct shared_base<void> : std::enable_shared_from_this<shared_base<void>> {
return recover(std::forward<E>(executor), std::forward<F>(f));
}

template <typename R>
auto reduce(R&& r) {
return std::forward<R>(r);
}

auto reduce(future<future<void>>&& r) -> future<void>;

template <typename R>
auto reduce(future<future<R>>&& r) -> future<R>;

void set_exception(std::exception_ptr error) {
_exception = std::move(error);
then_t then;
Expand Down Expand Up @@ -1099,8 +1101,6 @@ class STLAB_NODISCARD() future<T, enable_if_not_copyable<T>> {

bool is_ready() const& { return _p && _p->is_ready(); }

auto get_try() const& { return _p->get_try(); }

auto get_try() && { return _p->get_try_r(unique_usage(_p)); }

[[deprecated("Use exception() instead")]]
Expand Down Expand Up @@ -1139,7 +1139,7 @@ template <typename F>
struct assign_ready_future {
template <typename T>
static void assign(T& x, F f) {
x = std::move(*(std::move(f).get_try()));
x = detail::get_ready(std::move(f));
}
};

Expand Down Expand Up @@ -1219,7 +1219,7 @@ struct when_any_shared {
{
std::unique_lock<std::mutex> lock{ _guard };
if (_index == std::numeric_limits<std::size_t>::max()) {
_arg = std::move(*std::forward<FF>(f).get_try());
_arg = detail::get_ready(std::forward<FF>(f));
_index = index;
run = true;
}
Expand Down Expand Up @@ -1405,7 +1405,7 @@ template <typename T>
struct value_storer {
template <typename C, typename F>
static void store(C& context, F&& f, std::size_t index) {
context._results = std::move(*std::forward<F>(f).get_try());
context._results = detail::get_ready(std::forward<F>(f));
context._index = index;
}
};
Expand All @@ -1414,7 +1414,7 @@ template <typename T>
struct value_storer<std::vector<T>> {
template <typename C, typename F>
static void store(C& context, F&& f, std::size_t index) {
context._results[index] = std::move(*std::forward<F>(f).get_try());
context._results[index] = detail::get_ready(std::forward<F>(f));
}
};

Expand Down Expand Up @@ -1705,8 +1705,8 @@ auto when_any(E executor, F&& f, std::pair<I, I> range) {
/**************************************************************************************************/

template <typename E, typename F, typename... Args>
auto async(E executor, F&& f, Args&&... args)
-> future<detail::result_t<std::decay_t<F>, std::decay_t<Args>...>> {
auto async(E executor, F&& f, Args&&... args) -> future<
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's discuss this as a breaking change on the (public) stlab slack - (let me know if you need an invite). Personally, I think I'm fine with it as I doubt there are any (or many) cases this would break and likely the break is minor but we might consider alternate names or using namespace versioning.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this need to be discussed. I think need an invite, I can't sign in into stlab.slack.com.

detail::reduced_t<std::decay_t<detail::result_t<std::decay_t<F>, std::decay_t<Args>...>>>> {
using result_type = detail::result_t<std::decay_t<F>, std::decay_t<Args>...>;

auto p = package<result_type()>(
Expand All @@ -1719,7 +1719,7 @@ auto async(E executor, F&& f, Args&&... args)

executor(std::move(p.first));

return std::move(p.second);
return detail::reduce(std::move(p.second));
}

/**************************************************************************************************/
Expand Down Expand Up @@ -1766,31 +1766,32 @@ struct value_<T, enable_if_copyable<T>> {
sb._result = f(std::forward<Args>(args)...);
sb._reduction_helper.value =
(*sb._result)
.recover([_p = sb.shared_from_this()](future<R> f) {
if (f.exception()) {
_p->_exception = std::move(f).exception();
proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
return *f.get_try();
})
.then([_p = sb.shared_from_this()](auto) { proceed(*_p); });
.recover(stlab::immediate_executor,
[_p = sb.shared_from_this()](future<R> f) {
if (f.exception()) {
_p->_exception = std::move(f).exception();
proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
})
.then(stlab::immediate_executor, [_p = sb.shared_from_this()]() { proceed(*_p); });
}

template <typename F, typename... Args>
static void set(shared_base<future<void>>& sb, F& f, Args&&... args) {
sb._result = f(std::forward<Args>(args)...);
sb._reduction_helper.value =
(*sb._result)
.recover([_p = sb.shared_from_this()](future<void> f) {
if (f.exception()) {
_p->_exception = std::move(f).exception();
value_::proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
return;
})
.then([_p = sb.shared_from_this()]() { proceed(*_p); });
.recover(stlab::immediate_executor,
[_p = sb.shared_from_this()](future<void> f) {
if (f.exception()) {
_p->_exception = std::move(f).exception();
value_::proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
return;
})
.then(stlab::immediate_executor, [_p = sb.shared_from_this()]() { proceed(*_p); });
}
};

Expand Down Expand Up @@ -1818,15 +1819,17 @@ struct value_<T, enable_if_not_copyable<T>> {
sb._result = f(std::forward<Args>(args)...);
sb._reduction_helper.value =
std::move(*sb._result)
.recover([_p = sb.shared_from_this()](future<R> f) {
if (auto ex = std::move(f).exception()) {
_p->_exception = ex;
proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
return *f.get_try();
})
.then([_p = sb.shared_from_this()](auto) { proceed(*_p); });
.recover(stlab::immediate_executor,
[_p = sb.shared_from_this()](future<R> f) {
if (auto ex = std::move(f).exception()) {
_p->_exception = ex;
proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
// We could move out the data to put it back in place in the
// next 'then' call or just leave it in place and do nothing.
})
.then(stlab::immediate_executor, [_p = sb.shared_from_this()]() { proceed(*_p); });
}
};

Expand Down Expand Up @@ -1888,44 +1891,7 @@ auto shared_base<void>::recover(E&& executor, F&& f)
}
if (ready) executor(std::move(p.first));

return reduce(std::move(p.second));
}

/**************************************************************************************************/

template <typename T>
auto shared_base<T, enable_if_copyable<T>>::reduce(future<future<void>>&& r) -> future<void> {
return std::move(r).then([](auto) {});
}

template <typename T>
template <typename R>
auto shared_base<T, enable_if_copyable<T>>::reduce(future<future<R>>&& r) -> future<R> {
return std::move(r).then([](auto&& f) { return *std::forward<decltype(f)>(f).get_try(); });
}

/**************************************************************************************************/

template <typename T>
auto shared_base<T, enable_if_not_copyable<T>>::reduce(future<future<void>>&& r) -> future<void> {
return std::move(r).then([](auto){});
}

template <typename T>
template <typename R>
auto shared_base<T, enable_if_not_copyable<T>>::reduce(future<future<R>>&& r) -> future<R> {
return std::move(r).then([](auto&& f) { return *std::forward<decltype(f)>(f).get_try(); });
}

/**************************************************************************************************/

inline auto shared_base<void>::reduce(future<future<void>>&& r) -> future<void> {
return std::move(r).then([](auto) {});
}

template <typename R>
auto shared_base<void>::reduce(future<future<R>>&& r) -> future<R> {
return std::move(r).then([](auto&& f) { return *std::forward<decltype(f)>(f).get_try(); });
return detail::reduce(std::move(p.second));
}

/**************************************************************************************************/
Expand Down
4 changes: 2 additions & 2 deletions test/future_recover_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ BOOST_AUTO_TEST_CASE(future_recover_move_only_with_broken_promise) {
return std::move(p.second).recover([&check](auto f) {
check = true;
try {
return *std::move(f.get_try());
return *std::move(f).get_try();
} catch (const exception&) {
throw;
}
Expand All @@ -1179,7 +1179,7 @@ BOOST_AUTO_TEST_CASE(future_recover_move_only_with_broken_promise) {
return std::move(p.second) ^ [&check](auto f) {
check = true;
try {
return *std::move(f.get_try());
return *std::move(f).get_try();
} catch (const exception&) {
throw;
}
Expand Down
Loading