Skip to content

Commit e2b2002

Browse files
authored
Switch to custom collector for aggregating trace IDs (#2783)
1 parent b944a5f commit e2b2002

File tree

7 files changed

+459
-76
lines changed

7 files changed

+459
-76
lines changed

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-search/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ documentation = "https://quickwit.io/docs/"
1313
anyhow = { workspace = true }
1414
async-trait = { workspace = true }
1515
bytes = { workspace = true }
16+
fnv = { workspace = true }
1617
futures = { workspace = true }
1718
http = { workspace = true }
1819
fastfield_codecs = { workspace = true }

quickwit/quickwit-search/src/collector.rs

Lines changed: 135 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::sync::Arc;
2424
use itertools::Itertools;
2525
use quickwit_doc_mapper::{DocMapper, WarmupInfo};
2626
use quickwit_proto::{LeafSearchResponse, PartialHit, SearchRequest, SortOrder};
27+
use serde::Deserialize;
2728
use tantivy::aggregation::agg_req::{
2829
get_fast_field_names, get_term_dict_field_names, Aggregations,
2930
};
@@ -35,6 +36,7 @@ use tantivy::schema::Schema;
3536
use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader};
3637

3738
use crate::filters::{TimestampFilter, TimestampFilterBuilder};
39+
use crate::jaeger_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector};
3840
use crate::partial_hit_sorting_key;
3941

