Update vendored DuckDB sources to a9bf1a6

duckdblabs-bot committed Dec 20, 2024
1 parent a9bf1a6 commit 7319174

Showing 19 changed files with 253 additions and 113 deletions.
5 changes: 4 additions & 1 deletion src/duckdb/extension/parquet/parquet_extension.cpp
@@ -797,15 +797,18 @@ class ParquetScanFunction {
 		auto &gstate = data_p.global_state->Cast<ParquetReadGlobalState>();
 		auto &bind_data = data_p.bind_data->CastNoConst<ParquetReadBindData>();
 
+		bool rowgroup_finished;
 		do {
 			if (gstate.CanRemoveColumns()) {
 				data.all_columns.Reset();
 				data.reader->Scan(data.scan_state, data.all_columns);
+				rowgroup_finished = data.all_columns.size() == 0;
 				bind_data.multi_file_reader->FinalizeChunk(context, bind_data.reader_bind, data.reader->reader_data,
 				                                           data.all_columns, gstate.multi_file_reader_state);
 				output.ReferenceColumns(data.all_columns, gstate.projection_ids);
 			} else {
 				data.reader->Scan(data.scan_state, output);
+				rowgroup_finished = output.size() == 0;
 				bind_data.multi_file_reader->FinalizeChunk(context, bind_data.reader_bind, data.reader->reader_data,
 				                                           output, gstate.multi_file_reader_state);
 			}
@@ -814,7 +817,7 @@
 			if (output.size() > 0) {
 				return;
 			}
-			if (!ParquetParallelStateNext(context, bind_data, data, gstate)) {
+			if (rowgroup_finished && !ParquetParallelStateNext(context, bind_data, data, gstate)) {
 				return;
 			}
 		} while (true);
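The guard added above matters because FinalizeChunk can legitimately shrink a scanned chunk to zero rows (for example, when multi-file filters remove everything) while the current row group still has unscanned data; advancing the parallel scan state on any empty output would silently skip those rows. A minimal, self-contained C++ sketch of the corrected control flow, using simplified stand-ins rather than DuckDB's actual API:

#include <algorithm>
#include <cstdio>
#include <vector>

struct RowGroup {
	std::vector<int> rows;
	size_t next = 0;
};

// Scan up to two rows at a time; an empty result means the group is exhausted.
static std::vector<int> Scan(RowGroup &rg) {
	std::vector<int> out;
	while (rg.next < rg.rows.size() && out.size() < 2) {
		out.push_back(rg.rows[rg.next++]);
	}
	return out;
}

// Stand-in for FinalizeChunk: a filter that may empty the chunk entirely.
static void Finalize(std::vector<int> &chunk) {
	chunk.erase(std::remove_if(chunk.begin(), chunk.end(), [](int v) { return v % 2 != 0; }), chunk.end());
}

int main() {
	std::vector<RowGroup> groups{{{1, 3, 4, 6}, 0}, {{8, 9}, 0}};
	size_t current = 0;
	do {
		auto chunk = Scan(groups[current]);
		const bool rowgroup_finished = chunk.empty(); // test BEFORE the filter runs
		Finalize(chunk);
		for (const int v : chunk) {
			std::printf("%d\n", v); // prints 4, 6, 8
		}
		if (!chunk.empty()) {
			continue; // hand rows to the consumer, keep scanning this group
		}
		// Only advance when the scan itself came up empty; advancing on any
		// empty (filtered) chunk would skip the 4 and 6 still in group 0.
		if (rowgroup_finished && ++current == groups.size()) {
			break;
		}
	} while (true);
	return 0;
}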
4 changes: 3 additions & 1 deletion src/duckdb/src/common/vector_operations/vector_hash.cpp
@@ -21,7 +21,9 @@ struct HashOp {
 };
 
 static inline hash_t CombineHashScalar(hash_t a, hash_t b) {
-	return (a * UINT64_C(0xbf58476d1ce4e5b9)) ^ b;
+	a ^= a >> 32;
+	a *= 0xd6e8feb86659fd93U;
+	return a ^ b;
 }
 
 template <bool HAS_RSEL, class T>
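For context on the new mixer: in the old form (a * C) ^ b, multiplication only carries bits upward, so a difference confined to a's upper half never reaches the low output bits that a power-of-two hash table masks off. XOR-folding the upper half down before multiplying (a splitmix64-style finalizer step) fixes that. A standalone demonstration with illustrative values, not DuckDB code:

#include <cstdint>
#include <cstdio>

using hash_t = uint64_t;

static hash_t CombineOld(hash_t a, hash_t b) {
	return (a * UINT64_C(0xbf58476d1ce4e5b9)) ^ b;
}

static hash_t CombineNew(hash_t a, hash_t b) {
	a ^= a >> 32;
	a *= UINT64_C(0xd6e8feb86659fd93);
	return a ^ b;
}

int main() {
	const hash_t b = 0x12345678u;
	const hash_t lo = 0, hi = UINT64_C(1) << 32; // differ only in bit 32
	// With the old combiner the low 16 bits (a typical bucket mask) collide:
	std::printf("old: %04llx vs %04llx\n",
	            (unsigned long long)(CombineOld(lo, b) & 0xFFFF),
	            (unsigned long long)(CombineOld(hi, b) & 0xFFFF));
	// ...with the new one the difference reaches the low bits:
	std::printf("new: %04llx vs %04llx\n",
	            (unsigned long long)(CombineNew(lo, b) & 0xFFFF),
	            (unsigned long long)(CombineNew(hi, b) & 0xFFFF));
	return 0;
}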
2 changes: 1 addition & 1 deletion src/duckdb/src/execution/operator/csv_scanner/sniffer/csv_sniffer.cpp
@@ -6,7 +6,7 @@ namespace duckdb {
 CSVSniffer::CSVSniffer(CSVReaderOptions &options_p, shared_ptr<CSVBufferManager> buffer_manager_p,
                        CSVStateMachineCache &state_machine_cache_p, bool default_null_to_varchar_p)
     : state_machine_cache(state_machine_cache_p), options(options_p), buffer_manager(std::move(buffer_manager_p)),
-      default_null_to_varchar(default_null_to_varchar_p) {
+      lines_sniffed(0), default_null_to_varchar(default_null_to_varchar_p) {
 	// Initialize Format Candidates
 	for (const auto &format_template : format_template_candidates) {
 		auto &logical_type = format_template.first;
19 changes: 11 additions & 8 deletions src/duckdb/src/execution/operator/csv_scanner/sniffer/dialect_detection.cpp
@@ -80,11 +80,11 @@ string DialectCandidates::Print() {
 
 DialectCandidates::DialectCandidates(const CSVStateMachineOptions &options) {
 	// assert that quotes escapes and rules have equal size
-	auto default_quote = GetDefaultQuote();
-	auto default_escape = GetDefaultEscape();
-	auto default_quote_rule = GetDefaultQuoteRule();
-	auto default_delimiter = GetDefaultDelimiter();
-	auto default_comment = GetDefaultComment();
+	const auto default_quote = GetDefaultQuote();
+	const auto default_escape = GetDefaultEscape();
+	const auto default_quote_rule = GetDefaultQuoteRule();
+	const auto default_delimiter = GetDefaultDelimiter();
+	const auto default_comment = GetDefaultComment();
 
 	D_ASSERT(default_quote.size() == default_quote_rule.size() && default_quote_rule.size() == default_escape.size());
 	// fill the escapes
@@ -187,6 +187,9 @@ void CSVSniffer::GenerateStateMachineSearchSpace(vector<unique_ptr<ColumnCountSc
 
 // Returns true if a comment is acceptable
 bool AreCommentsAcceptable(const ColumnCountResult &result, idx_t num_cols, bool comment_set_by_user) {
+	if (comment_set_by_user) {
+		return true;
+	}
 	// For a comment to be acceptable, we want 3/5th's the majority of unmatched in the columns
 	constexpr double min_majority = 0.6;
 	// detected comments, are all lines that started with a comment character.
@@ -208,7 +211,7 @@ bool AreCommentsAcceptable(const ColumnCountResult &result, idx_t num_cols, bool
 		}
 	}
 	// If we do not encounter at least one full line comment, we do not consider this comment option.
-	if (valid_comments == 0 || (!has_full_line_comment && !comment_set_by_user)) {
+	if (valid_comments == 0 || !has_full_line_comment) {
 		// this is only valid if our comment character is \0
 		if (result.state_machine.state_machine_options.comment.GetValue() == '\0') {
 			return true;
@@ -234,7 +237,7 @@ void CSVSniffer::AnalyzeDialectCandidate(unique_ptr<ColumnCountScanner> scanner,
 	idx_t num_cols = sniffed_column_counts.result_position == 0 ? 1 : sniffed_column_counts[0].number_of_columns;
 	const bool ignore_errors = options.ignore_errors.GetValue();
 	// If we are ignoring errors and not null_padding , we pick the most frequent number of columns as the right one
-	bool use_most_frequent_columns = ignore_errors && !options.null_padding;
+	const bool use_most_frequent_columns = ignore_errors && !options.null_padding;
 	if (use_most_frequent_columns) {
 		num_cols = sniffed_column_counts.GetMostFrequentColumnCount();
 	}
@@ -355,7 +358,7 @@ void CSVSniffer::AnalyzeDialectCandidate(unique_ptr<ColumnCountScanner> scanner,
 	// - There's a single column before.
 	// - There are more values and no additional padding is required.
 	// - There's more than one column and less padding is required.
-	if (columns_match_set && rows_consistent &&
+	if (columns_match_set && (rows_consistent || (set_columns.IsSet() && ignore_errors)) &&
 	    (single_column_before || ((more_values || more_columns) && !require_more_padding) ||
 	     (more_than_one_column && require_less_padding) || quoted) &&
 	    !invalid_padding && comments_are_acceptable) {
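As background on the heuristic used above: with ignore_errors set and no null_padding, the sniffer no longer requires every row to agree on a column count; it takes the modal count and treats deviating rows as ignorable errors. A minimal sketch of that idea (an assumption about GetMostFrequentColumnCount's behavior, not DuckDB's actual implementation):

#include <cstdio>
#include <map>
#include <vector>

using idx_t = unsigned long long;

static idx_t GetMostFrequentColumnCount(const std::vector<idx_t> &row_column_counts) {
	// Count how often each column count was sniffed...
	std::map<idx_t, idx_t> frequency;
	for (const auto count : row_column_counts) {
		++frequency[count];
	}
	// ...and return the most frequent one.
	idx_t best = 1, best_freq = 0;
	for (const auto &entry : frequency) {
		if (entry.second > best_freq) {
			best = entry.first;
			best_freq = entry.second;
		}
	}
	return best;
}

int main() {
	// Four well-formed rows with 3 columns, one malformed row with 5:
	const std::vector<idx_t> sniffed{3, 3, 5, 3, 3};
	std::printf("num_cols = %llu\n", GetMostFrequentColumnCount(sniffed)); // prints 3
	return 0;
}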
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
@@ -1,5 +1,5 @@
 #ifndef DUCKDB_PATCH_VERSION
-#define DUCKDB_PATCH_VERSION "4-dev3722"
+#define DUCKDB_PATCH_VERSION "4-dev3741"
 #endif
 #ifndef DUCKDB_MINOR_VERSION
 #define DUCKDB_MINOR_VERSION 1
@@ -8,10 +8,10 @@
 #define DUCKDB_MAJOR_VERSION 1
 #endif
 #ifndef DUCKDB_VERSION
-#define DUCKDB_VERSION "v1.1.4-dev3722"
+#define DUCKDB_VERSION "v1.1.4-dev3741"
 #endif
 #ifndef DUCKDB_SOURCE_ID
-#define DUCKDB_SOURCE_ID "62582045a3"
+#define DUCKDB_SOURCE_ID "ab8c909857"
 #endif
 #include "duckdb/function/table/system_functions.hpp"
 #include "duckdb/main/database.hpp"
6 changes: 6 additions & 0 deletions src/duckdb/src/function/window/window_boundaries_state.cpp
@@ -302,20 +302,26 @@ WindowBoundsSet WindowBoundariesState::GetWindowBounds(const BoundWindowExpressi
 	switch (wexpr.GetExpressionType()) {
 	case ExpressionType::WINDOW_ROW_NUMBER:
 		result.insert(PARTITION_BEGIN);
+		if (!wexpr.arg_orders.empty()) {
+			// Secondary orders need to know how wide the partition is
+			result.insert(PARTITION_END);
+		}
 		break;
 	case ExpressionType::WINDOW_RANK_DENSE:
 	case ExpressionType::WINDOW_RANK:
 		result.insert(PARTITION_BEGIN);
 		if (wexpr.arg_orders.empty()) {
 			result.insert(PEER_BEGIN);
 		} else {
+			// Secondary orders need to know how wide the partition is
 			result.insert(PARTITION_END);
 		}
 		break;
 	case ExpressionType::WINDOW_PERCENT_RANK:
 		result.insert(PARTITION_BEGIN);
 		result.insert(PARTITION_END);
 		if (wexpr.arg_orders.empty()) {
+			// Secondary orders need to know where the first peer is
 			result.insert(PEER_BEGIN);
 		}
 		break;
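The comments in this hunk compress the reasoning: without a secondary (argument) ORDER BY, rank-family functions only track running peer-group boundaries, but with one they must rank each row against the entire partition, so the executor has to know where the partition ends. An illustrative standalone sketch of that dependency, not DuckDB code:

#include <cstdio>
#include <vector>

// Rank of row r under a secondary key: counting rows that sort before it
// requires scanning the whole partition (PARTITION_BEGIN..PARTITION_END),
// not just the running peer boundaries of the main window order.
static size_t RankWithArgOrder(const std::vector<int> &secondary_key, size_t r) {
	size_t rank = 1;
	for (size_t i = 0; i < secondary_key.size(); ++i) {
		if (secondary_key[i] < secondary_key[r]) {
			++rank;
		}
	}
	return rank;
}

int main() {
	const std::vector<int> secondary_key{30, 10, 20}; // one partition
	for (size_t r = 0; r < secondary_key.size(); ++r) {
		std::printf("row %zu: rank %zu\n", r, RankWithArgOrder(secondary_key, r));
	}
	return 0;
}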
4 changes: 4 additions & 0 deletions src/duckdb/src/function/window/window_executor.cpp
@@ -41,6 +41,10 @@ WindowExecutor::WindowExecutor(BoundWindowExpression &wexpr, ClientContext &cont
 
 	boundary_start_idx = shared.RegisterEvaluate(wexpr.start_expr);
 	boundary_end_idx = shared.RegisterEvaluate(wexpr.end_expr);
+
+	for (const auto &order : wexpr.arg_orders) {
+		arg_order_idx.emplace_back(shared.RegisterSink(order.expression));
+	}
 }
 
 WindowExecutorGlobalState::WindowExecutorGlobalState(const WindowExecutor &executor, const idx_t payload_count,
31 changes: 24 additions & 7 deletions src/duckdb/src/function/window/window_merge_sort_tree.cpp
@@ -1,12 +1,13 @@
 #include "duckdb/function/window/window_merge_sort_tree.hpp"
+#include "duckdb/planner/expression/bound_constant_expression.hpp"
 
 #include <thread>
 #include <utility>
 
 namespace duckdb {
 
 WindowMergeSortTree::WindowMergeSortTree(ClientContext &context, const vector<BoundOrderByNode> &orders,
-                                         const vector<column_t> &sort_idx, const idx_t count)
+                                         const vector<column_t> &sort_idx, const idx_t count, bool unique)
     : context(context), memory_per_thread(PhysicalOperator::GetMaxThreadMemory(context)), sort_idx(sort_idx),
       build_stage(PartitionSortStage::INIT), tasks_completed(0) {
 	// Sort the unfiltered indices by the orders
@@ -26,7 +27,19 @@ WindowMergeSortTree::WindowMergeSortTree(ClientContext &context, const vector<Bo
 	payload_layout.Initialize(payload_types);
 
 	auto &buffer_manager = BufferManager::GetBufferManager(context);
-	global_sort = make_uniq<GlobalSortState>(buffer_manager, orders, payload_layout);
+	if (unique) {
+		vector<BoundOrderByNode> unique_orders;
+		for (const auto &order : orders) {
+			unique_orders.emplace_back(order.Copy());
+		}
+		auto unique_expr = make_uniq<BoundConstantExpression>(Value(index_type));
+		const auto order_type = OrderType::ASCENDING;
+		const auto order_by_type = OrderByNullType::NULLS_LAST;
+		unique_orders.emplace_back(BoundOrderByNode(order_type, order_by_type, std::move(unique_expr)));
+		global_sort = make_uniq<GlobalSortState>(buffer_manager, unique_orders, payload_layout);
+	} else {
+		global_sort = make_uniq<GlobalSortState>(buffer_manager, orders, payload_layout);
+	}
 	global_sort->external = ClientConfig::GetConfig(context).force_external;
 }
 
@@ -48,18 +61,22 @@ WindowMergeSortTreeLocalState::WindowMergeSortTreeLocalState(WindowMergeSortTree
 void WindowMergeSortTreeLocalState::SinkChunk(DataChunk &chunk, const idx_t row_idx,
                                               optional_ptr<SelectionVector> filter_sel, idx_t filtered) {
+	// Sequence the payload column
+	auto &indices = payload_chunk.data[0];
+	payload_chunk.SetCardinality(chunk);
+	indices.Sequence(int64_t(row_idx), 1, payload_chunk.size());
+
 	// Reference the sort columns
 	auto &sort_idx = window_tree.sort_idx;
 	for (column_t c = 0; c < sort_idx.size(); ++c) {
 		sort_chunk.data[c].Reference(chunk.data[sort_idx[c]]);
 	}
+	// Add the row numbers if we are uniquifying
+	if (sort_idx.size() < sort_chunk.ColumnCount()) {
+		sort_chunk.data[sort_idx.size()].Reference(indices);
+	}
 	sort_chunk.SetCardinality(chunk);
 
-	// Sequence the payload column
-	auto &indices = payload_chunk.data[0];
-	payload_chunk.SetCardinality(sort_chunk);
-	indices.Sequence(int64_t(row_idx), 1, payload_chunk.size());
-
 	// Apply FILTER clause, if any
 	if (filter_sel) {
 		sort_chunk.Slice(*filter_sel, filtered);
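A standalone illustration of the uniquification being wired in here: appending the row index as a final ascending sort key makes every composite key unique, so ties in the caller's keys break deterministically by row order. A minimal plain-C++ sketch, not DuckDB's sorting API:

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <utility>
#include <vector>

int main() {
	const std::vector<int> keys{42, 7, 42, 7};
	// Pair each key with its row index, mirroring the appended index column.
	std::vector<std::pair<int, uint64_t>> rows;
	for (uint64_t i = 0; i < keys.size(); ++i) {
		rows.emplace_back(keys[i], i);
	}
	// Sort by (key ASC, row index ASC): the index acts as the tiebreaker,
	// so equal keys keep their original relative order and no two composite
	// keys compare equal.
	std::sort(rows.begin(), rows.end());
	for (const auto &row : rows) {
		std::printf("key=%d row=%llu\n", row.first, (unsigned long long)row.second);
	}
	return 0;
}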
4 changes: 0 additions & 4 deletions src/duckdb/src/function/window/window_rank_function.cpp
@@ -93,10 +93,6 @@ void WindowPeerLocalState::NextRank(idx_t partition_begin, idx_t peer_begin, idx
 WindowPeerExecutor::WindowPeerExecutor(BoundWindowExpression &wexpr, ClientContext &context,
                                        WindowSharedExpressions &shared)
     : WindowExecutor(wexpr, context, shared) {
-
-	for (const auto &order : wexpr.arg_orders) {
-		arg_order_idx.emplace_back(shared.RegisterSink(order.expression));
-	}
 }
 
 unique_ptr<WindowExecutorGlobalState> WindowPeerExecutor::GetGlobalState(const idx_t payload_count,