Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add Thread Pool for all indexes in Knowhere (#570)
Browse files Browse the repository at this point in the history
Signed-off-by: liliu-z <[email protected]>

Signed-off-by: liliu-z <[email protected]>
  • Loading branch information
liliu-z authored Nov 29, 2022
1 parent 3da4802 commit 2ac540e
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 65 deletions.
13 changes: 7 additions & 6 deletions knowhere/index/VecIndexFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "common/Exception.h"
#include "common/Log.h"
#include "index/VecIndexThreadPoolWrapper.h"
#include "index/vector_index/IndexAnnoy.h"
#include "index/vector_index/IndexBinaryIDMAP.h"
#include "index/vector_index/IndexBinaryIVF.h"
Expand Down Expand Up @@ -42,17 +43,17 @@ VecIndexFactory::CreateVecIndex(const IndexType& type, const IndexMode mode) {
switch (mode) {
case IndexMode::MODE_CPU: {
if (type == IndexEnum::INDEX_FAISS_BIN_IDMAP) {
return std::make_shared<knowhere::BinaryIDMAP>();
return std::make_shared<VecIndexThreadPoolWrapper>(std::make_unique<BinaryIDMAP>());
} else if (type == IndexEnum::INDEX_FAISS_BIN_IVFFLAT) {
return std::make_shared<knowhere::BinaryIVF>();
return std::make_shared<VecIndexThreadPoolWrapper>(std::make_unique<BinaryIVF>());
} else if (type == IndexEnum::INDEX_FAISS_IDMAP) {
return std::make_shared<knowhere::IDMAP>();
return std::make_shared<VecIndexThreadPoolWrapper>(std::make_unique<IDMAP>());
} else if (type == IndexEnum::INDEX_FAISS_IVFFLAT) {
return std::make_shared<knowhere::IVF_NM>();
return std::make_shared<VecIndexThreadPoolWrapper>(std::make_unique<IVF_NM>());
} else if (type == IndexEnum::INDEX_FAISS_IVFPQ) {
return std::make_shared<knowhere::IVFPQ>();
return std::make_shared<VecIndexThreadPoolWrapper>(std::make_unique<IVFPQ>());
} else if (type == IndexEnum::INDEX_FAISS_IVFSQ8) {
return std::make_shared<knowhere::IVFSQ>();
return std::make_shared<VecIndexThreadPoolWrapper>(std::make_unique<IVFSQ>());
} else if (type == IndexEnum::INDEX_ANNOY) {
return std::make_shared<knowhere::IndexAnnoy>();
} else if (type == IndexEnum::INDEX_HNSW) {
Expand Down
121 changes: 121 additions & 0 deletions knowhere/index/VecIndexThreadPoolWrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#pragma once

#include <memory>
#include <utility>

#include "knowhere/common/ThreadPool.h"
#include "knowhere/index/VecIndex.h"

namespace knowhere {

/**
* @brief This class is a Wrapper for VecIndex, it will use a global thread pool for all Query and RangeQuery API calls.
*
*/
class VecIndexThreadPoolWrapper : public VecIndex {
public:
explicit VecIndexThreadPoolWrapper(std::unique_ptr<VecIndex> index)
: VecIndexThreadPoolWrapper(std::move(index), ThreadPool::GetGlobalThreadPool()) {
}

explicit VecIndexThreadPoolWrapper(std::unique_ptr<VecIndex> index, std::shared_ptr<ThreadPool> thread_pool)
: index_(std::move(index)), thread_pool_(thread_pool) {
}

BinarySet
Serialize(const Config& config) override {
return index_->Serialize(config);
}

void
Load(const BinarySet& index_binary) override {
index_->Load(index_binary);
}

void
Train(const DatasetPtr& dataset, const Config& config) override {
index_->Train(dataset, config);
}

void
AddWithoutIds(const DatasetPtr& dataset, const Config& config) override {
index_->AddWithoutIds(dataset, config);
}

bool
Prepare(const Config& config) override {
return index_->Prepare(config);
}

DatasetPtr
GetVectorById(const DatasetPtr& dataset, const Config& config) override {
return index_->GetVectorById(dataset, config);
}

DatasetPtr
Query(const DatasetPtr& dataset, const Config& config, const faiss::BitsetView bitset) override {
return thread_pool_->push([&]() { return this->index_->Query(dataset, config, bitset); }).get();
}

DatasetPtr
QueryByRange(const DatasetPtr& dataset, const Config& config, const faiss::BitsetView bitset) override {
return thread_pool_->push([&]() { return this->index_->QueryByRange(dataset, config, bitset); }).get();
}

DatasetPtr
GetIndexMeta(const Config& config) override {
return index_->GetIndexMeta(config);
}

int64_t
Size() override {
return index_->Size();
}

int64_t
Dim() override {
return index_->Dim();
}

int64_t
Count() override {
return index_->Count();
}

StatisticsPtr
GetStatistics() override {
return index_->GetStatistics();
}

void
ClearStatistics() override {
index_->ClearStatistics();
}

IndexType
index_type() const override {
return index_->index_type();
}

IndexMode
index_mode() const override {
return index_->index_mode();
}

private:
std::unique_ptr<VecIndex> index_;
std::shared_ptr<ThreadPool> thread_pool_;
};

} // namespace knowhere
43 changes: 25 additions & 18 deletions knowhere/index/vector_index/IndexAnnoy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,32 @@ IndexAnnoy::Query(const DatasetPtr& dataset_ptr, const Config& config, const fai
auto p_id = new int64_t[k * rows];
auto p_dist = new float[k * rows];

#pragma omp parallel for
std::vector<std::future<void>> futures;
futures.reserve(rows);
for (unsigned int i = 0; i < rows; ++i) {
std::vector<int64_t> result;
result.reserve(k);
std::vector<float> distances;
distances.reserve(k);
index_->get_nns_by_vector(static_cast<const float*>(p_data) + i * dim, k, search_k, &result, &distances,
bitset);

size_t result_num = result.size();
auto local_p_id = p_id + k * i;
auto local_p_dist = p_dist + k * i;
memcpy(local_p_id, result.data(), result_num * sizeof(int64_t));
memcpy(local_p_dist, distances.data(), result_num * sizeof(float));

for (; result_num < k; result_num++) {
local_p_id[result_num] = -1;
local_p_dist[result_num] = 1.0 / 0.0;
}
futures.push_back(pool_->push([&, index = i]() {
std::vector<int64_t> result;
result.reserve(k);
std::vector<float> distances;
distances.reserve(k);
index_->get_nns_by_vector(static_cast<const float*>(p_data) + index * dim, k, search_k, &result, &distances,
bitset);

size_t result_num = result.size();
auto local_p_id = p_id + k * index;
auto local_p_dist = p_dist + k * index;
memcpy(local_p_id, result.data(), result_num * sizeof(int64_t));
memcpy(local_p_dist, distances.data(), result_num * sizeof(float));

for (; result_num < k; result_num++) {
local_p_id[result_num] = -1;
local_p_dist[result_num] = 1.0 / 0.0;
}
}));
}

for (auto& future : futures) {
future.get();
}

return GenResultDataset(p_id, p_dist);
Expand Down
4 changes: 3 additions & 1 deletion knowhere/index/vector_index/IndexAnnoy.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#include "annoy/src/annoylib.h"
#include "annoy/src/kissrandom.h"

#include "knowhere/common/Exception.h"
#include "knowhere/common/ThreadPool.h"
#include "knowhere/index/VecIndex.h"

namespace knowhere {
Expand All @@ -28,6 +28,7 @@ class IndexAnnoy : public VecIndex {
public:
IndexAnnoy() {
index_type_ = IndexEnum::INDEX_ANNOY;
pool_ = ThreadPool::GetGlobalThreadPool();
}

BinarySet
Expand Down Expand Up @@ -66,6 +67,7 @@ class IndexAnnoy : public VecIndex {

private:
std::string metric_type_;
std::shared_ptr<ThreadPool> pool_;
std::shared_ptr<AnnoyIndexInterface<int64_t, float>> index_ = nullptr;
};

Expand Down
10 changes: 0 additions & 10 deletions knowhere/index/vector_index/IndexBinaryIDMAP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,6 @@ BinaryIDMAP::Train(const DatasetPtr& dataset_ptr, const Config& config) {
index_ = index;
}

const uint8_t*
BinaryIDMAP::GetRawVectors() {
try {
auto flat_index = dynamic_cast<faiss::IndexBinaryFlat*>(index_.get());
return flat_index->xb.data();
} catch (std::exception& e) {
KNOWHERE_THROW_MSG(e.what());
}
}

void
BinaryIDMAP::QueryImpl(int64_t n,
const uint8_t* data,
Expand Down
3 changes: 0 additions & 3 deletions knowhere/index/vector_index/IndexBinaryIDMAP.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ class BinaryIDMAP : public VecIndex, public FaissBaseBinaryIndex {
return Count() * Dim() / 8;
}

virtual const uint8_t*
GetRawVectors();

protected:
virtual void
QueryImpl(int64_t n,
Expand Down
10 changes: 0 additions & 10 deletions knowhere/index/vector_index/IndexIDMAP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,6 @@ IDMAP::CopyCpuToGpu(const int64_t device_id, const Config& config) {
#endif
}

const float*
IDMAP::GetRawVectors() {
try {
auto flat_index = dynamic_cast<faiss::IndexFlat*>(index_.get());
return reinterpret_cast<const float*>(flat_index->codes.data());
} catch (std::exception& e) {
KNOWHERE_THROW_MSG(e.what());
}
}

void
IDMAP::QueryImpl(int64_t n,
const float* data,
Expand Down
3 changes: 0 additions & 3 deletions knowhere/index/vector_index/IndexIDMAP.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ class IDMAP : public VecIndex, public FaissBaseIndex {
VecIndexPtr
CopyCpuToGpu(const int64_t, const Config&);

virtual const float*
GetRawVectors();

protected:
virtual void
QueryImpl(int64_t, const float*, int64_t, float*, int64_t*, const Config&, const faiss::BitsetView);
Expand Down
5 changes: 0 additions & 5 deletions knowhere/index/vector_index/gpu/IndexGPUIDMAP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,6 @@ GPUIDMAP::CopyGpuToGpu(const int64_t device_id, const Config& config) {
return std::static_pointer_cast<IDMAP>(cpu_index)->CopyCpuToGpu(device_id, config);
}

const float*
GPUIDMAP::GetRawVectors() {
KNOWHERE_THROW_MSG("Not support");
}

void
GPUIDMAP::QueryImpl(int64_t n,
const float* data,
Expand Down
3 changes: 0 additions & 3 deletions knowhere/index/vector_index/gpu/IndexGPUIDMAP.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ class GPUIDMAP : public IDMAP, public GPUIndex {
VecIndexPtr
CopyGpuToGpu(const int64_t, const Config&) override;

const float*
GetRawVectors() override;

void
GenGraph(const float*, const int64_t, GraphType&, const Config&);

Expand Down
1 change: 0 additions & 1 deletion unittest/test_binaryidmap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ TEST_P(BinaryIDMAPTest, binaryidmap_basic) {
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dim(), dim);
ASSERT_GT(index_->Size(), 0);
ASSERT_TRUE(std::static_pointer_cast<knowhere::BinaryIDMAP>(index_)->GetRawVectors() != nullptr);

auto result = index_->GetVectorById(id_dataset, conf_);
AssertBinVec(result, base_dataset, id_dataset, nq, dim);
Expand Down
5 changes: 0 additions & 5 deletions unittest/test_idmap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ TEST_P(IDMAPTest, idmap_basic) {
index_->BuildAll(base_dataset, conf_);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dim(), dim);
ASSERT_TRUE(index_->GetRawVectors() != nullptr);
ASSERT_GT(index_->Size(), 0);

auto result = index_->GetVectorById(id_dataset, conf_);
Expand Down Expand Up @@ -279,7 +278,6 @@ TEST_P(IDMAPTest, idmap_copy) {
index_->BuildAll(base_dataset, conf_);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dim(), dim);
ASSERT_TRUE(index_->GetRawVectors() != nullptr);
auto result = index_->Query(query_dataset, conf_, nullptr);
AssertAnns(result, nq, k);
// PrintResult(result, nq, k);
Expand All @@ -295,8 +293,6 @@ TEST_P(IDMAPTest, idmap_copy) {
auto clone_result = clone_index->Query(query_dataset, conf_, nullptr);

AssertAnns(clone_result, nq, k);
ASSERT_THROW({ std::static_pointer_cast<knowhere::GPUIDMAP>(clone_index)->GetRawVectors(); },
knowhere::KnowhereException);

auto binary = clone_index->Serialize(conf_);
clone_index->Load(binary);
Expand All @@ -311,7 +307,6 @@ TEST_P(IDMAPTest, idmap_copy) {
auto host_index = knowhere::cloner::CopyGpuToCpu(clone_index, conf_);
auto host_result = host_index->Query(query_dataset, conf_, nullptr);
AssertAnns(host_result, nq, k);
ASSERT_TRUE(std::static_pointer_cast<knowhere::IDMAP>(host_index)->GetRawVectors() != nullptr);

// gpu to gpu
auto device_index = knowhere::cloner::CopyCpuToGpu(index_, DEVICE_ID, conf_);
Expand Down

0 comments on commit 2ac540e

Please sign in to comment.