diff --git a/.github/workflows/gha.yml b/.github/workflows/gha.yml index 0c3ebff0c..320cb973d 100644 --- a/.github/workflows/gha.yml +++ b/.github/workflows/gha.yml @@ -55,7 +55,7 @@ jobs: run: | sudo add-apt-repository ppa:mhier/libboost-latest sudo apt-get update - sudo apt-get install boost1.67 + sudo apt-get install boost1.70 - name: Configure run: | [ ${{ matrix.pattern }} == 0 ] && FLAGS="-DCMAKE_CXX_COMPILER=clang++ -DMQTT_TEST_1=ON -DMQTT_TEST_2=ON -DMQTT_TEST_3=OFF -DMQTT_TEST_4=OFF -DMQTT_TEST_5=OFF -DMQTT_TEST_6=OFF -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=OFF -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=ON -DMQTT_STD_ANY=OFF -DMQTT_STD_OPTIONAL=OFF -DMQTT_STD_VARIANT=OFF -DMQTT_STD_STRING_VIEW=OFF -DMQTT_STD_SHARED_PTR_ARRAY=OFF" diff --git a/include/mqtt/server.hpp b/include/mqtt/server.hpp index 62cdb37c3..93feb0918 100644 --- a/include/mqtt/server.hpp +++ b/include/mqtt/server.hpp @@ -11,6 +11,7 @@ #include #include +#include #include @@ -121,7 +122,15 @@ class server { return; } } - do_accept(); + do_accept([this](error_code ec, std::shared_ptr sp) { + if(ec) { + if (h_error_) h_error_(ec); + return; + } + + if (h_accept_) h_accept_(force_move(sp)); + } + ); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -171,23 +180,31 @@ class server { } private: - void do_accept() { - if (close_request_) return; - auto socket = std::make_shared(ioc_con_); - acceptor_.value().async_accept( - socket->lowest_layer(), - [this, socket] - (error_code ec) mutable { - if (ec) { - acceptor_.reset(); - if (h_error_) h_error_(ec); - return; - } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); - do_accept(); - } - ); + template + auto do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { + return as::async_compose)> ( + [this, + coro = as::coroutine(), + socket = std::shared_ptr{}] + (auto &self, boost::system::error_code ec = {}) mutable + { + BOOST_ASIO_CORO_REENTER(coro) + { + if (close_request_) return self.complete(ec, std::shared_ptr{}); + + socket = std::make_shared(ioc_con_); + BOOST_ASIO_CORO_YIELD + acceptor_.value().async_accept( + socket->lowest_layer(), + force_move(self)); + if (ec) { + this->acceptor_.reset(); + return self.complete(ec, std::shared_ptr{}); + } + } + return self.complete(ec, std::make_shared(ioc_con_, force_move(socket), version_)); + }, + handler, ioc_con_); } private: @@ -284,7 +301,15 @@ class server_tls { return; } } - do_accept(); + do_accept([this](error_code ec, std::shared_ptr sp) { + if(ec) { + if (h_error_) h_error_(ec); + return; + } + + if (h_accept_) h_accept_(force_move(sp)); + } + ); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -362,52 +387,53 @@ class server_tls { } private: - void do_accept() { - if (close_request_) return; - auto socket = std::make_shared(ioc_con_, ctx_); - auto ps = socket.get(); - acceptor_.value().async_accept( - ps->lowest_layer(), - [this, socket = force_move(socket)] - (error_code ec) mutable { - if (ec) { - acceptor_.reset(); - if (h_error_) h_error_(ec); - return; - } - auto underlying_finished = std::make_shared(false); - auto tim = std::make_shared(ioc_con_); - tim->expires_after(underlying_connect_timeout_); - tim->async_wait( - [socket, tim, underlying_finished] - (error_code ec) { - if (*underlying_finished) return; - if (ec) return; - socket->post( - [socket] { + template + auto do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { + return as::async_compose)> ( + [this, + coro = as::coroutine(), + underlying_finished = false, + tim = as::steady_timer{ioc_con_, underlying_connect_timeout_}, + socket = std::shared_ptr{}] + (auto &self, boost::system::error_code ec = {}) mutable + { + auto pGuard = shared_scope_guard([&](void) + { + underlying_finished = true; + tim.cancel(); + }); + BOOST_ASIO_CORO_REENTER(coro) + { + if (close_request_) return self.complete(ec, std::shared_ptr{}); + tim.async_wait([&](error_code ec) { + if (ec) return; + if (underlying_finished) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); } ); - } - ); - auto ps = socket.get(); - ps->async_handshake( - as::ssl::stream_base::server, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); + + socket = std::make_shared(ioc_con_, ctx_); + + BOOST_ASIO_CORO_YIELD + acceptor_.value().async_accept( + socket->lowest_layer(), + force_move(self)); + if (ec) { + return self.complete(ec, std::shared_ptr{}); + } + + BOOST_ASIO_CORO_YIELD + socket->async_handshake( + as::ssl::stream_base::server, + force_move(self)); if (ec) { - return; + return self.complete(ec, std::shared_ptr{}); } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); } - ); - do_accept(); - } - ); + return self.complete(ec, std::make_shared(ioc_con_, force_move(socket), version_)); + }, + handler, ioc_con_); } private: @@ -503,7 +529,15 @@ class server_ws { return; } } - do_accept(); + do_accept([this](error_code ec, std::shared_ptr sp) { + if(ec) { + if (h_error_) h_error_(ec); + return; + } + + if (h_accept_) h_accept_(force_move(sp)); + } + ); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -565,116 +599,96 @@ class server_ws { } private: - void do_accept() { - if (close_request_) return; - auto socket = std::make_shared(ioc_con_); - auto ps = socket.get(); - acceptor_.value().async_accept( - ps->next_layer(), - [this, socket = force_move(socket)] - (error_code ec) mutable { - if (ec) { - acceptor_.reset(); - if (h_error_) h_error_(ec); - return; - } - auto underlying_finished = std::make_shared(false); - auto tim = std::make_shared(ioc_con_); - tim->expires_after(underlying_connect_timeout_); - tim->async_wait( - [socket, tim, underlying_finished] - (error_code ec) { - if (*underlying_finished) return; - if (ec) return; - socket->post( - [socket] { + template + auto do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { + return as::async_compose)> ( + [this, + coro = as::coroutine(), + underlying_finished = false, + tim = as::steady_timer{ioc_con_, underlying_connect_timeout_}, + socket = std::shared_ptr{}, + sb = std::shared_ptr{}, + request = std::shared_ptr>{}] + (auto &self, boost::system::error_code ec = {}, size_t = 0) mutable + { + auto pGuard = shared_scope_guard([&](void) { + underlying_finished = true; + tim.cancel(); + } + ); + + BOOST_ASIO_CORO_REENTER(coro) + { + if (close_request_) return self.complete(ec, std::shared_ptr{}); + tim.async_wait([&](error_code ec) { + if (ec) return; + if (underlying_finished) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); } ); - } - ); - auto sb = std::make_shared(); - auto request = std::make_shared>(); - auto ps = socket.get(); - boost::beast::http::async_read( - ps->next_layer(), - *sb, - *request, - [this, socket = force_move(socket), sb, request, tim, underlying_finished] - (error_code ec, std::size_t) mutable { + socket = std::make_shared(ioc_con_); + BOOST_ASIO_CORO_YIELD + acceptor_.value().async_accept( + socket->lowest_layer(), + force_move(self)); if (ec) { - *underlying_finished = true; - tim->cancel(); - return; + return self.complete(ec, std::shared_ptr{}); } - if (!boost::beast::websocket::is_upgrade(*request)) { - *underlying_finished = true; - tim->cancel(); - return; + + sb = std::make_shared(); + request = std::make_shared>(); + BOOST_ASIO_CORO_YIELD + boost::beast::http::async_read( + socket->next_layer(), + *sb, + *request, + force_move(self)); + if(ec || ! boost::beast::websocket::is_upgrade(*request)) + { + return self.complete(ec, std::shared_ptr{}); } - auto ps = socket.get(); #if BOOST_BEAST_VERSION >= 248 auto it = request->find("Sec-WebSocket-Protocol"); if (it != request->end()) { - ps->set_option( + socket->set_option( boost::beast::websocket::stream_base::decorator( [name = it->name(), value = it->value()] // name is enum, value is boost::string_view (boost::beast::websocket::response_type& res) { - // This lambda is called before the scope out point *1 res.set(name, value); } ) ); } - ps->async_accept( - *request, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); - if (ec) { - return; - } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); - } - ); + BOOST_ASIO_CORO_YIELD + socket->async_accept(*request, force_move(self)); #else // BOOST_BEAST_VERSION >= 248 - ps->async_accept_ex( - *request, - [request] - (boost::beast::websocket::response_type& m) { - auto it = request->find("Sec-WebSocket-Protocol"); - if (it != request->end()) { - m.insert(it->name(), it->value()); - } - }, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); - if (ec) { - return; - } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); - } - ); + BOOST_ASIO_CORO_YIELD + socket->async_accept_ex( + *request, + [&] + (boost::beast::websocket::response_type& m) { + auto it = request->find("Sec-WebSocket-Protocol"); + if (it != request->end()) { + m.insert(it->name(), it->value()); + } + }, + force_move(self)); #endif // BOOST_BEAST_VERSION >= 248 - // scope out point *1 + if (ec) { + return self.complete(ec, std::shared_ptr{}); + } } - ); - do_accept(); - } - ); + return self.complete(ec, std::make_shared(ioc_con_, force_move(socket), version_)); + }, + handler, ioc_con_); } private: @@ -773,7 +787,14 @@ class server_tls_ws { return; } } - do_accept(); + do_accept([this](error_code ec, std::shared_ptr sp) { + if(ec) { + if (h_error_) h_error_(ec); + return; + } + + if (h_accept_) h_accept_(force_move(sp)); + }); } unsigned short port() const { return acceptor_.value().local_endpoint().port(); } @@ -851,131 +872,107 @@ class server_tls_ws { } private: - void do_accept() { - if (close_request_) return; - auto socket = std::make_shared(ioc_con_, ctx_); - auto ps = socket.get(); - acceptor_.value().async_accept( - ps->next_layer().next_layer(), - [this, socket = force_move(socket)] - (error_code ec) mutable { - if (ec) { - acceptor_.reset(); - if (h_error_) h_error_(ec); - return; - } - auto underlying_finished = std::make_shared(false); - auto tim = std::make_shared(ioc_con_); - tim->expires_after(underlying_connect_timeout_); - tim->async_wait( - [socket, tim, underlying_finished] - (error_code ec) { - if (*underlying_finished) return; - if (ec) return; - socket->post( - [socket] { + template + auto do_accept(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { + return as::async_compose)> ( + [this, + coro = as::coroutine(), + underlying_finished = false, + tim = as::steady_timer{ioc_con_, underlying_connect_timeout_}, + socket = std::shared_ptr{}, + sb = std::shared_ptr{}, + request = std::shared_ptr>{}] + (auto &self, boost::system::error_code ec = {}, size_t = 0) mutable + { + auto pGuard = shared_scope_guard([&](void) { + underlying_finished = true; + tim.cancel(); + } + ); + + BOOST_ASIO_CORO_REENTER(coro) + { + if (close_request_) return self.complete(ec, std::shared_ptr{}); + tim.async_wait([&](error_code ec) { + if (underlying_finished) return; + if (ec) return; boost::system::error_code close_ec; socket->lowest_layer().close(close_ec); } ); - } - ); - auto ps = socket.get(); - ps->next_layer().async_handshake( - as::ssl::stream_base::server, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { + socket = std::make_shared(ioc_con_, ctx_); + + BOOST_ASIO_CORO_YIELD + acceptor_.value().async_accept( + socket->lowest_layer(), + force_move(self)); + if (ec) { + return self.complete(ec, std::shared_ptr{}); + } + + BOOST_ASIO_CORO_YIELD + socket->next_layer().async_handshake( + as::ssl::stream_base::server, + force_move(self)); if (ec) { - *underlying_finished = true; - tim->cancel(); - return; + return self.complete(ec, std::shared_ptr{}); + } + + sb = std::make_shared(); + request = std::make_shared>(); + + BOOST_ASIO_CORO_YIELD + boost::beast::http::async_read( + socket->next_layer(), + *sb, + *request, + force_move(self)); + if(ec || ! boost::beast::websocket::is_upgrade(*request)) + { + return self.complete(ec, std::shared_ptr{}); } - auto sb = std::make_shared(); - auto request = std::make_shared>(); - auto ps = socket.get(); - boost::beast::http::async_read( - ps->next_layer(), - *sb, - *request, - [this, socket = force_move(socket), sb, request, tim, underlying_finished] - (error_code ec, std::size_t) mutable { - if (ec) { - *underlying_finished = true; - tim->cancel(); - return; - } - if (!boost::beast::websocket::is_upgrade(*request)) { - *underlying_finished = true; - tim->cancel(); - return; - } - auto ps = socket.get(); #if BOOST_BEAST_VERSION >= 248 - auto it = request->find("Sec-WebSocket-Protocol"); - if (it != request->end()) { - ps->set_option( - boost::beast::websocket::stream_base::decorator( - [name = it->name(), value = it->value()] // name is enum, value is boost::string_view - (boost::beast::websocket::response_type& res) { - // This lambda is called before the scope out point *1 - res.set(name, value); - } - ) - ); - } - ps->async_accept( - *request, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); - if (ec) { - return; - } - auto sp = std::make_shared(ioc_con_, force_move(socket), version_); - if (h_accept_) h_accept_(force_move(sp)); + auto it = request->find("Sec-WebSocket-Protocol"); + if (it != request->end()) { + socket->set_option( + boost::beast::websocket::stream_base::decorator( + [name = it->name(), value = it->value()] // name is enum, value is boost::string_view + (boost::beast::websocket::response_type& res) { + res.set(name, value); } - ); + ) + ); + } + + BOOST_ASIO_CORO_YIELD + socket->async_accept(*request, force_move(self)); #else // BOOST_BEAST_VERSION >= 248 - ps->async_accept_ex( + BOOST_ASIO_CORO_YIELD + socket->async_accept_ex( *request, - [request] + [&] (boost::beast::websocket::response_type& m) { auto it = request->find("Sec-WebSocket-Protocol"); if (it != request->end()) { m.insert(it->name(), it->value()); } }, - [this, socket = force_move(socket), tim, underlying_finished] - (error_code ec) mutable { - *underlying_finished = true; - tim->cancel(); - if (ec) { - return; - } - // TODO: The use of force_move on this line of code causes - // a static assertion that socket is a const object when - // TLS is enabled, and WS is enabled, with Boost 1.70, and gcc 8.3.0 - auto sp = std::make_shared(ioc_con_, socket, version_); - if (h_accept_) h_accept_(force_move(sp)); - } - ); + force_move(self)); #endif // BOOST_BEAST_VERSION >= 248 - // scope out point *1 - } - ); + if (ec) { + return self.complete(ec, std::shared_ptr{}); + } } - ); - do_accept(); - } - ); + return self.complete(ec, std::make_shared(ioc_con_, force_move(socket), version_)); + }, + handler, ioc_con_); } private: