diff --git a/src/VecSim/algorithms/hnsw/data_store/abs_data_store.h b/src/VecSim/algorithms/hnsw/data_store/abs_data_store.h new file mode 100644 index 000000000..a36f3e9c8 --- /dev/null +++ b/src/VecSim/algorithms/hnsw/data_store/abs_data_store.h @@ -0,0 +1,62 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "VecSim/memory/vecsim_malloc.h" +#include "VecSim/utils/vecsim_stl.h" +#include "VecSim/utils/vec_utils.h" +#include "VecSim/utils/data_block.h" +#include "VecSim/utils/vecsim_results_container.h" +#include "VecSim/query_result_definitions.h" +#include "VecSim/vec_sim_common.h" +#include "VecSim/vec_sim_index.h" +#include "VecSim/tombstone_interface.h" + + +using graphNodeType = std::pair; // represented as: (element_id, level) +enum CF_ID { + OUTGOING_CF = 1, + INCOMING_CF = 2, + LAST_CF = 3 // do not use (used only for indexing) +}; +struct StageParams +{ + enum Operation { + WRITE_DATA, + UPDATE_DATA, + DELETE_DATA + + }; + CF_ID cf_id; + Operation op; + graphNodeType original_key; + std::string key; + std::string data; +}; + +struct GetParams +{ + CF_ID cf_id; + graphNodeType original_key; + std::string key; + std::vector data; +}; + +class EdgesDataStore { +public: + virtual void Get(std::list &) = 0; + virtual void Put(const std::list &) = 0; + virtual void Flush() {}; +}; + diff --git a/src/VecSim/algorithms/hnsw/data_store/data_store_test.cpp b/src/VecSim/algorithms/hnsw/data_store/data_store_test.cpp new file mode 100644 index 000000000..0e372d4e7 --- /dev/null +++ b/src/VecSim/algorithms/hnsw/data_store/data_store_test.cpp @@ -0,0 +1,91 @@ +#include "edges_interface.h" + +const size_t max_outgoing_links = 32; +const size_t max_level = 5; +// id % vector_to_level == 0 -> vector exists in this level +const size_t vector_to_level[max_level] = { + max_outgoing_links, + max_outgoing_links << 5, + max_outgoing_links << 10, + max_outgoing_links << 15, + max_outgoing_links << 20 +}; +const size_t max_vectors = vector_to_level[max_level-1]; + +int idToLevel(size_t id) { + for (int level = 0; level < max_level; level++) { + if (id % vector_to_level[level] != 0) + return level; + } + return max_level; +} + +static EdgesInterface *edges_interface; +void generateLinksOnLevel(size_t id, int level) { + WriteBatch *wb = edges_interface->newWriteBatch(); + std::vector edges(max_outgoing_links); + for (size_t i = 0; i < max_outgoing_links; i++) { + while (1) { + int r = rand() % max_vectors; + if (level > 0) { + r = (r / vector_to_level[level -1]) * vector_to_level[level -1]; + if (r == 0) + continue; + + } + auto iter = std::find(edges.begin(), edges.end(), r); + if (iter != edges.end()) + continue; + edges[i] = r; + + edges_interface->AddIncomingTarget({r, level}, id, wb); + break; + } + } + edges_interface->SetOutgoingAllTargets({id, level}, edges, wb); + edges_interface->CommitWriteBatch(wb); +} + + +void verifyLinksOnLevel(size_t id, int level) { + std::vector og_edges = edges_interface->GetOutgoingEdges({id, level}, nullptr); + assert(og_edges.size() > 0); + + for (auto og : og_edges) { + auto inc_edges = edges_interface->GetIncomingEdges({og, level}, nullptr); + assert(std::find(inc_edges.begin(), inc_edges.end(), id) != inc_edges.end()); + } + +} + + + + +int main() { + std::shared_ptr allocator; + // auto ds = NewRamDataStore(allocator, 0x1000, max_outgoing_links, max_vectors); + auto ds = NewSpeedbDataStore(allocator, "/tmp/vectordb"); + edges_interface = new EdgesInterface(allocator, ds); + + // build + for (size_t id = 1; id < max_vectors; id++) { + int level = idToLevel(id); + for (int l = level; l >= 0; l--) { + generateLinksOnLevel(id, l); + } + } + edges_interface->Flush(); + for (int i = 0; i < 1000000; i++) { + size_t id = (rand() % (max_vectors -1)) + 1; // no 0 id; + int level = idToLevel(id); + for (int l = level; l >= 0; l--) { + verifyLinksOnLevel(id, l); + } + } +} + + + + + + diff --git a/src/VecSim/algorithms/hnsw/data_store/edges.h b/src/VecSim/algorithms/hnsw/data_store/edges.h new file mode 100644 index 000000000..a5fd5a538 --- /dev/null +++ b/src/VecSim/algorithms/hnsw/data_store/edges.h @@ -0,0 +1,149 @@ +#pragma once +#include "write_batch.h" + + +class AddTarget : public ObjectChange +{ +public: + AddTarget(idType target) : target_(target) {}; + void GetStageParams(StageParams &stage) override { + stage.op = StageParams::UPDATE_DATA; + stage.data = std::string("A") + std::string((char *)&target_, sizeof(idType)); + } + void Apply(std::vector &data) override { + data.push_back(target_); + } + void Apply(size_t &size, idType *cur_value) override { + cur_value[size] = target_; + size++; + } + +private: + idType target_; +}; + +class DeleteTarget : public ObjectChange +{ +public: + DeleteTarget(idType target) : target_(target) {}; + void GetStageParams(StageParams &stage) { + stage.op = StageParams::UPDATE_DATA; + stage.data = std::string("D") + std::string((char *)&target_, sizeof(idType)); + } + void Apply(std::vector &data) override { + auto iter = std::find(data.begin(), data.end(), target_); + if (iter != data.end()) { + *iter = data.back(); + data.resize(data.size() -1); + } + } + void Apply(size_t &size, idType *cur_value) override { + for (size_t i = 0; i < size; i++) { + if (cur_value[i] == target_) { + size--; + cur_value[i] = cur_value[size]; + return; + } + } + } +private: + idType target_; +}; + + + + +class Edges : public DirtyObject +{ +public: + virtual ~Edges() {} + + Edges(const std::vector &from) : DirtyObject(from) { + } + Edges() : DirtyObject() { + } + + void + AppendStagesOp(const graphNodeType &gn, + std::list &stageParmsList) override { + if (was_fetched_) { + ApplyChanges(); + } + if (is_dirty_) { + StageParams p; + p.cf_id = GetCfId(); + p.op = StageParams::WRITE_DATA; + p.original_key = gn; + buildKey(gn, p.key); + p.data = std::string((const char *) object_value_.data(), + object_value_.size() * sizeof(idType)); + stageParmsList.push_back(p); + + + } else { + for (auto ch: all_changes_) { + StageParams p; + p.cf_id = GetCfId(); + p.original_key = gn; + buildKey(gn, p.key); + ch->GetStageParams(p); + stageParmsList.push_back(p); + delete ch; + } + all_changes_.clear(); + } + } + + +}; + + + + +class OutgoingEdges : public Edges +{ +public: + ~OutgoingEdges() {}; + OutgoingEdges(const std::vector &from) : Edges(from) { + } + OutgoingEdges() { + } + virtual CF_ID GetCfId() override {return CfId();}; + static CF_ID CfId() {return OUTGOING_CF;} + + static void + PrepareGetsParams(const graphNodeType &gn, + std::list &getParmsList) { + + GetParams p; + p.cf_id = CfId(); + p.original_key = gn; + buildKey(gn, p.key); + getParmsList.push_back(p); + } +}; + +class IncomingEdges : public Edges +{ +public: + ~IncomingEdges(){}; + IncomingEdges(const std::vector &from) : Edges(from) { + } + IncomingEdges() : Edges() { + } + + virtual CF_ID GetCfId() override {return CfId();}; + static CF_ID CfId() {return INCOMING_CF;} + + static void + PrepareGetsParams(const graphNodeType &gn, + std::list &getParmsList) { + + GetParams p; + p.cf_id = CfId(); + p.original_key = gn; + buildKey(gn, p.key); + getParmsList.push_back(p); + } +}; + diff --git a/src/VecSim/algorithms/hnsw/data_store/edges_interface.cpp b/src/VecSim/algorithms/hnsw/data_store/edges_interface.cpp new file mode 100644 index 000000000..e251eba37 --- /dev/null +++ b/src/VecSim/algorithms/hnsw/data_store/edges_interface.cpp @@ -0,0 +1,264 @@ +#include "edges_interface.h" +#include "write_batch.h" +#include "edges.h" +#include "abs_data_store.h" + + +ObjectChange *ObjectChange::NewObjectChangeFromString(const char *data) { + idType target = *(idType *) (data + 1); + if (data[0] == 'A') { + return new AddTarget(target); + } else if (data[0] == 'D') { + return new DeleteTarget(target); + } else { + assert(0); + return nullptr; + } +} + + + +static std::vector executeGet(EdgesDataStore *ds, + const graphNodeType &gn, + bool outgoing_edges ) { + std::list getParamsList; + if (outgoing_edges) { + OutgoingEdges::PrepareGetsParams(gn, getParamsList); + } else { + IncomingEdges::PrepareGetsParams(gn, getParamsList); + } + + ds->Get(getParamsList); + auto &getP = getParamsList.front(); + assert(extractKey(getP.key.data()) == gn); + return getP.data; + +} + + + +// outgoing edges +const std::vector +EdgesInterface::GetOutgoingEdges(const graphNodeType &gn, + WriteBatch *wb) const { + if (wb) { + std::string key; + buildKey(gn, key); + auto ret = wb->find(key, OutgoingEdges::CfId() ); + if (!ret) { + ret = new OutgoingEdges(); + wb->Register(key, ret); + } + if (!ret->fetched()) { + ret->SetFetchedData(executeGet(ds_, gn, true)); + + } + ret->ApplyChanges(); + return ((OutgoingEdges *)ret)->Get(); + } else { + return executeGet(ds_, gn, true); + } +} + + +// add one target +void +EdgesInterface::AddOutgoingTarget(const graphNodeType &gn, + idType target, + WriteBatch *wb) { + std::string key; + buildKey(gn, key); + if (wb) { + auto outEdges = wb->find(key, OutgoingEdges::CfId()); + if (!outEdges) { + outEdges = new OutgoingEdges(); + wb->Register(key, outEdges); + } + outEdges->addChange(new AddTarget(target)); + } else { + std::list stage_params_list; + StageParams p; + p.cf_id = OutgoingEdges::CfId(); + p.key = key; + p.original_key = gn; + AddTarget(target).GetStageParams(p); + stage_params_list.push_back(p); + ds_->Put(stage_params_list); + + } + +} +// delete one target +void +EdgesInterface::DeleteOutgoingTarget(const graphNodeType &gn, + idType target, + WriteBatch *wb) { + std::string key; + buildKey(gn, key); + if (wb) { + auto outEdges = wb->find(key, OutgoingEdges::CfId()); + if (!outEdges) { + outEdges = new OutgoingEdges(); + wb->Register(key, outEdges); + } + outEdges->addChange(new DeleteTarget(target)); + } else { + std::list stage_params_list; + StageParams p; + p.cf_id = OutgoingEdges::CfId(); + p.key = key; + p.original_key = gn; + DeleteTarget(target).GetStageParams(p); + stage_params_list.push_back(p); + ds_->Put(stage_params_list); + } +} + + +// set outgoing edges +void +EdgesInterface::SetOutgoingAllTargets(const graphNodeType &gn, + const std::vector & ids, + WriteBatch *wb) { + if (wb) { + std::string key; + buildKey(gn, key); + auto outEdges = wb->find(key, OutgoingEdges::CfId()); + if (!outEdges) { + outEdges = new OutgoingEdges(); + wb->Register(key, outEdges); + } + outEdges->Set(ids); + } else { + OutgoingEdges outEdges(ids); + std::list stage_params_list; + outEdges.AppendStagesOp(gn, stage_params_list); + ds_->Put(stage_params_list); + } +} + + +// incoming edges +const std::vector +EdgesInterface::GetIncomingEdges(const graphNodeType &gn, + WriteBatch *wb) const { + if (wb) { + std::string key; + buildKey(gn, key); + auto ret = wb->find(key, IncomingEdges::CfId() ); + if (!ret) { + ret = new IncomingEdges(); + wb->Register(key, ret); + } + if (!ret->fetched()) { + ret->SetFetchedData(executeGet(ds_, gn, false)); + + } + ret->ApplyChanges(); + return ((IncomingEdges *)ret)->Get(); + } else { + return executeGet(ds_, gn, false); + } +} + + +// add one target +void +EdgesInterface::AddIncomingTarget(const graphNodeType &gn, + idType target, + WriteBatch *wb) { + std::string key; + buildKey(gn, key); + if (wb) { + auto outEdges = wb->find(key, IncomingEdges::CfId()); + if (!outEdges) { + outEdges = new IncomingEdges(); + wb->Register(key, outEdges); + } + outEdges->addChange(new AddTarget(target)); + } else { + std::list stage_params_list; + StageParams p; + p.cf_id = IncomingEdges::CfId(); + p.key = key; + p.original_key = gn; + AddTarget(target).GetStageParams(p); + stage_params_list.push_back(p); + ds_->Put(stage_params_list); + + } + +} +// delete one target +void +EdgesInterface::DeleteIncomingTarget(const graphNodeType &gn, + idType target, + WriteBatch *wb) { + std::string key; + buildKey(gn, key); + if (wb) { + auto outEdges = wb->find(key, IncomingEdges::CfId()); + if (!outEdges) { + outEdges = new IncomingEdges(); + wb->Register(key, outEdges); + } + outEdges->addChange(new DeleteTarget(target)); + } else { + std::list stage_params_list; + StageParams p; + p.cf_id = IncomingEdges::CfId(); + p.key = key; + p.original_key = gn; + DeleteTarget(target).GetStageParams(p); + stage_params_list.push_back(p); + ds_->Put(stage_params_list); + } +} + + +// set incoming edges +void +EdgesInterface::SetIncomingAllTargets(const graphNodeType &gn, + const std::vector & ids, + WriteBatch *wb) { + if (wb) { + std::string key; + buildKey(gn, key); + auto edges = wb->find(key, IncomingEdges::CfId()); + if (!edges) { + edges = new IncomingEdges(); + wb->Register(key, edges); + } + edges->Set(ids); + } else { + IncomingEdges edges(ids); + std::list stage_params_list; + edges.AppendStagesOp(gn, stage_params_list); + ds_->Put(stage_params_list); + } +} + + +WriteBatch * +EdgesInterface::newWriteBatch() { + return new WriteBatch(); +} + +void +EdgesInterface::CommitWriteBatch(WriteBatch *wb) { + + std::list stage_params_list; + for (auto obj : *wb) { + auto gn = extractKey(obj.first.data()); + obj.second->AppendStagesOp(gn , stage_params_list); + + delete obj.second; + } + ds_->Put(stage_params_list); +} + +void +EdgesInterface::Flush() { + ds_->Flush(); +} + diff --git a/src/VecSim/algorithms/hnsw/data_store/edges_interface.h b/src/VecSim/algorithms/hnsw/data_store/edges_interface.h new file mode 100644 index 000000000..f69f2eefd --- /dev/null +++ b/src/VecSim/algorithms/hnsw/data_store/edges_interface.h @@ -0,0 +1,119 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "VecSim/spaces/spaces.h" +#include "VecSim/memory/vecsim_malloc.h" +#include "VecSim/utils/vecsim_stl.h" +#include "VecSim/utils/vec_utils.h" +#include "VecSim/utils/data_block.h" +#include "VecSim/utils/vecsim_results_container.h" +#include "VecSim/query_result_definitions.h" +#include "VecSim/vec_sim_common.h" +#include "VecSim/vec_sim_index.h" +#include "VecSim/tombstone_interface.h" + +#ifdef BUILD_TESTS +#include "../hnsw_serialization_utils.h" +#include "VecSim/utils/serializer.h" +#endif + +using std::pair; +using graphNodeType = pair; // represented as: (element_id, level) + + +class WriteBatch; +class EdgesDataStore; +EdgesDataStore *NewRamDataStore(std::shared_ptr allocator, + size_t block_size, + size_t max_num_outgoing_links, + size_t initial_capacity); + +EdgesDataStore *NewSpeedbDataStore(std::shared_ptr allocator, + const char *dbPath); + +// edges interface provide encapsulation of accesses to the graph edges +// edges interface support also write batch +// if the user is using a wb all her changes will be aggregaded and apply at once on the datastor +// all the methods recive a graph node type to define the object + + +class EdgesInterface { +public: + EdgesInterface(std::shared_ptr allocator, + EdgesDataStore *ds) : + allocator_(allocator), + ds_(ds) { + } + ~EdgesInterface() {}; + // outgoing edges + const std::vector + GetOutgoingEdges(const graphNodeType &, + WriteBatch *) const; + + + // add one target + void AddOutgoingTarget(const graphNodeType &gn, + idType , + WriteBatch *); + + // delete one target + void DeleteOutgoingTarget(const graphNodeType &, + idType target, + WriteBatch *); + + // set all the targets + void SetOutgoingAllTargets(const graphNodeType &, + const std::vector &, + WriteBatch *); + + + // incoming edges + const std::vector + GetIncomingEdges(const graphNodeType &, + WriteBatch *) const ; + // add one target + void AddIncomingTarget(const graphNodeType &gn, + idType , + WriteBatch *); + + // delete one target + void DeleteIncomingTarget(const graphNodeType &, + idType target, + WriteBatch *); + + // set all the targets + // setting with an empty val equal to delete the edges on this level + void SetIncomingAllTargets(const graphNodeType &, + const std::vector &, + WriteBatch *); + + // delete all the edges (outgoing and incoming) at all the levels + + void DeleteNodeEdges(const idType id); + WriteBatch *newWriteBatch(); + void CommitWriteBatch(WriteBatch *); + + void Flush(); +private: + std::shared_ptr allocator_; + EdgesDataStore *ds_; + +}; + + + + + + diff --git a/src/VecSim/algorithms/hnsw/data_store/ram_data_store.cpp b/src/VecSim/algorithms/hnsw/data_store/ram_data_store.cpp new file mode 100644 index 000000000..c4afa4fa9 --- /dev/null +++ b/src/VecSim/algorithms/hnsw/data_store/ram_data_store.cpp @@ -0,0 +1,234 @@ +#include "abs_data_store.h" +#include "write_batch.h" +#include +class RamIncomingEdges : private std::vector { +public: + RamIncomingEdges(std::shared_ptr ) : + std::vector() {} + + void ApplyChange(ObjectChange &ch) { + ch.Apply(*this); + } + void Set(const std::string &ids) { + resize(ids.size() / sizeof(idType)); + std::memcpy(data(), ids.data(), ids.size() * sizeof(idType)); + } + const std::vector &Get() const {return *this;} + + void save(std::ofstream &output); + void restore(std::ifstream &input); +}; + + +class RamOutgoingEdges { +public: + static size_t max_num_outgoing_links; + static size_t GetAllocationSize() { + return sizeof(num_links_) + sizeof(idType) *max_num_outgoing_links; + } + + RamOutgoingEdges() :num_links_(0) {} + ~RamOutgoingEdges() = default; + + void + Set(const std::string &ids) { + num_links_ = ids.size() / sizeof(idType); + std::memcpy(links_, ids.data(), ids.size() * sizeof(idType)); + } + + const std::vector Get() const { + std::vector ret(num_links_); + std::memcpy(ret.data(), links_, num_links_ * sizeof(idType)); + return ret; + } + void ApplyChange(ObjectChange &ch) { + ch.Apply(num_links_, links_); + } + + + void save(std::ofstream &output); + void restore(std::ifstream &input); +private: + size_t num_links_=0; + idType links_[]; +}; + +size_t RamOutgoingEdges::max_num_outgoing_links; + + +class RamLevelData { +public: + RamLevelData(std::shared_ptr allocator) : incoming_edges(allocator) { + } + static size_t GetAllocationSize() { + return sizeof(RamLevelData) + RamOutgoingEdges::GetAllocationSize(); + } + + RamIncomingEdges incoming_edges; + RamOutgoingEdges outgoing_edges; +}; + + + +class RamDataStore : public EdgesDataStore { +public: + RamDataStore(std::shared_ptr allocator, + size_t block_size, + size_t max_num_outgoing_links, + size_t initial_capacity) : + graphDataBlocks_(allocator), + allocator_(allocator), + block_size_(block_size) { + + RamOutgoingEdges::max_num_outgoing_links = max_num_outgoing_links; + if (initial_capacity) { + auto initial_vector_size = initial_capacity / block_size_; + graphDataBlocks_.reserve(initial_vector_size); + } + } + void Get(std::list &get_params_list) override; + void Put(const std::list &stage_params_list) override; + + + +private: + RamLevelData *GetLevel0Data(const idType id) { + return (RamLevelData *)graphDataBlocks_[id / block_size_]. + getElement(id % block_size_); + } + size_t getCapacity() const { + return graphDataBlocks_.size() * block_size_; + } + void growByBlock() { + graphDataBlocks_.emplace_back(block_size_, + RamLevelData::GetAllocationSize(), + allocator_); + } + void HandleDelete(idType id) { + if (id == getCapacity() -1) { + // TBD shrink the graph data blocks? + } + for (size_t index = 0; index < other_levels_map.size(); index++) { + auto iter = other_levels_map[index]->find(id); + if (iter != other_levels_map[index]->end()) { + // TBD ALON ... allocator_.free(iter->second); + other_levels_map[index]->erase(iter); + } else { + // TBD + return; + } + } + + } + + + +private: + vecsim_stl::vector graphDataBlocks_; + std::shared_ptr allocator_; + const size_t block_size_; + using edges_map = std::unordered_map; + std::vector< edges_map *> other_levels_map; + std::shared_mutex mutex; +}; + + +void RamDataStore::Put(const std::list &stage_params_list) + { + std::unique_lock single_writer(mutex); + for (auto stage_param : stage_params_list) { + auto gn = stage_param.original_key; + auto level = gn.second; + auto id = gn.first; + if (stage_param.op == StageParams::DELETE_DATA) { + HandleDelete(id); + } else { + RamLevelData *lv = nullptr; + if (level == 0) { + while (id >= getCapacity()) + growByBlock(); + lv = GetLevel0Data(id); + } else { + auto index = level -1; + if (level > other_levels_map.size()) { + size_t cur_size = other_levels_map.size(); + other_levels_map.resize(level); + for (; cur_size < level; cur_size++) { + other_levels_map[index] = new std::unordered_map ; + } + } + auto iter = other_levels_map[index]->find(id); + if (iter == other_levels_map[index]->end()) { + //allocate + auto space = (char *)allocator_->callocate( + RamLevelData::GetAllocationSize()); + lv = new (space) RamLevelData(allocator_); + other_levels_map[level]->insert({id, lv}); + } else { + lv = iter->second; + } + } + + if (stage_param.op == StageParams::WRITE_DATA) { + if (stage_param.cf_id == OUTGOING_CF) { + lv->outgoing_edges.Set(stage_param.data); + } else { + lv->incoming_edges.Set(stage_param.data); + } + } else if (stage_param.op == StageParams::UPDATE_DATA) { + auto ch = ObjectChange::NewObjectChangeFromString(stage_param.data.data()); + if (stage_param.cf_id == OUTGOING_CF) { + lv->outgoing_edges.ApplyChange(*ch); + } else { + lv->incoming_edges.ApplyChange(*ch); + } + delete ch; + } + } + } + } + +void RamDataStore::Get(std::list &get_params_list) { + std::shared_lock multi_readers(mutex); + for (auto &get_param : get_params_list) { + auto gn = get_param.original_key; + auto level = gn.second; + auto id = gn.first; + RamLevelData *lv = nullptr; + if (level == 0) { + if (id < getCapacity() ) { + lv = GetLevel0Data(id); + } + } else { + size_t index = level -1; + if (index < other_levels_map.size()) { + auto iter = other_levels_map[index]->find(id); + if (iter != other_levels_map[index]->end()) { + lv = iter->second; + } + } + } + if (lv) { + if (get_param.cf_id == OUTGOING_CF) { + get_param.data = lv->outgoing_edges.Get(); + } else { + get_param.data = lv->incoming_edges.Get(); + } + } + } + +} + +EdgesDataStore *NewRamDataStore(std::shared_ptr allocator, + size_t block_size, + size_t max_num_outgoing_links, + size_t initial_capacity) { + return new RamDataStore(allocator, block_size, max_num_outgoing_links, initial_capacity); +} + + + + + + + diff --git a/src/VecSim/algorithms/hnsw/data_store/spdb_data_store.cpp b/src/VecSim/algorithms/hnsw/data_store/spdb_data_store.cpp new file mode 100644 index 000000000..e01bb9998 --- /dev/null +++ b/src/VecSim/algorithms/hnsw/data_store/spdb_data_store.cpp @@ -0,0 +1,195 @@ +#include "abs_data_store.h" +#include "write_batch.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/comparator.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/table.h" + +class EdgesComparator : public ROCKSDB_NAMESPACE::Comparator { +public: + ~EdgesComparator() {}; + int + Compare(const ROCKSDB_NAMESPACE::Slice& a, + const ROCKSDB_NAMESPACE::Slice& b) const override{ + auto gn1 = extractKey(a.data()); + auto gn2 = extractKey(b.data()); + return (gn1.second > gn2.second) ? 1 : + (gn1.second < gn2.second) ? -1 : + (int64_t) (gn1.first) - (int64_t) (gn2.first); + } + const char* Name() const override {return "EDGES_COMAPRE";}; + + void FindShortestSeparator(std::string* , + const ROCKSDB_NAMESPACE::Slice& ) const override {} + + void FindShortSuccessor(std::string* ) const override {}; + + +}; +class EdgesMergeOp : public ROCKSDB_NAMESPACE::MergeOperator { +public: + ~EdgesMergeOp() {}; + + const char* Name() const override {return "EDGES_MERGE";}; + +#if 0 + bool Merge( + const ROCKSDB_NAMESPACE::Slice&, + const ROCKSDB_NAMESPACE::Slice* existing_value, + const ROCKSDB_NAMESPACE::Slice& value, + std::string* new_value, + ROCKSDB_NAMESPACE::Logger* ) const override { + ROCKSDB_NAMESPACE::Slice existing_slice; + + if (existing_value) { + existing_slice = *existing_value; + } + std::vector cur_value(existing_slice.size()/sizeof(idType)); + auto ch = ObjectChange::NewObjectChangeFromString(value.data()); + ch->Apply(cur_value); + *new_value = std::string( + (char *)cur_value.data(), cur_value.size() * sizeof(idType)); + return true; + } +#endif + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + ROCKSDB_NAMESPACE::Slice existing_slice; + + if (merge_in.existing_value) { + existing_slice = *merge_in.existing_value; + } + std::vector cur_value(existing_slice.size()/sizeof(idType)); + std::memcpy(cur_value.data(), existing_slice.data(), existing_slice.size()); + + for (auto op : merge_in.operand_list) { + auto ch = ObjectChange::NewObjectChangeFromString(op.data()); + ch->Apply(cur_value); + delete ch; + } + merge_out->new_value = std::string( + (char *)cur_value.data(), cur_value.size() * sizeof(idType)); + return true; + } +}; + + +class SpdbDataStore : public EdgesDataStore { +public: + SpdbDataStore(std::shared_ptr allocator, + const char *dbPath); + void Get(std::list &get_params_list) override; + void Put(const std::list &stage_params_list) override; + void Flush() override { + db_->Flush(ROCKSDB_NAMESPACE::FlushOptions(), cfs_); + } + + + +private: + void OpenDb(const char *dbPath); + + +private: + ROCKSDB_NAMESPACE::DB *db_; + std::vectorcfs_; + std::shared_ptr allocator_; +}; + +SpdbDataStore::SpdbDataStore(std::shared_ptr , + const char *dbPath) +{ + cfs_.resize(LAST_CF); + OpenDb(dbPath); +} + + +void SpdbDataStore::OpenDb(const char *dbPath) { + + ROCKSDB_NAMESPACE::Options options; + options.create_if_missing = true; + options.use_direct_reads = true; + // make sure we are working on a new database + DestroyDB(dbPath, options); + + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::DB::Open(options, dbPath, &db_); + assert(s.ok()); + cfs_[0] = db_->DefaultColumnFamily(); + // create column families + ROCKSDB_NAMESPACE::ColumnFamilyOptions cf_options; + //cf_options.comparator = new EdgesComparator; + cf_options.merge_operator.reset(new EdgesMergeOp); + cf_options.compression = ROCKSDB_NAMESPACE::kNoCompression; + + ROCKSDB_NAMESPACE::BlockBasedTableOptions bs_options; + bs_options.block_align = true; + bs_options.no_block_cache = true; + bs_options.data_block_index_type = ROCKSDB_NAMESPACE::BlockBasedTableOptions::kDataBlockBinaryAndHash; + + cf_options.table_factory.reset( + NewBlockBasedTableFactory(bs_options)); + + s = db_->CreateColumnFamily(cf_options, "outgoing_edges", &cfs_[OUTGOING_CF]); + assert(s.ok()); + s = db_->CreateColumnFamily(cf_options, "incoming_edges", &cfs_[INCOMING_CF]); + assert(s.ok()); +} + + + +void SpdbDataStore::Put(const std::list &stage_params_list) +{ + ROCKSDB_NAMESPACE::WriteBatch wb; + for (auto &stage_param : stage_params_list) { + switch (stage_param.op) { + case StageParams::WRITE_DATA: + wb.Put(cfs_[stage_param.cf_id], stage_param.key, stage_param.data); + break; + case StageParams::UPDATE_DATA: + wb.Merge(cfs_[stage_param.cf_id], stage_param.key, stage_param.data); + break; + case StageParams::DELETE_DATA: + wb.Delete(cfs_[stage_param.cf_id], stage_param.key); + break; + default: + assert(0); + } + } + ROCKSDB_NAMESPACE::WriteOptions options; + options.disableWAL = true; + auto s = db_->Write(options, &wb); + assert(s.ok()); + } + +void SpdbDataStore::Get(std::list &get_params_list) { + std::vector cfs(get_params_list.size()); + std::vector keys(get_params_list.size()); + std::vector values; + int index = 0; + for (auto &get_param : get_params_list) { + cfs[index] = cfs_[get_param.cf_id]; + keys[index] = get_param.key; + index++; + } + db_->MultiGet(ROCKSDB_NAMESPACE::ReadOptions(), + cfs, + keys, + &values); + index = 0; + for (auto &get_param : get_params_list) { + get_param.data = std::vector(values[index].size() / sizeof(idType)); + std::memcpy(get_param.data.data(), values[index].data(), values[index].size()); + } + +} + + + + + + +EdgesDataStore *NewSpeedbDataStore(std::shared_ptr allocator, + const char *dbPath) { + return new SpdbDataStore(allocator, dbPath); +} diff --git a/src/VecSim/algorithms/hnsw/data_store/write_batch.h b/src/VecSim/algorithms/hnsw/data_store/write_batch.h new file mode 100644 index 000000000..5d80ea5cc --- /dev/null +++ b/src/VecSim/algorithms/hnsw/data_store/write_batch.h @@ -0,0 +1,159 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "VecSim/spaces/spaces.h" +#include "VecSim/memory/vecsim_malloc.h" +#include "VecSim/utils/vecsim_stl.h" +#include "VecSim/utils/vec_utils.h" +#include "VecSim/utils/data_block.h" +#include "VecSim/utils/vecsim_results_container.h" +#include "VecSim/query_result_definitions.h" +#include "VecSim/vec_sim_common.h" +#include "VecSim/vec_sim_index.h" +#include "VecSim/tombstone_interface.h" + +using graphNodeType = std::pair; // represented as: (element_id, level) +#include "abs_data_store.h" + + +static inline void +buildKey(const graphNodeType gn, std::string &ret) { + ret.resize(16);; + sprintf(ret.data(),"%8.8u%8.8u",gn.second,gn.first); +} + +static inline graphNodeType +extractKey(const char *key) { + unsigned int level, id; + sscanf(key,"%8u%8u",&level,&id); + return graphNodeType({id, level}); +} + +class ObjectChange +{ +public: + virtual ~ObjectChange() {} ; + virtual void GetStageParams(StageParams &) = 0; + virtual void Apply(std::vector &cur_value) = 0; + virtual void Apply(size_t &size, idType *cur_value) = 0; + static ObjectChange *NewObjectChangeFromString(const char *data); +}; + + + + +class DirtyObject { +public: + + DirtyObject() = default; + DirtyObject(const std::vector &from) : + was_fetched_(true), object_value_(from) { + } + virtual ~DirtyObject() {}; + + // append the operation that are pending for stages + virtual void + AppendStagesOp(const graphNodeType &gn, + std::list &stageParmsList) = 0; + + void ApplyChanges() { + assert(was_fetched_); + for (auto ch : all_changes_) { + ch->Apply(object_value_); + is_dirty_ = true; + delete ch; + } + all_changes_.clear(); + } + + const std::vector &Get() { + assert(was_fetched_); + ApplyChanges(); + return object_value_; + } + + void SetFetchedData(const std::vector &data) { + object_value_ = data; + was_fetched_ = true; + is_dirty_ = false; + } + + void Set(const std::vector &data) { + object_value_ = data; + was_fetched_ = true; + is_dirty_ = true; + for (auto ch : all_changes_) { + delete ch; + } + all_changes_.clear(); + } + + void addChange(ObjectChange *ch) { + all_changes_.push_back(ch); + } + bool fetched() const {return was_fetched_;} + bool &fetched() {return was_fetched_;} + virtual CF_ID GetCfId() = 0; + + + +protected: + std::list all_changes_; + // data = null is not enough as it may be a new object + bool was_fetched_ = false; + bool is_dirty_ = false; + std::vector object_value_; +}; + +// a container of dirty objects +class WriteBatch { +public: + void Register(std::string &key, DirtyObject *obj) { + auto cf_id = obj->GetCfId(); + auto true_key = key + std::string(1, (char)cf_id); + auto iter = dirty_objects.find(true_key); + if (iter == dirty_objects.end()) { + dirty_objects.insert({true_key, obj}); + } + } + + + DirtyObject *find(std::string &key, int cf_id) const { + auto true_key = key + std::string(1, (char)cf_id); + auto const &iter = dirty_objects.find(true_key); + if (iter == dirty_objects.end()) { + return nullptr; + } else { + return iter->second; + } + } + + const std::unordered_map::const_iterator begin() const + { return dirty_objects.begin(); } + const std::unordered_map::const_iterator end() const + { return dirty_objects.end(); } + + + +private: + std::unordered_map dirty_objects; +}; + + + + + + + diff --git a/src/VecSim/algorithms/hnsw/graph_data_ram.cpp b/src/VecSim/algorithms/hnsw/graph_data_ram.cpp new file mode 100644 index 000000000..8a0bf85bf --- /dev/null +++ b/src/VecSim/algorithms/hnsw/graph_data_ram.cpp @@ -0,0 +1,216 @@ +#include "graph_data_ram.h" +struct LevelDataOnRam { + static size_t max_num_outgoing_links; + LevelDataOnRam(std::shared_ptr allocator) : + incomingEdges(allocator), outgoingEdges() { + } + static size_t + GetAllocationSizeBytes() { + return sizeof(incomingEdges) + sizeof(outgoingEdges) + + max_num_outgoing_links * sizeof(idType); + } + // currently only one size of level data + IncomingEdges incomingEdges; + OutgoingEdges outgoingEdges; // must be last + + +}; +size_t LevelDataOnRam::max_num_outgoing_links; + +struct VectorGraphData { + VectorGraphData(std::shared_ptr allocator, + size_t num_levels) : + level0_data(allocator) { + if (num_levels == 0) { + others = nullptr; + } else { + others = (char *)allocator->callocate( + LevelDataOnRam::GetAllocationSizeBytes() * num_levels); + } + } + + + LevelDataOnRam &getLevelData(size_t level_num) { + if (level_num == 0) return level0_data; + // else + return *(LevelDataOnRam *) + (others + (level_num-1) * LevelDataOnRam::GetAllocationSizeBytes()); + } + static size_t + GetAllocationSizeBytes() { + return sizeof(char *) + LevelDataOnRam::GetAllocationSizeBytes(); + }; + + char *others; + // since level0_data has a variable size it must be last + LevelDataOnRam level0_data; +}; + + + RamGraphData::RamGraphData(std::shared_ptr allocator, + size_t block_size, + size_t max_num_outgoing_links, + size_t vector_size_bytes, + size_t initial_capacity, + size_t vector_alignment) : + vectorBlocks_(allocator), + graphDataBlocks_(allocator), + idToMetaData_(allocator), + allocator_(allocator), + block_size_(block_size), + vector_size_bytes_(vector_size_bytes), + vector_alignment_(vector_alignment) + { + LevelDataOnRam::max_num_outgoing_links = max_num_outgoing_links; + if (initial_capacity) { + idToMetaData_.reserve(initial_capacity); + auto initial_vector_size = initial_capacity / block_size_; + vectorBlocks_.reserve(initial_vector_size); + graphDataBlocks_.reserve(initial_vector_size); + } + } + + +const char * +RamGraphData::getVectorByInternalId(idType internal_id) const { + return vectorBlocks_[internal_id / block_size_].getElement(internal_id % block_size_); +} + +void +RamGraphData::multiGetVectors(const std::vector &ids, + std::vector &results) const { + results.reserve(ids.size()); + for (auto id:ids) { + results.push_back(getVectorByInternalId(id)); + } +} + + + +idType +RamGraphData::pushVector(const void *vector_data, + int max_level, + const labelType &label, + WriteBatch *wb) { + idToMetaData_.push_back(VectorMetaData(label,max_level)); + + if (vectorBlocks_.size() == 0 || + vectorBlocks_.back().getLength() == block_size_) { + growByBlock(); + + } + idType ret = vectorBlocks_.size() * block_size_ + + vectorBlocks_.back().getLength(); + assert(idToMetaData_.size() == ret); + + vectorBlocks_.back().addElement(vector_data); + + VectorGraphData tmp(allocator_, max_level); + graphDataBlocks_.back().addElement(&tmp); + + return ret; +} + +// outgoing edges +const absEdges & +RamGraphData::GetLevelOutgoingEdges(const graphNodeType &gn) const { + return getGraphDataByInternalId(gn.first)-> + getLevelData(gn.second).outgoingEdges; +} + +absEdges & +RamGraphData::GetLevelOutgoingEdges(const graphNodeType &gn, + WriteBatch *) { + return getGraphDataByInternalId(gn.first)-> + getLevelData(gn.second).outgoingEdges; +} +// incoming edges +const absEdges & +RamGraphData::GetLevelIncomingEdges(const graphNodeType &gn) const { + return getGraphDataByInternalId(gn.first)-> + getLevelData(gn.second).incomingEdges; +} + +absEdges & +RamGraphData::GetLevelIncomingEdges(const graphNodeType &gn, + WriteBatch *) { + return getGraphDataByInternalId(gn.first)-> + getLevelData(gn.second).incomingEdges; +} + +idType +RamGraphData::getVectorIdByLevel(short level, + idType startingId) const { + for (idType i = startingId; i < idToMetaData_.size(); i++) { + auto const &v = vectorMetaDataById(i); + if (v.max_level_ == level) { + return i; + } + } + for (idType i = 0; i < startingId; i++) { + auto const &v = vectorMetaDataById(i); + if (v.max_level_ == level) { + return i; + } + } + return idType(-1); +} + +idType +RamGraphData::getGarbadgeCollectionTarget(idType startingId) const { + for (idType i = startingId; i < idToMetaData_.size(); i++) { + auto const &v = vectorMetaDataById(i); + if (v.ismarked(VectorMetaData::PERMANENT_DELETED)) { + return i; + } + } + return idType(-1); +} + + +VectorGraphData * +RamGraphData::getGraphDataByInternalId(idType internal_id) const { + return (VectorGraphData *) + graphDataBlocks_[internal_id / block_size_]. + getElement(internal_id % block_size_); +} + +void RamGraphData::growByBlock() { + // Validations + assert(vectorBlocks_.size() == graphDataBlocks_.size()); + assert(vectorBlocks_.size() == 0 || + vectorBlocks_.back().getLength() == block_size_); + + vectorBlocks_.emplace_back(block_size_, vector_size_bytes_, + allocator_, vector_alignment_); + graphDataBlocks_.emplace_back(block_size_, + VectorGraphData::GetAllocationSizeBytes(), + allocator_); +} + +void RamGraphData::shrinkByBlock() { + assert(vectorBlocks_.size() == graphDataBlocks_.size()); + assert(vectorBlocks_.size() > 0); + assert(vectorBlocks_.back().getLength() == 0); + + vectorBlocks_.pop_back(); + graphDataBlocks_.pop_back(); +} + +void RamGraphData::shrinkToFit() { + while (vectorBlocks_.size() && vectorBlocks_.back().getLength() == 0) { + shrinkByBlock(); + } + vectorBlocks_.shrink_to_fit(); + graphDataBlocks_.shrink_to_fit(); + idToMetaData_.shrink_to_fit(); +} + + +void RamGraphData::save(std::ofstream &) const { +} + +void RamGraphGrestore(std::ifstream &) { + // TBD +} + diff --git a/src/VecSim/algorithms/hnsw/graph_data_ram.h b/src/VecSim/algorithms/hnsw/graph_data_ram.h new file mode 100644 index 000000000..64fe3baaf --- /dev/null +++ b/src/VecSim/algorithms/hnsw/graph_data_ram.h @@ -0,0 +1,232 @@ +#pragma once + +#include "abs_graph_data.h" + +class IncomingEdges : private vecsim_stl::vector , + public absEdges { +public: + IncomingEdges(std::shared_ptr allocator) : + vecsim_stl::vector(allocator) {} + + void + push(idType id) override { + vecsim_stl::vector::push_back(id); + } + + bool + removeIdIfExists(idType element_id) override { + auto it = std::find(begin(), end(), element_id); + if (it != end()) { + // Swap the last element with the current one (equivalent to removing the element id from + // the list). + *it = back(); + pop_back(); + return true; + } + return false; + } + + virtual void + removeId(idType element_id) override { + auto exists = removeIdIfExists(element_id); + if (!exists) + assert(0); + } + + std::pair + Get() override { + return {size(), data()}; + } + virtual void + Set(std::pair inp) override { + resize(inp.first); + memcpy(data(), inp.second, inp.first * sizeof(idType)); + } + + void save(std::ofstream &output); + void restore(std::ifstream &input); +}; + + +class OutgoingEdges : public absEdges { +public: + OutgoingEdges() :num_links_(0) {} + ~OutgoingEdges() = default; + + void + push(idType id) override { + links_[num_links_++] = id; + } + + bool + removeIdIfExists(idType element_id) override { + for (size_t i = 0; i < num_links_; i++) { + if (links_[i] == element_id) { + // Swap the last element with the current one (equivalent to removing the element id from + // the list). + links_[i] = links_[num_links_-1]; + num_links_--; + return true; + } + } + return false; + } + + virtual void + removeId(idType element_id) override { + auto exists = removeIdIfExists(element_id); + if (!exists) + assert(0); + } + + std::pair + Get() { + return {num_links_, links_}; + } + virtual void + Set(std::pair inp) override { + num_links_ = inp.first; + memcpy(links_, inp.second, inp.first * sizeof(idType)); + } + + void save(std::ofstream &output); + void restore(std::ifstream &input); +private: + size_t num_links_=0; + idType links_[]; +}; + + +class WriteBatch; + +class VectorGraphData; +class LevelData; +class RamGraphData : public absGraphData { +private: + friend class absGraphData; + RamGraphData(std::shared_ptr allocator, + size_t block_size, + size_t max_num_outgoing_links, + size_t vector_size_bytes, + size_t initial_capacity, + size_t vector_alignment); + + virtual ~RamGraphData() override {}; + + + // vector methods + + virtual const char * + getVectorByInternalId(idType internal_id) const override; + + virtual void + multiGetVectors(const std::vector &, + std::vector &results) const override; + + virtual idType + pushVector(const void *vector_data, + int max_level, + const labelType &label, + WriteBatch *wb) override; + + // vectorMetaData methods + + const VectorMetaData & + vectorMetaDataById(idType internal_id) const override { + return idToMetaData_[internal_id]; + } + + VectorMetaData & + vectorMetaDataById(idType internal_id, + WriteBatch *) override { + return idToMetaData_[internal_id]; + } + + // premanently delete the vector and the edges "free" the id + void + deleteVectorAndEdges(idType internalId, + WriteBatch *wb) override { + vectorMetaDataById(internalId, wb).mark( + VectorMetaData::PERMANENT_DELETED); + } + + + // outgoing edges + virtual const absEdges & + GetLevelOutgoingEdges(const graphNodeType &) const override; + + virtual absEdges & + GetLevelOutgoingEdges(const graphNodeType &, + WriteBatch *) override; + + + // inomming edges + // fetch incoming from the database + virtual const absEdges & + GetLevelIncomingEdges(const graphNodeType &) const override; + virtual absEdges & + GetLevelIncomingEdges(const graphNodeType &, + WriteBatch *) override; + + // May not fetch from the database support only + // simple updates (add / delete target) operations + virtual absEdges & + GetLevelVirtualIncomingEdges(const graphNodeType &id, + WriteBatch *wb) override { + // in mem implementation + return GetLevelIncomingEdges(id, wb); + } + +public: + // helper methods needed by hnsw + + // get the first id that exists at level "level" + // at or after the statingId + virtual idType + getVectorIdByLevel(short level, + idType startingId = 0) const = 0; + + // get a permanent deleted entry (at or after start point) + // to be used by the GC + virtual idType + getGarbadgeCollectionTarget(idType startingId = 0) const = 0; + + virtual void + shrinkToFit() override; + +public: + // new and commit wrire batch are not supported (all writes are inplace) + WriteBatch * + newWriteBatch() override { + return nullptr; + } + + void + CommitWriteBatch(WriteBatch *) override { + return; + } + +public: + virtual void + save(std::ofstream &output) const override; + virtual void + restore(std::ifstream &input) override; + +private: + VectorGraphData * + getGraphDataByInternalId(idType internal_id) const; + + void growByBlock(); + void shrinkByBlock(); + +private: + vecsim_stl::vector vectorBlocks_; + vecsim_stl::vector graphDataBlocks_; + vecsim_stl::vector idToMetaData_; + std::shared_ptr allocator_; + const size_t block_size_; + const size_t vector_size_bytes_; + const size_t vector_alignment_; + + +};