diff --git a/include/boost/redis/adapter/any_adapter.hpp b/include/boost/redis/adapter/any_adapter.hpp index a5b51b51..a32e2545 100644 --- a/include/boost/redis/adapter/any_adapter.hpp +++ b/include/boost/redis/adapter/any_adapter.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -34,24 +34,41 @@ namespace boost::redis { */ class any_adapter { public: - using fn_type = std::function; + /** @brief Parse events that an adapter must support. + */ + enum class parse_event + { + /// Called before the parser starts processing data + init, + /// Called for each and every node of RESP3 data + node, + /// Called when done processing a complete RESP3 message + done + }; - struct impl_t { - fn_type adapt_fn; - std::size_t supported_response_size; - } impl_; + /// The type erased implementation type. + using impl_t = std::function; template static auto create_impl(T& resp) -> impl_t { using namespace boost::redis::adapter; - auto adapter = boost_redis_adapt(resp); - std::size_t size = adapter.get_supported_response_size(); - return {std::move(adapter), size}; + return [adapter2 = boost_redis_adapt(resp)]( + any_adapter::parse_event ev, + resp3::node_view const& nd, + system::error_code& ec) mutable { + switch (ev) { + case parse_event::init: adapter2.on_init(); break; + case parse_event::node: adapter2.on_node(nd, ec); break; + case parse_event::done: adapter2.on_done(); break; + } + }; } - template - friend class basic_connection; + /// Contructs from a type erased adaper + any_adapter(impl_t fn = [](parse_event, resp3::node_view const&, system::error_code&) { }) + : impl_{std::move(fn)} + { } /** * @brief Constructor. @@ -67,6 +84,29 @@ class any_adapter { explicit any_adapter(T& resp) : impl_(create_impl(resp)) { } + + /// Calls the implementation with the arguments `impl_(parse_event::init, ...);` + void on_init() + { + system::error_code ec; + impl_(parse_event::init, {}, ec); + }; + + /// Calls the implementation with the arguments `impl_(parse_event::done, ...);` + void on_done() + { + system::error_code ec; + impl_(parse_event::done, {}, ec); + }; + + /// Calls the implementation with the arguments `impl_(parse_event::node, ...);` + void on_node(resp3::node_view const& nd, system::error_code& ec) + { + impl_(parse_event::node, nd, ec); + }; + +private: + impl_t impl_; }; } // namespace boost::redis diff --git a/include/boost/redis/adapter/detail/adapters.hpp b/include/boost/redis/adapter/detail/adapters.hpp index 5bb8af76..9c05b23d 100644 --- a/include/boost/redis/adapter/detail/adapters.hpp +++ b/include/boost/redis/adapter/detail/adapters.hpp @@ -147,8 +147,12 @@ class general_aggregate { explicit general_aggregate(Result* c = nullptr) : result_(c) { } + + void on_init() { } + void on_done() { } + template - void operator()(resp3::basic_node const& nd, system::error_code&) + void on_node(resp3::basic_node const& nd, system::error_code&) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); switch (nd.data_type) { @@ -180,8 +184,11 @@ class general_simple { : result_(t) { } + void on_init() { } + void on_done() { } + template - void operator()(resp3::basic_node const& nd, system::error_code&) + void on_node(resp3::basic_node const& nd, system::error_code&) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); switch (nd.data_type) { @@ -206,8 +213,11 @@ class simple_impl { public: void on_value_available(Result&) { } + void on_init() { } + void on_done() { } + template - void operator()(Result& result, resp3::basic_node const& node, system::error_code& ec) + void on_node(Result& result, resp3::basic_node const& node, system::error_code& ec) { if (is_aggregate(node.data_type)) { ec = redis::error::expects_resp3_simple_type; @@ -226,8 +236,11 @@ class set_impl { public: void on_value_available(Result& result) { hint_ = std::end(result); } + void on_init() { } + void on_done() { } + template - void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) + void on_node(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (nd.data_type != resp3::type::set) @@ -257,8 +270,11 @@ class map_impl { public: void on_value_available(Result& result) { current_ = std::end(result); } + void on_init() { } + void on_done() { } + template - void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) + void on_node(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (element_multiplicity(nd.data_type) != 2) @@ -292,8 +308,11 @@ class vector_impl { public: void on_value_available(Result&) { } + void on_init() { } + void on_done() { } + template - void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) + void on_node(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { auto const m = element_multiplicity(nd.data_type); @@ -313,8 +332,11 @@ class array_impl { public: void on_value_available(Result&) { } + void on_init() { } + void on_done() { } + template - void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) + void on_node(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (i_ != -1) { @@ -344,8 +366,11 @@ template struct list_impl { void on_value_available(Result&) { } + void on_init() { } + void on_done() { } + template - void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) + void on_node(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (!is_aggregate(nd.data_type)) { BOOST_ASSERT(nd.aggregate_size == 1); @@ -468,8 +493,11 @@ class wrapper> { } } + void on_init() { impl_.on_init(); } + void on_done() { impl_.on_done(); } + template - void operator()(resp3::basic_node const& nd, system::error_code& ec) + void on_node(resp3::basic_node const& nd, system::error_code& ec) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); @@ -480,7 +508,7 @@ class wrapper> { return; BOOST_ASSERT(result_); - impl_(result_->value(), nd, ec); + impl_.on_node(result_->value(), nd, ec); } }; @@ -514,8 +542,11 @@ class wrapper>> { : result_(o) { } + void on_init() { impl_.on_init(); } + void on_done() { impl_.on_done(); } + template - void operator()(resp3::basic_node const& nd, system::error_code& ec) + void on_node(resp3::basic_node const& nd, system::error_code& ec) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); @@ -533,7 +564,7 @@ class wrapper>> { impl_.on_value_available(result_->value().value()); } - impl_(result_->value().value(), nd, ec); + impl_.on_node(result_->value().value(), nd, ec); } }; diff --git a/include/boost/redis/adapter/detail/response_traits.hpp b/include/boost/redis/adapter/detail/response_traits.hpp index 66684a70..f24dc891 100644 --- a/include/boost/redis/adapter/detail/response_traits.hpp +++ b/include/boost/redis/adapter/detail/response_traits.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -8,6 +8,7 @@ #define BOOST_REDIS_ADAPTER_DETAIL_RESPONSE_TRAITS_HPP #include +#include #include #include @@ -21,26 +22,6 @@ namespace boost::redis::adapter::detail { -class ignore_adapter { -public: - template - void operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) - { - switch (nd.data_type) { - case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break; - case resp3::type::blob_error: ec = redis::error::resp3_blob_error; break; - case resp3::type::null: ec = redis::error::resp3_null; break; - default: ; - } - } - - [[nodiscard]] - auto get_supported_response_size() const noexcept - { - return static_cast(-1); - } -}; - template class static_adapter { private: @@ -50,51 +31,44 @@ class static_adapter { using adapters_array_type = std::array; adapters_array_type adapters_; + std::size_t i_ = 0; public: explicit static_adapter(Response& r) { assigner::assign(adapters_, r); } - [[nodiscard]] - auto get_supported_response_size() const noexcept - { - return size; - } - - template - void operator()(std::size_t i, resp3::basic_node const& nd, system::error_code& ec) + void on_init() { using std::visit; - // I am usure whether this should be an error or an assertion. - BOOST_ASSERT(i < adapters_.size()); visit( [&](auto& arg) { - arg(nd, ec); + arg.on_init(); }, - adapters_.at(i)); + adapters_.at(i_)); } -}; - -template -class vector_adapter { -private: - using adapter_type = typename result_traits::adapter_type; - adapter_type adapter_; - -public: - explicit vector_adapter(Vector& v) - : adapter_{internal_adapt(v)} - { } - [[nodiscard]] - auto get_supported_response_size() const noexcept + void on_done() { - return static_cast(-1); + using std::visit; + visit( + [&](auto& arg) { + arg.on_done(); + }, + adapters_.at(i_)); + i_ += 1; } template - void operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) + void on_node(resp3::basic_node const& nd, system::error_code& ec) { - adapter_(nd, ec); + using std::visit; + + // I am usure whether this should be an error or an assertion. + BOOST_ASSERT(i_ < adapters_.size()); + visit( + [&](auto& arg) { + arg.on_node(nd, ec); + }, + adapters_.at(i_)); } }; @@ -104,25 +78,25 @@ struct response_traits; template <> struct response_traits { using response_type = ignore_t; - using adapter_type = detail::ignore_adapter; + using adapter_type = ignore; - static auto adapt(response_type&) noexcept { return detail::ignore_adapter{}; } + static auto adapt(response_type&) noexcept { return ignore{}; } }; template <> struct response_traits> { using response_type = result; - using adapter_type = detail::ignore_adapter; + using adapter_type = ignore; - static auto adapt(response_type&) noexcept { return detail::ignore_adapter{}; } + static auto adapt(response_type&) noexcept { return ignore{}; } }; template struct response_traits, Allocator>>> { using response_type = result, Allocator>>; - using adapter_type = vector_adapter; + using adapter_type = general_aggregate; - static auto adapt(response_type& v) noexcept { return adapter_type{v}; } + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } }; template @@ -133,35 +107,6 @@ struct response_traits> { static auto adapt(response_type& r) noexcept { return adapter_type{r}; } }; -template -class wrapper { -public: - explicit wrapper(Adapter adapter) - : adapter_{adapter} - { } - - template - void operator()(resp3::basic_node const& nd, system::error_code& ec) - { - return adapter_(0, nd, ec); - } - - [[nodiscard]] - auto get_supported_response_size() const noexcept - { - return adapter_.get_supported_response_size(); - } - -private: - Adapter adapter_; -}; - -template -auto make_adapter_wrapper(Adapter adapter) -{ - return wrapper{adapter}; -} - } // namespace boost::redis::adapter::detail #endif // BOOST_REDIS_ADAPTER_DETAIL_RESPONSE_TRAITS_HPP diff --git a/include/boost/redis/adapter/detail/result_traits.hpp b/include/boost/redis/adapter/detail/result_traits.hpp index 723ca2aa..da625c99 100644 --- a/include/boost/redis/adapter/detail/result_traits.hpp +++ b/include/boost/redis/adapter/detail/result_traits.hpp @@ -132,8 +132,32 @@ class static_aggregate_adapter> { } } + void on_init() + { + using std::visit; + for (auto& adapter : adapters_) { + visit( + [&](auto& arg) { + arg.on_init(); + }, + adapter); + } + } + + void on_done() + { + using std::visit; + for (auto& adapter : adapters_) { + visit( + [&](auto& arg) { + arg.on_done(); + }, + adapter); + } + } + template - void operator()(resp3::basic_node const& elem, system::error_code& ec) + void on_node(resp3::basic_node const& elem, system::error_code& ec) { using std::visit; @@ -148,9 +172,9 @@ class static_aggregate_adapter> { visit( [&](auto& arg) { - arg(elem, ec); + arg.on_node(elem, ec); }, - adapters_[i_]); + adapters_.at(i_)); count(elem); } }; diff --git a/include/boost/redis/adapter/ignore.hpp b/include/boost/redis/adapter/ignore.hpp index 0979c9fc..9a524930 100644 --- a/include/boost/redis/adapter/ignore.hpp +++ b/include/boost/redis/adapter/ignore.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -19,7 +19,10 @@ namespace boost::redis::adapter { * RESP3 errors won't be ignored. */ struct ignore { - void operator()(resp3::basic_node const& nd, system::error_code& ec) + void on_init() { } + void on_done() { } + + void on_node(resp3::basic_node const& nd, system::error_code& ec) { switch (nd.data_type) { case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break; diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 93b7e7fd..57df0915 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -138,7 +138,7 @@ struct connection_impl { , health_checker_{ex} , logger_{std::move(lgr)} { - mpx_.set_receive_response(ignore); + set_receive_adapter(any_adapter{ignore}); writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); // Reserve some memory to avoid excessive memory allocations in @@ -187,13 +187,8 @@ struct connection_impl { template auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token) { - auto& adapter_impl = adapter.impl_; - BOOST_ASSERT_MSG( - req.get_expected_responses() <= adapter_impl.supported_response_size, - "Request and response have incompatible sizes."); - - auto notifier = std::make_shared(writer_timer_.get_executor(), 1); - auto info = make_elem(req, std::move(adapter_impl.adapt_fn)); + auto notifier = std::make_shared(get_executor(), 1); + auto info = make_elem(req, std::move(adapter)); info->set_done_callback([notifier]() { notifier->try_send(std::error_code{}, 0); @@ -204,6 +199,8 @@ struct connection_impl { token, writer_timer_); } + + void set_receive_adapter(any_adapter adapter) { mpx_.set_receive_adapter(std::move(adapter)); } }; template @@ -503,11 +500,10 @@ class basic_connection { executor_type ex, asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr = {}) - : impl_( - std::make_unique>( - std::move(ex), - std::move(ctx), - std::move(lgr))) + : impl_(std::make_unique>( + std::move(ex), + std::move(ctx), + std::move(lgr))) { } /** @brief Constructor from an executor and a logger. @@ -765,7 +761,10 @@ class basic_connection { class CompletionToken = asio::default_completion_token_t> auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {}) { - return this->async_exec(req, any_adapter(resp), std::forward(token)); + return this->async_exec( + req, + any_adapter{resp}, + std::forward(token)); } /** @brief Executes commands on the Redis server asynchronously. @@ -891,9 +890,9 @@ class basic_connection { /// Sets the response object of @ref async_receive operations. template - void set_receive_response(Response& response) + void set_receive_response(Response& resp) { - impl_->mpx_.set_receive_response(response); + impl_->set_receive_adapter(any_adapter{resp}); } /// Returns connection usage information. @@ -1070,7 +1069,10 @@ class connection { template auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {}) { - return async_exec(req, any_adapter(resp), std::forward(token)); + return async_exec( + req, + any_adapter{resp}, + std::forward(token)); } /** diff --git a/include/boost/redis/detail/health_checker.hpp b/include/boost/redis/detail/health_checker.hpp index 7e4bde72..c4a7eccb 100644 --- a/include/boost/redis/detail/health_checker.hpp +++ b/include/boost/redis/detail/health_checker.hpp @@ -51,7 +51,10 @@ class ping_op { } BOOST_ASIO_CORO_YIELD - conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self)); + conn_->async_exec( + checker_->req_, + any_adapter{checker_->resp_}, + std::move(self)); if (ec) { conn_->logger_.trace("ping_op (3)", ec); checker_->wait_timer_.cancel(); diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index 8ee643d6..1c495759 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -8,6 +8,7 @@ #define BOOST_REDIS_MULTIPLEXER_HPP #include +#include #include #include #include @@ -34,13 +35,9 @@ using tribool = std::optional; class multiplexer { public: - using adapter_type = std::function; - using pipeline_adapter_type = std::function< - void(std::size_t, resp3::node_view const&, system::error_code&)>; - struct elem { public: - explicit elem(request const& req, pipeline_adapter_type adapter); + explicit elem(request const& req, any_adapter adapter); void set_done_callback(std::function f) noexcept { done_ = std::move(f); }; @@ -92,7 +89,7 @@ class multiplexer { auto commit_response(std::size_t read_size) -> void; - auto get_adapter() -> adapter_type& { return adapter_; } + auto get_adapter() -> any_adapter& { return adapter_; } private: enum class status @@ -104,8 +101,7 @@ class multiplexer { }; request const* req_; - adapter_type adapter_; - + any_adapter adapter_; std::function done_; // Contains the number of commands that haven't been read yet. @@ -158,15 +154,7 @@ class multiplexer { return std::string_view{write_buffer_}; } - // TODO: Change signature to receive an adapter instead of a - // response. - template - void set_receive_response(Response& response) - { - using namespace boost::redis::adapter; - auto g = boost_redis_adapt(response); - receive_adapter_ = adapter::detail::make_adapter_wrapper(g); - } + void set_receive_adapter(any_adapter adapter); [[nodiscard]] auto get_usage() const noexcept -> usage @@ -199,11 +187,10 @@ class multiplexer { bool on_push_ = false; bool cancel_run_called_ = false; usage usage_; - adapter_type receive_adapter_; + any_adapter receive_adapter_; }; -auto make_elem(request const& req, multiplexer::pipeline_adapter_type adapter) - -> std::shared_ptr; +auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr; } // namespace detail } // namespace boost::redis diff --git a/include/boost/redis/detail/resp3_handshaker.hpp b/include/boost/redis/detail/resp3_handshaker.hpp index 05edc795..06eef7ae 100644 --- a/include/boost/redis/detail/resp3_handshaker.hpp +++ b/include/boost/redis/detail/resp3_handshaker.hpp @@ -42,8 +42,9 @@ struct hello_op { BOOST_ASIO_CORO_YIELD conn_->async_exec( handshaker_->hello_req_, - any_adapter(handshaker_->hello_resp_), + any_adapter{handshaker_->hello_resp_}, std::move(self)); + conn_->logger_.on_hello(ec, handshaker_->hello_resp_); if (ec) { diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp index a4007c2e..c77e2d86 100644 --- a/include/boost/redis/impl/multiplexer.ipp +++ b/include/boost/redis/impl/multiplexer.ipp @@ -11,19 +11,14 @@ namespace boost::redis::detail { -multiplexer::elem::elem(request const& req, pipeline_adapter_type adapter) +multiplexer::elem::elem(request const& req, any_adapter adapter) : req_{&req} -, adapter_{} +, adapter_{std::move(adapter)} , remaining_responses_{req.get_expected_responses()} , status_{status::waiting} , ec_{} , read_size_{0} -{ - adapter_ = [this, adapter](resp3::node_view const& nd, system::error_code& ec) { - auto const i = req_->get_expected_responses() - remaining_responses_; - adapter(i, nd, ec); - }; -} +{ } auto multiplexer::elem::notify_error(system::error_code ec) noexcept -> void { @@ -314,8 +309,12 @@ bool multiplexer::is_waiting_response() const noexcept bool multiplexer::is_writing() const noexcept { return !write_buffer_.empty(); } -auto make_elem(request const& req, multiplexer::pipeline_adapter_type adapter) - -> std::shared_ptr +void multiplexer::set_receive_adapter(any_adapter adapter) +{ + receive_adapter_ = std::move(adapter); +} + +auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr { return std::make_shared(req, std::move(adapter)); } diff --git a/include/boost/redis/resp3/impl/parser.ipp b/include/boost/redis/resp3/impl/parser.ipp index c53920d5..2f34c8a6 100644 --- a/include/boost/redis/resp3/impl/parser.ipp +++ b/include/boost/redis/resp3/impl/parser.ipp @@ -27,11 +27,10 @@ parser::parser() { reset(); } void parser::reset() { depth_ = 0; - sizes_ = {{1}}; - bulk_length_ = (std::numeric_limits::max)(); + sizes_ = default_sizes; + bulk_length_ = default_bulk_length; bulk_ = type::invalid; consumed_ = 0; - sizes_[0] = 2; // The sentinel must be more than 1. } std::size_t parser::get_consumed() const noexcept { return consumed_; } @@ -195,4 +194,13 @@ auto parser::consume_impl(type t, std::string_view elem, system::error_code& ec) return ret; } + +bool parser::is_parsing() const noexcept +{ + auto const v = depth_ == 0 && sizes_ == default_sizes && bulk_length_ == default_bulk_length && + bulk_ == type::invalid && consumed_ == 0; + + return !v; +} + } // namespace boost::redis::resp3 diff --git a/include/boost/redis/resp3/parser.hpp b/include/boost/redis/resp3/parser.hpp index 52018074..33436d79 100644 --- a/include/boost/redis/resp3/parser.hpp +++ b/include/boost/redis/resp3/parser.hpp @@ -27,6 +27,14 @@ class parser { static constexpr std::string_view sep = "\r\n"; private: + using sizes_type = std::array; + + // sizes_[0] = 2 because the sentinel must be more than 1. + static constexpr sizes_type default_sizes = { + {2, 1, 1, 1, 1, 1} + }; + static constexpr auto default_bulk_length = static_cast(-1); + // The current depth. Simple data types will have depth 0, whereas // the elements of aggregates will have depth 1. Embedded types // will have increasing depth. @@ -35,7 +43,7 @@ class parser { // The parser supports up to 5 levels of nested structures. The // first element in the sizes stack is a sentinel and must be // different from 1. - std::array sizes_; + sizes_type sizes_; // Contains the length expected in the next bulk read. std::size_t bulk_length_; @@ -72,14 +80,21 @@ class parser { auto consume(std::string_view view, system::error_code& ec) noexcept -> result; void reset(); + + bool is_parsing() const noexcept; }; // Returns false if more data is needed. If true is returned the // parser is either done or an error occured, that can be checked on // ec. template -bool parse(resp3::parser& p, std::string_view const& msg, Adapter& adapter, system::error_code& ec) +bool parse(parser& p, std::string_view const& msg, Adapter& adapter, system::error_code& ec) { + // This if could be avoid with a state machine that jumps into the + // correct position. + if (!p.is_parsing()) + adapter.on_init(); + while (!p.done()) { auto const res = p.consume(msg, ec); if (ec) @@ -88,11 +103,12 @@ bool parse(resp3::parser& p, std::string_view const& msg, Adapter& adapter, syst if (!res) return false; - adapter(res.value(), ec); + adapter.on_node(res.value(), ec); if (ec) return true; } + adapter.on_done(); return true; } diff --git a/include/boost/redis/resp3/serialization.hpp b/include/boost/redis/resp3/serialization.hpp index de35cab3..72de1b28 100644 --- a/include/boost/redis/resp3/serialization.hpp +++ b/include/boost/redis/resp3/serialization.hpp @@ -108,6 +108,8 @@ namespace detail { template void deserialize(std::string_view const& data, Adapter adapter, system::error_code& ec) { + adapter.on_init(); + parser parser; while (!parser.done()) { auto const res = parser.consume(data, ec); @@ -116,12 +118,14 @@ void deserialize(std::string_view const& data, Adapter adapter, system::error_co BOOST_ASSERT(res.has_value()); - adapter(res.value(), ec); + adapter.on_node(res.value(), ec); if (ec) return; } BOOST_ASSERT(parser.get_consumed() == std::size(data)); + + adapter.on_done(); } template diff --git a/include/boost/redis/response.hpp b/include/boost/redis/response.hpp index 70fcd032..94fdd63d 100644 --- a/include/boost/redis/response.hpp +++ b/include/boost/redis/response.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 24cf9410..e9525e62 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -44,7 +44,6 @@ make_test(test_reader_fsm) # Tests that require a real Redis server make_test(test_conn_quit) -make_test(test_conn_tls) make_test(test_conn_exec_retry) make_test(test_conn_exec_error) make_test(test_run) @@ -59,6 +58,7 @@ make_test(test_conn_move) make_test(test_issue_50) make_test(test_issue_181) make_test(test_conversions) +make_test(test_conn_tls) make_test(test_unix_sockets) # Coverage diff --git a/test/test_any_adapter.cpp b/test/test_any_adapter.cpp index b360b744..f0345ac1 100644 --- a/test/test_any_adapter.cpp +++ b/test/test_any_adapter.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -16,6 +16,7 @@ using boost::redis::generic_response; using boost::redis::response; using boost::redis::ignore; using boost::redis::any_adapter; +using boost::redis::any_adapter; BOOST_AUTO_TEST_CASE(any_adapter_response_types) { @@ -34,13 +35,13 @@ BOOST_AUTO_TEST_CASE(any_adapter_copy_move) { // any_adapter can be copied/moved response r; - any_adapter ad1{r}; + auto ad1 = any_adapter{r}; // copy constructor - any_adapter ad2{ad1}; + auto ad2 = any_adapter(ad1); // move constructor - any_adapter ad3{std::move(ad2)}; + auto ad3 = any_adapter(std::move(ad2)); // copy assignment BOOST_CHECK_NO_THROW(ad2 = ad1); diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index e02c3bce..4c69cdda 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -31,6 +31,7 @@ using boost::redis::ignore; using boost::redis::operation; using boost::redis::request; using boost::redis::response; +using boost::redis::any_adapter; using boost::system::error_code; using namespace std::chrono_literals; @@ -229,7 +230,7 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter) bool finished = false; - conn->async_exec(req, boost::redis::any_adapter(res), [&](error_code ec, std::size_t) { + conn->async_exec(req, res, [&](error_code ec, std::size_t) { BOOST_TEST(ec == error_code()); conn->cancel(); finished = true; @@ -242,4 +243,4 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter) BOOST_TEST(std::get<0>(res).value() == "PONG"); } -} // namespace \ No newline at end of file +} // namespace diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 8533fc0b..3708cfd2 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -180,19 +180,15 @@ struct response_error_tag { }; response_error_tag error_tag_obj; struct response_error_adapter { - void operator()( - std::size_t, + void on_init() { } + void on_done() { } + + void on_node( boost::redis::resp3::basic_node const&, boost::system::error_code& ec) { ec = boost::redis::error::incompatible_size; } - - [[nodiscard]] - auto get_supported_response_size() const noexcept - { - return static_cast(-1); - } }; auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp index 7f3711c5..1d73864b 100644 --- a/test/test_exec_fsm.cpp +++ b/test/test_exec_fsm.cpp @@ -83,10 +83,8 @@ struct elem_and_request { { // Empty requests are not valid. The request needs to be populated before creating the element req.push("get", "mykey"); + elm = std::make_shared(req, any_adapter{}); - elm = std::make_shared( - req, - [](std::size_t, resp3::node_view const&, error_code&) { }); elm->set_done_callback([this] { ++done_calls; }); diff --git a/test/test_low_level.cpp b/test/test_low_level.cpp index c50d2b0f..c173d750 100644 --- a/test/test_low_level.cpp +++ b/test/test_low_level.cpp @@ -595,8 +595,12 @@ BOOST_AUTO_TEST_CASE(adapter) response resp; auto f = boost_redis_adapt(resp); - f(0, resp3::basic_node{type::simple_string, 1, 0, "Hello"}, ec); - f(1, resp3::basic_node{type::number, 1, 0, "42"}, ec); + f.on_init(); + f.on_node(resp3::node_view{type::simple_string, 1, 0, "Hello"}, ec); + f.on_done(); + f.on_init(); + f.on_node(resp3::node_view{type::number, 1, 0, "42"}, ec); + f.on_done(); BOOST_CHECK_EQUAL(std::get<0>(resp).value(), "Hello"); BOOST_TEST(!ec); @@ -614,7 +618,7 @@ BOOST_AUTO_TEST_CASE(adapter_as) for (auto const& e : set_expected1a.value()) { error_code ec; - adapter(e, ec); + adapter.on_node(e, ec); } } diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 73e1cdda..9b198c4b 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -20,28 +20,33 @@ #include #include -using boost::redis::request; -using boost::redis::config; -using boost::redis::detail::push_hello; -using boost::redis::response; using boost::redis::adapter::adapt2; using boost::redis::adapter::result; -using boost::redis::resp3::detail::deserialize; -using boost::redis::ignore_t; +using boost::redis::config; using boost::redis::detail::multiplexer; +using boost::redis::detail::push_hello; using boost::redis::generic_response; +using boost::redis::ignore_t; +using boost::redis::request; +using boost::redis::resp3::detail::deserialize; using boost::redis::resp3::node; using boost::redis::resp3::to_string; +using boost::redis::response; using boost::redis::any_adapter; using boost::system::error_code; +#define RESP3_SET_PART1 "~6\r\n+orange\r" +#define RESP3_SET_PART2 "\n+apple\r\n+one" +#define RESP3_SET_PART3 "\r\n+two\r" +#define RESP3_SET_PART4 "\n+three\r\n+orange\r\n" +char const* resp3_set = RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3 RESP3_SET_PART4; + BOOST_AUTO_TEST_CASE(low_level_sync_sans_io) { try { result> resp; - char const* wire = "~6\r\n+orange\r\n+apple\r\n+one\r\n+two\r\n+three\r\n+orange\r\n"; - deserialize(wire, adapt2(resp)); + deserialize(resp3_set, adapt2(resp)); for (auto const& e : resp.value()) std::cout << e << std::endl; @@ -260,7 +265,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push) { multiplexer mpx; generic_response resp; - mpx.set_receive_response(resp); + mpx.set_receive_adapter(any_adapter{resp}); boost::system::error_code ec; auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec); @@ -282,7 +287,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) { multiplexer mpx; generic_response resp; - mpx.set_receive_response(resp); + mpx.set_receive_adapter(any_adapter{resp}); std::string msg; // Only part of the message. @@ -318,7 +323,7 @@ struct test_item { // to Redis. req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg"); - elem_ptr = std::make_shared(req, any_adapter(resp).impl_.adapt_fn); + elem_ptr = std::make_shared(req, any_adapter{resp}); elem_ptr->set_done_callback([this]() { done = true; @@ -462,3 +467,47 @@ BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size) BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u); } + +BOOST_AUTO_TEST_CASE(check_counter_adapter) +{ + using boost::redis::any_adapter; + using boost::redis::resp3::parse; + using boost::redis::resp3::parser; + using boost::redis::resp3::node_view; + using boost::system::error_code; + + int init = 0; + int node = 0; + int done = 0; + + auto counter_adapter = [&](any_adapter::parse_event ev, node_view const&, error_code&) mutable { + switch (ev) { + case any_adapter::parse_event::init: init++; break; + case any_adapter::parse_event::node: node++; break; + case any_adapter::parse_event::done: done++; break; + } + }; + + any_adapter wrapped{any_adapter::impl_t{counter_adapter}}; + + error_code ec; + parser p; + + auto const ret1 = parse(p, RESP3_SET_PART1, wrapped, ec); + auto const ret2 = parse(p, RESP3_SET_PART1 RESP3_SET_PART2, wrapped, ec); + auto const ret3 = parse(p, RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3, wrapped, ec); + auto const ret4 = parse( + p, + RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3 RESP3_SET_PART4, + wrapped, + ec); + + BOOST_TEST(!ret1); + BOOST_TEST(!ret2); + BOOST_TEST(!ret3); + BOOST_TEST(ret4); + + BOOST_CHECK_EQUAL(init, 1); + BOOST_CHECK_EQUAL(node, 7); + BOOST_CHECK_EQUAL(done, 1); +} diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 58a25f0e..db11df7f 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -23,6 +23,7 @@ using redis::detail::reader_fsm; using redis::detail::multiplexer; using redis::detail::read_buffer; using redis::generic_response; +using redis::any_adapter; using action = redis::detail::reader_fsm::action; namespace boost::redis::detail { @@ -44,7 +45,7 @@ void test_push() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(resp); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -92,7 +93,7 @@ void test_read_needs_more() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(resp); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -138,7 +139,7 @@ void test_read_error() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(resp); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -169,7 +170,7 @@ void test_parse_error() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(resp); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -200,7 +201,7 @@ void test_push_deliver_error() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(resp); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -236,7 +237,7 @@ void test_max_read_buffer_size() rbuf.set_config({5, 7}); multiplexer mpx; generic_response resp; - mpx.set_receive_response(resp); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act;