15 changes: 13 additions & 2 deletions idl/chromadb/proto/query_executor.proto
@@ -105,7 +105,7 @@ message RankExpr {
optional float default = 4;
bool return_rank = 5;
}

message RankPair {
RankExpr left = 1;
RankExpr right = 2;
@@ -170,9 +170,21 @@ message SearchPayload {
SelectOperator select = 4;
}

// ReadLevel specifies which data sources to read from during queries.
// This affects consistency vs performance tradeoffs.
enum ReadLevel {
// Read from both the index and the write-ahead log (default).
// Provides full consistency with all committed writes visible.
INDEX_AND_WAL = 0;
// Read only from the index, skipping the write-ahead log.
// Provides eventual consistency: recently committed writes that are still
// only in the write-ahead log may not be visible.
INDEX_ONLY = 1;
}

message SearchPlan {
ScanOperator scan = 1;
repeated SearchPayload payloads = 2;
ReadLevel read_level = 3;
}

message SearchRecord {
@@ -198,4 +210,3 @@ service QueryExecutor {
rpc KNN(KNNPlan) returns (KNNBatchResult) {}
rpc Search(SearchPlan) returns (SearchResult) {}
}

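For context, here is a minimal sketch (not part of this diff) of building an index-only plan against the prost-generated types; the chroma_proto module path and the Option-wrapped scan field are assumptions inferred from the conversion code later in this PR:

use chroma_types::chroma_proto;

/// Build an index-only search plan. Proto3 semantics keep this backward
/// compatible: clients that never set the field send 0, which decodes as
/// INDEX_AND_WAL and preserves full consistency.
fn index_only_plan(
    scan: chroma_proto::ScanOperator,
    payloads: Vec<chroma_proto::SearchPayload>,
) -> chroma_proto::SearchPlan {
    chroma_proto::SearchPlan {
        scan: Some(scan),
        payloads,
        // prost stores proto enum fields as i32 on the message struct.
        read_level: chroma_proto::ReadLevel::IndexOnly as i32,
    }
}
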
12 changes: 7 additions & 5 deletions rust/chroma/src/collection.rs
@@ -16,11 +16,12 @@ use std::sync::Arc;

use chroma_api_types::ForkCollectionPayload;
use chroma_types::{
plan::SearchPayload, AddCollectionRecordsRequest, AddCollectionRecordsResponse, Collection,
CollectionUuid, DeleteCollectionRecordsRequest, DeleteCollectionRecordsResponse, GetRequest,
GetResponse, IncludeList, Metadata, QueryRequest, QueryResponse, Schema, SearchRequest,
SearchResponse, UpdateCollectionRecordsRequest, UpdateCollectionRecordsResponse,
UpdateMetadata, UpsertCollectionRecordsRequest, UpsertCollectionRecordsResponse, Where,
plan::{ReadLevel, SearchPayload},
AddCollectionRecordsRequest, AddCollectionRecordsResponse, Collection, CollectionUuid,
DeleteCollectionRecordsRequest, DeleteCollectionRecordsResponse, GetRequest, GetResponse,
IncludeList, Metadata, QueryRequest, QueryResponse, Schema, SearchRequest, SearchResponse,
UpdateCollectionRecordsRequest, UpdateCollectionRecordsResponse, UpdateMetadata,
UpsertCollectionRecordsRequest, UpsertCollectionRecordsResponse, Where,
};
use reqwest::Method;
use serde::{de::DeserializeOwned, Serialize};
@@ -479,6 +480,7 @@ impl ChromaCollection {
self.collection.database.clone(),
self.collection.collection_id,
searches,
ReadLevel::IndexAndWal,
)?;
let request = request.into_payload();
self.send("search", "search", Method::POST, Some(request))
1 change: 1 addition & 0 deletions rust/frontend/src/impls/service_based_frontend.rs
@@ -1834,6 +1834,7 @@ impl ServiceBasedFrontend {
collection_and_segments,
},
payloads: request.searches,
read_level: request.read_level,
};

// Execute the single search plan using the executor
8 changes: 7 additions & 1 deletion rust/frontend/src/server.rs
@@ -2184,7 +2184,13 @@ async fn collection_search(
}
}

let request = SearchRequest::try_new(tenant, database, collection_id, searches)?;
let request = SearchRequest::try_new(
tenant,
database,
collection_id,
searches,
payload.read_level,
)?;
let res = server
.frontend
.search(request)
10 changes: 10 additions & 0 deletions rust/types/src/api_types.rs
@@ -12,6 +12,7 @@ use crate::operators_generated::{
FUNCTION_STATISTICS_NAME,
};
use crate::plan::PlanToProtoError;
use crate::plan::ReadLevel;
use crate::plan::SearchPayload;
use crate::validators::{
validate_metadata_vec, validate_name, validate_non_empty_collection_update_metadata,
@@ -2152,6 +2153,10 @@ impl From<(KnnBatchResult, IncludeList)> for QueryResponse {
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct SearchRequestPayload {
pub searches: Vec<SearchPayload>,
/// Specifies the read level for consistency vs performance tradeoffs.
/// Defaults to IndexAndWal (full consistency).
#[serde(default)]
pub read_level: ReadLevel,
Contributor: Thoughts on having this per SearchPayload vs. one level for the entire request?

Collaborator (Author): I did consider this, but I prefer it to be homogeneous: if you are making decisions off this data, you want a consistent read across all the queries.

}

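As a wire-format illustration (an assumed test, not part of the PR): with rename_all = "snake_case" the enum variants serialize as plain strings, and the #[serde(default)] above means a request body that omits read_level falls back to IndexAndWal:

use chroma_types::plan::ReadLevel;

#[test]
fn read_level_wire_format() {
    // Explicit opt-in to index-only reads.
    let lvl: ReadLevel = serde_json::from_str("\"index_only\"").unwrap();
    assert_eq!(lvl, ReadLevel::IndexOnly);

    // What #[serde(default)] supplies when the field is absent.
    assert_eq!(ReadLevel::default(), ReadLevel::IndexAndWal);
}
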
#[non_exhaustive]
@@ -2163,6 +2168,8 @@ pub struct SearchRequest {
pub collection_id: CollectionUuid,
#[validate(nested)]
pub searches: Vec<SearchPayload>,
/// Specifies the read level for consistency vs performance tradeoffs.
pub read_level: ReadLevel,
}

impl SearchRequest {
@@ -2171,12 +2178,14 @@ impl SearchRequest {
database_name: String,
collection_id: CollectionUuid,
searches: Vec<SearchPayload>,
read_level: ReadLevel,
) -> Result<Self, ChromaValidationError> {
let request = Self {
tenant_id,
database_name,
collection_id,
searches,
read_level,
};
request.validate().map_err(ChromaValidationError::from)?;
Ok(request)
@@ -2185,6 +2194,7 @@
pub fn into_payload(self) -> SearchRequestPayload {
SearchRequestPayload {
searches: self.searches,
read_level: self.read_level,
}
}
}
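A hypothetical call site for the widened constructor (identifiers and the re-export paths are placeholders), showing a caller that explicitly trades freshness for latency:

use chroma_types::{
    plan::{ReadLevel, SearchPayload},
    ChromaValidationError, CollectionUuid, SearchRequest,
};

fn index_only_search(
    tenant_id: String,
    database_name: String,
    collection_id: CollectionUuid,
    searches: Vec<SearchPayload>,
) -> Result<SearchRequest, ChromaValidationError> {
    // IndexOnly skips the WAL read on the query path; recent writes that
    // have not yet been compacted into the index may be missing from results.
    SearchRequest::try_new(tenant_id, database_name, collection_id, searches, ReadLevel::IndexOnly)
}
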
35 changes: 35 additions & 0 deletions rust/types/src/execution/plan.rs
@@ -543,16 +543,49 @@ impl TryFrom<SearchPayload> for chroma_proto::SearchPayload {
}
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub enum ReadLevel {
/// Read from both the index and the write-ahead log (default).
/// Provides full consistency with all committed writes visible.
#[default]
IndexAndWal,
/// Read only from the index, skipping the write-ahead log.
/// Provides eventual consistency: recently committed writes that are still
/// only in the write-ahead log may not be visible.
IndexOnly,
}

impl From<chroma_proto::ReadLevel> for ReadLevel {
fn from(value: chroma_proto::ReadLevel) -> Self {
match value {
chroma_proto::ReadLevel::IndexAndWal => ReadLevel::IndexAndWal,
chroma_proto::ReadLevel::IndexOnly => ReadLevel::IndexOnly,
}
}
}

impl From<ReadLevel> for chroma_proto::ReadLevel {
fn from(value: ReadLevel) -> Self {
match value {
ReadLevel::IndexAndWal => chroma_proto::ReadLevel::IndexAndWal,
ReadLevel::IndexOnly => chroma_proto::ReadLevel::IndexOnly,
}
}
}

#[derive(Clone, Debug)]
pub struct Search {
pub scan: Scan,
pub payloads: Vec<SearchPayload>,
pub read_level: ReadLevel,
}

impl TryFrom<chroma_proto::SearchPlan> for Search {
type Error = QueryConversionError;

fn try_from(value: chroma_proto::SearchPlan) -> Result<Self, Self::Error> {
let read_level = value.read_level().into();
Ok(Self {
scan: value
.scan
@@ -563,6 +596,7 @@ impl TryFrom<chroma_proto::SearchPlan> for Search {
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?,
read_level,
})
}
}
@@ -578,6 +612,7 @@ impl TryFrom<Search> for chroma_proto::SearchPlan {
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?,
read_level: chroma_proto::ReadLevel::from(value.read_level).into(),
})
}
}
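Since this pair of From impls makes the mapping total in both directions, a small illustrative test (not in the diff; the proto module path is assumed) can pin the round-trip:

use chroma_types::{chroma_proto, plan::ReadLevel};

#[test]
fn read_level_proto_round_trip() {
    for level in [ReadLevel::IndexAndWal, ReadLevel::IndexOnly] {
        let proto: chroma_proto::ReadLevel = level.into();
        let back: ReadLevel = proto.into();
        assert_eq!(back, level);
    }
}
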
5 changes: 4 additions & 1 deletion rust/worker/benches/query.rs
@@ -8,7 +8,7 @@ use chroma_benchmark::{
use chroma_config::{registry::Registry, Configurable};
use chroma_segment::test::TestDistributedSegment;
use chroma_system::{ComponentHandle, Dispatcher, Orchestrator, System};
use chroma_types::operator::Knn;
use chroma_types::{operator::Knn, plan::ReadLevel};
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{stream, StreamExt, TryStreamExt};
use load::{
@@ -39,6 +39,7 @@ fn trivial_knn_filter(
test_segments.into(),
empty_fetch_log(collection_uuid),
trivial_filter(),
ReadLevel::IndexAndWal,
)
}

@@ -57,6 +58,7 @@ fn always_true_knn_filter(
test_segments.into(),
empty_fetch_log(collection_uuid),
always_true_filter_for_modulo_metadata(),
ReadLevel::IndexAndWal,
)
}

@@ -75,6 +77,7 @@ fn always_false_knn_filter(
test_segments.into(),
empty_fetch_log(collection_uuid),
always_false_filter_for_modulo_metadata(),
ReadLevel::IndexAndWal,
)
}

70 changes: 49 additions & 21 deletions rust/worker/src/execution/orchestration/knn_filter.rs
@@ -12,8 +12,8 @@ use chroma_system::{
OrchestratorContext, PanicError, TaskError, TaskMessage, TaskResult,
};
use chroma_types::{
operator::Filter, CollectionAndSegments, HnswParametersFromSegmentError, SchemaError,
SegmentType,
operator::Filter, plan::ReadLevel, CollectionAndSegments, HnswParametersFromSegmentError,
SchemaError, SegmentType,
};
use opentelemetry::trace::TraceContextExt;
use thiserror::Error;
@@ -178,6 +178,9 @@ pub struct KnnFilterOrchestrator {
// Fetched logs
fetched_logs: Option<FetchLogOutput>,

// Read level for consistency vs performance tradeoff
read_level: ReadLevel,

// Pipelined operators
filter: Filter,

@@ -186,6 +189,7 @@ pub struct KnnFilterOrchestrator {
}

impl KnnFilterOrchestrator {
#[allow(clippy::too_many_arguments)]
pub fn new(
blockfile_provider: BlockfileProvider,
dispatcher: ComponentHandle<Dispatcher>,
@@ -194,6 +198,7 @@ impl KnnFilterOrchestrator {
collection_and_segments: CollectionAndSegments,
fetch_log: FetchLogOperator,
filter: Filter,
read_level: ReadLevel,
) -> Self {
let context = OrchestratorContext::new(dispatcher);
Self {
@@ -204,10 +209,29 @@
collection_and_segments,
fetch_log,
fetched_logs: None,
read_level,
filter,
result_channel: None,
}
}

fn create_filter_task(
&self,
logs: FetchLogOutput,
ctx: &ComponentContext<Self>,
) -> TaskMessage {
wrap(
Box::new(self.filter.clone()),
FilterInput {
logs,
blockfile_provider: self.blockfile_provider.clone(),
metadata_segment: self.collection_and_segments.metadata_segment.clone(),
record_segment: self.collection_and_segments.record_segment.clone(),
},
ctx.receiver(),
self.context.task_cancellation_token.clone(),
)
}
}

#[async_trait]
@@ -272,14 +296,28 @@ impl Orchestrator for KnnFilterOrchestrator {
Span::current().add_link(prefetch_span.context().span().span_context().clone());
tasks.push((prefetch_metadata_task, Some(prefetch_span)));

// Fetch log task.
let fetch_log_task = wrap(
Box::new(self.fetch_log.clone()),
(),
ctx.receiver(),
self.context.task_cancellation_token.clone(),
);
tasks.push((fetch_log_task, Some(Span::current())));
match self.read_level {
ReadLevel::IndexOnly => {
// For IndexOnly queries, skip log fetching and use empty logs
tracing::info!("Skipping log fetch for IndexOnly read level");
Contributor (Recommended): [Performance] Using tracing::info! here will generate a log line for every search query executed with IndexOnly. In a high-throughput scenario (which is the primary use case for IndexOnly), this will flood the logs. Consider using tracing::debug! instead. (rust/worker/src/execution/orchestration/knn_filter.rs, line 302)
let empty_logs = FetchLogOutput::new(Vec::new().into());
self.fetched_logs = Some(empty_logs.clone());

// Immediately schedule the filter task with empty logs
let filter_task = self.create_filter_task(empty_logs, ctx);
tasks.push((filter_task, Some(Span::current())));
}
ReadLevel::IndexAndWal => {
// Fetch log task for full consistency.
let fetch_log_task = wrap(
Box::new(self.fetch_log.clone()),
(),
ctx.receiver(),
self.context.task_cancellation_token.clone(),
);
tasks.push((fetch_log_task, Some(Span::current())));
}
}

tasks
}
@@ -326,17 +364,7 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for KnnFilterOrchestrator {

self.fetched_logs = Some(output.clone());

let task = wrap(
Box::new(self.filter.clone()),
FilterInput {
logs: output,
blockfile_provider: self.blockfile_provider.clone(),
metadata_segment: self.collection_and_segments.metadata_segment.clone(),
record_segment: self.collection_and_segments.record_segment.clone(),
},
ctx.receiver(),
self.context.task_cancellation_token.clone(),
);
let task = self.create_filter_task(output, ctx);
self.send(task, ctx, Some(Span::current())).await;
}
}