diff --git a/src/core/ndd.hpp b/src/core/ndd.hpp index 55f6e5bc57..746509efaf 100644 --- a/src/core/ndd.hpp +++ b/src/core/ndd.hpp @@ -727,28 +727,38 @@ class IndexManager { } hnswlib::SpaceType space_type = hnswlib::getSpaceType(config.space_type_str); - std::string lmdb_dir = index_dir + "/ids"; //create the directory and initialize sequence for IDMapper LOG_INFO(2021, index_id, "Creating ID mapper with user type " << userTypeToString(user_type)); - // IDMapper now uses tier-based fixed bloom filter sizing based on user_type - auto id_mapper = std::make_shared(lmdb_dir, true, user_type); - - // Create HNSW directly with all necessary parameters ndd::quant::QuantizationLevel quant_level = config.quant_level; auto vector_storage = std::make_shared(index_dir, index_id, config.dim, config.quant_level); + std::shared_ptr id_mapper; + if(vector_storage->uses_shared_env()) { + id_mapper = std::make_shared( + vector_storage->shared_env(), "id_map", true, user_type); + } else { + std::string lmdb_dir = index_dir + "/ids"; + // IDMapper now uses tier-based fixed bloom filter sizing based on user_type + id_mapper = std::make_shared(lmdb_dir, true, user_type); + } + // Initialize Sparse Storage if needed std::unique_ptr sparse_storage = nullptr; if(ndd::sparseModelEnabled(config.sparse_model)) { - std::string sparse_storage_dir = index_dir + "/sparse"; - sparse_storage = std::make_unique( - sparse_storage_dir, index_id, config.sparse_model); + if(vector_storage->uses_shared_env()) { + sparse_storage = std::make_unique( + vector_storage->shared_env(), index_id, config.sparse_model); + } else { + std::string sparse_storage_dir = index_dir + "/sparse"; + sparse_storage = std::make_unique( + sparse_storage_dir, index_id, config.sparse_model); + } if(!sparse_storage->initialize()) { throw std::runtime_error("Failed to initialize sparse storage"); } @@ -824,12 +834,10 @@ class IndexManager { void loadIndex(const std::string& index_id) { std::string index_dir = data_dir_ + "/" + index_id; - std::string lmdb_dir = index_dir + "/ids"; std::string vector_storage_dir = index_dir + "/vectors"; std::string index_path = vector_storage_dir + "/" + settings::DEFAULT_SUBINDEX + ".idx"; - if(!std::filesystem::exists(index_path) || !std::filesystem::exists(lmdb_dir) - || !std::filesystem::exists(vector_storage_dir)) { + if(!std::filesystem::exists(index_path) || !std::filesystem::exists(vector_storage_dir)) { throw std::runtime_error("Required files missing for index: " + index_id); } @@ -852,16 +860,32 @@ class IndexManager { } // Step 2: Create IDMapper and VectorStorage - IDMapper handles bloom filter initialization - auto id_mapper = std::make_shared(lmdb_dir, false); auto vector_storage = std::make_shared( index_dir, index_id, alg->getDimension(), alg->getQuantLevel()); + std::shared_ptr id_mapper; + if(vector_storage->uses_shared_env()) { + id_mapper = std::make_shared( + vector_storage->shared_env(), "id_map", false); + } else { + std::string lmdb_dir = index_dir + "/ids"; + if(!std::filesystem::exists(lmdb_dir)) { + throw std::runtime_error("Required files missing for index: " + index_id); + } + id_mapper = std::make_shared(lmdb_dir, false); + } + // Initialize Sparse Storage if sparse_model is enabled std::unique_ptr sparse_storage; if(ndd::sparseModelEnabled(sparse_model)) { - std::string sparse_storage_dir = index_dir + "/sparse"; - sparse_storage = std::make_unique( - sparse_storage_dir, index_id, sparse_model); + if(vector_storage->uses_shared_env()) { + sparse_storage = std::make_unique( + vector_storage->shared_env(), index_id, sparse_model); + } else { + std::string sparse_storage_dir = index_dir + "/sparse"; + sparse_storage = std::make_unique( + sparse_storage_dir, index_id, sparse_model); + } if(!sparse_storage->initialize()) { throw std::runtime_error("Failed to initialize sparse storage for index: " + index_id); @@ -1014,15 +1038,44 @@ class IndexManager { str_ids.push_back(vec.id); } LOG_DEBUG("Extracted " << str_ids.size() << " string IDs from vectors"); + const bool shared_atomic_write = entry.vector_storage->uses_shared_env() + && entry.id_mapper->get_env() + == entry.vector_storage->shared_env(); + std::vector> numeric_ids; - // Get or create numeric IDs in batch - this returns ids. - // If str_id already exists, it will return the old numeric ID - if(entry.alg->getDeletedCount() > 0) { - // There are deleted IDs, we need to reuse them - numeric_ids = entry.id_mapper->create_ids_batch(str_ids, wal); + MDBX_txn* shared_txn = nullptr; + int64_t sparse_count_delta = 0; + + if(shared_atomic_write) { + int rc = mdbx_txn_begin( + entry.vector_storage->shared_env(), nullptr, MDBX_TXN_READWRITE, &shared_txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin shared add transaction: " + + std::string(mdbx_strerror(rc))); + } + + try { + if(entry.alg->getDeletedCount() > 0) { + numeric_ids = + entry.id_mapper->create_ids_batch_txn(shared_txn, str_ids); + } else { + numeric_ids = + entry.id_mapper->create_ids_batch_txn(shared_txn, str_ids); + } + } catch(...) { + mdbx_txn_abort(shared_txn); + throw; + } } else { - // No deleted IDs, just create new ones - numeric_ids = entry.id_mapper->create_ids_batch(str_ids, wal); + // Get or create numeric IDs in batch - this returns ids. + // If str_id already exists, it will return the old numeric ID + if(entry.alg->getDeletedCount() > 0) { + // There are deleted IDs, we need to reuse them + numeric_ids = entry.id_mapper->create_ids_batch(str_ids, wal); + } else { + // No deleted IDs, just create new ones + numeric_ids = entry.id_mapper->create_ids_batch(str_ids, wal); + } } LOG_DEBUG("Created " << numeric_ids.size() << " numeric IDs for string IDs"); @@ -1063,12 +1116,25 @@ class IndexManager { } if(!sparse_batch.empty()) { - if(!entry.sparse_storage->store_vectors_batch(sparse_batch)) { - LOG_ERROR(2053, - index_id, - "Failed to update sparse storage for batch size " - << sparse_batch.size()); - return false; + if(shared_atomic_write && shared_txn + && entry.sparse_storage->uses_shared_env()) { + if(!entry.sparse_storage->store_vectors_batch_txn( + shared_txn, sparse_batch, &sparse_count_delta)) { + mdbx_txn_abort(shared_txn); + LOG_ERROR(2053, + index_id, + "Failed to update sparse storage for batch size " + << sparse_batch.size()); + return false; + } + } else { + if(!entry.sparse_storage->store_vectors_batch(sparse_batch)) { + LOG_ERROR(2053, + index_id, + "Failed to update sparse storage for batch size " + << sparse_batch.size()); + return false; + } } } } @@ -1100,7 +1166,24 @@ class IndexManager { // Copy QuantVectorObject for storage (we need to keep original for HNSW) storage_vectors.emplace_back(numeric_ids[i].first, quantized_vectors[i]); } - entry.vector_storage->store_vectors_batch(storage_vectors); + if(shared_atomic_write && shared_txn) { + try { + entry.vector_storage->store_vectors_batch_txn(shared_txn, storage_vectors); + int rc = mdbx_txn_commit(shared_txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit shared add transaction: " + + std::string(mdbx_strerror(rc))); + } + if(entry.sparse_storage && entry.sparse_storage->uses_shared_env()) { + entry.sparse_storage->apply_vector_count_delta(sparse_count_delta); + } + } catch(...) { + mdbx_txn_abort(shared_txn); + throw; + } + } else { + entry.vector_storage->store_vectors_batch(storage_vectors); + } LOG_DEBUG("Stored " << storage_vectors.size() << " pre-quantized vectors in vector storage"); @@ -1334,24 +1417,81 @@ class IndexManager { // meta, vector data Meta and vector data will be overwritten when the id is reused bool deleteVectorsByIds(CacheEntry& entry, const std::vector& numeric_ids) { try { - for(ndd::idInt numeric_id : numeric_ids) { - auto meta = entry.vector_storage->get_meta(numeric_id); - // Remove ID mapping by getting the string id from metadata - auto stored_ids = entry.id_mapper->deletePoints({meta.id}); - if(stored_ids[0] != numeric_id) { - LOG_DEBUG("Error: Mismatch in stored ID and numeric ID " - << stored_ids[0] << " != " << numeric_id); - continue; + const bool can_use_shared_txn = entry.vector_storage->uses_shared_env() + && entry.id_mapper->get_env() + == entry.vector_storage->shared_env(); + + if(can_use_shared_txn) { + MDBX_txn* txn; + int64_t sparse_count_delta = 0; + int rc = mdbx_txn_begin( + entry.vector_storage->shared_env(), nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin shared delete transaction: " + + std::string(mdbx_strerror(rc))); } - // Remove the filter - entry.vector_storage->deleteFilter(numeric_id, meta.filter); - // Mark as deleted in HNSW index - entry.alg->markDelete(numeric_id); + try { + for(ndd::idInt numeric_id : numeric_ids) { + auto meta = entry.vector_storage->get_meta_txn(txn, numeric_id); + auto stored_ids = entry.id_mapper->deletePoints_txn(txn, {meta.id}); + if(stored_ids.empty() || stored_ids[0] != numeric_id) { + LOG_DEBUG("Error: Mismatch in stored ID and numeric ID " + << (stored_ids.empty() ? 0 : stored_ids[0]) << " != " + << numeric_id); + continue; + } + + entry.vector_storage->deletePoint_txn(txn, numeric_id); + if(entry.sparse_storage && entry.sparse_storage->uses_shared_env()) { + int64_t one_delta = 0; + if(entry.sparse_storage->delete_vector_txn(txn, numeric_id, &one_delta)) { + sparse_count_delta += one_delta; + } + } + } + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error( + "Failed to commit shared delete transaction: " + + std::string(mdbx_strerror(rc))); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; + } + + if(entry.sparse_storage && entry.sparse_storage->uses_shared_env()) { + entry.sparse_storage->apply_vector_count_delta(sparse_count_delta); + } - // Delete from sparse storage if hybrid index - if(entry.sparse_storage) { - entry.sparse_storage->delete_vector(numeric_id); + for(ndd::idInt numeric_id : numeric_ids) { + entry.alg->markDelete(numeric_id); + if(entry.sparse_storage && !entry.sparse_storage->uses_shared_env()) { + entry.sparse_storage->delete_vector(numeric_id); + } + } + } else { + for(ndd::idInt numeric_id : numeric_ids) { + auto meta = entry.vector_storage->get_meta(numeric_id); + // Remove ID mapping by getting the string id from metadata + auto stored_ids = entry.id_mapper->deletePoints({meta.id}); + if(stored_ids[0] != numeric_id) { + LOG_DEBUG("Error: Mismatch in stored ID and numeric ID " + << stored_ids[0] << " != " << numeric_id); + continue; + } + // Remove the filter + entry.vector_storage->deleteFilter(numeric_id, meta.filter); + // Mark as deleted in HNSW index + + entry.alg->markDelete(numeric_id); + + // Delete from sparse storage if hybrid index + if(entry.sparse_storage) { + entry.sparse_storage->delete_vector(numeric_id); + } } } // Add the list to write ahead log using IndexManager's method @@ -1409,16 +1549,54 @@ class IndexManager { std::unique_lock operation_lock(entry.operation_mutex); + const bool can_use_shared_txn = entry.vector_storage->uses_shared_env() + && entry.id_mapper->get_env() + == entry.vector_storage->shared_env(); + size_t updated_count = 0; - for(const auto& [str_id, new_filter] : updates) { - ndd::idInt numeric_id = entry.id_mapper->get_id(str_id); - if(numeric_id == 0) { - LOG_DEBUG("updateFilters: ID not found: " << str_id); - continue; + if(can_use_shared_txn) { + MDBX_txn* txn; + int rc = mdbx_txn_begin( + entry.vector_storage->shared_env(), nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error( + "Failed to begin shared updateFilters transaction: " + + std::string(mdbx_strerror(rc))); } - entry.vector_storage->updateFilter(numeric_id, new_filter); - updated_count++; + try { + for(const auto& [str_id, new_filter] : updates) { + ndd::idInt numeric_id = entry.id_mapper->get_id_txn(txn, str_id); + if(numeric_id == 0) { + LOG_DEBUG("updateFilters: ID not found: " << str_id); + continue; + } + + entry.vector_storage->updateFilter_txn(txn, numeric_id, new_filter); + updated_count++; + } + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error( + "Failed to commit shared updateFilters transaction: " + + std::string(mdbx_strerror(rc))); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; + } + } else { + for(const auto& [str_id, new_filter] : updates) { + ndd::idInt numeric_id = entry.id_mapper->get_id(str_id); + if(numeric_id == 0) { + LOG_DEBUG("updateFilters: ID not found: " << str_id); + continue; + } + + entry.vector_storage->updateFilter(numeric_id, new_filter); + updated_count++; + } } if(updated_count > 0) { diff --git a/src/filter/category_index.hpp b/src/filter/category_index.hpp index 58ffa62c69..a7c02e61b8 100644 --- a/src/filter/category_index.hpp +++ b/src/filter/category_index.hpp @@ -21,7 +21,27 @@ namespace ndd { return field + ":" + value; } - // Load bitmap from LMDB + // Load bitmap using an existing transaction (avoids opening a nested txn) + ndd::RoaringBitmap get_bitmap_with_txn(MDBX_txn* txn, + const std::string& filter_key) const { + MDBX_val key{const_cast(filter_key.c_str()), filter_key.size()}; + MDBX_val data; + + int rc = mdbx_get(txn, dbi_, &key, &data); + if(rc == MDBX_NOTFOUND) { + return ndd::RoaringBitmap(); + } + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to read filter key '" + filter_key + + "': " + std::string(mdbx_strerror(rc))); + } + if(data.iov_len == 0) { + return ndd::RoaringBitmap(); + } + return ndd::RoaringBitmap::read(static_cast(data.iov_base)); + } + + // Load bitmap from LMDB (opens its own read transaction) ndd::RoaringBitmap get_bitmap_internal(const std::string& filter_key) const { MDBX_txn* txn; int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_RDONLY, &txn); @@ -31,29 +51,7 @@ namespace ndd { } try { - MDBX_val key{const_cast(filter_key.c_str()), filter_key.size()}; - MDBX_val data; - - rc = mdbx_get(txn, dbi_, &key, &data); - if(rc == MDBX_NOTFOUND) { - mdbx_txn_abort(txn); - // LOG_DEBUG("Filter key not found: " << filter_key); - return ndd::RoaringBitmap(); // Return empty bitmap - } - if(rc != MDBX_SUCCESS) { - mdbx_txn_abort(txn); - throw std::runtime_error("Failed to read filter key '" + filter_key - + "': " + std::string(mdbx_strerror(rc))); - } - - if(data.iov_len == 0) { - mdbx_txn_abort(txn); - // LOG_DEBUG("Empty data for filter key: " << filter_key); - return ndd::RoaringBitmap(); - } - - ndd::RoaringBitmap bitmap = - ndd::RoaringBitmap::read(static_cast(data.iov_base)); + ndd::RoaringBitmap bitmap = get_bitmap_with_txn(txn, filter_key); mdbx_txn_abort(txn); return bitmap; } catch(...) { @@ -62,7 +60,8 @@ namespace ndd { } } - void store_bitmap_internal(const std::string& filter_key, + void store_bitmap_internal(MDBX_txn* txn, + const std::string& filter_key, const ndd::RoaringBitmap& bitmap) { if(bitmap.cardinality() == 0) { // LOG_DEBUG("Storing empty bitmap for key: " << filter_key); @@ -79,25 +78,11 @@ namespace ndd { MDBX_val key{const_cast(filter_key.c_str()), filter_key.size()}; MDBX_val data{const_cast(buffer.data()), buffer.size()}; - MDBX_txn* txn; - int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); if(rc != MDBX_SUCCESS) { - throw std::runtime_error("Failed to begin write transaction: " - + std::string(mdbx_strerror(rc))); - } - - rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); - if(rc != MDBX_SUCCESS) { - mdbx_txn_abort(txn); throw std::runtime_error("Failed to store bitmap: " + std::string(mdbx_strerror(rc))); } - - rc = mdbx_txn_commit(txn); - if(rc != MDBX_SUCCESS) { - throw std::runtime_error("Failed to commit transaction: " - + std::string(mdbx_strerror(rc))); - } } public: @@ -155,18 +140,66 @@ namespace ndd { return get_bitmap_internal(key); } - void add(const std::string& field, const std::string& value, ndd::idInt id) { + void add(MDBX_txn* txn, + const std::string& field, + const std::string& value, + ndd::idInt id) { std::string filter_key = format_filter_key(field, value); - ndd::RoaringBitmap bitmap = get_bitmap_internal(filter_key); + ndd::RoaringBitmap bitmap = get_bitmap_with_txn(txn, filter_key); bitmap.add(id); - store_bitmap_internal(filter_key, bitmap); + store_bitmap_internal(txn, filter_key, bitmap); } - void remove(const std::string& field, const std::string& value, ndd::idInt id) { + void add(const std::string& field, const std::string& value, ndd::idInt id) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin write transaction: " + + std::string(mdbx_strerror(rc))); + } + + try { + add(txn, field, value, id); + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit transaction: " + + std::string(mdbx_strerror(rc))); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; + } + } + + void remove(MDBX_txn* txn, + const std::string& field, + const std::string& value, + ndd::idInt id) { std::string filter_key = format_filter_key(field, value); - ndd::RoaringBitmap bitmap = get_bitmap_internal(filter_key); + ndd::RoaringBitmap bitmap = get_bitmap_with_txn(txn, filter_key); bitmap.remove(id); - store_bitmap_internal(filter_key, bitmap); + store_bitmap_internal(txn, filter_key, bitmap); + } + + void remove(const std::string& field, const std::string& value, ndd::idInt id) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin write transaction: " + + std::string(mdbx_strerror(rc))); + } + + try { + remove(txn, field, value, id); + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit transaction: " + + std::string(mdbx_strerror(rc))); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; + } } bool contains(const std::string& field, const std::string& value, ndd::idInt id) const { @@ -181,24 +214,65 @@ namespace ndd { if(ids.empty()) { return; } - std::string filter_key = format_filter_key(field, value); - ndd::RoaringBitmap bitmap = get_bitmap_internal(filter_key); - for(const auto& id : ids) { - bitmap.add(id); + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin write transaction: " + + std::string(mdbx_strerror(rc))); + } + + try { + std::string filter_key = format_filter_key(field, value); + ndd::RoaringBitmap bitmap = get_bitmap_with_txn(txn, filter_key); + for(const auto& id : ids) { + bitmap.add(id); + } + store_bitmap_internal(txn, filter_key, bitmap); + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit transaction: " + + std::string(mdbx_strerror(rc))); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; } - store_bitmap_internal(filter_key, bitmap); } - // Helper for batch operations where key is already formatted - void add_batch_by_key(const std::string& key, const std::vector& ids) { + void add_batch_by_key(MDBX_txn* txn, + const std::string& key, + const std::vector& ids) { if(ids.empty()) { return; } - ndd::RoaringBitmap bitmap = get_bitmap_internal(key); + ndd::RoaringBitmap bitmap = get_bitmap_with_txn(txn, key); for(const auto& id : ids) { bitmap.add(id); } - store_bitmap_internal(key, bitmap); + store_bitmap_internal(txn, key, bitmap); + } + + // Helper for batch operations where key is already formatted + void add_batch_by_key(const std::string& key, const std::vector& ids) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin write transaction: " + + std::string(mdbx_strerror(rc))); + } + + try { + add_batch_by_key(txn, key, ids); + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit transaction: " + + std::string(mdbx_strerror(rc))); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; + } } // Expose key formatting for external batching logic diff --git a/src/filter/filter.hpp b/src/filter/filter.hpp index 392fe2b404..0de89ef965 100644 --- a/src/filter/filter.hpp +++ b/src/filter/filter.hpp @@ -4,12 +4,14 @@ #include #include #include -#include #include #include #include #include #include +#include +#include +#include #include "json/nlohmann_json.hpp" #include "../utils/settings.hpp" @@ -44,6 +46,8 @@ class Filter { MDBX_dbi dbi_; // Used for schema storage std::string index_id_; std::string path_; + std::string schema_dbi_name_; + bool owns_env_; std::unique_ptr numeric_index_; std::unique_ptr category_index_; @@ -79,13 +83,28 @@ class Filter { mdbx_txn_abort(txn); } - void save_schema_internal() { + bool save_schema_internal(MDBX_txn* txn) { nlohmann::json j; - for(const auto& [k, v] : schema_cache_) { - j[k] = static_cast(v); + { + std::lock_guard lock(schema_mutex_); + for(const auto& [k, v] : schema_cache_) { + j[k] = static_cast(v); + } } std::string json_str = j.dump(); + MDBX_val key{const_cast(SCHEMA_KEY), strlen(SCHEMA_KEY)}; + MDBX_val data{const_cast(json_str.c_str()), json_str.size()}; + + int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); + if(rc != MDBX_SUCCESS) { + LOG_ERROR(1211, index_id_, "Failed to persist filter schema: " << mdbx_strerror(rc)); + return false; + } + return true; + } + + void save_schema_internal() { MDBX_txn* txn; int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); if(rc != MDBX_SUCCESS) { @@ -94,34 +113,60 @@ class Filter { return; } - MDBX_val key{const_cast(SCHEMA_KEY), strlen(SCHEMA_KEY)}; - MDBX_val data{const_cast(json_str.c_str()), json_str.size()}; - - rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); - if(rc == MDBX_SUCCESS) { - rc = mdbx_txn_commit(txn); - if(rc != MDBX_SUCCESS) { - LOG_ERROR( - 1209, index_id_, "Failed to commit filter schema update: " << mdbx_strerror(rc)); - } - } else { + if(!save_schema_internal(txn)) { mdbx_txn_abort(txn); - LOG_ERROR(1211, index_id_, "Failed to persist filter schema: " << mdbx_strerror(rc)); + return; + } + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + LOG_ERROR( + 1209, index_id_, "Failed to commit filter schema update: " << mdbx_strerror(rc)); } } - bool register_field_type(const std::string& field, FieldType type) { + bool register_field_type(const std::string& field, FieldType type, bool* changed = nullptr) { std::lock_guard lock(schema_mutex_); auto it = schema_cache_.find(field); if(it != schema_cache_.end()) { + if(changed) { + *changed = false; + } return it->second == type; } schema_cache_[field] = type; - save_schema_internal(); + if(changed) { + *changed = true; + } return true; } + void init_dbis() { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != 0) { + throw std::runtime_error(std::string("Failed to begin filter transaction: ") + mdbx_strerror(rc)); + } + + const char* schema_dbi_name = schema_dbi_name_.empty() ? nullptr : schema_dbi_name_.c_str(); + rc = mdbx_dbi_open(txn, schema_dbi_name, MDBX_CREATE, &dbi_); + if(rc != 0) { + mdbx_txn_abort(txn); + throw std::runtime_error(std::string("Failed to open filter database: ") + mdbx_strerror(rc)); + } + rc = mdbx_txn_commit(txn); + if(rc != 0) { + throw std::runtime_error(std::string("Failed to commit filter transaction: ") + mdbx_strerror(rc)); + } + + // Initialize Indices + numeric_index_ = std::make_unique(env_); + category_index_ = std::make_unique(env_); + + load_schema(); + } + void init_environment() { int rc = mdbx_env_create(&env_); if(rc != 0) { @@ -148,28 +193,7 @@ class Filter { if(rc != 0) { throw std::runtime_error(std::string("Failed to open filter environment: ") + mdbx_strerror(rc)); } - - MDBX_txn* txn; - rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); - if(rc != 0) { - throw std::runtime_error(std::string("Failed to begin filter transaction: ") + mdbx_strerror(rc)); - } - - rc = mdbx_dbi_open(txn, nullptr, MDBX_CREATE, &dbi_); - if(rc != 0) { - mdbx_txn_abort(txn); - throw std::runtime_error(std::string("Failed to open filter database: ") + mdbx_strerror(rc)); - } - rc = mdbx_txn_commit(txn); - if(rc != 0) { - throw std::runtime_error(std::string("Failed to commit filter transaction: ") + mdbx_strerror(rc)); - } - - // Initialize Indices - numeric_index_ = std::make_unique(env_); - category_index_ = std::make_unique(env_); - - load_schema(); + init_dbis(); } static std::string format_filter_key(const std::string& field, const std::string& value) { @@ -179,14 +203,31 @@ class Filter { public: Filter(const std::string& path, const std::string& index_id) : index_id_(index_id), - path_(path) { - std::filesystem::create_directories(path); + path_(path), + schema_dbi_name_(), + owns_env_(true) { + if(::mkdir(path.c_str(), 0775) != 0 && errno != EEXIST) { + throw std::runtime_error("Failed to create filter directory: " + path); + } init_environment(); } + Filter(MDBX_env* env, + const std::string& index_id, + const std::string& schema_dbi_name = "filter_schema") : + env_(env), + index_id_(index_id), + path_(), + schema_dbi_name_(schema_dbi_name), + owns_env_(false) { + init_dbis(); + } + ~Filter() { mdbx_dbi_close(env_, dbi_); - mdbx_env_close(env_); + if(owns_env_) { + mdbx_env_close(env_); + } } // Compute the filter bitmap based on the provided JSON filter array @@ -393,12 +434,15 @@ class Filter { } // Optimized version to process filter JSON in batch - void add_filters_from_json_batch( + void add_filters_from_json_batch_txn( + MDBX_txn* txn, const std::vector>& id_filter_pairs) { if(id_filter_pairs.empty()) { return; } + bool schema_changed = false; + // Create a map to collect IDs for each filter std::unordered_map> filter_to_ids; @@ -421,13 +465,16 @@ class Filter { continue; } - if(!register_field_type(field, type)) { + bool field_added = false; + if(!register_field_type(field, type, &field_added)) { LOG_ERROR(1202, index_id_, "Type mismatch for field '" << field << "'"); continue; } + schema_changed = schema_changed || field_added; if(value.is_string()) { - std::string filter_key = format_filter_key(field, value.get()); + std::string filter_key = + format_filter_key(field, value.get()); filter_to_ids[filter_key].push_back(numeric_id); } else if(value.is_number()) { // Use Numeric Index for numbers @@ -437,17 +484,17 @@ class Filter { } else { sortable_val = ndd::filter::float_to_sortable(value.get()); } - numeric_index_->put(field, numeric_id, sortable_val); + numeric_index_->put(txn, field, numeric_id, sortable_val); } else if(value.is_boolean()) { std::string filter_key = format_filter_key(field, value.get() ? "1" : "0"); filter_to_ids[filter_key].push_back(numeric_id); } else { LOG_WARN(1203, - index_id_, - "Unsupported filter type for field '" << field - << "' in filter: " - << value.dump()); + index_id_, + "Unsupported filter type for field '" << field + << "' in filter: " + << value.dump()); } } } catch(const std::exception& e) { @@ -457,7 +504,43 @@ class Filter { // Process each filter with its batch of IDs for(const auto& [filter_key, ids] : filter_to_ids) { - add_to_filter_batch(filter_key, ids); + category_index_->add_batch_by_key(txn, filter_key, ids); + } + + if(schema_changed && !save_schema_internal(txn)) { + throw std::runtime_error("Failed to persist filter schema in batch mutation"); + } + } + + void add_filters_from_json_batch( + const std::vector>& id_filter_pairs) { + if(id_filter_pairs.empty()) { + return; + } + + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + LOG_ERROR(1212, + index_id_, + "Failed to begin batch filter write transaction: " << mdbx_strerror(rc)); + throw std::runtime_error("Failed to begin filter transaction: " + + std::string(mdbx_strerror(rc))); + } + + bool schema_changed = false; + + try { + add_filters_from_json_batch_txn(txn, id_filter_pairs); + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit filter batch transaction: " + + std::string(mdbx_strerror(rc))); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; } } @@ -470,63 +553,119 @@ class Filter { return category_index_->contains(field, value, numeric_id); } - void add_filters_from_json(ndd::idInt numeric_id, const std::string& filter_json) { - try { - auto j = nlohmann::json::parse(filter_json); - for(const auto& [field, value] : j.items()) { - FieldType type = FieldType::Unknown; - if(value.is_boolean()) { - type = FieldType::Bool; - } else if(value.is_number()) { - type = FieldType::Number; - } else if(value.is_string()) { - type = FieldType::String; - } + void add_filters_from_json_txn(MDBX_txn* txn, + ndd::idInt numeric_id, + const std::string& filter_json) { + bool schema_changed = false; - if(type == FieldType::Unknown) { - LOG_DEBUG("Unsupported filter type for field '" << field << "'"); - continue; - } + auto j = nlohmann::json::parse(filter_json); + for(const auto& [field, value] : j.items()) { + FieldType type = FieldType::Unknown; + if(value.is_boolean()) { + type = FieldType::Bool; + } else if(value.is_number()) { + type = FieldType::Number; + } else if(value.is_string()) { + type = FieldType::String; + } - if(!register_field_type(field, type)) { - LOG_ERROR(1205, index_id_, "Type mismatch for field '" << field << "'"); - continue; - } + if(type == FieldType::Unknown) { + LOG_DEBUG("Unsupported filter type for field '" << field << "'"); + continue; + } - if(value.is_string()) { - add_to_filter(field, value.get(), numeric_id); - } else if(value.is_number()) { - uint32_t sortable_val; - if(value.is_number_integer()) { - sortable_val = ndd::filter::int_to_sortable(value.get()); - } else { - sortable_val = ndd::filter::float_to_sortable(value.get()); - } - numeric_index_->put(field, numeric_id, sortable_val); - } else if(value.is_boolean()) { - add_to_filter(field, value.get() ? "1" : "0", numeric_id); + bool field_added = false; + if(!register_field_type(field, type, &field_added)) { + LOG_ERROR(1205, index_id_, "Type mismatch for field '" << field << "'"); + continue; + } + schema_changed = schema_changed || field_added; + + if(value.is_string()) { + category_index_->add(txn, field, value.get(), numeric_id); + } else if(value.is_number()) { + uint32_t sortable_val; + if(value.is_number_integer()) { + sortable_val = ndd::filter::int_to_sortable(value.get()); + } else { + sortable_val = ndd::filter::float_to_sortable(value.get()); } + numeric_index_->put(txn, field, numeric_id, sortable_val); + } else if(value.is_boolean()) { + category_index_->add(txn, field, value.get() ? "1" : "0", numeric_id); + } + } + + if(schema_changed && !save_schema_internal(txn)) { + throw std::runtime_error("Failed to persist filter schema"); + } + } + + void add_filters_from_json(ndd::idInt numeric_id, const std::string& filter_json) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + LOG_ERROR(1213, + index_id_, + "Failed to begin filter insert transaction: " << mdbx_strerror(rc)); + throw std::runtime_error("Failed to begin filter transaction: " + + std::string(mdbx_strerror(rc))); + } + + try { + add_filters_from_json_txn(txn, numeric_id, filter_json); + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit filter insert transaction: " + + std::string(mdbx_strerror(rc))); } } catch(const std::exception& e) { + mdbx_txn_abort(txn); LOG_ERROR(1206, index_id_, "Error adding filters: " << e.what()); + throw; + } + } + + void remove_filters_from_json_txn(MDBX_txn* txn, + ndd::idInt numeric_id, + const std::string& filter_json) { + auto j = nlohmann::json::parse(filter_json); + for(const auto& [field, value] : j.items()) { + if(value.is_string()) { + category_index_->remove(txn, field, value.get(), numeric_id); + } else if(value.is_number()) { + // Remove from Numeric Index + numeric_index_->remove(txn, field, numeric_id); + } else if(value.is_boolean()) { + category_index_->remove(txn, field, value.get() ? "1" : "0", numeric_id); + } } } void remove_filters_from_json(ndd::idInt numeric_id, const std::string& filter_json) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + LOG_ERROR(1214, + index_id_, + "Failed to begin filter delete transaction: " << mdbx_strerror(rc)); + throw std::runtime_error("Failed to begin filter transaction: " + + std::string(mdbx_strerror(rc))); + } + try { - auto j = nlohmann::json::parse(filter_json); - for(const auto& [field, value] : j.items()) { - if(value.is_string()) { - remove_from_filter(field, value.get(), numeric_id); - } else if(value.is_number()) { - // Remove from Numeric Index - numeric_index_->remove(field, numeric_id); - } else if(value.is_boolean()) { - remove_from_filter(field, value.get() ? "1" : "0", numeric_id); - } + remove_filters_from_json_txn(txn, numeric_id, filter_json); + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit filter delete transaction: " + + std::string(mdbx_strerror(rc))); } } catch(const std::exception& e) { + mdbx_txn_abort(txn); LOG_ERROR(1207, index_id_, "Error removing filters: " << e.what()); + throw; } } diff --git a/src/filter/numeric_index.hpp b/src/filter/numeric_index.hpp index c002652137..f900779907 100644 --- a/src/filter/numeric_index.hpp +++ b/src/filter/numeric_index.hpp @@ -250,34 +250,57 @@ namespace ndd { } } + void put(MDBX_txn* txn, const std::string& field, ndd::idInt id, uint32_t value) { + put_internal(txn, field, id, value); + } + void put(const std::string& field, ndd::idInt id, uint32_t value) { MDBX_txn* txn; - mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin numeric put transaction: " + + std::string(mdbx_strerror(rc))); + } try { put_internal(txn, field, id, value); - mdbx_txn_commit(txn); + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit numeric put transaction: " + + std::string(mdbx_strerror(rc))); + } } catch(...) { mdbx_txn_abort(txn); throw; } } + void remove(MDBX_txn* txn, const std::string& field, ndd::idInt id) { + std::string fwd_key_str = make_forward_key(field, id); + MDBX_val fwd_key{const_cast(fwd_key_str.data()), fwd_key_str.size()}; + MDBX_val fwd_val; + + if(mdbx_get(txn, forward_dbi_, &fwd_key, &fwd_val) == MDBX_SUCCESS) { + uint32_t old_val; + std::memcpy(&old_val, fwd_val.iov_base, sizeof(uint32_t)); + remove_from_buckets(txn, field, old_val, id); + mdbx_del(txn, forward_dbi_, &fwd_key, nullptr); + } + } + void remove(const std::string& field, ndd::idInt id) { MDBX_txn* txn; - mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin numeric remove transaction: " + + std::string(mdbx_strerror(rc))); + } try { - std::string fwd_key_str = make_forward_key(field, id); - MDBX_val fwd_key{const_cast(fwd_key_str.data()), fwd_key_str.size()}; - MDBX_val fwd_val; - - if(mdbx_get(txn, forward_dbi_, &fwd_key, &fwd_val) == MDBX_SUCCESS) { - uint32_t old_val; - std::memcpy(&old_val, fwd_val.iov_base, sizeof(uint32_t)); - remove_from_buckets(txn, field, old_val, id); - mdbx_del(txn, forward_dbi_, &fwd_key, nullptr); + remove(txn, field, id); + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit numeric remove transaction: " + + std::string(mdbx_strerror(rc))); } - - mdbx_txn_commit(txn); } catch(...) { mdbx_txn_abort(txn); throw; diff --git a/src/sparse/sparse_storage.hpp b/src/sparse/sparse_storage.hpp index d4bea62e5d..e1f0c47806 100644 --- a/src/sparse/sparse_storage.hpp +++ b/src/sparse/sparse_storage.hpp @@ -27,7 +27,20 @@ namespace ndd { db_path_(db_path), index_id_(index_id), sparse_model_(sparse_model), - env_(nullptr) { + env_(nullptr), + owns_env_(true) { + sparse_index_ = nullptr; + } + + SparseVectorStorage(MDBX_env* env, + const std::string& index_id, + ndd::SparseScoringModel sparse_model = + ndd::SparseScoringModel::DEFAULT) : + db_path_(), + index_id_(index_id), + sparse_model_(sparse_model), + env_(env), + owns_env_(false) { sparse_index_ = nullptr; } @@ -35,8 +48,14 @@ namespace ndd { // Initialize storage bool initialize() { - if(!initializeMDBX()) { - return false; + if(owns_env_) { + if(!initializeMDBX()) { + return false; + } + } else { + if(!initializeDBIs()) { + return false; + } } sparse_index_ = @@ -222,6 +241,76 @@ namespace ndd { return txn->commit(); } + bool store_vectors_batch_txn( + MDBX_txn* txn, + const std::vector>& batch, + int64_t* vector_count_delta_out = nullptr) { + std::unique_lock lock(mutex_); + int64_t delta = 0; + + for(const auto& [doc_id, sparse_vec] : batch) { + const auto existing_vec = getVectorInternal(txn, doc_id); + const bool had_sparse_terms = existing_vec.has_value() && !existing_vec->empty(); + const bool has_sparse_terms = !sparse_vec.empty(); + + if(had_sparse_terms + && !sparse_index_->removeDocument(txn, doc_id, *existing_vec)) { + return false; + } + + if(existing_vec.has_value() && !deleteVectorInternal(txn, doc_id)) { + return false; + } + + if(has_sparse_terms) { + if(!storeVectorInternal(txn, doc_id, sparse_vec)) { + return false; + } + + if(!sparse_index_->addDocumentsBatch(txn, {{doc_id, sparse_vec}})) { + return false; + } + } + + delta += static_cast(has_sparse_terms) + - static_cast(had_sparse_terms); + } + + if(vector_count_delta_out) { + *vector_count_delta_out = delta; + } + return true; + } + + bool delete_vector_txn(MDBX_txn* txn, + ndd::idInt doc_id, + int64_t* vector_count_delta_out = nullptr) { + std::unique_lock lock(mutex_); + + auto vec = getVectorInternal(txn, doc_id); + if(!vec) { + LOG_WARN(2242, index_id_, "delete_vector_txn could not find doc_id=" << doc_id); + return false; + } + + const bool had_sparse_terms = !vec->empty(); + if(had_sparse_terms && !sparse_index_->removeDocument(txn, doc_id, *vec)) { + return false; + } + + if(!deleteVectorInternal(txn, doc_id)) { + return false; + } + + if(vector_count_delta_out) { + *vector_count_delta_out = had_sparse_terms ? -1 : 0; + } + return true; + } + + void apply_vector_count_delta(int64_t delta) { applyVectorCountDelta(delta); } + bool uses_shared_env() const { return !owns_env_; } + /*NOT BEING USED FOR NOW*/ #if 0 bool delete_vectors_batch(const std::vector& doc_ids) { @@ -254,6 +343,7 @@ namespace ndd { std::string index_id_; ndd::SparseScoringModel sparse_model_; MDBX_env* env_; + bool owns_env_; MDBX_dbi docs_dbi_; std::unique_ptr sparse_index_; @@ -263,6 +353,31 @@ namespace ndd { std::unordered_set deleted_docs_; // Helper methods + bool initializeDBIs() { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != 0) { + LOG_ERROR(2250, index_id_, "mdbx_txn_begin failed: " << mdbx_strerror(rc)); + return false; + } + + rc = mdbx_dbi_open(txn, "sparse_docs", MDBX_CREATE | MDBX_INTEGERKEY, &docs_dbi_); + if(rc != 0) { + LOG_ERROR(2251, + index_id_, + "mdbx_dbi_open failed for sparse_docs: " << mdbx_strerror(rc)); + mdbx_txn_abort(txn); + return false; + } + + rc = mdbx_txn_commit(txn); + if(rc != 0) { + LOG_ERROR(2252, index_id_, "mdbx_txn_commit failed: " << mdbx_strerror(rc)); + return false; + } + return true; + } + bool initializeMDBX() { int rc = mdbx_env_create(&env_); if(rc != 0) { @@ -302,30 +417,11 @@ namespace ndd { return false; } - MDBX_txn* txn; - rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); - if(rc != 0) { - LOG_ERROR(2250, index_id_, "mdbx_txn_begin failed: " << mdbx_strerror(rc)); - return false; - } - - rc = mdbx_dbi_open(txn, "sparse_docs", MDBX_CREATE | MDBX_INTEGERKEY, &docs_dbi_); - if(rc != 0) { - LOG_ERROR(2251, index_id_, "mdbx_dbi_open failed for sparse_docs: " << mdbx_strerror(rc)); - mdbx_txn_abort(txn); - return false; - } - - rc = mdbx_txn_commit(txn); - if(rc != 0) { - LOG_ERROR(2252, index_id_, "mdbx_txn_commit failed: " << mdbx_strerror(rc)); - return false; - } - return true; + return initializeDBIs(); } void closeMDBX() { - if(env_) { + if(owns_env_ && env_) { mdbx_env_close(env_); env_ = nullptr; } diff --git a/src/storage/id_mapper.hpp b/src/storage/id_mapper.hpp index cda3109702..05cd138fe4 100644 --- a/src/storage/id_mapper.hpp +++ b/src/storage/id_mapper.hpp @@ -20,7 +20,9 @@ class IDMapper { public: IDMapper(const std::string& path, bool is_new = false, UserType user_type = UserType::Admin) : path_(path), - user_type_(user_type) { + user_type_(user_type), + owns_env_(true), + dbi_name_() { if(is_new) { std::filesystem::create_directories(path); } @@ -74,16 +76,157 @@ class IDMapper { } } + IDMapper(MDBX_env* env, + const std::string& dbi_name, + bool is_new = false, + UserType user_type = UserType::Admin) : + env_(env), + dbi_(0), + path_(), + user_type_(user_type), + owns_env_(false), + dbi_name_(dbi_name) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to begin transaction: ") + + mdbx_strerror(rc)); + } + + rc = mdbx_dbi_open(txn, dbi_name_.c_str(), MDBX_CREATE, &dbi_); + if(rc != MDBX_SUCCESS) { + mdbx_txn_abort(txn); + throw std::runtime_error(std::string("Failed to open database: ") + mdbx_strerror(rc)); + } + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to commit transaction: ") + + mdbx_strerror(rc)); + } + + if(is_new) { + init_next_id(); + } + } + ~IDMapper() { mdbx_dbi_close(env_, dbi_); - mdbx_env_close(env_); + if(owns_env_) { + mdbx_env_close(env_); + } } // Create string ID to numeric ID mapping. If string ids exists in the database, it will return // the existing numeric ID along with flag It will also use old numeric IDs of deleted points + template + std::vector> + create_ids_batch_txn(MDBX_txn* txn, const std::vector& str_ids) { + if(str_ids.empty()) { + return {}; + } + + constexpr idInt INVALID_LABEL = static_cast(-1); + std::vector> id_tuples; + id_tuples.reserve(str_ids.size()); + for(const auto& str_id : str_ids) { + id_tuples.emplace_back(str_id, INVALID_LABEL, true, false); + } + + for(auto& tup : id_tuples) { + const std::string& str_id = std::get<0>(tup); + MDBX_val key{(void*)str_id.c_str(), str_id.size()}; + MDBX_val data; + + int rc = mdbx_get(txn, dbi_, &key, &data); + if(rc == MDBX_SUCCESS) { + idInt existing_id = *(idInt*)data.iov_base; + std::get<1>(tup) = existing_id; + std::get<2>(tup) = false; + } else if(rc == MDBX_NOTFOUND) { + std::get<1>(tup) = 0; + } else { + throw std::runtime_error("Database error checking ID: " + + std::string(mdbx_strerror(rc))); + } + } + + size_t total_new_ids_needed = + std::count_if(id_tuples.begin(), id_tuples.end(), [](const auto& t) { + return std::get<1>(t) == 0; + }); + + size_t fresh_ids_count = total_new_ids_needed; + size_t deleted_index = 0; + + if(use_deleted_ids) { + std::vector deletedIds = getDeletedIds_txn(txn, fresh_ids_count); + + for(auto& tup : id_tuples) { + if(std::get<1>(tup) == 0 && std::get<2>(tup) == true + && deleted_index < deletedIds.size()) { + std::get<1>(tup) = deletedIds[deleted_index++]; + std::get<3>(tup) = true; + } + } + fresh_ids_count -= deleted_index; + } + + std::vector new_ids; + if(fresh_ids_count > 0) { + new_ids = get_next_ids_txn(txn, fresh_ids_count); + } + + size_t new_id_index = 0; + for(auto& tup : id_tuples) { + if(std::get<2>(tup) == true && std::get<1>(tup) != 0) { + const std::string& str_id = std::get<0>(tup); + idInt id = std::get<1>(tup); + + MDBX_val key{(void*)str_id.c_str(), str_id.size()}; + MDBX_val data{&id, sizeof(idInt)}; + + int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to insert IDs: " + + std::string(mdbx_strerror(rc))); + } + } else if(std::get<1>(tup) == 0) { + if(new_id_index >= new_ids.size()) { + throw std::runtime_error("Mismatch in generated ID count"); + } + idInt new_id = new_ids[new_id_index++]; + const std::string& str_id = std::get<0>(tup); + + MDBX_val key{(void*)str_id.c_str(), str_id.size()}; + MDBX_val data{&new_id, sizeof(idInt)}; + + int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to insert IDs: " + + std::string(mdbx_strerror(rc))); + } + + std::get<1>(tup) = new_id; + } + } + + std::vector> result; + result.reserve(id_tuples.size()); + for(const auto& tup : id_tuples) { + bool is_new_to_hnsw = std::get<2>(tup); + if(std::get<3>(tup)) { + is_new_to_hnsw = false; + } + result.emplace_back(std::get<1>(tup), is_new_to_hnsw); + } + return result; + } + template std::vector> create_ids_batch(const std::vector& str_ids, void* wal_ptr = nullptr) { + (void)wal_ptr; if(str_ids.empty()) { return {}; } @@ -194,28 +337,6 @@ class IDMapper { new_ids = get_next_ids(fresh_ids_count); } - // CRITICAL FIX: Log to WAL AFTER generating IDs (minimal risk window) - if(wal_ptr) { - WriteAheadLog* wal = static_cast(wal_ptr); - std::vector wal_entries; - - // Log reused IDs - for(const auto& tup : id_tuples) { - if(std::get<2>(tup) && std::get<1>(tup) != 0) { - wal_entries.push_back({WALOperationType::VECTOR_ADD, std::get<1>(tup)}); - } - } - - // Log fresh IDs - for(idInt id : new_ids) { - wal_entries.push_back({WALOperationType::VECTOR_ADD, id}); - } - - if(!wal_entries.empty()) { - wal->log(wal_entries); - } - } - if(fresh_ids_count > 0 && new_ids.size() != fresh_ids_count) { throw std::runtime_error("Mismatch: get_next_ids returned " + std::to_string(new_ids.size()) + " but expected " @@ -352,6 +473,18 @@ class IDMapper { } // Get ID for a string (returns 0 if not found) + idInt get_id_txn(MDBX_txn* txn, const std::string& str_id) const { + MDBX_val key, data; + key.iov_len = str_id.size(); + key.iov_base = (void*)str_id.c_str(); + + int rc = mdbx_get(txn, dbi_, &key, &data); + if(rc == MDBX_SUCCESS) { + return *(idInt*)data.iov_base; + } + return 0; + } + idInt get_id(const std::string& str_id) const { LOG_DEBUG("=== get_id START for: [" << str_id << "] size: " << str_id.size() << " ==="); @@ -388,6 +521,52 @@ class IDMapper { return 0; // Not found } + std::vector deletePoints_txn(MDBX_txn* txn, + const std::vector& external_ids) { + std::vector deleted_ids; + + MDBX_val key, data; + for(const auto& ext_id : external_ids) { + key.iov_len = ext_id.size(); + key.iov_base = const_cast(ext_id.data()); + + if(mdbx_get(txn, dbi_, &key, &data) == MDBX_SUCCESS) { + idInt label = *reinterpret_cast(data.iov_base); + deleted_ids.push_back(label); + mdbx_del(txn, dbi_, &key, nullptr); + } else { + deleted_ids.push_back(0); + } + } + + if(!deleted_ids.empty()) { + std::string del_key = DELETED_IDS_KEY; + MDBX_val del_mdb_key, del_mdb_val; + + del_mdb_key.iov_len = del_key.size(); + del_mdb_key.iov_base = const_cast(del_key.data()); + + std::vector existing; + if(mdbx_get(txn, dbi_, &del_mdb_key, &del_mdb_val) == MDBX_SUCCESS) { + size_t count = del_mdb_val.iov_len / sizeof(idInt); + idInt* raw = reinterpret_cast(del_mdb_val.iov_base); + existing.insert(existing.end(), raw, raw + count); + } + + for(idInt l : deleted_ids) { + if(l != 0) { + existing.push_back(l); + } + } + + del_mdb_val.iov_len = existing.size() * sizeof(idInt); + del_mdb_val.iov_base = existing.data(); + mdbx_put(txn, dbi_, &del_mdb_key, &del_mdb_val, MDBX_UPSERT); + } + + return deleted_ids; + } + // Deletes mapping from string_id to numeric_id, append to DELETED_IDS_KEY // Returns the deleted numeric_ids, if strings is not found, returns 0 std::vector deletePoints(const std::vector& external_ids) { @@ -489,11 +668,15 @@ class IDMapper { // It will grow automatically as needed via compact operations } + MDBX_env* get_env() const { return env_; } + private: MDBX_env* env_; MDBX_dbi dbi_; std::string path_; UserType user_type_; + bool owns_env_; + std::string dbi_name_; mutable std::mutex mutex_; // Only used for next_id management // Along with string:number pairs, the database also stores a key for next_id. They key for next // id also has random alphanumeric characters to avoid collision with other keys. The key is @@ -502,6 +685,65 @@ class IDMapper { static const std::string DELETED_IDS_KEY; // Atomic operation to get and increment next_ids + std::vector get_next_ids_txn(MDBX_txn* txn, size_t size = 1) { + std::lock_guard lock(mutex_); + + MDBX_val key{(void*)NEXT_ID_KEY.c_str(), NEXT_ID_KEY.size()}; + MDBX_val data; + idInt current_id = 0; + + int rc = mdbx_get(txn, dbi_, &key, &data); + if(rc == MDBX_SUCCESS) { + current_id = *(idInt*)data.iov_base; + } else if(rc != MDBX_NOTFOUND) { + throw std::runtime_error(std::string("Failed to get next_id: ") + mdbx_strerror(rc)); + } + + idInt next_id = current_id + size; + data.iov_len = sizeof(idInt); + data.iov_base = &next_id; + + rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to store next_id: ") + + mdbx_strerror(rc)); + } + + std::vector ids(size); + std::iota(ids.begin(), ids.end(), current_id); + return ids; + } + + std::vector getDeletedIds_txn(MDBX_txn* txn, size_t max_count) { + std::vector result; + + std::string del_key = DELETED_IDS_KEY; + MDBX_val key, val; + key.iov_len = del_key.size(); + key.iov_base = const_cast(del_key.data()); + + if(mdbx_get(txn, dbi_, &key, &val) != MDBX_SUCCESS) { + return result; + } + + size_t total = val.iov_len / sizeof(idInt); + idInt* raw = reinterpret_cast(val.iov_base); + + size_t count = std::min(max_count, total); + result.insert(result.end(), raw, raw + count); + + if(count < total) { + MDBX_val new_val; + new_val.iov_len = (total - count) * sizeof(idInt); + new_val.iov_base = raw + count; + mdbx_put(txn, dbi_, &key, &new_val, MDBX_UPSERT); + } else { + mdbx_del(txn, dbi_, &key, nullptr); + } + + return result; + } + std::vector get_next_ids(size_t size = 1) { std::lock_guard lock(mutex_); MDBX_txn* txn; diff --git a/src/storage/vector_storage.hpp b/src/storage/vector_storage.hpp index 8ca7f56ab9..4e3f5ed122 100644 --- a/src/storage/vector_storage.hpp +++ b/src/storage/vector_storage.hpp @@ -14,6 +14,59 @@ #include #include +class SharedIndexEnv { +private: + MDBX_env* env_; + +public: + explicit SharedIndexEnv(const std::string& env_path) : + env_(nullptr) { + std::filesystem::create_directories(env_path); + + int rc = mdbx_env_create(&env_); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to create shared MDBX env: ") + + mdbx_strerror(rc)); + } + + rc = mdbx_env_set_geometry(env_, + -1, + 1ULL << settings::VECTOR_MAP_SIZE_BITS, + 1ULL << settings::VECTOR_MAP_SIZE_MAX_BITS, + 1ULL << settings::VECTOR_MAP_SIZE_BITS, + -1, + -1); + if(rc != MDBX_SUCCESS) { + mdbx_env_close(env_); + throw std::runtime_error(std::string("Failed to set shared env geometry: ") + + mdbx_strerror(rc)); + } + + rc = mdbx_env_set_maxdbs(env_, 32); + if(rc != MDBX_SUCCESS) { + mdbx_env_close(env_); + throw std::runtime_error(std::string("Failed to set shared env maxdbs: ") + + mdbx_strerror(rc)); + } + + rc = mdbx_env_open( + env_, env_path.c_str(), MDBX_WRITEMAP | MDBX_MAPASYNC | MDBX_NORDAHEAD, 0664); + if(rc != MDBX_SUCCESS) { + mdbx_env_close(env_); + throw std::runtime_error(std::string("Failed to open shared MDBX env: ") + + mdbx_strerror(rc)); + } + } + + ~SharedIndexEnv() { + if(env_) { + mdbx_env_close(env_); + } + } + + MDBX_env* get() const { return env_; } +}; + // Handles vector storage class VectorStore { private: @@ -21,10 +74,33 @@ class VectorStore { MDBX_dbi dbi_; std::string index_id_; std::string path_; + std::string dbi_name_; + bool owns_env_; size_t vector_dim_; ndd::quant::QuantizationLevel quant_level_; size_t bytes_per_vector_; + void init_dbi() { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to begin transaction: ") + + mdbx_strerror(rc)); + } + + const char* db_name = dbi_name_.empty() ? settings::DEFAULT_SUBINDEX.c_str() : dbi_name_.c_str(); + rc = mdbx_dbi_open(txn, db_name, MDBX_CREATE | MDBX_INTEGERKEY, &dbi_); + if(rc != MDBX_SUCCESS) { + mdbx_txn_abort(txn); + throw std::runtime_error(std::string("Failed to open database: ") + mdbx_strerror(rc)); + } + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit transaction: " + std::string(mdbx_strerror(rc))); + } + } + void init_environment() { int rc = mdbx_env_create(&env_); if(rc != MDBX_SUCCESS) { @@ -52,24 +128,7 @@ class VectorStore { throw std::runtime_error(std::string("Failed to open environment: ") + mdbx_strerror(rc)); } - - MDBX_txn* txn; - rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); - if(rc != MDBX_SUCCESS) { - throw std::runtime_error(std::string("Failed to begin transaction: ") + mdbx_strerror(rc)); - } - - rc = mdbx_dbi_open(txn, settings::DEFAULT_SUBINDEX.c_str(), MDBX_CREATE | MDBX_INTEGERKEY, &dbi_); - if(rc != MDBX_SUCCESS) { - mdbx_txn_abort(txn); - throw std::runtime_error(std::string("Failed to open database: ") + mdbx_strerror(rc)); - } - - rc = mdbx_txn_commit(txn); - if(rc != MDBX_SUCCESS) { - throw std::runtime_error("Failed to commit transaction: " - + std::string(mdbx_strerror(rc))); - } + init_dbi(); } public: @@ -79,6 +138,8 @@ class VectorStore { const std::string& index_id) : index_id_(index_id), path_(path), + dbi_name_(), + owns_env_(true), vector_dim_(vector_dim), quant_level_(quant_level) { bytes_per_vector_ = @@ -87,9 +148,28 @@ class VectorStore { init_environment(); } + VectorStore(MDBX_env* env, + size_t vector_dim, + ndd::quant::QuantizationLevel quant_level, + const std::string& index_id, + const std::string& dbi_name) : + env_(env), + index_id_(index_id), + path_(), + dbi_name_(dbi_name), + owns_env_(false), + vector_dim_(vector_dim), + quant_level_(quant_level) { + bytes_per_vector_ = + ndd::quant::get_quantizer_dispatch(quant_level_).get_storage_size(vector_dim); + init_dbi(); + } + ~VectorStore() { mdbx_dbi_close(env_, dbi_); - mdbx_env_close(env_); + if(owns_env_) { + mdbx_env_close(env_); + } } // Nested Cursor struct @@ -256,6 +336,28 @@ class VectorStore { } // Batch operations with raw bytes + void store_vectors_batch_txn( + MDBX_txn* txn, + const std::vector>>& batch) { + if(batch.empty()) { + return; + } + + for(const auto& [numeric_id, vector_bytes] : batch) { + if(vector_bytes.size() != bytes_per_vector_) { + throw std::runtime_error("Vector byte size mismatch"); + } + + MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; + MDBX_val data{const_cast(vector_bytes.data()), vector_bytes.size()}; + + int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to store vector: ") + mdbx_strerror(rc)); + } + } + } + void store_vectors_batch(const std::vector>>& batch) { if(batch.empty()) { @@ -270,34 +372,17 @@ class VectorStore { } }; - auto write_batch = [&](MDBX_txn* txn) -> int { - for(const auto& [numeric_id, vector_bytes] : batch) { - if(vector_bytes.size() != bytes_per_vector_) { - throw std::runtime_error("Vector byte size mismatch"); - } - - MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; - MDBX_val data{const_cast(vector_bytes.data()), vector_bytes.size()}; - - int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); - if(rc != MDBX_SUCCESS) { - return rc; - } - } - return MDBX_SUCCESS; - }; - MDBX_txn* txn; int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); if(rc != MDBX_SUCCESS) { throw std::runtime_error(std::string("Failed to begin transaction: ") + mdbx_strerror(rc)); } - rc = write_batch(txn); - // MDBX auto-grows, no manual resize needed - if(rc != MDBX_SUCCESS) { + try { + store_vectors_batch_txn(txn, batch); + } catch(...) { mdbx_txn_abort(txn); - throw std::runtime_error(std::string("Failed to store vector: ") + mdbx_strerror(rc)); + throw; } try_commit(txn); @@ -339,6 +424,15 @@ class VectorStore { } } + void remove_txn(MDBX_txn* txn, ndd::idInt numeric_id) { + MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; + + int rc = mdbx_del(txn, dbi_, &key, nullptr); + if(rc != MDBX_SUCCESS && rc != MDBX_NOTFOUND) { + throw std::runtime_error(std::string("Failed to delete vector data: ") + mdbx_strerror(rc)); + } + } + void remove(ndd::idInt numeric_id) { MDBX_txn* txn; int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); @@ -347,12 +441,7 @@ class VectorStore { } try { - MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; - - rc = mdbx_del(txn, dbi_, &key, nullptr); - if(rc != MDBX_SUCCESS && rc != MDBX_NOTFOUND) { - throw std::runtime_error(std::string("Failed to delete vector data: ") + mdbx_strerror(rc)); - } + remove_txn(txn, numeric_id); rc = mdbx_txn_commit(txn); if(rc != MDBX_SUCCESS) { @@ -379,6 +468,29 @@ class MetaStore { MDBX_env* env_; MDBX_dbi dbi_; std::string path_; + std::string dbi_name_; + bool owns_env_; + + void init_dbi() { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to begin transaction: ") + mdbx_strerror(rc)); + } + + const char* db_name = dbi_name_.empty() ? nullptr : dbi_name_.c_str(); + rc = mdbx_dbi_open(txn, db_name, MDBX_CREATE | MDBX_INTEGERKEY, &dbi_); + if(rc != MDBX_SUCCESS) { + mdbx_txn_abort(txn); + throw std::runtime_error(std::string("Failed to open database: ") + mdbx_strerror(rc)); + } + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit transaction: " + + std::string(mdbx_strerror(rc))); + } + } void init_environment() { int rc = mdbx_env_create(&env_); @@ -407,36 +519,51 @@ class MetaStore { // throw std::runtime_error("Failed to open environment"); throw std::runtime_error(std::string("Failed to open environment: ") + mdbx_strerror(rc)); } - - MDBX_txn* txn; - rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); - if(rc != MDBX_SUCCESS) { - throw std::runtime_error(std::string("Failed to begin transaction: ") + mdbx_strerror(rc)); - } - - rc = mdbx_dbi_open(txn, nullptr, MDBX_CREATE | MDBX_INTEGERKEY, &dbi_); - if(rc != MDBX_SUCCESS) { - mdbx_txn_abort(txn); - throw std::runtime_error(std::string("Failed to open database: ") + mdbx_strerror(rc)); - } - - rc = mdbx_txn_commit(txn); - if(rc != MDBX_SUCCESS) { - throw std::runtime_error("Failed to commit transaction: " - + std::string(mdbx_strerror(rc))); - } + init_dbi(); } public: MetaStore(const std::string& path) : - path_(path) { + path_(path), + dbi_name_(), + owns_env_(true) { std::filesystem::create_directories(path); init_environment(); } + MetaStore(MDBX_env* env, const std::string& dbi_name) : + env_(env), + path_(), + dbi_name_(dbi_name), + owns_env_(false) { + init_dbi(); + } + ~MetaStore() { mdbx_dbi_close(env_, dbi_); - mdbx_env_close(env_); + if(owns_env_) { + mdbx_env_close(env_); + } + } + + void store_meta_batch_txn(MDBX_txn* txn, + const std::vector>& batch) { + if(batch.empty()) { + return; + } + + for(const auto& [numeric_id, meta] : batch) { + msgpack::sbuffer sbuf; + msgpack::pack(sbuf, meta); + + MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; + MDBX_val data{const_cast(sbuf.data()), sbuf.size()}; + + int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to store meta: ") + mdbx_strerror(rc)); + } + } } void store_meta_batch(const std::vector>& batch) { @@ -452,39 +579,36 @@ class MetaStore { } }; - auto write_batch = [&](MDBX_txn* txn) -> int { - for(const auto& [numeric_id, meta] : batch) { - msgpack::sbuffer sbuf; - msgpack::pack(sbuf, meta); - - MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; - MDBX_val data{const_cast(sbuf.data()), sbuf.size()}; - - int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); - if(rc != MDBX_SUCCESS) { - return rc; - } - } - return MDBX_SUCCESS; - }; - MDBX_txn* txn; int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); if(rc != MDBX_SUCCESS) { throw std::runtime_error(std::string("Failed to begin transaction: ") + mdbx_strerror(rc)); } - rc = write_batch(txn); - // MDBX auto-grows, no manual resize needed - if(rc != MDBX_SUCCESS) { + try { + store_meta_batch_txn(txn, batch); + } catch(...) { mdbx_txn_abort(txn); - throw std::runtime_error(std::string("Failed to store meta: ") + mdbx_strerror(rc)); + throw; } try_commit(txn); } void store_meta(ndd::idInt id, const ndd::VectorMeta& meta) { store_meta_batch({{id, meta}}); } + ndd::VectorMeta get_meta_txn(MDBX_txn* txn, ndd::idInt numeric_id) const { + MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; + MDBX_val data; + + int rc = mdbx_get(txn, dbi_, &key, &data); + if(rc == MDBX_NOTFOUND) { + throw std::runtime_error("Meta not found"); + } + + auto oh = msgpack::unpack(reinterpret_cast(data.iov_base), data.iov_len); + return oh.get().as(); + } + ndd::VectorMeta get_meta(ndd::idInt numeric_id) const { MDBX_txn* txn; int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_RDONLY, &txn); @@ -493,16 +617,7 @@ class MetaStore { } try { - MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; - MDBX_val data; - - rc = mdbx_get(txn, dbi_, &key, &data); - if(rc == MDBX_NOTFOUND) { - mdbx_txn_abort(txn); - throw std::runtime_error("Meta not found"); - } - auto oh = msgpack::unpack(reinterpret_cast(data.iov_base), data.iov_len); - auto meta = oh.get().as(); + auto meta = get_meta_txn(txn, numeric_id); mdbx_txn_abort(txn); return meta; } catch(...) { @@ -511,6 +626,15 @@ class MetaStore { } } + void remove_txn(MDBX_txn* txn, ndd::idInt numeric_id) { + MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; + + int rc = mdbx_del(txn, dbi_, &key, nullptr); + if(rc != MDBX_SUCCESS && rc != MDBX_NOTFOUND) { + throw std::runtime_error(std::string("Failed to delete metadata: ") + mdbx_strerror(rc)); + } + } + void remove(ndd::idInt numeric_id) { MDBX_txn* txn; int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); @@ -519,12 +643,7 @@ class MetaStore { } try { - MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; - - rc = mdbx_del(txn, dbi_, &key, nullptr); - if(rc != MDBX_SUCCESS && rc != MDBX_NOTFOUND) { - throw std::runtime_error(std::string("Failed to delete metadata: ") + mdbx_strerror(rc)); - } + remove_txn(txn, numeric_id); rc = mdbx_txn_commit(txn); if(rc != MDBX_SUCCESS) { @@ -541,6 +660,8 @@ class MetaStore { class VectorStorage { private: std::string index_id_; + bool use_shared_env_; + std::unique_ptr shared_env_; std::unique_ptr vector_store_; std::unique_ptr meta_store_; @@ -551,11 +672,26 @@ class VectorStorage { const std::string& index_id, size_t vector_dim, ndd::quant::QuantizationLevel quant_level) : - index_id_(index_id) { - vector_store_ = std::make_unique( + index_id_(index_id), + use_shared_env_(false) { + const bool legacy_split_layout = std::filesystem::exists(base_path + "/meta") + || std::filesystem::exists(base_path + "/filters"); + + if(legacy_split_layout) { + vector_store_ = std::make_unique( base_path + "/vectors", vector_dim, quant_level, index_id_); - meta_store_ = std::make_unique(base_path + "/meta"); - filter_store_ = std::make_unique(base_path + "/filters", index_id_); + meta_store_ = std::make_unique(base_path + "/meta"); + filter_store_ = std::make_unique(base_path + "/filters", index_id_); + } else { + use_shared_env_ = true; + shared_env_ = std::make_unique(base_path + "/vectors"); + MDBX_env* env = shared_env_->get(); + + vector_store_ = std::make_unique( + env, vector_dim, quant_level, index_id_, settings::DEFAULT_SUBINDEX); + meta_store_ = std::make_unique(env, "vector_meta"); + filter_store_ = std::make_unique(env, index_id_, "filter_schema"); + } } VectorStore::Cursor getCursor() { return vector_store_->getCursor(); } // Get numeric ids of matching filters @@ -683,6 +819,45 @@ class VectorStorage { // Optimized batch operation using pre-quantized QuantVectorObject // This avoids double quantization by using already quantized data + void store_vectors_batch_txn( + MDBX_txn* txn, + const std::vector>& vectors) { + if(vectors.empty()) { + return; + } + + std::vector>> vector_batch; + std::vector> meta_batch; + std::vector> filter_batch; + + vector_batch.reserve(vectors.size()); + meta_batch.reserve(vectors.size()); + filter_batch.reserve(vectors.size()); + + for(const auto& [numeric_id, quant_obj] : vectors) { + std::vector vector_bytes = quant_obj.quant_vector; + + ndd::VectorMeta meta; + meta.id = quant_obj.id; + meta.filter = quant_obj.filter; + meta.meta = quant_obj.meta; + meta.norm = quant_obj.norm; + + vector_batch.emplace_back(numeric_id, std::move(vector_bytes)); + meta_batch.emplace_back(numeric_id, std::move(meta)); + + if(!quant_obj.filter.empty()) { + filter_batch.emplace_back(numeric_id, quant_obj.filter); + } + } + + vector_store_->store_vectors_batch_txn(txn, vector_batch); + meta_store_->store_meta_batch_txn(txn, meta_batch); + if(!filter_batch.empty()) { + filter_store_->add_filters_from_json_batch_txn(txn, filter_batch); + } + } + void store_vectors_batch(const std::vector>& vectors) { if(vectors.empty()) { return; @@ -717,13 +892,39 @@ class VectorStorage { } } - // Store vectors and metadata in single transactions - vector_store_->store_vectors_batch(vector_batch); - meta_store_->store_meta_batch(meta_batch); + if(use_shared_env_ && shared_env_) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(shared_env_->get(), nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to begin shared write transaction: ") + + mdbx_strerror(rc)); + } + + try { + vector_store_->store_vectors_batch_txn(txn, vector_batch); + meta_store_->store_meta_batch_txn(txn, meta_batch); + if(!filter_batch.empty()) { + filter_store_->add_filters_from_json_batch_txn(txn, filter_batch); + } - // Process filter data in batch if any - if(!filter_batch.empty()) { - filter_store_->add_filters_from_json_batch(filter_batch); + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to commit shared write transaction: ") + + mdbx_strerror(rc)); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; + } + } else { + // Legacy split-env mode + vector_store_->store_vectors_batch(vector_batch); + meta_store_->store_meta_batch(meta_batch); + + // Process filter data in batch if any + if(!filter_batch.empty()) { + filter_store_->add_filters_from_json_batch(filter_batch); + } } } @@ -749,19 +950,74 @@ class VectorStorage { return meta_store_->get_meta(numeric_id); } + ndd::VectorMeta get_meta_txn(MDBX_txn* txn, ndd::idInt numeric_id) const { + return meta_store_->get_meta_txn(txn, numeric_id); + } + + bool uses_shared_env() const { return use_shared_env_ && shared_env_ != nullptr; } + MDBX_env* shared_env() const { return shared_env_ ? shared_env_->get() : nullptr; } + + void deletePoint_txn(MDBX_txn* txn, ndd::idInt numeric_id) { + auto meta = meta_store_->get_meta_txn(txn, numeric_id); + if(!meta.filter.empty()) { + filter_store_->remove_filters_from_json_txn(txn, numeric_id, meta.filter); + } + vector_store_->remove_txn(txn, numeric_id); + meta_store_->remove_txn(txn, numeric_id); + } + + void updateFilter_txn(MDBX_txn* txn, + ndd::idInt numeric_id, + const std::string& new_filter_json) { + auto meta = meta_store_->get_meta_txn(txn, numeric_id); + + if(!meta.filter.empty()) { + filter_store_->remove_filters_from_json_txn(txn, numeric_id, meta.filter); + } + + meta.filter = new_filter_json; + meta_store_->store_meta_batch_txn(txn, {{numeric_id, meta}}); + + if(!new_filter_json.empty()) { + filter_store_->add_filters_from_json_txn(txn, numeric_id, new_filter_json); + } + } + // NOT used anymore. Deletes filter, meta and vector data. void deletePoint(ndd::idInt numeric_id) { try { - // Get metadata first to get filter info - auto meta = meta_store_->get_meta(numeric_id); + if(use_shared_env_ && shared_env_) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(shared_env_->get(), nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to begin shared delete transaction: ") + + mdbx_strerror(rc)); + } - // Remove filter entries if they exist - if(!meta.filter.empty()) { - filter_store_->remove_filters_from_json(numeric_id, meta.filter); + try { + deletePoint_txn(txn, numeric_id); + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to commit shared delete transaction: ") + + mdbx_strerror(rc)); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; + } + } else { + // Get metadata first to get filter info + auto meta = meta_store_->get_meta(numeric_id); + + // Remove filter entries if they exist + if(!meta.filter.empty()) { + filter_store_->remove_filters_from_json(numeric_id, meta.filter); + } + // Try to remove both vector and meta data + vector_store_->remove(numeric_id); + meta_store_->remove(numeric_id); } - // Try to remove both vector and meta data - vector_store_->remove(numeric_id); - meta_store_->remove(numeric_id); } catch(const std::exception& e) { throw std::runtime_error(std::string("Failed to remove vector and metadata: ") + e.what()); @@ -774,21 +1030,43 @@ class VectorStorage { // Update filter for a vector void updateFilter(ndd::idInt numeric_id, const std::string& new_filter_json) { - // Get existing meta - auto meta = meta_store_->get_meta(numeric_id); + if(use_shared_env_ && shared_env_) { + MDBX_txn* txn; + int rc = mdbx_txn_begin(shared_env_->get(), nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to begin shared filter update transaction: ") + + mdbx_strerror(rc)); + } - // Remove old filters - if(!meta.filter.empty()) { - filter_store_->remove_filters_from_json(numeric_id, meta.filter); - } + try { + updateFilter_txn(txn, numeric_id, new_filter_json); - // Update meta - meta.filter = new_filter_json; - meta_store_->store_meta(numeric_id, meta); + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to commit shared filter update transaction: ") + + mdbx_strerror(rc)); + } + } catch(...) { + mdbx_txn_abort(txn); + throw; + } + } else { + // Get existing meta + auto meta = meta_store_->get_meta(numeric_id); - // Add new filters - if(!new_filter_json.empty()) { - filter_store_->add_filters_from_json(numeric_id, new_filter_json); + // Remove old filters + if(!meta.filter.empty()) { + filter_store_->remove_filters_from_json(numeric_id, meta.filter); + } + + // Update meta + meta.filter = new_filter_json; + meta_store_->store_meta(numeric_id, meta); + + // Add new filters + if(!new_filter_json.empty()) { + filter_store_->add_filters_from_json(numeric_id, new_filter_json); + } } } diff --git a/src/storage/wal.hpp b/src/storage/wal.hpp index c4d94fcef0..3ce8f3a34c 100644 --- a/src/storage/wal.hpp +++ b/src/storage/wal.hpp @@ -86,6 +86,7 @@ class WriteAheadLog { // Read all entries from the WAL file std::vector readEntries() { + std::lock_guard lock(file_mutex_); std::vector entries; std::ifstream infile(log_path_, std::ios::binary); @@ -99,34 +100,17 @@ class WriteAheadLog { // Read operation type infile.read(reinterpret_cast(&op), sizeof(op)); - if(infile.eof()) { + if(!infile) { break; } // Read numeric ID infile.read(reinterpret_cast(&numeric_id), sizeof(numeric_id)); - if(infile.eof()) { + if(!infile) { + // Ignore a trailing partial record (e.g. crash while writing). break; } - std::string string_id; - // Only VECTOR_ADD has string_id - if(static_cast(op) == WALOperationType::VECTOR_ADD) { - // Read string length - size_t str_len; - infile.read(reinterpret_cast(&str_len), sizeof(str_len)); - if(infile.eof()) { - break; - } - - // Read string content - string_id.resize(str_len); - infile.read(&string_id[0], str_len); - if(infile.eof()) { - break; - } - } - entries.push_back({static_cast(op), numeric_id}); } @@ -134,9 +118,18 @@ class WriteAheadLog { } // Clear the WAL file void clear() { + std::lock_guard lock(file_mutex_); log_file_.close(); - std::filesystem::remove(log_path_); + std::error_code ec; + std::filesystem::remove(log_path_, ec); log_file_.open(log_path_, std::ios::binary | std::ios::app); + if(!log_file_) { + std::string err_string = "Failed to reopen WAL file after clear: " + log_path_ + + " errno: " + std::to_string(errno) + + " errcode: " + std::strerror(errno); + LOG_ERROR(1402, index_id_, err_string); + throw std::runtime_error(err_string); + } entry_count_ = 0; }