Skip to content

Commit

Permalink
Update vendored DuckDB sources to fe39c1a
Browse files Browse the repository at this point in the history
  • Loading branch information
duckdblabs-bot committed Dec 12, 2024
1 parent fe39c1a commit 2108b31
Show file tree
Hide file tree
Showing 46 changed files with 807 additions and 265 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ struct MedianAbsoluteDeviationOperation : QuantileOperation {
const auto &quantile = bind_data.quantiles[0];
auto &window_state = state.GetOrCreateWindowState();
MEDIAN_TYPE med;
if (gstate && gstate->HasTrees()) {
if (gstate && gstate->HasTree()) {
med = gstate->GetWindowState().template WindowScalar<MEDIAN_TYPE, false>(data, frames, n, result, quantile);
} else {
window_state.UpdateSkip(data, frames, included);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ struct QuantileScalarOperation : public QuantileOperation {
}

const auto &quantile = bind_data.quantiles[0];
if (gstate && gstate->HasTrees()) {
if (gstate && gstate->HasTree()) {
rdata[ridx] = gstate->GetWindowState().template WindowScalar<RESULT_TYPE, DISCRETE>(data, frames, n, result,
quantile);
} else {
Expand Down Expand Up @@ -333,7 +333,7 @@ struct QuantileListOperation : QuantileOperation {
return;
}

if (gstate && gstate->HasTrees()) {
if (gstate && gstate->HasTree()) {
gstate->GetWindowState().template WindowList<CHILD_TYPE, DISCRETE>(data, frames, n, list, lidx, bind_data);
} else {
auto &window_state = state.GetOrCreateWindowState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "duckdb/common/operator/cast_operators.hpp"
#include "duckdb/common/operator/multiply.hpp"
#include "duckdb/planner/expression/bound_constant_expression.hpp"
#include "duckdb/function/window/window_index_tree.hpp"
#include <algorithm>
#include <numeric>
#include <stdlib.h>
Expand Down Expand Up @@ -316,148 +317,55 @@ struct QuantileIncluded {
CURSOR_TYPE &dmask;
};

// Shared untemplated sort logic
static unique_ptr<GlobalSortState> SortQuantileIndices(const WindowPartitionInput &partition, // NOLINT
const LogicalType &index_type, OrderType order_type) {
auto &inputs = *partition.inputs;
const auto &filter_mask = partition.filter_mask;

// Sort the unfiltered indices by the argument values
vector<LogicalType> payload_types;
payload_types.emplace_back(index_type);

idx_t capacity = STANDARD_VECTOR_SIZE;
DataChunk payload;
payload.Initialize(inputs.GetAllocator(), payload_types, capacity);
RowLayout payload_layout;
payload_layout.Initialize(payload.GetTypes());
SelectionVector filtered(capacity);

// TODO: Two pass parallel sorting using Build
ColumnDataScanState state;
DataChunk sort;
inputs.InitializeScan(state, partition.column_ids);
inputs.InitializeScanChunk(state, sort);
auto order_expr = make_uniq<BoundConstantExpression>(Value(sort.GetTypes()[0]));
vector<BoundOrderByNode> orders;
orders.emplace_back(BoundOrderByNode(order_type, OrderByNullType::NULLS_LAST, std::move(order_expr)));

auto &buffer_manager = BufferManager::GetBufferManager(partition.context);
auto global_sort = make_uniq<GlobalSortState>(buffer_manager, orders, payload_layout);
global_sort->external = ClientConfig::GetConfig(partition.context).force_external;
const auto memory_per_thread = PhysicalOperator::GetMaxThreadMemory(partition.context);

LocalSortState local_sort;
local_sort.Initialize(*global_sort, global_sort->buffer_manager);

// Build the indirection array by scanning the valid indices
while (inputs.Scan(state, sort)) {
// Match the payload to the scanned data
if (sort.size() > capacity) {
payload.Destroy();
capacity = sort.size();
payload.Initialize(inputs.GetAllocator(), payload_types, capacity);
filtered.Initialize(capacity);
} else {
payload.Reset();
}
auto &indices = payload.data[0];
payload.SetCardinality(sort);
indices.Sequence(int64_t(state.current_row_index), 1, payload.size());

if (!filter_mask.AllValid() || !partition.all_valid[0]) {
auto &key = sort.data[0];
auto &validity = FlatVector::Validity(key);
idx_t valid = 0;
for (sel_t i = 0; i < sort.size(); ++i) {
if (filter_mask.RowIsValid(i + state.current_row_index) && validity.RowIsValid(i)) {
filtered[valid++] = i;
}
}
if (valid < sort.size()) {
payload.Slice(filtered, valid);
sort.Slice(filtered, valid);
}
}
local_sort.SinkChunk(sort, payload);
if (local_sort.SizeInBytes() > memory_per_thread) {
local_sort.Sort(*global_sort, true);
}
}
global_sort->AddLocalState(local_sort);

// Sort it
global_sort->PrepareMergePhase();
while (global_sort->sorted_blocks.size() > 1) {
global_sort->InitializeMergeRound();
MergeSorter merge_sorter(*global_sort, global_sort->buffer_manager);
merge_sorter.PerformInMergeRound();
global_sort->CompleteMergeRound(false);
}

return global_sort;
}

template <typename IDX>
struct QuantileSortTree : public MergeSortTree<IDX, IDX> {
struct QuantileSortTree {

using BaseTree = MergeSortTree<IDX, IDX>;
using Elements = typename BaseTree::Elements;
unique_ptr<WindowIndexTree> index_tree;

explicit QuantileSortTree(Elements &&lowest_level) {
BaseTree::Allocate(lowest_level.size());
BaseTree::LowestLevel() = std::move(lowest_level);
}

template <class INPUT_TYPE>
static unique_ptr<QuantileSortTree> WindowInit(AggregateInputData &aggr_input_data,
const WindowPartitionInput &partition) {
QuantileSortTree(AggregateInputData &aggr_input_data, const WindowPartitionInput &partition) {
// TODO: Two pass parallel sorting using Build
auto &inputs = *partition.inputs;
ColumnDataScanState scan;
DataChunk sort;
inputs.InitializeScan(scan, partition.column_ids);
inputs.InitializeScanChunk(scan, sort);

// Sort the unfiltered indices by the argument values
using ElementType = typename QuantileSortTree::ElementType;
vector<LogicalType> payload_types;
switch (sizeof(ElementType)) {
case sizeof(int64_t):
payload_types.emplace_back(LogicalType::BIGINT);
break;
case sizeof(int32_t):
payload_types.emplace_back(LogicalType::INTEGER);
break;
default:
throw InternalException("Unsupported Quantile Sort Tree index size");
}

// TODO: Two pass parallel sorting using Build
// Sort on the single argument
auto &bind_data = aggr_input_data.bind_data->Cast<QuantileBindData>();
auto order_expr = make_uniq<BoundConstantExpression>(Value(sort.GetTypes()[0]));
auto order_type = bind_data.desc ? OrderType::DESCENDING : OrderType::ASCENDING;
auto global_sort = SortQuantileIndices(partition, payload_types[0], order_type);

// Now scan the sorted indices into an array we can use as the leaves
vector<ElementType> sorted;
if (!global_sort->sorted_blocks.empty()) {
PayloadScanner scanner(*global_sort);
DataChunk payload;
payload.Initialize(inputs.GetAllocator(), payload_types);
sorted.resize(scanner.Remaining());
for (;;) {
idx_t row_idx = scanner.Scanned();
scanner.Scan(payload);
if (payload.size() == 0) {
break;
BoundOrderModifier order_bys;
order_bys.orders.emplace_back(BoundOrderByNode(order_type, OrderByNullType::NULLS_LAST, std::move(order_expr)));
vector<column_t> sort_idx(1, 0);
const auto count = partition.count;

index_tree = make_uniq<WindowIndexTree>(partition.context, order_bys, sort_idx, count);
auto index_state = index_tree->GetLocalState();
auto &local_state = index_state->Cast<WindowIndexTreeLocalState>();

// Build the indirection array by scanning the valid indices
const auto &filter_mask = partition.filter_mask;
SelectionVector filter_sel(STANDARD_VECTOR_SIZE);
while (inputs.Scan(scan, sort)) {
const auto row_idx = scan.current_row_index;
if (!filter_mask.AllValid() || !partition.all_valid[0]) {
auto &key = sort.data[0];
auto &validity = FlatVector::Validity(key);
idx_t filtered = 0;
for (sel_t i = 0; i < sort.size(); ++i) {
if (filter_mask.RowIsValid(i + row_idx) && validity.RowIsValid(i)) {
filter_sel[filtered++] = i;
}
}
auto &indices = payload.data[0];
auto data = FlatVector::GetData<ElementType>(indices);

std::copy(data, data + payload.size(), sorted.data() + row_idx);
local_state.SinkChunk(sort, row_idx, filter_sel, filtered);
} else {
local_state.SinkChunk(sort, row_idx, nullptr, 0);
}
}

return make_uniq<QuantileSortTree>(std::move(sorted));
local_state.Sort();
}

inline IDX SelectNth(const SubFrames &frames, size_t n) const {
return BaseTree::NthElement(BaseTree::SelectNth(frames, n));
inline idx_t SelectNth(const SubFrames &frames, size_t n) const {
return index_tree->SelectNth(frames, n);
}

template <typename INPUT_TYPE, typename RESULT_TYPE, bool DISCRETE>
Expand All @@ -466,7 +374,7 @@ struct QuantileSortTree : public MergeSortTree<IDX, IDX> {
D_ASSERT(n > 0);

// Thread safe and idempotent.
BaseTree::Build();
index_tree->Build();

// Find the interpolated indicies within the frame
Interpolator<DISCRETE> interp(q, n, false);
Expand All @@ -488,7 +396,7 @@ struct QuantileSortTree : public MergeSortTree<IDX, IDX> {
D_ASSERT(n > 0);

// Thread safe and idempotent.
BaseTree::Build();
index_tree->Build();

// Result is a constant LIST<CHILD_TYPE> with a fixed length
auto ldata = FlatVector::GetData<list_entry_t>(list);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ struct QuantileOperation {
data_ptr_t g_state) {
D_ASSERT(partition.inputs);

const auto count = partition.count;
const auto &stats = partition.stats;

// If frames overlap significantly, then use local skip lists.
Expand All @@ -71,11 +70,7 @@ struct QuantileOperation {
// Build the tree
auto &state = *reinterpret_cast<STATE *>(g_state);
auto &window_state = state.GetOrCreateWindowState();
if (count < std::numeric_limits<uint32_t>::max()) {
window_state.qst32 = QuantileSortTree<uint32_t>::WindowInit<INPUT_TYPE>(aggr_input_data, partition);
} else {
window_state.qst64 = QuantileSortTree<uint64_t>::WindowInit<INPUT_TYPE>(aggr_input_data, partition);
}
window_state.qst = make_uniq<QuantileSortTree>(aggr_input_data, partition);
}

template <class INPUT_TYPE>
Expand Down Expand Up @@ -109,10 +104,7 @@ struct SkipLess {
template <typename INPUT_TYPE>
struct WindowQuantileState {
// Windowed Quantile merge sort trees
using QuantileSortTree32 = QuantileSortTree<uint32_t>;
using QuantileSortTree64 = QuantileSortTree<uint64_t>;
unique_ptr<QuantileSortTree32> qst32;
unique_ptr<QuantileSortTree64> qst64;
unique_ptr<QuantileSortTree> qst;

// Windowed Quantile skip lists
using SkipType = pair<idx_t, INPUT_TYPE>;
Expand Down Expand Up @@ -196,18 +188,16 @@ struct WindowQuantileState {
}
}

bool HasTrees() const {
return qst32 || qst64;
bool HasTree() const {
return qst.get();
}

template <typename RESULT_TYPE, bool DISCRETE>
RESULT_TYPE WindowScalar(CursorType &data, const SubFrames &frames, const idx_t n, Vector &result,
const QuantileValue &q) const {
D_ASSERT(n > 0);
if (qst32) {
return qst32->WindowScalar<INPUT_TYPE, RESULT_TYPE, DISCRETE>(data, frames, n, result, q);
} else if (qst64) {
return qst64->WindowScalar<INPUT_TYPE, RESULT_TYPE, DISCRETE>(data, frames, n, result, q);
if (qst) {
return qst->WindowScalar<INPUT_TYPE, RESULT_TYPE, DISCRETE>(data, frames, n, result, q);
} else if (s) {
// Find the position(s) needed
try {
Expand Down Expand Up @@ -284,8 +274,8 @@ struct QuantileState {
v.emplace_back(TYPE_OP::Operation(element, aggr_input));
}

bool HasTrees() const {
return window_state && window_state->HasTrees();
bool HasTree() const {
return window_state && window_state->HasTree();
}
WindowQuantileState<INPUT_TYPE> &GetOrCreateWindowState() {
if (!window_state) {
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/extension/json/json_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ void JsonSerializer::WriteValue(bool value) {
}

void JsonSerializer::WriteDataPtr(const_data_ptr_t ptr, idx_t count) {
auto blob = Blob::ToBlob(string_t(const_char_ptr_cast(ptr), count));
auto blob = Blob::ToString(string_t(const_char_ptr_cast(ptr), count));
auto val = yyjson_mut_strcpy(doc, blob.c_str());
PushValue(val);
}
Expand Down
7 changes: 7 additions & 0 deletions src/duckdb/src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,13 @@ optional_ptr<CatalogEntry> Catalog::CreateIndex(ClientContext &context, CreateIn
return CreateIndex(GetCatalogTransaction(context), info);
}

unique_ptr<LogicalOperator> Catalog::BindAlterAddIndex(Binder &binder, TableCatalogEntry &table_entry,
unique_ptr<LogicalOperator> plan,
unique_ptr<CreateIndexInfo> create_info,
unique_ptr<AlterTableInfo> alter_info) {
throw NotImplementedException("BindAlterAddIndex not supported by this catalog");
}

//===--------------------------------------------------------------------===//
// Lookup Structures
//===--------------------------------------------------------------------===//
Expand Down
Loading

0 comments on commit 2108b31

Please sign in to comment.