diff --git a/.github/workflows/sync_release_notes.yml b/.github/workflows/sync_release_notes.yml new file mode 100644 index 0000000000..5d66b84327 --- /dev/null +++ b/.github/workflows/sync_release_notes.yml @@ -0,0 +1,197 @@ +name: Release Notes Syncing + +on: + workflow_dispatch: + inputs: + tag_name: + description: 'Release tag to build (e.g. v1.0.0)' + required: true + type: string + + +jobs: + + create-and-build: + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + arch: + - name: avx2 + instance_type: c6i.large + binary_file_name: ndd-avx2 + - name: avx512 + instance_type: c6i.large + binary_file_name: ndd-avx2 + - name: neon + instance_type: c6g.large + binary_file_name: ndd-neon + - name: sve2 + instance_type: c7g.large + binary_file_name: ndd-neon + + steps: + + - name: Checkout PR commit + uses: actions/checkout@v4 + with: + ref: ${{ github.event.inputs.tag_name }} + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ vars.AWS_REGION }} + + - name: Launch Endee Server + id: launch + shell: bash + run: | + ARCH_NAME="${{ matrix.arch.name }}" + INSTANCE_TYPE="${{ matrix.arch.instance_type }}" + + if [[ "$ARCH_NAME" == "avx2" ]] || [[ "$ARCH_NAME" == "avx512" ]]; then + AMI_ID="${{ vars.AMI_ID }}" + else + AMI_ID="${{ vars.ARM_AMI_ID }}" + fi + + ENDEE_INSTANCE_ID=$(aws ec2 run-instances \ + --region ${{ vars.AWS_REGION }} \ + --image-id "$AMI_ID" \ + --instance-type "$INSTANCE_TYPE" \ + --key-name ${{ secrets.ENDEE_PEM }} \ + --security-group-ids ${{ secrets.VECTORDBBENCH_SERVER_GROUP_ID }} \ + --subnet-id ${{ secrets.AWS_SUBNET_ID }} \ + --block-device-mappings '[{"DeviceName":"/dev/sda1","Ebs":{"VolumeSize":30,"VolumeType":"gp3"}}]' \ + --tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=$ARCH_NAME}]" \ + --query 'Instances[0].InstanceId' \ + --output 
text) + + echo "InstanceID: $ENDEE_INSTANCE_ID" + echo "instance_id=$ENDEE_INSTANCE_ID" >> $GITHUB_OUTPUT + + aws ec2 wait instance-running \ + --instance-ids $ENDEE_INSTANCE_ID + + IP=$(aws ec2 describe-instances \ + --instance-ids $ENDEE_INSTANCE_ID \ + --query 'Reservations[0].Instances[0].PublicIpAddress' \ + --output text) + + echo "IP: $IP" + echo "ip=$IP" >> $GITHUB_OUTPUT + + - name: Write PEM file + run: | + mkdir -p "$HOME/.ssh" + echo "${{ secrets.ENDEE_SSH_PRIVATE_KEY }}" > "$HOME/.ssh/${{ secrets.ENDEE_PEM }}" + chmod 400 "$HOME/.ssh/${{ secrets.ENDEE_PEM }}" + echo "PEM file created" + + - name: Wait for SSH to be ready + shell: bash + run: | + ENDEE_SSH_READY=false + ENDEE_IP="${{ steps.launch.outputs.ip }}" + ENDEE_PEM_FILE="$HOME/.ssh/${{ secrets.ENDEE_PEM }}" + + for i in {1..20}; do + if ssh -i "$ENDEE_PEM_FILE" -o StrictHostKeyChecking=no -o ConnectTimeout=5 -o BatchMode=yes ubuntu@"$ENDEE_IP" "echo ok" 2>/dev/null; then + echo "SSH ready on ${{ matrix.arch.name }} @ $ENDEE_IP" + ENDEE_SSH_READY=true + break + fi + echo "Attempt $i/20 failed, retrying in 10 seconds..." 
+ sleep 10 + done + + if [ "$ENDEE_SSH_READY" = false ]; then + echo "Failed to SSH to Endee Server" + exit 1 + fi + + - name: Build Endee Binary + run: | + ssh -o StrictHostKeyChecking=no -i "$HOME/.ssh/${{ secrets.ENDEE_PEM }}" ubuntu@${{ steps.launch.outputs.ip }} << 'EOF' + set -euo pipefail + sudo apt-get update -y + sudo apt-get install -y git build-essential + cd ~ + git clone https://github.com/endee-io/endee.git + cd endee + ulimit -n 5000 + chmod +x ./install.sh + ARCH="${{ matrix.arch.name }}" + if [[ "$ARCH" == "avx2" ]] || [[ "$ARCH" == "avx512" ]]; then + ./install.sh --release --avx2 + else + ./install.sh --release --neon + fi + EOF + + - name: Download binary + run: | + # verify path exists first + ssh -o StrictHostKeyChecking=no -i "$HOME/.ssh/${{ secrets.ENDEE_PEM }}" \ + ubuntu@${{ steps.launch.outputs.ip }} \ + "find /home/ubuntu -name '${{ matrix.arch.binary_file_name }}' 2>/dev/null" + + scp -o StrictHostKeyChecking=no -i "$HOME/.ssh/${{ secrets.ENDEE_PEM }}" \ + ubuntu@${{ steps.launch.outputs.ip }}:"/home/ubuntu/endee/build/${{ matrix.arch.binary_file_name }}" \ + ./ndd-${{ matrix.arch.name }} + + - name: Upload binary as artifact + uses: actions/upload-artifact@v4 + with: + name: ndd-${{ matrix.arch.name }} + path: ./ndd-${{ matrix.arch.name }} + + + - name: Terminate instance + if: always() + run: | + aws ec2 terminate-instances \ + --instance-ids ${{ steps.launch.outputs.instance_id }} + + # ← separate job, runs AFTER all 4 builds finish + push-binaries: + runs-on: ubuntu-latest + needs: create-and-build # waits for all 4 matrix jobs to complete + + steps: + - name: Download all binaries + uses: actions/download-artifact@v4 + with: + path: ./binaries # downloads all 4 artifacts here + + - name: Push all binaries to ndd-repo + run: | + git clone https://x-access-token:${{ secrets.PAT }}@github.com/Endee-Pro/ndd-docker.git + cd ndd-docker + + git checkout main + mkdir -p build + + # copy all 4 binaries at once + cp 
../binaries/ndd-avx2/ndd-avx2 ./build/ndd-avx2 + cp ../binaries/ndd-avx512/ndd-avx512 ./build/ndd-avx512 + cp ../binaries/ndd-neon/ndd-neon ./build/ndd-neon + cp ../binaries/ndd-sve2/ndd-sve2 ./build/ndd-sve2 + + # UPDATE TAG IN DOCKERFILE + sed -i 's/LABEL version=".*"/LABEL version="${{ github.event.inputs.tag_name }}"/' ./Dockerfile + + git config user.email "actions@github.com" + git config user.name "GitHub Actions" + + git add . + + if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "Add binaries from release ${{ github.event.inputs.tag_name }}" + git push -u origin HEAD:omnish/release-note-sync + fi \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 5fa15b0d93..824e5c4684 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -254,6 +254,8 @@ message(STATUS "Binary name: ${NDD_BINARY_NAME}") set(NDD_CORE_SOURCES src/sparse/inverted_index.cpp src/utils/system_sanity/system_sanity.cpp + src/core/rebuild.cpp + src/storage/backup_store.cpp ) # Build non-main project sources separately so they can be compiled in parallel @@ -288,6 +290,7 @@ target_include_directories(ndd_core PRIVATE ${ASIO_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${CURL_INCLUDE_DIRS} + ${LIBARCHIVE_INCLUDE_DIR} ) target_include_directories(${NDD_BINARY_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src diff --git a/docs/backup-system.md b/docs/backup-system.md index 25ad0b4839..baa5cff823 100644 --- a/docs/backup-system.md +++ b/docs/backup-system.md @@ -2,7 +2,7 @@ `BackupStore` is a standalone utility class owned by `IndexManager` as a direct member (`BackupStore backup_store_`). It has no dependency on IndexManager — it handles tar operations, backup JSON, file paths, and active backup tracking. `IndexManager` orchestrates the backup flow (save, lock, metadata) and delegates file-level operations to `BackupStore`. All backup API calls go through `IndexManager` — `BackupStore` is not exposed to `main.cpp`. 
-Backups are stored as `.tar` archives in per-user directories: `{DATA_DIR}/backups/{username}/`. Temp files use a centralized `{DATA_DIR}/backups/.tmp/{username}/` directory. Active backup state is tracked in-memory with mutex protection (`backup_state_mutex_`). +Backups are stored as `.tar` archives in per-user directories: `{DATA_DIR}/backups/{username}/`. Temp files use a centralized `{DATA_DIR}/backups/.tmp/{username}/` directory. Active backup state is tracked in-memory with mutex protection (`active_user_backups_mutex_`). ## Architecture @@ -10,7 +10,7 @@ Backups are stored as `.tar` archives in per-user directories: `{DATA_DIR}/backu IndexManager (ndd.hpp) ├── BackupStore backup_store_ (direct member) ├── 3 orchestration methods (inline, defined after class): -│ executeBackupJob, createBackupAsync, restoreBackup, uploadBackup +│ executeBackupJob, createBackupAsync, restoreBackupAsync, uploadBackup ├── 5 forwarding methods: │ listBackups, deleteBackup, getActiveBackup, getBackupInfo, validateBackupName └── Handles: saveIndexInternal, getIndexEntry, metadata_manager_, loadIndex @@ -18,10 +18,11 @@ IndexManager (ndd.hpp) BackupStore (src/storage/backup_store.hpp — standalone, no IndexManager dependency) ├── Archive: createBackupTar(), extractBackupTar() ├── Helpers: getUserBackupDir(), getUserTempDir(), readBackupJson(), writeBackupJson(), cleanupTempDir() -├── Active backup: setActiveBackup(), clearActiveBackup(), hasActiveBackup(), getActiveBackup() -│ (all protected by backup_state_mutex_) +├── Active backup: setActiveBackup(), attachBackupThread(), clearActiveBackup(), hasActiveBackup(), getActiveBackup() +│ (all protected by active_user_backups_mutex_; tracks both Creation and Restoration operations) +│ setActiveBackup() registers the entry before the thread is spawned; attachBackupThread() moves the jthread in after ├── Public methods: validateBackupName(), listBackups(), deleteBackup(), getBackupInfo() -└── Owns: data_dir_, active_user_backups_, 
backup_state_mutex_ (mutable) +└── Owns: data_dir_, active_user_backups_, active_user_backups_mutex_ (mutable) ``` ## API Endpoints @@ -32,7 +33,7 @@ BackupStore (src/storage/backup_store.hpp — standalone, no IndexManager depend | GET | `/api/v1/backups` | List all backup files | | GET | `/api/v1/backups/active` | Check active backup for current user | | GET | `/api/v1/backups/{name}/info` | Get backup metadata (read from .tar) | -| POST | `/api/v1/backups/{name}/restore` | Restore backup to new index | +| POST | `/api/v1/backups/{name}/restore` | Restore backup to new index (async, 202) | | DELETE | `/api/v1/backups/{name}` | Delete a backup file | | GET | `/api/v1/backups/{name}/download` | Download backup (streaming) | | POST | `/api/v1/backups/upload` | Upload a backup file | @@ -49,7 +50,7 @@ operation_mutex (mutex, per-index) └── Write operations block until mutex is available ``` -**Simple approach:** No atomic flags or file locks. The backup thread holds `operation_mutex` while saving and creating the tar. Write operations that arrive during backup simply block on the mutex until the backup releases it. One active backup per user is enforced via in-memory map protected by `backup_state_mutex_` for thread-safe access. +**Simple approach:** No atomic flags or file locks. The backup thread holds `operation_mutex` while saving and creating the tar. Write operations that arrive during backup simply block on the mutex until the backup releases it. One active operation per user is enforced via in-memory map protected by `active_user_backups_mutex_` — this covers both backup creation and restore operations, so a user cannot run a backup and a restore concurrently. **Write path during backup:** @@ -69,8 +70,8 @@ If backup holds the mutex, writes block until it completes. 
Normal write-vs-writ ``` POST /index/X/backup → validateBackupName() → check no duplicate .tar on disk → check active_user_backups_[username] empty (one per user) -→ insert into active_user_backups_ map -→ spawn detached thread → return 202 { backup_name } +→ setActiveBackup() — insert entry into active_user_backups_ map (no thread yet) +→ spawn jthread → attachBackupThread() — move jthread into map entry → return 202 { backup_name } ``` **Background thread** (`executeBackupJob`): @@ -92,16 +93,31 @@ addVectors/deleteVectors/updateFilters/deleteByFilter/deleteIndex (blocks if backup holds operation_mutex — resumes after backup completes) ``` -### Restore Backup +### Restore Backup (Async) ``` -POST /backups/{name}/restore -→ validate name → check tar exists → check target index does NOT exist -→ extract tar to backups/.tmp/{username}/ → read metadata.json → copy files to target dir -→ register in MetadataManager → cleanup temp dir → loadIndex() -→ 201 OK +POST /backups/{name}/restore → validate name → check backup exists in backup registry +→ check target index does NOT exist → check disk space (need 2x tar size) +→ check active_user_backups_[username] empty (one per user) +→ setActiveBackup() — insert entry into active_user_backups_ map (BackupOperation::Restoration, no thread yet) +→ spawn jthread → attachBackupThread() — move jthread into map entry → return 202 { backup_name, target_index, status: "in_progress" } +``` + +**Background thread** (`restoreBackup`): + +``` +→ extract tar to backups/.tmp/{username}/{backup_name}/ +→ validate archive structure (expect exactly 1 directory) +→ read metadata.json → copy files to target dir → remove metadata.json from target +→ register in MetadataManager +→ [LOCK indices_mutex_] loadIndex() [UNLOCK] +→ cleanup temp dir → erase from active_user_backups_ ``` +**On failure**: cleanup temp dir → erase from active_user_backups_ → log error (not returned to client). 
+ +**Status polling**: client polls `GET /api/v1/backups/active` to check if restore is still in progress. + ### Download (Streaming) ``` @@ -138,11 +154,12 @@ GET /backups/{name}/info | # | Check | Where | |---|-------|-------| -| 1 | **One backup per user** — `active_user_backups_` map rejects if user already has active backup | createBackupAsync | +| 1 | **One operation per user** — `active_user_backups_` map rejects if user already has an active backup or restore | createBackupAsync, restoreBackupAsync | | 2 | **Write blocking** — writes block on `operation_mutex` until backup completes | addVectors, deleteVectors, updateFilters, deleteByFilter, deleteIndex | | 3 | **Name validation** — alphanumeric, underscores, hyphens only; max 200 chars | validateBackupName | | 4 | **Duplicate prevention** — checks if .tar file already exists on disk | createBackupAsync, upload | -| 5 | **Disk space** — requires 2x index size available | executeBackupJob | +| 5 | **Disk space (create)** — requires 2x index size available in backup dir | executeBackupJob | +| 5b | **Disk space (restore)** — requires 2x tar file size available in temp dir | restoreBackupAsync | | 6 | **Atomic tar** — writes to `backups/.tmp/{username}/` first, then renames to final location | executeBackupJob | | 7 | **Crash recovery** — on startup: `cleanupTempDir()` deletes entire `backups/.tmp/` directory | BackupStore constructor | -| 8 | **Restore safety** — target must not exist, metadata must be valid, cleanup on failure | restoreBackup | +| 8 | **Restore safety** — target must not exist, metadata must be valid; cleanup (temp dir + active status) on failure in background thread | restoreBackupAsync, restoreBackup | diff --git a/docs/logs.md b/docs/logs.md index 5ceb9d78b2..b8829df3c7 100644 --- a/docs/logs.md +++ b/docs/logs.md @@ -88,6 +88,7 @@ The same overload shapes apply to `LOG_WARN` and `LOG_ERROR`. 
- `1500s` metadata logs - `1600s` vector storage logs - `1700s` system sanity checks (CPU compatibility, disk, memory, ulimits) + - `1800s` rebuild subsystem logs - `2000s` index manager logs - `2100s` HNSW load/cache logs diff --git a/docs/rebuild.md b/docs/rebuild.md new file mode 100644 index 0000000000..a78342c344 --- /dev/null +++ b/docs/rebuild.md @@ -0,0 +1,134 @@ +# Index Rebuild + +Rebuild allows you to reconstruct an HNSW index graph with new configuration parameters (M, ef_construction) without re-uploading vector data. All vectors are re-indexed from MDBX storage — only the graph structure is rebuilt. + +## API Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/api/v1/index/{name}/rebuild` | Start async rebuild | +| GET | `/api/v1/index/{name}/rebuild/status` | Check rebuild progress | + +--- + +## Start Rebuild + +**POST** `/api/v1/index/{name}/rebuild` + +All parameters are optional. Omitted parameters retain their current values. 
+ +```json +{ + "M": 32, + "ef_con": 256 +} +``` + +**Parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `M` | int | HNSW graph connectivity (4–512) | +| `ef_con` | int | Construction-time search quality (8–4096) | + +**Response 202:** +```json +{ + "status": "rebuilding", + "previous_config": { "M": 16, "ef_con": 128 }, + "new_config": { "M": 32, "ef_con": 256 }, + "total_vectors": 50000 +} +``` + +**Errors:** + +| Code | Condition | +|------|-----------| +| 400 | No changes specified, invalid parameters, or attempted to change `precision`/`space_type` | +| 404 | Index not found | +| 409 | Rebuild or backup already in progress for this user | + +--- + +## Check Progress + +**GET** `/api/v1/index/{name}/rebuild/status` + +**Status values:** + +| Status | Meaning | +|--------|---------| +| `idle` | No rebuild has run for this index (or querying a different index) | +| `in_progress` | Rebuild is currently running | +| `completed` | Rebuild finished successfully | +| `failed` | Rebuild failed (see `error` field) | + +**In progress:** +```json +{ + "status": "in_progress", + "vectors_processed": 45000, + "total_vectors": 100000, + "percent_complete": 45.0, + "started_at": "2026-03-25T10:30:00Z" +} +``` + +**Completed:** +```json +{ + "status": "completed", + "vectors_processed": 100000, + "total_vectors": 100000, + "percent_complete": 100.0, + "started_at": "2026-03-25T10:30:00Z", + "completed_at": "2026-03-25T10:32:15Z" +} +``` + +**Failed:** +```json +{ + "status": "failed", + "vectors_processed": 45000, + "total_vectors": 100000, + "percent_complete": 45.0, + "started_at": "2026-03-25T10:30:00Z", + "completed_at": "2026-03-25T10:31:05Z", + "error": "Out of memory" +} +``` + +Status is per-index. The `completed`/`failed` state persists until the next rebuild is started for that user. 
+ +--- + +## Restrictions + +The following parameters **cannot** be changed via rebuild (returns 400): +- `precision` (quantization level) +- `space_type` + + +--- + +## Behavior + +- **All vectors are re-indexed** from MDBX storage into a new HNSW graph with the updated configuration. +- **Search continues** during rebuild — queries use the old index until the rebuild completes. +- **Write operations** (insert, delete, update) will block and timeout while the rebuild is running, same as during backup. +- **One rebuild at a time per user** — cannot start a rebuild on any index while another rebuild is in progress for the same user. Also cannot run concurrently with a backup. +- **Periodic checkpoints** — the in-progress graph is saved to a temp file at regular intervals. +- **On completion**, the new graph replaces `default.idx`. All temporary and intermediate files are cleaned up. +- **On server restart** during an incomplete rebuild, the old index loads normally. Orphaned temp files are removed automatically on startup. The rebuild must be restarted manually. To confirm a rebuild was incomplete, check that M/ef_con in the index info still show the original values. + +--- + +## Capacity and Timing + +**Disk space:** Plan for roughly **2× the index file size** free. A temporary copy of the completed graph is written before being renamed into place. + +**Memory:** Both the old and new graphs are in RAM simultaneously during rebuild. Peak usage is approximately **2× the index graph size** in addition to normal vector storage. + +**Duration:** Roughly 8-10 minutes per million vectors on commodity hardware at default settings. Higher M or ef_con increases build time. The final disk save adds additional time proportional to index size. 
diff --git a/install.sh b/install.sh index 3facd070fb..4a62ea6e53 100755 --- a/install.sh +++ b/install.sh @@ -197,7 +197,7 @@ distro_factory() { # **************************************** add_frontend() { - VERSION="v1.6.1" + VERSION="v1.6.0-alpha.5" log "Pulling frontend version ${VERSION}" mkdir -p $script_dir/frontend cd $script_dir/frontend diff --git a/src/core/ndd.hpp b/src/core/ndd.hpp index 55f6e5bc57..7a1eca6094 100644 --- a/src/core/ndd.hpp +++ b/src/core/ndd.hpp @@ -107,7 +107,7 @@ struct CacheEntry { * * writers: addVectors, saveIndexInternal, saveIndex, deleteVectors, * evictIfNeeded, recoverIndex, deleteVectorsByFilter, updateFilters, - * deleteIndex, executeBackupJob + * deleteIndex * * readers: searchKNN, getVector, getIndexInfo (loaded-index path only) * @@ -197,8 +197,12 @@ struct PersistenceConfig { }; #include "../storage/backup_store.hpp" +#include "rebuild.hpp" +#include "utils/types.hpp" class IndexManager { + friend class Rebuild; // executeJob accesses saveIndexInternal + metadata_manager_ + friend class BackupStore; private: std::deque indices_list_; std::unordered_map> indices_; @@ -220,8 +224,7 @@ class IndexManager { std::thread autosave_thread_; std::atomic running_{true}; BackupStore backup_store_; - void executeBackupJob(const std::string& index_id, const std::string& backup_name, - std::stop_token st); + Rebuild rebuild_; std::unique_ptr createWAL(const std::string& index_id) { const std::string wal_dir = data_dir_ + "/" + index_id; @@ -581,6 +584,7 @@ class IndexManager { backup_store_(data_dir) { std::filesystem::create_directories(data_dir); metadata_manager_ = std::make_unique(data_dir); + rebuild_.cleanupTempFiles(data_dir); // Start the autosave thread autosave_thread_ = std::thread(&IndexManager::autosaveLoop, this); } @@ -589,9 +593,10 @@ class IndexManager { // Signal all threads to stop (running_ is checked by autosave and backup threads) running_ = false; - // Join background backup threads before destroying members - // 
(prevents use-after-free when detached threads outlive IndexManager) + // Join background backup and rebuild threads before destroying members + // (prevents use-after-free when threads outlive IndexManager) backup_store_.joinAllThreads(); + rebuild_.joinAllThreads(); /** * Don't wait for autosave thread to exit. @@ -1108,47 +1113,15 @@ class IndexManager { logInsertsAndUpdates(entry, numeric_ids); // Add to HNSW index in parallel using pre-quantized data from QuantVectorObject - size_t available_threads = settings::NUM_PARALLEL_INSERTS; - const size_t num_threads = (available_threads < quantized_vectors.size()) - ? available_threads - : quantized_vectors.size(); - std::vector threads; - const size_t chunk_size = - (quantized_vectors.size() + num_threads - 1) / num_threads; // Ceiling division - - threads.reserve(num_threads); - for(size_t t = 0; t < num_threads; t++) { - threads.emplace_back([&, t]() { - // Calculate start and end indices for this thread - size_t start_idx = t * chunk_size; - size_t end_idx = (start_idx + chunk_size < quantized_vectors.size()) - ? (start_idx + chunk_size) - : quantized_vectors.size(); - - // Process assigned chunk of vectors - for(size_t i = start_idx; i < end_idx; i++) { - const auto& quant_vec_obj = quantized_vectors[i]; - - // Use pre-quantized data directly from QuantVectorObject - no conversion - // needed! 
- const uint8_t* vector_data = quant_vec_obj.quant_vector.data(); - - // Add to HNSW index using pre-quantized raw bytes - if(numeric_ids[i].second) { - // If it's a new ID, add it to the index - entry.alg->addPoint(vector_data, numeric_ids[i].first); - } else { - // If it's an update, add it to the index - entry.alg->addPoint(vector_data, numeric_ids[i].first); - } + parallelAddPoints(quantized_vectors.size(), settings::NUM_PARALLEL_INSERTS, + [&](size_t i) { + const uint8_t* vector_data = quantized_vectors[i].quant_vector.data(); + if(numeric_ids[i].second) { + entry.alg->addPoint(vector_data, numeric_ids[i].first); + } else { + entry.alg->addPoint(vector_data, numeric_ids[i].first); } }); - } - - // Wait for all threads to complete - for(auto& thread : threads) { - thread.join(); - } entry.markDirty(); @@ -1898,7 +1871,7 @@ class IndexManager { std::pair createBackupAsync(const std::string& index_id, const std::string& backup_name); - std::pair restoreBackup(const std::string& backup_name, + std::pair restoreBackupAsync(const std::string& backup_name, const std::string& target_index_name, const std::string& username); @@ -1928,262 +1901,75 @@ class IndexManager { std::pair uploadBackup(const std::string& backup_name, const std::string& username, const std::string& file_content); -}; -// ========== IndexManager backup implementations ========== - -inline void IndexManager::executeBackupJob(const std::string& index_id, const std::string& backup_name, - std::stop_token st) { - std::string username; - size_t upos = index_id.find('/'); - if (upos != std::string::npos) { - username = index_id.substr(0, upos); + // Metadata access + std::optional getMetadata(const std::string& index_id) { + return metadata_manager_->getMetadata(index_id); } - try { - std::string index_name; - if (upos != std::string::npos) { - index_name = index_id.substr(upos + 1); - } else { - throw std::runtime_error("Invalid index ID format"); - } - - std::string user_backup_dir = 
backup_store_.getUserBackupDir(username); - std::filesystem::create_directories(user_backup_dir); - std::string user_temp_dir = backup_store_.getUserTempDir(username); - std::filesystem::create_directories(user_temp_dir); - std::string source_dir = data_dir_ + "/" + index_id; - std::string backup_tar_final = user_backup_dir + "/" + backup_name + ".tar"; - std::string backup_tar_temp = user_temp_dir + "/.tmp_" + backup_name + ".tar"; - - if(std::filesystem::exists(backup_tar_final)) { - throw std::runtime_error("Backup already exists: " + backup_name); - } - - size_t index_size = 0; - for(const auto& file : std::filesystem::recursive_directory_iterator(source_dir)) { - if(!std::filesystem::is_directory(file)) { - index_size += std::filesystem::file_size(file); - } - } - - auto space_info = std::filesystem::space(user_backup_dir); - if(space_info.available < index_size * 2) { - throw std::runtime_error("Insufficient disk space: need " + - std::to_string(index_size * 2 / MB) + " MB"); - } - - auto meta = metadata_manager_->getMetadata(index_id); - nlohmann::json metadata_json; - if(meta) { - metadata_json["original_index"] = index_name; - metadata_json["timestamp"] = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); - metadata_json["size_mb"] = index_size / MB; - metadata_json["params"] = {{"M", meta->M}, - {"ef_construction", meta->ef_con}, - {"dim", meta->dimension}, - {"sparse_model", - ndd::sparseScoringModelToString(meta->sparse_model)}, - {"space_type", meta->space_type_str}, - {"quant_level", static_cast(meta->quant_level)}, - {"total_elements", meta->total_elements}, - {"checksum", meta->checksum}}; - LOG_DEBUG("Metadata prepared for backup: " << metadata_json.dump()); - } else { - LOG_ERROR(2041, index_id, "Failed to get metadata for backup"); - throw std::runtime_error("Cannot create backup without index metadata"); - } - - // Check stop_token before expensive operations - if (st.stop_requested()) { - LOG_INFO(2056, index_id, "Backup 
cancelled before backup work started"); - backup_store_.clearActiveBackup(username); - return; - } - - auto entry_ptr = getIndexEntry(index_id); - auto& entry = *entry_ptr; - std::string metadata_file_in_index = source_dir + "/metadata.json"; - { - /** - * NOTE: While making a backup is a reading operation on the index, - * we are picking a writer's lock here because we have disabled reader's - * locks on other instances of read in the system right now. - * - * This is to enable reads while writes are happening on the index. - * Check other instances of shared_lock on operation_mutex. - */ - std::unique_lock operation_lock(entry.operation_mutex); - - // Check again after acquiring lock (shutdown may have been requested while waiting) - if (st.stop_requested()) { - LOG_INFO(2057, index_id, "Backup cancelled"); - backup_store_.clearActiveBackup(username); - return; - } - - saveIndexInternal(entry); - - if(!metadata_json.empty()) { - std::ofstream meta_file(metadata_file_in_index, std::ios::binary); - if(!meta_file) { - throw std::runtime_error("Failed to create metadata file: " + metadata_file_in_index); - } - meta_file << metadata_json.dump(4); - meta_file.flush(); - meta_file.close(); - - if(!std::filesystem::exists(metadata_file_in_index)) { - throw std::runtime_error("Metadata file was not created: " + metadata_file_in_index); - } - LOG_DEBUG("Metadata file created: " << metadata_file_in_index << " (size: " << std::filesystem::file_size(metadata_file_in_index) << " bytes)"); - } - - std::string error_msg; - LOG_DEBUG("Creating tar archive from " << source_dir << " to " << backup_tar_temp); - if(!backup_store_.createBackupTar(source_dir, backup_tar_temp, error_msg, st)) { - if(std::filesystem::exists(metadata_file_in_index)) { - std::filesystem::remove(metadata_file_in_index); - } - throw std::runtime_error("Failed to create tar archive: " + error_msg); - } - - if(!std::filesystem::exists(backup_tar_temp)) { - throw std::runtime_error("Tar archive was not created: 
" + backup_tar_temp); - } - LOG_DEBUG("Tar archive created successfully: " << backup_tar_temp << " (size: " << std::filesystem::file_size(backup_tar_temp) << " bytes)"); - - if(std::filesystem::exists(metadata_file_in_index)) { - std::filesystem::remove(metadata_file_in_index); - } - } - - backup_store_.clearActiveBackup(username); - - LOG_INFO(2042, index_id, "Backup tar created; write operations resumed"); - - std::filesystem::rename(backup_tar_temp, backup_tar_final); - - nlohmann::json backup_db = backup_store_.readBackupJson(username); - backup_db[backup_name] = metadata_json; - backup_store_.writeBackupJson(username, backup_db); - - LOG_INFO(2043, index_id, "Backup completed: " << backup_name << " -> " << backup_tar_final); - - } catch (const std::exception& e) { - std::string user_backup_dir = backup_store_.getUserBackupDir(username); - std::string user_temp_dir = backup_store_.getUserTempDir(username); - std::string source_dir = data_dir_ + "/" + index_id; - std::string backup_tar_final = user_backup_dir + "/" + backup_name + ".tar"; - std::string backup_tar_temp = user_temp_dir + "/.tmp_" + backup_name + ".tar"; - std::string metadata_file_in_index = source_dir + "/metadata.json"; - - if(std::filesystem::exists(backup_tar_temp)) { - std::filesystem::remove(backup_tar_temp); - } - if(std::filesystem::exists(backup_tar_final)) { - std::filesystem::remove(backup_tar_final); - } - if(std::filesystem::exists(metadata_file_in_index)) { - std::filesystem::remove(metadata_file_in_index); - } - - backup_store_.clearActiveBackup(username); - - LOG_ERROR(2044, index_id, "Backup failed for " << backup_name << ": " << e.what()); - } -} - -inline std::pair IndexManager::restoreBackup(const std::string& backup_name, - const std::string& target_index_name, - const std::string& username) { - std::pair result = backup_store_.validateBackupName(backup_name); - if(!result.first) { - return result; + // Reads live count from the in-memory HNSW graph; meta->total_elements can 
be stale between saves. + size_t getElementCount(const std::string& index_id) { + auto entry = getIndexEntry(index_id); + return entry->alg->getElementsCount(); } - std::string backup_dir_root = backup_store_.getUserBackupDir(username); - std::string backup_tar = backup_dir_root + "/" + backup_name + ".tar"; - std::string user_temp_dir = backup_store_.getUserTempDir(username); - std::filesystem::create_directories(user_temp_dir); - std::string backup_extract_dir = user_temp_dir + "/" + backup_name; - std::string target_index_id = username + "/" + target_index_name; - std::string target_dir = data_dir_ + "/" + target_index_id; - - if(!std::filesystem::exists(backup_tar)) { - return {false, "Backup not found: " + backup_name}; - } - if(metadata_manager_->getMetadata(target_index_id).has_value()) { - return {false, "Target index already exists"}; - } + // ========== Rebuild operations ========== - std::string error_msg; - if(!backup_store_.extractBackupTar(backup_tar, backup_extract_dir, error_msg)) { - return {false, "Failed to extract backup archive: " + error_msg}; - } + // Return codes: + // 0: rebuild started successfully + // 1: index not found + // 2: rebuild or backup already in progress for this user + // 3: no configuration changes specified / invalid parameters + OperationResult rebuildIndexAsync(const std::string& index_id, + size_t new_M, + size_t new_ef_con); - std::vector folders; - for(const auto& entry : std::filesystem::directory_iterator(backup_extract_dir)) { - if(entry.is_directory()) { - folders.push_back(entry.path().string()); - } + bool hasActiveRebuild(const std::string& username) const { + return rebuild_.hasActiveRebuild(username); } - if(folders.size() != 1) { - std::filesystem::remove_all(backup_extract_dir); - return {false, "Backup extraction failed - directory not found"}; + nlohmann::json getRebuildProgress(const std::string& username, + const std::string& index_id) const { + return rebuild_.getProgress(username, index_id); } - 
std::string backup_dir = folders[0]; - - try { - std::ifstream f(backup_dir + "/metadata.json"); - if(!f.good()) { - std::filesystem::remove_all(backup_extract_dir); - return {false, "Backup metadata missing"}; + // Shared parallel addPoint utility — static chunk partition (same as addVectors). + // ProcessFn signature: void(size_t index) + template + static void parallelAddPoints(size_t count, size_t max_threads, ProcessFn&& process) { + if (count == 0) return; + size_t num_threads = std::min(max_threads, count); + const size_t chunk_size = (count + num_threads - 1) / num_threads; + std::vector threads; + threads.reserve(num_threads); + for (size_t t = 0; t < num_threads; ++t) { + threads.emplace_back([&, t]() { + size_t start_idx = t * chunk_size; + size_t end_idx = std::min(start_idx + chunk_size, count); + for (size_t i = start_idx; i < end_idx; ++i) { + process(i); + } + }); } - nlohmann::json meta_json = nlohmann::json::parse(f); - - std::filesystem::create_directories(target_dir); - std::filesystem::copy(backup_dir, - target_dir, - std::filesystem::copy_options::recursive - | std::filesystem::copy_options::overwrite_existing); - - std::filesystem::remove(target_dir + "/metadata.json"); - - IndexMetadata new_meta; - new_meta.name = target_index_name; - new_meta.dimension = meta_json["params"]["dim"]; - new_meta.M = meta_json["params"]["M"]; - new_meta.ef_con = meta_json["params"]["ef_construction"]; - new_meta.space_type_str = meta_json["params"]["space_type"]; - new_meta.quant_level = static_cast( - meta_json["params"]["quant_level"].get()); - const auto sparse_model = ndd::sparseScoringModelFromString( - meta_json["params"]["sparse_model"].get()); - new_meta.sparse_model = *sparse_model; - new_meta.created_at = std::chrono::system_clock::now(); - new_meta.total_elements = meta_json["params"].value("total_elements", 0ul); - new_meta.checksum = meta_json["params"].value("checksum", -1); - - metadata_manager_->storeMetadata(target_index_id, new_meta); - - 
std::filesystem::remove_all(backup_extract_dir); - - { - std::unique_lock write_lock(indices_mutex_); - loadIndex(target_index_id); + for (auto& th : threads) { + th.join(); } + } - LOG_INFO(2045, username, target_index_name, "Restored backup from " << backup_tar); - return {true, ""}; - } catch(const std::exception& e) { - std::filesystem::remove_all(backup_extract_dir); - return {false, "Failed to restore backup: " + std::string(e.what())}; + // Wires vector fetchers on an HNSW graph. Must be called before addPoint — searchBaseLayer + // during graph construction needs fetchers to compute distances for base-layer-only nodes. + static void wireVectorFetchers(hnswlib::HierarchicalNSW* alg, + std::shared_ptr vs) { + alg->setVectorFetcher([vs](ndd::idInt label, uint8_t* buffer) { + return vs->get_vector(label, buffer); + }); + alg->setVectorFetcherBatch([vs](const ndd::idInt* labels, uint8_t* buffers, + bool* success, size_t count) -> size_t { + return vs->get_vectors_batch_into(labels, buffers, success, count); + }); } -} +}; inline std::pair IndexManager::createBackupAsync(const std::string& index_id, const std::string& backup_name) { @@ -2211,16 +1997,77 @@ inline std::pair IndexManager::createBackupAsync(const std::s return {false, "Backup already exists: " + backup_name}; } - std::jthread t([this, index_id, backup_name](std::stop_token st) { - executeBackupJob(index_id, backup_name, st); + backup_store_.setActiveBackup(username, backup_name, BackupOperation::Creation); + + CreateBackupParams params{ + .index_id = index_id, + .backup_name = backup_name, + .index_manager = this, + }; + + std::jthread t([this, params = std::move(params)](std::stop_token st) { + backup_store_.createBackup(params, st); }); - backup_store_.setActiveBackup(username, index_id, backup_name, std::move(t)); + + backup_store_.attachBackupThread(username, std::move(t)); LOG_INFO(2046, index_id, "Backup started: " << backup_name); return {true, backup_name}; } +inline std::pair 
IndexManager::restoreBackupAsync(const std::string& backup_name,
+                                 const std::string& target_index_name,
+                                 const std::string& username) {
+
+    // Check if any backup is already under creation or restoration
+    if(backup_store_.hasActiveBackup(username)) {
+        return {false, "Backup already in progress for user: " + username};
+    }
+
+    // Check if the backup exists
+    nlohmann::json backup_db = backup_store_.readBackupJson(username);
+    if(!backup_db.contains(backup_name)) {
+        return {false, "Backup not found: " + backup_name};
+    }
+
+    // Check if an index with target name already exists
+    std::string target_index_id = username + "/" + target_index_name;
+    if(metadata_manager_->getMetadata(target_index_id).has_value()) {
+        return {false, "Target index already exists"};
+    }
+
+    // Check disk space before making the backup active
+    std::string backup_tar = backup_store_.getUserBackupDir(username) + "/" + backup_name + ".tar";
+    // Guard against a registry entry whose tar file is gone from disk:
+    // std::filesystem::file_size throws filesystem_error on a missing path,
+    // which would otherwise escape the route handler as an unhandled 500.
+    if(!std::filesystem::exists(backup_tar)) {
+        return {false, "Backup archive missing on disk: " + backup_name};
+    }
+    std::string user_temp_dir = backup_store_.getUserTempDir(username);
+    std::filesystem::create_directories(user_temp_dir);
+    size_t backup_size = std::filesystem::file_size(backup_tar);
+    auto space_info = std::filesystem::space(user_temp_dir);
+    if (space_info.available < backup_size * 2) {
+        return {false, "Insufficient disk space: need " +
+                std::to_string(backup_size * 2 / MB) + " MB"};
+    }
+
+    backup_store_.setActiveBackup(username, backup_name, BackupOperation::Restoration);
+
+    RestoreBackupParams params{
+        .backup_name = backup_name,
+        .target_index_name = target_index_name,
+        .username = username,
+        .index_manager = this,
+    };
+
+    std::jthread t([this, params = std::move(params)](std::stop_token st) {
+        backup_store_.restoreBackup(params, st);
+    });
+
+    backup_store_.attachBackupThread(username, std::move(t));
+
+    LOG_INFO(2059, username, "Restoration started for backup: " << backup_name << ", target_index: " << target_index_name);
+
+    return {true, target_index_name};
+}
+
+inline std::pair<bool, std::string> IndexManager::uploadBackup(const std::string&
backup_name, const std::string& username, const std::string& file_content) { std::string user_backup_dir = backup_store_.getUserBackupDir(username); std::filesystem::create_directories(user_backup_dir); @@ -2280,4 +2127,67 @@ inline std::pair IndexManager::uploadBackup(const std::string backup_store_.writeBackupJson(username, backup_db); return {true, "Backup uploaded successfully"}; -} \ No newline at end of file +} + +// ========== IndexManager rebuild implementations ========== + +inline OperationResult IndexManager::rebuildIndexAsync(const std::string& index_id, + size_t new_M, + size_t new_ef_con) { + auto meta = metadata_manager_->getMetadata(index_id); + if (!meta) { + return {1, "Index not found"}; + } + + std::string username; + size_t pos = index_id.find('/'); + if (pos != std::string::npos) { + username = index_id.substr(0, pos); + } else { + return {3, "Invalid index ID format"}; + } + + if (backup_store_.hasActiveBackup(username)) { + return {2, "Backup already in progress for user: " + username}; + } + if (rebuild_.hasActiveRebuild(username)) { + return {2, "Rebuild already in progress for user: " + username}; + } + + // Pre-fetch entry now — captured by lambdas so the thread never calls getIndexEntry + auto entry = getIndexEntry(index_id); + size_t current_count = entry->alg->getElementsCount(); + + if (new_M == meta->M && new_ef_con == meta->ef_con) { + return {3, "No configuration changes specified"}; + } + + std::string base_path = data_dir_ + "/" + index_id; + std::string vector_storage_dir = base_path + "/vectors"; + + RebuildJobParams params{ + .username = username, + .new_M = new_M, + .new_ef_con = new_ef_con, + .entry = entry, + .manager = this, + .temp_path = Rebuild::getTempPath(base_path), + .timestamped_path = Rebuild::getTimestampedPath(base_path), + .index_path = vector_storage_dir + "/" + settings::DEFAULT_SUBINDEX + ".idx", + .num_parallel_inserts = settings::NUM_PARALLEL_INSERTS, + }; + + // Register state FIRST with empty thread — 
hasActiveRebuild() returns true immediately + rebuild_.setActiveRebuild(username, index_id, current_count); + + // Spawn thread — lambda calls rebuild_.executeJob directly (execution lives in Rebuild) + std::jthread t([this, params = std::move(params)](std::stop_token st) { + rebuild_.executeJob(params, st); + }); + + // Move real thread into the already-registered state + rebuild_.attachRebuildThread(username, std::move(t)); + + LOG_INFO(1800, index_id, "Rebuild started: M=" << new_M << " ef_con=" << new_ef_con); + return {0, "Rebuild started"}; +} diff --git a/src/core/rebuild.cpp b/src/core/rebuild.cpp new file mode 100644 index 0000000000..9526d7aaf1 --- /dev/null +++ b/src/core/rebuild.cpp @@ -0,0 +1,268 @@ +#include "rebuild.hpp" + +#include +#include +#include + +#include "settings.hpp" +#include "log.hpp" +#include "utils/types.hpp" +#include "ndd.hpp" // CacheEntry, IndexManager (friend access) + +std::string Rebuild::statusToString(RebuildStatus s) { + switch (s) { + case RebuildStatus::IN_PROGRESS: return "in_progress"; + case RebuildStatus::COMPLETED: return "completed"; + case RebuildStatus::FAILED: return "failed"; + } + __builtin_unreachable(); +} + +std::string Rebuild::timeToISO8601(std::chrono::system_clock::time_point tp) { + auto time_t_val = std::chrono::system_clock::to_time_t(tp); + std::tm tm_val{}; + gmtime_r(&time_t_val, &tm_val); + std::ostringstream oss; + oss << std::put_time(&tm_val, "%Y-%m-%dT%H:%M:%SZ"); + return oss.str(); +} + +void Rebuild::cleanupTempFiles(const std::string& data_dir) { + if (!std::filesystem::exists(data_dir)) { + return; + } + try { + std::string temp_filename = std::string(settings::DEFAULT_SUBINDEX) + ".idx.temp"; + std::string ts_prefix = std::string(settings::DEFAULT_SUBINDEX) + ".idx."; + for (const auto& entry : std::filesystem::recursive_directory_iterator(data_dir)) { + if (!entry.is_regular_file()) continue; + const std::string fname = entry.path().filename().string(); + bool is_temp = (fname == 
temp_filename);
+            bool is_ts = fname.size() > ts_prefix.size()
+                && fname.substr(0, ts_prefix.size()) == ts_prefix
+                // Cast to unsigned char: passing a plain (possibly negative) char
+                // to ::isdigit is undefined behavior per the C character-classification
+                // contract.
+                && std::all_of(fname.begin() + ts_prefix.size(), fname.end(),
+                               [](unsigned char c) { return ::isdigit(c) != 0; });
+            if (is_temp || is_ts) {
+                std::filesystem::remove(entry.path());
+            }
+        }
+    } catch (const std::filesystem::filesystem_error& e) {
+        if (e.code() != std::errc::no_such_file_or_directory)
+            LOG_WARN(1803, "rebuild", "Error during temp cleanup: " << e.what());
+    } catch (const std::exception& e) {
+        LOG_WARN(1803, "rebuild", "Error during temp cleanup: " << e.what());
+    }
+}
+
+void Rebuild::setActiveRebuild(const std::string& username, const std::string& index_id,
+                               size_t total_vectors) {
+    std::lock_guard<std::mutex> lock(rebuild_state_mutex_);
+    auto state = std::make_shared<ActiveRebuild>();
+    state->index_id = index_id;
+    state->status = RebuildStatus::IN_PROGRESS;
+    state->total_vectors = total_vectors;
+    state->vectors_processed = 0;
+    state->started_at = std::chrono::system_clock::now();
+    active_rebuilds_[username] = state;
+}
+
+void Rebuild::completeActiveRebuild(const std::string& username) {
+    std::lock_guard<std::mutex> lock(rebuild_state_mutex_);
+    auto it = active_rebuilds_.find(username);
+    if (it != active_rebuilds_.end()) {
+        // Called from within the thread — detach so the jthread dtor doesn't join us
+        if (it->second->thread.joinable()) {
+            it->second->thread.detach();
+        }
+        it->second->status = RebuildStatus::COMPLETED;
+        it->second->completed_at = std::chrono::system_clock::now();
+    }
+}
+
+void Rebuild::failActiveRebuild(const std::string& username, const std::string& error) {
+    std::lock_guard<std::mutex> lock(rebuild_state_mutex_);
+    auto it = active_rebuilds_.find(username);
+    if (it != active_rebuilds_.end()) {
+        // Called from within the thread — detach so the jthread dtor doesn't join us
+        if (it->second->thread.joinable()) {
+            it->second->thread.detach();
+        }
+        it->second->status = RebuildStatus::FAILED;
+        it->second->error_message = error;
+        it->second->completed_at =
std::chrono::system_clock::now(); + } +} + +bool Rebuild::hasActiveRebuild(const std::string& username) const { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + // Only IN_PROGRESS blocks a new rebuild + return it != active_rebuilds_.end() && it->second->status == RebuildStatus::IN_PROGRESS; +} + +void Rebuild::joinAllThreads() { + std::vector threads_to_join; + { + std::lock_guard lock(rebuild_state_mutex_); + for (auto& [username, state] : active_rebuilds_) { + if (state->thread.joinable()) { + threads_to_join.push_back(std::move(state->thread)); + } + } + active_rebuilds_.clear(); + } + for (auto& t : threads_to_join) { + t.request_stop(); + if (t.joinable()) { + t.join(); + } + } +} + +void Rebuild::attachRebuildThread(const std::string& username, std::jthread&& thread) { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + it->second->thread = std::move(thread); + } +} + +void Rebuild::updateProgress(const std::string& username, size_t processed) { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + it->second->vectors_processed = processed; + } +} + +nlohmann::json Rebuild::getProgress(const std::string& username, const std::string& index_id) const { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end() && it->second->index_id == index_id) { + const auto& state = *it->second; + size_t processed = state.vectors_processed; + size_t total = state.total_vectors; + double percent = total > 0 ? 
(100.0 * processed / total) : 0.0; + nlohmann::json result = { + {"status", statusToString(state.status)}, + {"vectors_processed", processed}, + {"total_vectors", total}, + {"percent_complete", percent}, + {"started_at", timeToISO8601(state.started_at)} + }; + if (state.status == RebuildStatus::COMPLETED || state.status == RebuildStatus::FAILED) { + result["completed_at"] = timeToISO8601(state.completed_at); + } + if (state.status == RebuildStatus::FAILED && !state.error_message.empty()) { + result["error"] = state.error_message; + } + return result; + } + return {{"status", "idle"}}; +} + +std::string Rebuild::getTempPath(const std::string& index_dir) { + return index_dir + "/vectors/" + settings::DEFAULT_SUBINDEX + ".idx.temp"; +} + +std::string Rebuild::getTimestampedPath(const std::string& index_dir) { + auto ts = std::to_string( + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch() + ).count() + ); + return index_dir + "/vectors/" + settings::DEFAULT_SUBINDEX + ".idx." 
+ ts; +} + +void Rebuild::executeJob(const RebuildJobParams& p, std::stop_token st) { + auto& entry = p.entry; // shared_ptr + auto* manager = p.manager; + try { + std::unique_lock op_lock(entry->operation_mutex); + + // Phase 1 — save current state before rebuilding + manager->saveIndexInternal(*entry); + + // Phase 2 — build new HNSW with updated M/ef_con + auto* old_alg = entry->alg.get(); + auto new_alg = std::make_unique>( + old_alg->getMaxElements(), old_alg->getSpaceType(), old_alg->getDimension(), + p.new_M, p.new_ef_con, + settings::RANDOM_SEED, old_alg->getQuantLevel(), old_alg->getChecksum()); + + // MUST wire fetchers before addPoint — searchBaseLayer needs this for base-layer-only nodes + IndexManager::wireVectorFetchers(new_alg.get(), entry->vector_storage); + + auto cursor = entry->vector_storage->getCursor(); + const size_t batch_size = settings::RECOVERY_BATCH_SIZE; + size_t total_processed = 0; + size_t batches_since_checkpoint = 0; + constexpr size_t CHECKPOINT_INTERVAL = 5; + + while (cursor.hasNext()) { + if (st.stop_requested()) { + if (std::filesystem::exists(p.temp_path)) + std::filesystem::remove(p.temp_path); + failActiveRebuild(p.username, "Rebuild interrupted by server shutdown"); + return; + } + + std::vector>> batch; + batch.reserve(batch_size); + while (cursor.hasNext() && batch.size() < batch_size) { + auto [label, vec_bytes] = cursor.next(); + if (!vec_bytes.empty()) + batch.emplace_back(label, std::move(vec_bytes)); + } + if (batch.empty()) break; + + IndexManager::parallelAddPoints(batch.size(), p.num_parallel_inserts, + [&](size_t i) { + const auto& [label, vec_bytes] = batch[i]; + new_alg->addPoint(vec_bytes.data(), label); + }); + + total_processed += batch.size(); + updateProgress(p.username, total_processed); + + if (++batches_since_checkpoint >= CHECKPOINT_INTERVAL) { + new_alg->saveIndex(p.temp_path); + batches_since_checkpoint = 0; + } + } + + if (st.stop_requested()) { + if (std::filesystem::exists(p.temp_path)) + 
std::filesystem::remove(p.temp_path); + failActiveRebuild(p.username, "Rebuild interrupted by server shutdown"); + return; + } + + // Phase 3 — persist to timestamped path, atomically rename to canonical path + new_alg->saveIndex(p.timestamped_path); + std::filesystem::rename(p.timestamped_path, p.index_path); + + if (std::filesystem::exists(p.temp_path)) std::filesystem::remove(p.temp_path); + + // new_alg is fully built and fetchers are already wired (line 194) — use directly + entry->alg = std::move(new_alg); + + // Update metadata (uses friend access to manager->metadata_manager_) + auto m = manager->metadata_manager_->getMetadata(entry->index_id); + if (m) { + m->M = p.new_M; + m->ef_con = p.new_ef_con; + m->total_elements = entry->alg->getElementsCount(); + manager->metadata_manager_->storeMetadata(entry->index_id, *m); + } + + entry->is_dirty = false; + + LOG_INFO(1801, entry->index_id, "Rebuild completed: " << total_processed << " vectors rebuilt"); + completeActiveRebuild(p.username); + + } catch (const std::exception& e) { + LOG_ERROR(1802, entry->index_id, "Rebuild failed: " << e.what()); + if (std::filesystem::exists(p.temp_path)) std::filesystem::remove(p.temp_path); + failActiveRebuild(p.username, e.what()); + } +} diff --git a/src/core/rebuild.hpp b/src/core/rebuild.hpp new file mode 100644 index 0000000000..4bd14a44ae --- /dev/null +++ b/src/core/rebuild.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "json/nlohmann_json.hpp" + +// Forward declarations — full definitions live in ndd.hpp, included by rebuild.cpp. 
+struct CacheEntry; +class IndexManager; + +enum class RebuildStatus : unsigned char { + IN_PROGRESS = 0, + COMPLETED = 1, + FAILED = 2 +}; + +struct ActiveRebuild { + std::string index_id; + RebuildStatus status{RebuildStatus::IN_PROGRESS}; + std::string error_message; + size_t vectors_processed{0}; + size_t total_vectors{0}; + std::chrono::system_clock::time_point started_at; + std::chrono::system_clock::time_point completed_at; + std::jthread thread; // jthread: built-in stop_token + auto-join on destruction +}; + +// Parameters passed to Rebuild::executeJob. `entry` and `manager` give executeJob +// direct access to graph config, vector storage, mutexes, save/metadata operations. +struct RebuildJobParams { + std::string username; + size_t new_M; + size_t new_ef_con; + + std::shared_ptr entry; // shared_ptr keeps CacheEntry alive for the rebuild duration + IndexManager* manager; // saveIndexInternal, metadata_manager_ (via friend) + + std::string temp_path; + std::string timestamped_path; + std::string index_path; + size_t num_parallel_inserts; +}; + +class Rebuild { +private: + std::unordered_map> active_rebuilds_; + mutable std::mutex rebuild_state_mutex_; + + static std::string statusToString(RebuildStatus s); + static std::string timeToISO8601(std::chrono::system_clock::time_point tp); + +public: + Rebuild() = default; + + void cleanupTempFiles(const std::string& data_dir); + + void setActiveRebuild(const std::string& username, const std::string& index_id, + size_t total_vectors); + void completeActiveRebuild(const std::string& username); + void failActiveRebuild(const std::string& username, const std::string& error); + bool hasActiveRebuild(const std::string& username) const; + void joinAllThreads(); + void attachRebuildThread(const std::string& username, std::jthread&& thread); + void updateProgress(const std::string& username, size_t processed); + nlohmann::json getProgress(const std::string& username, const std::string& index_id) const; + + static 
std::string getTempPath(const std::string& index_dir); + static std::string getTimestampedPath(const std::string& index_dir); + + void executeJob(const RebuildJobParams& p, std::stop_token st); +}; diff --git a/src/main.cpp b/src/main.cpp index 4654a54c20..5109e46e20 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -521,12 +521,16 @@ int main(int argc, char** argv) { try { std::pair result = - index_manager.restoreBackup(backup_name, target_index_name, ctx.username); + index_manager.restoreBackupAsync(backup_name, target_index_name, ctx.username); if(!result.first) { LOG_WARN(1023, ctx.username, target_index_name, "Restore-backup request rejected: " << result.second); return json_error(400, result.second); } - return crow::response(201, "Backup restored successfully"); + crow::json::wvalue response; + response["backup_name"] = backup_name; + response["target_index"] = result.second; + response["status"] = "in_progress"; + return crow::response(202, response.dump()); } catch(const std::exception& e) { return json_error_500(ctx.username, target_index_name, req.url, e.what()); } @@ -659,8 +663,8 @@ int main(int argc, char** argv) { crow::json::wvalue response; if (active) { response["active"] = true; - response["backup_name"] = active->second; - response["index_id"] = active->first; + response["backup_name"] = active->first; + response["operation"] = active->second; } else { response["active"] = false; } @@ -692,6 +696,98 @@ int main(int argc, char** argv) { } }); + // ========== Rebuild operations ========== + + // Start index rebuild + CROW_ROUTE(app, "/api/v1/index//rebuild") + .CROW_MIDDLEWARES(app, AuthMiddleware) + .methods("POST"_method)([&index_manager, &app](const crow::request& req, + const std::string& index_name) { + auto& ctx = app.get_context(req); + std::string index_id = ctx.username + "/" + index_name; + + auto body = crow::json::load(req.body); + if (!body) { + return json_error(400, "Invalid JSON"); + } + + // Reject parameters that cannot be changed 
via rebuild
+        if (body.has("precision")) {
+            return json_error(400, "precision cannot be changed via rebuild");
+        }
+        if (body.has("space_type")) {
+            return json_error(400, "space_type cannot be changed via rebuild");
+        }
+
+        // Get current metadata for defaults
+        auto meta = index_manager.getMetadata(index_id);
+        if (!meta) {
+            return json_error(404, "Index not found");
+        }
+        // Parse parameters with current values as defaults
+        size_t new_M = body.has("M") ? (size_t)body["M"].i() : meta->M;
+        size_t new_ef_con = body.has("ef_con") ? (size_t)body["ef_con"].i() : meta->ef_con;
+
+        // Validate M
+        if (new_M < settings::MIN_M || new_M > settings::MAX_M) {
+            return json_error(400,
+                "M must be between " + std::to_string(settings::MIN_M) +
+                " and " + std::to_string(settings::MAX_M));
+        }
+
+        // Validate ef_con
+        if (new_ef_con < settings::MIN_EF_CONSTRUCT || new_ef_con > settings::MAX_EF_CONSTRUCT) {
+            return json_error(400,
+                "ef_con must be between " + std::to_string(settings::MIN_EF_CONSTRUCT) +
+                " and " + std::to_string(settings::MAX_EF_CONSTRUCT));
+        }
+
+        try {
+            // Use live count — meta->total_elements can be stale if not yet flushed to disk.
+            // NOTE(review): getElementCount() goes through getIndexEntry() and can throw
+            // (e.g. index load failure); keep it inside the try so a failure returns a
+            // JSON 500 instead of escaping the route handler unhandled.
+            size_t actual_element_count = index_manager.getElementCount(index_id);
+            if (actual_element_count == 0) {
+                return json_error(400, "Cannot rebuild an empty index");
+            }
+
+            auto result = index_manager.rebuildIndexAsync(index_id, new_M, new_ef_con);
+
+            if (result.code == 1) return json_error(404, result.message);
+            if (result.code == 2) return json_error(409, result.message);
+            if (result.code == 3) return json_error(400, result.message);
+
+            crow::json::wvalue response;
+            response["status"] = "rebuilding";
+            response["previous_config"]["M"] = meta->M;
+            response["previous_config"]["ef_con"] = meta->ef_con;
+            response["new_config"]["M"] = new_M;
+            response["new_config"]["ef_con"] = new_ef_con;
+            response["total_vectors"] = actual_element_count;
+            return crow::response(202, response.dump());
+        } catch (const
std::exception& e) { + return json_error_500(ctx.username, index_name, req.url, e.what()); + } + }); + + // Get rebuild status + CROW_ROUTE(app, "/api/v1/index//rebuild/status") + .CROW_MIDDLEWARES(app, AuthMiddleware) + .methods("GET"_method)([&index_manager, &app](const crow::request& req, + const std::string& index_name) { + auto& ctx = app.get_context(req); + std::string index_id = ctx.username + "/" + index_name; + + try { + auto progress = index_manager.getRebuildProgress(ctx.username, index_id); + crow::response res; + res.code = 200; + res.set_header("Content-Type", "application/json"); + res.body = progress.dump(); + return res; + } catch (const std::exception& e) { + return json_error_500(ctx.username, index_name, req.url, e.what()); + } + }); + // List indexes for current user CROW_ROUTE(app, "/api/v1/index/list") .CROW_MIDDLEWARES(app, AuthMiddleware) diff --git a/src/storage/backup_store.cpp b/src/storage/backup_store.cpp new file mode 100644 index 0000000000..d3c564b0a5 --- /dev/null +++ b/src/storage/backup_store.cpp @@ -0,0 +1,545 @@ +#include +#include +#include + +#include "backup_store.hpp" +#include "../core/ndd.hpp" +#include "utils/types.hpp" + +// Construction + +BackupStore::BackupStore(const std::string& data_dir) : + data_dir_(data_dir) { + std::filesystem::create_directories(data_dir + "/backups"); + cleanupTempDir(); +} + +// Core backup operations + +void BackupStore::createBackup(const CreateBackupParams& params, std::stop_token st) { + std::string index_id = params.index_id; + std::string backup_name = params.backup_name; + auto* index_manager = params.index_manager; + + std::string username; + size_t upos = index_id.find('/'); + if (upos != std::string::npos) { + username = index_id.substr(0, upos); + } + + try { + std::string index_name; + if (upos != std::string::npos) { + index_name = index_id.substr(upos + 1); + } else { + throw std::runtime_error("Invalid index ID format"); + } + + std::string user_backup_dir = 
getUserBackupDir(username); + std::filesystem::create_directories(user_backup_dir); + std::string user_temp_dir = getUserTempDir(username); + std::filesystem::create_directories(user_temp_dir); + std::string source_dir = data_dir_ + "/" + index_id; + std::string backup_tar_final = user_backup_dir + "/" + backup_name + ".tar"; + std::string backup_tar_temp = user_temp_dir + "/.tmp_" + backup_name + ".tar"; + + if(std::filesystem::exists(backup_tar_final)) { + throw std::runtime_error("Backup already exists: " + backup_name); + } + + size_t index_size = 0; + for(const auto& file : std::filesystem::recursive_directory_iterator(source_dir)) { + if(!std::filesystem::is_directory(file)) { + index_size += std::filesystem::file_size(file); + } + } + + auto space_info = std::filesystem::space(user_backup_dir); + if(space_info.available < index_size * 2) { + throw std::runtime_error("Insufficient disk space: need " + + std::to_string(index_size * 2 / MB) + " MB"); + } + + auto meta = index_manager->metadata_manager_->getMetadata(index_id); + nlohmann::json metadata_json; + if(meta) { + metadata_json["original_index"] = index_name; + metadata_json["timestamp"] = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + metadata_json["size_mb"] = index_size / MB; + metadata_json["params"] = {{"M", meta->M}, + {"ef_construction", meta->ef_con}, + {"dim", meta->dimension}, + {"sparse_model", + ndd::sparseScoringModelToString(meta->sparse_model)}, + {"space_type", meta->space_type_str}, + {"quant_level", static_cast(meta->quant_level)}, + {"total_elements", meta->total_elements}, + {"checksum", meta->checksum}}; + LOG_DEBUG("Metadata prepared for backup: " << metadata_json.dump()); + } else { + LOG_ERROR(1305, index_id, "Failed to get metadata for backup"); + throw std::runtime_error("Cannot create backup without index metadata"); + } + + // Check stop_token before expensive operations + if (st.stop_requested()) { + LOG_INFO(1306, index_id, "Backup cancelled 
before backup work started"); + clearActiveBackup(username); + return; + } + + auto entry_ptr = index_manager->getIndexEntry(index_id); + auto& entry = *entry_ptr; + std::string metadata_file_in_index = source_dir + "/metadata.json"; + { + /** + * NOTE: While making a backup is a reading operation on the index, + * we are picking a writer's lock here because we have disabled reader's + * locks on other instances of read in the system right now. + * + * This is to enable reads while writes are happening on the index. + * Check other instances of shared_lock on operation_mutex. + */ + std::unique_lock operation_lock(entry.operation_mutex); + + // Check again after acquiring lock (shutdown may have been requested while waiting) + if (st.stop_requested()) { + LOG_INFO(1307, index_id, "Backup cancelled"); + clearActiveBackup(username); + return; + } + + index_manager->saveIndexInternal(entry); + + if(!metadata_json.empty()) { + std::ofstream meta_file(metadata_file_in_index, std::ios::binary); + if(!meta_file) { + throw std::runtime_error("Failed to create metadata file: " + metadata_file_in_index); + } + meta_file << metadata_json.dump(4); + meta_file.flush(); + meta_file.close(); + + if(!std::filesystem::exists(metadata_file_in_index)) { + throw std::runtime_error("Metadata file was not created: " + metadata_file_in_index); + } + LOG_DEBUG("Metadata file created: " << metadata_file_in_index << " (size: " << std::filesystem::file_size(metadata_file_in_index) << " bytes)"); + } + + std::string error_msg; + LOG_DEBUG("Creating tar archive from " << source_dir << " to " << backup_tar_temp); + if(!createBackupTar(source_dir, backup_tar_temp, error_msg, st)) { + if(std::filesystem::exists(metadata_file_in_index)) { + std::filesystem::remove(metadata_file_in_index); + } + throw std::runtime_error("Failed to create tar archive: " + error_msg); + } + + if(!std::filesystem::exists(backup_tar_temp)) { + throw std::runtime_error("Tar archive was not created: " + backup_tar_temp); 
+ } + LOG_DEBUG("Tar archive created successfully: " << backup_tar_temp << " (size: " << std::filesystem::file_size(backup_tar_temp) << " bytes)"); + + if(std::filesystem::exists(metadata_file_in_index)) { + std::filesystem::remove(metadata_file_in_index); + } + } + + clearActiveBackup(username); + + LOG_INFO(1308, index_id, "Backup tar created; write operations resumed"); + + std::filesystem::rename(backup_tar_temp, backup_tar_final); + + nlohmann::json backup_db = readBackupJson(username); + backup_db[backup_name] = metadata_json; + writeBackupJson(username, backup_db); + + LOG_INFO(1309, index_id, "Backup completed: " << backup_name << " -> " << backup_tar_final); + + } catch (const std::exception& e) { + std::string user_backup_dir = getUserBackupDir(username); + std::string user_temp_dir = getUserTempDir(username); + std::string source_dir = data_dir_ + "/" + index_id; + std::string backup_tar_final = user_backup_dir + "/" + backup_name + ".tar"; + std::string backup_tar_temp = user_temp_dir + "/.tmp_" + backup_name + ".tar"; + std::string metadata_file_in_index = source_dir + "/metadata.json"; + + if(std::filesystem::exists(backup_tar_temp)) { + std::filesystem::remove(backup_tar_temp); + } + if(std::filesystem::exists(backup_tar_final)) { + std::filesystem::remove(backup_tar_final); + } + if(std::filesystem::exists(metadata_file_in_index)) { + std::filesystem::remove(metadata_file_in_index); + } + + clearActiveBackup(username); + + LOG_ERROR(1310, index_id, "Backup failed for " << backup_name << ": " << e.what()); + } +} + +void BackupStore::restoreBackup(const RestoreBackupParams& params, std::stop_token st) { + std::string username = params.username; + std::string backup_name = params.backup_name; + std::string target_index_name = params.target_index_name; + auto* index_manager = params.index_manager; + + std::string backup_dir_root = getUserBackupDir(username); + std::string backup_tar = backup_dir_root + "/" + backup_name + ".tar"; + std::string 
user_temp_dir = getUserTempDir(username); + std::filesystem::create_directories(user_temp_dir); + std::string backup_extract_dir = user_temp_dir + "/" + backup_name; + std::string target_index_id = username + "/" + target_index_name; + std::string target_dir = data_dir_ + "/" + target_index_id; + + try { + std::string error_msg; + if(!extractBackupTar(backup_tar, backup_extract_dir, error_msg)) { + throw std::runtime_error("Failed to extract backup archive: " + error_msg); + } + + std::vector<std::string> folders; + for(const auto& entry : std::filesystem::directory_iterator(backup_extract_dir)) { + if(entry.is_directory()) { + folders.push_back(entry.path().string()); + } + } + + if(folders.size() != 1) { + std::filesystem::remove_all(backup_extract_dir); + throw std::runtime_error("Backup extraction failed - directory not found"); + } + + std::string backup_dir = folders[0]; + + std::ifstream f(backup_dir + "/metadata.json"); + if(!f.good()) { + std::filesystem::remove_all(backup_extract_dir); + throw std::runtime_error("Backup metadata missing"); + } + nlohmann::json meta_json = nlohmann::json::parse(f); + + std::filesystem::create_directories(target_dir); + std::filesystem::copy(backup_dir, + target_dir, + std::filesystem::copy_options::recursive + | std::filesystem::copy_options::overwrite_existing); + + std::filesystem::remove(target_dir + "/metadata.json"); + + IndexMetadata new_meta; + new_meta.name = target_index_name; + new_meta.dimension = meta_json["params"]["dim"]; + new_meta.M = meta_json["params"]["M"]; + new_meta.ef_con = meta_json["params"]["ef_construction"]; + new_meta.space_type_str = meta_json["params"]["space_type"]; + new_meta.quant_level = static_cast<decltype(new_meta.quant_level)>( + meta_json["params"]["quant_level"].get<int>()); + const auto sparse_model = ndd::sparseScoringModelFromString( + meta_json["params"]["sparse_model"].get<std::string>()); + new_meta.sparse_model = *sparse_model; + new_meta.created_at = std::chrono::system_clock::now(); + new_meta.total_elements = 
meta_json["params"].value("total_elements", 0ul); + new_meta.checksum = meta_json["params"].value("checksum", -1); + + index_manager->metadata_manager_->storeMetadata(target_index_id, new_meta); + + std::filesystem::remove_all(backup_extract_dir); + + { + std::unique_lock write_lock(index_manager->indices_mutex_); + index_manager->loadIndex(target_index_id); + } + + clearActiveBackup(username); + + LOG_INFO(1311, username, target_index_name, "Restored backup from " << backup_tar); + } catch(const std::exception& e) { + std::filesystem::remove_all(backup_extract_dir); + clearActiveBackup(username); + LOG_ERROR(1312, username, target_index_name, + "Restoration of backup failed for " << backup_name << ": " << e.what()); + } +} + +// Archive operations + +bool BackupStore::createBackupTar(const std::filesystem::path& source_dir, + const std::filesystem::path& archive_path, + std::string& error_msg, + std::stop_token st) { + struct archive* a = archive_write_new(); + archive_write_set_format_pax_restricted(a); + + if(archive_write_open_filename(a, archive_path.string().c_str()) != ARCHIVE_OK) { + error_msg = archive_error_string(a); + archive_write_free(a); + return false; + } + + for(const auto& entry : std::filesystem::recursive_directory_iterator(source_dir)) { + // Check stop_token per-file so shutdown doesn't block on large tar operations + if(st.stop_requested()) { + archive_write_close(a); + archive_write_free(a); + error_msg = "Backup cancelled"; + return false; + } + if(entry.is_regular_file()) { + struct archive_entry* e = archive_entry_new(); + + std::filesystem::path rel_path = + std::filesystem::relative(entry.path(), source_dir.parent_path()); + archive_entry_set_pathname(e, rel_path.string().c_str()); + archive_entry_set_size(e, std::filesystem::file_size(entry.path())); + archive_entry_set_filetype(e, AE_IFREG); + archive_entry_set_perm(e, 0644); + + if(archive_write_header(a, e) != ARCHIVE_OK) { + error_msg = archive_error_string(a); + 
archive_entry_free(e); + archive_write_close(a); + archive_write_free(a); + return false; + } + + std::ifstream file(entry.path(), std::ios::binary); + char buffer[8192]; + while(file.read(buffer, sizeof(buffer)) || file.gcount() > 0) { + archive_write_data(a, buffer, file.gcount()); + } + file.close(); + archive_entry_free(e); + } + } + + archive_write_close(a); + archive_write_free(a); + return true; +} + +bool BackupStore::extractBackupTar(const std::filesystem::path& archive_path, + const std::filesystem::path& dest_dir, + std::string& error_msg) { + struct archive* a = archive_read_new(); + struct archive* ext = archive_write_disk_new(); + struct archive_entry* entry; + + archive_read_support_format_all(a); + archive_read_support_filter_all(a); + archive_write_disk_set_options(ext, ARCHIVE_EXTRACT_TIME | ARCHIVE_EXTRACT_PERM); + archive_write_disk_set_standard_lookup(ext); + + if(archive_read_open_filename(a, archive_path.string().c_str(), 10240) != ARCHIVE_OK) { + error_msg = archive_error_string(a); + archive_read_free(a); + archive_write_free(ext); + return false; + } + + while(archive_read_next_header(a, &entry) == ARCHIVE_OK) { + std::filesystem::path full_path = dest_dir / archive_entry_pathname(entry); + archive_entry_set_pathname(entry, full_path.string().c_str()); + + if(archive_write_header(ext, entry) == ARCHIVE_OK) { + const void* buff; + size_t size; + la_int64_t offset; + + while(archive_read_data_block(a, &buff, &size, &offset) == ARCHIVE_OK) { + archive_write_data_block(ext, buff, size, offset); + } + } + archive_write_finish_entry(ext); + } + + archive_read_close(a); + archive_read_free(a); + archive_write_close(ext); + archive_write_free(ext); + return true; +} + +// Backup listing & info + +nlohmann::json BackupStore::listBackups(const std::string& username) { + nlohmann::json backup_list_json = readBackupJson(username); + return backup_list_json; +} + +nlohmann::json BackupStore::getBackupInfo(const std::string& backup_name, + const 
std::string& username) { + nlohmann::json backup_db = readBackupJson(username); + if(backup_db.contains(backup_name)) { + return backup_db[backup_name]; + } + return nlohmann::json(); +} + +// Backup name validation + +std::pair<bool, std::string> BackupStore::validateBackupName(const std::string& backup_name) const { + if(backup_name.empty()) { + return std::make_pair(false, "Backup name cannot be empty"); + } + + if(backup_name.length() > settings::MAX_BACKUP_NAME_LENGTH) { + return std::make_pair(false, + "Backup name too long (max " + + std::to_string(settings::MAX_BACKUP_NAME_LENGTH) + + " characters)"); + } + + static const std::regex backup_name_regex("^[a-zA-Z0-9_-]+$"); + if(!std::regex_match(backup_name, backup_name_regex)) { + return std::make_pair(false, + "Invalid backup name: only alphanumeric, underscores, " + "and hyphens allowed"); + } + + return std::make_pair(true, ""); +} + +// Backup deletion + +std::pair<bool, std::string> BackupStore::deleteBackup(const std::string& backup_name, + const std::string& username) { + std::pair<bool, std::string> result = validateBackupName(backup_name); + if(!result.first) { + return result; + } + + std::string backup_tar = getUserBackupDir(username) + "/" + backup_name + ".tar"; + + if(std::filesystem::exists(backup_tar)) { + std::filesystem::remove(backup_tar); + + nlohmann::json backup_db = readBackupJson(username); + backup_db.erase(backup_name); + writeBackupJson(username, backup_db); + + LOG_INFO(1303, username, "Deleted backup " << backup_tar); + return {true, ""}; + } else { + return {false, "Backup not found"}; + } +} + +// Active backup tracking + +void BackupStore::setActiveBackup(const std::string& username, + const std::string& backup_name, + const BackupOperation& operation) { + std::lock_guard lock(active_user_backups_mutex_); + active_user_backups_[username] = ActiveBackup{backup_name, operation, {}}; +} + +void BackupStore::attachBackupThread(const std::string& username, std::jthread&& thread) { + std::lock_guard lock(active_user_backups_mutex_); + 
auto it = active_user_backups_.find(username); + if(it != active_user_backups_.end()) { + it->second.thread = std::move(thread); + } +} + +void BackupStore::clearActiveBackup(const std::string& username) { + std::lock_guard lock(active_user_backups_mutex_); + auto it = active_user_backups_.find(username); + if(it != active_user_backups_.end()) { + // Called from within the thread itself — detach so erase doesn't try to join + if(it->second.thread.joinable()) { + it->second.thread.detach(); + } + active_user_backups_.erase(it); + } +} + +bool BackupStore::hasActiveBackup(const std::string& username) const { + std::lock_guard lock(active_user_backups_mutex_); + return active_user_backups_.count(username) > 0; +} + +std::optional<std::pair<std::string, std::string>> BackupStore::getActiveBackup(const std::string& username) { + std::lock_guard lock(active_user_backups_mutex_); + auto it = active_user_backups_.find(username); + if(it != active_user_backups_.end()) { + return std::make_pair(it->second.backup_name, + backupOperationToString(it->second.operation)); + } + return std::nullopt; +} + +void BackupStore::joinAllThreads() { + std::vector<std::jthread> threads_to_join; + { + std::lock_guard lock(active_user_backups_mutex_); + for(auto& [username, backup] : active_user_backups_) { + if(backup.thread.joinable()) { + threads_to_join.push_back(std::move(backup.thread)); + } + } + active_user_backups_.clear(); + } + // request_stop + join outside the lock + for(auto& t : threads_to_join) { + t.request_stop(); // signal stop_token — thread sees it inside createBackupTar + if(t.joinable()) { + t.join(); + } + } +} + +// Path helpers + +std::string BackupStore::getUserBackupDir(const std::string& username) const { + return data_dir_ + "/backups/" + username; +} + +std::string BackupStore::getBackupJsonPath(const std::string& username) const { + return getUserBackupDir(username) + "/backup.json"; +} + +std::string BackupStore::getUserTempDir(const std::string& username) const { + return data_dir_ + "/backups/.tmp/" + 
username; +} + +// Backup JSON helpers + +void BackupStore::writeBackupJson(const std::string& username, const nlohmann::json& data) { + std::string path = getBackupJsonPath(username); + std::ofstream f(path); + f << data.dump(2); +} + +nlohmann::json BackupStore::readBackupJson(const std::string& username) { + std::string path = getBackupJsonPath(username); + if(!std::filesystem::exists(path)) { + return nlohmann::json::object(); + } + try { + std::ifstream f(path); + return nlohmann::json::parse(f); + } catch(const std::exception& e) { + LOG_WARN(1304, + username, + "Failed to parse backup metadata file " << path << ": " << e.what()); + return nlohmann::json::object(); + } +} + +// Temp directory cleanup + +void BackupStore::cleanupTempDir() { + std::string temp_dir = data_dir_ + "/backups/.tmp"; + if(std::filesystem::exists(temp_dir)) { + try { + std::filesystem::remove_all(temp_dir); + LOG_INFO(1301, "Cleaned up backup temp directory"); + } catch(const std::exception& e) { + LOG_ERROR(1302, "Failed to clean up backup temp directory: " << e.what()); + } + } +} diff --git a/src/storage/backup_store.hpp b/src/storage/backup_store.hpp index 45fea9ec4c..1c75189ceb 100644 --- a/src/storage/backup_store.hpp +++ b/src/storage/backup_store.hpp @@ -2,27 +2,45 @@ #include #include -#include #include #include - #include -#include -#include #include #include #include #include "json/nlohmann_json.hpp" -#include "index_meta.hpp" -#include "settings.hpp" -#include "log.hpp" -struct ActiveBackup { +class IndexManager; + +enum class BackupOperation { Creation, Restoration }; + +inline std::string backupOperationToString(BackupOperation op) { + switch(op) { + case BackupOperation::Creation: return "creation"; + case BackupOperation::Restoration: return "restoration"; + } + return ""; +} + +struct CreateBackupParams { std::string index_id; std::string backup_name; - std::jthread thread; // jthread: built-in stop_token + auto-join on destruction + IndexManager* index_manager; +}; + 
+struct RestoreBackupParams { + std::string backup_name; + std::string target_index_name; + std::string username; + IndexManager* index_manager; +}; + +struct ActiveBackup { + std::string backup_name; + BackupOperation operation; + std::jthread thread; // jthread: built-in stop_token + auto-join on destruction }; class BackupStore { @@ -32,282 +50,75 @@ class BackupStore { mutable std::mutex active_user_backups_mutex_; public: - BackupStore(const std::string& data_dir) - : data_dir_(data_dir) { - std::filesystem::create_directories(data_dir + "/backups"); - cleanupTempDir(); - } + BackupStore(const std::string& data_dir); + + // Core backup operations + + void createBackup(const CreateBackupParams& params, std::stop_token st); - // Archive methods + void restoreBackup(const RestoreBackupParams& params, std::stop_token st); + + // Archive operations bool createBackupTar(const std::filesystem::path& source_dir, const std::filesystem::path& archive_path, std::string& error_msg, - std::stop_token st = {}) { - struct archive* a = archive_write_new(); - archive_write_set_format_pax_restricted(a); - - if(archive_write_open_filename(a, archive_path.string().c_str()) != ARCHIVE_OK) { - error_msg = archive_error_string(a); - archive_write_free(a); - return false; - } - - for(const auto& entry : std::filesystem::recursive_directory_iterator(source_dir)) { - // Check stop_token per-file so shutdown doesn't block on large tar operations - if(st.stop_requested()) { - archive_write_close(a); - archive_write_free(a); - error_msg = "Backup cancelled"; - return false; - } - if(entry.is_regular_file()) { - struct archive_entry* e = archive_entry_new(); - - std::filesystem::path rel_path = - std::filesystem::relative(entry.path(), source_dir.parent_path()); - archive_entry_set_pathname(e, rel_path.string().c_str()); - archive_entry_set_size(e, std::filesystem::file_size(entry.path())); - archive_entry_set_filetype(e, AE_IFREG); - archive_entry_set_perm(e, 0644); - - 
if(archive_write_header(a, e) != ARCHIVE_OK) { - error_msg = archive_error_string(a); - archive_entry_free(e); - archive_write_close(a); - archive_write_free(a); - return false; - } - - std::ifstream file(entry.path(), std::ios::binary); - char buffer[8192]; - while(file.read(buffer, sizeof(buffer)) || file.gcount() > 0) { - archive_write_data(a, buffer, file.gcount()); - } - file.close(); - archive_entry_free(e); - } - } - - archive_write_close(a); - archive_write_free(a); - return true; - } + std::stop_token st = {}); bool extractBackupTar(const std::filesystem::path& archive_path, const std::filesystem::path& dest_dir, - std::string& error_msg) { - struct archive* a = archive_read_new(); - struct archive* ext = archive_write_disk_new(); - struct archive_entry* entry; - - archive_read_support_format_all(a); - archive_read_support_filter_all(a); - archive_write_disk_set_options(ext, ARCHIVE_EXTRACT_TIME | ARCHIVE_EXTRACT_PERM); - archive_write_disk_set_standard_lookup(ext); - - if(archive_read_open_filename(a, archive_path.string().c_str(), 10240) != ARCHIVE_OK) { - error_msg = archive_error_string(a); - archive_read_free(a); - archive_write_free(ext); - return false; - } - - while(archive_read_next_header(a, &entry) == ARCHIVE_OK) { - std::filesystem::path full_path = dest_dir / archive_entry_pathname(entry); - archive_entry_set_pathname(entry, full_path.string().c_str()); - - if(archive_write_header(ext, entry) == ARCHIVE_OK) { - const void* buff; - size_t size; - la_int64_t offset; - - while(archive_read_data_block(a, &buff, &size, &offset) == ARCHIVE_OK) { - archive_write_data_block(ext, buff, size, offset); - } - } - archive_write_finish_entry(ext); - } - - archive_read_close(a); - archive_read_free(a); - archive_write_close(ext); - archive_write_free(ext); - return true; - } + std::string& error_msg); - // Path helpers + // Backup listing & info - std::string getUserBackupDir(const std::string& username) const { - return data_dir_ + "/backups/" + username; - 
} + nlohmann::json listBackups(const std::string& username); - std::string getBackupJsonPath(const std::string& username) const { - return getUserBackupDir(username) + "/backup.json"; - } + nlohmann::json getBackupInfo(const std::string& backup_name, const std::string& username); - std::string getUserTempDir(const std::string& username) const { - return data_dir_ + "/backups/.tmp/" + username; - } + // Backup name validation - // Backup JSON helpers + std::pair validateBackupName(const std::string& backup_name) const; - nlohmann::json readBackupJson(const std::string& username) { - std::string path = getBackupJsonPath(username); - if (!std::filesystem::exists(path)) return nlohmann::json::object(); - try { - std::ifstream f(path); - return nlohmann::json::parse(f); - } catch (const std::exception& e) { - LOG_WARN(1304, - username, - "Failed to parse backup metadata file " << path << ": " << e.what()); - return nlohmann::json::object(); - } - } + // Backup deletion - void writeBackupJson(const std::string& username, const nlohmann::json& data) { - std::string path = getBackupJsonPath(username); - std::ofstream f(path); - f << data.dump(2); - } + std::pair deleteBackup(const std::string& backup_name, + const std::string& username); - // Temp directory cleanup + // Active backup tracking - void cleanupTempDir() { - std::string temp_dir = data_dir_ + "/backups/.tmp"; - if (std::filesystem::exists(temp_dir)) { - try { - std::filesystem::remove_all(temp_dir); - LOG_INFO(1301, "Cleaned up backup temp directory"); - } catch (const std::exception& e) { - LOG_ERROR(1302, "Failed to clean up backup temp directory: " << e.what()); - } - } - } + void setActiveBackup(const std::string& username, + const std::string& backup_name, + const BackupOperation& operation); - // Active backup tracking + void attachBackupThread(const std::string& username, std::jthread&& thread); - void setActiveBackup(const std::string& username, const std::string& index_id, - const std::string& 
backup_name, std::jthread&& thread) { - std::lock_guard lock(active_user_backups_mutex_); - active_user_backups_[username] = {index_id, backup_name, std::move(thread)}; - } + void clearActiveBackup(const std::string& username); - void clearActiveBackup(const std::string& username) { - std::lock_guard lock(active_user_backups_mutex_); - auto it = active_user_backups_.find(username); - if (it != active_user_backups_.end()) { - // Called from within the thread itself — detach so erase doesn't try to join - if (it->second.thread.joinable()) { - it->second.thread.detach(); - } - active_user_backups_.erase(it); - } - } + bool hasActiveBackup(const std::string& username) const; - bool hasActiveBackup(const std::string& username) const { - std::lock_guard lock(active_user_backups_mutex_); - return active_user_backups_.count(username) > 0; - } + std::optional> getActiveBackup(const std::string& username); // Join all background backup threads before destroying IndexManager members. // Moves threads out under lock, then request_stop + join outside lock to avoid - // deadlock (finishing threads call clearActiveBackup which also locks active_user_backups_mutex_). - void joinAllThreads() { - std::vector threads_to_join; - { - std::lock_guard lock(active_user_backups_mutex_); - for (auto& [username, backup] : active_user_backups_) { - if (backup.thread.joinable()) { - threads_to_join.push_back(std::move(backup.thread)); - } - } - active_user_backups_.clear(); - } - // request_stop + join outside the lock - for (auto& t : threads_to_join) { - t.request_stop(); // signal stop_token — thread sees it inside createBackupTar - if (t.joinable()) { - t.join(); - } - } - } + // deadlock (finishing threads call clearActiveBackup which also locks + // active_user_backups_mutex_). 
+ void joinAllThreads(); - // Backup name validation - - std::pair validateBackupName(const std::string& backup_name) const { - if(backup_name.empty()) { - return std::make_pair(false, "Backup name cannot be empty"); - } - - if(backup_name.length() > settings::MAX_BACKUP_NAME_LENGTH) { - return std::make_pair(false, - "Backup name too long (max " - + std::to_string(settings::MAX_BACKUP_NAME_LENGTH) - + " characters)"); - } - - static const std::regex backup_name_regex("^[a-zA-Z0-9_-]+$"); - if(!std::regex_match(backup_name, backup_name_regex)) { - return std::make_pair(false, - "Invalid backup name: only alphanumeric, underscores, " - "and hyphens allowed"); - } - - return std::make_pair(true, ""); - } + // Path helpers - // Backup listing + std::string getUserBackupDir(const std::string& username) const; - nlohmann::json listBackups(const std::string& username) { - nlohmann::json backup_list_json = readBackupJson(username); - return backup_list_json; - } + std::string getBackupJsonPath(const std::string& username) const; - // Backup deletion + std::string getUserTempDir(const std::string& username) const; - std::pair deleteBackup(const std::string& backup_name, - const std::string& username) { - std::pair result = validateBackupName(backup_name); - if(!result.first) { - return result; - } - - std::string backup_tar = getUserBackupDir(username) + "/" + backup_name + ".tar"; - - if(std::filesystem::exists(backup_tar)) { - std::filesystem::remove(backup_tar); - - nlohmann::json backup_db = readBackupJson(username); - backup_db.erase(backup_name); - writeBackupJson(username, backup_db); - - LOG_INFO(1303, username, "Deleted backup " << backup_tar); - return {true, ""}; - } else { - return {false, "Backup not found"}; - } - } + // Backup JSON helpers - // Active backup query + nlohmann::json readBackupJson(const std::string& username); - std::optional> getActiveBackup(const std::string& username) { - std::lock_guard lock(active_user_backups_mutex_); - auto it = 
active_user_backups_.find(username); - if (it != active_user_backups_.end()) { - return std::make_pair(it->second.index_id, it->second.backup_name); - } - return std::nullopt; - } + void writeBackupJson(const std::string& username, const nlohmann::json& data); - // Backup info + // Temp directory cleanup - nlohmann::json getBackupInfo(const std::string& backup_name, const std::string& username) { - nlohmann::json backup_db = readBackupJson(username); - if (backup_db.contains(backup_name)) { - return backup_db[backup_name]; - } - return nlohmann::json(); - } + void cleanupTempDir(); }; diff --git a/src/utils/settings.hpp b/src/utils/settings.hpp index 9949e9109e..07210e7bc9 100644 --- a/src/utils/settings.hpp +++ b/src/utils/settings.hpp @@ -5,6 +5,7 @@ #include #include #include +#include constexpr uint64_t KB = (1024ULL); constexpr uint64_t MB = (1024ULL * KB); diff --git a/src/utils/types.hpp b/src/utils/types.hpp new file mode 100644 index 0000000000..e45d13678f --- /dev/null +++ b/src/utils/types.hpp @@ -0,0 +1,11 @@ +#pragma once +#include + +// Generic operation result returned by async and sync operations. +// Each function documents its return codes in comments above its declaration. +// Code 0 always means success. Non-zero codes are operation-specific. +// Codes can be conglomerated into ENUMs per operation as the codebase matures. 
+struct OperationResult { + unsigned char code; // 0 = success, non-zero = error (operation-specific) + std::string message; +}; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0793a2e2f3..c34cf9a56a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -3,6 +3,7 @@ include(FetchContent) FetchContent_Declare( googletest URL https://github.com/google/googletest/archive/refs/tags/v1.14.0.zip + DOWNLOAD_EXTRACT_TIMESTAMP TRUE ) # For Windows: Prevent overriding the parent project's compiler/linker settings set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) @@ -38,3 +39,71 @@ target_compile_definitions(ndd_filter_test PRIVATE MDB_MAXKEYSIZE=512) include(GoogleTest) gtest_discover_tests(ndd_filter_test) + +# --- ndd_rebuild_test --- +add_executable(ndd_rebuild_test rebuild_test.cpp ${LMDB_SOURCES} ${ROARING_SOURCE}) + +set_source_files_properties(${LMDB_SOURCES} PROPERTIES + COMPILE_FLAGS "-DMDBX_BUILD_SHARED_LIBRARY=0 -DMDBX_BUILD_FLAGS=\\\"NDD_EMBEDDED\\\"" +) + +target_include_directories(ndd_rebuild_test PRIVATE + ${CMAKE_SOURCE_DIR}/src + ${CMAKE_SOURCE_DIR}/src/core + ${CMAKE_SOURCE_DIR}/src/utils + ${CMAKE_SOURCE_DIR}/src/server + ${CMAKE_SOURCE_DIR}/src/storage + ${CMAKE_SOURCE_DIR}/src/filter + ${CMAKE_SOURCE_DIR}/src/sparse + ${CMAKE_SOURCE_DIR}/src/hnsw + ${CMAKE_SOURCE_DIR}/src/quant + ${CMAKE_SOURCE_DIR}/third_party + ${CMAKE_SOURCE_DIR}/third_party/json + ${CMAKE_SOURCE_DIR}/third_party/msgpack/include + ${LIBARCHIVE_INCLUDE_DIR} +) + +target_link_libraries(ndd_rebuild_test + PRIVATE + ndd_core + archive_static + GTest::gtest_main +) + +target_compile_definitions(ndd_rebuild_test PRIVATE MDB_MAXKEYSIZE=512) + +gtest_discover_tests(ndd_rebuild_test) + +# --- ndd_backup_test --- +add_executable(ndd_backup_test backup_test.cpp ${LMDB_SOURCES} ${ROARING_SOURCE}) + +set_source_files_properties(${LMDB_SOURCES} PROPERTIES + COMPILE_FLAGS "-DMDBX_BUILD_SHARED_LIBRARY=0 -DMDBX_BUILD_FLAGS=\\\"NDD_EMBEDDED\\\"" +) + 
+target_include_directories(ndd_backup_test PRIVATE + ${CMAKE_SOURCE_DIR}/src + ${CMAKE_SOURCE_DIR}/src/core + ${CMAKE_SOURCE_DIR}/src/utils + ${CMAKE_SOURCE_DIR}/src/server + ${CMAKE_SOURCE_DIR}/src/storage + ${CMAKE_SOURCE_DIR}/src/filter + ${CMAKE_SOURCE_DIR}/src/sparse + ${CMAKE_SOURCE_DIR}/src/hnsw + ${CMAKE_SOURCE_DIR}/src/quant + ${CMAKE_SOURCE_DIR}/third_party + ${CMAKE_SOURCE_DIR}/third_party/json + ${CMAKE_SOURCE_DIR}/third_party/msgpack/include + ${LIBARCHIVE_INCLUDE_DIR} +) + +target_link_libraries(ndd_backup_test + PRIVATE + ndd_core + archive_static + GTest::gtest_main +) + +target_compile_definitions(ndd_backup_test PRIVATE MDB_MAXKEYSIZE=512) + +gtest_discover_tests(ndd_backup_test) diff --git a/tests/README.md b/tests/README.md index a62ef40998..a37db0d7a8 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,19 +1,145 @@ # Tests -This folder contains unit tests for Endee. +Unit tests for Endee. Currently three test suites: filter, rebuild, and backup. -## Build & Run +## Build & Run All Tests -From the repository root: + cmake -S . -B build -DENABLE_TESTING=ON -DUSE_NEON=ON # Apple Silicon + cmake -S . -B build -DENABLE_TESTING=ON -DUSE_AVX2=ON # Intel/AMD + cmake --build build + ctest --test-dir build --output-on-failure -1. Configure with tests enabled: - - `cmake -S . -B build -DENABLE_TESTING=ON` -2. Build the test target: - - `cmake --build build --target ndd_filter_test` -3. Run: - - `./build/tests/ndd_filter_test` +## ndd_filter_test + +Tests for the filter subsystem (categorical, numeric, boolean filtering). 
+ +Build and run individually: + + cmake --build build --target ndd_filter_test + ./build/tests/ndd_filter_test + +Test cases: +- BucketTest: bucket serialization and deserialization +- FilterTest/CategoryFilterBasics: string category filter add and query +- FilterTest/BooleanFilterBasics: boolean filter via JSON input +- FilterTest/NumericFilterBasics: integer range queries +- FilterTest/FloatNumericFilter: float range queries +- FilterTest/MixedAndLogic: AND logic across multiple fields +- FilterTest/InOperator: $in operator with multiple values +- FilterTest/DeleteFilter: removal of categorical filters +- FilterTest/NumericDelete: removal of numeric filters + +## ndd_rebuild_test + +Unit and integration tests for the rebuild subsystem. + +Build and run individually: + + cmake --build build --target ndd_rebuild_test + ./build/tests/ndd_rebuild_test + +Test cases: + +State management (Rebuild class in isolation): +- RebuildStateTest/NoRebuild_HasActiveIsFalse +- RebuildStateTest/NoRebuild_GetProgressIsIdle +- RebuildStateTest/SetActive_HasActiveIsTrue +- RebuildStateTest/SetActive_GetProgressShowsInProgress +- RebuildStateTest/UpdateProgress_ReflectedInGetProgress +- RebuildStateTest/PercentComplete_CalculatedCorrectly +- RebuildStateTest/PercentComplete_ZeroTotal_IsZero +- RebuildStateTest/Complete_StatusIsCompleted +- RebuildStateTest/Complete_HasActiveIsFalse +- RebuildStateTest/Complete_CompletedAtPresent +- RebuildStateTest/Fail_StatusIsFailed +- RebuildStateTest/Fail_HasActiveIsFalse +- RebuildStateTest/Fail_ErrorMessagePresent +- RebuildStateTest/Fail_CompletedAtPresent +- RebuildStateTest/TwoUsers_IndependentState +- RebuildStateTest/GetProgress_WrongIndex_ReturnsIdle +- RebuildStateTest/SetActive_OverwritesPreviousCompleted + +Temp file cleanup and path helpers: +- RebuildCleanupTest/CleanupTempFiles_NonExistentDir_NoOp +- RebuildCleanupTest/CleanupTempFiles_RemovesTempFile +- RebuildCleanupTest/CleanupTempFiles_RemovesTimestampedFile +- 
RebuildCleanupTest/CleanupTempFiles_LeavesCanonicalIndex +- RebuildCleanupTest/CleanupTempFiles_EmptyDir_NoOp +- RebuildPathTest/GetTempPath_Format +- RebuildPathTest/GetTimestampedPath_HasTimestamp + +End-to-end rebuild via IndexManager: +- RebuildIntegrationTest/RebuildAsync_ReturnSuccessCode +- RebuildIntegrationTest/RebuildCompletes_ConfigUpdated +- RebuildIntegrationTest/RebuildCompletes_VectorCountPreserved +- RebuildIntegrationTest/RebuildWhileInProgress_Returns409Code +- RebuildIntegrationTest/RebuildNonExistentIndex_Returns404Code +- RebuildIntegrationTest/RebuildNoChange_Returns400Code + +## ndd_backup_test + +Unit and integration tests for the backup subsystem (`BackupStore` + `IndexManager` backup methods). + +Build and run individually: + + cmake --build build --target ndd_backup_test + ./build/tests/ndd_backup_test + +Test cases: + +BackupStore state management (no IndexManager): +- BackupStoreStateTest/ValidateName_AlphanumericUnderscore_Passes +- BackupStoreStateTest/ValidateName_WithHyphen_Passes +- BackupStoreStateTest/ValidateName_Empty_Fails +- BackupStoreStateTest/ValidateName_TooLong_Fails +- BackupStoreStateTest/ValidateName_Slash_Fails +- BackupStoreStateTest/ValidateName_Space_Fails +- BackupStoreStateTest/ValidateName_Dot_Fails +- BackupStoreStateTest/NoActive_HasActiveIsFalse +- BackupStoreStateTest/SetActive_HasActiveIsTrue +- BackupStoreStateTest/SetActive_GetActiveReturnsNameAndOperation +- BackupStoreStateTest/SetActive_Restoration_OperationString +- BackupStoreStateTest/ClearActive_HasActiveIsFalse +- BackupStoreStateTest/ClearActive_GetActiveReturnsNullopt +- BackupStoreStateTest/ClearNonExistent_NoOp +- BackupStoreStateTest/TwoUsers_IndependentState +- BackupStoreStateTest/ReadBackupJson_MissingFile_ReturnsEmptyObject +- BackupStoreStateTest/WriteAndReadBackupJson_RoundTrip +- BackupStoreStateTest/ListBackups_EmptyWhenNoneExist +- BackupStoreStateTest/ListBackups_ReturnsAllWrittenEntries +- 
BackupStoreStateTest/GetBackupInfo_ExistingEntry +- BackupStoreStateTest/GetBackupInfo_NonExistent_ReturnsNull +- BackupStoreStateTest/DeleteBackup_NonExistent_ReturnsFalse +- BackupStoreStateTest/DeleteBackup_InvalidName_ReturnsFalse +- BackupStoreStateTest/DeleteBackup_RemovesTarAndJsonEntry + +Archive (tar) operations: +- BackupArchiveTest/CreateBackupTar_ProducesNonEmptyFile +- BackupArchiveTest/ExtractBackupTar_FilesRoundTrip +- BackupArchiveTest/ExtractBackupTar_ContentPreserved +- BackupArchiveTest/ExtractBackupTar_NonExistentArchive_Fails +- BackupArchiveTest/CreateBackupTar_PreCancelledStopToken_ReturnsFalse + +End-to-end backup and restore via IndexManager: +- BackupIntegrationTest/CreateBackupAsync_ReturnsTrueAndBackupName +- BackupIntegrationTest/CreateBackup_SetsActiveBackupDuringRun +- BackupIntegrationTest/CreateBackup_ProducesTarFile +- BackupIntegrationTest/CreateBackup_AppearsInListBackups +- BackupIntegrationTest/CreateBackup_MetadataHasExpectedFields +- BackupIntegrationTest/CreateBackup_WhileInProgress_ReturnsFalse +- BackupIntegrationTest/CreateBackup_DuplicateName_ReturnsFalse +- BackupIntegrationTest/CreateBackup_InvalidName_ReturnsFalse +- BackupIntegrationTest/DeleteBackup_RemovesTarAndJsonEntry +- BackupIntegrationTest/DeleteBackup_NonExistent_ReturnsFalse +- BackupIntegrationTest/RestoreBackupAsync_ReturnsTrueAndTargetName +- BackupIntegrationTest/RestoreBackup_CreatesIndexWithCorrectMetadata +- BackupIntegrationTest/RestoreBackup_PreservesVectorCount +- BackupIntegrationTest/RestoreBackup_NonExistentBackup_ReturnsFalse +- BackupIntegrationTest/RestoreBackup_TargetIndexAlreadyExists_ReturnsFalse +- BackupIntegrationTest/RestoreBackup_WhileCreateInProgress_ReturnsFalse ## Notes -- Tests can also be built in a dedicated tests build directory (e.g., `tests/build/`). +- Tests use real file I/O and real MDBX databases — no mocking. +- Each test creates its own temp directory and removes it on teardown. 
- The `tests/build/` directory is ignored by git. diff --git a/tests/backup_test.cpp b/tests/backup_test.cpp new file mode 100644 index 0000000000..0e0cfe9e44 --- /dev/null +++ b/tests/backup_test.cpp @@ -0,0 +1,559 @@ +#include <gtest/gtest.h> +#include <chrono> +#include <filesystem> +#include <fstream> +#include <memory> +#include <thread> + +#include "backup_store.hpp" +#include "ndd.hpp" +#include "utils/msgpack_ndd.hpp" +#include "server/auth.hpp" + +namespace fs = std::filesystem; + +// ============================================================ +// Layer 1 — BackupStore state management (no IndexManager) +// ============================================================ + +class BackupStoreStateTest : public ::testing::Test { +protected: + std::string dir_; + std::unique_ptr<BackupStore> store_; + + void SetUp() override { + dir_ = "./test_backup_state_" + std::to_string(rand()); + fs::create_directories(dir_); + store_ = std::make_unique<BackupStore>(dir_); + } + + void TearDown() override { + store_.reset(); + if (fs::exists(dir_)) fs::remove_all(dir_); + } +}; + +// --- validateBackupName --- + +TEST_F(BackupStoreStateTest, ValidateName_AlphanumericUnderscore_Passes) { + auto [ok, msg] = store_->validateBackupName("my_backup"); + EXPECT_TRUE(ok); + EXPECT_TRUE(msg.empty()); +} + +TEST_F(BackupStoreStateTest, ValidateName_WithHyphen_Passes) { + auto [ok, msg] = store_->validateBackupName("backup-2024"); + EXPECT_TRUE(ok); +} + +TEST_F(BackupStoreStateTest, ValidateName_Empty_Fails) { + auto [ok, msg] = store_->validateBackupName(""); + EXPECT_FALSE(ok); + EXPECT_FALSE(msg.empty()); +} + +TEST_F(BackupStoreStateTest, ValidateName_TooLong_Fails) { + auto [ok, msg] = store_->validateBackupName(std::string(201, 'a')); + EXPECT_FALSE(ok); + EXPECT_NE(msg.find("too long"), std::string::npos); +} + +TEST_F(BackupStoreStateTest, ValidateName_Slash_Fails) { + auto [ok, msg] = store_->validateBackupName("bad/name"); + EXPECT_FALSE(ok); +} + +TEST_F(BackupStoreStateTest, ValidateName_Space_Fails) { + auto [ok, msg] = store_->validateBackupName("bad name"); 
+ EXPECT_FALSE(ok); +} + +TEST_F(BackupStoreStateTest, ValidateName_Dot_Fails) { + auto [ok, msg] = store_->validateBackupName("backup.tar"); + EXPECT_FALSE(ok); +} + +// --- Active backup tracking --- + +TEST_F(BackupStoreStateTest, NoActive_HasActiveIsFalse) { + EXPECT_FALSE(store_->hasActiveBackup("alice")); +} + +TEST_F(BackupStoreStateTest, SetActive_HasActiveIsTrue) { + store_->setActiveBackup("alice", "bk1", BackupOperation::Creation); + EXPECT_TRUE(store_->hasActiveBackup("alice")); +} + +TEST_F(BackupStoreStateTest, SetActive_GetActiveReturnsNameAndOperation) { + store_->setActiveBackup("alice", "bk1", BackupOperation::Creation); + auto active = store_->getActiveBackup("alice"); + ASSERT_TRUE(active.has_value()); + EXPECT_EQ(active->first, "bk1"); + EXPECT_EQ(active->second, "creation"); +} + +TEST_F(BackupStoreStateTest, SetActive_Restoration_OperationString) { + store_->setActiveBackup("alice", "bk1", BackupOperation::Restoration); + auto active = store_->getActiveBackup("alice"); + ASSERT_TRUE(active.has_value()); + EXPECT_EQ(active->second, "restoration"); +} + +TEST_F(BackupStoreStateTest, ClearActive_HasActiveIsFalse) { + store_->setActiveBackup("alice", "bk1", BackupOperation::Creation); + store_->clearActiveBackup("alice"); + EXPECT_FALSE(store_->hasActiveBackup("alice")); +} + +TEST_F(BackupStoreStateTest, ClearActive_GetActiveReturnsNullopt) { + store_->setActiveBackup("alice", "bk1", BackupOperation::Creation); + store_->clearActiveBackup("alice"); + EXPECT_FALSE(store_->getActiveBackup("alice").has_value()); +} + +TEST_F(BackupStoreStateTest, ClearNonExistent_NoOp) { + EXPECT_NO_THROW(store_->clearActiveBackup("nobody")); +} + +TEST_F(BackupStoreStateTest, TwoUsers_IndependentState) { + store_->setActiveBackup("alice", "bk1", BackupOperation::Creation); + EXPECT_TRUE(store_->hasActiveBackup("alice")); + EXPECT_FALSE(store_->hasActiveBackup("bob")); + + store_->setActiveBackup("bob", "bk2", BackupOperation::Restoration); + 
EXPECT_TRUE(store_->hasActiveBackup("bob")); + + store_->clearActiveBackup("alice"); + EXPECT_FALSE(store_->hasActiveBackup("alice")); + EXPECT_TRUE(store_->hasActiveBackup("bob")); +} + +// --- Backup JSON & listing --- + +TEST_F(BackupStoreStateTest, ReadBackupJson_MissingFile_ReturnsEmptyObject) { + auto json = store_->readBackupJson("alice"); + EXPECT_TRUE(json.empty()); +} + +TEST_F(BackupStoreStateTest, WriteAndReadBackupJson_RoundTrip) { + nlohmann::json data; + data["bk1"]["original_index"] = "my_idx"; + data["bk1"]["size_mb"] = 10; + + fs::create_directories(store_->getUserBackupDir("alice")); + store_->writeBackupJson("alice", data); + auto read = store_->readBackupJson("alice"); + + EXPECT_TRUE(read.contains("bk1")); + EXPECT_EQ(read["bk1"]["original_index"], "my_idx"); +} + +TEST_F(BackupStoreStateTest, ListBackups_EmptyWhenNoneExist) { + EXPECT_TRUE(store_->listBackups("alice").empty()); +} + +TEST_F(BackupStoreStateTest, ListBackups_ReturnsAllWrittenEntries) { + nlohmann::json data; + data["bk1"] = {{"original_index", "idx1"}}; + data["bk2"] = {{"original_index", "idx2"}}; + fs::create_directories(store_->getUserBackupDir("alice")); + store_->writeBackupJson("alice", data); + + auto list = store_->listBackups("alice"); + EXPECT_TRUE(list.contains("bk1")); + EXPECT_TRUE(list.contains("bk2")); +} + +TEST_F(BackupStoreStateTest, GetBackupInfo_ExistingEntry) { + nlohmann::json data; + data["bk1"] = {{"original_index", "idx1"}, {"size_mb", 5}}; + fs::create_directories(store_->getUserBackupDir("alice")); + store_->writeBackupJson("alice", data); + + auto info = store_->getBackupInfo("bk1", "alice"); + EXPECT_FALSE(info.is_null()); + EXPECT_EQ(info["original_index"], "idx1"); +} + +TEST_F(BackupStoreStateTest, GetBackupInfo_NonExistent_ReturnsNull) { + EXPECT_TRUE(store_->getBackupInfo("nonexistent", "alice").is_null()); +} + +// --- Backup deletion --- + +TEST_F(BackupStoreStateTest, DeleteBackup_NonExistent_ReturnsFalse) { + auto [ok, msg] = 
store_->deleteBackup("nonexistent", "alice"); + EXPECT_FALSE(ok); + EXPECT_EQ(msg, "Backup not found"); +} + +TEST_F(BackupStoreStateTest, DeleteBackup_InvalidName_ReturnsFalse) { + auto [ok, msg] = store_->deleteBackup("bad/name", "alice"); + EXPECT_FALSE(ok); +} + +TEST_F(BackupStoreStateTest, DeleteBackup_RemovesTarAndJsonEntry) { + std::string backup_dir = store_->getUserBackupDir("alice"); + fs::create_directories(backup_dir); + + std::string tar_path = backup_dir + "/bk1.tar"; + std::ofstream(tar_path) << "fake tar content"; + + nlohmann::json data; + data["bk1"] = {{"original_index", "idx1"}}; + store_->writeBackupJson("alice", data); + + auto [ok, msg] = store_->deleteBackup("bk1", "alice"); + EXPECT_TRUE(ok); + EXPECT_FALSE(fs::exists(tar_path)); + EXPECT_FALSE(store_->listBackups("alice").contains("bk1")); +} + +// ============================================================ +// Layer 2 — Archive (tar) operations +// ============================================================ + +class BackupArchiveTest : public ::testing::Test { +protected: + std::string dir_; + std::unique_ptr store_; + + void SetUp() override { + dir_ = "./test_backup_archive_" + std::to_string(rand()); + fs::create_directories(dir_); + store_ = std::make_unique(dir_); + } + + void TearDown() override { + store_.reset(); + if (fs::exists(dir_)) fs::remove_all(dir_); + } + + std::string makeSourceDir(const std::string& name) { + std::string src = dir_ + "/" + name; + fs::create_directories(src); + std::ofstream(src + "/file_a.bin") << "hello from file_a"; + std::ofstream(src + "/file_b.bin") << "hello from file_b"; + return src; + } +}; + +TEST_F(BackupArchiveTest, CreateBackupTar_ProducesNonEmptyFile) { + std::string src = makeSourceDir("myidx"); + std::string archive = dir_ + "/out.tar"; + std::string err; + bool ok = store_->createBackupTar(src, archive, err); + EXPECT_TRUE(ok) << "error: " << err; + EXPECT_TRUE(fs::exists(archive)); + EXPECT_GT(fs::file_size(archive), 0u); +} + 
+TEST_F(BackupArchiveTest, ExtractBackupTar_FilesRoundTrip) { + std::string src = makeSourceDir("myidx"); + std::string archive = dir_ + "/out.tar"; + std::string err; + ASSERT_TRUE(store_->createBackupTar(src, archive, err)) << err; + + std::string dest = dir_ + "/extracted"; + ASSERT_TRUE(store_->extractBackupTar(archive, dest, err)) << err; + + EXPECT_TRUE(fs::exists(dest + "/myidx/file_a.bin")); + EXPECT_TRUE(fs::exists(dest + "/myidx/file_b.bin")); +} + +TEST_F(BackupArchiveTest, ExtractBackupTar_ContentPreserved) { + std::string src = makeSourceDir("myidx"); + std::string archive = dir_ + "/out.tar"; + std::string err; + ASSERT_TRUE(store_->createBackupTar(src, archive, err)); + + std::string dest = dir_ + "/extracted"; + ASSERT_TRUE(store_->extractBackupTar(archive, dest, err)); + + std::ifstream f(dest + "/myidx/file_a.bin"); + std::string content((std::istreambuf_iterator(f)), {}); + EXPECT_EQ(content, "hello from file_a"); +} + +TEST_F(BackupArchiveTest, ExtractBackupTar_NonExistentArchive_Fails) { + std::string err; + bool ok = store_->extractBackupTar(dir_ + "/no.tar", dir_ + "/dest", err); + EXPECT_FALSE(ok); + EXPECT_FALSE(err.empty()); +} + +TEST_F(BackupArchiveTest, CreateBackupTar_PreCancelledStopToken_ReturnsFalse) { + std::string src = makeSourceDir("myidx"); + for (int i = 0; i < 10; ++i) + std::ofstream(src + "/extra_" + std::to_string(i) + ".bin") << std::string(512, 'x'); + + std::string archive = dir_ + "/out.tar"; + std::string err; + + std::stop_source ss; + ss.request_stop(); + + bool ok = store_->createBackupTar(src, archive, err, ss.get_token()); + EXPECT_FALSE(ok); + EXPECT_EQ(err, "Backup cancelled"); +} + +// ============================================================ +// Layer 3 — Integration tests via IndexManager +// ============================================================ + +class BackupIntegrationTest : public ::testing::Test { +protected: + static constexpr const char* USERNAME = "testuser"; + static constexpr const char* 
IDX_NAME = "testidx"; + static constexpr const char* INDEX_ID = "testuser/testidx"; + static constexpr const char* BACKUP_NAME = "mybk"; + static constexpr size_t DIM = 32; + static constexpr size_t N_VECTORS = 50; + + std::string data_dir_; + std::unique_ptr manager_; + + void SetUp() override { + data_dir_ = "./test_backup_integration_" + std::to_string(rand()); + fs::create_directories(data_dir_); + PersistenceConfig pcfg; + pcfg.save_on_shutdown = false; + manager_ = std::make_unique(data_dir_, pcfg); + } + + void TearDown() override { + manager_.reset(); + if (fs::exists(data_dir_)) fs::remove_all(data_dir_); + } + + void createTestIndex(const std::string& index_id = INDEX_ID) { + IndexConfig config{ + .dim = DIM, + .max_elements = 1000, + .space_type_str = "cosine", + .M = 8, + .ef_construction = 64, + .quant_level = ndd::quant::QuantizationLevel::FP32, + .checksum = 0 + }; + manager_->createIndex(index_id, config, UserType::Admin, 0); + } + + void insertVectors(size_t n = N_VECTORS, const std::string& index_id = INDEX_ID) { + std::vector vecs; + vecs.reserve(n); + for (size_t i = 0; i < n; ++i) { + ndd::HybridVectorObject v; + v.id = "vec_" + std::to_string(i); + v.vector.resize(DIM); + for (size_t d = 0; d < DIM; ++d) + v.vector[d] = static_cast(rand()) / RAND_MAX; + vecs.push_back(std::move(v)); + } + manager_->addVectors(index_id, vecs); + } + + // Waits until the named backup appears in listBackups (signals successful write). + // Used for create-backup completion because clearActiveBackup fires before the + // final rename + writeBackupJson, so polling getActiveBackup is not sufficient. 
+ bool waitForBackupInList(const std::string& backup_name, int timeout_sec = 15) { + auto deadline = std::chrono::steady_clock::now() + + std::chrono::seconds(timeout_sec); + while (std::chrono::steady_clock::now() < deadline) { + if (manager_->listBackups(USERNAME).contains(backup_name)) + return true; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + return false; + } + + // Waits until no active backup remains for USERNAME. + // Reliable for restore because clearActiveBackup fires after loadIndex. + bool waitForNoActiveBackup(int timeout_sec = 15) { + auto deadline = std::chrono::steady_clock::now() + + std::chrono::seconds(timeout_sec); + while (std::chrono::steady_clock::now() < deadline) { + if (!manager_->getActiveBackup(USERNAME).has_value()) + return true; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + return false; + } +}; + +TEST_F(BackupIntegrationTest, CreateBackupAsync_ReturnsTrueAndBackupName) { + createTestIndex(); + insertVectors(); + auto [ok, name] = manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + EXPECT_TRUE(ok); + EXPECT_EQ(name, BACKUP_NAME); + waitForBackupInList(BACKUP_NAME); +} + +TEST_F(BackupIntegrationTest, CreateBackup_SetsActiveBackupDuringRun) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + // setActiveBackup is synchronous — active backup must be visible immediately + auto active = manager_->getActiveBackup(USERNAME); + EXPECT_TRUE(active.has_value()); + EXPECT_EQ(active->first, BACKUP_NAME); + EXPECT_EQ(active->second, "creation"); + waitForBackupInList(BACKUP_NAME); +} + +TEST_F(BackupIntegrationTest, CreateBackup_ProducesTarFile) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + std::string tar = data_dir_ + "/backups/" + USERNAME + "/" + BACKUP_NAME + ".tar"; + EXPECT_TRUE(fs::exists(tar)); + EXPECT_GT(fs::file_size(tar), 0u); +} + 
+TEST_F(BackupIntegrationTest, CreateBackup_AppearsInListBackups) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + EXPECT_TRUE(manager_->listBackups(USERNAME).contains(BACKUP_NAME)); +} + +TEST_F(BackupIntegrationTest, CreateBackup_MetadataHasExpectedFields) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + auto info = manager_->getBackupInfo(BACKUP_NAME, USERNAME); + EXPECT_FALSE(info.is_null()); + EXPECT_EQ(info["original_index"], IDX_NAME); + ASSERT_TRUE(info.contains("params")); + EXPECT_EQ(info["params"]["dim"].get(), DIM); + EXPECT_TRUE(info.contains("timestamp")); +} + +TEST_F(BackupIntegrationTest, CreateBackup_WhileInProgress_ReturnsFalse) { + createTestIndex(); + insertVectors(); + auto [ok1, _1] = manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(ok1); + + // setActiveBackup is synchronous; second call must be rejected + auto [ok2, msg] = manager_->createBackupAsync(INDEX_ID, "another_bk"); + EXPECT_FALSE(ok2); + EXPECT_NE(msg.find("in progress"), std::string::npos); + waitForBackupInList(BACKUP_NAME); +} + +TEST_F(BackupIntegrationTest, CreateBackup_DuplicateName_ReturnsFalse) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + auto [ok, msg] = manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + EXPECT_FALSE(ok); +} + +TEST_F(BackupIntegrationTest, CreateBackup_InvalidName_ReturnsFalse) { + createTestIndex(); + auto [ok, msg] = manager_->createBackupAsync(INDEX_ID, "bad/name"); + EXPECT_FALSE(ok); +} + +TEST_F(BackupIntegrationTest, DeleteBackup_RemovesTarAndJsonEntry) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + auto [ok, msg] = 
manager_->deleteBackup(BACKUP_NAME, USERNAME); + EXPECT_TRUE(ok); + + std::string tar = data_dir_ + "/backups/" + USERNAME + "/" + BACKUP_NAME + ".tar"; + EXPECT_FALSE(fs::exists(tar)); + EXPECT_FALSE(manager_->listBackups(USERNAME).contains(BACKUP_NAME)); +} + +TEST_F(BackupIntegrationTest, DeleteBackup_NonExistent_ReturnsFalse) { + auto [ok, msg] = manager_->deleteBackup("no_such_backup", USERNAME); + EXPECT_FALSE(ok); +} + +TEST_F(BackupIntegrationTest, RestoreBackupAsync_ReturnsTrueAndTargetName) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + auto [ok, name] = manager_->restoreBackupAsync(BACKUP_NAME, "restored_idx", USERNAME); + EXPECT_TRUE(ok); + EXPECT_EQ(name, "restored_idx"); + waitForNoActiveBackup(); +} + +TEST_F(BackupIntegrationTest, RestoreBackup_CreatesIndexWithCorrectMetadata) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + manager_->restoreBackupAsync(BACKUP_NAME, "restored_idx", USERNAME); + ASSERT_TRUE(waitForNoActiveBackup()); + + auto meta = manager_->getMetadata(USERNAME + std::string("/restored_idx")); + ASSERT_TRUE(meta.has_value()); + EXPECT_EQ(meta->name, "restored_idx"); + EXPECT_EQ(meta->dimension, DIM); + EXPECT_EQ(meta->M, 8u); +} + +TEST_F(BackupIntegrationTest, RestoreBackup_PreservesVectorCount) { + createTestIndex(); + insertVectors(N_VECTORS); + size_t original_count = manager_->getElementCount(INDEX_ID); + + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + manager_->restoreBackupAsync(BACKUP_NAME, "restored_idx", USERNAME); + ASSERT_TRUE(waitForNoActiveBackup()); + + std::string restored_id = USERNAME + std::string("/restored_idx"); + EXPECT_EQ(manager_->getElementCount(restored_id), original_count); +} + +TEST_F(BackupIntegrationTest, 
RestoreBackup_NonExistentBackup_ReturnsFalse) { + auto [ok, msg] = manager_->restoreBackupAsync("no_such_backup", "some_idx", USERNAME); + EXPECT_FALSE(ok); + EXPECT_NE(msg.find("not found"), std::string::npos); +} + +TEST_F(BackupIntegrationTest, RestoreBackup_TargetIndexAlreadyExists_ReturnsFalse) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + ASSERT_TRUE(waitForBackupInList(BACKUP_NAME)); + + // IDX_NAME index already exists + auto [ok, msg] = manager_->restoreBackupAsync(BACKUP_NAME, IDX_NAME, USERNAME); + EXPECT_FALSE(ok); + EXPECT_NE(msg.find("already exists"), std::string::npos); +} + +TEST_F(BackupIntegrationTest, RestoreBackup_WhileCreateInProgress_ReturnsFalse) { + createTestIndex(); + insertVectors(); + manager_->createBackupAsync(INDEX_ID, BACKUP_NAME); + + // setActiveBackup is synchronous — restore must be rejected immediately + auto [ok, msg] = manager_->restoreBackupAsync(BACKUP_NAME, "restored_idx", USERNAME); + EXPECT_FALSE(ok); + EXPECT_NE(msg.find("in progress"), std::string::npos); + waitForNoActiveBackup(); +} diff --git a/tests/filter_test.cpp b/tests/filter_test.cpp index 101be3403e..f75d51ed15 100644 --- a/tests/filter_test.cpp +++ b/tests/filter_test.cpp @@ -37,7 +37,7 @@ class FilterTest : public ::testing::Test { } // Initialize Filter - filter = std::make_unique<Filter>(db_path); + filter = std::make_unique<Filter>(db_path, "testuser/testidx"); } void TearDown() override { diff --git a/tests/rebuild_test.cpp b/tests/rebuild_test.cpp new file mode 100644 index 0000000000..e3a8a38f6a --- /dev/null +++ b/tests/rebuild_test.cpp @@ -0,0 +1,328 @@ +#include <gtest/gtest.h> +#include <algorithm> +#include <filesystem> +#include <fstream> +#include <thread> + +#include "rebuild.hpp" +#include "ndd.hpp" +#include "utils/msgpack_ndd.hpp" +#include "server/auth.hpp" + +namespace fs = std::filesystem; + +// ============================================================ +// Layer 1 — Rebuild state management (no IndexManager needed) +// 
============================================================ + +class RebuildStateTest : public ::testing::Test { +protected: + Rebuild rebuild; +}; + +TEST_F(RebuildStateTest, NoRebuild_HasActiveIsFalse) { + EXPECT_FALSE(rebuild.hasActiveRebuild("alice")); +} + +TEST_F(RebuildStateTest, NoRebuild_GetProgressIsIdle) { + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "idle"); +} + +TEST_F(RebuildStateTest, SetActive_HasActiveIsTrue) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + EXPECT_TRUE(rebuild.hasActiveRebuild("alice")); +} + +TEST_F(RebuildStateTest, SetActive_GetProgressShowsInProgress) { + rebuild.setActiveRebuild("alice", "alice/idx", 200); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "in_progress"); + EXPECT_EQ(p["total_vectors"], 200); + EXPECT_EQ(p["vectors_processed"], 0); +} + +TEST_F(RebuildStateTest, UpdateProgress_ReflectedInGetProgress) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.updateProgress("alice", 50); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["vectors_processed"], 50); +} + +TEST_F(RebuildStateTest, PercentComplete_CalculatedCorrectly) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.updateProgress("alice", 50); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_DOUBLE_EQ(p["percent_complete"].get(), 50.0); +} + +TEST_F(RebuildStateTest, PercentComplete_ZeroTotal_IsZero) { + rebuild.setActiveRebuild("alice", "alice/idx", 0); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_DOUBLE_EQ(p["percent_complete"].get(), 0.0); +} + +TEST_F(RebuildStateTest, Complete_StatusIsCompleted) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.completeActiveRebuild("alice"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "completed"); +} + +TEST_F(RebuildStateTest, Complete_HasActiveIsFalse) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); 
+ rebuild.completeActiveRebuild("alice"); + EXPECT_FALSE(rebuild.hasActiveRebuild("alice")); +} + +TEST_F(RebuildStateTest, Complete_CompletedAtPresent) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.completeActiveRebuild("alice"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_TRUE(p.contains("completed_at")); +} + +TEST_F(RebuildStateTest, Fail_StatusIsFailed) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.failActiveRebuild("alice", "disk full"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "failed"); +} + +TEST_F(RebuildStateTest, Fail_HasActiveIsFalse) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.failActiveRebuild("alice", "disk full"); + EXPECT_FALSE(rebuild.hasActiveRebuild("alice")); +} + +TEST_F(RebuildStateTest, Fail_ErrorMessagePresent) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.failActiveRebuild("alice", "disk full"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["error"], "disk full"); +} + +TEST_F(RebuildStateTest, Fail_CompletedAtPresent) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.failActiveRebuild("alice", "oom"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_TRUE(p.contains("completed_at")); +} + +TEST_F(RebuildStateTest, TwoUsers_IndependentState) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + EXPECT_TRUE(rebuild.hasActiveRebuild("alice")); + EXPECT_FALSE(rebuild.hasActiveRebuild("bob")); + rebuild.setActiveRebuild("bob", "bob/idx", 50); + EXPECT_TRUE(rebuild.hasActiveRebuild("bob")); + rebuild.completeActiveRebuild("alice"); + EXPECT_FALSE(rebuild.hasActiveRebuild("alice")); + EXPECT_TRUE(rebuild.hasActiveRebuild("bob")); +} + +TEST_F(RebuildStateTest, GetProgress_WrongIndex_ReturnsIdle) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + auto p = rebuild.getProgress("alice", "alice/other"); + EXPECT_EQ(p["status"], "idle"); +} + 
+TEST_F(RebuildStateTest, SetActive_OverwritesPreviousCompleted) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.completeActiveRebuild("alice"); + rebuild.setActiveRebuild("alice", "alice/idx", 200); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "in_progress"); + EXPECT_EQ(p["total_vectors"], 200); +} + +// ============================================================ +// Layer 2 — Temp file cleanup and path helpers +// ============================================================ + +class RebuildCleanupTest : public ::testing::Test { +protected: + std::string dir_; + Rebuild rebuild_; + + void SetUp() override { + dir_ = "./test_rebuild_cleanup_" + std::to_string(rand()); + fs::create_directories(dir_ + "/user/idx/vectors"); + } + + void TearDown() override { + if (fs::exists(dir_)) fs::remove_all(dir_); + } + + void touch(const std::string& rel_path) { + std::ofstream f(dir_ + "/" + rel_path); + f << "x"; + } + + bool exists(const std::string& rel_path) { + return fs::exists(dir_ + "/" + rel_path); + } +}; + +TEST_F(RebuildCleanupTest, CleanupTempFiles_NonExistentDir_NoOp) { + EXPECT_NO_THROW(rebuild_.cleanupTempFiles("/nonexistent/path/xyz")); +} + +TEST_F(RebuildCleanupTest, CleanupTempFiles_RemovesTempFile) { + touch("user/idx/vectors/default.idx.temp"); + rebuild_.cleanupTempFiles(dir_); + EXPECT_FALSE(exists("user/idx/vectors/default.idx.temp")); +} + +TEST_F(RebuildCleanupTest, CleanupTempFiles_RemovesTimestampedFile) { + touch("user/idx/vectors/default.idx.1714900000"); + rebuild_.cleanupTempFiles(dir_); + EXPECT_FALSE(exists("user/idx/vectors/default.idx.1714900000")); +} + +TEST_F(RebuildCleanupTest, CleanupTempFiles_LeavesCanonicalIndex) { + touch("user/idx/vectors/default.idx"); + rebuild_.cleanupTempFiles(dir_); + EXPECT_TRUE(exists("user/idx/vectors/default.idx")); +} + +TEST_F(RebuildCleanupTest, CleanupTempFiles_EmptyDir_NoOp) { + EXPECT_NO_THROW(rebuild_.cleanupTempFiles(dir_)); +} + 
+TEST(RebuildPathTest, GetTempPath_Format) { + auto path = Rebuild::getTempPath("/data/user/idx"); + EXPECT_EQ(path, "/data/user/idx/vectors/default.idx.temp"); +} + +TEST(RebuildPathTest, GetTimestampedPath_HasTimestamp) { + auto path = Rebuild::getTimestampedPath("/data/user/idx"); + // Should match /data/user/idx/vectors/default.idx. + std::string prefix = "/data/user/idx/vectors/default.idx."; + ASSERT_GT(path.size(), prefix.size()); + EXPECT_EQ(path.substr(0, prefix.size()), prefix); + std::string suffix = path.substr(prefix.size()); + EXPECT_FALSE(suffix.empty()); + EXPECT_TRUE(std::all_of(suffix.begin(), suffix.end(), ::isdigit)); +} + +// ============================================================ +// Layer 3 — Integration tests via IndexManager +// ============================================================ + +class RebuildIntegrationTest : public ::testing::Test { +protected: + static constexpr const char* USERNAME = "testuser"; + static constexpr const char* IDX_NAME = "testidx"; + static constexpr const char* INDEX_ID = "testuser/testidx"; + static constexpr size_t DIM = 32; + static constexpr size_t N_VECTORS = 100; + + std::string data_dir_; + std::unique_ptr manager_; + + void SetUp() override { + data_dir_ = "./test_rebuild_integration_" + std::to_string(rand()); + fs::create_directories(data_dir_); + PersistenceConfig pcfg; + pcfg.save_on_shutdown = false; + manager_ = std::make_unique(data_dir_, pcfg); + } + + void TearDown() override { + manager_.reset(); + if (fs::exists(data_dir_)) fs::remove_all(data_dir_); + } + + void createTestIndex(size_t M = 8, size_t ef_con = 64) { + IndexConfig config{ + .dim = DIM, + .max_elements = 1000, + .space_type_str = "cosine", + .M = M, + .ef_construction = ef_con, + .quant_level = ndd::quant::QuantizationLevel::FP32, + .checksum = 0 + }; + manager_->createIndex(INDEX_ID, config, UserType::Admin, 0); + } + + void insertVectors(size_t n = N_VECTORS) { + std::vector vecs; + vecs.reserve(n); + for (size_t i = 0; 
i < n; ++i) { + ndd::HybridVectorObject v; + v.id = "vec_" + std::to_string(i); + v.vector.resize(DIM); + for (size_t d = 0; d < DIM; ++d) + v.vector[d] = static_cast(rand()) / RAND_MAX; + vecs.push_back(std::move(v)); + } + manager_->addVectors(INDEX_ID, vecs); + } + + // Returns true if rebuild completed successfully within timeout_sec. + bool waitForRebuild(int timeout_sec = 10) { + auto deadline = std::chrono::steady_clock::now() + + std::chrono::seconds(timeout_sec); + while (std::chrono::steady_clock::now() < deadline) { + auto progress = manager_->getRebuildProgress(USERNAME, INDEX_ID); + std::string status = progress.value("status", ""); + if (status == "completed") return true; + if (status == "failed") return false; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + return false; + } +}; + +TEST_F(RebuildIntegrationTest, RebuildAsync_ReturnSuccessCode) { + createTestIndex(); + insertVectors(); + auto result = manager_->rebuildIndexAsync(INDEX_ID, 16, 128); + EXPECT_EQ(result.code, 0); + waitForRebuild(); +} + +TEST_F(RebuildIntegrationTest, RebuildCompletes_ConfigUpdated) { + createTestIndex(8, 64); + insertVectors(); + manager_->rebuildIndexAsync(INDEX_ID, 16, 128); + ASSERT_TRUE(waitForRebuild()); + auto meta = manager_->getMetadata(INDEX_ID); + ASSERT_TRUE(meta.has_value()); + EXPECT_EQ(meta->M, 16u); + EXPECT_EQ(meta->ef_con, 128u); +} + +TEST_F(RebuildIntegrationTest, RebuildCompletes_VectorCountPreserved) { + createTestIndex(); + insertVectors(N_VECTORS); + size_t before = manager_->getElementCount(INDEX_ID); + manager_->rebuildIndexAsync(INDEX_ID, 16, 128); + ASSERT_TRUE(waitForRebuild()); + size_t after = manager_->getElementCount(INDEX_ID); + EXPECT_EQ(before, after); +} + +TEST_F(RebuildIntegrationTest, RebuildWhileInProgress_Returns409Code) { + createTestIndex(); + insertVectors(); + // setActiveRebuild is synchronous — second call sees IN_PROGRESS before thread starts + auto r1 = manager_->rebuildIndexAsync(INDEX_ID, 16, 128); 
+ ASSERT_EQ(r1.code, 0); + auto r2 = manager_->rebuildIndexAsync(INDEX_ID, 32, 256); + EXPECT_EQ(r2.code, 2); + waitForRebuild(); +} + +TEST_F(RebuildIntegrationTest, RebuildNonExistentIndex_Returns404Code) { + auto result = manager_->rebuildIndexAsync("testuser/doesnotexist", 16, 128); + EXPECT_EQ(result.code, 1); +} + +TEST_F(RebuildIntegrationTest, RebuildNoChange_Returns400Code) { + createTestIndex(8, 64); + insertVectors(); + auto result = manager_->rebuildIndexAsync(INDEX_ID, 8, 64); + EXPECT_EQ(result.code, 3); +}