// // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // // Official repository: https://github.com/boostorg/beast // #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace boost { namespace beast { namespace websocket { /* Read some message data into a buffer sequence. Also reads and handles control frames. */ template template class stream::read_some_op : public beast::async_base< Handler, beast::executor_type> , public asio::coroutine { boost::weak_ptr wp_; MutableBufferSequence bs_; buffers_suffix cb_; std::size_t bytes_written_ = 0; error_code result_; close_code code_; bool did_read_ = false; public: static constexpr int id = 1; // for soft_mutex template read_some_op( Handler_&& h, boost::shared_ptr const& sp, MutableBufferSequence const& bs) : async_base< Handler, beast::executor_type>( std::forward(h), sp->stream().get_executor()) , wp_(sp) , bs_(bs) , cb_(bs) , code_(close_code::none) { (*this)({}, 0, false); } void operator()( error_code ec = {}, std::size_t bytes_transferred = 0, bool cont = true) { using beast::detail::clamp; auto sp = wp_.lock(); if(! sp) { BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted); bytes_written_ = 0; return this->complete(cont, ec, bytes_written_); } auto& impl = *sp; BOOST_ASIO_CORO_REENTER(*this) { impl.update_timer(this->get_executor()); acquire_read_lock: // Acquire the read lock if(! impl.rd_block.try_lock(this)) { do_suspend: BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); this->set_allowed_cancellation(net::cancellation_type::all); impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all); } if (ec) return this->complete(cont, ec, bytes_written_); this->set_allowed_cancellation(net::cancellation_type::terminal); impl.rd_block.lock(this); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); net::post(std::move(*this)); } BOOST_ASSERT(impl.rd_block.is_locked(this)); BOOST_ASSERT(!ec); if(impl.check_stop_now(ec)) { // Issue 2264 - There is no guarantee that the next // error will be operation_aborted. // The error could be a result of the peer resetting the // connection // BOOST_ASSERT(ec == net::error::operation_aborted); goto upcall; } // VFALCO Should never get here // The only way to get read blocked is if // a `close_op` wrote a close frame BOOST_ASSERT(impl.wr_close); BOOST_ASSERT(impl.status_ != status::open); BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted); goto upcall; } else { // Make sure the stream is not closed if( impl.status_ == status::closed || impl.status_ == status::failed) { BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted); goto upcall; } } // if status_ == status::closing, we want to suspend // the read operation until the close completes, // then finish the read with operation_aborted. loop: BOOST_ASSERT(impl.rd_block.is_locked(this)); // See if we need to read a frame header. This // condition is structured to give the decompressor // a chance to emit the final empty deflate block // if(impl.rd_remain == 0 && (! impl.rd_fh.fin || impl.rd_done)) { // Read frame header while(! impl.parse_fh( impl.rd_fh, impl.rd_buf, result_)) { if(result_) { // _Fail the WebSocket Connection_ if(result_ == error::message_too_big) code_ = close_code::too_big; else code_ = close_code::protocol_error; goto close; } BOOST_ASSERT(impl.rd_block.is_locked(this)); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); impl.stream().async_read_some( impl.rd_buf.prepare(read_size( impl.rd_buf, impl.rd_buf.max_size())), std::move(*this)); } BOOST_ASSERT(impl.rd_block.is_locked(this)); impl.rd_buf.commit(bytes_transferred); if(impl.check_stop_now(ec)) goto upcall; impl.reset_idle(); // Allow a close operation // to acquire the read block impl.rd_block.unlock(this); if( impl.op_r_close.maybe_invoke()) { // Suspend BOOST_ASSERT(impl.rd_block.is_locked()); goto do_suspend; } // Acquire read block impl.rd_block.lock(this); } // Immediately apply the mask to the portion // of the buffer holding payload data. if(impl.rd_fh.len > 0 && impl.rd_fh.mask) detail::mask_inplace(buffers_prefix( clamp(impl.rd_fh.len), impl.rd_buf.data()), impl.rd_key); if(detail::is_control(impl.rd_fh.op)) { // Clear this otherwise the next // frame will be considered final. impl.rd_fh.fin = false; // Handle ping frame if(impl.rd_fh.op == detail::opcode::ping) { if(impl.ctrl_cb) { if(! cont) { BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); net::post(std::move(*this)); } BOOST_ASSERT(cont); // VFALCO call check_stop_now() here? } } { auto const b = buffers_prefix( clamp(impl.rd_fh.len), impl.rd_buf.data()); auto const len = buffer_bytes(b); BOOST_ASSERT(len == impl.rd_fh.len); ping_data payload; detail::read_ping(payload, b); impl.rd_buf.consume(len); // Ignore ping when closing if(impl.status_ == status::closing) goto loop; if(impl.ctrl_cb) impl.ctrl_cb( frame_type::ping, to_string_view(payload)); impl.rd_fb.clear(); impl.template write_ping< flat_static_buffer_base>(impl.rd_fb, detail::opcode::pong, payload); } // Allow a close operation // to acquire the read block impl.rd_block.unlock(this); impl.op_r_close.maybe_invoke(); // Acquire the write lock if(! impl.wr_block.try_lock(this)) { BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); impl.op_rd.emplace(std::move(*this)); } if (ec) return this->complete(cont, ec, bytes_written_); impl.wr_block.lock(this); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); net::post(std::move(*this)); } BOOST_ASSERT(impl.wr_block.is_locked(this)); if(impl.check_stop_now(ec)) goto upcall; } // Send pong BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); net::async_write( impl.stream(), net::const_buffer(impl.rd_fb.data()), beast::detail::bind_continuation(std::move(*this))); } BOOST_ASSERT(impl.wr_block.is_locked(this)); if(impl.check_stop_now(ec)) goto upcall; impl.wr_block.unlock(this); impl.op_close.maybe_invoke() || impl.op_idle_ping.maybe_invoke() || impl.op_ping.maybe_invoke() || impl.op_wr.maybe_invoke(); goto acquire_read_lock; } // Handle pong frame if(impl.rd_fh.op == detail::opcode::pong) { // Ignore pong when closing if(! impl.wr_close && impl.ctrl_cb) { if(! cont) { BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); net::post(std::move(*this)); } BOOST_ASSERT(cont); } } auto const cb = buffers_prefix(clamp( impl.rd_fh.len), impl.rd_buf.data()); auto const len = buffer_bytes(cb); BOOST_ASSERT(len == impl.rd_fh.len); ping_data payload; detail::read_ping(payload, cb); impl.rd_buf.consume(len); // Ignore pong when closing if(! impl.wr_close && impl.ctrl_cb) impl.ctrl_cb(frame_type::pong, to_string_view(payload)); goto loop; } // Handle close frame BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); { if(impl.ctrl_cb) { if(! cont) { BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); net::post(std::move(*this)); } BOOST_ASSERT(cont); } } auto const cb = buffers_prefix(clamp( impl.rd_fh.len), impl.rd_buf.data()); auto const len = buffer_bytes(cb); BOOST_ASSERT(len == impl.rd_fh.len); BOOST_ASSERT(! impl.rd_close); impl.rd_close = true; close_reason cr; detail::read_close(cr, cb, result_); if(result_) { // _Fail the WebSocket Connection_ code_ = close_code::protocol_error; goto close; } impl.cr = cr; impl.rd_buf.consume(len); if(impl.ctrl_cb) impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason)); // See if we are already closing if(impl.status_ == status::closing) { // _Close the WebSocket Connection_ BOOST_ASSERT(impl.wr_close); code_ = close_code::none; result_ = error::closed; goto close; } // _Start the WebSocket Closing Handshake_ code_ = cr.code == close_code::none ? close_code::normal : static_cast(cr.code); result_ = error::closed; goto close; } } if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin) { // Empty non-final frame goto loop; } impl.rd_done = false; } if(! impl.rd_deflated()) { if(impl.rd_remain > 0) { if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() > (std::min)(clamp(impl.rd_remain), buffer_bytes(cb_))) { // Fill the read buffer first, otherwise we // get fewer bytes at the cost of one I/O. BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); impl.stream().async_read_some( impl.rd_buf.prepare(read_size( impl.rd_buf, impl.rd_buf.max_size())), std::move(*this)); } impl.rd_buf.commit(bytes_transferred); if(impl.check_stop_now(ec)) goto upcall; impl.reset_idle(); if(impl.rd_fh.mask) detail::mask_inplace(buffers_prefix(clamp( impl.rd_remain), impl.rd_buf.data()), impl.rd_key); } if(impl.rd_buf.size() > 0) { // Copy from the read buffer. // The mask was already applied. bytes_transferred = net::buffer_copy(cb_, impl.rd_buf.data(), clamp(impl.rd_remain)); auto const mb = buffers_prefix( bytes_transferred, cb_); impl.rd_remain -= bytes_transferred; if(impl.rd_op == detail::opcode::text) { if(! impl.rd_utf8.write(mb) || (impl.rd_remain == 0 && impl.rd_fh.fin && ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ code_ = close_code::bad_payload; result_ = error::bad_frame_payload; goto close; } } bytes_written_ += bytes_transferred; impl.rd_size += bytes_transferred; impl.rd_buf.consume(bytes_transferred); } else { // Read into caller's buffer BOOST_ASSERT(impl.rd_remain > 0); BOOST_ASSERT(buffer_bytes(cb_) > 0); BOOST_ASSERT(buffer_bytes(buffers_prefix( clamp(impl.rd_remain), cb_)) > 0); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); impl.stream().async_read_some(buffers_prefix( clamp(impl.rd_remain), cb_), std::move(*this)); } if(impl.check_stop_now(ec)) goto upcall; impl.reset_idle(); BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffers_prefix( bytes_transferred, cb_); impl.rd_remain -= bytes_transferred; if(impl.rd_fh.mask) detail::mask_inplace(mb, impl.rd_key); if(impl.rd_op == detail::opcode::text) { if(! impl.rd_utf8.write(mb) || (impl.rd_remain == 0 && impl.rd_fh.fin && ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ code_ = close_code::bad_payload; result_ = error::bad_frame_payload; goto close; } } bytes_written_ += bytes_transferred; impl.rd_size += bytes_transferred; } } BOOST_ASSERT( ! impl.rd_done ); if( impl.rd_remain == 0 && impl.rd_fh.fin ) impl.rd_done = true; } else { // Read compressed message frame payload: // inflate even if rd_fh_.len == 0, otherwise we // never emit the end-of-stream deflate block. while(buffer_bytes(cb_) > 0) { if( impl.rd_remain > 0 && impl.rd_buf.size() == 0 && ! did_read_) { // read new BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); impl.stream().async_read_some( impl.rd_buf.prepare(read_size( impl.rd_buf, impl.rd_buf.max_size())), std::move(*this)); } if(impl.check_stop_now(ec)) goto upcall; impl.reset_idle(); BOOST_ASSERT(bytes_transferred > 0); impl.rd_buf.commit(bytes_transferred); if(impl.rd_fh.mask) detail::mask_inplace( buffers_prefix(clamp(impl.rd_remain), impl.rd_buf.data()), impl.rd_key); did_read_ = true; } zlib::z_params zs; { auto const out = buffers_front(cb_); zs.next_out = out.data(); zs.avail_out = out.size(); BOOST_ASSERT(zs.avail_out > 0); } // boolean to track the end of the message. bool fin = false; if(impl.rd_remain > 0) { if(impl.rd_buf.size() > 0) { // use what's there auto const in = buffers_prefix( clamp(impl.rd_remain), buffers_front( impl.rd_buf.data())); zs.avail_in = in.size(); zs.next_in = in.data(); } else { break; } } else if(impl.rd_fh.fin) { // append the empty block codes static std::uint8_t constexpr empty_block[4] = { 0x00, 0x00, 0xff, 0xff }; zs.next_in = empty_block; zs.avail_in = sizeof(empty_block); fin = true; } else { break; } impl.inflate(zs, zlib::Flush::sync, ec); if(impl.check_stop_now(ec)) goto upcall; if(fin && zs.total_out == 0) { impl.do_context_takeover_read(impl.role); impl.rd_done = true; break; } if(impl.rd_msg_max && beast::detail::sum_exceeds( impl.rd_size, zs.total_out, impl.rd_msg_max)) { // _Fail the WebSocket Connection_ code_ = close_code::too_big; result_ = error::message_too_big; goto close; } cb_.consume(zs.total_out); impl.rd_size += zs.total_out; if (! fin) { impl.rd_remain -= zs.total_in; impl.rd_buf.consume(zs.total_in); } bytes_written_ += zs.total_out; } if(impl.rd_op == detail::opcode::text) { // check utf8 if(! impl.rd_utf8.write( buffers_prefix(bytes_written_, bs_)) || ( impl.rd_done && ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ code_ = close_code::bad_payload; result_ = error::bad_frame_payload; goto close; } } } goto upcall; close: // Acquire the write lock if(! impl.wr_block.try_lock(this)) { BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); impl.op_rd.emplace(std::move(*this)); } if (ec) return this->complete(cont, ec, bytes_written_); impl.wr_block.lock(this); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); net::post(std::move(*this)); } BOOST_ASSERT(impl.wr_block.is_locked(this)); if(impl.check_stop_now(ec)) goto upcall; } impl.change_status(status::closing); if(! impl.wr_close) { impl.wr_close = true; // Serialize close frame impl.rd_fb.clear(); impl.template write_close< flat_static_buffer_base>( impl.rd_fb, code_); // Send close frame BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()), beast::detail::bind_continuation(std::move(*this))); } BOOST_ASSERT(impl.wr_block.is_locked(this)); if(impl.check_stop_now(ec)) goto upcall; } // Teardown using beast::websocket::async_teardown; BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read_some")); async_teardown(impl.role, impl.stream(), beast::detail::bind_continuation(std::move(*this))); } BOOST_ASSERT(impl.wr_block.is_locked(this)); if(ec == net::error::eof) { // Rationale: // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error ec = {}; } if(! ec) { BOOST_BEAST_ASSIGN_EC(ec, result_); } if(ec && ec != error::closed) impl.change_status(status::failed); else impl.change_status(status::closed); impl.close(); upcall: impl.rd_block.try_unlock(this); impl.op_r_close.maybe_invoke(); if(impl.wr_block.try_unlock(this)) impl.op_close.maybe_invoke() || impl.op_idle_ping.maybe_invoke() || impl.op_ping.maybe_invoke() || impl.op_wr.maybe_invoke(); this->complete(cont, ec, bytes_written_); } } }; //------------------------------------------------------------------------------ template template class stream::read_op : public beast::async_base< Handler, beast::executor_type> , public asio::coroutine { boost::weak_ptr wp_; DynamicBuffer& b_; std::size_t limit_; std::size_t bytes_written_ = 0; bool some_; public: template read_op( Handler_&& h, boost::shared_ptr const& sp, DynamicBuffer& b, std::size_t limit, bool some) : async_base>( std::forward(h), sp->stream().get_executor()) , wp_(sp) , b_(b) , limit_(limit ? limit : ( std::numeric_limits::max)()) , some_(some) { (*this)({}, 0, false); } void operator()( error_code ec = {}, std::size_t bytes_transferred = 0, bool cont = true) { using beast::detail::clamp; auto sp = wp_.lock(); if(! sp) { BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted); bytes_written_ = 0; return this->complete(cont, ec, bytes_written_); } auto& impl = *sp; using mutable_buffers_type = typename DynamicBuffer::mutable_buffers_type; BOOST_ASIO_CORO_REENTER(*this) { do { // VFALCO TODO use boost::beast::bind_continuation BOOST_ASIO_CORO_YIELD { auto mb = beast::detail::dynamic_buffer_prepare(b_, clamp(impl.read_size_hint_db(b_), limit_), ec, error::buffer_overflow); if(impl.check_stop_now(ec)) goto upcall; BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "websocket::async_read")); read_some_op( std::move(*this), sp, *mb); } b_.commit(bytes_transferred); bytes_written_ += bytes_transferred; if(ec) goto upcall; } while(! some_ && ! impl.rd_done); upcall: this->complete(cont, ec, bytes_written_); } } }; template struct stream:: run_read_some_op { template< class ReadHandler, class MutableBufferSequence> void operator()( ReadHandler&& h, boost::shared_ptr const& sp, MutableBufferSequence const& b) { // If you get an error on the following line it means // that your handler does not meet the documented type // requirements for the handler. static_assert( beast::detail::is_invocable::value, "ReadHandler type requirements not met"); read_some_op< typename std::decay::type, MutableBufferSequence>( std::forward(h), sp, b); } }; template struct stream:: run_read_op { template< class ReadHandler, class DynamicBuffer> void operator()( ReadHandler&& h, boost::shared_ptr const& sp, DynamicBuffer* b, std::size_t limit, bool some) { // If you get an error on the following line it means // that your handler does not meet the documented type // requirements for the handler. static_assert( beast::detail::is_invocable::value, "ReadHandler type requirements not met"); read_op< typename std::decay::type, DynamicBuffer>( std::forward(h), sp, *b, limit, some); } }; //------------------------------------------------------------------------------ template template std::size_t stream:: read(DynamicBuffer& buffer) { static_assert(is_sync_stream::value, "SyncStream type requirements not met"); static_assert( net::is_dynamic_buffer::value, "DynamicBuffer type requirements not met"); error_code ec; auto const bytes_written = read(buffer, ec); if(ec) BOOST_THROW_EXCEPTION(system_error{ec}); return bytes_written; } template template std::size_t stream:: read(DynamicBuffer& buffer, error_code& ec) { static_assert(is_sync_stream::value, "SyncStream type requirements not met"); static_assert( net::is_dynamic_buffer::value, "DynamicBuffer type requirements not met"); std::size_t bytes_written = 0; do { bytes_written += read_some(buffer, 0, ec); if(ec) return bytes_written; } while(! is_message_done()); return bytes_written; } template template BOOST_BEAST_ASYNC_RESULT2(ReadHandler) stream:: async_read(DynamicBuffer& buffer, ReadHandler&& handler) { static_assert(is_async_stream::value, "AsyncStream type requirements not met"); static_assert( net::is_dynamic_buffer::value, "DynamicBuffer type requirements not met"); return net::async_initiate< ReadHandler, void(error_code, std::size_t)>( run_read_op{}, handler, impl_, &buffer, 0, false); } //------------------------------------------------------------------------------ template template std::size_t stream:: read_some( DynamicBuffer& buffer, std::size_t limit) { static_assert(is_sync_stream::value, "SyncStream type requirements not met"); static_assert( net::is_dynamic_buffer::value, "DynamicBuffer type requirements not met"); error_code ec; auto const bytes_written = read_some(buffer, limit, ec); if(ec) BOOST_THROW_EXCEPTION(system_error{ec}); return bytes_written; } template template std::size_t stream:: read_some( DynamicBuffer& buffer, std::size_t limit, error_code& ec) { static_assert(is_sync_stream::value, "SyncStream type requirements not met"); static_assert( net::is_dynamic_buffer::value, "DynamicBuffer type requirements not met"); using beast::detail::clamp; if(! limit) limit = (std::numeric_limits::max)(); auto const size = clamp(read_size_hint(buffer), limit); BOOST_ASSERT(size > 0); auto mb = beast::detail::dynamic_buffer_prepare( buffer, size, ec, error::buffer_overflow); if(impl_->check_stop_now(ec)) return 0; auto const bytes_written = read_some(*mb, ec); buffer.commit(bytes_written); return bytes_written; } template template BOOST_BEAST_ASYNC_RESULT2(ReadHandler) stream:: async_read_some( DynamicBuffer& buffer, std::size_t limit, ReadHandler&& handler) { static_assert(is_async_stream::value, "AsyncStream type requirements not met"); static_assert( net::is_dynamic_buffer::value, "DynamicBuffer type requirements not met"); return net::async_initiate< ReadHandler, void(error_code, std::size_t)>( run_read_op{}, handler, impl_, &buffer, limit, true); } //------------------------------------------------------------------------------ template template std::size_t stream:: read_some( MutableBufferSequence const& buffers) { static_assert(is_sync_stream::value, "SyncStream type requirements not met"); static_assert(net::is_mutable_buffer_sequence< MutableBufferSequence>::value, "MutableBufferSequence type requirements not met"); error_code ec; auto const bytes_written = read_some(buffers, ec); if(ec) BOOST_THROW_EXCEPTION(system_error{ec}); return bytes_written; } template template std::size_t stream:: read_some( MutableBufferSequence const& buffers, error_code& ec) { static_assert(is_sync_stream::value, "SyncStream type requirements not met"); static_assert(net::is_mutable_buffer_sequence< MutableBufferSequence>::value, "MutableBufferSequence type requirements not met"); using beast::detail::clamp; auto& impl = *impl_; close_code code{}; std::size_t bytes_written = 0; ec = {}; // Make sure the stream is open if(impl.check_stop_now(ec)) return bytes_written; loop: // See if we need to read a frame header. This // condition is structured to give the decompressor // a chance to emit the final empty deflate block // if(impl.rd_remain == 0 && ( ! impl.rd_fh.fin || impl.rd_done)) { // Read frame header error_code result; while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result)) { if(result) { // _Fail the WebSocket Connection_ if(result == error::message_too_big) code = close_code::too_big; else code = close_code::protocol_error; do_fail(code, result, ec); return bytes_written; } auto const bytes_transferred = impl.stream().read_some( impl.rd_buf.prepare(read_size( impl.rd_buf, impl.rd_buf.max_size())), ec); impl.rd_buf.commit(bytes_transferred); if(impl.check_stop_now(ec)) return bytes_written; } // Immediately apply the mask to the portion // of the buffer holding payload data. if(impl.rd_fh.len > 0 && impl.rd_fh.mask) detail::mask_inplace(buffers_prefix( clamp(impl.rd_fh.len), impl.rd_buf.data()), impl.rd_key); if(detail::is_control(impl.rd_fh.op)) { // Get control frame payload auto const b = buffers_prefix( clamp(impl.rd_fh.len), impl.rd_buf.data()); auto const len = buffer_bytes(b); BOOST_ASSERT(len == impl.rd_fh.len); // Clear this otherwise the next // frame will be considered final. impl.rd_fh.fin = false; // Handle ping frame if(impl.rd_fh.op == detail::opcode::ping) { ping_data payload; detail::read_ping(payload, b); impl.rd_buf.consume(len); if(impl.wr_close) { // Ignore ping when closing goto loop; } if(impl.ctrl_cb) impl.ctrl_cb(frame_type::ping, to_string_view(payload)); detail::frame_buffer fb; impl.template write_ping(fb, detail::opcode::pong, payload); net::write(impl.stream(), fb.data(), ec); if(impl.check_stop_now(ec)) return bytes_written; goto loop; } // Handle pong frame if(impl.rd_fh.op == detail::opcode::pong) { ping_data payload; detail::read_ping(payload, b); impl.rd_buf.consume(len); if(impl.ctrl_cb) impl.ctrl_cb(frame_type::pong, to_string_view(payload)); goto loop; } // Handle close frame BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); { BOOST_ASSERT(! impl.rd_close); impl.rd_close = true; close_reason cr; detail::read_close(cr, b, result); if(result) { // _Fail the WebSocket Connection_ do_fail(close_code::protocol_error, result, ec); return bytes_written; } impl.cr = cr; impl.rd_buf.consume(len); if(impl.ctrl_cb) impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason)); BOOST_ASSERT(! impl.wr_close); // _Start the WebSocket Closing Handshake_ do_fail( cr.code == close_code::none ? close_code::normal : static_cast(cr.code), error::closed, ec); return bytes_written; } } if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin) { // Empty non-final frame goto loop; } impl.rd_done = false; } else { ec = {}; } if(! impl.rd_deflated()) { if(impl.rd_remain > 0) { if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() > (std::min)(clamp(impl.rd_remain), buffer_bytes(buffers))) { // Fill the read buffer first, otherwise we // get fewer bytes at the cost of one I/O. impl.rd_buf.commit(impl.stream().read_some( impl.rd_buf.prepare(read_size(impl.rd_buf, impl.rd_buf.max_size())), ec)); if(impl.check_stop_now(ec)) return bytes_written; if(impl.rd_fh.mask) detail::mask_inplace( buffers_prefix(clamp(impl.rd_remain), impl.rd_buf.data()), impl.rd_key); } if(impl.rd_buf.size() > 0) { // Copy from the read buffer. // The mask was already applied. auto const bytes_transferred = net::buffer_copy( buffers, impl.rd_buf.data(), clamp(impl.rd_remain)); auto const mb = buffers_prefix( bytes_transferred, buffers); impl.rd_remain -= bytes_transferred; if(impl.rd_op == detail::opcode::text) { if(! impl.rd_utf8.write(mb) || (impl.rd_remain == 0 && impl.rd_fh.fin && ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, error::bad_frame_payload, ec); return bytes_written; } } bytes_written += bytes_transferred; impl.rd_size += bytes_transferred; impl.rd_buf.consume(bytes_transferred); } else { // Read into caller's buffer BOOST_ASSERT(impl.rd_remain > 0); BOOST_ASSERT(buffer_bytes(buffers) > 0); BOOST_ASSERT(buffer_bytes(buffers_prefix( clamp(impl.rd_remain), buffers)) > 0); auto const bytes_transferred = impl.stream().read_some(buffers_prefix( clamp(impl.rd_remain), buffers), ec); // VFALCO What if some bytes were written? if(impl.check_stop_now(ec)) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffers_prefix( bytes_transferred, buffers); impl.rd_remain -= bytes_transferred; if(impl.rd_fh.mask) detail::mask_inplace(mb, impl.rd_key); if(impl.rd_op == detail::opcode::text) { if(! impl.rd_utf8.write(mb) || (impl.rd_remain == 0 && impl.rd_fh.fin && ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, error::bad_frame_payload, ec); return bytes_written; } } bytes_written += bytes_transferred; impl.rd_size += bytes_transferred; } } BOOST_ASSERT( ! impl.rd_done ); if( impl.rd_remain == 0 && impl.rd_fh.fin ) impl.rd_done = true; } else { // Read compressed message frame payload: // inflate even if rd_fh_.len == 0, otherwise we // never emit the end-of-stream deflate block. // bool did_read = false; buffers_suffix cb(buffers); while(buffer_bytes(cb) > 0) { zlib::z_params zs; { auto const out = beast::buffers_front(cb); zs.next_out = out.data(); zs.avail_out = out.size(); BOOST_ASSERT(zs.avail_out > 0); } // boolean to track the end of the message. bool fin = false; if(impl.rd_remain > 0) { if(impl.rd_buf.size() > 0) { // use what's there auto const in = buffers_prefix( clamp(impl.rd_remain), beast::buffers_front( impl.rd_buf.data())); zs.avail_in = in.size(); zs.next_in = in.data(); } else if(! did_read) { // read new auto const bytes_transferred = impl.stream().read_some( impl.rd_buf.prepare(read_size( impl.rd_buf, impl.rd_buf.max_size())), ec); if(impl.check_stop_now(ec)) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); impl.rd_buf.commit(bytes_transferred); if(impl.rd_fh.mask) detail::mask_inplace( buffers_prefix(clamp(impl.rd_remain), impl.rd_buf.data()), impl.rd_key); auto const in = buffers_prefix( clamp(impl.rd_remain), buffers_front( impl.rd_buf.data())); zs.avail_in = in.size(); zs.next_in = in.data(); did_read = true; } else { break; } } else if(impl.rd_fh.fin) { // append the empty block codes static std::uint8_t constexpr empty_block[4] = { 0x00, 0x00, 0xff, 0xff }; zs.next_in = empty_block; zs.avail_in = sizeof(empty_block); fin = true; } else { break; } impl.inflate(zs, zlib::Flush::sync, ec); if(impl.check_stop_now(ec)) return bytes_written; if (fin && zs.total_out == 0) { impl.do_context_takeover_read(impl.role); impl.rd_done = true; break; } if(impl.rd_msg_max && beast::detail::sum_exceeds( impl.rd_size, zs.total_out, impl.rd_msg_max)) { do_fail(close_code::too_big, error::message_too_big, ec); return bytes_written; } cb.consume(zs.total_out); impl.rd_size += zs.total_out; if (! fin) { impl.rd_remain -= zs.total_in; impl.rd_buf.consume(zs.total_in); } bytes_written += zs.total_out; } if(impl.rd_op == detail::opcode::text) { // check utf8 if(! impl.rd_utf8.write(beast::buffers_prefix( bytes_written, buffers)) || ( impl.rd_done && ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, error::bad_frame_payload, ec); return bytes_written; } } } return bytes_written; } template template BOOST_BEAST_ASYNC_RESULT2(ReadHandler) stream:: async_read_some( MutableBufferSequence const& buffers, ReadHandler&& handler) { static_assert(is_async_stream::value, "AsyncStream type requirements not met"); static_assert(net::is_mutable_buffer_sequence< MutableBufferSequence>::value, "MutableBufferSequence type requirements not met"); return net::async_initiate< ReadHandler, void(error_code, std::size_t)>( run_read_some_op{}, handler, impl_, buffers); } } // websocket } // beast } // boost #endif