Update vendored DuckDB sources to 25c7873
duckdblabs-bot committed Dec 19, 2024
1 parent 25c7873 commit a9bf1a6
Showing 46 changed files with 15,914 additions and 15,288 deletions.
15 changes: 13 additions & 2 deletions src/duckdb/extension/json/json_scan.cpp
@@ -390,8 +390,19 @@ void JSONScanLocalState::ParseJSON(char *const json_start, const idx_t json_size
doc = JSONCommon::ReadDocumentUnsafe(json_start, remaining, JSONCommon::READ_INSITU_FLAG, allocator.GetYYAlc(),
&err);
}
if (!bind_data.ignore_errors && err.code != YYJSON_READ_SUCCESS) {
current_reader->ThrowParseError(current_buffer_handle->buffer_index, lines_or_objects_in_buffer, err);
if (err.code != YYJSON_READ_SUCCESS) {
auto can_ignore_this_error = bind_data.ignore_errors;
string extra;
if (current_reader->GetFormat() != JSONFormat::NEWLINE_DELIMITED) {
can_ignore_this_error = false;
extra = bind_data.ignore_errors
? "Parse errors cannot be ignored for JSON formats other than 'newline_delimited'"
: "";
}
if (!can_ignore_this_error) {
current_reader->ThrowParseError(current_buffer_handle->buffer_index, lines_or_objects_in_buffer, err,
extra);
}
}

// We parse with YYJSON_STOP_WHEN_DONE, so we need to check this by hand
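
A note on the new gating, as a minimal stand-alone sketch (hypothetical names, not DuckDB's actual API): parse errors stay ignorable only for newline-delimited JSON, where every line is an independent document; for any other format the error is always thrown, with a hint appended when ignore_errors was set.

	#include <stdexcept>
	#include <string>

	enum class JSONFormat { NEWLINE_DELIMITED, ARRAY, UNSTRUCTURED };

	// Sketch only: mirrors the control flow above, not DuckDB's internals.
	void HandleParseError(bool ignore_errors, JSONFormat format, const std::string &error_message) {
		bool can_ignore_this_error = ignore_errors;
		std::string extra;
		if (format != JSONFormat::NEWLINE_DELIMITED) {
			can_ignore_this_error = false;
			if (ignore_errors) {
				extra = " Parse errors cannot be ignored for JSON formats other than 'newline_delimited'";
			}
		}
		if (!can_ignore_this_error) {
			throw std::runtime_error(error_message + extra);
		}
		// otherwise: skip this document and keep scanning the buffer
	}
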
28 changes: 13 additions & 15 deletions src/duckdb/src/common/radix_partitioning.cpp
@@ -60,27 +60,25 @@ RETURN_TYPE RadixBitsSwitch(const idx_t radix_bits, ARGS &&... args) {
} // LCOV_EXCL_STOP
}

template <idx_t radix_bits>
struct RadixLessThan {
static inline bool Operation(hash_t hash, hash_t cutoff) {
using CONSTANTS = RadixPartitioningConstants<radix_bits>;
return CONSTANTS::ApplyMask(hash) < cutoff;
}
};

struct SelectFunctor {
template <idx_t radix_bits>
static idx_t Operation(Vector &hashes, const SelectionVector *sel, const idx_t count, const idx_t cutoff,
SelectionVector *true_sel, SelectionVector *false_sel) {
Vector cutoff_vector(Value::HASH(cutoff));
return BinaryExecutor::Select<hash_t, hash_t, RadixLessThan<radix_bits>>(hashes, cutoff_vector, sel, count,
true_sel, false_sel);
static idx_t Operation(Vector &hashes, const SelectionVector *sel, const idx_t count,
const ValidityMask &partition_mask, SelectionVector *true_sel, SelectionVector *false_sel) {
using CONSTANTS = RadixPartitioningConstants<radix_bits>;
return UnaryExecutor::Select<hash_t>(
hashes, sel, count,
[&](const hash_t hash) {
const auto partition_idx = CONSTANTS::ApplyMask(hash);
return partition_mask.RowIsValidUnsafe(partition_idx);
},
true_sel, false_sel);
}
};

idx_t RadixPartitioning::Select(Vector &hashes, const SelectionVector *sel, const idx_t count, const idx_t radix_bits,
const idx_t cutoff, SelectionVector *true_sel, SelectionVector *false_sel) {
return RadixBitsSwitch<SelectFunctor, idx_t>(radix_bits, hashes, sel, count, cutoff, true_sel, false_sel);
const ValidityMask &partition_mask, SelectionVector *true_sel,
SelectionVector *false_sel) {
return RadixBitsSwitch<SelectFunctor, idx_t>(radix_bits, hashes, sel, count, partition_mask, true_sel, false_sel);
}

struct ComputePartitionIndicesFunctor {
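
The contract of Select changes here from "keep rows whose partition index is below a cutoff" to "keep rows whose partition bit is set in a validity mask", so callers can probe an arbitrary subset of partitions instead of a contiguous prefix. A self-contained sketch of the idea; a low-bit mask stands in for RadixPartitioningConstants::ApplyMask, and plain vectors for the Vector/SelectionVector machinery the real code dispatches through:

	#include <cstddef>
	#include <cstdint>
	#include <vector>

	using hash_t = uint64_t;

	// Split rows into true_sel (active partition) and false_sel (inactive),
	// returning how many rows landed in true_sel.
	size_t SelectByPartitionMask(const std::vector<hash_t> &hashes, uint64_t radix_bits,
	                             const std::vector<bool> &partition_mask,
	                             std::vector<size_t> &true_sel, std::vector<size_t> &false_sel) {
		const hash_t mask = (hash_t(1) << radix_bits) - 1;
		for (size_t i = 0; i < hashes.size(); i++) {
			const size_t partition_idx = hashes[i] & mask; // stand-in for ApplyMask
			if (partition_mask[partition_idx]) {
				true_sel.push_back(i);
			} else {
				false_sel.push_back(i);
			}
		}
		return true_sel.size();
	}
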
2 changes: 2 additions & 0 deletions src/duckdb/src/common/row_operations/row_radix_scatter.cpp
@@ -251,11 +251,13 @@ void RadixScatterStructVector(Vector &v, UnifiedVectorFormat &vdata, idx_t vcoun
for (idx_t i = 0; i < add_count; i++) {
auto idx = sel.get_index(i);
auto source_idx = vdata.sel->get_index(idx) + offset;

// write validity and according value
if (validity.RowIsValid(source_idx)) {
key_locations[i][0] = valid;
} else {
key_locations[i][0] = invalid;
memset(key_locations[i] + 1, '\0', width - 1);
}
key_locations[i]++;
}
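
A hedged reading of the added memset: these bytes form fixed-width radix keys, and for a NULL struct only the leading validity byte is written on this path, so zero-filling the remaining width - 1 bytes keeps the key deterministic instead of leaving stale buffer contents behind the NULL marker. Reduced to its core (hypothetical helper, fixed-width key assumed):

	#include <cstdint>
	#include <cstring>

	// Write the leading byte of one fixed-width sort key for a (possibly NULL) struct row.
	void WriteStructKeyPrefix(uint8_t *key_location, size_t width, bool row_is_valid,
	                          uint8_t valid, uint8_t invalid) {
		if (row_is_valid) {
			key_location[0] = valid; // remaining bytes are filled by the struct's children
		} else {
			key_location[0] = invalid;
			memset(key_location + 1, '\0', width - 1); // deterministic NULL key
		}
	}
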
103 changes: 76 additions & 27 deletions src/duckdb/src/execution/join_hashtable.cpp
@@ -34,7 +34,7 @@ JoinHashTable::JoinHashTable(ClientContext &context, const vector<JoinCondition>
: buffer_manager(BufferManager::GetBufferManager(context)), conditions(conditions_p),
build_types(std::move(btypes)), output_columns(output_columns_p), entry_size(0), tuple_size(0),
vfound(Value::BOOLEAN(false)), join_type(type_p), finalized(false), has_null(false),
radix_bits(INITIAL_RADIX_BITS), partition_start(0), partition_end(0) {
radix_bits(INITIAL_RADIX_BITS) {
for (idx_t i = 0; i < conditions.size(); ++i) {
auto &condition = conditions[i];
D_ASSERT(condition.left->return_type == condition.right->return_type);
@@ -108,6 +108,8 @@ JoinHashTable::JoinHashTable(ClientContext &context, const vector<JoinCondition>
auto &config = ClientConfig::GetConfig(context);
single_join_error_on_multiple_rows = config.scalar_subquery_error_on_multiple_rows;
}

InitializePartitionMasks();
}

JoinHashTable::~JoinHashTable() {
@@ -1430,7 +1432,10 @@ idx_t JoinHashTable::GetRemainingSize() const {

idx_t count = 0;
idx_t data_size = 0;
for (idx_t partition_idx = partition_end; partition_idx < num_partitions; partition_idx++) {
for (idx_t partition_idx = 0; partition_idx < num_partitions; partition_idx++) {
if (completed_partitions.RowIsValidUnsafe(partition_idx)) {
continue;
}
count += partitions[partition_idx]->Count();
data_size += partitions[partition_idx]->SizeInBytes();
}
@@ -1464,6 +1469,32 @@ void JoinHashTable::SetRepartitionRadixBits(const idx_t max_ht_size, const idx_t
radix_bits += added_bits;
sink_collection =
make_uniq<RadixPartitionedTupleData>(buffer_manager, layout, radix_bits, layout.ColumnCount() - 1);

// Need to initialize again after changing the number of bits
InitializePartitionMasks();
}

void JoinHashTable::InitializePartitionMasks() {
const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);

current_partitions.Initialize(num_partitions);
current_partitions.SetAllInvalid(num_partitions);

completed_partitions.Initialize(num_partitions);
completed_partitions.SetAllInvalid(num_partitions);
}

idx_t JoinHashTable::CurrentPartitionCount() const {
const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
D_ASSERT(current_partitions.Capacity() == num_partitions);
return current_partitions.CountValid(num_partitions);
}

idx_t JoinHashTable::FinishedPartitionCount() const {
const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
D_ASSERT(completed_partitions.Capacity() == num_partitions);
// We already marked the active partitions as done, so we have to subtract them here
return completed_partitions.CountValid(num_partitions) - CurrentPartitionCount();
}
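
These new members replace the old contiguous [partition_start, partition_end) window with two validity masks, which is what permits the non-contiguous, size-sorted partition selection in PrepareExternalFinalize below. Note the accounting subtlety in FinishedPartitionCount: partitions are marked completed as soon as they become active, so the strictly finished count is completed minus current. A toy model of the same bookkeeping (std::vector<bool> standing in for ValidityMask):

	#include <cstddef>
	#include <vector>

	struct PartitionBookkeeping {
		std::vector<bool> current;   // being probed in this round
		std::vector<bool> completed; // moved into a hash table at some point

		explicit PartitionBookkeeping(size_t num_partitions)
		    : current(num_partitions, false), completed(num_partitions, false) {
		}
		size_t CurrentCount() const {
			size_t n = 0;
			for (bool b : current) {
				n += b;
			}
			return n;
		}
		size_t FinishedCount() const {
			size_t n = 0;
			for (bool b : completed) {
				n += b;
			}
			return n - CurrentCount(); // active partitions were pre-marked completed
		}
	};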

void JoinHashTable::Repartition(JoinHashTable &global_ht) {
@@ -1477,6 +1508,7 @@ void JoinHashTable::Repartition(JoinHashTable &global_ht) {
void JoinHashTable::Reset() {
data_collection->Reset();
hash_map.Reset();
current_partitions.SetAllInvalid(RadixPartitioning::NumberOfPartitions(radix_bits));
finalized = false;
}

@@ -1486,33 +1518,46 @@ bool JoinHashTable::PrepareExternalFinalize(const idx_t max_ht_size) {
}

const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
if (partition_end == num_partitions) {
return false;
D_ASSERT(current_partitions.Capacity() == num_partitions);
D_ASSERT(completed_partitions.Capacity() == num_partitions);
D_ASSERT(current_partitions.CheckAllInvalid(num_partitions));

if (completed_partitions.CheckAllValid(num_partitions)) {
return false; // All partitions are done
}

// Start where we left off
// Create vector with unfinished partition indices
auto &partitions = sink_collection->GetPartitions();
partition_start = partition_end;
vector<idx_t> partition_indices;
partition_indices.reserve(num_partitions);
for (idx_t partition_idx = 0; partition_idx < num_partitions; partition_idx++) {
if (!completed_partitions.RowIsValidUnsafe(partition_idx)) {
partition_indices.push_back(partition_idx);
}
}
// Sort partitions by size, from small to large
std::sort(partition_indices.begin(), partition_indices.end(), [&](const idx_t &lhs, const idx_t &rhs) {
const auto lhs_size = partitions[lhs]->SizeInBytes() + PointerTableSize(partitions[lhs]->Count());
const auto rhs_size = partitions[rhs]->SizeInBytes() + PointerTableSize(partitions[rhs]->Count());
return lhs_size < rhs_size;
});

// Determine how many partitions we can do next (at least one)
// Determine which partitions should go next
idx_t count = 0;
idx_t data_size = 0;
idx_t partition_idx;
for (partition_idx = partition_start; partition_idx < num_partitions; partition_idx++) {
auto incl_count = count + partitions[partition_idx]->Count();
auto incl_data_size = data_size + partitions[partition_idx]->SizeInBytes();
auto incl_ht_size = incl_data_size + PointerTableSize(incl_count);
for (const auto &partition_idx : partition_indices) {
D_ASSERT(!completed_partitions.RowIsValidUnsafe(partition_idx));
const auto incl_count = count + partitions[partition_idx]->Count();
const auto incl_data_size = data_size + partitions[partition_idx]->SizeInBytes();
const auto incl_ht_size = incl_data_size + PointerTableSize(incl_count);
if (count > 0 && incl_ht_size > max_ht_size) {
break;
break; // Always add at least one partition
}
count = incl_count;
data_size = incl_data_size;
}
partition_end = partition_idx;

// Move the partitions to the main data collection
for (partition_idx = partition_start; partition_idx < partition_end; partition_idx++) {
data_collection->Combine(*partitions[partition_idx]);
current_partitions.SetValidUnsafe(partition_idx); // Mark as currently active
data_collection->Combine(*partitions[partition_idx]); // Move partition to the main data collection
completed_partitions.SetValidUnsafe(partition_idx); // Also already mark as done
}
D_ASSERT(Count() == count);
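
The selection policy itself also changes: rather than resuming from partition_end, the unfinished partitions are sorted by estimated size and admitted smallest-first until the budget would be exceeded, always admitting at least one so the build progresses even when a single partition exceeds max_ht_size. The policy in isolation (sizes are additively approximated here; the real code folds in the pointer-table size of the combined count):

	#include <algorithm>
	#include <cstddef>
	#include <cstdint>
	#include <vector>

	// Greedy smallest-first packing under a byte budget; sizes[i] is the
	// estimated hash-table footprint (data plus pointer table) of partition i.
	std::vector<size_t> PickPartitions(const std::vector<uint64_t> &sizes, uint64_t max_ht_size) {
		std::vector<size_t> order(sizes.size());
		for (size_t i = 0; i < order.size(); i++) {
			order[i] = i;
		}
		std::sort(order.begin(), order.end(),
		          [&](size_t lhs, size_t rhs) { return sizes[lhs] < sizes[rhs]; });

		std::vector<size_t> picked;
		uint64_t total = 0;
		for (const auto idx : order) {
			if (!picked.empty() && total + sizes[idx] > max_ht_size) {
				break; // over budget, but at least one partition was admitted
			}
			picked.push_back(idx);
			total += sizes[idx];
		}
		return picked;
	}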

@@ -1531,7 +1576,7 @@ void JoinHashTable::ProbeAndSpill(ScanStructure &scan_structure, DataChunk &prob
SelectionVector false_sel(STANDARD_VECTOR_SIZE);
const auto true_count =
RadixPartitioning::Select(hashes, FlatVector::IncrementalSelectionVector(), probe_keys.size(), radix_bits,
partition_end, &true_sel, &false_sel);
current_partitions, &true_sel, &false_sel);
const auto false_count = probe_keys.size() - true_count;

// can't probe these values right now, append to spill
@@ -1596,21 +1641,25 @@ void ProbeSpill::Finalize() {
}

void ProbeSpill::PrepareNextProbe() {
global_spill_collection.reset();
auto &partitions = global_partitions->GetPartitions();
if (partitions.empty() || ht.partition_start == partitions.size()) {
if (partitions.empty() || ht.current_partitions.CheckAllInvalid(partitions.size())) {
// Can't probe, just make an empty one
global_spill_collection =
make_uniq<ColumnDataCollection>(BufferManager::GetBufferManager(context), probe_types);
} else {
// Move specific partitions to the global spill collection
global_spill_collection = std::move(partitions[ht.partition_start]);
for (idx_t i = ht.partition_start + 1; i < ht.partition_end; i++) {
auto &partition = partitions[i];
if (global_spill_collection->Count() == 0) {
// Move current partitions to the global spill collection
for (idx_t partition_idx = 0; partition_idx < partitions.size(); partition_idx++) {
if (!ht.current_partitions.RowIsValidUnsafe(partition_idx)) {
continue;
}
auto &partition = partitions[partition_idx];
if (!global_spill_collection) {
global_spill_collection = std::move(partition);
} else {
} else if (partition->Count() != 0) {
global_spill_collection->Combine(*partition);
}
partition.reset();
}
}
consumer = make_uniq<ColumnDataConsumer>(*global_spill_collection, column_ids);
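
PrepareNextProbe mirrors the mask change: it resets the previous spill collection up front, moves the first active partition wholesale, combines the remaining non-empty actives into it, and releases each source partition as it is consumed. Stripped of DuckDB's types, roughly:

	#include <cstddef>
	#include <memory>
	#include <vector>

	struct Collection {
		size_t count = 0;
		void Combine(Collection &other) {
			count += other.count;
		}
	};

	// Merge the partitions flagged active into one collection (sketch).
	std::unique_ptr<Collection> MergeActivePartitions(std::vector<std::unique_ptr<Collection>> &partitions,
	                                                  const std::vector<bool> &active) {
		std::unique_ptr<Collection> result;
		for (size_t i = 0; i < partitions.size(); i++) {
			if (!active[i]) {
				continue;
			}
			if (!result) {
				result = std::move(partitions[i]); // first active: take wholesale
			} else if (partitions[i]->count != 0) {
				result->Combine(*partitions[i]); // later actives: append if non-empty
			}
			partitions[i].reset(); // drop the source either way
		}
		return result;
	}
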
@@ -1330,17 +1330,21 @@ void StringValueScanner::ProcessOverBufferValue() {
value = string_t(over_buffer_string.c_str() + result.quoted_position,
UnsafeNumericCast<uint32_t>(over_buffer_string.size() - 1 - result.quoted_position));
if (result.escaped) {
const auto str_ptr = over_buffer_string.c_str() + result.quoted_position;
value = RemoveEscape(str_ptr, over_buffer_string.size() - 2,
state_machine->dialect_options.state_machine_options.escape.GetValue(),
result.parse_chunk.data[result.chunk_col_id]);
if (!result.HandleTooManyColumnsError(over_buffer_string.c_str(), over_buffer_string.size())) {
const auto str_ptr = over_buffer_string.c_str() + result.quoted_position;
value = RemoveEscape(str_ptr, over_buffer_string.size() - 2,
state_machine->dialect_options.state_machine_options.escape.GetValue(),
result.parse_chunk.data[result.chunk_col_id]);
}
}
} else {
value = string_t(over_buffer_string.c_str(), UnsafeNumericCast<uint32_t>(over_buffer_string.size()));
if (result.escaped) {
value = RemoveEscape(over_buffer_string.c_str(), over_buffer_string.size(),
state_machine->dialect_options.state_machine_options.escape.GetValue(),
result.parse_chunk.data[result.chunk_col_id]);
if (!result.HandleTooManyColumnsError(over_buffer_string.c_str(), over_buffer_string.size())) {
value = RemoveEscape(over_buffer_string.c_str(), over_buffer_string.size(),
state_machine->dialect_options.state_machine_options.escape.GetValue(),
result.parse_chunk.data[result.chunk_col_id]);
}
}
}
if (states.EmptyLine() && state_machine->dialect_options.num_cols == 1) {
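
Both branches gain the same guard: HandleTooManyColumnsError gets first claim on the value, and RemoveEscape is skipped when it fires, presumably because result.parse_chunk.data[result.chunk_col_id] would otherwise address a column the chunk does not have. A reduced model of the guard:

	#include <cstddef>
	#include <string>
	#include <vector>

	// Reduced model: only materialize (and unescape) a value while the target
	// column still exists; an overflowing field goes to the error path instead.
	bool StoreOverBufferValue(std::vector<std::string> &row_columns, size_t chunk_col_id, std::string value) {
		if (chunk_col_id >= row_columns.size()) {
			return false; // too many columns: record the error, skip RemoveEscape
		}
		row_columns[chunk_col_id] = std::move(value); // RemoveEscape would run just before this
		return true;
	}
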
@@ -174,6 +174,7 @@ CSVSniffer::DetectHeaderInternal(ClientContext &context, vector<HeaderValue> &be
CSVReaderOptions &options, CSVErrorHandler &error_handler) {
vector<string> detected_names;
auto &dialect_options = state_machine.dialect_options;
dialect_options.num_cols = best_sql_types_candidates_per_column_idx.size();
if (best_header_row.empty()) {
dialect_options.header = false;
for (idx_t col = 0; col < dialect_options.num_cols; col++) {
@@ -192,6 +193,19 @@ CSVSniffer::DetectHeaderInternal(ClientContext &context, vector<HeaderValue> &be
// If null-padding is not allowed and there is a mismatch between our header candidate and the number of columns
// We can't detect the dialect/type options properly
if (!options.null_padding && best_sql_types_candidates_per_column_idx.size() != best_header_row.size()) {
if (options.ignore_errors.GetValue()) {
dialect_options.header = false;
for (idx_t col = 0; col < dialect_options.num_cols; col++) {
detected_names.push_back(GenerateColumnName(dialect_options.num_cols, col));
}
dialect_options.rows_until_header += 1;
if (!options.columns_set) {
for (idx_t i = 0; i < MinValue<idx_t>(detected_names.size(), options.name_list.size()); i++) {
detected_names[i] = options.name_list[i];
}
}
return detected_names;
}
auto error =
CSVError::HeaderSniffingError(options, best_header_row, best_sql_types_candidates_per_column_idx.size(),
state_machine.dialect_options.state_machine_options.delimiter.GetValue());
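
In effect, with ignore_errors set, a header row whose width disagrees with the detected column count no longer aborts sniffing: the file is treated as headerless, names are generated, and a user-supplied name list still overrides them. A sketch of that fallback; the GenerateColumnName stand-in is simplified (the real one also takes the total column count):

	#include <algorithm>
	#include <cstddef>
	#include <string>
	#include <vector>

	// Simplified stand-in for DuckDB's generated names ("column0", "column1", ...).
	static std::string GenerateColumnName(size_t col) {
		return "column" + std::to_string(col);
	}

	std::vector<std::string> HeaderFallback(size_t num_cols, const std::vector<std::string> &user_names) {
		std::vector<std::string> detected_names;
		detected_names.reserve(num_cols);
		for (size_t col = 0; col < num_cols; col++) {
			detected_names.push_back(GenerateColumnName(col));
		}
		// names given via the reader options still win where provided
		for (size_t i = 0; i < std::min(detected_names.size(), user_names.size()); i++) {
			detected_names[i] = user_names[i];
		}
		return detected_names;
	}
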
@@ -89,7 +89,9 @@ void CSVStateMachineCache::Insert(const CSVStateMachineOptions &state_machine_op
}
}
// 2) Field Separator State
transition_array[quote][static_cast<uint8_t>(CSVState::DELIMITER)] = CSVState::QUOTED;
if (quote != '\0') {
transition_array[quote][static_cast<uint8_t>(CSVState::DELIMITER)] = CSVState::QUOTED;
}
if (delimiter_first_byte != ' ') {
transition_array[' '][static_cast<uint8_t>(CSVState::DELIMITER)] = CSVState::EMPTY_SPACE;
}
@@ -164,7 +166,9 @@ void CSVStateMachineCache::Insert(const CSVStateMachineOptions &state_machine_op
transition_array[static_cast<uint8_t>('\r')][static_cast<uint8_t>(CSVState::RECORD_SEPARATOR)] =
CSVState::RECORD_SEPARATOR;
}
transition_array[quote][static_cast<uint8_t>(CSVState::RECORD_SEPARATOR)] = CSVState::QUOTED;
if (quote != '\0') {
transition_array[quote][static_cast<uint8_t>(CSVState::RECORD_SEPARATOR)] = CSVState::QUOTED;
}
if (delimiter_first_byte != ' ') {
transition_array[' '][static_cast<uint8_t>(CSVState::RECORD_SEPARATOR)] = CSVState::EMPTY_SPACE;
}
@@ -180,7 +184,9 @@ void CSVStateMachineCache::Insert(const CSVStateMachineOptions &state_machine_op
CSVState::RECORD_SEPARATOR;
transition_array[static_cast<uint8_t>('\r')][static_cast<uint8_t>(CSVState::CARRIAGE_RETURN)] =
CSVState::CARRIAGE_RETURN;
transition_array[quote][static_cast<uint8_t>(CSVState::CARRIAGE_RETURN)] = CSVState::QUOTED;
if (quote != '\0') {
transition_array[quote][static_cast<uint8_t>(CSVState::CARRIAGE_RETURN)] = CSVState::QUOTED;
}
if (delimiter_first_byte != ' ') {
transition_array[' '][static_cast<uint8_t>(CSVState::CARRIAGE_RETURN)] = CSVState::EMPTY_SPACE;
}
@@ -240,7 +246,9 @@ void CSVStateMachineCache::Insert(const CSVStateMachineOptions &state_machine_op
transition_array[static_cast<uint8_t>('\r')][static_cast<uint8_t>(CSVState::NOT_SET)] =
CSVState::RECORD_SEPARATOR;
}
transition_array[quote][static_cast<uint8_t>(CSVState::NOT_SET)] = CSVState::QUOTED;
if (quote != '\0') {
transition_array[quote][static_cast<uint8_t>(CSVState::NOT_SET)] = CSVState::QUOTED;
}
if (delimiter_first_byte != ' ') {
transition_array[' '][static_cast<uint8_t>(CSVState::NOT_SET)] = CSVState::EMPTY_SPACE;
}
@@ -274,7 +282,9 @@ void CSVStateMachineCache::Insert(const CSVStateMachineOptions &state_machine_op
transition_array[static_cast<uint8_t>('\r')][static_cast<uint8_t>(CSVState::EMPTY_SPACE)] =
CSVState::RECORD_SEPARATOR;
}
transition_array[quote][static_cast<uint8_t>(CSVState::EMPTY_SPACE)] = CSVState::QUOTED;
if (quote != '\0') {
transition_array[quote][static_cast<uint8_t>(CSVState::EMPTY_SPACE)] = CSVState::QUOTED;
}
if (comment != '\0') {
transition_array[comment][static_cast<uint8_t>(CSVState::EMPTY_SPACE)] = CSVState::COMMENT;
}
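
Every hunk in this file applies the same guard: when quoting is disabled the quote character is '\0', and an unguarded transition_array[quote][state] = QUOTED write would send NUL input bytes into the quoted state. A reduced illustration:

	#include <cstdint>

	enum class CSVState : uint8_t { STANDARD = 0, DELIMITER, QUOTED, NUM_STATES };

	// transition_array[input byte][current state] -> next state (reduced sketch)
	static CSVState transition_array[256][static_cast<uint8_t>(CSVState::NUM_STATES)];

	void InstallQuoteTransition(char quote) {
		// With quoting disabled, quote == '\0'; without this guard a NUL input
		// byte would incorrectly transition these states into CSVState::QUOTED.
		if (quote != '\0') {
			transition_array[static_cast<uint8_t>(quote)][static_cast<uint8_t>(CSVState::DELIMITER)] =
			    CSVState::QUOTED;
		}
	}
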
@@ -329,12 +329,13 @@ CSVError CSVError::IncorrectColumnAmountError(const CSVReaderOptions &options, i
}
// How many columns were expected and how many were found
error << "Expected Number of Columns: " << options.dialect_options.num_cols << " Found: " << actual_columns + 1;
idx_t byte_pos = byte_position.GetIndex() == 0 ? 0 : byte_position.GetIndex() - 1;
if (actual_columns >= options.dialect_options.num_cols) {
return CSVError(error.str(), TOO_MANY_COLUMNS, actual_columns, csv_row, error_info, row_byte_position,
byte_position.GetIndex() - 1, options, how_to_fix_it.str(), current_path);
return CSVError(error.str(), TOO_MANY_COLUMNS, actual_columns, csv_row, error_info, row_byte_position, byte_pos,
options, how_to_fix_it.str(), current_path);
} else {
return CSVError(error.str(), TOO_FEW_COLUMNS, actual_columns, csv_row, error_info, row_byte_position,
byte_position.GetIndex() - 1, options, how_to_fix_it.str(), current_path);
return CSVError(error.str(), TOO_FEW_COLUMNS, actual_columns, csv_row, error_info, row_byte_position, byte_pos,
options, how_to_fix_it.str(), current_path);
}
}

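The extracted byte_pos replaces two unconditional GetIndex() - 1 computations; byte positions are unsigned, so a position of 0 previously wrapped to a huge value in the reported error. The guard in isolation:

	#include <cstdint>

	using idx_t = uint64_t;

	// Previous code computed byte_position - 1 unconditionally; at 0 this wraps
	// to 2^64 - 1 because idx_t is unsigned.
	idx_t ErrorBytePosition(idx_t byte_position) {
		return byte_position == 0 ? 0 : byte_position - 1;
	}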