Skip to content

Commit

Permalink
fix: fixes, comments, polishment
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Oct 29, 2023
1 parent a1efa34 commit 883d326
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 88 deletions.
2 changes: 1 addition & 1 deletion src/core/search/ast_expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ struct AstNode : public NodeVariants {
return *this;
}

// Aggregations reduce and re-order result sets.
// Aggregations: KNN, SORTBY. They reorder result sets and optionally reduce them.
bool IsAggregation() const {
return std::holds_alternative<AstKnnNode>(Variant());
}
Expand Down
59 changes: 31 additions & 28 deletions src/server/search/doc_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,26 @@ const absl::flat_hash_map<string_view, search::SchemaField::FieldType> kSchemaTy
{"NUMERIC"sv, search::SchemaField::NUMERIC},
{"VECTOR"sv, search::SchemaField::VECTOR}};

size_t GetProbabilisticBound(size_t shards, size_t hits, size_t requested, bool is_aggregation) {
auto intlog2 = [](size_t x) {
size_t l = 0;
while (x >>= 1)
++l;
return l;
};
size_t avg_shard_min = hits * intlog2(hits) / (12 + shards / 10);
size_t GetProbabilisticBound(size_t hits, size_t requested, optional<search::AggregationInfo> agg) {
auto intlog2 = [](size_t x) { return std::__bit_width(x); };
size_t shards = shard_set->size();

// Estimate how much every shard has with at least 99% prob
size_t avg_shard_min = hits * intlog2(hits) / (12 + shard_set->size() / 10);
avg_shard_min -= min(avg_shard_min, min(hits, size_t(5)));

// VLOG(0) << "PROB BOUND " << hits << " " << shards << " " << requested << " => " <<
// avg_shard_min
// << " diffb " << requested / shards + 1 << " & " << requested;
// If it turns out that we might have not enough results to cover the request, don't skip any
if (avg_shard_min * shards < requested)
return requested;

if (!is_aggregation && avg_shard_min * shards >= requested)
return requested / shards + 1;
// If all shards have at least avg min, keep the bare minimum needed to cover the request
size_t limit = requested / shards + 1;

return min(hits, requested);
// Aggregations like SORTBY and KNN reorder the result and thus introduce some variance
if (agg.has_value())
limit += max(requested / 4 + 1, 3UL);

return limit;
}

} // namespace
Expand Down Expand Up @@ -191,7 +193,7 @@ bool DocIndex::Matches(string_view key, unsigned obj_code) const {
}

