16 changes: 14 additions & 2 deletions idl/chromadb/proto/query_executor.proto
@@ -105,7 +105,7 @@ message RankExpr {
optional float default = 4;
bool return_rank = 5;
}

message RankPair {
RankExpr left = 1;
RankExpr right = 2;
@@ -170,9 +170,22 @@ message SearchPayload {
SelectOperator select = 4;
}

// ReadLevel specifies which data sources to read from during queries.
// This affects consistency vs performance tradeoffs.
enum ReadLevel {
Contributor:

nit: ReadLevel feels unclear to me. Is there a use case for this other than toggling whether to read logs or not?

Collaborator Author:

This was "eventual_consistency" before. I went back and forth on many names. I think it's the best one I could generate that is clear, specific, and correct in its use of terms.

Databases often offer "levels" for a read, but we are devoid of any formal naming here, so I prefer to be explicit with IndexOnly vs IndexAndWal. Then the question is what to call those options. If it's "ConsistencyLevel" I'd expect formal notions of consistency, not wishy-washy ones like what we are offering. The reason to make it an enum is that we can offer BoundedConsistency in the future.
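
To make the extensibility point concrete, here is a purely hypothetical sketch of what such a future variant could look like; BoundedConsistency and its max_staleness_ms field are illustrative only and not part of this PR:

```rust
// Hypothetical future shape of the enum; only IndexAndWal and IndexOnly
// exist today. A bounded variant could cap how stale an index-only read
// may be before the WAL must be consulted.
pub enum ReadLevel {
    IndexAndWal,
    IndexOnly,
    // Illustrative only: serve from the index if it lags the log by no
    // more than the bound, otherwise fall back to reading the WAL too.
    BoundedConsistency { max_staleness_ms: u64 },
}
```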

// Read from both the index and the write-ahead log (default).
// Provides full consistency with all committed writes visible.
INDEX_AND_WAL = 0;
// Read only from the index, skipping the write-ahead log.
// Provides eventual consistency: recent writes not yet compacted into the index may not be visible.
INDEX_ONLY = 1;
}

message SearchPlan {
ScanOperator scan = 1;
repeated SearchPayload payloads = 2;
// Specifies the read level for this query
ReadLevel read_level = 3;
}
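
As a sketch of how a caller might exercise the new field, assuming the usual prost-generated types re-exported as chroma_proto (prost stores proto enum fields as i32), and with scan and payload standing in for values built elsewhere:

```rust
use chroma_types::chroma_proto;

// A minimal sketch, not the canonical client API: opt a search into
// index-only reads by setting the enum field prost generates as an i32.
let plan = chroma_proto::SearchPlan {
    scan: Some(scan),
    payloads: vec![payload],
    read_level: chroma_proto::ReadLevel::IndexOnly as i32,
};
```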

message SearchRecord {
@@ -198,4 +211,3 @@ service QueryExecutor {
rpc KNN(KNNPlan) returns (KNNBatchResult) {}
rpc Search(SearchPlan) returns (SearchResult) {}
}

3 changes: 2 additions & 1 deletion rust/frontend/src/impls/service_based_frontend.rs
@@ -20,7 +20,7 @@ use chroma_sysdb::{GetCollectionsOptions, SysDb};
use chroma_system::System;
use chroma_types::{
operator::{Filter, KnnBatch, KnnProjection, Limit, Projection, Scan},
plan::{Count, Get, Knn, Search},
plan::{Count, Get, Knn, ReadLevel, Search},
AddCollectionRecordsError, AddCollectionRecordsRequest, AddCollectionRecordsResponse,
AttachFunctionRequest, AttachFunctionResponse, Cmek, Collection, CollectionUuid,
CountCollectionsError, CountCollectionsRequest, CountCollectionsResponse, CountRequest,
@@ -1834,6 +1834,7 @@ impl ServiceBasedFrontend {
collection_and_segments,
},
payloads: request.searches,
read_level: ReadLevel::IndexAndWal,
Contributor:

Important

[Requirements] read_level is hardcoded to IndexAndWal here. This prevents clients from utilizing the new IndexOnly functionality even if the backend supports it.

If request (of type SearchRequest) exposes a read_level preference, it should be mapped here. If SearchRequest does not yet support this field, this limitation should be tracked.
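
A sketch of the mapping the reviewer is asking for, assuming SearchRequest were extended with an optional read_level field (no such field exists in this PR):

```rust
// Hypothetical: if SearchRequest carried an Option<ReadLevel>, the frontend
// could forward the client's preference and keep IndexAndWal as the default.
let read_level = request.read_level.unwrap_or(ReadLevel::IndexAndWal);
```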

Collaborator Author:

This is deliberate and handled up-stack

};

// Execute the single search plan using the executor
33 changes: 33 additions & 0 deletions rust/types/src/execution/plan.rs
@@ -543,16 +543,47 @@ impl TryFrom<SearchPayload> for chroma_proto::SearchPayload {
}
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ReadLevel {
/// Read from both the index and the write-ahead log (default).
/// Provides full consistency with all committed writes visible.
#[default]
IndexAndWal,
/// Read only from the index, skipping the write-ahead log.
/// Provides eventual consistency: recent writes not yet compacted into the index may not be visible.
IndexOnly,
}

impl From<chroma_proto::ReadLevel> for ReadLevel {
fn from(value: chroma_proto::ReadLevel) -> Self {
match value {
chroma_proto::ReadLevel::IndexAndWal => ReadLevel::IndexAndWal,
chroma_proto::ReadLevel::IndexOnly => ReadLevel::IndexOnly,
}
}
}

impl From<ReadLevel> for chroma_proto::ReadLevel {
fn from(value: ReadLevel) -> Self {
match value {
ReadLevel::IndexAndWal => chroma_proto::ReadLevel::IndexAndWal,
ReadLevel::IndexOnly => chroma_proto::ReadLevel::IndexOnly,
}
}
}
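
Conversions like these invite a round-trip check. A minimal test sketch, assuming it sits in this module so ReadLevel and chroma_proto are in scope:

```rust
#[cfg(test)]
mod read_level_round_trip {
    use super::*;

    // Converting each variant to the proto enum and back should be the
    // identity, since both From impls are exhaustive matches.
    #[test]
    fn proto_round_trip_is_identity() {
        for level in [ReadLevel::IndexAndWal, ReadLevel::IndexOnly] {
            let proto: chroma_proto::ReadLevel = level.into();
            assert_eq!(ReadLevel::from(proto), level);
        }
    }
}
```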

#[derive(Clone, Debug)]
pub struct Search {
pub scan: Scan,
pub payloads: Vec<SearchPayload>,
pub read_level: ReadLevel,
}

impl TryFrom<chroma_proto::SearchPlan> for Search {
type Error = QueryConversionError;

fn try_from(value: chroma_proto::SearchPlan) -> Result<Self, Self::Error> {
let read_level = value.read_level().into();
Ok(Self {
scan: value
.scan
@@ -563,6 +594,7 @@ impl TryFrom<chroma_proto::SearchPlan> for Search {
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?,
read_level,
})
}
}
@@ -578,6 +610,7 @@ impl TryFrom<Search> for chroma_proto::SearchPlan {
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?,
read_level: chroma_proto::ReadLevel::from(value.read_level).into(),
})
}
}
5 changes: 4 additions & 1 deletion rust/worker/benches/query.rs
@@ -8,7 +8,7 @@ use chroma_benchmark::{
use chroma_config::{registry::Registry, Configurable};
use chroma_segment::test::TestDistributedSegment;
use chroma_system::{ComponentHandle, Dispatcher, Orchestrator, System};
use chroma_types::operator::Knn;
use chroma_types::{operator::Knn, plan::ReadLevel};
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{stream, StreamExt, TryStreamExt};
use load::{
@@ -39,6 +39,7 @@ fn trivial_knn_filter(
test_segments.into(),
empty_fetch_log(collection_uuid),
trivial_filter(),
ReadLevel::IndexAndWal,
)
}

@@ -57,6 +58,7 @@ fn always_true_knn_filter(
test_segments.into(),
empty_fetch_log(collection_uuid),
always_true_filter_for_modulo_metadata(),
ReadLevel::IndexAndWal,
)
}

@@ -75,6 +77,7 @@ fn always_false_knn_filter(
test_segments.into(),
empty_fetch_log(collection_uuid),
always_false_filter_for_modulo_metadata(),
ReadLevel::IndexAndWal,
)
}

70 changes: 49 additions & 21 deletions rust/worker/src/execution/orchestration/knn_filter.rs
@@ -12,8 +12,8 @@ use chroma_system::{
OrchestratorContext, PanicError, TaskError, TaskMessage, TaskResult,
};
use chroma_types::{
operator::Filter, CollectionAndSegments, HnswParametersFromSegmentError, SchemaError,
SegmentType,
operator::Filter, plan::ReadLevel, CollectionAndSegments, HnswParametersFromSegmentError,
SchemaError, SegmentType,
};
use opentelemetry::trace::TraceContextExt;
use thiserror::Error;
@@ -178,6 +178,9 @@ pub struct KnnFilterOrchestrator {
// Fetched logs
fetched_logs: Option<FetchLogOutput>,

// Read level for consistency vs performance tradeoff
read_level: ReadLevel,

// Pipelined operators
filter: Filter,

@@ -186,6 +189,7 @@ pub struct KnnFilterOrchestrator {
}

impl KnnFilterOrchestrator {
#[allow(clippy::too_many_arguments)]
pub fn new(
blockfile_provider: BlockfileProvider,
dispatcher: ComponentHandle<Dispatcher>,
@@ -194,6 +198,7 @@ impl KnnFilterOrchestrator {
collection_and_segments: CollectionAndSegments,
fetch_log: FetchLogOperator,
filter: Filter,
read_level: ReadLevel,
) -> Self {
let context = OrchestratorContext::new(dispatcher);
Self {
@@ -204,10 +209,29 @@
collection_and_segments,
fetch_log,
fetched_logs: None,
read_level,
filter,
result_channel: None,
}
}

fn create_filter_task(
&self,
logs: FetchLogOutput,
ctx: &ComponentContext<Self>,
) -> TaskMessage {
wrap(
Box::new(self.filter.clone()),
FilterInput {
logs,
blockfile_provider: self.blockfile_provider.clone(),
metadata_segment: self.collection_and_segments.metadata_segment.clone(),
record_segment: self.collection_and_segments.record_segment.clone(),
},
ctx.receiver(),
self.context.task_cancellation_token.clone(),
)
}
}

#[async_trait]
@@ -272,14 +296,28 @@ impl Orchestrator for KnnFilterOrchestrator {
Span::current().add_link(prefetch_span.context().span().span_context().clone());
tasks.push((prefetch_metadata_task, Some(prefetch_span)));

// Fetch log task.
let fetch_log_task = wrap(
Box::new(self.fetch_log.clone()),
(),
ctx.receiver(),
self.context.task_cancellation_token.clone(),
);
tasks.push((fetch_log_task, Some(Span::current())));
match self.read_level {
ReadLevel::IndexOnly => {
// For IndexOnly queries, skip log fetching and use empty logs
tracing::info!("Skipping log fetch for IndexOnly read level");
Contributor:

Recommended

[Maintainability] Using tracing::info! in the query execution path will generate excessive logs in high-throughput scenarios (one log per query). Consider using tracing::debug! or tracing::trace! instead to reduce noise.
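
The suggested one-line change, sketched; tracing::debug! keeps the signal available under verbose tracing without emitting a line per query:

```rust
// Demoted from info to debug: IndexOnly queries are expected to be the
// hot path, so per-query logging should be opt-in via the debug filter.
tracing::debug!("Skipping log fetch for IndexOnly read level");
```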

let empty_logs = FetchLogOutput::new(Vec::new().into());
self.fetched_logs = Some(empty_logs.clone());

// Immediately schedule the filter task with empty logs
let filter_task = self.create_filter_task(empty_logs, ctx);
tasks.push((filter_task, Some(Span::current())));
}
ReadLevel::IndexAndWal => {
// Fetch log task for full consistency.
let fetch_log_task = wrap(
Box::new(self.fetch_log.clone()),
(),
ctx.receiver(),
self.context.task_cancellation_token.clone(),
);
tasks.push((fetch_log_task, Some(Span::current())));
}
}

tasks
}
Expand Down Expand Up @@ -326,17 +364,7 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for KnnFilterOrchestrato

self.fetched_logs = Some(output.clone());

let task = wrap(
Box::new(self.filter.clone()),
FilterInput {
logs: output,
blockfile_provider: self.blockfile_provider.clone(),
metadata_segment: self.collection_and_segments.metadata_segment.clone(),
record_segment: self.collection_and_segments.record_segment.clone(),
},
ctx.receiver(),
self.context.task_cancellation_token.clone(),
);
let task = self.create_filter_task(output, ctx);
self.send(task, ctx, Some(Span::current())).await;
}
}
8 changes: 6 additions & 2 deletions rust/worker/src/server.rs
@@ -17,7 +17,7 @@ use chroma_types::{
query_executor_server::{QueryExecutor, QueryExecutorServer},
},
operator::{GetResult, Knn, KnnBatch, KnnBatchResult, KnnProjection, QueryVector, Scan},
plan::SearchPayload,
plan::{ReadLevel, SearchPayload},
CollectionAndSegments, SegmentType,
};
use futures::{stream, StreamExt, TryStreamExt};
@@ -342,6 +342,7 @@ impl WorkerServer {
collection_and_segments.clone(),
fetch_log,
filter.try_into()?,
ReadLevel::IndexAndWal, // Full consistency for KNN queries
);

let matching_records = match knn_filter_orchestrator.run(system.clone()).await {
@@ -478,6 +479,7 @@ impl WorkerServer {
&self,
scan: chroma_proto::ScanOperator,
payload: chroma_proto::SearchPayload,
read_level: ReadLevel,
) -> Result<RankOrchestratorOutput, Status> {
let collection_and_segments = Scan::try_from(scan)?.collection_and_segments;
let search_payload = SearchPayload::try_from(payload)?;
@@ -497,6 +499,7 @@
collection_and_segments.clone(),
fetch_log,
search_payload.filter.clone(),
read_level, // Use the specified read level
);

let knn_filter_output = match knn_filter_orchestrator.run(self.system.clone()).await {
@@ -618,14 +621,15 @@ impl WorkerServer {
search: Request<chroma_proto::SearchPlan>,
) -> Result<Response<chroma_proto::SearchResult>, Status> {
let search_plan = search.into_inner();
let read_level: ReadLevel = search_plan.read_level().into();
let scan = search_plan
.scan
.ok_or(Status::invalid_argument("Invalid Scan Operator"))?;

let futures = search_plan
.payloads
.into_iter()
.map(|payload| self.orchestrate_search(scan.clone(), payload));
.map(|payload| self.orchestrate_search(scan.clone(), payload, read_level));

let orchestrator_results = stream::iter(futures)
.buffered(32) // Process up to 32 payloads concurrently