Skip to content

Commit 90f4c7f

Browse files
committed
Implement 'threads reservation' in Redis for SVS threadpool
1 parent 9f0beab commit 90f4c7f

File tree

6 files changed

+363
-41
lines changed

6 files changed

+363
-41
lines changed

src/VecSim/algorithms/svs/svs.h

+13-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ struct SVSIndexBase {
2020
virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0;
2121
virtual int deleteVectors(const labelType *labels, size_t n) = 0;
2222
virtual bool isLabelExists(const labelType label) const = 0;
23+
// Sets the number of threads the index may use for its internal parallel work.
// SVSIndex implements this by resizing the thread pool it owns.
virtual void setNumThreads(size_t numThreads) = 0;
24+
// Returns the capacity of the index's thread pool.
// NOTE(review): presumably the upper bound accepted by setNumThreads() - confirm against
// the thread pool implementation.
virtual size_t getThreadPoolCapacity() const = 0;
2325
};
2426

2527
template <typename MetricType, typename DataType, size_t QuantBits, size_t ResidualBits = 0>
@@ -53,6 +55,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
5355
size_t search_window_size;
5456
double epsilon;
5557

58+
// SVS thread pool
59+
VecSimSVSThreadPool threadpool_;
5660
// SVS Index implementation instance
5761
std::unique_ptr<impl_type> impl_;
5862

@@ -104,8 +108,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
104108
// Data should not be empty
105109
template <svs::data::ImmutableMemoryDataset Dataset>
106110
void initImpl(const Dataset &points, std::span<const labelType> ids) {
107-
VecSimSVSThreadPool threadpool;
108-
svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool}};
111+
svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}};
109112
// Construct SVS index initial storage with compression if needed
110113
auto data = storage_traits_t::create_storage(points, this->blockSize, threadpool_handle,
111114
this->getAllocator());
@@ -119,12 +122,12 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
119122

120123
// Construct initial Vamana Graph
121124
auto graph =
122-
graph_builder_t::build_graph(parameters, data, distance, threadpool, entry_point,
125+
graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point,
123126
this->blockSize, this->getAllocator());
124127

125128
// Create SVS MutableIndex instance
126129
impl_ = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point,
127-
std::move(distance), ids, std::move(threadpool));
130+
std::move(distance), ids, threadpool_);
128131

129132
// Set SVS MutableIndex build parameters to be used in future updates
130133
impl_->set_construction_window_size(parameters.window_size);
@@ -240,7 +243,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
240243
: Base{abstractInitParams, components}, forcePreprocessing{force_preprocessing},
241244
changes_num{0}, buildParams{makeVamanaBuildParameters(params)},
242245
search_window_size{getOrDefault(params.search_window_size, 10)},
243-
epsilon{getOrDefault(params.epsilon, 0.01)}, impl_{nullptr} {}
246+
epsilon{getOrDefault(params.epsilon, 0.01)},
247+
threadpool_{std::max(size_t{1}, params.num_threads)}, impl_{nullptr} {}
244248

245249
~SVSIndex() = default;
246250

@@ -305,6 +309,10 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
305309
return deleteVectorsImpl(labels, n);
306310
}
307311