ShardDocIndex::ShardDocIndex(shared_ptr<DocIndex> index)
: base_{std::move(index)}, write_epoch_{0}, indices_{{}, nullptr}, key_index_{} {
: base_{std::move(index)}, indices_{{}, nullptr}, key_index_{}, write_epoch_{0} {
}

void ShardDocIndex::Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr) {
Expand Down Expand Up @@ -227,23 +229,20 @@ io::Result<SearchResult, facade::ErrorReply> ShardDocIndex::Search(
return nonstd::make_unexpected(facade::ErrorReply(std::move(search_results.error)));

size_t requested_count = params.limit_offset + params.limit_total;
size_t serialize_count = min(requested_count, search_results.ids.size());

size_t cuttoff_bound = serialize_count;
if (params.enable_cutoff && !params.IdsOnly())
cuttoff_bound =
GetProbabilisticBound(params.num_shards, search_results.ids.size(), requested_count,
search_algo->HasAggregation().has_value());
size_t return_count = min(requested_count, search_results.ids.size());

VLOG(0) << "Requested " << requested_count << " got " << search_results.ids.size() << " cutoff "
<< cuttoff_bound;
size_t cuttoff_bound = return_count;
if (params.enable_cutoff && !params.IdsOnly()) {
cuttoff_bound = GetProbabilisticBound(search_results.pre_aggregation_total, requested_count,
search_algo->HasAggregation());
}

vector<DocResult> out(serialize_count);
vector<DocResult> out(return_count);
auto shard_id = EngineShard::tlocal()->shard_id();
auto& scores = search_results.scores;
for (size_t i = 0; i < out.size(); i++) {
out[i].value = DocResult::DocReference{shard_id, search_results.ids[i], i < cuttoff_bound};
out[i].score =
search_results.scores.empty() ? search::ResultScore{} : std::move(search_results.scores[i]);
out[i].score = scores.empty() ? search::ResultScore{} : std::move(scores[i]);
}

Serialize(op_args, params, absl::MakeSpan(out));
Expand All @@ -254,14 +253,18 @@ io::Result<SearchResult, facade::ErrorReply> ShardDocIndex::Search(

bool ShardDocIndex::Refill(const OpArgs& op_args, const SearchParams& params,
search::SearchAlgorithm* search_algo, SearchResult* result) const {
// If no writes occured, serialize remaining entries without breaking correctness
if (result->write_epoch == write_epoch_) {
Serialize(op_args, params, absl::MakeSpan(result->docs));
return true;
}

// We're already on the cold path and we don't wanna gamble any more
DCHECK(!params.enable_cutoff);

auto new_result = Search(op_args, params, search_algo);
CHECK(new_result.has_value());
CHECK(new_result.has_value()); // Query should be valid since it passed first step

*result = std::move(new_result.value());
return false;
}
Expand Down
50 changes: 26 additions & 24 deletions src/server/search/doc_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,64 +25,61 @@ using SearchDocData = absl::flat_hash_map<std::string /*field*/, std::string /*v
std::optional<search::SchemaField::FieldType> ParseSearchFieldType(std::string_view name);
std::string_view SearchFieldTypeToString(search::SchemaField::FieldType);

// Represents results returned from a shard doc index that are then aggregated in the coordinator.
struct DocResult {
// Fully serialized value ready to be sent back.
struct SerializedValue {
std::string key;
SearchDocData values;
};

// Reference to a document that matched the query, but it's serialization was the document was
// considered unlikely to be contained in the reply.
struct DocReference {
ShardId shard_id;
search::DocId doc_id;
bool requested;
};

std::variant<SerializedValue, DocReference> value;
search::ResultScore score;

bool operator<(const DocResult& other) const;
bool operator>=(const DocResult& other) const;

public:
std::variant<SerializedValue, DocReference> value;
search::ResultScore score;
};

struct SearchResult {
size_t write_epoch = 0; // Write epoch of the index during on the result was created
size_t write_epoch = 0; // Write epoch of the index on which the result was created

size_t total_hits = 0; // total number of hits in shard
std::vector<DocResult> docs; // serialized documents of first hits

// After combining results from multiple shards and accumulating more documents than initially
// requested, only a subset of all documents will be sent back to the client,
// so it doesn't make sense to serialize strictly all documents in every shard ahead.
// Instead, only documents up to a probablistic bound are serialized, the
// leftover ids and scores are stored in the cutoff tail for use in the "unlikely" scenario.
// size_t num_cutoff = 0;

std::optional<search::AlgorithmProfile> profile;
};

struct SearchParams {
using FieldReturnList =
std::vector<std::pair<std::string /*identifier*/, std::string /*short name*/>>;

// Parameters for "LIMIT offset total": select total amount documents with a specific offset from
// the whole result set
bool IdsOnly() const {
return return_fields && return_fields->empty();
}

bool ShouldReturnField(std::string_view field) const;

public:
// Parameters for "LIMIT offset total": select total amount documents with a specific offset.
size_t limit_offset = 0;
size_t limit_total = 10;

// Total number of shards, used in probabilistic queries
size_t num_shards = 0;
// Pprobabilistic optimizations that avoid serializing documents unlikely to be returned.
bool enable_cutoff = false;

// Set but empty means no fields should be returned
std::optional<FieldReturnList> return_fields;
std::optional<FieldReturnList> return_fields; // Set but empty means no fields should be returned

std::optional<search::SortOption> sort_option;
search::QueryParams query_params;

bool IdsOnly() const {
return return_fields && return_fields->empty();
}

bool ShouldReturnField(std::string_view field) const;
};

// Stores basic info about a document index.
Expand Down Expand Up @@ -139,6 +136,8 @@ class ShardDocIndex {
const SearchParams& params,
search::SearchAlgorithm* search_algo) const;

// Resolve requested doc references from the result. If no writes occured, the remaining
// entries are serialized and true is returned, otherwise a full new query is performed.
bool Refill(const OpArgs& op_args, const SearchParams& params,
search::SearchAlgorithm* search_algo, SearchResult* result) const;

Expand All @@ -154,14 +153,17 @@ class ShardDocIndex {
// Clears internal data. Traverses all matching documents and assigns ids.
void Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr);

// Serialize prefix of requested doc references.
void Serialize(const OpArgs& op_args, const SearchParams& params,
absl::Span<DocResult> docs) const;

private:
std::shared_ptr<const DocIndex> base_;
size_t write_epoch_;
search::FieldIndices indices_;
DocKeyIndex key_index_;

// Incremented during each Add/Remove. Used to track if changes occured since last read.
size_t write_epoch_;
};

// Stores shard doc indices by name on a specific shard.
Expand Down
65 changes: 39 additions & 26 deletions src/server/search/search_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,39 +233,51 @@ struct MultishardSearch {
}

void RunAndReply() {
params_.enable_cutoff = true;
params_.num_shards = shard_set->size();
// First, run search with probabilistic optimizations enabled.
// If the result set was collected successfuly, reply.
{
params_.enable_cutoff = true;

if (auto err = RunSearch(); err)
return (*cntx_)->SendError(std::move(*err));
if (auto err = RunSearch(); err)
return (*cntx_)->SendError(std::move(*err));

auto incomplete_shards = BuildOrder();
if (incomplete_shards.empty())
return Reply();

params_.enable_cutoff = false;
auto incomplete_shards = BuildOrder();
if (incomplete_shards.empty())
return Reply();
}

VLOG(0) << "Failed completness check, refilling";

auto refill_res = RunRefill();
if (!refill_res.has_value())
return (*cntx_)->SendError(std::move(refill_res.error()));
// Otherwise, some results made it into the result set but were not serialized.
// Try refilling the requested values. If no reordering occured, reply immediately, otherwise
// try building a new order and reply if it is valid.
{
params_.enable_cutoff = false;

if (bool no_reordering = refill_res.value(); no_reordering)
return Reply();
auto refill_res = RunRefill();
if (!refill_res.has_value())
return (*cntx_)->SendError(std::move(refill_res.error()));

VLOG(0) << "Failed refill, rebuilding";
if (bool no_reordering = refill_res.value(); no_reordering)
return Reply();

if (auto incomplete_shards = BuildOrder(); incomplete_shards.empty())
return Reply();
}

if (auto incomplete_shards = BuildOrder(); incomplete_shards.empty())
return Reply();
VLOG(0) << "Failed refill and rebuild, re-searching";

VLOG(0) << "Failed rebuild, re-searching";
// At this step all optimizations failed. Run search without any cutoffs.
{
DCHECK(!params_.enable_cutoff);

if (auto err = RunSearch(); err)
return (*cntx_)->SendError(std::move(*err));
incomplete_shards = BuildOrder();
DCHECK(incomplete_shards.empty());
Reply();
if (auto err = RunSearch(); err)
return (*cntx_)->SendError(std::move(*err));

auto incomplete_shards = BuildOrder();
DCHECK(incomplete_shards.empty());
Reply();
}
}

struct ProfileInfo {
Expand Down Expand Up @@ -318,8 +330,6 @@ struct MultishardSearch {
bool ids_only = params_.IdsOnly();
size_t reply_size = ids_only ? (result_count + 1) : (result_count * 2 + 1);

VLOG(0) << "Reply size " << reply_size << " total count " << total_count;

(*cntx_)->StartArray(reply_size);
(*cntx_)->SendLong(total_count);

Expand All @@ -337,7 +347,9 @@ struct MultishardSearch {
}
}

template <typename F> optional<facade::ErrorReply> RunHandler(F&& f) {
// Run function f on all search indices, return first error
std::optional<facade::ErrorReply> RunHandler(
std::function<std::optional<ErrorReply>(EngineShard*, ShardDocIndex*)> f) {
hops_++;
AggregateValue<optional<facade::ErrorReply>> err;
cntx_->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
Expand Down Expand Up @@ -387,6 +399,7 @@ struct MultishardSearch {
return failed_refills == 0;
}

// Build order from results collected from shards
absl::flat_hash_set<ShardId> BuildOrder() {
ordered_docs_.clear();
if (auto agg = search_algo_->HasAggregation(); agg) {
Expand Down
24 changes: 19 additions & 5 deletions src/server/search/search_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,9 @@ TEST_F(SearchFamilyTest, FtProfile) {
}
}

vector<vector<string>> FillShard(ShardId sid, string_view prefix, size_t num) {
vector<vector<string>> FillShard(ShardId sid, string_view prefix, size_t num, size_t idx = 0) {
vector<vector<string>> out;
size_t entries = 0, idx = 0;
size_t entries = 0;
while (entries < num) {
auto key = absl::StrCat(prefix, idx++);
if (Shard(key, shard_set->size()) == sid) {
Expand Down Expand Up @@ -613,10 +613,24 @@ TEST_F(SearchFamilyTest, MultiShardRefillRepeat) {
fb.Join();
}

// must have changed
TEST_F(SearchFamilyTest, MultiShardAggregation) {
// Place 50 keys on shards 0 and 1, but values on shard 1 have a larger value
for (auto cmd : FillShard(0, "doc", 50, 0))
Run(absl::MakeSpan(cmd));

for (auto cmd : FillShard(1, "doc", 50, 100))
Run(absl::MakeSpan(cmd));

Run({"ft.create", "i1", "schema", "idx", "numeric", "sortable"});

// s0 -> refill
// s1 -> re-search -> delete some ->
// The distribution is completely unbalanced, so getting the largest vlaues should require two
// hops
auto resp = Run(
{"ft.profile", "i1", "SEARCH", "QUERY", "*", "LIMIT", "0", "20", "SORTBY", "idx", "DESC"});
auto stats = resp.GetVec()[0].GetVec();
EXPECT_EQ(stats[8], "hops");
EXPECT_THAT(stats[9], IntArg(2));
}

TEST_F(SearchFamilyTest, SimpleExpiry) {
EXPECT_EQ(Run({"ft.create", "i1", "schema", "title", "text", "expires-in", "numeric"}), "OK");
Expand Down
6 changes: 3 additions & 3 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
if ((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0)
EnableAllShards();
else
EnableShard(0);
EnableSingleShard(0);
return OpStatus::OK;
}

Expand Down Expand Up @@ -423,7 +423,7 @@ string Transaction::DebugId() const {
void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args) {
multi_.reset();
InitBase(db, args);
EnableShard(sid);
EnableSingleShard(sid);
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
CHECK(key_index);
StoreKeysInArgs(*key_index, false);
Expand Down Expand Up @@ -910,7 +910,7 @@ void Transaction::Refurbish() {
cb_ptr_ = nullptr;
}

void Transaction::EnableShard(ShardId sid) {
void Transaction::EnableSingleShard(ShardId sid) {
unique_shard_cnt_ = 1;
unique_shard_id_ = sid;
shard_data_.resize(1);
Expand Down
Loading

0 comments on commit 883d326

Please sign in to comment.