4042
#[derive(Clone, Debug)]
@@ -161,6 +163,11 @@ impl PartialEq for PartialHitHeapItem {
161163

162164
impl Eq for PartialHitHeapItem {}
163165

166+
enum AggregationSegmentCollectors {
167+
FindTraceIdsSegmentCollector(FindTraceIdsSegmentCollector),
168+
TantivyAggregationSegmentCollector(AggregationSegmentCollector),
169+
}
170+
164171
/// Quickwit collector working at the scale of the segment.
165172
pub struct QuickwitSegmentCollector {
166173
num_hits: u64,
@@ -170,7 +177,7 @@ pub struct QuickwitSegmentCollector {
170177
max_hits: usize,
171178
segment_ord: u32,
172179
timestamp_filter_opt: Option<TimestampFilter>,
173-
aggregation: Option<AggregationSegmentCollector>,
180+
aggregation: Option<AggregationSegmentCollectors>,
174181
}
175182

176183
impl QuickwitSegmentCollector {
@@ -219,8 +226,15 @@ impl SegmentCollector for QuickwitSegmentCollector {
219226

220227
self.num_hits += 1;
221228
self.collect_top_k(doc_id, score);
222-
if let Some(aggregation_collector) = self.aggregation.as_mut() {
223-
aggregation_collector.collect(doc_id, score);
229+
230+
match self.aggregation.as_mut() {
231+
Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => {
232+
collector.collect(doc_id, score)
233+
}
234+
Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => {
235+
collector.collect(doc_id, score)
236+
}
237+
None => (),
224238
}
225239
}
226240

@@ -240,15 +254,19 @@ impl SegmentCollector for QuickwitSegmentCollector {
240254
})
241255
.collect();
242256

243-
let intermediate_aggregation_result = if let Some(collector) = self.aggregation {
244-
Some(
245-
serde_json::to_string(&collector.harvest()?)
246-
.expect("could not serialize aggregation to json"),
247-
)
248-
} else {
249-
None
257+
let intermediate_aggregation_result = match self.aggregation {
258+
Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => Some(
259+
serde_json::to_string(&collector.harvest())
260+
.expect("Collector fruit should be JSON serializable."),
261+
),
262+
Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => {
263+
Some(
264+
serde_json::to_string(&collector.harvest()?)
265+
.expect("Collector fruit should be JSON serializable."),
266+
)
267+
}
268+
None => None,
250269
};
251-
252270
Ok(LeafSearchResponse {
253271
intermediate_aggregation_result,
254272
num_hits: self.num_hits,
@@ -259,6 +277,37 @@ impl SegmentCollector for QuickwitSegmentCollector {
259277
}
260278
}
261279

280+
#[derive(Debug, Clone, Deserialize)]
281+
#[serde(untagged)]
282+
pub enum QuickwitAggregations {
283+
FindTraceIdsAggregation(FindTraceIdsCollector),
284+
TantivyAggregations(Aggregations),
285+
}
286+
287+
impl QuickwitAggregations {
288+
fn fast_field_names(&self) -> HashSet<String> {
289+
match self {
290+
QuickwitAggregations::FindTraceIdsAggregation(collector) => {
291+
collector.fast_field_names()
292+
}
293+
QuickwitAggregations::TantivyAggregations(aggregations) => {
294+
get_fast_field_names(aggregations)
295+
}
296+
}
297+
}
298+
299+
fn term_dict_field_names(&self) -> HashSet<String> {
300+
match self {
301+
QuickwitAggregations::FindTraceIdsAggregation(collector) => {
302+
collector.term_dict_field_names()
303+
}
304+
QuickwitAggregations::TantivyAggregations(aggregations) => {
305+
get_term_dict_field_names(aggregations)
306+
}
307+
}
308+
}
309+
}
310+
262311
/// The quickwit collector is the tantivy Collector used in Quickwit.
263312
///
264313
/// It defines the data that should be accumulated about the documents matching
@@ -270,7 +319,7 @@ pub(crate) struct QuickwitCollector {
270319
pub max_hits: usize,
271320
pub sort_by: SortBy,
272321
timestamp_filter_builder_opt: Option<TimestampFilterBuilder>,
273-
pub aggregation: Option<Aggregations>,
322+
pub aggregation: Option<QuickwitAggregations>,
274323
}
275324

276325
impl QuickwitCollector {
@@ -282,21 +331,23 @@ impl QuickwitCollector {
282331
fast_field_names.insert(field_name.clone());
283332
}
284333
}
285-
if let Some(aggregate) = self.aggregation.as_ref() {
286-
fast_field_names.extend(get_fast_field_names(aggregate));
334+
if let Some(aggregations) = &self.aggregation {
335+
fast_field_names.extend(aggregations.fast_field_names());
287336
}
288337
if let Some(timestamp_filter_builder) = &self.timestamp_filter_builder_opt {
289338
fast_field_names.insert(timestamp_filter_builder.timestamp_field_name.clone());
290339
}
291340
fast_field_names
292341
}
342+
293343
pub fn term_dict_field_names(&self) -> HashSet<String> {
294344
let mut term_dict_field_names = HashSet::default();
295-
if let Some(aggregate) = self.aggregation.as_ref() {
296-
term_dict_field_names.extend(get_term_dict_field_names(aggregate));
345+
if let Some(aggregations) = &self.aggregation {
346+
term_dict_field_names.extend(aggregations.term_dict_field_names());
297347
}
298348
term_dict_field_names
299349
}
350+
300351
pub fn warmup_info(&self) -> WarmupInfo {
301352
WarmupInfo {
302353
term_dict_field_names: self.term_dict_field_names(),
@@ -323,13 +374,27 @@ impl Collector for QuickwitCollector {
323374
// starting from 0 for every leaves.
324375
let leaf_max_hits = self.max_hits + self.start_offset;
325376

326-
let timestamp_filter_opt =
327-
if let Some(timestamp_filter_builder) = &self.timestamp_filter_builder_opt {
328-
timestamp_filter_builder.build(segment_reader)?
329-
} else {
330-
None
331-
};
332-
377+
let timestamp_filter_opt = match &self.timestamp_filter_builder_opt {
378+
Some(timestamp_filter_builder) => timestamp_filter_builder.build(segment_reader)?,
379+
None => None,
380+
};
381+
let aggregation = match &self.aggregation {
382+
Some(QuickwitAggregations::FindTraceIdsAggregation(collector)) => {
383+
Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(
384+
collector.for_segment(0, segment_reader)?,
385+
))
386+
}
387+
Some(QuickwitAggregations::TantivyAggregations(aggs)) => Some(
388+
AggregationSegmentCollectors::TantivyAggregationSegmentCollector(
389+
AggregationSegmentCollector::from_agg_req_and_reader(
390+
aggs,
391+
segment_reader,
392+
AGGREGATION_BUCKET_LIMIT,
393+
)?,
394+
),
395+
),
396+
None => None,
397+
};
333398
Ok(QuickwitSegmentCollector {
334399
num_hits: 0u64,
335400
split_id: self.split_id.clone(),
@@ -338,17 +403,7 @@ impl Collector for QuickwitCollector {
338403
segment_ord,
339404
max_hits: leaf_max_hits,
340405
timestamp_filter_opt,
341-
aggregation: self
342-
.aggregation
343-
.as_ref()
344-
.map(|aggs| {
345-
AggregationSegmentCollector::from_agg_req_and_reader(
346-
aggs,
347-
segment_reader,
348-
AGGREGATION_BUCKET_LIMIT,
349-
)
350-
})
351-
.transpose()?,
406+
aggregation,
352407
})
353408
}
354409

@@ -372,7 +427,8 @@ impl Collector for QuickwitCollector {
372427
// All leaves will return their top [0..max_hits) documents.
373428
// We compute the overall [0..start_offset + max_hits) documents ...
374429
let num_hits = self.start_offset + self.max_hits;
375-
let mut merged_leaf_response = merge_leaf_responses(segment_fruits?, num_hits)?;
430+
let mut merged_leaf_response =
431+
merge_leaf_responses(&self.aggregation, segment_fruits?, num_hits)?;
376432
// ... and drop the first [..start_offsets) hits.
377433
merged_leaf_response
378434
.partial_hits
@@ -388,31 +444,54 @@ impl Collector for QuickwitCollector {
388444

389445
/// Merges a set of Leaf Results.
390446
fn merge_leaf_responses(
447+
aggregations_opt: &Option<QuickwitAggregations>,
391448
leaf_responses: Vec<LeafSearchResponse>,
392449
max_hits: usize,
393450
) -> tantivy::Result<LeafSearchResponse> {
394451
// Optimization: No merging needed if there is only one result.
395452
if leaf_responses.len() == 1 {
396453
return Ok(leaf_responses.into_iter().next().unwrap_or_default()); //< default is actually never called
397454
}
398-
let intermediate_aggregation_results = leaf_responses
399-
.iter()
400-
.flat_map(|leaf_response| {
401-
leaf_response
402-
.intermediate_aggregation_result
403-
.as_ref()
404-
.map(|res| serde_json::from_str(res))
405-
})
406-
.collect::<Result<Vec<IntermediateAggregationResults>, _>>()?;
407-
408-
let intermediate_aggregation_result =
409-
intermediate_aggregation_results
410-
.into_iter()
411-
.reduce(|mut res1, res2| {
412-
res1.merge_fruits(res2);
413-
res1
414-
});
455+
let merged_intermediate_aggregation_result = match aggregations_opt {
456+
Some(QuickwitAggregations::FindTraceIdsAggregation(collector)) => {
457+
let fruits: Vec<
458+
<<FindTraceIdsCollector as Collector>::Child as SegmentCollector>::Fruit,
459+
> = leaf_responses
460+
.iter()
461+
.filter_map(|leaf_response| {
462+
leaf_response.intermediate_aggregation_result.as_ref().map(
463+
|intermediate_aggregation_result| {
464+
serde_json::from_str(intermediate_aggregation_result)
465+
},
466+
)
467+
})
468+
.collect::<Result<_, _>>()?;
469+
let merged_fruit = collector.merge_fruits(fruits)?;
470+
Some(serde_json::to_string(&merged_fruit)?)
471+
}
472+
Some(QuickwitAggregations::TantivyAggregations(_)) => {
473+
let fruits: Vec<IntermediateAggregationResults> = leaf_responses
474+
.iter()
475+
.filter_map(|leaf_response| {
476+
leaf_response.intermediate_aggregation_result.as_ref().map(
477+
|intermediate_aggregation_result| {
478+
serde_json::from_str(intermediate_aggregation_result)
479+
},
480+
)
481+
})
482+
.collect::<Result<_, _>>()?;
415483

484+
fruits
485+
.into_iter()
486+
.reduce(|mut left, right| {
487+
left.merge_fruits(right);
488+
left
489+
})
490+
.map(|merged_fruit| serde_json::to_string(&merged_fruit))
491+
.transpose()?
492+
}
493+
None => None,
494+
};
416495
let num_attempted_splits = leaf_responses
417496
.iter()
418497
.map(|leaf_response| leaf_response.num_attempted_splits)
@@ -433,10 +512,7 @@ fn merge_leaf_responses(
433512
// TODO optimize
434513
let top_k_partial_hits = top_k_partial_hits(all_partial_hits, max_hits);
435514
Ok(LeafSearchResponse {
436-
intermediate_aggregation_result: intermediate_aggregation_result
437-
.as_ref()
438-
.map(serde_json::to_string)
439-
.transpose()?,
515+
intermediate_aggregation_result: merged_intermediate_aggregation_result,
440516
num_hits,
441517
partial_hits: top_k_partial_hits,
442518
failed_splits,
@@ -465,12 +541,10 @@ pub(crate) fn make_collector_for_split(
465541
search_request: &SearchRequest,
466542
split_schema: &Schema,
467543
) -> crate::Result<QuickwitCollector> {
468-
let aggregation = if let Some(agg) = &search_request.aggregation_request {
469-
Some(serde_json::from_str(agg)?)
470-
} else {
471-
None
544+
let aggregation = match &search_request.aggregation_request {
545+
Some(aggregation) => Some(serde_json::from_str(aggregation)?),
546+
None => None,
472547
};
473-
474548
let timestamp_field_opt = doc_mapper.timestamp_field(split_schema);
475549
let timestamp_filter_builder_opt = TimestampFilterBuilder::new(
476550
doc_mapper.timestamp_field_name(),

0 commit comments

Comments
 (0)