Skip to content

Commit

Permalink
Merge pull request #42 from Maxxen/main
Browse files Browse the repository at this point in the history
update duckdb
  • Loading branch information
Maxxen authored Dec 11, 2024
2 parents 9637409 + d8b488c commit bae5b06
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 26 deletions.
1 change: 1 addition & 0 deletions .github/workflows/MainDistributionPipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ jobs:
with:
duckdb_version: main
extension_name: vss
ci_tools_version: main
deploy_latest: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }}
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 2218 files
4 changes: 2 additions & 2 deletions src/hnsw/hnsw_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ void HNSWIndex::VerifyAllocations(IndexLock &state) {
// Can rewrite index expression?
//------------------------------------------------------------------------------
static void TryBindIndexExpressionInternal(Expression &expr, idx_t table_idx, const vector<column_t> &index_columns,
const vector<column_t> &table_columns, bool &success, bool &found) {
const vector<ColumnIndex> &table_columns, bool &success, bool &found) {

if (expr.type == ExpressionType::BOUND_COLUMN_REF) {
found = true;
Expand All @@ -592,7 +592,7 @@ static void TryBindIndexExpressionInternal(Expression &expr, idx_t table_idx, co

const auto referenced_column = index_columns[ref.binding.column_index];
for (idx_t i = 0; i < table_columns.size(); i++) {
if (table_columns[i] == referenced_column) {
if (table_columns[i].GetPrimaryIndex() == referenced_column) {
ref.binding.column_index = i;
return;
}
Expand Down
18 changes: 11 additions & 7 deletions src/hnsw/hnsw_index_physical_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,21 @@ SinkFinalizeType PhysicalCreateHNSWIndex::Finalize(Pipeline &pipeline, Event &ev
return SinkFinalizeType::READY;
}

double PhysicalCreateHNSWIndex::GetSinkProgress(ClientContext &context, GlobalSinkState &gstate,
double source_progress) const {
ProgressData PhysicalCreateHNSWIndex::GetSinkProgress(ClientContext &context, GlobalSinkState &gstate,
ProgressData source_progress) const {
// The "source_progress" is not relevant for CREATE INDEX statements
ProgressData res;

const auto &state = gstate.Cast<CreateHNSWIndexGlobalState>();
// First half of the progress is appending to the collection
if (!state.is_building) {
return 50.0 *
MinValue(1.0, static_cast<double>(state.loaded_count) / static_cast<double>(estimated_cardinality));
res.done = state.loaded_count + 0.0;
res.total = estimated_cardinality + estimated_cardinality;
} else {
res.done = state.loaded_count + state.built_count;
res.total = state.loaded_count + state.loaded_count;
}
// Second half is actually building the index
return 50.0 + (50.0 * static_cast<double>(state.built_count) / static_cast<double>(state.loaded_count));
return res;
}

} // namespace duckdb
} // namespace duckdb
15 changes: 9 additions & 6 deletions src/hnsw/hnsw_index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ BindInfo HNSWIndexScanBindInfo(const optional_ptr<FunctionData> bind_data_p) {
struct HNSWIndexScanGlobalState : public GlobalTableFunctionState {
ColumnFetchState fetch_state;
TableScanState local_storage_state;
vector<storage_t> column_ids;
vector<StorageIndex> column_ids;

// Index scan state
unique_ptr<IndexScanState> index_state;
Expand All @@ -52,7 +52,7 @@ static unique_ptr<GlobalTableFunctionState> HNSWIndexScanInitGlobal(ClientContex
if (id != DConstants::INVALID_INDEX) {
col_id = bind_data.table.GetColumn(LogicalIndex(id)).StorageOid();
}
result->column_ids.push_back(col_id);
result->column_ids.emplace_back(col_id);
}

// Initialize the storage scan state
Expand Down Expand Up @@ -123,9 +123,13 @@ unique_ptr<NodeStatistics> HNSWIndexScanCardinality(ClientContext &context, cons
//-------------------------------------------------------------------------
// ToString
//-------------------------------------------------------------------------
static string HNSWIndexScanToString(const FunctionData *bind_data_p) {
auto &bind_data = bind_data_p->Cast<HNSWIndexScanBindData>();
return bind_data.table.name + " (HNSW INDEX SCAN : " + bind_data.index.GetIndexName() + ")";
static InsertionOrderPreservingMap<string> HNSWIndexScanToString(TableFunctionToStringInput &input) {
D_ASSERT(input.bind_data);
InsertionOrderPreservingMap<string> result;
auto &bind_data = input.bind_data->Cast<HNSWIndexScanBindData>();
result["Table"] = bind_data.table.name;
result["HSNW Index"] = bind_data.index.GetIndexName();
return result;
}

//-------------------------------------------------------------------------
Expand All @@ -141,7 +145,6 @@ TableFunction HNSWIndexScanFunction::GetFunction() {
func.pushdown_complex_filter = nullptr;
func.to_string = HNSWIndexScanToString;
func.table_scan_progress = nullptr;
func.get_batch_index = nullptr;
func.projection_pushdown = true;
func.filter_pushdown = false;
func.get_bind_info = HNSWIndexScanBindInfo;
Expand Down
15 changes: 9 additions & 6 deletions src/hnsw/hnsw_optimize_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "duckdb/planner/expression_iterator.hpp"
#include "duckdb/storage/table/scan_state.hpp"
#include "duckdb/transaction/duck_transaction.hpp"
#include "duckdb/storage/storage_index.hpp"

#include "hnsw/hnsw.hpp"
#include "hnsw/hnsw_index.hpp"
Expand Down Expand Up @@ -74,7 +75,7 @@ class HNSWIndexJoinState final : public OperatorState {

ColumnFetchState fetch_state;
TableScanState local_storage_state;
vector<storage_t> phyiscal_column_ids;
vector<StorageIndex> physical_column_ids;

// Index scan state
unique_ptr<IndexScanState> index_state;
Expand All @@ -85,22 +86,22 @@ unique_ptr<OperatorState> PhysicalHNSWIndexJoin::GetOperatorState(ExecutionConte
auto result = make_uniq<HNSWIndexJoinState>();

auto &local_storage = LocalStorage::Get(context.client, table.catalog);
result->phyiscal_column_ids.reserve(inner_column_ids.size());
result->physical_column_ids.reserve(inner_column_ids.size());

// Figure out the storage column ids from the projection expression
for (auto &id : inner_column_ids) {
storage_t col_id = id;
if (id != DConstants::INVALID_INDEX) {
col_id = table.GetColumn(LogicalIndex(id)).StorageOid();
}
result->phyiscal_column_ids.push_back(col_id);
result->physical_column_ids.emplace_back(col_id);
}

// Initialize selection vector
result->match_sel.Initialize();

// Initialize the storage scan state
result->local_storage_state.Initialize(result->phyiscal_column_ids, nullptr);
result->local_storage_state.Initialize(result->physical_column_ids, nullptr);
local_storage.InitializeScan(table.GetStorage(), result->local_storage_state.local_state, nullptr);

// Initialize the index scan state
Expand Down Expand Up @@ -152,7 +153,7 @@ OperatorResultType PhysicalHNSWIndexJoin::Execute(ExecutionContext &context, Dat
const auto &row_ids = hnsw_index.GetMultiScanResult(*state.index_state);

// Execute one big fetch for the LHS
table.GetStorage().Fetch(transcation, chunk, state.phyiscal_column_ids, row_ids, output_idx, state.fetch_state);
table.GetStorage().Fetch(transcation, chunk, state.physical_column_ids, row_ids, output_idx, state.fetch_state);

// Now slice the chunk so that we include the rhs too
chunk.Slice(input, state.match_sel, output_idx, OUTER_COLUMN_OFFSET);
Expand Down Expand Up @@ -573,7 +574,9 @@ bool HNSWIndexJoinOptimizer::TryOptimize(Binder &binder, ClientContext &context,
//------------------------------------------------------------------------------

auto index_join = make_uniq<LogicalHNSWIndexJoin>(binder.GenerateTableIndex(), duck_table, *index_ptr, k_value);
index_join->inner_column_ids = inner_get.GetColumnIds();
for(auto &column_id : inner_get.GetColumnIds()) {
index_join->inner_column_ids.emplace_back(column_id.GetPrimaryIndex());
}
index_join->inner_projection_ids = inner_get.projection_ids;
index_join->inner_returned_types = inner_get.returned_types;

Expand Down
2 changes: 1 addition & 1 deletion src/hnsw/hnsw_optimize_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class HNSWIndexScanOptimizer : public OptimizerExtension {
auto &type = get.returned_types[column_id];
bool found = false;
for (idx_t i = 0; i < column_ids.size(); i++) {
if (column_ids[i] == column_id) {
if (column_ids[i].GetPrimaryIndex() == column_id) {
column_id = i;
found = true;
break;
Expand Down
2 changes: 1 addition & 1 deletion src/hnsw/hnsw_optimize_topk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class HNSWTopKOptimizer : public OptimizerExtension {
auto &type = get.returned_types[column_id];
bool found = false;
for (idx_t i = 0; i < column_ids.size(); i++) {
if (column_ids[i] == column_id) {
if (column_ids[i].GetPrimaryIndex() == column_id) {
column_id = i;
found = true;
break;
Expand Down
5 changes: 3 additions & 2 deletions src/include/hnsw/hnsw_index_physical_create.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include "duckdb/execution/physical_operator.hpp"
#include "duckdb/execution/progress_data.hpp"
#include "duckdb/storage/data_table.hpp"

namespace duckdb {
Expand Down Expand Up @@ -52,7 +53,7 @@ class PhysicalCreateHNSWIndex : public PhysicalOperator {
return true;
}

double GetSinkProgress(ClientContext &context, GlobalSinkState &gstate, double source_progress) const override;
ProgressData GetSinkProgress(ClientContext &context, GlobalSinkState &gstate, ProgressData source_progress) const override;
};

} // namespace duckdb
} // namespace duckdb

0 comments on commit bae5b06

Please sign in to comment.