From 9335173acb1d0e50b2e50efa54ff8f639b0d39ec Mon Sep 17 00:00:00 2001 From: Michael Jones Date: Tue, 14 Jul 2020 02:13:48 -0500 Subject: [PATCH 1/2] Initial work on using coroutines for some deeply nested async code --- .github/workflows/gha.yml | 2 +- include/mqtt/server.hpp | 525 ++++++++++++++++++++------------------ 2 files changed, 274 insertions(+), 253 deletions(-) diff --git a/.github/workflows/gha.yml b/.github/workflows/gha.yml index 0c3ebff0c..320cb973d 100644 --- a/.github/workflows/gha.yml +++ b/.github/workflows/gha.yml @@ -55,7 +55,7 @@ jobs: run: | sudo add-apt-repository ppa:mhier/libboost-latest sudo apt-get update - sudo apt-get install boost1.67 + sudo apt-get install boost1.70 - name: Configure run: | [ ${{ matrix.pattern }} == 0 ] && FLAGS="-DCMAKE_CXX_COMPILER=clang++ -DMQTT_TEST_1=ON -DMQTT_TEST_2=ON -DMQTT_TEST_3=OFF -DMQTT_TEST_4=OFF -DMQTT_TEST_5=OFF -DMQTT_TEST_6=OFF -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=OFF -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=ON -DMQTT_STD_ANY=OFF -DMQTT_STD_OPTIONAL=OFF -DMQTT_STD_VARIANT=OFF -DMQTT_STD_STRING_VIEW=OFF -DMQTT_STD_SHARED_PTR_ARRAY=OFF" diff --git a/include/mqtt/server.hpp b/include/mqtt/server.hpp index 62cdb37c3..4759313af 100644 --- a/include/mqtt/server.hpp +++ b/include/mqtt/server.hpp @@ -11,6 +11,7 @@ #include #include +#include #include @@ -121,7 +122,14 @@ class server { return; } } - do_accept(); + do_accept([this](error_code ec, std::shared_ptr sp) { + if(ec) { + if (h_error_) h_error_(ec); + return; + } + + if (h_accept_) h_accept_(force_move(sp)); + }); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -171,23 +179,35 @@ class server { } private: - void do_accept() { - if (close_request_) return; - auto socket = std::make_shared(ioc_con_); - acceptor_.value().async_accept( - socket->lowest_layer(), - [this, socket] - (error_code ec) mutable { - if (ec) { - acceptor_.reset(); - if (h_error_) h_error_(ec); - return; - } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); - do_accept(); - } - ); + template)) + CompletionHandler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(as::io_context)> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionHandler, void(boost::system::error_code, std::shared_ptr)) + do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(as::io_context)) { + return as::async_compose)> ( + [this, + coro = as::coroutine(), + socket = std::shared_ptr{}] + (auto &self, boost::system::error_code ec = {}) mutable + { + BOOST_ASIO_CORO_REENTER(coro) + { + if (close_request_) return self.complete(ec, std::shared_ptr{}); + BOOST_ASIO_CORO_YIELD + { + socket = std::make_shared(ioc_con_); + acceptor_.value().async_accept( + socket->lowest_layer(), + force_move(self)); + } + if (ec) { + this->acceptor_.reset(); + return self.complete(ec, std::shared_ptr{}); + } + + return self.complete(ec, std::make_shared(force_move(socket), version_)); + } + }, + handler, ioc_con_); } private: @@ -284,7 +304,14 @@ class server_tls { return; } } - do_accept(); + do_accept([this](error_code ec, std::shared_ptr sp) { + if(ec) { + if (h_error_) h_error_(ec); + return; + } + + if (h_accept_) h_accept_(force_move(sp)); + }); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -362,52 +389,58 @@ class server_tls { } private: - void do_accept() { - if (close_request_) return; - auto socket = std::make_shared(ioc_con_, ctx_); - auto ps = socket.get(); - acceptor_.value().async_accept( - ps->lowest_layer(), - [this, socket = force_move(socket)] - (error_code ec) mutable { - if (ec) { - acceptor_.reset(); - if (h_error_) h_error_(ec); - return; - } - auto underlying_finished = std::make_shared(false); - auto tim = std::make_shared(ioc_con_); - tim->expires_after(underlying_connect_timeout_); - tim->async_wait( - [socket, tim, underlying_finished] - (error_code ec) { - if (*underlying_finished) return; - if (ec) return; - socket->post( - [socket] { + template)) + CompletionHandler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(as::io_context)> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionHandler, void(boost::system::error_code, std::shared_ptr)) + do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(as::io_context)) { + return as::async_compose)> ( + [this, + coro = as::coroutine(), + underlying_finished = false, + tim = as::steady_timer{ioc_con_, underlying_connect_timeout_}, + socket = std::shared_ptr{}] + (auto &self, boost::system::error_code ec = {}) mutable + { + auto pGuard = shared_scope_guard([&](void) + { + underlying_finished = true; + tim.cancel(); + }); + BOOST_ASIO_CORO_REENTER(coro) + { + if (close_request_) return self.complete(ec, std::shared_ptr{}); + tim.async_wait([&](error_code ec) { + if (ec) return; + if (underlying_finished) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); - } - ); - } - ); - auto ps = socket.get(); - ps->async_handshake( - as::ssl::stream_base::server, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); + }); + + BOOST_ASIO_CORO_YIELD + { + socket = std::make_shared(ioc_con_, ctx_); + acceptor_.value().async_accept( + socket->lowest_layer(), + force_move(self)); + } + if (ec) { + return self.complete(ec, std::shared_ptr{}); + } + + BOOST_ASIO_CORO_YIELD + { + socket->async_handshake( + as::ssl::stream_base::server, + force_move(self)); + } if (ec) { - return; + return self.complete(ec, std::shared_ptr{}); } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); + + return self.complete(ec, std::make_shared(force_move(socket), version_)); } - ); - do_accept(); - } - ); + }, + handler, ioc_con_); } private: @@ -503,7 +536,14 @@ class server_ws { return; } } - do_accept(); + do_accept([this](error_code ec, std::shared_ptr sp) { + if(ec) { + if (h_error_) h_error_(ec); + return; + } + + if (h_accept_) h_accept_(force_move(sp)); + }); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -565,116 +605,105 @@ class server_ws { } private: - void do_accept() { - if (close_request_) return; - auto socket = std::make_shared(ioc_con_); - auto ps = socket.get(); - acceptor_.value().async_accept( - ps->next_layer(), - [this, socket = force_move(socket)] - (error_code ec) mutable { - if (ec) { - acceptor_.reset(); - if (h_error_) h_error_(ec); - return; - } - auto underlying_finished = std::make_shared(false); - auto tim = std::make_shared(ioc_con_); - tim->expires_after(underlying_connect_timeout_); - tim->async_wait( - [socket, tim, underlying_finished] - (error_code ec) { - if (*underlying_finished) return; - if (ec) return; - socket->post( - [socket] { + template)) + CompletionHandler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(as::io_context)> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionHandler, void(boost::system::error_code, std::shared_ptr)) + do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(as::io_context)) { + return as::async_compose)> ( + [this, + coro = as::coroutine(), + underlying_finished = false, + tim = as::steady_timer{ioc_con_, underlying_connect_timeout_}, + socket = std::shared_ptr{}, + sb = std::shared_ptr{}, + request = std::shared_ptr>{}] + (auto &self, boost::system::error_code ec = {}, size_t = 0) mutable + { + auto pGuard = shared_scope_guard([&](void) + { + underlying_finished = true; + tim.cancel(); + }); + + BOOST_ASIO_CORO_REENTER(coro) + { + if (close_request_) return self.complete(ec, std::shared_ptr{}); + tim.async_wait([&](error_code ec) { + if (ec) return; + if (underlying_finished) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); - } - ); - } - ); - - auto sb = std::make_shared(); - auto request = std::make_shared>(); - auto ps = socket.get(); - boost::beast::http::async_read( - ps->next_layer(), - *sb, - *request, - [this, socket = force_move(socket), sb, request, tim, underlying_finished] - (error_code ec, std::size_t) mutable { + }); + + BOOST_ASIO_CORO_YIELD + { + socket = std::make_shared(ioc_con_); + acceptor_.value().async_accept( + socket->lowest_layer(), + force_move(self)); + } if (ec) { - *underlying_finished = true; - tim->cancel(); - return; + return self.complete(ec, std::shared_ptr{}); } - if (!boost::beast::websocket::is_upgrade(*request)) { - *underlying_finished = true; - tim->cancel(); - return; + + BOOST_ASIO_CORO_YIELD + { + sb = std::make_shared(); + request = std::make_shared>(); + boost::beast::http::async_read( + socket->next_layer(), + *sb, + *request, + force_move(self)); + } + if(ec || ! boost::beast::websocket::is_upgrade(*request)) + { + return self.complete(ec, std::shared_ptr{}); } - auto ps = socket.get(); #if BOOST_BEAST_VERSION >= 248 - auto it = request->find("Sec-WebSocket-Protocol"); - if (it != request->end()) { - ps->set_option( - boost::beast::websocket::stream_base::decorator( - [name = it->name(), value = it->value()] // name is enum, value is boost::string_view - (boost::beast::websocket::response_type& res) { - // This lambda is called before the scope out point *1 - res.set(name, value); - } - ) - ); - } - ps->async_accept( - *request, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); - if (ec) { - return; - } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); + BOOST_ASIO_CORO_YIELD + { + auto it = request->find("Sec-WebSocket-Protocol"); + if (it != request->end()) { + socket->set_option( + boost::beast::websocket::stream_base::decorator( + [name = it->name(), value = it->value()] // name is enum, value is boost::string_view + (boost::beast::websocket::response_type& res) { + res.set(name, value); + } + ) + ); } - ); + socket->async_accept(*request, force_move(self)); + } #else // BOOST_BEAST_VERSION >= 248 - ps->async_accept_ex( - *request, - [request] - (boost::beast::websocket::response_type& m) { - auto it = request->find("Sec-WebSocket-Protocol"); - if (it != request->end()) { - m.insert(it->name(), it->value()); - } - }, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); - if (ec) { - return; - } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); - } - ); + BOOST_ASIO_CORO_YIELD + { + socket->async_accept_ex( + *request, + [&] + (boost::beast::websocket::response_type& m) { + auto it = request->find("Sec-WebSocket-Protocol"); + if (it != request->end()) { + m.insert(it->name(), it->value()); + } + }, + force_move(self)); + } #endif // BOOST_BEAST_VERSION >= 248 - // scope out point *1 + if (ec) { + return self.complete(ec, std::shared_ptr{}); + } + return self.complete(ec, std::make_shared(force_move(socket), version_)); } - ); - do_accept(); - } - ); + }, + handler, ioc_con_); } private: @@ -773,7 +802,14 @@ class server_tls_ws { return; } } - do_accept(); + do_accept([this](error_code ec, std::shared_ptr sp) { + if(ec) { + if (h_error_) h_error_(ec); + return; + } + + if (h_accept_) h_accept_(force_move(sp)); + }); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -851,131 +887,116 @@ class server_tls_ws { } private: - void do_accept() { - if (close_request_) return; - auto socket = std::make_shared(ioc_con_, ctx_); - auto ps = socket.get(); - acceptor_.value().async_accept( - ps->next_layer().next_layer(), - [this, socket = force_move(socket)] - (error_code ec) mutable { - if (ec) { - acceptor_.reset(); - if (h_error_) h_error_(ec); - return; - } - auto underlying_finished = std::make_shared(false); - auto tim = std::make_shared(ioc_con_); - tim->expires_after(underlying_connect_timeout_); - tim->async_wait( - [socket, tim, underlying_finished] - (error_code ec) { - if (*underlying_finished) return; - if (ec) return; - socket->post( - [socket] { + template)) + CompletionHandler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(as::io_context)> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionHandler, void(boost::system::error_code, std::shared_ptr)) + do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(as::io_context)) { + return as::async_compose)> ( + [this, + coro = as::coroutine(), + underlying_finished = false, + tim = as::steady_timer{ioc_con_, underlying_connect_timeout_}, + socket = std::shared_ptr{}, + sb = std::shared_ptr{}, + request = std::shared_ptr>{}] + (auto &self, boost::system::error_code ec = {}, size_t = 0) mutable + { + auto pGuard = shared_scope_guard([&](void) + { + underlying_finished = true; + tim.cancel(); + }); + + BOOST_ASIO_CORO_REENTER(coro) + { + if (close_request_) return self.complete(ec, std::shared_ptr{}); + tim.async_wait([&](error_code ec) { + if (underlying_finished) return; + if (ec) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); - } - ); - } - ); + }); + + BOOST_ASIO_CORO_YIELD + { + socket = std::make_shared(ioc_con_, ctx_); + acceptor_.value().async_accept( + socket->lowest_layer(), + force_move(self)); + } + if (ec) { + return self.complete(ec, std::shared_ptr{}); + } - auto ps = socket.get(); - ps->next_layer().async_handshake( - as::ssl::stream_base::server, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { + BOOST_ASIO_CORO_YIELD + { + socket->next_layer().async_handshake( + as::ssl::stream_base::server, + force_move(self)); + } if (ec) { - *underlying_finished = true; - tim->cancel(); - return; + return self.complete(ec, std::shared_ptr{}); + } + + BOOST_ASIO_CORO_YIELD + { + sb = std::make_shared(); + request = std::make_shared>(); + boost::beast::http::async_read( + socket->next_layer(), + *sb, + *request, + force_move(self)); + } + if(ec || ! boost::beast::websocket::is_upgrade(*request)) + { + return self.complete(ec, std::shared_ptr{}); } - auto sb = std::make_shared(); - auto request = std::make_shared>(); - auto ps = socket.get(); - boost::beast::http::async_read( - ps->next_layer(), - *sb, - *request, - [this, socket = force_move(socket), sb, request, tim, underlying_finished] - (error_code ec, std::size_t) mutable { - if (ec) { - *underlying_finished = true; - tim->cancel(); - return; - } - if (!boost::beast::websocket::is_upgrade(*request)) { - *underlying_finished = true; - tim->cancel(); - return; - } - auto ps = socket.get(); #if BOOST_BEAST_VERSION >= 248 - auto it = request->find("Sec-WebSocket-Protocol"); - if (it != request->end()) { - ps->set_option( + BOOST_ASIO_CORO_YIELD + { + auto it = request->find("Sec-WebSocket-Protocol"); + if (it != request->end()) { + socket->set_option( boost::beast::websocket::stream_base::decorator( [name = it->name(), value = it->value()] // name is enum, value is boost::string_view (boost::beast::websocket::response_type& res) { - // This lambda is called before the scope out point *1 res.set(name, value); } ) - ); - } - ps->async_accept( - *request, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); - if (ec) { - return; - } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); - } ); + } + socket->async_accept(*request, force_move(self)); + } #else // BOOST_BEAST_VERSION >= 248 - ps->async_accept_ex( + BOOST_ASIO_CORO_YIELD + { + socket->async_accept_ex( *request, - [request] + [&] (boost::beast::websocket::response_type& m) { auto it = request->find("Sec-WebSocket-Protocol"); if (it != request->end()) { m.insert(it->name(), it->value()); } }, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); - if (ec) { - return; - } - // TODO: The use of force_move on this line of code causes - // a static assertion that socket is a const object when - // TLS is enabled, and WS is enabled, with Boost 1.70, and gcc 8.3.0 - auto sp = std::make_shared(ioc_con_, socket, version_); - if (h_accept_) h_accept_(force_move(sp)); - } - ); + force_move(self)); + } #endif // BOOST_BEAST_VERSION >= 248 - // scope out point *1 - } - ); + if (ec) { + return self.complete(ec, std::shared_ptr{}); + } + + return self.complete(ec, std::make_shared(force_move(socket), version_)); } - ); - do_accept(); - } - ); + }, + handler, ioc_con_); } private: From 49bc0cd19961e2e4bd746b69b300f25b75a571bd Mon Sep 17 00:00:00 2001 From: Michael Jones Date: Wed, 15 Jul 2020 14:49:56 -0500 Subject: [PATCH 2/2] Remove use of BOOST_ASIO_COMPLETION_TOKEN_FOR, not introduced until boost 1.72 --- include/mqtt/server.hpp | 234 ++++++++++++++++++---------------------- 1 file changed, 105 insertions(+), 129 deletions(-) diff --git a/include/mqtt/server.hpp b/include/mqtt/server.hpp index 4759313af..93feb0918 100644 --- a/include/mqtt/server.hpp +++ b/include/mqtt/server.hpp @@ -123,13 +123,14 @@ class server { } } do_accept([this](error_code ec, std::shared_ptr sp) { - if(ec) { - if (h_error_) h_error_(ec); + if(ec) { + if (h_error_) h_error_(ec); return; - } + } - if (h_accept_) h_accept_(force_move(sp)); - }); + if (h_accept_) h_accept_(force_move(sp)); + } + ); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -179,10 +180,8 @@ class server { } private: - template)) - CompletionHandler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(as::io_context)> - BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionHandler, void(boost::system::error_code, std::shared_ptr)) - do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(as::io_context)) { + template + auto do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { return as::async_compose)> ( [this, coro = as::coroutine(), @@ -192,20 +191,18 @@ class server { BOOST_ASIO_CORO_REENTER(coro) { if (close_request_) return self.complete(ec, std::shared_ptr{}); + + socket = std::make_shared(ioc_con_); BOOST_ASIO_CORO_YIELD - { - socket = std::make_shared(ioc_con_); acceptor_.value().async_accept( - socket->lowest_layer(), - force_move(self)); - } + socket->lowest_layer(), + force_move(self)); if (ec) { this->acceptor_.reset(); return self.complete(ec, std::shared_ptr{}); } - - return self.complete(ec, std::make_shared(force_move(socket), version_)); } + return self.complete(ec, std::make_shared(ioc_con_, force_move(socket), version_)); }, handler, ioc_con_); } @@ -305,13 +302,14 @@ class server_tls { } } do_accept([this](error_code ec, std::shared_ptr sp) { - if(ec) { - if (h_error_) h_error_(ec); - return; - } + if(ec) { + if (h_error_) h_error_(ec); + return; + } - if (h_accept_) h_accept_(force_move(sp)); - }); + if (h_accept_) h_accept_(force_move(sp)); + } + ); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -389,10 +387,8 @@ class server_tls { } private: - template)) - CompletionHandler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(as::io_context)> - BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionHandler, void(boost::system::error_code, std::shared_ptr)) - do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(as::io_context)) { + template + auto do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { return as::async_compose)> ( [this, coro = as::coroutine(), @@ -414,31 +410,28 @@ class server_tls { if (underlying_finished) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); - }); + } + ); + + socket = std::make_shared(ioc_con_, ctx_); BOOST_ASIO_CORO_YIELD - { - socket = std::make_shared(ioc_con_, ctx_); acceptor_.value().async_accept( - socket->lowest_layer(), - force_move(self)); - } + socket->lowest_layer(), + force_move(self)); if (ec) { return self.complete(ec, std::shared_ptr{}); } BOOST_ASIO_CORO_YIELD - { socket->async_handshake( as::ssl::stream_base::server, force_move(self)); - } if (ec) { return self.complete(ec, std::shared_ptr{}); } - - return self.complete(ec, std::make_shared(force_move(socket), version_)); } + return self.complete(ec, std::make_shared(ioc_con_, force_move(socket), version_)); }, handler, ioc_con_); } @@ -537,13 +530,14 @@ class server_ws { } } do_accept([this](error_code ec, std::shared_ptr sp) { - if(ec) { - if (h_error_) h_error_(ec); - return; - } + if(ec) { + if (h_error_) h_error_(ec); + return; + } - if (h_accept_) h_accept_(force_move(sp)); - }); + if (h_accept_) h_accept_(force_move(sp)); + } + ); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -605,10 +599,8 @@ class server_ws { } private: - template)) - CompletionHandler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(as::io_context)> - BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionHandler, void(boost::system::error_code, std::shared_ptr)) - do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(as::io_context)) { + template + auto do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { return as::async_compose)> ( [this, coro = as::coroutine(), @@ -619,11 +611,11 @@ class server_ws { request = std::shared_ptr>{}] (auto &self, boost::system::error_code ec = {}, size_t = 0) mutable { - auto pGuard = shared_scope_guard([&](void) - { - underlying_finished = true; - tim.cancel(); - }); + auto pGuard = shared_scope_guard([&](void) { + underlying_finished = true; + tim.cancel(); + } + ); BOOST_ASIO_CORO_REENTER(coro) { @@ -633,29 +625,26 @@ class server_ws { if (underlying_finished) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); - }); + } + ); + socket = std::make_shared(ioc_con_); BOOST_ASIO_CORO_YIELD - { - socket = std::make_shared(ioc_con_); acceptor_.value().async_accept( - socket->lowest_layer(), - force_move(self)); - } + socket->lowest_layer(), + force_move(self)); if (ec) { return self.complete(ec, std::shared_ptr{}); } + sb = std::make_shared(); + request = std::make_shared>(); BOOST_ASIO_CORO_YIELD - { - sb = std::make_shared(); - request = std::make_shared>(); boost::beast::http::async_read( socket->next_layer(), *sb, *request, force_move(self)); - } if(ec || ! boost::beast::websocket::is_upgrade(*request)) { return self.complete(ec, std::shared_ptr{}); @@ -663,45 +652,41 @@ class server_ws { #if BOOST_BEAST_VERSION >= 248 + auto it = request->find("Sec-WebSocket-Protocol"); + if (it != request->end()) { + socket->set_option( + boost::beast::websocket::stream_base::decorator( + [name = it->name(), value = it->value()] // name is enum, value is boost::string_view + (boost::beast::websocket::response_type& res) { + res.set(name, value); + } + ) + ); + } BOOST_ASIO_CORO_YIELD - { - auto it = request->find("Sec-WebSocket-Protocol"); - if (it != request->end()) { - socket->set_option( - boost::beast::websocket::stream_base::decorator( - [name = it->name(), value = it->value()] // name is enum, value is boost::string_view - (boost::beast::websocket::response_type& res) { - res.set(name, value); - } - ) - ); - } socket->async_accept(*request, force_move(self)); - } #else // BOOST_BEAST_VERSION >= 248 BOOST_ASIO_CORO_YIELD - { socket->async_accept_ex( - *request, - [&] - (boost::beast::websocket::response_type& m) { - auto it = request->find("Sec-WebSocket-Protocol"); - if (it != request->end()) { - m.insert(it->name(), it->value()); - } - }, - force_move(self)); - } + *request, + [&] + (boost::beast::websocket::response_type& m) { + auto it = request->find("Sec-WebSocket-Protocol"); + if (it != request->end()) { + m.insert(it->name(), it->value()); + } + }, + force_move(self)); #endif // BOOST_BEAST_VERSION >= 248 if (ec) { return self.complete(ec, std::shared_ptr{}); } - return self.complete(ec, std::make_shared(force_move(socket), version_)); } + return self.complete(ec, std::make_shared(ioc_con_, force_move(socket), version_)); }, handler, ioc_con_); } @@ -803,12 +788,12 @@ class server_tls_ws { } } do_accept([this](error_code ec, std::shared_ptr sp) { - if(ec) { - if (h_error_) h_error_(ec); - return; - } + if(ec) { + if (h_error_) h_error_(ec); + return; + } - if (h_accept_) h_accept_(force_move(sp)); + if (h_accept_) h_accept_(force_move(sp)); }); } @@ -887,10 +872,8 @@ class server_tls_ws { } private: - template)) - CompletionHandler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(as::io_context)> - BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionHandler, void(boost::system::error_code, std::shared_ptr)) - do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(as::io_context)) { + template + auto do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { return as::async_compose)> ( [this, coro = as::coroutine(), @@ -901,11 +884,11 @@ class server_tls_ws { request = std::shared_ptr>{}] (auto &self, boost::system::error_code ec = {}, size_t = 0) mutable { - auto pGuard = shared_scope_guard([&](void) - { - underlying_finished = true; - tim.cancel(); - }); + auto pGuard = shared_scope_guard([&](void) { + underlying_finished = true; + tim.cancel(); + } + ); BOOST_ASIO_CORO_REENTER(coro) { @@ -915,39 +898,36 @@ class server_tls_ws { if (ec) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); - }); + } + ); + + socket = std::make_shared(ioc_con_, ctx_); BOOST_ASIO_CORO_YIELD - { - socket = std::make_shared(ioc_con_, ctx_); acceptor_.value().async_accept( - socket->lowest_layer(), - force_move(self)); - } + socket->lowest_layer(), + force_move(self)); if (ec) { return self.complete(ec, std::shared_ptr{}); } BOOST_ASIO_CORO_YIELD - { socket->next_layer().async_handshake( as::ssl::stream_base::server, force_move(self)); - } if (ec) { return self.complete(ec, std::shared_ptr{}); } + sb = std::make_shared(); + request = std::make_shared>(); + BOOST_ASIO_CORO_YIELD - { - sb = std::make_shared(); - request = std::make_shared>(); boost::beast::http::async_read( socket->next_layer(), *sb, *request, force_move(self)); - } if(ec || ! boost::beast::websocket::is_upgrade(*request)) { return self.complete(ec, std::shared_ptr{}); @@ -955,26 +935,24 @@ class server_tls_ws { #if BOOST_BEAST_VERSION >= 248 + auto it = request->find("Sec-WebSocket-Protocol"); + if (it != request->end()) { + socket->set_option( + boost::beast::websocket::stream_base::decorator( + [name = it->name(), value = it->value()] // name is enum, value is boost::string_view + (boost::beast::websocket::response_type& res) { + res.set(name, value); + } + ) + ); + } + BOOST_ASIO_CORO_YIELD - { - auto it = request->find("Sec-WebSocket-Protocol"); - if (it != request->end()) { - socket->set_option( - boost::beast::websocket::stream_base::decorator( - [name = it->name(), value = it->value()] // name is enum, value is boost::string_view - (boost::beast::websocket::response_type& res) { - res.set(name, value); - } - ) - ); - } socket->async_accept(*request, force_move(self)); - } #else // BOOST_BEAST_VERSION >= 248 BOOST_ASIO_CORO_YIELD - { socket->async_accept_ex( *request, [&] @@ -985,16 +963,14 @@ class server_tls_ws { } }, force_move(self)); - } #endif // BOOST_BEAST_VERSION >= 248 if (ec) { return self.complete(ec, std::shared_ptr{}); } - - return self.complete(ec, std::make_shared(force_move(socket), version_)); } + return self.complete(ec, std::make_shared(ioc_con_, force_move(socket), version_)); }, handler, ioc_con_); }