312+
// Resizes the thread pool owned by this index (`threadpool_`) to `numThreads`.
void setNumThreads(size_t numThreads) override {
    threadpool_.resize(numThreads);
}
313+
314+
// Reports the capacity of the thread pool owned by this index (`threadpool_`).
size_t getThreadPoolCapacity() const override {
    const size_t poolCapacity = threadpool_.capacity();
    return poolCapacity;
}
315+
308316
double getDistanceFrom_Unsafe(labelType label, const void *vector_data) const override {
309317
if (!impl_ || !impl_->has_id(label)) {
310318
return std::numeric_limits<double>::quiet_NaN();

src/VecSim/algorithms/svs/svs_tiered.h

+165-22
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,138 @@
1313
#include "VecSim/algorithms/svs/svs.h"
1414
#include "VecSim/index_factories/svs_factory.h"
1515

16-
struct SVSIndexUpdateJob : public AsyncJob {
17-
SVSIndexUpdateJob(std::shared_ptr<VecSimAllocator> allocator, JobCallback insertCb,
18-
VecSimIndex *index)
19-
: AsyncJob(std::move(allocator), HNSW_INSERT_VECTOR_JOB, insertCb, index) {}
16+
#include <chrono>
17+
#include <condition_variable>
18+
#include <memory>
19+
#include <mutex>
20+
21+
class SVSMultiThreadJob : public AsyncJob {
22+
23+
// Thread reservation control block shared between all threads
24+
// to reserve threads and wait for the job to be done
25+
// actual reserved threads can be less than requested if timeout is reached
26+
class ControlBlock {
27+
const size_t requestedThreads; // number of threads requested to reserve
28+
const std::chrono::microseconds timeout; // timeout for threads reservation
29+
size_t reservedThreads; // number of threads reserved
30+
bool jobDone;
31+
std::mutex mutex;
32+
std::condition_variable cv;
33+
34+
public:
35+
template <typename Rep, typename Period>
36+
ControlBlock(size_t requested_threads,
37+
std::chrono::duration<Rep, Period> threads_wait_timeout)
38+
: requestedThreads{requested_threads}, timeout{threads_wait_timeout},
39+
reservedThreads{0}, jobDone{false} {}
40+
41+
// reserve a thread and wait for the job to be done
42+
void reserveThreadAndWait() {
43+
// count current thread
44+
{
45+
std::lock_guard lock{mutex};
46+
++reservedThreads;
47+
}
48+
cv.notify_one();
49+
// wait for the job to be done
50+
{
51+
std::unique_lock lock{mutex};
52+
cv.wait(lock, [&] { return jobDone; });
53+
}
54+
}
55+
56+
// wait for threads to be reserved
57+
// return actual number of reserved threads
58+
size_t waitForThreads() {
59+
std::unique_lock lock{mutex};
60+
++reservedThreads; // count current thread
61+
cv.wait_for(lock, timeout, [&] { return reservedThreads >= requestedThreads; });
62+
return reservedThreads;
63+
}
64+
65+
// mark the whole job as done
66+
void markJobDone() {
67+
{
68+
std::lock_guard lock{mutex};
69+
jobDone = true;
70+
}
71+
cv.notify_all();
72+
}
73+
};
74+
75+
// Job to reserve a thread and wait for the job to be done
76+
class ReserveThreadJob : public AsyncJob {
77+
std::weak_ptr<ControlBlock> controlBlock; // control block is owned by the main job and can
78+
// be destroyed before this job is started
79+
80+
static void Execute_impl(AsyncJob *job) {
81+
auto *jobPtr = static_cast<ReserveThreadJob *>(job);
82+
// if control block is already destroyed by the update job, just delete the job
83+
auto controlBlock = jobPtr->controlBlock.lock();
84+
if (controlBlock) {
85+
controlBlock->reserveThreadAndWait();
86+
}
87+
delete job;
88+
}
89+
90+
public:
91+
ReserveThreadJob(std::shared_ptr<VecSimAllocator> allocator, JobType jobType,
92+
VecSimIndex *index, std::weak_ptr<ControlBlock> controlBlock)
93+
: AsyncJob(std::move(allocator), jobType, Execute_impl, index),
94+
controlBlock(std::move(controlBlock)) {}
95+
};
96+
97+
using task_type = std::function<void(VecSimIndex *, size_t)>;
98+
task_type task;
99+
std::shared_ptr<ControlBlock> controlBlock;
100+
101+
static void Execute_impl(AsyncJob *job) {
102+
auto *jobPtr = static_cast<SVSMultiThreadJob *>(job);
103+
auto controlBlock = jobPtr->controlBlock;
104+
size_t num_threads = 1;
105+
if (controlBlock) {
106+
num_threads = controlBlock->waitForThreads();
107+
}
108+
assert(num_threads > 0);
109+
jobPtr->task(jobPtr->index, num_threads);
110+
if (controlBlock) {
111+
jobPtr->controlBlock->markJobDone();
112+
}
113+
delete job;
114+
}
115+
116+
SVSMultiThreadJob(std::shared_ptr<VecSimAllocator> allocator, JobType jobType,
117+
task_type callback, VecSimIndex *index,
118+
std::shared_ptr<ControlBlock> controlBlock)
119+
: AsyncJob(std::move(allocator), jobType, Execute_impl, index), task(std::move(callback)),
120+
controlBlock(std::move(controlBlock)) {}
121+
122+
public:
123+
template <typename Rep, typename Period>
124+
static vecsim_stl::vector<AsyncJob *>
125+
createJobs(const std::shared_ptr<VecSimAllocator> &allocator, JobType jobType,
126+
std::function<void(VecSimIndex *, size_t)> callback, VecSimIndex *index,
127+
size_t num_threads, std::chrono::duration<Rep, Period> threads_wait_timeout) {
128+
assert(num_threads > 0);
129+
std::shared_ptr<ControlBlock> controlBlock =
130+
num_threads == 1 ? nullptr
131+
: std::make_shared<ControlBlock>(num_threads, threads_wait_timeout);
132+
133+
vecsim_stl::vector<AsyncJob *> jobs(num_threads, allocator);
134+
jobs[0] =
135+
new (allocator) SVSMultiThreadJob(allocator, jobType, callback, index, controlBlock);
136+
for (size_t i = 1; i < num_threads; ++i) {
137+
jobs[i] = new (allocator) ReserveThreadJob(allocator, jobType, index, controlBlock);
138+
}
139+
return jobs;
140+
}
141+
142+
#ifdef BUILD_TESTS
143+
public:
144+
static constexpr size_t estimateSize(size_t num_threads) {
145+
return sizeof(SVSMultiThreadJob) + (num_threads - 1) * sizeof(ReserveThreadJob);
146+
}
147+
#endif
20148
};
21149

22150
template <typename DataType>
@@ -30,6 +158,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
30158
// Add: true, Delete: false
31159
using journal_record = std::pair<labelType, bool>;
32160
size_t updateJobThreshold;
161+
size_t updateJobWaitTime;
33162
std::vector<journal_record> journal;
34163
std::shared_mutex journal_mutex;
35164
std::atomic_flag indexUpdateScheduled = ATOMIC_FLAG_INIT;
@@ -130,22 +259,30 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
130259
}
131260

132261
public:
133-
TieredSVS_BatchIterator(void *query_vector, const Index *index, VecSimQueryParams *params,
262+
TieredSVS_BatchIterator(void *query_vector, const Index *index,
263+
VecSimQueryParams *queryParams,
134264
std::shared_ptr<VecSimAllocator> allocator)
135265
: VecSimBatchIterator(query_vector, queryParams ? queryParams->timeoutCtx : nullptr,
136266
std::move(allocator)),
137-
index(index), queryParams(params ? new VecSimQueryParams{*params} : nullptr),
138-
flat_results(this->allocator), svs_results(this->allocator),
267+
index(index), flat_results(this->allocator), svs_results(this->allocator),
139268
flat_iterator(index->frontendIndex->newBatchIterator(query_vector, queryParams)),
140269
svs_iterator(nullptr), svs_lock(index->mainIndexGuard, std::defer_lock),
141-
returned_results_set(this->allocator) {}
270+
returned_results_set(this->allocator) {
271+
if (queryParams) {
272+
this->queryParams =
273+
(VecSimQueryParams *)this->allocator->allocate(sizeof(VecSimQueryParams));
274+
*this->queryParams = *queryParams;
275+
} else {
276+
this->queryParams = nullptr;
277+
}
278+
}
142279

143280
~TieredSVS_BatchIterator() {
144281
release_svs_iterator();
145-
delete flat_iterator;
146282
if (queryParams) {
147283
this->allocator->free_allocation(queryParams);
148284
}
285+
delete flat_iterator;
149286
}
150287

151288
VecSimQueryReply *getNextResults(size_t n_res, VecSimQueryReply_Order order) override {
@@ -260,6 +397,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
260397
public:
261398
backend_index_t *GetBackendIndex() { return this->backendIndex; }
262399
void submitSingleJob(AsyncJob *job) { Base::submitSingleJob(job); }
400+
// Forwards a batch of jobs to the base class submitJobs().
void submitJobs(vecsim_stl::vector<AsyncJob *> &jobs) {
    Base::submitJobs(jobs);
}
263401
#endif
264402

265403
private:
@@ -285,14 +423,14 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
285423
}
286424
}
287425

288-
static void updateSVSIndexWrapper(AsyncJob *job) {
289-
auto index = dynamic_cast<TieredSVSIndex<DataType> *>(job->index);
426+
static void updateSVSIndexWrapper(VecSimIndex *idx, size_t availableThreads) {
427+
auto index = static_cast<TieredSVSIndex<DataType> *>(idx);
290428
assert(index);
291429
// prevent parallel updates
292430
std::lock_guard<std::mutex> lock(index->updateJobMutex);
293-
index->indexUpdateScheduled.clear();
431+
index->GetSVSIndex()->setNumThreads(availableThreads);
294432
index->updateSVSIndex();
295-
delete job;
433+
index->indexUpdateScheduled.clear();
296434
}
297435

298436
#ifdef BUILD_TESTS
@@ -304,9 +442,11 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
304442
return;
305443
}
306444

307-
auto job =
308-
new (this->allocator) SVSIndexUpdateJob{this->allocator, updateSVSIndexWrapper, this};
309-
this->submitSingleJob(job);
445+
auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity();
446+
auto jobs = SVSMultiThreadJob::createJobs(this->allocator, HNSW_INSERT_VECTOR_JOB,
447+
updateSVSIndexWrapper, this, total_threads,
448+
std::chrono::microseconds(updateJobWaitTime));
449+
this->submitJobs(jobs);
310450
}
311451

312452
private:
@@ -382,9 +522,12 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
382522
tiered_index_params.specificParams.tieredSVSParams.updateJobThreshold == 0
383523
? DEFAULT_PENDING_SWAP_JOBS_THRESHOLD
384524
: std::min(tiered_index_params.specificParams.tieredSVSParams.updateJobThreshold,
385-
MAX_PENDING_SWAP_JOBS_THRESHOLD)) {
525+
MAX_PENDING_SWAP_JOBS_THRESHOLD)),
526+
updateJobWaitTime(
527+
tiered_index_params.specificParams.tieredSVSParams.updateJobWaitTime == 0
528+
? 1000 // default wait time: 1ms
529+
: tiered_index_params.specificParams.tieredSVSParams.updateJobWaitTime) {
386530
this->journal.reserve(this->updateJobThreshold * 2);
387-
TIERED_LOG(VecSimCommonStrings::LOG_NOTICE_STRING, "TieredSVSIndex created");
388531
}
389532

390533
int addVector(const void *blob, labelType label) override {
@@ -406,7 +549,8 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
406549
// elsewhere search queries may return wrong result.
407550
assert(ret >= 0 && "addVector: vector duplication in both indices");
408551
journal.emplace_back(label, true);
409-
index_update_needed = this->journal.size() >= this->updateJobThreshold;
552+
index_update_needed = this->backendIndex->indexSize() > 0 ||
553+
this->journal.size() >= this->updateJobThreshold;
410554
}
411555

412556
if (index_update_needed) {
@@ -524,11 +668,10 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
524668
TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING,
525669
"running asynchronous GC for tiered SVS index");
526670
if (!indexUpdateScheduled.test_and_set()) {
527-
auto job = new (this->allocator)
528-
SVSIndexUpdateJob{this->allocator, updateSVSIndexWrapper, this};
529-
updateSVSIndexWrapper(job);
671+
updateSVSIndexWrapper(this, 1);
530672
}
531673
std::unique_lock<std::shared_mutex> backend_lock{this->mainIndexGuard};
674+
// VecSimIndexAbstract::runGC() is protected
532675
static_cast<VecSimIndexInterface *>(this->backendIndex)->runGC();
533676
}
534677

0 commit comments

Comments
 (0)