diff --git a/src/core/search/ast_expr.h b/src/core/search/ast_expr.h
index 4aec58c8cd0b..97e91759e077 100644
--- a/src/core/search/ast_expr.h
+++ b/src/core/search/ast_expr.h
@@ -109,7 +109,7 @@ struct AstNode : public NodeVariants {
     return *this;
   }
 
-  // Aggregations reduce and re-order result sets.
+  // Aggregations: KNN, SORTBY. They reorder result sets and optionally reduce them.
   bool IsAggregation() const {
     return std::holds_alternative<AstKnnNode>(Variant());
   }
diff --git a/src/server/search/doc_index.cc b/src/server/search/doc_index.cc
index e11dfab682ff..3dd73aab0026 100644
--- a/src/server/search/doc_index.cc
+++ b/src/server/search/doc_index.cc
@@ -56,24 +56,26 @@ const absl::flat_hash_map<string_view, search::SchemaField::FieldType> kSchemaTy
     {"NUMERIC"sv, search::SchemaField::NUMERIC},
     {"VECTOR"sv, search::SchemaField::VECTOR}};
 
-size_t GetProbabilisticBound(size_t shards, size_t hits, size_t requested, bool is_aggregation) {
-  auto intlog2 = [](size_t x) {
-    size_t l = 0;
-    while (x >>= 1)
-      ++l;
-    return l;
-  };
-  size_t avg_shard_min = hits * intlog2(hits) / (12 + shards / 10);
+size_t GetProbabilisticBound(size_t hits, size_t requested, optional<search::AggregationInfo> agg) {
+  auto intlog2 = [](size_t x) { return std::bit_width(x); };
+  size_t shards = shard_set->size();
+
+  // Estimate how many hits every shard holds with at least 99% probability
+  size_t avg_shard_min = hits * intlog2(hits) / (12 + shards / 10);
   avg_shard_min -= min(avg_shard_min, min(hits, size_t(5)));
 
-  // VLOG(0) << "PROB BOUND " << hits << " " << shards << " " << requested << " => " <<
-  // avg_shard_min
-  //         << " diffb " << requested / shards + 1 << " & " << requested;
+  // If we might not have enough results to cover the request, don't skip any
+  if (avg_shard_min * shards < requested)
+    return requested;
 
-  if (!is_aggregation && avg_shard_min * shards >= requested)
-    return requested / shards + 1;
+  // If all shards hold at least avg_shard_min hits, keep the bare minimum covering the request
+  size_t limit = requested / shards + 1;
 
-  return min(hits, requested);
+  // Aggregations like SORTBY and KNN reorder the result set and thus introduce some variance
+  if (agg.has_value())
+    limit += max(requested / 4 + 1, 3UL);
+
+  return limit;
 }
 
 }  // namespace
@@ -191,7 +193,7 @@ bool DocIndex::Matches(string_view key, unsigned obj_code) const {
 }
 
 ShardDocIndex::ShardDocIndex(shared_ptr<DocIndex> index)
-    : base_{std::move(index)}, write_epoch_{0}, indices_{{}, nullptr}, key_index_{} {
+    : base_{std::move(index)}, indices_{{}, nullptr}, key_index_{}, write_epoch_{0} {
 }
 
 void ShardDocIndex::Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr) {
@@ -227,23 +229,20 @@ io::Result<SearchResult, facade::ErrorReply> ShardDocIndex::Search(
     return nonstd::make_unexpected(facade::ErrorReply(std::move(search_results.error)));
 
   size_t requested_count = params.limit_offset + params.limit_total;
-  size_t serialize_count = min(requested_count, search_results.ids.size());
-
-  size_t cuttoff_bound = serialize_count;
-  if (params.enable_cutoff && !params.IdsOnly())
-    cuttoff_bound =
-        GetProbabilisticBound(params.num_shards, search_results.ids.size(), requested_count,
-                              search_algo->HasAggregation().has_value());
+  size_t return_count = min(requested_count, search_results.ids.size());
 
-  VLOG(0) << "Requested " << requested_count << " got " << search_results.ids.size() << " cutoff "
-          << cuttoff_bound;
+  size_t cuttoff_bound = return_count;
+  if (params.enable_cutoff && !params.IdsOnly()) {
+    cuttoff_bound = GetProbabilisticBound(search_results.pre_aggregation_total, requested_count,
+                                          search_algo->HasAggregation());
+  }
 
-  vector<DocResult> out(serialize_count);
+  vector<DocResult> out(return_count);
   auto shard_id = EngineShard::tlocal()->shard_id();
+  auto& scores = search_results.scores;
   for (size_t i = 0; i < out.size(); i++) {
     out[i].value = DocResult::DocReference{shard_id, search_results.ids[i], i < cuttoff_bound};
-    out[i].score =
-        search_results.scores.empty() ? search::ResultScore{} : std::move(search_results.scores[i]);
+    out[i].score = scores.empty() ? search::ResultScore{} : std::move(scores[i]);
   }
 
   Serialize(op_args, params, absl::MakeSpan(out));
@@ -254,14 +253,18 @@ io::Result<SearchResult, facade::ErrorReply> ShardDocIndex::Search(
 
 bool ShardDocIndex::Refill(const OpArgs& op_args, const SearchParams& params,
                            search::SearchAlgorithm* search_algo, SearchResult* result) const {
+  // If no writes occurred, serialize the remaining entries without breaking correctness
   if (result->write_epoch == write_epoch_) {
     Serialize(op_args, params, absl::MakeSpan(result->docs));
     return true;
   }
 
+  // We're already on the cold path, so we don't want to gamble any further
   DCHECK(!params.enable_cutoff);
+
   auto new_result = Search(op_args, params, search_algo);
-  CHECK(new_result.has_value());
+  CHECK(new_result.has_value());  // Query should be valid, as it passed the first attempt
+
   *result = std::move(new_result.value());
   return false;
 }
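For intuition, here is a self-contained sketch of the bound computed above. It mirrors the patched GetProbabilisticBound rather than importing it, takes the shard count as a parameter (shard_set is not available outside the server), and uses invented sample numbers; compile with -std=c++20 for std::bit_width.

#include <algorithm>
#include <bit>
#include <cstddef>
#include <cstdio>

// Mirror of the patched GetProbabilisticBound, for experimentation only.
size_t ProbBound(size_t shards, size_t hits, size_t requested, bool has_aggregation) {
  auto intlog2 = [](size_t x) { return static_cast<size_t>(std::bit_width(x)); };

  // Estimated number of hits each shard holds with high probability
  size_t avg_shard_min = hits * intlog2(hits) / (12 + shards / 10);
  avg_shard_min -= std::min(avg_shard_min, std::min(hits, size_t(5)));

  if (avg_shard_min * shards < requested)
    return requested;  // possibly not enough hits overall: serialize everything

  size_t limit = requested / shards + 1;  // bare minimum per shard
  if (has_aggregation)
    limit += std::max(requested / 4 + 1, size_t(3));  // slack for reordering
  return limit;
}

int main() {
  // 1000 hits per shard, 8 shards, 30 requested, an aggregation present:
  // each shard serializes a prefix of 12 documents instead of all 30.
  std::printf("%zu\n", ProbBound(8, 1000, 30, true));
}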
diff --git a/src/server/search/doc_index.h b/src/server/search/doc_index.h
index 389dbcf2bfc9..847a7ef3b98d 100644
--- a/src/server/search/doc_index.h
+++ b/src/server/search/doc_index.h
@@ -25,38 +25,36 @@ using SearchDocData = absl::flat_hash_map<std::string /*field*/, std::string /*value*/>;
 std::optional<search::SchemaField::FieldType> ParseSearchFieldType(std::string_view name);
 std::string_view SearchFieldTypeToString(search::SchemaField::FieldType);
 
+// Represents results returned from a shard doc index that are then aggregated in the coordinator.
 struct DocResult {
+  // Fully serialized value ready to be sent back.
   struct SerializedValue {
     std::string key;
     SearchDocData values;
   };
 
+  // Reference to a document that matched the query, but whose serialization was skipped because
+  // it was considered unlikely to be contained in the reply.
   struct DocReference {
     ShardId shard_id;
     search::DocId doc_id;
     bool requested;
   };
 
-  std::variant<SerializedValue, DocReference> value;
-  search::ResultScore score;
-
   bool operator<(const DocResult& other) const;
   bool operator>=(const DocResult& other) const;
+
+ public:
+  std::variant<SerializedValue, DocReference> value;
+  search::ResultScore score;
 };
 
 struct SearchResult {
-  size_t write_epoch = 0;  // Write epoch of the index during on the result was created
+  size_t write_epoch = 0;  // Write epoch of the index on which the result was created
 
   size_t total_hits = 0;        // total number of hits in shard
   std::vector<DocResult> docs;  // serialized documents of first hits
 
-  // After combining results from multiple shards and accumulating more documents than initially
-  // requested, only a subset of all documents will be sent back to the client,
-  // so it doesn't make sense to serialize strictly all documents in every shard ahead.
-  // Instead, only documents up to a probablistic bound are serialized, the
-  // leftover ids and scores are stored in the cutoff tail for use in the "unlikely" scenario.
-  // size_t num_cutoff = 0;
-
   std::optional<search::AlgorithmProfile> profile;
 };
@@ -64,25 +62,24 @@ struct SearchParams {
   using FieldReturnList =
       std::vector<std::pair<std::string /*identifier*/, std::string /*short name*/>>;
 
-  // Parameters for "LIMIT offset total": select total amount documents with a specific offset from
-  // the whole result set
+  bool IdsOnly() const {
+    return return_fields && return_fields->empty();
+  }
+
+  bool ShouldReturnField(std::string_view field) const;
+
+ public:
+  // Parameters for "LIMIT offset total": select total amount of documents at a specific offset.
   size_t limit_offset = 0;
   size_t limit_total = 10;
 
-  // Total number of shards, used in probabilistic queries
-  size_t num_shards = 0;
+  // Probabilistic optimizations that avoid serializing documents unlikely to be returned.
   bool enable_cutoff = false;
 
-  // Set but empty means no fields should be returned
-  std::optional<FieldReturnList> return_fields;
+  std::optional<FieldReturnList> return_fields;  // Set but empty means no fields should be returned
+
   std::optional<search::SortOption> sort_option;
   search::QueryParams query_params;
-
-  bool IdsOnly() const {
-    return return_fields && return_fields->empty();
-  }
-
-  bool ShouldReturnField(std::string_view field) const;
 };
 
 // Stores basic info about a document index.
@@ -139,6 +136,8 @@ class ShardDocIndex {
                                               const SearchParams& params,
                                               search::SearchAlgorithm* search_algo) const;
 
+  // Resolve requested doc references from the result. If no writes occurred, the remaining
+  // entries are serialized and true is returned; otherwise a full new query is performed.
   bool Refill(const OpArgs& op_args, const SearchParams& params,
               search::SearchAlgorithm* search_algo, SearchResult* result) const;
 
@@ -154,14 +153,17 @@ class ShardDocIndex {
   // Clears internal data. Traverses all matching documents and assigns ids.
   void Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr);
 
+  // Serialize the prefix of requested doc references.
   void Serialize(const OpArgs& op_args, const SearchParams& params,
                  absl::Span<DocResult> docs) const;
 
 private:
   std::shared_ptr<const DocIndex> base_;
-  size_t write_epoch_;
  search::FieldIndices indices_;
  DocKeyIndex key_index_;
+
+  // Incremented on every Add/Remove. Used to track whether changes occurred since the last read.
+  size_t write_epoch_;
 };
 
 // Stores shard doc indices by name on a specific shard.
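What the coordinator does with these DocResults, in miniature: merge the per-shard lists, keep the best `limit` entries, and check that none of the kept entries is an unserialized DocReference. The real ordering is DocResult::operator< (declared above, body not part of this diff); the toy below substitutes a plain score comparison.

#include <algorithm>
#include <cstddef>
#include <vector>

struct Doc {
  double score;     // stand-in for search::ResultScore
  bool serialized;  // false models a DocReference that was cut off (requested == false)
};

// Returns true if the best `limit` docs across all shards were all serialized,
// i.e. the reply can be built without a refill hop.
bool OrderComplete(std::vector<std::vector<Doc>> shard_results, size_t limit) {
  std::vector<Doc> all;
  for (auto& shard : shard_results)
    all.insert(all.end(), shard.begin(), shard.end());
  std::sort(all.begin(), all.end(),
            [](const Doc& l, const Doc& r) { return l.score > r.score; });
  if (all.size() > limit)
    all.resize(limit);
  return std::all_of(all.begin(), all.end(), [](const Doc& d) { return d.serialized; });
}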
diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc
index 68215a5ae751..0da234461c0e 100644
--- a/src/server/search/search_family.cc
+++ b/src/server/search/search_family.cc
@@ -233,39 +233,51 @@ struct MultishardSearch {
   }
 
   void RunAndReply() {
-    params_.enable_cutoff = true;
-    params_.num_shards = shard_set->size();
+    // First, run the search with probabilistic optimizations enabled.
+    // If the result set was collected successfully, reply.
+    {
+      params_.enable_cutoff = true;
 
-    if (auto err = RunSearch(); err)
-      return (*cntx_)->SendError(std::move(*err));
+      if (auto err = RunSearch(); err)
+        return (*cntx_)->SendError(std::move(*err));
 
-    auto incomplete_shards = BuildOrder();
-    if (incomplete_shards.empty())
-      return Reply();
-
-    params_.enable_cutoff = false;
+      auto incomplete_shards = BuildOrder();
+      if (incomplete_shards.empty())
+        return Reply();
+    }
 
     VLOG(0) << "Failed completness check, refilling";
 
-    auto refill_res = RunRefill();
-    if (!refill_res.has_value())
-      return (*cntx_)->SendError(std::move(refill_res.error()));
+    // Otherwise, some results made it into the result set but were not serialized.
+    // Try refilling the requested values. If no reordering occurred, reply immediately; otherwise
+    // try building a new order and reply if it is valid.
+    {
+      params_.enable_cutoff = false;
 
-    if (bool no_reordering = refill_res.value(); no_reordering)
-      return Reply();
+      auto refill_res = RunRefill();
+      if (!refill_res.has_value())
+        return (*cntx_)->SendError(std::move(refill_res.error()));
 
-    VLOG(0) << "Failed refill, rebuilding";
+      if (bool no_reordering = refill_res.value(); no_reordering)
+        return Reply();
+
+      if (auto incomplete_shards = BuildOrder(); incomplete_shards.empty())
+        return Reply();
+    }
 
-    if (auto incomplete_shards = BuildOrder(); incomplete_shards.empty())
-      return Reply();
+    VLOG(0) << "Failed refill and rebuild, re-searching";
 
-    VLOG(0) << "Failed rebuild, re-searching";
+    // At this point all optimizations have failed. Run the search without any cutoffs.
+    {
+      DCHECK(!params_.enable_cutoff);
 
-    if (auto err = RunSearch(); err)
-      return (*cntx_)->SendError(std::move(*err));
-    incomplete_shards = BuildOrder();
-    DCHECK(incomplete_shards.empty());
-    Reply();
+      if (auto err = RunSearch(); err)
+        return (*cntx_)->SendError(std::move(*err));
+
+      auto incomplete_shards = BuildOrder();
+      DCHECK(incomplete_shards.empty());
+      Reply();
+    }
   }
 
   struct ProfileInfo {
@@ -318,8 +330,6 @@ struct MultishardSearch {
     bool ids_only = params_.IdsOnly();
     size_t reply_size = ids_only ? (result_count + 1) : (result_count * 2 + 1);
 
-    VLOG(0) << "Reply size " << reply_size << " total count " << total_count;
-
     (*cntx_)->StartArray(reply_size);
     (*cntx_)->SendLong(total_count);
 
@@ -337,7 +347,9 @@ struct MultishardSearch {
     }
   }
 
-  template <typename F> optional<facade::ErrorReply> RunHandler(F&& f) {
+  // Run function f on all search indices; returns the first error
+  std::optional<facade::ErrorReply> RunHandler(
+      std::function<std::optional<facade::ErrorReply>(EngineShard*, ShardDocIndex*)> f) {
    hops_++;
    AggregateValue<std::optional<facade::ErrorReply>> err;
    cntx_->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
@@ -387,6 +399,7 @@ struct MultishardSearch {
     return failed_refills == 0;
   }
 
+  // Build the order of the combined result set from results collected from all shards
  absl::flat_hash_set<ShardId> BuildOrder() {
    ordered_docs_.clear();
    if (auto agg = search_algo_->HasAggregation(); agg) {
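The ladder above compresses to three stages, each strictly more expensive and more reliable than the last. A toy stand-in (not the Dragonfly API) showing how hops accumulate:

#include <cstdio>

// Each stage is a stand-in: RunSearch+BuildOrder, RunRefill, and the final
// cutoff-free RunSearch. A stage returns true when the reply can be sent.
int main() {
  int hops = 0;
  auto optimistic = [&] { hops++; return false; };  // cutoff was too aggressive
  auto refill = [&] { hops++; return true; };       // refill recovered the tail
  auto full = [&] { hops++; return true; };         // always succeeds

  if (!optimistic() && !refill())
    full();
  std::printf("hops: %d\n", hops);  // prints 2, mirroring the new test below
}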
diff --git a/src/server/search/search_family_test.cc b/src/server/search/search_family_test.cc
index f6c356614d56..8056740d6573 100644
--- a/src/server/search/search_family_test.cc
+++ b/src/server/search/search_family_test.cc
@@ -471,9 +471,9 @@ TEST_F(SearchFamilyTest, FtProfile) {
   }
 }
 
-vector<vector<string>> FillShard(ShardId sid, string_view prefix, size_t num) {
+vector<vector<string>> FillShard(ShardId sid, string_view prefix, size_t num, size_t idx = 0) {
   vector<vector<string>> out;
-  size_t entries = 0, idx = 0;
+  size_t entries = 0;
   while (entries < num) {
     auto key = absl::StrCat(prefix, idx++);
     if (Shard(key, shard_set->size()) == sid) {
@@ -613,10 +613,24 @@ TEST_F(SearchFamilyTest, MultiShardRefillRepeat) {
   fb.Join();
 }
 
-// must have changed
+TEST_F(SearchFamilyTest, MultiShardAggregation) {
+  // Place 50 keys on each of shards 0 and 1, with strictly larger idx values on shard 1
+  for (auto cmd : FillShard(0, "doc", 50, 0))
+    Run(absl::MakeSpan(cmd));
+
+  for (auto cmd : FillShard(1, "doc", 50, 100))
+    Run(absl::MakeSpan(cmd));
+
+  Run({"ft.create", "i1", "schema", "idx", "numeric", "sortable"});
 
-// s0 -> refill
-// s1 -> re-search -> delete some ->
+  // The distribution is completely unbalanced, so getting the largest values should require two
+  // hops
+  auto resp = Run(
+      {"ft.profile", "i1", "SEARCH", "QUERY", "*", "LIMIT", "0", "20", "SORTBY", "idx", "DESC"});
+  auto stats = resp.GetVec()[0].GetVec();
+  EXPECT_EQ(stats[8], "hops");
+  EXPECT_THAT(stats[9], IntArg(2));
+}
 
 TEST_F(SearchFamilyTest, SimpleExpiry) {
   EXPECT_EQ(Run({"ft.create", "i1", "schema", "title", "text", "expires-in", "numeric"}), "OK");
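Why the test expects exactly two hops: plugging its numbers into GetProbabilisticBound (with the std::bit_width form used above, and assuming FillShard writes the running idx as the sortable value) shows the first optimistic hop cannot serialize all 20 answers from shard 1.

// Per shard: hits = 50, requested = 20, shards = 2, SORTBY aggregation present.
//   intlog2(50)   = bit_width(50) = 6
//   avg_shard_min = 50 * 6 / (12 + 2/10) - 5 = 300 / 12 - 5 = 20
//   20 * 2 = 40 >= 20            -> the cutoff path is taken
//   limit         = 20 / 2 + 1   = 11
//   + aggregation = 11 + max(20/4 + 1, 3) = 17
// Shard 1 holds (by construction) the 20 largest idx values but serialized
// only its top 17, so BuildOrder finds unserialized references and a second
// hop follows.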
"schema", "title", "text", "expires-in", "numeric"}), "OK"); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index fff6085d00da..e34c077c0cbe 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -321,7 +321,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { if ((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0) EnableAllShards(); else - EnableShard(0); + EnableSingleShard(0); return OpStatus::OK; } @@ -423,7 +423,7 @@ string Transaction::DebugId() const { void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args) { multi_.reset(); InitBase(db, args); - EnableShard(sid); + EnableSingleShard(sid); OpResult key_index = DetermineKeys(cid_, args); CHECK(key_index); StoreKeysInArgs(*key_index, false); @@ -910,7 +910,7 @@ void Transaction::Refurbish() { cb_ptr_ = nullptr; } -void Transaction::EnableShard(ShardId sid) { +void Transaction::EnableSingleShard(ShardId sid) { unique_shard_cnt_ = 1; unique_shard_id_ = sid; shard_data_.resize(1); diff --git a/src/server/transaction.h b/src/server/transaction.h index 3ec12bea2704..f7f2ff706aea 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -320,9 +320,11 @@ class Transaction { bool multi_commands, bool allow_await) const; void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const; + // Reset all execution related data to make it possible for the transaction to be scheduled and + // executed again. void Refurbish(); - void EnableShard(ShardId sid); + void EnableSingleShard(ShardId sid); void EnableAllShards(); private: