4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
@@ -331,3 +331,7 @@ func (s *Coordinator) FinishDatabaseDeletion(ctx context.Context, req *coordinat
}
return res, nil
}

func (s *Coordinator) IncrementCompactionFailureCount(ctx context.Context, collectionID types.UniqueID) error {
return s.catalog.IncrementCompactionFailureCount(ctx, collectionID)
}
1 change: 1 addition & 0 deletions go/pkg/sysdb/coordinator/model/collection.go
@@ -28,6 +28,7 @@ type Collection struct {
VersionFileName string
CreatedAt time.Time
DatabaseId types.UniqueID
CompactionFailureCount int32
}

type CollectionToGc struct {
1 change: 1 addition & 0 deletions go/pkg/sysdb/coordinator/model_db_convert.go
@@ -41,6 +41,7 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd
CreatedAt: collectionAndMetadata.Collection.CreatedAt,
UpdatedAt: collectionAndMetadata.Collection.UpdatedAt.Unix(),
DatabaseId: types.MustParse(collectionAndMetadata.Collection.DatabaseID),
CompactionFailureCount: collectionAndMetadata.Collection.CompactionFailureCount,
}
collection.Metadata = convertCollectionMetadataToModel(collectionAndMetadata.CollectionMetadata)
collections = append(collections, collection)
4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/table_catalog.go
@@ -2465,3 +2465,7 @@ func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantI
func (tc *Catalog) FinishDatabaseDeletion(ctx context.Context, cutoffTime time.Time) (uint64, error) {
return tc.metaDomain.DatabaseDb(ctx).FinishDatabaseDeletion(cutoffTime)
}

func (tc *Catalog) IncrementCompactionFailureCount(ctx context.Context, collectionID types.UniqueID) error {
return tc.metaDomain.CollectionDb(ctx).IncrementCompactionFailureCount(collectionID.String())
}
17 changes: 17 additions & 0 deletions go/pkg/sysdb/grpc/collection_service.go
@@ -767,3 +767,20 @@ func (s *Server) BatchGetCollectionSoftDeleteStatus(ctx context.Context, req *co
}
return res, nil
}

func (s *Server) IncrementCompactionFailureCount(ctx context.Context, req *coordinatorpb.IncrementCompactionFailureCountRequest) (*coordinatorpb.IncrementCompactionFailureCountResponse, error) {
@Sicheng-Pan (Contributor) commented on Jan 9, 2026:
nit: maybe leave a note about when compaction failure count is incremented/reset
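One possible doc comment addressing this nit, sketched here as a suggestion (the exact wording is an assumption, not part of the PR):

```go
// IncrementCompactionFailureCount increments the consecutive compaction
// failure count for a collection, presumably called when a compaction
// attempt fails. The count is reset to 0 on the next successful compaction
// (see UpdateLogPositionAndVersionInfo and
// UpdateLogPositionVersionTotalRecordsAndLogicalSize in dao/collection.go).
```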

collectionID := req.CollectionId
parsedCollectionID, err := types.ToUniqueID(&collectionID)
if err != nil {
log.Error("IncrementCompactionFailureCount failed. collection id format error", zap.Error(err), zap.String("collection_id", collectionID))
return nil, grpcutils.BuildInternalGrpcError(err.Error())
}

err = s.coordinator.IncrementCompactionFailureCount(ctx, parsedCollectionID)
if err != nil {
log.Error("IncrementCompactionFailureCount failed", zap.Error(err), zap.String("collection_id", collectionID))
return nil, grpcutils.BuildInternalGrpcError(err.Error())
}

return &coordinatorpb.IncrementCompactionFailureCountResponse{}, nil
}
3 changes: 2 additions & 1 deletion go/pkg/sysdb/grpc/proto_model_convert.go
@@ -62,7 +62,8 @@ func convertCollectionToProto(collection *model.Collection) *coordinatorpb.Colle
Seconds: collection.UpdatedAt,
Nanos: 0,
},
-		DatabaseId: &dbId,
+		DatabaseId:             &dbId,
+		CompactionFailureCount: collection.CompactionFailureCount,
}

if collection.RootCollectionID != nil {
16 changes: 14 additions & 2 deletions go/pkg/sysdb/metastore/db/dao/collection.go
@@ -550,6 +550,7 @@ func (s *collectionDb) UpdateLogPositionAndVersionInfo(
"size_bytes_post_compaction": sizeBytesPostCompaction,
"last_compaction_time_secs": lastCompactionTimeSecs,
"num_versions": numVersions,
"compaction_failure_count": 0, // Reset on successful compaction
}

if schemaStr != nil {
@@ -592,9 +593,9 @@ func (s *collectionDb) UpdateLogPositionVersionTotalRecordsAndLogicalSize(collec
version := currentCollectionVersion + 1
// only writing if schemaStr is not nil to avoid overwriting the schemaStr
if schemaStr != nil {
-		err = s.db.Model(&dbmodel.Collection{}).Where("id = ?", collectionID).Updates(map[string]interface{}{"log_position": logPosition, "version": version, "total_records_post_compaction": totalRecordsPostCompaction, "size_bytes_post_compaction": sizeBytesPostCompaction, "last_compaction_time_secs": lastCompactionTimeSecs, "tenant": tenant, "schema_str": schemaStr}).Error
+		err = s.db.Model(&dbmodel.Collection{}).Where("id = ?", collectionID).Updates(map[string]interface{}{"log_position": logPosition, "version": version, "total_records_post_compaction": totalRecordsPostCompaction, "size_bytes_post_compaction": sizeBytesPostCompaction, "last_compaction_time_secs": lastCompactionTimeSecs, "tenant": tenant, "schema_str": schemaStr, "compaction_failure_count": 0}).Error
	} else {
-		err = s.db.Model(&dbmodel.Collection{}).Where("id = ?", collectionID).Updates(map[string]interface{}{"log_position": logPosition, "version": version, "total_records_post_compaction": totalRecordsPostCompaction, "size_bytes_post_compaction": sizeBytesPostCompaction, "last_compaction_time_secs": lastCompactionTimeSecs, "tenant": tenant}).Error
+		err = s.db.Model(&dbmodel.Collection{}).Where("id = ?", collectionID).Updates(map[string]interface{}{"log_position": logPosition, "version": version, "total_records_post_compaction": totalRecordsPostCompaction, "size_bytes_post_compaction": sizeBytesPostCompaction, "last_compaction_time_secs": lastCompactionTimeSecs, "tenant": tenant, "compaction_failure_count": 0}).Error
}
if err != nil {
return 0, err
@@ -714,3 +715,14 @@ func (s *collectionDb) BatchGetCollectionSoftDeleteStatus(collectionIDs []string
}
return result, nil
}

func (s *collectionDb) IncrementCompactionFailureCount(collectionID string) error {
err := s.db.Model(&dbmodel.Collection{}).
Where("id = ?", collectionID).
UpdateColumn("compaction_failure_count", gorm.Expr("compaction_failure_count + 1")).Error
if err != nil {
log.Error("IncrementCompactionFailureCount failed", zap.Error(err), zap.String("collectionID", collectionID))
return err
}
return nil
}
2 changes: 2 additions & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/collection.go
@@ -28,6 +28,7 @@ type Collection struct {
NumVersions uint32 `gorm:"num_versions;type:integer;default:0"`
OldestVersionTs time.Time `gorm:"oldest_version_ts;type:timestamp"`
Tenant string `gorm:"tenant"`
CompactionFailureCount int32 `gorm:"compaction_failure_count;default:0"`
}

type CollectionToGc struct {
@@ -73,4 +74,5 @@ type ICollectionDb interface {
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath *string, newLineageFilePath string) error
BatchGetCollectionVersionFilePaths(collectionIDs []string) (map[string]string, error)
BatchGetCollectionSoftDeleteStatus(collectionIDs []string) (map[string]bool, error)
IncrementCompactionFailureCount(collectionID string) error
}
18 changes: 18 additions & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions go/pkg/sysdb/metastore/db/migrations/20251209143000.sql
@@ -0,0 +1,2 @@
-- Add compaction_failure_count column to collections table to track compaction failures persistently
ALTER TABLE "public"."collections" ADD COLUMN "compaction_failure_count" integer NOT NULL DEFAULT 0;
3 changes: 2 additions & 1 deletion go/pkg/sysdb/metastore/db/migrations/atlas.sum
@@ -1,4 +1,4 @@
-h1:TI+7Y8LdtEPUUsspBjfrE9KdjS8XS8SLfKUoSJhmhEI=
+h1:Qu6wEENNffsixNyG6bzBEGihu8sC5xa6dtQB37QF7Gg=
20240313233558.sql h1:Gv0TiSYsqGoOZ2T2IWvX4BOasauxool8PrBOIjmmIdg=
20240321194713.sql h1:kVkNpqSFhrXGVGFFvL7JdK3Bw31twFcEhI6A0oCFCkg=
20240327075032.sql h1:nlr2J74XRU8erzHnKJgMr/tKqJxw9+R6RiiEBuvuzgo=
@@ -26,3 +26,4 @@ h1:TI+7Y8LdtEPUUsspBjfrE9KdjS8XS8SLfKUoSJhmhEI=
20251114125442.sql h1:oRHN+AO+xYnYa3aF0QzSa3T/TRi8ETCydp2sDT/nSnI=
20251114134400.sql h1:N30qnVNjR+d4RoArJ11YrixyIsNODxqXFpgiREEhczs=
20251116154842.sql h1:G0qy4MPDayH+Y9/Dm9PS2xwvyLt+nmIw90uJTzZSJUM=
20251209143000.sql h1:r7locVeKMfzp+/olpFQHx1Shy7juhR5Y+xWZw3vel5M=
3 changes: 3 additions & 0 deletions idl/chromadb/proto/chroma.proto
@@ -74,6 +74,9 @@ message Collection {
// This is the database id of the collection.
optional string database_id = 17;
optional string schema_str = 18;
// Number of consecutive compaction failures for this collection
// Defaults to 0.
int32 compaction_failure_count = 19;
}

message Database {
7 changes: 7 additions & 0 deletions idl/chromadb/proto/coordinator.proto
@@ -669,6 +669,12 @@ message FinishAttachedFunctionDeletionRequest {

message FinishAttachedFunctionDeletionResponse {}

message IncrementCompactionFailureCountRequest {
string collection_id = 1;
}

message IncrementCompactionFailureCountResponse {}

service SysDB {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
@@ -717,4 +723,5 @@ service SysDB {
rpc GetSoftDeletedAttachedFunctions(GetSoftDeletedAttachedFunctionsRequest) returns (GetSoftDeletedAttachedFunctionsResponse) {}
rpc FinishAttachedFunctionDeletion(FinishAttachedFunctionDeletionRequest) returns (FinishAttachedFunctionDeletionResponse) {}
rpc FlushCollectionCompactionAndAttachedFunction(FlushCollectionCompactionAndAttachedFunctionRequest) returns (FlushCollectionCompactionAndAttachedFunctionResponse) {}
rpc IncrementCompactionFailureCount(IncrementCompactionFailureCountRequest) returns (IncrementCompactionFailureCountResponse) {}
}
2 changes: 2 additions & 0 deletions rust/sysdb/src/sqlite.rs
@@ -379,6 +379,7 @@ impl SqliteSysDb {
lineage_file_path: None,
updated_at: SystemTime::UNIX_EPOCH,
database_id: database_uuid,
compaction_failure_count: 0,
})
}

@@ -860,6 +861,7 @@ impl SqliteSysDb {
lineage_file_path: None,
updated_at: SystemTime::UNIX_EPOCH,
database_id,
compaction_failure_count: 0,
Review comment (Contributor), flagged Important:
[Logic] The `compaction_failure_count` is hardcoded to 0 here. It should be fetched from the database row to correctly reflect the persisted failure count for the collection. This will cause the DLQ logic to not work as intended for the SQLite backend, as it will always report 0 failures.

To fix this, you'll need to:

1. Update the `sea_query` builder in `get_collections_with_conn` to select the `compaction_failure_count` column.
2. Parse the value from the `SqliteRow` and use it when constructing the `Collection` struct.

This will likely also require changes in `chroma-sqlite/src/table.rs` to add `CompactionFailureCount` to the `Collections` enum, and an update to the SQLite schema creation logic to include the new column.

Example of what needs to change in `get_collections_with_conn`:

```rust
// In the query builder
// ...
.column((table::Collections::Table, table::Collections::SchemaStr))
.column((table::Collections::Table, table::Collections::CompactionFailureCount)) // Add this
// ...

// In the row processing loop
// ...
let compaction_failure_count: i32 = first_row.get("compaction_failure_count"); // Get the value
// ...
Some(Ok(Collection {
    // ...
    compaction_failure_count, // Use the fetched value
}))
// ...
```
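For the `chroma-sqlite/src/table.rs` change the comment mentions, a minimal sketch might look like the following (the derive setup and surrounding variants are assumptions based on typical `sea_query` usage, not this repository's actual file):

```rust
// Hypothetical sketch of chroma-sqlite/src/table.rs; not part of this PR.
use sea_query::Iden;

#[derive(Iden)]
pub enum Collections {
    Table,                  // renders as "collections"
    // ... existing column variants elided ...
    SchemaStr,              // renders as "schema_str"
    CompactionFailureCount, // renders as "compaction_failure_count"
}
```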

}))
})
.collect::<Result<Vec<_>, GetCollectionsError>>()?;
48 changes: 48 additions & 0 deletions rust/sysdb/src/sysdb.rs
@@ -346,6 +346,7 @@ impl SysDb {
lineage_file_path: None,
updated_at: SystemTime::now(),
database_id: DatabaseUuid::new(),
compaction_failure_count: 0,
};

test_sysdb.add_collection(collection.clone());
@@ -694,6 +695,21 @@ impl SysDb {
}
}

/// Increment the compaction failure count for a collection.
pub async fn increment_compaction_failure_count(
&mut self,
collection_id: CollectionUuid,
) -> Result<(), IncrementCompactionFailureCountError> {
match self {
SysDb::Grpc(grpc) => grpc.increment_compaction_failure_count(collection_id).await,
SysDb::Test(test) => {
test.increment_compaction_failure_count(collection_id);
Ok(())
}
SysDb::Sqlite(_) => Err(IncrementCompactionFailureCountError::Unimplemented),
}
}

pub async fn reset(&mut self) -> Result<ResetResponse, ResetError> {
match self {
SysDb::Grpc(grpc) => grpc.reset().await,
@@ -1722,6 +1738,18 @@ impl GrpcSysDb {
Ok(res.into_inner().collection_id_to_success)
}

async fn increment_compaction_failure_count(
&mut self,
collection_id: CollectionUuid,
) -> Result<(), IncrementCompactionFailureCountError> {
let req = chroma_proto::IncrementCompactionFailureCountRequest {
collection_id: collection_id.0.to_string(),
};

self.client.increment_compaction_failure_count(req).await?;
Ok(())
}

async fn update_tenant(
&mut self,
tenant_id: String,
@@ -2195,6 +2223,26 @@ impl ChromaError for DeleteCollectionVersionError {
}
}

#[derive(Error, Debug)]
pub enum IncrementCompactionFailureCountError {
#[error("Failed to increment compaction failure count: {0}")]
FailedToIncrement(#[from] tonic::Status),
#[error("SQLite error: {0}")]
Sqlite(#[from] sqlx::Error),
#[error("Unimplemented: increment_compaction_failure_count is not supported for SqliteSysDb")]
Unimplemented,
}

impl ChromaError for IncrementCompactionFailureCountError {
fn code(&self) -> ErrorCodes {
match self {
IncrementCompactionFailureCountError::FailedToIncrement(_) => ErrorCodes::Internal,
IncrementCompactionFailureCountError::Sqlite(_) => ErrorCodes::Internal,
IncrementCompactionFailureCountError::Unimplemented => ErrorCodes::Internal,
}
}
}

////////////////////////// Attached Function Operations //////////////////////////

impl SysDb {
8 changes: 8 additions & 0 deletions rust/sysdb/src/test_sysdb.rs
@@ -693,6 +693,14 @@ impl TestSysDb {
inner.tenant_resource_names.insert(tenant_id, resource_name);
Ok(UpdateTenantResponse {})
}

/// Increment the compaction failure count for a collection.
pub fn increment_compaction_failure_count(&mut self, collection_id: CollectionUuid) {
let mut inner = self.inner.lock();
if let Some(collection) = inner.collections.get_mut(&collection_id) {
collection.compaction_failure_count += 1;
}
}
}

fn attached_function_to_proto(
8 changes: 8 additions & 0 deletions rust/types/src/collection.rs
@@ -132,6 +132,10 @@ pub struct Collection {
pub updated_at: SystemTime,
#[serde(skip)]
pub database_id: DatabaseUuid,
/// Number of consecutive compaction failures for this collection.
/// Used by the scheduler to track and skip collections that repeatedly fail compaction.
#[serde(skip)]
pub compaction_failure_count: i32,
}

impl Default for Collection {
@@ -155,6 +159,7 @@ impl Default for Collection {
lineage_file_path: None,
updated_at: SystemTime::now(),
database_id: DatabaseUuid::new(),
compaction_failure_count: 0,
}
}
}
@@ -333,6 +338,7 @@ impl TryFrom<chroma_proto::Collection> for Collection {
lineage_file_path: proto_collection.lineage_file_path,
updated_at,
database_id,
compaction_failure_count: proto_collection.compaction_failure_count,
})
}
}
@@ -377,6 +383,7 @@ impl TryFrom<Collection> for chroma_proto::Collection {
lineage_file_path: value.lineage_file_path,
updated_at: Some(value.updated_at.into()),
database_id: Some(value.database_id.0.to_string()),
compaction_failure_count: value.compaction_failure_count,
})
}
}
@@ -460,6 +467,7 @@ mod test {
nanos: 1,
}),
database_id: Some("00000000-0000-0000-0000-000000000000".to_string()),
compaction_failure_count: 0,
};
let converted_collection: Collection = proto_collection.try_into().unwrap();
assert_eq!(
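As context for how the new field is meant to be consumed: the doc comment on `compaction_failure_count` says the scheduler uses it to skip collections that repeatedly fail compaction. A minimal sketch of such a filter, assuming the `Collection` type above (the crate path, threshold constant, and function name are hypothetical; the actual scheduler logic is not in this diff):

```rust
use chroma_types::Collection; // crate path assumed

// Assumed threshold; the real value and its configuration are not in this PR.
const MAX_COMPACTION_FAILURES: i32 = 5;

fn filter_compactable(collections: Vec<Collection>) -> Vec<Collection> {
    collections
        .into_iter()
        // Skip collections that have failed compaction too many times in a
        // row; the count resets to 0 on the next successful compaction.
        .filter(|c| c.compaction_failure_count < MAX_COMPACTION_FAILURES)
        .collect()
}
```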