diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 755875192..6c12512dd 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -41,6 +41,7 @@ on: - bm-batch-iter-uint8-single - bm-batch-iter-uint8-multi - bm-updated-fp32-single + - bm-hnsw-disk-fp32-single - bm-spaces description: 'Benchmarks set to run' default: benchmarks-all diff --git a/src/VecSim/algorithms/hnsw/hnsw_disk.h b/src/VecSim/algorithms/hnsw/hnsw_disk.h index 15d01fd59..b22076400 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_disk.h +++ b/src/VecSim/algorithms/hnsw/hnsw_disk.h @@ -147,7 +147,6 @@ class HNSWDiskIndex : public VecSimIndexAbstract // Index level generator of the top level for a new element mutable std::default_random_engine levelGenerator; - // Index global state - these should be guarded by the indexDataGuard lock in // multithreaded scenario. size_t curElementCount; size_t numMarkedDeleted; @@ -176,6 +175,13 @@ class HNSWDiskIndex : public VecSimIndexAbstract vecsim_stl::vector pendingMetadata; // Metadata for pending vectors size_t pendingVectorCount; // Count of vectors in memory + /** + * Threshold for batching delete operations. + * When the number of pending deletions reaches this value, the deletions are processed in a batch. + */ + size_t deleteBatchThreshold = 10; + vecsim_stl::vector pendingDeleteIds; + // In-memory graph updates staging (for delayed disk operations) struct GraphUpdate { idType node_id; @@ -190,7 +196,30 @@ class HNSWDiskIndex : public VecSimIndexAbstract }; // Staging area for graph updates during batch processing - vecsim_stl::vector stagedGraphUpdates; + // Separate staging areas for insertions and deletions to avoid conflicts + vecsim_stl::vector stagedInsertUpdates; + vecsim_stl::vector stagedDeleteUpdates; + + // Staged repair updates: opportunistic cleanup when stale edges are filtered during reads + // Mutable to allow staging from const search methods. 
+ // IMPORTANT: This class is NOT thread-safe. All operations (including const methods like + // getNeighbors and search) must be called from a single thread. The mutable fields below + // are modified during read operations for opportunistic graph cleanup. + // TODO: For multi-threaded support, these fields need proper synchronization or a different + // approach (e.g., returning repair suggestions instead of staging them directly). + mutable vecsim_stl::vector stagedRepairUpdates; + + // Hash maps for O(1) lookups in staged updates + // Key: (node_id << 32) | level - combines node_id and level into a single uint64_t + // Value: index into the corresponding staged updates vector + std::unordered_map stagedInsertMap; + std::unordered_map stagedDeleteMap; + + // Hash map for O(1) lookups and duplicate detection in stagedRepairUpdates + // Key: (node_id << 32) | level - combines node_id and level into a single uint64_t + // Value: index into stagedRepairUpdates vector + // Mutable - see thread-safety note above for stagedRepairUpdates + mutable std::unordered_map stagedRepairMap; // Track which nodes need their neighbor lists updated (for bidirectional connections) struct NeighborUpdate { @@ -202,7 +231,8 @@ class HNSWDiskIndex : public VecSimIndexAbstract : node_id(node_id), level(level), new_neighbor_id(new_neighbor_id) {} }; - vecsim_stl::vector stagedNeighborUpdates; + // TODO: Consider merging with stagedInsertUpdates + vecsim_stl::vector stagedInsertNeighborUpdates; // Temporary storage for raw vectors in RAM (until flush batch) // Maps idType -> raw vector data (stored as string for simplicity) @@ -230,7 +260,7 @@ class HNSWDiskIndex : public VecSimIndexAbstract std::string serializeGraphValue(const void* vector_data, const vecsim_stl::vector& neighbors) const; void deserializeGraphValue(const std::string& value, vecsim_stl::vector& neighbors) const; const void* getVectorFromGraphValue(const std::string& value) const; - + public: // Pure virtual methods from 
VecSimIndexInterface int addVector(const void *blob, labelType label) override; @@ -246,21 +276,41 @@ class HNSWDiskIndex : public VecSimIndexAbstract void processBatch(); void flushBatch(); // Force flush current batch + void processDeleteBatch(); + void flushDeleteBatch(); // Force flush current delete batch + // Helper methods void getNeighbors(idType nodeId, size_t level, vecsim_stl::vector& result) const; void searchPendingVectors(const void* query_data, candidatesLabelsMaxHeap& top_candidates, size_t k) const; - + // Manual control of staged updates void flushStagedUpdates(); // Manually flush any pending staged updates protected: + // Helper method to filter deleted or invalid nodes from a neighbor list (DRY principle) + // Returns true if any nodes were filtered out + // Filters out: nodes marked as deleted, and nodes with invalid IDs (>= curElementCount) + inline bool filterDeletedNodes(vecsim_stl::vector& neighbors) const { + size_t original_size = neighbors.size(); + auto new_end = std::remove_if(neighbors.begin(), neighbors.end(), + [this](idType id) { return id >= curElementCount || isMarkedDeleted(id); }); + neighbors.erase(new_end, neighbors.end()); + return neighbors.size() < original_size; + } + + // Helper to create a unique key for (node_id, level) pair for hash map + inline uint64_t makeRepairKey(idType node_id, size_t level) const { + return (static_cast(node_id) << 32) | static_cast(level); + } + // New method for flushing staged graph updates to disk - void flushStagedGraphUpdates(); + void flushStagedGraphUpdates(vecsim_stl::vector& graphUpdates, + vecsim_stl::vector& neighborUpdates); // New method for handling neighbor connection updates when neighbor lists are full - void stageRevisitNeighborConnections(idType new_node_id, idType selected_neighbor, + void stageRevisitNeighborConnections(idType new_node_id, idType selected_neighbor, size_t level, DistType distance); - + // void patchDeltaList(std::unordered_map> &delta_list, // 
vecsim_stl::vector &new_elements_meta_data, // std::unordered_map &new_ids_mapping); @@ -368,6 +418,11 @@ class HNSWDiskIndex : public VecSimIndexAbstract vecsim_stl::vector markDelete(labelType label); size_t getNumMarkedDeleted() const { return numMarkedDeleted; } + // Batch deletion control (for benchmarking) + void setDeleteBatchThreshold(size_t threshold) { deleteBatchThreshold = threshold; } + size_t getDeleteBatchThreshold() const { return deleteBatchThreshold; } + size_t getPendingDeleteCount() const { return pendingDeleteIds.size(); } + // Debug methods to inspect graph structure void debugPrintGraphStructure() const; void debugPrintNodeNeighbors(idType node_id) const; @@ -382,6 +437,30 @@ class HNSWDiskIndex : public VecSimIndexAbstract // HNSW helper methods void replaceEntryPoint(); + // Helper to stage a delete update, merging with existing entry if present + inline void stageDeleteUpdate(idType node_id, size_t level, vecsim_stl::vector &neighbors) { + uint64_t key = makeRepairKey(node_id, level); + auto existing_it = stagedDeleteMap.find(key); + if (existing_it != stagedDeleteMap.end()) { + stagedDeleteUpdates[existing_it->second].neighbors = neighbors; + } else { + stagedDeleteMap[key] = stagedDeleteUpdates.size(); + stagedDeleteUpdates.emplace_back(node_id, level, neighbors, this->allocator); + } + } + + // Graph repair helper for deletion - repairs a neighbor's connections after a node is deleted. + // This maintains graph quality and navigability using a heuristic-based approach. 
+ // Parameters: + // neighbor_id: The neighbor whose connections need repair + // level: The graph level being repaired + // deleted_id: The internal ID of the node being deleted + // deleted_node_neighbors: Neighbors of the deleted node (potential repair candidates) + // neighbor_neighbors: Current neighbors of neighbor_id (will be modified) + void repairNeighborConnections(idType neighbor_id, size_t level, idType deleted_id, + const vecsim_stl::vector &deleted_node_neighbors, + vecsim_stl::vector &neighbor_neighbors); + /*****************************************************************/ #ifdef BUILD_TESTS @@ -404,7 +483,10 @@ HNSWDiskIndex::HNSWDiskIndex( visitedNodesHandlerPool(INITIAL_CAPACITY, this->allocator), delta_list(), new_elements_meta_data(this->allocator), batchThreshold(10), pendingVectorIds(this->allocator), pendingMetadata(this->allocator), pendingVectorCount(0), - stagedGraphUpdates(this->allocator), stagedNeighborUpdates(this->allocator) { + pendingDeleteIds(this->allocator), + stagedInsertUpdates(this->allocator), + stagedDeleteUpdates(this->allocator), stagedRepairUpdates(this->allocator), + stagedInsertNeighborUpdates(this->allocator) { M = params->M ? 
params->M : HNSW_DEFAULT_M; M0 = M * 2; @@ -432,12 +514,18 @@ HNSWDiskIndex::HNSWDiskIndex( template HNSWDiskIndex::~HNSWDiskIndex() { // Clear any staged updates before destruction - stagedGraphUpdates.clear(); - stagedNeighborUpdates.clear(); + stagedInsertUpdates.clear(); + stagedInsertMap.clear(); + stagedDeleteUpdates.clear(); + stagedDeleteMap.clear(); + stagedRepairUpdates.clear(); + stagedRepairMap.clear(); + stagedInsertNeighborUpdates.clear(); // Clear pending vectors pendingVectorIds.clear(); pendingMetadata.clear(); + pendingDeleteIds.clear(); // Clear delta list and new elements metadata delta_list.clear(); @@ -449,7 +537,6 @@ HNSWDiskIndex::~HNSWDiskIndex() { // Ensure all memory is properly released idToMetaData.shrink_to_fit(); - labelToIdMap.clear(); // Note: db and cf are not owned by this class, so we don't delete them // Base class destructor will handle indexCalculator and preprocessors @@ -662,6 +749,7 @@ template int HNSWDiskIndex::addVector( const void *vector, labelType label ) { + // Store raw vector in RAM first (until flush batch) // We need to store the original vector before preprocessing idType newElementId = curElementCount; @@ -671,7 +759,7 @@ int HNSWDiskIndex::addVector( // Preprocess the vector ProcessedBlobs processedBlobs = this->preprocess(vector); - // Store the processed vector in memory immediately + // Store the processed vector in memory size_t containerId = this->vectors->size(); this->vectors->addElement(processedBlobs.getStorageBlob(), containerId); @@ -679,6 +767,7 @@ int HNSWDiskIndex::addVector( size_t elementMaxLevel = getRandomLevel(mult); DiskElementMetaData new_element(label, elementMaxLevel); + // Ensure capacity for the new element ID if (newElementId >= indexCapacity()) { size_t new_cap = ((newElementId + this->blockSize) / this->blockSize) * this->blockSize; visitedNodesHandlerPool.resize(new_cap); @@ -693,6 +782,7 @@ int HNSWDiskIndex::addVector( // Increment vector count immediately curElementCount++; + // 
Resize visited nodes handler pool to accommodate new elements visitedNodesHandlerPool.resize(curElementCount); @@ -774,8 +864,10 @@ idType HNSWDiskIndex::mutuallyConnectNewElement( neighbor_ids.push_back(top_candidates_list[i].second); } - // Add to staged graph updates - stagedGraphUpdates.emplace_back(new_node_id, level, neighbor_ids, this->allocator); + // Add to staged graph updates (for insertions) + uint64_t insert_key = makeRepairKey(new_node_id, level); + stagedInsertMap[insert_key] = stagedInsertUpdates.size(); + stagedInsertUpdates.emplace_back(new_node_id, level, neighbor_ids, this->allocator); // Stage updates to existing nodes to include the new node in their neighbor lists for (const auto &neighbor_data : top_candidates_list) { @@ -799,7 +891,7 @@ idType HNSWDiskIndex::mutuallyConnectNewElement( if (current_neighbor_count < max_M_cur) { // Neighbor has capacity, just add the new node - stagedNeighborUpdates.emplace_back(selected_neighbor, level, new_node_id); + stagedInsertNeighborUpdates.emplace_back(selected_neighbor, level, new_node_id); } else { // Neighbor is full, need to re-evaluate connections using revisitNeighborConnections // logic @@ -811,8 +903,10 @@ idType HNSWDiskIndex::mutuallyConnectNewElement( } template -void HNSWDiskIndex::flushStagedGraphUpdates() { - if (stagedGraphUpdates.empty() && stagedNeighborUpdates.empty()) { +void HNSWDiskIndex::flushStagedGraphUpdates( + vecsim_stl::vector& graphUpdates, + vecsim_stl::vector& neighborUpdates) { + if (graphUpdates.empty() && neighborUpdates.empty()) { return; } @@ -821,15 +915,27 @@ void HNSWDiskIndex::flushStagedGraphUpdates() { // Write graph updates first (so they're available when processing neighbor updates) rocksdb::WriteBatch graphBatch; - // First, handle new node insertions - for (const auto &update : stagedGraphUpdates) { + // First, handle new node insertions and updates + for (const auto &update : graphUpdates) { auto newKey = GraphKey(update.node_id, update.level); + // If 
neighbors list is empty, this is a deletion - remove the key from disk + if (update.neighbors.empty()) { + graphBatch.Delete(cf, newKey.asSlice()); + continue; + } + + // Get raw vector data const void* raw_vector_data = getRawVector(update.node_id); + if (raw_vector_data == nullptr) { + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + "WARNING: Skipping graph update for node %u at level %zu - no raw vector data available", + update.node_id, update.level); + continue; + } - // Serialize with new format: [raw_vector_data][neighbor_count][neighbor_ids...] + // Serialize with format: [raw_vector_data][neighbor_count][neighbor_ids...] std::string graph_value = serializeGraphValue(raw_vector_data, update.neighbors); - graphBatch.Put(cf, newKey.asSlice(), graph_value); } @@ -845,7 +951,7 @@ void HNSWDiskIndex::flushStagedGraphUpdates() { // Group neighbor updates by node and level for efficient processing std::unordered_map>> neighborUpdatesByNode; - for (const auto& update : stagedNeighborUpdates) { + for (const auto& update : neighborUpdates) { auto& levelMap = neighborUpdatesByNode[update.node_id]; auto it = levelMap.find(update.level); if (it == levelMap.end()) { @@ -861,36 +967,37 @@ void HNSWDiskIndex::flushStagedGraphUpdates() { // Process each node's neighbor updates for (const auto& [node_id, levelMap] : neighborUpdatesByNode) { for (const auto& [level, newNeighbors] : levelMap) { - // Read existing graph value from disk - GraphKey neighborKey(node_id, level); - std::string existing_graph_value; - rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, neighborKey.asSlice(), &existing_graph_value); + // Read existing graph value from disk + GraphKey neighborKey(node_id, level); + std::string existing_graph_value; + rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, neighborKey.asSlice(), &existing_graph_value); - vecsim_stl::vector updated_neighbors(this->allocator); - if (status.ok()) { - // Parse existing neighbors using new format - 
deserializeGraphValue(existing_graph_value, updated_neighbors); - } + vecsim_stl::vector updated_neighbors(this->allocator); - // Add new neighbors (avoiding duplicates) - for (idType new_neighbor : newNeighbors) { - bool found = false; - for (size_t i = 0; i < updated_neighbors.size(); i++) { - if (updated_neighbors[i] == new_neighbor) { - found = true; - break; - } + if (status.ok()) { + // Parse existing neighbors using new format + deserializeGraphValue(existing_graph_value, updated_neighbors); } - if (!found) { - updated_neighbors.push_back(new_neighbor); + + // Add new neighbors (avoiding duplicates) + for (idType new_neighbor : newNeighbors) { + bool found = false; + for (idType existing : updated_neighbors) { + if (existing == new_neighbor) { + found = true; + break; + } + } + if (!found) { + updated_neighbors.push_back(new_neighbor); + } } - } - const void* raw_vector_data = getRawVector(node_id); + const void* raw_vector_data = getRawVector(node_id); - // Serialize with new format and add to batch - std::string graph_value = serializeGraphValue(raw_vector_data, updated_neighbors); - neighborBatch.Put(cf, neighborKey.asSlice(), graph_value); + // Serialize with new format and add to batch + std::string graph_value = serializeGraphValue(raw_vector_data, updated_neighbors); + neighborBatch.Put(cf, neighborKey.asSlice(), graph_value); } } @@ -902,8 +1009,8 @@ void HNSWDiskIndex::flushStagedGraphUpdates() { } // Clear staged updates after successful flush - stagedGraphUpdates.clear(); - stagedNeighborUpdates.clear(); + graphUpdates.clear(); + neighborUpdates.clear(); } template @@ -922,7 +1029,7 @@ void HNSWDiskIndex::stageRevisitNeighborConnections(idType n " WARNING: Could not read existing neighbors for node %u at level %zu", selected_neighbor, level); // Fall back to simple neighbor update - stagedNeighborUpdates.emplace_back(selected_neighbor, level, new_node_id); + stagedInsertNeighborUpdates.emplace_back(selected_neighbor, level, new_node_id); return; } 
@@ -975,16 +1082,18 @@ void HNSWDiskIndex::stageRevisitNeighborConnections(idType n // Stage this update - the neighbor's neighbor list will be completely replaced // We'll need to handle this specially in flushStagedGraphUpdates - stagedGraphUpdates.emplace_back(selected_neighbor, level, selected_neighbor_ids, + uint64_t insert_key = makeRepairKey(selected_neighbor, level); + stagedInsertMap[insert_key] = stagedInsertUpdates.size(); + stagedInsertUpdates.emplace_back(selected_neighbor, level, selected_neighbor_ids, this->allocator); // Also stage the bidirectional connection from new node to selected neighbor - stagedNeighborUpdates.emplace_back(new_node_id, level, selected_neighbor); + stagedInsertNeighborUpdates.emplace_back(new_node_id, level, selected_neighbor); } else { // The new node was not selected, so we only need to stage the unidirectional connection // from new node to selected neighbor - stagedNeighborUpdates.emplace_back(new_node_id, level, selected_neighbor); + stagedInsertNeighborUpdates.emplace_back(new_node_id, level, selected_neighbor); } } @@ -1166,7 +1275,7 @@ const char* HNSWDiskIndex::getRawVector(idType id) const { return data_ptr; } - // If not in RAM, retrieve from disk + // If not in RAM or cache, retrieve from disk GraphKey graphKey(id, 0); std::string level0_graph_value; rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &level0_graph_value); @@ -1179,10 +1288,15 @@ const char* HNSWDiskIndex::getRawVector(idType id) const { const char* data_ptr = reinterpret_cast(vector_data); rawVectorsDiskCache[id] = std::string(data_ptr, this->inputBlobSize); return rawVectorsDiskCache[id].data(); + } else { + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + "WARNING: getVectorFromGraphValue returned nullptr for id %u (graph value size: %zu)", + id, level0_graph_value.size()); } } else if (status.IsNotFound()) { this->log(VecSimCommonStrings::LOG_WARNING_STRING, - "WARNING: Raw vector not found in RAM or on disk 
for id %u", id); + "WARNING: Raw vector not found in RAM or on disk for id %u (isMarkedDeleted: %d)", + id, isMarkedDeleted(id)); } else { this->log(VecSimCommonStrings::LOG_WARNING_STRING, "WARNING: Failed to retrieve raw vector for id %u: %s", id, @@ -1465,7 +1579,7 @@ size_t HNSWDiskIndex::indexSize() const { template size_t HNSWDiskIndex::indexLabelCount() const { - return this->curElementCount; + return labelToIdMap.size(); } /********************************** Helper Methods **********************************/ @@ -1475,15 +1589,49 @@ void HNSWDiskIndex::getNeighbors(idType nodeId, size_t level // Clear the result vector first result.clear(); - // First check staged graph updates - for (const auto &update : stagedGraphUpdates) { - if (update.node_id == nodeId && update.level == level) { - result.reserve(update.neighbors.size()); - for (size_t i = 0; i < update.neighbors.size(); i++) { - result.push_back(update.neighbors[i]); - } - return; + // First check staged graph updates using hash maps for O(1) lookup + uint64_t lookup_key = makeRepairKey(nodeId, level); + + // Check insert staging area + auto insert_it = stagedInsertMap.find(lookup_key); + if (insert_it != stagedInsertMap.end()) { + const auto &update = stagedInsertUpdates[insert_it->second]; + result.reserve(update.neighbors.size()); + for (size_t i = 0; i < update.neighbors.size(); i++) { + result.push_back(update.neighbors[i]); + } + // Filter out deleted nodes using helper + filterDeletedNodes(result); + return; + } + + // Check delete staging area + auto delete_it = stagedDeleteMap.find(lookup_key); + if (delete_it != stagedDeleteMap.end()) { + const auto &update = stagedDeleteUpdates[delete_it->second]; + result.reserve(update.neighbors.size()); + for (size_t i = 0; i < update.neighbors.size(); i++) { + result.push_back(update.neighbors[i]); + } + // Filter out deleted nodes using helper + filterDeletedNodes(result); + return; + } + + // Also check staged repair updates (already cleaned neighbors 
waiting to be flushed) + auto repair_it = stagedRepairMap.find(lookup_key); + if (repair_it != stagedRepairMap.end()) { + auto &update = stagedRepairUpdates[repair_it->second]; + result.reserve(update.neighbors.size()); + for (size_t i = 0; i < update.neighbors.size(); i++) { + result.push_back(update.neighbors[i]); } + // Filter in case nodes were deleted after this repair was staged + if (filterDeletedNodes(result)) { + // Update the existing repair entry with the more up-to-date cleaned list + update.neighbors = result; + } + return; } // If not found in staged updates, check disk @@ -1494,6 +1642,17 @@ void HNSWDiskIndex::getNeighbors(idType nodeId, size_t level if (status.ok()) { deserializeGraphValue(graph_value, result); + + // Filter out deleted nodes and check if any were filtered + if (filterDeletedNodes(result)) { + // Lazy repair: if we filtered any deleted nodes, stage for cleanup + // Use hash map for O(1) duplicate detection + uint64_t repair_key = makeRepairKey(nodeId, level); + if (stagedRepairMap.find(repair_key) == stagedRepairMap.end()) { + stagedRepairMap[repair_key] = stagedRepairUpdates.size(); + stagedRepairUpdates.emplace_back(nodeId, level, result, this->allocator); + } + } } } @@ -1502,6 +1661,10 @@ void HNSWDiskIndex::searchPendingVectors( const void *query_data, candidatesLabelsMaxHeap &top_candidates, size_t k) const { for (size_t i = 0; i < pendingVectorCount; i++) { idType vectorId = pendingVectorIds[i]; + if (isMarkedDeleted(vectorId)) { + // Skip deleted vectors + continue; + } // Get the vector data from memory const void *vector_data = this->vectors->getElement(vectorId); @@ -1531,13 +1694,18 @@ void HNSWDiskIndex::processBatch() { return; } - // Clear any previous staged updates - stagedGraphUpdates.clear(); - stagedNeighborUpdates.clear(); + // Clear any previous staged updates (for insertions) + stagedInsertUpdates.clear(); + stagedInsertMap.clear(); + stagedInsertNeighborUpdates.clear(); // Process each pending vector ID 
(vectors are already stored in memory) for (size_t i = 0; i < pendingVectorCount; i++) { idType vectorId = pendingVectorIds[i]; + if (isMarkedDeleted(vectorId)) { + // Skip deleted vectors + continue; + } // Get the vector data from memory const void *vector_data = this->vectors->getElement(vectorId); @@ -1557,7 +1725,8 @@ void HNSWDiskIndex::processBatch() { } // Now flush all staged graph updates to disk in a single batch operation - flushStagedGraphUpdates(); + flushStagedGraphUpdates(stagedInsertUpdates, stagedInsertNeighborUpdates); + stagedInsertMap.clear(); // Clear the pending vector IDs pendingVectorIds.clear(); @@ -1571,6 +1740,206 @@ void HNSWDiskIndex::flushBatch() { processBatch(); } +template +void HNSWDiskIndex::repairNeighborConnections( + idType neighbor_id, size_t level, idType deleted_id, + const vecsim_stl::vector &deleted_node_neighbors, + vecsim_stl::vector &neighbor_neighbors) { + + // ===== Graph Repair Strategy ===== + // When deleting a node, we need to repair its neighbors' connections to maintain + // graph quality and navigability. We use a heuristic-based approach similar to + // the regular HNSW implementation (see hnsw.h::repairConnectionsForDeletion). + // + // Strategy: + // 1. Collect candidates: existing neighbors + deleted node's neighbors + // 2. Calculate distances using quantized vectors (fast, in-memory) + // 3. Apply getNeighborsByHeuristic2 to select best neighbors + // 4. This ensures high-quality connections that maintain search performance + + size_t max_M = (level == 0) ? 
M0 : M; + + // Build candidate set with distances + // Candidates include: existing neighbors (minus deleted) + deleted node's neighbors + candidatesList candidates(this->allocator); + const void *neighbor_data = getDataByInternalId(neighbor_id); + + // Use a hash set to track candidate IDs for O(1) duplicate detection + std::unordered_set candidate_ids; + + // Add existing neighbors (excluding the deleted node) with their distances + for (idType nn : neighbor_neighbors) { + if (nn != deleted_id && nn < curElementCount && !isMarkedDeleted(nn)) { + const void *nn_data = getDataByInternalId(nn); + DistType dist = this->calcDistance(nn_data, neighbor_data); + candidates.emplace_back(dist, nn); + candidate_ids.insert(nn); + } + } + + // Add deleted node's neighbors (excluding current neighbor) as repair candidates + for (idType candidate_id : deleted_node_neighbors) { + if (candidate_id != neighbor_id && candidate_id < curElementCount && + !isMarkedDeleted(candidate_id)) { + // Check if already in candidates to avoid duplicates using O(1) hash set lookup + if (candidate_ids.find(candidate_id) == candidate_ids.end()) { + const void *candidate_data = getDataByInternalId(candidate_id); + DistType dist = this->calcDistance(candidate_data, neighbor_data); + candidates.emplace_back(dist, candidate_id); + candidate_ids.insert(candidate_id); + } + } + } + + // Track original neighbors for bidirectional edge updates + vecsim_stl::unordered_set original_neighbors_set(this->allocator); + original_neighbors_set.reserve(neighbor_neighbors.size()); + for (idType nn : neighbor_neighbors) { + if (nn != deleted_id && nn < curElementCount) { + original_neighbors_set.insert(nn); + } + } + + // Apply heuristic to select best neighbors if we have more than max_M + vecsim_stl::vector updated_neighbors(this->allocator); + if (candidates.size() > max_M) { + vecsim_stl::vector removed_candidates(this->allocator); + getNeighborsByHeuristic2(candidates, max_M, removed_candidates); + } + // 
Extract selected neighbor IDs (works for both cases) + updated_neighbors.reserve(candidates.size()); + for (const auto &[dist, id] : candidates) { + updated_neighbors.push_back(id); + } + + // Stage the update for this neighbor + stageDeleteUpdate(neighbor_id, level, updated_neighbors); + +} + +template +void HNSWDiskIndex::processDeleteBatch() { + if (pendingDeleteIds.empty()) return; + + // Clear any previous staged updates (for deletions) + stagedDeleteUpdates.clear(); + stagedDeleteMap.clear(); + + // Create a set of IDs being deleted in this batch for quick lookup + std::unordered_set deletingIds(pendingDeleteIds.begin(), pendingDeleteIds.end()); + + // Process each deleted node + for (idType deleted_id : pendingDeleteIds) { + // Skip if already processed or invalid + if (deleted_id >= curElementCount || deleted_id >= idToMetaData.size()) { + continue; + } + + const DiskElementMetaData &metadata = idToMetaData[deleted_id]; + if (metadata.label == INVALID_LABEL) { + continue; // Already deleted + } + + size_t topLevel = metadata.topLevel; + + // Process each level of the deleted node + for (size_t level = 0; level <= topLevel; level++) { + // Get the deleted node's neighbors at this level + vecsim_stl::vector deleted_node_neighbors(this->allocator); + getNeighbors(deleted_id, level, deleted_node_neighbors); + + // For each neighbor of the deleted node + for (idType neighbor_id : deleted_node_neighbors) { + // Skip if neighbor is also deleted, invalid, or in the current deletion batch + if (neighbor_id >= curElementCount || isMarkedDeleted(neighbor_id) || + deletingIds.find(neighbor_id) != deletingIds.end()) { + continue; + } + + // Get the neighbor's current neighbor list + vecsim_stl::vector neighbor_neighbors(this->allocator); + getNeighbors(neighbor_id, level, neighbor_neighbors); + + // Check if this is a bidirectional edge + bool is_bidirectional = false; + for (idType nn : neighbor_neighbors) { + if (nn == deleted_id) { + is_bidirectional = true; + break; 
+ } + } + + if (is_bidirectional) { + // Repair connections using the dedicated helper method + repairNeighborConnections(neighbor_id, level, deleted_id, + deleted_node_neighbors, neighbor_neighbors); + } + } + + // Delete the node's graph entry at this level by staging an empty neighbor list + // (or we could use a Delete operation in the batch) + vecsim_stl::vector empty_neighbors(this->allocator); + uint64_t del_key = makeRepairKey(deleted_id, level); + // For deletion entries, always overwrite - the node is being deleted + stagedDeleteMap[del_key] = stagedDeleteUpdates.size(); + stagedDeleteUpdates.emplace_back(deleted_id, level, empty_neighbors, this->allocator); + } + } + + // Mark metadata as invalid and clean up raw vectors AFTER processing all nodes + // This ensures getNeighbors() and other methods work correctly during graph repair + for (idType deleted_id : pendingDeleteIds) { + if (deleted_id >= curElementCount || deleted_id >= idToMetaData.size()) { + continue; + } + // Mark the metadata as invalid + idToMetaData[deleted_id].label = INVALID_LABEL; + + // Remove raw vector from RAM if it exists + auto ram_it = rawVectorsInRAM.find(deleted_id); + if (ram_it != rawVectorsInRAM.end()) { + rawVectorsInRAM.erase(ram_it); + } + + // Also remove from disk cache to prevent stale data access + auto cache_it = rawVectorsDiskCache.find(deleted_id); + if (cache_it != rawVectorsDiskCache.end()) { + rawVectorsDiskCache.erase(cache_it); + } + } + + // Flush all staged graph updates to disk in a single batch operation + vecsim_stl::vector emptyNeighborUpdates(this->allocator); + flushStagedGraphUpdates(stagedDeleteUpdates, emptyNeighborUpdates); + stagedDeleteMap.clear(); + + // Flush staged repair updates (opportunistic cleanup from getNeighbors) + // But first, filter out any repairs for nodes that were just deleted + if (!stagedRepairUpdates.empty()) { + vecsim_stl::vector filteredRepairUpdates(this->allocator); + for (const auto &update : stagedRepairUpdates) { + 
// Skip repairs for nodes that are in the deletion batch + if (deletingIds.find(update.node_id) == deletingIds.end()) { + filteredRepairUpdates.push_back(update); + } + } + if (!filteredRepairUpdates.empty()) { + flushStagedGraphUpdates(filteredRepairUpdates, emptyNeighborUpdates); + } + // Clear all staged repairs (including filtered ones) + stagedRepairUpdates.clear(); + stagedRepairMap.clear(); + } + + // Clear the pending delete IDs + pendingDeleteIds.clear(); +} + +template +void HNSWDiskIndex::flushDeleteBatch() { + processDeleteBatch(); +} + /********************************** Debug Methods **********************************/ template @@ -1909,23 +2278,33 @@ HNSWDiskIndex::hierarchicalSearch(const void *data_point, id template void HNSWDiskIndex::flushStagedUpdates() { - // Implement the logic to manually flush any pending staged updates - // This could involve writing any pending staged updates to disk - // or clearing the staged updates if they are no longer needed - // For example, you might want to call flushStagedGraphUpdates() here - flushStagedGraphUpdates(); + // Flush both insert and delete staged updates + // Note: This is a non-const method that modifies the staging areas + flushStagedGraphUpdates(stagedInsertUpdates, stagedInsertNeighborUpdates); + stagedInsertMap.clear(); + vecsim_stl::vector emptyNeighborUpdates(this->allocator); + flushStagedGraphUpdates(stagedDeleteUpdates, emptyNeighborUpdates); + stagedDeleteMap.clear(); + + // Also flush staged repair updates (opportunistic cleanup from getNeighbors) + if (!stagedRepairUpdates.empty()) { + flushStagedGraphUpdates(stagedRepairUpdates, emptyNeighborUpdates); + stagedRepairMap.clear(); + } } template void HNSWDiskIndex::debugPrintStagedUpdates() const { - // Implement the logic to debug print staged updates - // This could involve logging the contents of stagedGraphUpdates and stagedNeighborUpdates - // or any other relevant information + // Print both insert and delete staged updates 
this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "=== Staged Updates ==="); - this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Staged Graph Updates: %zu", - stagedGraphUpdates.size()); - this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Staged Neighbor Updates: %zu", - stagedNeighborUpdates.size()); + this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Staged Insert Graph Updates: %zu", + stagedInsertUpdates.size()); + this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Staged Insert Neighbor Updates: %zu", + stagedInsertNeighborUpdates.size()); + this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Staged Delete Graph Updates: %zu", + stagedDeleteUpdates.size()); + this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Staged Repair Updates: %zu", + stagedRepairUpdates.size()); } // Add missing method implementations for benchmark framework @@ -1938,9 +2317,28 @@ void HNSWDiskIndex::fitMemory() { template void HNSWDiskIndex::getDataByLabel( labelType label, std::vector> &vectors_output) const { - // TODO: Implement data retrieval by label - // For now, just a stub implementation vectors_output.clear(); + + std::shared_lock index_data_lock(indexDataGuard); + + // Check if label exists in the map + auto it = labelToIdMap.find(label); + if (it == labelToIdMap.end()) { + return; // Label not found + } + + idType id = it->second; + + // Get the raw vector data + const void *raw_data = getRawVector(id); + if (raw_data == nullptr) { + return; // Vector not found + } + + // Copy the vector data + const DataType *data_ptr = static_cast(raw_data); + std::vector vec(data_ptr, data_ptr + this->dim); + vectors_output.push_back(std::move(vec)); } template @@ -1953,18 +2351,29 @@ HNSWDiskIndex::getStoredVectorDataByLabel(labelType label) c template vecsim_stl::set HNSWDiskIndex::getLabelsSet() const { - // TODO: Implement labels set retrieval - // For now, just a stub implementation + std::shared_lock index_data_lock(indexDataGuard); vecsim_stl::set labels(this->allocator); + for (const 
auto &it : labelToIdMap) { + labels.insert(it.first); + } return labels; } template int HNSWDiskIndex::deleteVector(labelType label) { - // TODO: Implement vector deletion - // For now, just mark the vector as deleted - markDelete(label); - return 0; + + vecsim_stl::vector deleted_ids = markDelete(label); + if (deleted_ids.empty()) { + return 0; // Label not found or already deleted + } + + pendingDeleteIds.insert(pendingDeleteIds.end(), deleted_ids.begin(), deleted_ids.end()); + + if (pendingDeleteIds.size() >= deleteBatchThreshold) { + processDeleteBatch(); + } + + return 1; } template @@ -1977,7 +2386,7 @@ double HNSWDiskIndex::getDistanceFrom_Unsafe(labelType id, template uint64_t HNSWDiskIndex::getAllocationSize() const { - + return this->allocator->getAllocationSize(); } @@ -2035,8 +2444,6 @@ void HNSWDiskIndex::releaseSharedLocks() { template vecsim_stl::vector HNSWDiskIndex::markDelete(labelType label) { - std::unique_lock index_data_lock(indexDataGuard); - vecsim_stl::vector internal_ids(this->allocator); // Find the internal ID for this label @@ -2054,7 +2461,8 @@ vecsim_stl::vector HNSWDiskIndex::markDelete(labelTy return internal_ids; } - // Mark as deleted + // Mark as deleted (but don't clean up raw vectors yet - they're needed for graph repair + // in processDeleteBatch. Cleanup happens there after repair is complete.) 
markAs(internalId); this->numMarkedDeleted++; diff --git a/src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h b/src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h index 5e7b9be02..5966a8136 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h +++ b/src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h @@ -58,8 +58,10 @@ HNSWDiskIndex::HNSWDiskIndex( indexDataGuard(), visitedNodesHandlerPool(INITIAL_CAPACITY, this->allocator), delta_list(), new_elements_meta_data(this->allocator), batchThreshold(0), // Will be restored from file pendingVectorIds(this->allocator), pendingMetadata(this->allocator), - pendingVectorCount(0), stagedGraphUpdates(this->allocator), - stagedNeighborUpdates(this->allocator) { + pendingVectorCount(0), pendingDeleteIds(this->allocator), + stagedInsertUpdates(this->allocator), + stagedDeleteUpdates(this->allocator), stagedRepairUpdates(this->allocator), + stagedInsertNeighborUpdates(this->allocator) { // Restore index fields from file (including batchThreshold) this->restoreIndexFields(input); @@ -264,11 +266,14 @@ void HNSWDiskIndex::saveIndexIMP(std::ofstream &output) { if (!pendingVectorIds.empty()) { throw std::runtime_error("Serialization error: pendingVectorIds not empty after flush"); } - if (!stagedGraphUpdates.empty()) { - throw std::runtime_error("Serialization error: stagedGraphUpdates not empty after flush"); + if (!stagedInsertUpdates.empty()) { + throw std::runtime_error("Serialization error: stagedInsertUpdates not empty after flush"); } - if (!stagedNeighborUpdates.empty()) { - throw std::runtime_error("Serialization error: stagedNeighborUpdates not empty after flush"); + if (!stagedDeleteUpdates.empty()) { + throw std::runtime_error("Serialization error: stagedDeleteUpdates not empty after flush"); + } + if (!stagedInsertNeighborUpdates.empty()) { + throw std::runtime_error("Serialization error: stagedInsertNeighborUpdates not empty after flush"); } if (!rawVectorsInRAM.empty()) { throw std::runtime_error("Serialization 
error: rawVectorsInRAM not empty after flush"); @@ -276,6 +281,12 @@ void HNSWDiskIndex::saveIndexIMP(std::ofstream &output) { if (pendingVectorCount != 0) { throw std::runtime_error("Serialization error: pendingVectorCount not zero after flush"); } + if (!stagedRepairUpdates.empty()) { + throw std::runtime_error("Serialization error: stagedRepairUpdates not empty after flush"); + } + if (pendingDeleteIds.size() != 0) { + throw std::runtime_error("Serialization error: pendingDeleteIds not empty after flush"); + } // Note: delta_list and new_elements_meta_data are currently unused legacy variables // but we verify them for future-proofing if (!delta_list.empty()) { @@ -692,8 +703,9 @@ void HNSWDiskIndex::restoreGraph(std::ifstream &input, this->pendingVectorIds.clear(); this->pendingMetadata.clear(); this->pendingVectorCount = 0; - this->stagedGraphUpdates.clear(); - this->stagedNeighborUpdates.clear(); + this->stagedInsertUpdates.clear(); + this->stagedDeleteUpdates.clear(); + this->stagedInsertNeighborUpdates.clear(); // Resize visited nodes handler pool this->visitedNodesHandlerPool.resize(this->curElementCount); diff --git a/tests/benchmark/bm_common.h b/tests/benchmark/bm_common.h index 997835998..5c2bd1476 100644 --- a/tests/benchmark/bm_common.h +++ b/tests/benchmark/bm_common.h @@ -36,6 +36,13 @@ class BM_VecSimCommon : public BM_VecSimIndex { static void TopK_HNSW(benchmark::State &st, unsigned short index_offset = 0); static void TopK_HNSW_DISK(benchmark::State &st); static void TopK_HNSW_DISK_MarkDeleted(benchmark::State &st); + static void TopK_HNSW_DISK_DeleteLabel(benchmark::State &st); + // Same as DeleteLabel but excludes ground truth vectors from deletion to keep recall stable + static void TopK_HNSW_DISK_DeleteLabel_ProtectGT(benchmark::State &st); + // Test deletion performance with different batch sizes + static void TopK_HNSW_DISK_DeleteLabel_BatchSize(benchmark::State &st); + // Stress test with high deletion ratios (50%, 75%, 90%+) + static 
void TopK_HNSW_DISK_DeleteLabel_Stress(benchmark::State &st); static void TopK_Tiered(benchmark::State &st, unsigned short index_offset = 0); // Does nothing but returning the index memory. @@ -157,13 +164,17 @@ void BM_VecSimCommon::TopK_HNSW_DISK_MarkDeleted(benchmark::State size_t stratum_start = (i * N_VECTORS) / num_to_delete; size_t stratum_end = ((i + 1) * N_VECTORS) / num_to_delete; size_t stratum_size = stratum_end - stratum_start; - + std::uniform_int_distribution dist(0, stratum_size - 1); labelType label = stratum_start + dist(rng); deleted_labels.push_back(label); disk_index->markDelete(label); } + // Create hash set for O(1) lookup during ground truth filtering + // With up to 50K deleted labels, this avoids O(n) linear search overhead + std::unordered_set deleted_labels_set(deleted_labels.begin(), deleted_labels.end()); + size_t total_marked = disk_index->getNumMarkedDeleted(); st.counters["num_marked_deleted"] = total_marked; @@ -191,7 +202,8 @@ void BM_VecSimCommon::TopK_HNSW_DISK_MarkDeleted(benchmark::State auto filtered_res = new VecSimQueryReply(VecSimAllocator::newVecsimAllocator()); for (const auto &res : gt_results->results) { - if (std::find(deleted_labels.begin(), deleted_labels.end(), res.id) == deleted_labels.end()) { + // Use hash set for O(1) lookup instead of O(n) linear search + if (deleted_labels_set.find(res.id) == deleted_labels_set.end()) { filtered_res->results.emplace_back(res.id, res.score); // Stop once we have k non-deleted results if (filtered_res->results.size() >= k) { @@ -219,6 +231,438 @@ void BM_VecSimCommon::TopK_HNSW_DISK_MarkDeleted(benchmark::State } +template +void BM_VecSimCommon::TopK_HNSW_DISK_DeleteLabel(benchmark::State &st) { + using data_t = typename index_type_t::data_t; + using dist_t = typename index_type_t::dist_t; + + size_t iter = 0; + + // Reload the index to get a fresh copy without any deleted vectors + std::string folder_path = BM_VecSimGeneral::AttachRootPath(BM_VecSimGeneral::hnsw_index_file); + 
INDICES[INDEX_HNSW_DISK] = IndexPtr(HNSWDiskFactory::NewIndex(folder_path)); + auto hnsw_index = GET_INDEX(INDEX_HNSW_DISK); + auto *disk_index = dynamic_cast *>(hnsw_index); + + // Delete vectors using deleteVector (which processes batch and repairs graph) + std::vector deleted_labels; + const size_t num_to_delete = st.range(2); + + // Get pseudo-random unique labels, but the same ones for all runs of the benchmark + // Divide N_VECTORS into num_to_delete equal strata and pick one from each + std::mt19937 rng(42); // Fixed seed for determinism + for (size_t i = 0; i < num_to_delete && i < N_VECTORS; i++) { + size_t stratum_start = (i * N_VECTORS) / num_to_delete; + size_t stratum_end = ((i + 1) * N_VECTORS) / num_to_delete; + size_t stratum_size = stratum_end - stratum_start; + + std::uniform_int_distribution dist(0, stratum_size - 1); + labelType label = stratum_start + dist(rng); + deleted_labels.push_back(label); + } + + // Measure the time spent on deleteVector calls (includes automatic batch processing) + auto delete_start = std::chrono::high_resolution_clock::now(); + for (const auto &label : deleted_labels) { + disk_index->deleteVector(label); + } + // Force flush any pending deletes to ensure graph is fully repaired + disk_index->flushDeleteBatch(); + auto delete_end = std::chrono::high_resolution_clock::now(); + double delete_time_ms = std::chrono::duration(delete_end - delete_start).count(); + + // Create hash set for O(1) lookup during ground truth filtering + // With up to 50K deleted labels, this avoids O(n) linear search overhead + std::unordered_set deleted_labels_set(deleted_labels.begin(), deleted_labels.end()); + + size_t total_deleted = deleted_labels.size(); + st.counters["num_deleted"] = total_deleted; + st.counters["delete_time_ms"] = delete_time_ms; + if (total_deleted > 0) { + st.counters["delete_time_per_vector_ms"] = delete_time_ms / total_deleted; + } else { + st.counters["delete_time_per_vector_ms"] = 0.0; + } + + // Get DB statistics 
before benchmark + auto stats = disk_index->getDBStatistics(); + size_t io_bytes_before = 0; + if (stats) { + io_bytes_before = stats->getTickerCount(rocksdb::Tickers::BYTES_COMPRESSED_TO); + } + + std::atomic_int correct = 0; + size_t ef = st.range(0); + size_t k = st.range(1); + + for (auto _ : st) { + HNSWRuntimeParams hnswRuntimeParams = {.efRuntime = ef}; + auto query_params = BM_VecSimGeneral::CreateQueryParams(hnswRuntimeParams); + auto &q = QUERIES[iter % N_QUERIES]; + + auto hnsw_results = VecSimIndex_TopKQuery(hnsw_index, q.data(), k, &query_params, BY_SCORE); + st.PauseTiming(); + + // Get all (100) ground truth results + auto gt_results = BM_VecSimIndex::TopKGroundTruth(iter % N_QUERIES, 100); + + auto filtered_res = new VecSimQueryReply(VecSimAllocator::newVecsimAllocator()); + for (const auto &res : gt_results->results) { + // Use hash set for O(1) lookup instead of O(n) linear search + if (deleted_labels_set.find(res.id) == deleted_labels_set.end()) { + filtered_res->results.emplace_back(res.id, res.score); + // Stop once we have k non-deleted results + if (filtered_res->results.size() >= k) { + break; + } + } + } + if (filtered_res->results.size() < k) { + std::cout << "Not enough non-deleted ground truth results to compare against (only " + << filtered_res->results.size() << " out of " << k << " requested)" << std::endl; + } + + BM_VecSimGeneral::MeasureRecall(hnsw_results, filtered_res, correct); + + VecSimQueryReply_Free(hnsw_results); + VecSimQueryReply_Free(filtered_res); + VecSimQueryReply_Free(gt_results); + st.ResumeTiming(); + iter++; + } + st.counters["Recall"] = (float)correct / (float)(k * iter); + if (stats) { + size_t io_bytes_after = stats->getTickerCount(rocksdb::Tickers::BYTES_COMPRESSED_TO); + st.counters["io_bytes_per_query"] = static_cast(io_bytes_after - io_bytes_before) / iter; + } +} + +// Same as TopK_HNSW_DISK_DeleteLabel but excludes ground truth vectors from deletion. 
+// This keeps the ground truth stable across different deletion counts for fair recall comparison. +// st.range(0) = ef_runtime +// st.range(1) = k +// st.range(2) = number of vectors to delete +template +void BM_VecSimCommon::TopK_HNSW_DISK_DeleteLabel_ProtectGT(benchmark::State &st) { + using data_t = typename index_type_t::data_t; + using dist_t = typename index_type_t::dist_t; + + + + // Build a set of all ground truth vector IDs (to protect from deletion) + size_t num_iterations = st.iterations(); + std::unordered_set gt_labels_set; + for (size_t q = 0; q < std::min(num_iterations, N_QUERIES); q++) { + auto gt_results = BM_VecSimIndex::TopKGroundTruth(q, 100); + for (const auto &res : gt_results->results) { + gt_labels_set.insert(res.id); + } + VecSimQueryReply_Free(gt_results); + } + + // Reload the index to get a fresh copy without any deleted vectors + std::string folder_path = BM_VecSimGeneral::AttachRootPath(BM_VecSimGeneral::hnsw_index_file); + INDICES[INDEX_HNSW_DISK] = IndexPtr(HNSWDiskFactory::NewIndex(folder_path)); + auto hnsw_index = GET_INDEX(INDEX_HNSW_DISK); + auto *disk_index = dynamic_cast *>(hnsw_index); + + // Delete vectors using deleteVector, but skip ground truth vectors + std::vector deleted_labels; + const size_t num_to_delete = st.range(2); + + // Get pseudo-random unique labels, but the same ones for all runs of the benchmark + // Divide N_VECTORS into num_to_delete equal strata and pick one from each + // Skip any labels that are in ground truth + std::mt19937 rng(42); // Fixed seed for determinism + size_t skipped_gt = 0; + for (size_t i = 0; i < num_to_delete && i < N_VECTORS; i++) { + size_t stratum_start = (i * N_VECTORS) / num_to_delete; + size_t stratum_end = ((i + 1) * N_VECTORS) / num_to_delete; + size_t stratum_size = stratum_end - stratum_start; + + std::uniform_int_distribution dist(0, stratum_size - 1); + labelType label = stratum_start + dist(rng); + + // Skip if this label is in ground truth + if 
(gt_labels_set.find(label) != gt_labels_set.end()) { + skipped_gt++; + continue; + } + deleted_labels.push_back(label); + } + + // Measure the time spent on deleteVector calls (includes automatic batch processing) + auto delete_start = std::chrono::high_resolution_clock::now(); + for (const auto &label : deleted_labels) { + disk_index->deleteVector(label); + } + // Force flush any pending deletes to ensure graph is fully repaired + disk_index->flushDeleteBatch(); + auto delete_end = std::chrono::high_resolution_clock::now(); + double delete_time_ms = std::chrono::duration(delete_end - delete_start).count(); + + size_t total_deleted = deleted_labels.size(); + st.counters["num_deleted"] = total_deleted; + st.counters["num_gt_protected"] = skipped_gt; + st.counters["delete_time_ms"] = delete_time_ms; + if (total_deleted > 0) { + st.counters["delete_time_per_vector_ms"] = delete_time_ms / total_deleted; + } + + // Get DB statistics before benchmark + auto stats = disk_index->getDBStatistics(); + size_t io_bytes_before = 0; + if (stats) { + io_bytes_before = stats->getTickerCount(rocksdb::Tickers::BYTES_COMPRESSED_TO); + } + + size_t iter = 0; + std::atomic_int correct = 0; + size_t ef = st.range(0); + size_t k = st.range(1); + + for (auto _ : st) { + HNSWRuntimeParams hnswRuntimeParams = {.efRuntime = ef}; + auto query_params = BM_VecSimGeneral::CreateQueryParams(hnswRuntimeParams); + auto &q = QUERIES[iter % N_QUERIES]; + + auto hnsw_results = VecSimIndex_TopKQuery(hnsw_index, q.data(), k, &query_params, BY_SCORE); + st.PauseTiming(); + + // Ground truth is unchanged since we protected all GT vectors from deletion + auto gt_results = BM_VecSimIndex::TopKGroundTruth(iter % N_QUERIES, k); + + BM_VecSimGeneral::MeasureRecall(hnsw_results, gt_results, correct); + + VecSimQueryReply_Free(hnsw_results); + VecSimQueryReply_Free(gt_results); + st.ResumeTiming(); + iter++; + } + st.counters["Recall"] = (float)correct / (float)(k * iter); + if (stats) { + size_t io_bytes_after 
= stats->getTickerCount(rocksdb::Tickers::BYTES_COMPRESSED_TO); + st.counters["io_bytes_per_query"] = static_cast(io_bytes_after - io_bytes_before) / iter; + } +} + +// Benchmark deletion performance with different batch sizes +// st.range(0) = ef_runtime +// st.range(1) = num_deleted +// st.range(2) = batch_size +template +void BM_VecSimCommon::TopK_HNSW_DISK_DeleteLabel_BatchSize(benchmark::State &st) { + using data_t = typename index_type_t::data_t; + using dist_t = typename index_type_t::dist_t; + + size_t iter = 0; + const size_t ef = st.range(0); + const size_t num_to_delete = st.range(1); + const size_t batch_size = st.range(2); + + // Reload the index to get a fresh copy without any deleted vectors + std::string folder_path = BM_VecSimGeneral::AttachRootPath(BM_VecSimGeneral::hnsw_index_file); + INDICES[INDEX_HNSW_DISK] = IndexPtr(HNSWDiskFactory::NewIndex(folder_path)); + auto hnsw_index = GET_INDEX(INDEX_HNSW_DISK); + auto *disk_index = dynamic_cast *>(hnsw_index); + + // Set the batch threshold to the specified batch size + disk_index->setDeleteBatchThreshold(batch_size); + + // Get pseudo-random unique labels using stratified sampling + std::vector deleted_labels; + std::mt19937 rng(42); // Fixed seed for determinism + for (size_t i = 0; i < num_to_delete && i < N_VECTORS; i++) { + size_t stratum_start = (i * N_VECTORS) / num_to_delete; + size_t stratum_end = ((i + 1) * N_VECTORS) / num_to_delete; + size_t stratum_size = stratum_end - stratum_start; + + std::uniform_int_distribution dist(0, stratum_size - 1); + labelType label = stratum_start + dist(rng); + deleted_labels.push_back(label); + } + + // Create hash set for O(1) lookup during ground truth filtering + std::unordered_set deleted_labels_set(deleted_labels.begin(), deleted_labels.end()); + + // Measure deletion time (includes batch processing triggered automatically) + auto delete_start = std::chrono::high_resolution_clock::now(); + size_t batch_flushes = 0; + for (const auto &label : 
deleted_labels) { + size_t pending_before = disk_index->getPendingDeleteCount(); + disk_index->deleteVector(label); + // Count when a batch flush was triggered + if (disk_index->getPendingDeleteCount() < pending_before) { + batch_flushes++; + } + } + // Force flush any remaining pending deletes + if (disk_index->getPendingDeleteCount() > 0) { + disk_index->flushDeleteBatch(); + batch_flushes++; + } + auto delete_end = std::chrono::high_resolution_clock::now(); + double delete_time_ms = std::chrono::duration(delete_end - delete_start).count(); + + // Report metrics + st.counters["batch_size"] = batch_size; + st.counters["num_deleted"] = deleted_labels.size(); + st.counters["delete_time_ms"] = delete_time_ms; + st.counters["batch_flushes"] = batch_flushes; + if (deleted_labels.size() > 0) { + st.counters["delete_time_per_vector_ms"] = delete_time_ms / deleted_labels.size(); + } + + // Get DB statistics before search benchmark + auto stats = disk_index->getDBStatistics(); + size_t io_bytes_before = 0; + if (stats) { + io_bytes_before = stats->getTickerCount(rocksdb::Tickers::BYTES_COMPRESSED_TO); + } + + std::atomic_int correct = 0; + size_t k = 10; // Fixed k for batch size testing + + for (auto _ : st) { + HNSWRuntimeParams hnswRuntimeParams = {.efRuntime = ef}; + auto query_params = BM_VecSimGeneral::CreateQueryParams(hnswRuntimeParams); + auto &q = QUERIES[iter % N_QUERIES]; + + auto hnsw_results = VecSimIndex_TopKQuery(hnsw_index, q.data(), k, &query_params, BY_SCORE); + st.PauseTiming(); + + // Get ground truth and filter deleted vectors + auto gt_results = BM_VecSimIndex::TopKGroundTruth(iter % N_QUERIES, 100); + + auto filtered_res = new VecSimQueryReply(VecSimAllocator::newVecsimAllocator()); + for (const auto &res : gt_results->results) { + if (deleted_labels_set.find(res.id) == deleted_labels_set.end()) { + filtered_res->results.emplace_back(res.id, res.score); + if (filtered_res->results.size() >= k) { + break; + } + } + } + + 
BM_VecSimGeneral::MeasureRecall(hnsw_results, filtered_res, correct); + + VecSimQueryReply_Free(hnsw_results); + VecSimQueryReply_Free(filtered_res); + VecSimQueryReply_Free(gt_results); + st.ResumeTiming(); + iter++; + } + st.counters["Recall"] = (float)correct / (float)(k * iter); + if (stats) { + size_t io_bytes_after = stats->getTickerCount(rocksdb::Tickers::BYTES_COMPRESSED_TO); + st.counters["io_bytes_per_query"] = static_cast(io_bytes_after - io_bytes_before) / iter; + } +} + +// Stress test with high deletion ratios (50%, 75%, 90%+) +// st.range(0) = ef_runtime +// st.range(1) = num_deleted (expected to be high percentage of N_VECTORS) +template +void BM_VecSimCommon::TopK_HNSW_DISK_DeleteLabel_Stress(benchmark::State &st) { + using data_t = typename index_type_t::data_t; + using dist_t = typename index_type_t::dist_t; + + size_t iter = 0; + const size_t ef = st.range(0); + const size_t num_to_delete = st.range(1); + + // Reload the index to get a fresh copy + std::string folder_path = BM_VecSimGeneral::AttachRootPath(BM_VecSimGeneral::hnsw_index_file); + INDICES[INDEX_HNSW_DISK] = IndexPtr(HNSWDiskFactory::NewIndex(folder_path)); + auto hnsw_index = GET_INDEX(INDEX_HNSW_DISK); + auto *disk_index = dynamic_cast *>(hnsw_index); + + // Get pseudo-random unique labels using stratified sampling + std::vector deleted_labels; + std::mt19937 rng(42); // Fixed seed for determinism + for (size_t i = 0; i < num_to_delete && i < N_VECTORS; i++) { + size_t stratum_start = (i * N_VECTORS) / num_to_delete; + size_t stratum_end = ((i + 1) * N_VECTORS) / num_to_delete; + size_t stratum_size = stratum_end - stratum_start; + + std::uniform_int_distribution dist(0, stratum_size - 1); + labelType label = stratum_start + dist(rng); + deleted_labels.push_back(label); + } + + // Create hash set for O(1) lookup + std::unordered_set deleted_labels_set(deleted_labels.begin(), deleted_labels.end()); + + // Measure deletion time + auto delete_start = 
std::chrono::high_resolution_clock::now(); + for (const auto &label : deleted_labels) { + disk_index->deleteVector(label); + } + disk_index->flushDeleteBatch(); + auto delete_end = std::chrono::high_resolution_clock::now(); + double delete_time_ms = std::chrono::duration(delete_end - delete_start).count(); + + // Calculate deletion ratio + double deletion_ratio = (double)deleted_labels.size() / (double)N_VECTORS * 100.0; + + // Report metrics + st.counters["num_deleted"] = deleted_labels.size(); + st.counters["deletion_ratio_pct"] = deletion_ratio; + st.counters["delete_time_ms"] = delete_time_ms; + st.counters["remaining_vectors"] = N_VECTORS - deleted_labels.size(); + if (deleted_labels.size() > 0) { + st.counters["delete_time_per_vector_ms"] = delete_time_ms / deleted_labels.size(); + } + + // Get DB statistics before search benchmark + auto stats = disk_index->getDBStatistics(); + size_t io_bytes_before = 0; + if (stats) { + io_bytes_before = stats->getTickerCount(rocksdb::Tickers::BYTES_COMPRESSED_TO); + } + + std::atomic_int correct = 0; + size_t k = 10; // Fixed k for stress testing + + for (auto _ : st) { + HNSWRuntimeParams hnswRuntimeParams = {.efRuntime = ef}; + auto query_params = BM_VecSimGeneral::CreateQueryParams(hnswRuntimeParams); + auto &q = QUERIES[iter % N_QUERIES]; + + auto hnsw_results = VecSimIndex_TopKQuery(hnsw_index, q.data(), k, &query_params, BY_SCORE); + st.PauseTiming(); + + // Get ground truth and filter deleted vectors + auto gt_results = BM_VecSimIndex::TopKGroundTruth(iter % N_QUERIES, 100); + + auto filtered_res = new VecSimQueryReply(VecSimAllocator::newVecsimAllocator()); + for (const auto &res : gt_results->results) { + if (deleted_labels_set.find(res.id) == deleted_labels_set.end()) { + filtered_res->results.emplace_back(res.id, res.score); + if (filtered_res->results.size() >= k) { + break; + } + } + } + + // For stress tests, it's OK if we can't find k results + if (filtered_res->results.size() > 0) { + 
BM_VecSimGeneral::MeasureRecall(hnsw_results, filtered_res, correct); + } + + VecSimQueryReply_Free(hnsw_results); + VecSimQueryReply_Free(filtered_res); + VecSimQueryReply_Free(gt_results); + st.ResumeTiming(); + iter++; + } + st.counters["Recall"] = (float)correct / (float)(k * iter); + if (stats) { + size_t io_bytes_after = stats->getTickerCount(rocksdb::Tickers::BYTES_COMPRESSED_TO); + st.counters["io_bytes_per_query"] = static_cast(io_bytes_after - io_bytes_before) / iter; + } +} + template void BM_VecSimCommon::TopK_BF(benchmark::State &st, unsigned short index_offset) { @@ -231,8 +675,6 @@ void BM_VecSimCommon::TopK_BF(benchmark::State &st, unsigned short } } - - template void BM_VecSimCommon::TopK_HNSW(benchmark::State &st, unsigned short index_offset) { size_t ef = st.range(0); @@ -332,14 +774,42 @@ void BM_VecSimCommon::TopK_Tiered(benchmark::State &st, unsigned s BENCHMARK_REGISTER_F(BM_CLASS, BM_FUNC) \ ->Args({10, 10, 1000}) \ ->Args({10, 10, 10000}) \ - ->Args({10, 10, 50000}) \ + ->Args({10, 10, 25000}) \ ->Args({200, 50, 1000}) \ ->Args({200, 50, 10000}) \ - ->Args({200, 50, 50000}) \ + ->Args({200, 50, 25000}) \ ->ArgNames({"ef_runtime", "k", "num_marked_deleted"}) \ ->Iterations(10) \ ->Unit(benchmark::kMillisecond) +// {ef_runtime, k, num_deleted} +// Test the performance after fully deleting vectors (with graph repair) +#define REGISTER_TopK_HNSW_DISK_DeleteLabel(BM_CLASS, BM_FUNC) \ + BENCHMARK_REGISTER_F(BM_CLASS, BM_FUNC) \ + ->Args({10, 10, 1000}) \ + ->Args({10, 10, 10000}) \ + ->Args({10, 10, 25000}) \ + ->Args({200, 50, 1000}) \ + ->Args({200, 50, 10000}) \ + ->Args({200, 50, 25000}) \ + ->ArgNames({"ef_runtime", "k", "num_deleted"}) \ + ->Iterations(10) \ + ->Unit(benchmark::kMillisecond) + +// {ef_runtime, k, num_deleted} +// Same as DeleteLabel but protects ground truth vectors from deletion for stable recall comparison +#define REGISTER_TopK_HNSW_DISK_DeleteLabel_ProtectGT(BM_CLASS, BM_FUNC) \ + BENCHMARK_REGISTER_F(BM_CLASS, BM_FUNC) 
\ + ->Args({10, 10, 1000}) \ + ->Args({10, 10, 10000}) \ + ->Args({10, 10, 25000}) \ + ->Args({200, 50, 1000}) \ + ->Args({200, 50, 10000}) \ + ->Args({200, 50, 25000}) \ + ->ArgNames({"ef_runtime", "k", "num_deleted"}) \ + ->Iterations(10) \ + ->Unit(benchmark::kMillisecond) + // {ef_runtime, k} (recall that always ef_runtime >= k) #define REGISTER_TopK_Tiered(BM_CLASS, BM_FUNC) \ BENCHMARK_REGISTER_F(BM_CLASS, BM_FUNC) \ @@ -351,3 +821,35 @@ void BM_VecSimCommon::TopK_Tiered(benchmark::State &st, unsigned s ->ArgNames({"ef_runtime", "k"}) \ ->Iterations(50) \ ->Unit(benchmark::kMillisecond) + +// Registration macro for batch size testing +#define REGISTER_TopK_HNSW_DISK_DeleteLabel_BatchSize(BM_CLASS, BM_FUNC) \ + BENCHMARK_REGISTER_F(BM_CLASS, BM_FUNC) \ + ->Args({100, 5000, 1}) /* ef=100, 5K deletions, batch_size=1 */ \ + ->Args({100, 5000, 5}) /* batch_size=5 */ \ + ->Args({100, 5000, 50}) /* batch_size=50 */ \ + ->Args({100, 5000, 500}) /* batch_size=500 */ \ + ->Args({100, 5000, 1000}) /* batch_size=1000 */ \ + ->Args({50, 2000, 1}) /* Lower ef, fewer deletions */ \ + ->Args({50, 2000, 10}) \ + ->Args({50, 2000, 100}) \ + ->Args({200, 10000, 1}) /* Higher ef, more deletions */ \ + ->Args({200, 10000, 50}) \ + ->Args({200, 10000, 1000}) /* Large batch */ \ + ->ArgNames({"ef_runtime", "num_deleted", "batch_size"}) \ + ->Unit(benchmark::kMicrosecond); + +// Registration macro for stress testing +#define REGISTER_TopK_HNSW_DISK_DeleteLabel_Stress(BM_CLASS, BM_FUNC) \ + BENCHMARK_REGISTER_F(BM_CLASS, BM_FUNC) \ + ->Args({50, 12500}) /* 50% deletion ratio */ \ + ->Args({100, 12500}) /* 50% deletion, higher ef */ \ + ->Args({200, 12500}) /* 50% deletion, max ef */ \ + ->Args({50, 18750}) /* 75% deletion ratio */ \ + ->Args({100, 18750}) /* 75% deletion, higher ef */ \ + ->Args({50, 22500}) /* 90% deletion ratio (extreme stress) */ \ + ->Args({100, 22500}) /* 90% deletion, higher ef */ \ + ->Args({25, 24000}) /* 96% deletion (maximum stress) */ \ + 
->ArgNames({"ef_runtime", "num_deleted"}) \ + ->Unit(benchmark::kMicrosecond) \ + ->Iterations(3); /* Fewer iterations for stress tests */ diff --git a/tests/benchmark/bm_files.sh b/tests/benchmark/bm_files.sh index eee4315fc..0cc9d94d0 100755 --- a/tests/benchmark/bm_files.sh +++ b/tests/benchmark/bm_files.sh @@ -41,6 +41,8 @@ then file_name="basic_uint8" elif [ "$BM_TYPE" = "bm-updated-fp32-single" ]; then file_name="updated" +elif [ "$BM_TYPE" = "bm-hnsw-disk-fp32-single" ]; then + file_name="bm-hnsw-disk-fp32-single" else echo "No files to download for BM_TYPE=$BM_TYPE" exit 0 diff --git a/tests/benchmark/bm_initialization/bm_hnsw_disk_initialize_fp32.h b/tests/benchmark/bm_initialization/bm_hnsw_disk_initialize_fp32.h index 6ea95217a..5c765f2b9 100644 --- a/tests/benchmark/bm_initialization/bm_hnsw_disk_initialize_fp32.h +++ b/tests/benchmark/bm_initialization/bm_hnsw_disk_initialize_fp32.h @@ -44,7 +44,15 @@ BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimCommon, BM_FUNC_NAME(TopK_MarkDeleted, HNSW (benchmark::State &st) { TopK_HNSW_DISK_MarkDeleted(st); } REGISTER_TopK_HNSW_DISK_MarkDeleted(BM_VecSimCommon, BM_FUNC_NAME(TopK_MarkDeleted, HNSWDisk)); +// TopK benchmark after deleting vectors (with graph repair) +BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimCommon, BM_FUNC_NAME(TopK_DeleteLabel, HNSWDisk), fp32_index_t) +(benchmark::State &st) { TopK_HNSW_DISK_DeleteLabel(st); } +REGISTER_TopK_HNSW_DISK_DeleteLabel(BM_VecSimCommon, BM_FUNC_NAME(TopK_DeleteLabel, HNSWDisk)); +// TopK benchmark after deleting vectors (with graph repair), protecting GT vectors for stable recall +BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimCommon, BM_FUNC_NAME(TopK_DeleteLabel_ProtectGT, HNSWDisk), fp32_index_t) +(benchmark::State &st) { TopK_HNSW_DISK_DeleteLabel_ProtectGT(st); } +REGISTER_TopK_HNSW_DISK_DeleteLabel_ProtectGT(BM_VecSimCommon, BM_FUNC_NAME(TopK_DeleteLabel_ProtectGT, HNSWDisk)); // Range benchmarks // BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimBasics, BM_FUNC_NAME(Range, BF), fp32_index_t) diff 
--git a/tests/benchmark/data/hnsw_indices/hnsw_indices_basic_fp32.txt b/tests/benchmark/data/hnsw_indices/hnsw_indices_basic_fp32.txt index 82284368e..baf216a7d 100644 --- a/tests/benchmark/data/hnsw_indices/hnsw_indices_basic_fp32.txt +++ b/tests/benchmark/data/hnsw_indices/hnsw_indices_basic_fp32.txt @@ -3,3 +3,4 @@ https://dev.cto.redis.s3.amazonaws.com/VectorSimilarity/dbpedia-cosine-dim768-te https://dev.cto.redis.s3.amazonaws.com/VectorSimilarity/fashion_images_multi_value-cosine-dim512-M64-efc512.hnsw_v3 https://dev.cto.redis.s3.amazonaws.com/VectorSimilarity/fashion_images_multi_value-cosine-dim512-test_vectors.raw + diff --git a/tests/benchmark/data/hnsw_indices/hnsw_indices_bm-hnsw-disk-fp32-single.txt b/tests/benchmark/data/hnsw_indices/hnsw_indices_bm-hnsw-disk-fp32-single.txt new file mode 100644 index 000000000..473207f35 --- /dev/null +++ b/tests/benchmark/data/hnsw_indices/hnsw_indices_bm-hnsw-disk-fp32-single.txt @@ -0,0 +1,3 @@ +https://dev.cto.redis.s3.amazonaws.com/VectorSimilarity/deep1b/deep-1M-cosine-dim96-M32-efc200-disk-vectors.zip +https://dev.cto.redis.s3.amazonaws.com/VectorSimilarity/deep1b/deep.query.public.10K.fbin +https://dev.cto.redis.s3.amazonaws.com/VectorSimilarity/deep1b/deep.groundtruth.1M.10K.ibin diff --git a/tests/unit/test_hnsw_disk.cpp b/tests/unit/test_hnsw_disk.cpp index a6139073a..24e05fdbf 100644 --- a/tests/unit/test_hnsw_disk.cpp +++ b/tests/unit/test_hnsw_disk.cpp @@ -1003,3 +1003,764 @@ TEST_F(HNSWDiskIndexTest, markDelete) { } delete results; } + +TEST_F(HNSWDiskIndexTest, BatchedDeletionTest) { + // Test batched deletion functionality + const size_t dim = 64; + const size_t n = 150; // More than deleteBatchThreshold (10) + + // Create HNSW parameters + HNSWParams params; + params.dim = dim; + params.type = VecSimType_FLOAT32; + params.metric = VecSimMetric_L2; + params.multi = false; + params.M = 8; + params.efConstruction = 100; + params.efRuntime = 50; + params.epsilon = 0.01; + + // Create abstract init 
parameters + AbstractIndexInitParams abstractInitParams; + abstractInitParams.dim = dim; + abstractInitParams.vecType = params.type; + abstractInitParams.dataSize = dim * sizeof(float); + abstractInitParams.blockSize = 1024; // Set block size + abstractInitParams.multi = false; + abstractInitParams.allocator = VecSimAllocator::newVecsimAllocator(); + + // Create index components + IndexComponents components = CreateIndexComponents( + abstractInitParams.allocator, VecSimMetric_L2, dim, false); + + // Create HNSWDiskIndex + rocksdb::ColumnFamilyHandle *default_cf = db->DefaultColumnFamily(); + HNSWDiskIndex index(¶ms, abstractInitParams, components, db.get(), + default_cf, temp_dir); + + // Add vectors to the index + std::mt19937 rng(42); + for (labelType label = 0; label < n; label++) { + auto vec = createRandomVector(dim, rng); + index.addVector(vec.data(), label); + } + + // Flush any pending batches + index.flushBatch(); + + // Verify all vectors were added + ASSERT_EQ(index.indexSize(), n); + ASSERT_EQ(index.indexLabelCount(), n); + + // Delete vectors in batches (delete every other vector) + // This should trigger batch processing when we reach deleteBatchThreshold + size_t deleted_count = 0; + for (labelType label = 0; label < n; label += 2) { + int result = index.deleteVector(label); + ASSERT_EQ(result, 1); // Deletion should succeed + deleted_count++; + } + + // Manually flush any remaining deletes + index.flushDeleteBatch(); + + // Verify the index size and label count + // Note: indexSize includes marked deleted vectors + ASSERT_EQ(index.indexSize(), n); + ASSERT_EQ(index.indexLabelCount(), n - deleted_count); + + // Verify that deleted vectors cannot be deleted again (they don't exist) + for (labelType label = 0; label < n; label += 2) { + int result = index.deleteVector(label); + ASSERT_EQ(result, 0) << "Deleted vector " << label << " should not be found"; + } + + // Get the set of labels to verify which ones exist + auto labels_set = 
index.getLabelsSet(); + + // Verify that deleted vectors are not in the labels set + for (labelType label = 0; label < n; label += 2) { + ASSERT_EQ(labels_set.count(label), 0) << "Deleted label " << label << " should not be in labels set"; + } + + // Verify that non-deleted vectors are in the labels set + for (labelType label = 1; label < n; label += 2) { + ASSERT_EQ(labels_set.count(label), 1) << "Non-deleted label " << label << " should be in labels set"; + } + + // Perform a search to verify graph connectivity is maintained + VecSimQueryParams queryParams; + queryParams.hnswRuntimeParams.efRuntime = 50; + + // Search using a freshly generated random query vector (not one of the indexed vectors) + auto query_vec = createRandomVector(dim, rng); + size_t k = 10; + auto results = index.topKQuery(query_vec.data(), k, &queryParams); + + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + ASSERT_LE(results->results.size(), k); + + // Verify that all returned results are non-deleted vectors (odd labels) + for (size_t i = 0; i < results->results.size(); i++) { + labelType result_label = results->results[i].id; + ASSERT_EQ(result_label % 2, 1) << "Found deleted vector in search results: " << result_label; + } + + delete results; +} + +// Test interleaved insertions and deletions to verify separated staging areas +TEST_F(HNSWDiskIndexTest, InterleavedInsertDeleteTest) { + const size_t dim = 64; + const size_t initial_count = 100; + + // Create HNSW parameters + HNSWParams params; + params.dim = dim; + params.type = VecSimType_FLOAT32; + params.metric = VecSimMetric_L2; + params.multi = false; + params.M = 8; + params.efConstruction = 100; + params.efRuntime = 50; + params.epsilon = 0.01; + + // Create abstract init parameters + AbstractIndexInitParams abstractInitParams; + abstractInitParams.dim = dim; + abstractInitParams.vecType = params.type; + abstractInitParams.dataSize = dim * sizeof(float); + abstractInitParams.blockSize = 1024; + abstractInitParams.multi = false; + abstractInitParams.allocator
= VecSimAllocator::newVecsimAllocator(); + + // Create index components + IndexComponents components = CreateIndexComponents( + abstractInitParams.allocator, VecSimMetric_L2, dim, false); + + // Create HNSWDiskIndex + rocksdb::ColumnFamilyHandle *default_cf = db->DefaultColumnFamily(); + HNSWDiskIndex index(¶ms, abstractInitParams, components, db.get(), + default_cf, temp_dir); + + // Phase 1: Add initial vectors (0-99) + std::vector> vectors; + std::mt19937 rng(42); + + for (labelType label = 0; label < initial_count; label++) { + auto vec = createRandomVector(dim, rng); + vectors.push_back(vec); + int ret = index.addVector(vec.data(), label); + ASSERT_EQ(ret, 1) << "Failed to add vector " << label; + } + + // Flush any pending batches + index.flushBatch(); + + ASSERT_EQ(index.indexSize(), initial_count); + ASSERT_EQ(index.indexLabelCount(), initial_count); + + // Phase 2: Interleave deletions and insertions + // Delete vectors 0-19 (20 deletions) + // Add vectors 100-119 (20 insertions) + // This tests that both staging areas work independently + + size_t delete_start = 0; + size_t delete_count = 20; + size_t insert_start = 100; + size_t insert_count = 20; + + // Interleave: delete one, insert one, delete one, insert one, etc. 
+ for (size_t i = 0; i < delete_count; i++) { + // Delete a vector + labelType delete_label = delete_start + i; + int delete_ret = index.deleteVector(delete_label); + ASSERT_EQ(delete_ret, 1) << "Failed to delete vector " << delete_label; + + // Insert a new vector + labelType insert_label = insert_start + i; + auto new_vec = createRandomVector(dim, rng); + vectors.push_back(new_vec); + int insert_ret = index.addVector(new_vec.data(), insert_label); + ASSERT_EQ(insert_ret, 1) << "Failed to add vector " << insert_label; + } + + // Flush any pending batches + index.flushBatch(); + index.flushDeleteBatch(); + + // Verify index state + // indexSize() returns curElementCount (highest ID + 1) + // Without ID recycling, curElementCount grows with each insertion. + // After initial_count insertions + insert_count new insertions = initial_count + insert_count + ASSERT_EQ(index.indexSize(), initial_count + insert_count); + // indexLabelCount() returns labelToIdMap.size() which reflects active (non-deleted) labels + // So it should be: initial_count - delete_count + insert_count = 100 - 20 + 20 = 100 + ASSERT_EQ(index.indexLabelCount(), initial_count - delete_count + insert_count); + + // Phase 3: Verify deleted vectors are gone + for (size_t i = delete_start; i < delete_start + delete_count; i++) { + int ret = index.deleteVector(i); + ASSERT_EQ(ret, 0) << "Vector " << i << " should already be deleted"; + } + + // Phase 4: Verify label-set membership of deleted and newly added vectors + auto labels = index.getLabelsSet(); + + // Check deleted vectors are not in the set + for (size_t i = delete_start; i < delete_start + delete_count; i++) { + ASSERT_EQ(labels.count(i), 0) << "Deleted vector " << i << " still in labels set"; + } + + // Check new vectors are in the set + for (size_t i = insert_start; i < insert_start + insert_count; i++) { + ASSERT_EQ(labels.count(i), 1) << "New vector " << i << " not in labels set"; + } + + // Phase 5: Perform a search to verify graph integrity + size_t k = 10; + auto
*results = index.topKQuery(vectors[50].data(), k, nullptr); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + ASSERT_LE(results->results.size(), k); + + // Verify no deleted vectors appear in results + for (size_t i = 0; i < results->results.size(); i++) { + labelType result_label = results->results[i].id; + ASSERT_FALSE(result_label >= delete_start && result_label < delete_start + delete_count) + << "Found deleted vector in search results: " << result_label; + } + + delete results; + + // Phase 6: More aggressive interleaving - multiple operations before batch flush + // Delete vectors 20-29 and add vectors 120-129 + delete_start = 20; + delete_count = 10; + insert_start = 120; + insert_count = 10; + + for (size_t i = 0; i < std::max(delete_count, insert_count); i++) { + if (i < delete_count) { + labelType delete_label = delete_start + i; + int delete_ret = index.deleteVector(delete_label); + ASSERT_EQ(delete_ret, 1) << "Failed to delete vector " << delete_label; + } + + if (i < insert_count) { + labelType insert_label = insert_start + i; + auto new_vec = createRandomVector(dim, rng); + vectors.push_back(new_vec); + int insert_ret = index.addVector(new_vec.data(), insert_label); + ASSERT_EQ(insert_ret, 1) << "Failed to add vector " << insert_label; + } + } + + // Flush any pending batches + index.flushBatch(); + index.flushDeleteBatch(); + + // Final verification + // Without ID recycling, indexSize() grows with each insertion. 
+ // Total insertions: initial_count + 20 (Phase 2) + 10 (Phase 6) = initial_count + 30 + ASSERT_EQ(index.indexSize(), initial_count + 30); + // indexLabelCount() = initial_count - total_deletes + total_inserts = 100 - 30 + 30 = 100 + size_t expected_label_count = initial_count - 30 + 30; // deleted 30 total, added 30 total + ASSERT_EQ(index.indexLabelCount(), expected_label_count); + + // Verify graph is still searchable + results = index.topKQuery(vectors[80].data(), k, nullptr); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + ASSERT_GT(results->results.size(), 0) << "Search returned no results after interleaved operations"; + + delete results; +} + +TEST_F(HNSWDiskIndexTest, StagedRepairTest) { + const size_t dim = 64; + const size_t n = 50; + + // Create HNSW parameters + HNSWParams params; + params.dim = dim; + params.type = VecSimType_FLOAT32; + params.metric = VecSimMetric_L2; + params.multi = false; + params.M = 8; // Small M to ensure neighbors are interconnected + params.efConstruction = 100; + params.efRuntime = 50; + params.epsilon = 0.01; + + // Create abstract init parameters + AbstractIndexInitParams abstractInitParams; + abstractInitParams.dim = dim; + abstractInitParams.vecType = params.type; + abstractInitParams.dataSize = dim * sizeof(float); + abstractInitParams.blockSize = 1024; + abstractInitParams.multi = false; + abstractInitParams.allocator = VecSimAllocator::newVecsimAllocator(); + + // Create index components + IndexComponents components = CreateIndexComponents( + abstractInitParams.allocator, VecSimMetric_L2, dim, false); + + // Create HNSWDiskIndex + rocksdb::ColumnFamilyHandle *default_cf = db->DefaultColumnFamily(); + HNSWDiskIndex index(¶ms, abstractInitParams, components, db.get(), + default_cf, temp_dir); + + // Add vectors to the index - random vectors drawn from a fixed-seed RNG (42) so the data set is reproducible + std::mt19937 rng(42); + std::vector> vectors; + for (labelType label = 0; label < n; label++) { + auto vec =
createRandomVector(dim, rng); + vectors.push_back(vec); + int ret = index.addVector(vec.data(), label); + ASSERT_EQ(ret, 1) << "Failed to add vector " << label; + } + + // Flush to disk so all graph data is persisted + index.flushBatch(); + + ASSERT_EQ(index.indexSize(), n); + ASSERT_EQ(index.indexLabelCount(), n); + + // Delete some vectors (e.g., every 3rd vector) + // This creates stale edges: nodes that point to deleted nodes + std::vector deleted_labels; + for (labelType label = 0; label < n; label += 3) { + int ret = index.deleteVector(label); + ASSERT_EQ(ret, 1) << "Failed to delete vector " << label; + deleted_labels.push_back(label); + } + + // Flush the delete batch to mark vectors as deleted + index.flushDeleteBatch(); + + size_t num_deleted = deleted_labels.size(); + ASSERT_EQ(index.getNumMarkedDeleted(), num_deleted); + + // Now perform searches - this will trigger getNeighbors which should: + // 1. Filter out deleted nodes from neighbor lists + // 2. Stage the cleaned lists for repair (opportunistic cleanup) + VecSimQueryParams queryParams; + queryParams.hnswRuntimeParams.efRuntime = 50; + + // Do multiple searches to access different parts of the graph + for (size_t i = 0; i < 10; i++) { + auto results = index.topKQuery(vectors[i * 3 + 1].data(), 5, &queryParams); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + + // Verify no deleted vectors in results + for (const auto &result : results->results) { + bool is_deleted = std::find(deleted_labels.begin(), deleted_labels.end(), + result.id) != deleted_labels.end(); + ASSERT_FALSE(is_deleted) << "Deleted vector " << result.id << " found in search results"; + } + + delete results; + } + + // Flush staged repair updates (triggered by next batch operation) + // The repairs are flushed along with delete batch + index.flushDeleteBatch(); + + // Verify the index is still functional after repairs + auto final_results = index.topKQuery(vectors[1].data(), 10, &queryParams); + 
ASSERT_TRUE(final_results != nullptr); + ASSERT_EQ(final_results->code, VecSim_OK); + ASSERT_GT(final_results->results.size(), 0); + + // Verify all results are non-deleted vectors + for (const auto &result : final_results->results) { + bool is_deleted = std::find(deleted_labels.begin(), deleted_labels.end(), + result.id) != deleted_labels.end(); + ASSERT_FALSE(is_deleted) << "Deleted vector " << result.id << " found in final results"; + } + + delete final_results; + + // Additional verification: re-query to ensure cleaned neighbor lists work correctly + // After staged repair flush, the disk should have cleaned neighbor lists + for (size_t i = 0; i < 5; i++) { + size_t query_idx = (i * 7 + 2) % n; + // Skip if this vector was deleted + if (query_idx % 3 == 0) query_idx++; + + auto results = index.topKQuery(vectors[query_idx].data(), 5, &queryParams); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + + for (const auto &result : results->results) { + bool is_deleted = std::find(deleted_labels.begin(), deleted_labels.end(), + result.id) != deleted_labels.end(); + ASSERT_FALSE(is_deleted) << "Deleted vector " << result.id + << " found after repair flush"; + } + + delete results; + } +} + +// Test that verifies bidirectional edge updates during graph repair +TEST_F(HNSWDiskIndexTest, GraphRepairBidirectionalEdges) { + size_t n = 50; + size_t dim = 4; + + // Create HNSW parameters with small M to make bidirectional updates more likely + HNSWParams params; + params.dim = dim; + params.type = VecSimType_FLOAT32; + params.metric = VecSimMetric_L2; + params.multi = false; + params.M = 4; // Small M to test edge capacity limits + params.efConstruction = 50; + params.efRuntime = 20; + params.epsilon = 0.01; + + // Create abstract init parameters + AbstractIndexInitParams abstractInitParams; + abstractInitParams.dim = dim; + abstractInitParams.vecType = params.type; + abstractInitParams.dataSize = dim * sizeof(float); + abstractInitParams.blockSize = 
1024; + abstractInitParams.multi = false; + abstractInitParams.allocator = VecSimAllocator::newVecsimAllocator(); + + // Create index components + IndexComponents components = CreateIndexComponents( + abstractInitParams.allocator, VecSimMetric_L2, dim, false); + + // Create HNSWDiskIndex + rocksdb::ColumnFamilyHandle *default_cf = db->DefaultColumnFamily(); + HNSWDiskIndex index(¶ms, abstractInitParams, components, db.get(), + default_cf, temp_dir); + + // Add vectors in a clustered pattern to create predictable neighbor relationships + std::mt19937 rng(12345); + std::vector> vectors; + + // Create 5 clusters of 10 vectors each + for (size_t cluster = 0; cluster < 5; cluster++) { + std::vector cluster_center(dim); + for (size_t d = 0; d < dim; d++) { + cluster_center[d] = static_cast(cluster * 10); + } + + for (size_t i = 0; i < 10; i++) { + std::vector vec(dim); + std::uniform_real_distribution dist(-1.0f, 1.0f); + for (size_t d = 0; d < dim; d++) { + vec[d] = cluster_center[d] + dist(rng); + } + vectors.push_back(vec); + labelType label = cluster * 10 + i; + int ret = index.addVector(vec.data(), label); + ASSERT_EQ(ret, 1) << "Failed to add vector " << label; + } + } + + // Flush to disk + index.flushBatch(); + ASSERT_EQ(index.indexSize(), n); + + // Delete a vector from the middle of a cluster (should trigger repair) + // Delete vector 15 (middle of cluster 1) + labelType deleted_label = 15; + int ret = index.deleteVector(deleted_label); + ASSERT_EQ(ret, 1) << "Failed to delete vector " << deleted_label; + + // Flush the delete batch - this triggers graph repair + index.flushDeleteBatch(); + ASSERT_EQ(index.getNumMarkedDeleted(), 1); + + // Verify the index is still functional + VecSimQueryParams queryParams; + queryParams.hnswRuntimeParams.efRuntime = 20; + + // Query with vectors from the same cluster + for (size_t i = 10; i < 20; i++) { + if (i == deleted_label) continue; + + auto results = index.topKQuery(vectors[i].data(), 5, &queryParams); + 
ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + ASSERT_GT(results->results.size(), 0); + + // Verify deleted vector is not in results + for (const auto &result : results->results) { + ASSERT_NE(result.id, deleted_label) << "Deleted vector found in search results"; + } + + delete results; + } + + // Delete multiple vectors to test batch repair with bidirectional updates + std::vector deleted_labels = {5, 15, 25, 35}; + for (labelType label : deleted_labels) { + if (label == 15) continue; // Already deleted + ret = index.deleteVector(label); + ASSERT_EQ(ret, 1) << "Failed to delete vector " << label; + } + + // Flush and verify + index.flushDeleteBatch(); + ASSERT_EQ(index.getNumMarkedDeleted(), 4); + + // Verify graph connectivity is maintained + for (size_t i = 0; i < n; i++) { + if (std::find(deleted_labels.begin(), deleted_labels.end(), i) != deleted_labels.end()) { + continue; + } + + auto results = index.topKQuery(vectors[i].data(), 3, &queryParams); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + + // Should find at least some neighbors + ASSERT_GT(results->results.size(), 0) << "No neighbors found for vector " << i; + + // Verify no deleted vectors in results + for (const auto &result : results->results) { + bool is_deleted = std::find(deleted_labels.begin(), deleted_labels.end(), + result.id) != deleted_labels.end(); + ASSERT_FALSE(is_deleted) << "Deleted vector " << result.id << " found in results"; + } + + delete results; + } +} + +// Test that verifies unidirectional edges are cleaned up via opportunistic repair +TEST_F(HNSWDiskIndexTest, UnidirectionalEdgeCleanup) { + size_t n = 30; + size_t dim = 4; + + // Create HNSW parameters + HNSWParams params; + params.dim = dim; + params.type = VecSimType_FLOAT32; + params.metric = VecSimMetric_L2; + params.multi = false; + params.M = 8; + params.efConstruction = 100; + params.efRuntime = 30; + params.epsilon = 0.01; + + // Create abstract init parameters + 
AbstractIndexInitParams abstractInitParams; + abstractInitParams.dim = dim; + abstractInitParams.vecType = params.type; + abstractInitParams.dataSize = dim * sizeof(float); + abstractInitParams.blockSize = 1024; + abstractInitParams.multi = false; + abstractInitParams.allocator = VecSimAllocator::newVecsimAllocator(); + + // Create index components + IndexComponents components = CreateIndexComponents( + abstractInitParams.allocator, VecSimMetric_L2, dim, false); + + // Create HNSWDiskIndex + rocksdb::ColumnFamilyHandle *default_cf = db->DefaultColumnFamily(); + HNSWDiskIndex index(¶ms, abstractInitParams, components, db.get(), + default_cf, temp_dir); + + // Add vectors + std::mt19937 rng(99999); + std::vector> vectors; + for (labelType label = 0; label < n; label++) { + auto vec = createRandomVector(dim, rng); + vectors.push_back(vec); + int ret = index.addVector(vec.data(), label); + ASSERT_EQ(ret, 1) << "Failed to add vector " << label; + } + + // Flush to disk + index.flushBatch(); + ASSERT_EQ(index.indexSize(), n); + + // Delete some vectors - this may create unidirectional dangling edges + // (nodes that point to deleted nodes but are not in the deleted node's neighbor list) + std::vector deleted_labels = {5, 10, 15, 20}; + for (labelType label : deleted_labels) { + int ret = index.deleteVector(label); + ASSERT_EQ(ret, 1) << "Failed to delete vector " << label; + } + + // Flush delete batch + index.flushDeleteBatch(); + ASSERT_EQ(index.getNumMarkedDeleted(), deleted_labels.size()); + + // Perform searches - this triggers getNeighbors which will: + // 1. Filter out deleted nodes from neighbor lists + // 2. 
Stage repairs for nodes with dangling edges (opportunistic cleanup) + VecSimQueryParams queryParams; + queryParams.hnswRuntimeParams.efRuntime = 30; + + // Do multiple searches to traverse the graph and trigger opportunistic repair + for (size_t i = 0; i < n; i++) { + if (std::find(deleted_labels.begin(), deleted_labels.end(), i) != deleted_labels.end()) { + continue; + } + + auto results = index.topKQuery(vectors[i].data(), 5, &queryParams); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + + // Verify no deleted vectors in results + for (const auto &result : results->results) { + bool is_deleted = std::find(deleted_labels.begin(), deleted_labels.end(), + result.id) != deleted_labels.end(); + ASSERT_FALSE(is_deleted) << "Deleted vector " << result.id << " found in search results"; + } + + delete results; + } + + // Flush staged repair updates (cleanup of unidirectional dangling edges) + // This happens automatically during the next batch operation + index.flushDeleteBatch(); + + // Verify the graph is still functional after cleanup + for (size_t i = 0; i < 10; i++) { + if (std::find(deleted_labels.begin(), deleted_labels.end(), i) != deleted_labels.end()) { + continue; + } + + auto results = index.topKQuery(vectors[i].data(), 5, &queryParams); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + ASSERT_GT(results->results.size(), 0); + + // Verify all results are non-deleted + for (const auto &result : results->results) { + bool is_deleted = std::find(deleted_labels.begin(), deleted_labels.end(), + result.id) != deleted_labels.end(); + ASSERT_FALSE(is_deleted) << "Deleted vector found after opportunistic cleanup"; + } + + delete results; + } +} + +// Test graph repair with heuristic selection +TEST_F(HNSWDiskIndexTest, GraphRepairWithHeuristic) { + size_t n = 40; + size_t dim = 8; + + // Create HNSW parameters with small M to force heuristic selection + HNSWParams params; + params.dim = dim; + params.type = 
VecSimType_FLOAT32; + params.metric = VecSimMetric_L2; + params.multi = false; + params.M = 3; // Very small M to force heuristic pruning + params.efConstruction = 50; + params.efRuntime = 20; + params.epsilon = 0.01; + + // Create abstract init parameters + AbstractIndexInitParams abstractInitParams; + abstractInitParams.dim = dim; + abstractInitParams.vecType = params.type; + abstractInitParams.dataSize = dim * sizeof(float); + abstractInitParams.blockSize = 1024; + abstractInitParams.multi = false; + abstractInitParams.allocator = VecSimAllocator::newVecsimAllocator(); + + // Create index components + IndexComponents components = CreateIndexComponents( + abstractInitParams.allocator, VecSimMetric_L2, dim, false); + + // Create HNSWDiskIndex + rocksdb::ColumnFamilyHandle *default_cf = db->DefaultColumnFamily(); + HNSWDiskIndex index(¶ms, abstractInitParams, components, db.get(), + default_cf, temp_dir); + + // Add vectors in a dense cluster to create many neighbor relationships + std::mt19937 rng(54321); + std::vector> vectors; + + for (labelType label = 0; label < n; label++) { + std::vector vec(dim); + std::uniform_real_distribution dist(-2.0f, 2.0f); + for (size_t d = 0; d < dim; d++) { + vec[d] = dist(rng); + } + vectors.push_back(vec); + int ret = index.addVector(vec.data(), label); + ASSERT_EQ(ret, 1) << "Failed to add vector " << label; + } + + // Flush to disk + index.flushBatch(); + ASSERT_EQ(index.indexSize(), n); + + // Delete a vector that likely has many neighbors + // This will trigger repair with heuristic selection (candidates > max_M) + labelType deleted_label = 20; + int ret = index.deleteVector(deleted_label); + ASSERT_EQ(ret, 1); + + // Flush delete batch - triggers graph repair with heuristic + index.flushDeleteBatch(); + ASSERT_EQ(index.getNumMarkedDeleted(), 1); + + // Verify search quality is maintained (heuristic selected good neighbors) + VecSimQueryParams queryParams; + queryParams.hnswRuntimeParams.efRuntime = 20; + + for (size_t i = 
0; i < n; i++) { + if (i == deleted_label) continue; + + auto results = index.topKQuery(vectors[i].data(), 5, &queryParams); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + + // Should find neighbors despite small M + ASSERT_GT(results->results.size(), 0) << "No neighbors found for vector " << i; + + // Verify deleted vector not in results + for (const auto &result : results->results) { + ASSERT_NE(result.id, deleted_label); + } + + delete results; + } + + // Delete multiple vectors to stress test the heuristic repair + std::vector deleted_labels = {5, 10, 15, 20, 25, 30}; + for (labelType label : deleted_labels) { + if (label == 20) continue; // Already deleted + ret = index.deleteVector(label); + ASSERT_EQ(ret, 1); + } + + index.flushDeleteBatch(); + ASSERT_EQ(index.getNumMarkedDeleted(), 6); + + // Verify graph is still navigable + size_t successful_queries = 0; + for (size_t i = 0; i < n; i++) { + if (std::find(deleted_labels.begin(), deleted_labels.end(), i) != deleted_labels.end()) { + continue; + } + + auto results = index.topKQuery(vectors[i].data(), 3, &queryParams); + ASSERT_TRUE(results != nullptr); + ASSERT_EQ(results->code, VecSim_OK); + + if (results->results.size() > 0) { + successful_queries++; + } + + delete results; + } + + // Most queries should succeed (graph should remain connected) + ASSERT_GT(successful_queries, (n - deleted_labels.size()) / 2) + << "Too many queries failed - graph may be disconnected"; +}