// // impl/awaitable.hpp // ~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef BOOST_ASIO_IMPL_AWAITABLE_HPP #define BOOST_ASIO_IMPL_AWAITABLE_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace boost { namespace asio { namespace detail { struct awaitable_thread_has_context_switched {}; template class awaitable_async_op_handler; template class awaitable_async_op; // An awaitable_thread represents a thread-of-execution that is composed of one // or more "stack frames", with each frame represented by an awaitable_frame. // All execution occurs in the context of the awaitable_thread's executor. An // awaitable_thread continues to "pump" the stack frames by repeatedly resuming // the top stack frame until the stack is empty, or until ownership of the // stack is transferred to another awaitable_thread object. // // +------------------------------------+ // | top_of_stack_ | // | V // +--------------+---+ +-----------------+ // | | | | // | awaitable_thread |<---------------------------+ awaitable_frame | // | | attached_thread_ | | // +--------------+---+ (Set only when +---+-------------+ // | frames are being | // | actively pumped | caller_ // | by a thread, and | // | then only for V // | the top frame.) +-----------------+ // | | | // | | awaitable_frame | // | | | // | +---+-------------+ // | | // | | caller_ // | : // | : // | | // | V // | +-----------------+ // | bottom_of_stack_ | | // +------------------------------->| awaitable_frame | // | | // +-----------------+ template class awaitable_frame_base { public: #if !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING) void* operator new(std::size_t size) { return boost::asio::detail::thread_info_base::allocate( boost::asio::detail::thread_info_base::awaitable_frame_tag(), boost::asio::detail::thread_context::top_of_thread_call_stack(), size); } void operator delete(void* pointer, std::size_t size) { boost::asio::detail::thread_info_base::deallocate( boost::asio::detail::thread_info_base::awaitable_frame_tag(), boost::asio::detail::thread_context::top_of_thread_call_stack(), pointer, size); } #endif // !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING) // The frame starts in a suspended state until the awaitable_thread object // pumps the stack. auto initial_suspend() noexcept { return suspend_always(); } // On final suspension the frame is popped from the top of the stack. auto final_suspend() noexcept { struct result { awaitable_frame_base* this_; bool await_ready() const noexcept { return false; } void await_suspend(coroutine_handle) noexcept { this->this_->pop_frame(); } void await_resume() const noexcept { } }; return result{this}; } void set_except(std::exception_ptr e) noexcept { pending_exception_ = e; } void set_error(const boost::system::error_code& ec) { this->set_except(std::make_exception_ptr(boost::system::system_error(ec))); } void unhandled_exception() { set_except(std::current_exception()); } void rethrow_exception() { if (pending_exception_) { std::exception_ptr ex = std::exchange(pending_exception_, nullptr); std::rethrow_exception(ex); } } void clear_cancellation_slot() { this->attached_thread_->entry_point()->cancellation_state_.slot().clear(); } template auto await_transform(awaitable a) const { if (attached_thread_->entry_point()->throw_if_cancelled_) if (!!attached_thread_->get_cancellation_state().cancelled()) throw_error(boost::asio::error::operation_aborted, "co_await"); return a; } template auto await_transform(Op&& op, typename constraint::value>::type = 0) { if (attached_thread_->entry_point()->throw_if_cancelled_) if (!!attached_thread_->get_cancellation_state().cancelled()) throw_error(boost::asio::error::operation_aborted, "co_await"); return awaitable_async_op::type, typename decay::type, Executor>{ std::forward(op), this}; } // This await transformation obtains the associated executor of the thread of // execution. auto await_transform(this_coro::executor_t) noexcept { struct result { awaitable_frame_base* this_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() const noexcept { return this_->attached_thread_->get_executor(); } }; return result{this}; } // This await transformation obtains the associated cancellation state of the // thread of execution. auto await_transform(this_coro::cancellation_state_t) noexcept { struct result { awaitable_frame_base* this_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() const noexcept { return this_->attached_thread_->get_cancellation_state(); } }; return result{this}; } // This await transformation resets the associated cancellation state. auto await_transform(this_coro::reset_cancellation_state_0_t) noexcept { struct result { awaitable_frame_base* this_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() const { return this_->attached_thread_->reset_cancellation_state(); } }; return result{this}; } // This await transformation resets the associated cancellation state. template auto await_transform( this_coro::reset_cancellation_state_1_t reset) noexcept { struct result { awaitable_frame_base* this_; Filter filter_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() { return this_->attached_thread_->reset_cancellation_state( BOOST_ASIO_MOVE_CAST(Filter)(filter_)); } }; return result{this, BOOST_ASIO_MOVE_CAST(Filter)(reset.filter)}; } // This await transformation resets the associated cancellation state. template auto await_transform( this_coro::reset_cancellation_state_2_t reset) noexcept { struct result { awaitable_frame_base* this_; InFilter in_filter_; OutFilter out_filter_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() { return this_->attached_thread_->reset_cancellation_state( BOOST_ASIO_MOVE_CAST(InFilter)(in_filter_), BOOST_ASIO_MOVE_CAST(OutFilter)(out_filter_)); } }; return result{this, BOOST_ASIO_MOVE_CAST(InFilter)(reset.in_filter), BOOST_ASIO_MOVE_CAST(OutFilter)(reset.out_filter)}; } // This await transformation determines whether cancellation is propagated as // an exception. auto await_transform(this_coro::throw_if_cancelled_0_t) noexcept { struct result { awaitable_frame_base* this_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() { return this_->attached_thread_->throw_if_cancelled(); } }; return result{this}; } // This await transformation sets whether cancellation is propagated as an // exception. auto await_transform(this_coro::throw_if_cancelled_1_t throw_if_cancelled) noexcept { struct result { awaitable_frame_base* this_; bool value_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() { this_->attached_thread_->throw_if_cancelled(value_); } }; return result{this, throw_if_cancelled.value}; } // This await transformation is used to run an async operation's initiation // function object after the coroutine has been suspended. This ensures that // immediate resumption of the coroutine in another thread does not cause a // race condition. template auto await_transform(Function f, typename enable_if< is_convertible< typename result_of::type, awaitable_thread* >::value >::type* = nullptr) { struct result { Function function_; awaitable_frame_base* this_; bool await_ready() const noexcept { return false; } void await_suspend(coroutine_handle) noexcept { this_->after_suspend( [](void* arg) { result* r = static_cast(arg); r->function_(r->this_); }, this); } void await_resume() const noexcept { } }; return result{std::move(f), this}; } // Access the awaitable thread's has_context_switched_ flag. auto await_transform(detail::awaitable_thread_has_context_switched) noexcept { struct result { awaitable_frame_base* this_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } bool& await_resume() const noexcept { return this_->attached_thread_->entry_point()->has_context_switched_; } }; return result{this}; } void attach_thread(awaitable_thread* handler) noexcept { attached_thread_ = handler; } awaitable_thread* detach_thread() noexcept { attached_thread_->entry_point()->has_context_switched_ = true; return std::exchange(attached_thread_, nullptr); } void push_frame(awaitable_frame_base* caller) noexcept { caller_ = caller; attached_thread_ = caller_->attached_thread_; attached_thread_->entry_point()->top_of_stack_ = this; caller_->attached_thread_ = nullptr; } void pop_frame() noexcept { if (caller_) caller_->attached_thread_ = attached_thread_; attached_thread_->entry_point()->top_of_stack_ = caller_; attached_thread_ = nullptr; caller_ = nullptr; } struct resume_context { void (*after_suspend_fn_)(void*) = nullptr; void *after_suspend_arg_ = nullptr; }; void resume() { resume_context context; resume_context_ = &context; coro_.resume(); if (context.after_suspend_fn_) context.after_suspend_fn_(context.after_suspend_arg_); } void after_suspend(void (*fn)(void*), void* arg) { resume_context_->after_suspend_fn_ = fn; resume_context_->after_suspend_arg_ = arg; } void destroy() { coro_.destroy(); } protected: coroutine_handle coro_ = nullptr; awaitable_thread* attached_thread_ = nullptr; awaitable_frame_base* caller_ = nullptr; std::exception_ptr pending_exception_ = nullptr; resume_context* resume_context_ = nullptr; }; template class awaitable_frame : public awaitable_frame_base { public: awaitable_frame() noexcept { } awaitable_frame(awaitable_frame&& other) noexcept : awaitable_frame_base(std::move(other)) { } ~awaitable_frame() { if (has_result_) static_cast(static_cast(result_))->~T(); } awaitable get_return_object() noexcept { this->coro_ = coroutine_handle::from_promise(*this); return awaitable(this); }; template void return_value(U&& u) { new (&result_) T(std::forward(u)); has_result_ = true; } template void return_values(Us&&... us) { this->return_value(std::forward_as_tuple(std::forward(us)...)); } T get() { this->caller_ = nullptr; this->rethrow_exception(); return std::move(*static_cast(static_cast(result_))); } private: alignas(T) unsigned char result_[sizeof(T)]; bool has_result_ = false; }; template class awaitable_frame : public awaitable_frame_base { public: awaitable get_return_object() { this->coro_ = coroutine_handle::from_promise(*this); return awaitable(this); }; void return_void() { } void get() { this->caller_ = nullptr; this->rethrow_exception(); } }; struct awaitable_thread_entry_point {}; template class awaitable_frame : public awaitable_frame_base { public: awaitable_frame() : top_of_stack_(0), has_executor_(false), has_context_switched_(false), throw_if_cancelled_(true) { } ~awaitable_frame() { if (has_executor_) u_.executor_.~Executor(); } awaitable get_return_object() { this->coro_ = coroutine_handle::from_promise(*this); return awaitable(this); }; void return_void() { } void get() { this->caller_ = nullptr; this->rethrow_exception(); } private: template friend class awaitable_frame_base; template friend class awaitable_async_op_handler; template friend class awaitable_handler_base; template friend class awaitable_thread; union u { u() {} ~u() {} char c_; Executor executor_; } u_; awaitable_frame_base* top_of_stack_; boost::asio::cancellation_slot parent_cancellation_slot_; boost::asio::cancellation_state cancellation_state_; bool has_executor_; bool has_context_switched_; bool throw_if_cancelled_; }; template class awaitable_thread { public: typedef Executor executor_type; typedef cancellation_slot cancellation_slot_type; // Construct from the entry point of a new thread of execution. awaitable_thread(awaitable p, const Executor& ex, cancellation_slot parent_cancel_slot, cancellation_state cancel_state) : bottom_of_stack_(std::move(p)) { bottom_of_stack_.frame_->top_of_stack_ = bottom_of_stack_.frame_; new (&bottom_of_stack_.frame_->u_.executor_) Executor(ex); bottom_of_stack_.frame_->has_executor_ = true; bottom_of_stack_.frame_->parent_cancellation_slot_ = parent_cancel_slot; bottom_of_stack_.frame_->cancellation_state_ = cancel_state; } // Transfer ownership from another awaitable_thread. awaitable_thread(awaitable_thread&& other) noexcept : bottom_of_stack_(std::move(other.bottom_of_stack_)) { } // Clean up with a last ditch effort to ensure the thread is unwound within // the context of the executor. ~awaitable_thread() { if (bottom_of_stack_.valid()) { // Coroutine "stack unwinding" must be performed through the executor. auto* bottom_frame = bottom_of_stack_.frame_; (post)(bottom_frame->u_.executor_, [a = std::move(bottom_of_stack_)]() mutable { (void)awaitable( std::move(a)); }); } } awaitable_frame* entry_point() { return bottom_of_stack_.frame_; } executor_type get_executor() const noexcept { return bottom_of_stack_.frame_->u_.executor_; } cancellation_state get_cancellation_state() const noexcept { return bottom_of_stack_.frame_->cancellation_state_; } void reset_cancellation_state() { bottom_of_stack_.frame_->cancellation_state_ = cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_); } template void reset_cancellation_state(BOOST_ASIO_MOVE_ARG(Filter) filter) { bottom_of_stack_.frame_->cancellation_state_ = cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_, BOOST_ASIO_MOVE_CAST(Filter)(filter)); } template void reset_cancellation_state(BOOST_ASIO_MOVE_ARG(InFilter) in_filter, BOOST_ASIO_MOVE_ARG(OutFilter) out_filter) { bottom_of_stack_.frame_->cancellation_state_ = cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_, BOOST_ASIO_MOVE_CAST(InFilter)(in_filter), BOOST_ASIO_MOVE_CAST(OutFilter)(out_filter)); } bool throw_if_cancelled() const { return bottom_of_stack_.frame_->throw_if_cancelled_; } void throw_if_cancelled(bool value) { bottom_of_stack_.frame_->throw_if_cancelled_ = value; } cancellation_slot_type get_cancellation_slot() const noexcept { return bottom_of_stack_.frame_->cancellation_state_.slot(); } // Launch a new thread of execution. void launch() { bottom_of_stack_.frame_->top_of_stack_->attach_thread(this); pump(); } protected: template friend class awaitable_frame_base; // Repeatedly resume the top stack frame until the stack is empty or until it // has been transferred to another resumable_thread object. void pump() { do bottom_of_stack_.frame_->top_of_stack_->resume(); while (bottom_of_stack_.frame_ && bottom_of_stack_.frame_->top_of_stack_); if (bottom_of_stack_.frame_) { awaitable a( std::move(bottom_of_stack_)); a.frame_->rethrow_exception(); } } awaitable bottom_of_stack_; }; template class awaitable_async_op_handler; template class awaitable_async_op_handler : public awaitable_thread { public: struct result_type {}; awaitable_async_op_handler( awaitable_thread* h, result_type&) : awaitable_thread(std::move(*h)) { } void operator()() { this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static void resume(result_type&) { } }; template class awaitable_async_op_handler : public awaitable_thread { public: typedef boost::system::error_code* result_type; awaitable_async_op_handler( awaitable_thread* h, result_type& result) : awaitable_thread(std::move(*h)), result_(result) { } void operator()(boost::system::error_code ec) { result_ = &ec; this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static void resume(result_type& result) { throw_error(*result); } private: result_type& result_; }; template class awaitable_async_op_handler : public awaitable_thread { public: typedef std::exception_ptr* result_type; awaitable_async_op_handler( awaitable_thread* h, result_type& result) : awaitable_thread(std::move(*h)), result_(result) { } void operator()(std::exception_ptr ex) { result_ = &ex; this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static void resume(result_type& result) { if (*result) { std::exception_ptr ex = std::exchange(*result, nullptr); std::rethrow_exception(ex); } } private: result_type& result_; }; template class awaitable_async_op_handler : public awaitable_thread { public: typedef T* result_type; awaitable_async_op_handler( awaitable_thread* h, result_type& result) : awaitable_thread(std::move(*h)), result_(result) { } void operator()(T result) { result_ = &result; this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static T resume(result_type& result) { return std::move(*result); } private: result_type& result_; }; template class awaitable_async_op_handler : public awaitable_thread { public: struct result_type { boost::system::error_code* ec_; T* value_; }; awaitable_async_op_handler( awaitable_thread* h, result_type& result) : awaitable_thread(std::move(*h)), result_(result) { } void operator()(boost::system::error_code ec, T value) { result_.ec_ = &ec; result_.value_ = &value; this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static T resume(result_type& result) { throw_error(*result.ec_); return std::move(*result.value_); } private: result_type& result_; }; template class awaitable_async_op_handler : public awaitable_thread { public: struct result_type { std::exception_ptr* ex_; T* value_; }; awaitable_async_op_handler( awaitable_thread* h, result_type& result) : awaitable_thread(std::move(*h)), result_(result) { } void operator()(std::exception_ptr ex, T value) { result_.ex_ = &ex; result_.value_ = &value; this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static T resume(result_type& result) { if (*result) { std::exception_ptr ex = std::exchange(*result.ex_, nullptr); std::rethrow_exception(ex); } return std::move(*result.value_); } private: result_type& result_; }; template class awaitable_async_op_handler : public awaitable_thread { public: typedef std::tuple* result_type; awaitable_async_op_handler( awaitable_thread* h, result_type& result) : awaitable_thread(std::move(*h)), result_(result) { } template void operator()(Args&&... args) { std::tuple result(std::forward(args)...); result_ = &result; this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static std::tuple resume(result_type& result) { return std::move(*result); } private: result_type& result_; }; template class awaitable_async_op_handler : public awaitable_thread { public: struct result_type { boost::system::error_code* ec_; std::tuple* value_; }; awaitable_async_op_handler( awaitable_thread* h, result_type& result) : awaitable_thread(std::move(*h)), result_(result) { } template void operator()(boost::system::error_code ec, Args&&... args) { result_.ec_ = &ec; std::tuple value(std::forward(args)...); result_.value_ = &value; this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static std::tuple resume(result_type& result) { throw_error(*result.ec_); return std::move(*result.value_); } private: result_type& result_; }; template class awaitable_async_op_handler : public awaitable_thread { public: struct result_type { std::exception_ptr* ex_; std::tuple* value_; }; awaitable_async_op_handler( awaitable_thread* h, result_type& result) : awaitable_thread(std::move(*h)), result_(result) { } template void operator()(std::exception_ptr ex, Args&&... args) { result_.ex_ = &ex; std::tuple value(std::forward(args)...); result_.value_ = &value; this->entry_point()->top_of_stack_->attach_thread(this); this->entry_point()->top_of_stack_->clear_cancellation_slot(); this->pump(); } static std::tuple resume(result_type& result) { if (*result) { std::exception_ptr ex = std::exchange(*result.ex_, nullptr); std::rethrow_exception(ex); } return std::move(*result.value_); } private: result_type& result_; }; template class awaitable_async_op { public: typedef awaitable_async_op_handler handler_type; awaitable_async_op(Op&& o, awaitable_frame_base* frame) : op_(std::forward(o)), frame_(frame), result_() { } bool await_ready() const noexcept { return false; } void await_suspend(coroutine_handle) { frame_->after_suspend( [](void* arg) { awaitable_async_op* self = static_cast(arg); std::forward(self->op_)( handler_type(self->frame_->detach_thread(), self->result_)); }, this); } auto await_resume() { return handler_type::resume(result_); } private: Op&& op_; awaitable_frame_base* frame_; typename handler_type::result_type result_; }; } // namespace detail } // namespace asio } // namespace boost #if !defined(GENERATING_DOCUMENTATION) # if defined(BOOST_ASIO_HAS_STD_COROUTINE) namespace std { template struct coroutine_traits, Args...> { typedef boost::asio::detail::awaitable_frame promise_type; }; } // namespace std # else // defined(BOOST_ASIO_HAS_STD_COROUTINE) namespace std { namespace experimental { template struct coroutine_traits, Args...> { typedef boost::asio::detail::awaitable_frame promise_type; }; }} // namespace std::experimental # endif // defined(BOOST_ASIO_HAS_STD_COROUTINE) #endif // !defined(GENERATING_DOCUMENTATION) #include #endif // BOOST_ASIO_IMPL_AWAITABLE_HPP