Skip to content

Commit

Permalink
Synchronize properly repair/insert test (#555)
Browse files Browse the repository at this point in the history
  • Loading branch information
alonre24 authored Nov 12, 2024
1 parent 1381f64 commit 1b8a846
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions tests/unit/test_hnsw_parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ class HNSWTestParallel : public ::testing::Test {

void insertVectorParallelSafe(VecSimIndex *parallel_index, size_t dim, labelType label,
data_t val, std::shared_mutex &indexGuard,
std::atomic<size_t> &counter, std::mutex &barrier) {
std::atomic<size_t> &counter, std::mutex &barrier,
size_t block_size = DEFAULT_BLOCK_SIZE) {
// The decision as to when to allocate a new block is made by the index internally in the
// "addVector" function, where there is an internal counter that is incremented for each
// vector. To ensure that the thread which is taking the write lock is the one that performs
Expand All @@ -108,7 +109,7 @@ class HNSWTestParallel : public ::testing::Test {
// global counter, so threads won't call "addVector" with the inappropriate lock.
bool exclusive = true;
barrier.lock();
if (counter++ % DEFAULT_BLOCK_SIZE != 0) {
if (counter++ % block_size != 0) {
indexGuard.lock_shared();
exclusive = false;
} else {
Expand Down Expand Up @@ -516,23 +517,17 @@ void HNSWTestParallel<index_type_t>::parallelInsertSearch(bool is_multi) {
size_t n_threads = std::min(10U, FLOOR_EVEN(std::thread::hardware_concurrency()));
// Save the number of tasks done by thread i in the i-th entry.
std::vector<size_t> completed_tasks(n_threads, 0);
std::mutex barrier;

auto parallel_insert = [&](int myID) {
for (labelType label = myID; label < n; label += n_threads / 2) {
completed_tasks[myID]++;
if (label >= first_res_label && label <= last_res_label) {
continue; // Skip the vectors we already indexed.
}
// Lock exclusively unless we are not performing resizing due to a new block.
bool exclusive = true;
indexGuard.lock();
if (indexed_vectors++ % DEFAULT_BLOCK_SIZE != 0) {
indexGuard.unlock();
indexGuard.lock_shared();
exclusive = false;
}
GenerateAndAddVector<data_t>(parallel_index, dim, label, label);
exclusive ? indexGuard.unlock() : indexGuard.unlock_shared();
// Insert the vector, acquiring the guard lock exclusively if we are performing resizing.
this->insertVectorParallelSafe(parallel_index, dim, label, label, indexGuard,
indexed_vectors, barrier);
}
};
std::atomic_int successful_searches(0);
Expand Down Expand Up @@ -770,12 +765,14 @@ TYPED_TEST(HNSWTestParallel, parallelRepairInsert) {
size_t n = 10000;
size_t k = 11;
size_t dim = 4;
size_t block_size = 10;

// r/w lock to ensure that the index is locked (stop the world) upon adding a new block to the
// global data structures, which is not read-safe for parallel insertions.
std::shared_mutex indexGuard;

HNSWParams params = {.dim = dim, .metric = VecSimMetric_L2, .efRuntime = n};
HNSWParams params = {
.dim = dim, .metric = VecSimMetric_L2, .blockSize = block_size, .efRuntime = n};

auto *hnsw_index = this->CastToHNSW(this->CreateNewIndex(params));
size_t n_threads = std::min(8U, FLOOR_EVEN(std::thread::hardware_concurrency()));
Expand Down Expand Up @@ -805,7 +802,9 @@ TYPED_TEST(HNSWTestParallel, parallelRepairInsert) {
// Worker body for a repair thread. Walks jobQ starting at index (myID - n_threads / 2) with a
// stride of n_threads / 2, repairs the connections of each job's node, and counts every finished
// job in completed_tasks[myID].
// NOTE(review): the start index assumes myID >= n_threads / 2 — confirm against the code that
// spawns the threads.
auto executeRepairJobs = [&](int myID) {
for (size_t i = myID - n_threads / 2; i < n_jobs; i += n_threads / 2) {
auto job = jobQ[i];
// Hold the guard in shared mode for the duration of the repair, so that it cannot overlap
// with a thread holding indexGuard exclusively.
indexGuard.lock_shared();
hnsw_index->repairNodeConnections(job.first, job.second); // {element_id, level}
indexGuard.unlock_shared();
// Record one more completed job for this thread.
completed_tasks[myID]++;
}
};
Expand All @@ -819,7 +818,7 @@ TYPED_TEST(HNSWTestParallel, parallelRepairInsert) {
completed_tasks[myID]++;
// Insert the vector, acquiring the guard lock exclusively if we are performing resizing.
this->insertVectorParallelSafe(hnsw_index, dim, label, label, indexGuard, counter,
barrier);
barrier, block_size);
}
};

Expand Down

0 comments on commit 1b8a846

Please sign in to comment.