diff --git a/src/duckdb/extension/core_functions/aggregate/holistic/mode.cpp b/src/duckdb/extension/core_functions/aggregate/holistic/mode.cpp index bdfa0fc8..8c35fc8c 100644 --- a/src/duckdb/extension/core_functions/aggregate/holistic/mode.cpp +++ b/src/duckdb/extension/core_functions/aggregate/holistic/mode.cpp @@ -78,7 +78,7 @@ struct ModeState { //! The collection being read const ColumnDataCollection *inputs; //! The state used for reading the collection on this thread - ColumnDataScanState scan; + ColumnDataScanState *scan = nullptr; //! The data chunk paged into into DataChunk page; //! The data pointer @@ -93,31 +93,37 @@ struct ModeState { if (mode) { delete mode; } + if (scan) { + delete scan; + } } void InitializePage(const WindowPartitionInput &partition) { + if (!scan) { + scan = new ColumnDataScanState(); + } if (page.ColumnCount() == 0) { D_ASSERT(partition.inputs); inputs = partition.inputs; D_ASSERT(partition.column_ids.size() == 1); - inputs->InitializeScan(scan, partition.column_ids); - inputs->InitializeScanChunk(scan, page); + inputs->InitializeScan(*scan, partition.column_ids); + inputs->InitializeScanChunk(*scan, page); } } inline sel_t RowOffset(idx_t row_idx) const { D_ASSERT(RowIsVisible(row_idx)); - return UnsafeNumericCast(row_idx - scan.current_row_index); + return UnsafeNumericCast(row_idx - scan->current_row_index); } inline bool RowIsVisible(idx_t row_idx) const { - return (row_idx < scan.next_row_index && scan.current_row_index <= row_idx); + return (row_idx < scan->next_row_index && scan->current_row_index <= row_idx); } inline idx_t Seek(idx_t row_idx) { if (!RowIsVisible(row_idx)) { D_ASSERT(inputs); - inputs->Seek(row_idx, scan, page); + inputs->Seek(row_idx, *scan, page); data = FlatVector::GetData(page.data[0]); validity = &FlatVector::Validity(page.data[0]); } diff --git a/src/duckdb/extension/core_functions/aggregate/nested/binned_histogram.cpp b/src/duckdb/extension/core_functions/aggregate/nested/binned_histogram.cpp index eda2a653..fc184b8b 100644 --- a/src/duckdb/extension/core_functions/aggregate/nested/binned_histogram.cpp +++ b/src/duckdb/extension/core_functions/aggregate/nested/binned_histogram.cpp @@ -342,6 +342,9 @@ static AggregateFunction GetHistogramBinFunction(const LogicalType &type) { template AggregateFunction GetHistogramBinFunction(const LogicalType &type) { + if (type.id() == LogicalTypeId::DECIMAL) { + return GetHistogramBinFunction(LogicalType::DOUBLE); + } switch (type.InternalType()) { #ifndef DUCKDB_SMALLER_BINARY case PhysicalType::BOOL: diff --git a/src/duckdb/extension/parquet/column_writer.cpp b/src/duckdb/extension/parquet/column_writer.cpp index add370c1..ac854497 100644 --- a/src/duckdb/extension/parquet/column_writer.cpp +++ b/src/duckdb/extension/parquet/column_writer.cpp @@ -2119,8 +2119,9 @@ void ArrayColumnWriter::Write(ColumnWriterState &state_p, Vector &vector, idx_t struct double_na_equal { double_na_equal() : val(0) { } - double_na_equal(const double val_p) : val(val_p) { + explicit double_na_equal(const double val_p) : val(val_p) { } + // NOLINTNEXTLINE: allow implicit conversion to double operator double() const { return val; } @@ -2137,8 +2138,9 @@ struct double_na_equal { struct float_na_equal { float_na_equal() : val(0) { } - float_na_equal(const float val_p) : val(val_p) { + explicit float_na_equal(const float val_p) : val(val_p) { } + // NOLINTNEXTLINE: allow implicit conversion to float operator float() const { return val; } diff --git a/src/duckdb/extension/parquet/parquet_metadata.cpp b/src/duckdb/extension/parquet/parquet_metadata.cpp index 1c3b9b2f..456c9b3e 100644 --- a/src/duckdb/extension/parquet/parquet_metadata.cpp +++ b/src/duckdb/extension/parquet/parquet_metadata.cpp @@ -641,7 +641,7 @@ void ParquetMetaDataOperatorData::ExecuteBloomProbe(ClientContext &context, cons auto bloom_excludes = ParquetStatisticsUtils::BloomFilterExcludes(filter, column.meta_data, *protocol, allocator); current_chunk.SetValue(0, count, Value(file_path)); - current_chunk.SetValue(1, count, Value::BIGINT(row_group_idx)); + current_chunk.SetValue(1, count, Value::BIGINT(NumericCast(row_group_idx))); current_chunk.SetValue(2, count, Value::BOOLEAN(bloom_excludes)); count++; diff --git a/src/duckdb/extension/parquet/parquet_statistics.cpp b/src/duckdb/extension/parquet/parquet_statistics.cpp index b20510be..be5b28e6 100644 --- a/src/duckdb/extension/parquet/parquet_statistics.cpp +++ b/src/duckdb/extension/parquet/parquet_statistics.cpp @@ -553,7 +553,7 @@ ParquetBloomFilter::ParquetBloomFilter(idx_t num_entries, double bloom_filter_fa // see http://tfk.mit.edu/pdf/bloom.pdf double f = bloom_filter_false_positive_ratio; double k = 8.0; - double n = num_entries; + double n = LossyNumericCast(num_entries); double m = -k * n / std::log(1 - std::pow(f, 1 / k)); auto b = MaxValue(NextPowerOfTwo(m / k) / 32, 1); @@ -573,14 +573,14 @@ ParquetBloomFilter::ParquetBloomFilter(unique_ptr data_p) { } void ParquetBloomFilter::FilterInsert(uint64_t x) { - auto blocks = (ParquetBloomBlock *)(data->ptr); + auto blocks = reinterpret_cast(data->ptr); uint64_t i = ((x >> 32) * block_count) >> 32; auto &b = blocks[i]; ParquetBloomBlock::BlockInsert(b, x); } bool ParquetBloomFilter::FilterCheck(uint64_t x) { - auto blocks = (ParquetBloomBlock *)(data->ptr); + auto blocks = reinterpret_cast(data->ptr); auto i = ((x >> 32) * block_count) >> 32; return ParquetBloomBlock::BlockCheck(blocks[i], x); } @@ -595,12 +595,12 @@ static uint8_t PopCnt64(uint64_t n) { } double ParquetBloomFilter::OneRatio() { - auto bloom_ptr = (uint64_t *)data->ptr; + auto bloom_ptr = reinterpret_cast(data->ptr); idx_t one_count = 0; for (idx_t b_idx = 0; b_idx < data->len / sizeof(uint64_t); ++b_idx) { one_count += PopCnt64(bloom_ptr[b_idx]); } - return one_count / (data->len * 8.0); + return LossyNumericCast(one_count) / (data->len * 8.0); } ResizeableBuffer *ParquetBloomFilter::Get() { diff --git a/src/duckdb/extension/parquet/parquet_writer.cpp b/src/duckdb/extension/parquet/parquet_writer.cpp index 7a3ddb9a..883f0306 100644 --- a/src/duckdb/extension/parquet/parquet_writer.cpp +++ b/src/duckdb/extension/parquet/parquet_writer.cpp @@ -534,7 +534,7 @@ void ParquetWriter::Finalize() { // write nonsense bloom filter header duckdb_parquet::BloomFilterHeader filter_header; auto bloom_filter_bytes = bloom_filter_entry.bloom_filter->Get(); - filter_header.numBytes = bloom_filter_bytes->len; + filter_header.numBytes = NumericCast(bloom_filter_bytes->len); filter_header.algorithm.__set_BLOCK(duckdb_parquet::SplitBlockAlgorithm()); filter_header.compression.__set_UNCOMPRESSED(duckdb_parquet::Uncompressed()); filter_header.hash.__set_XXHASH(duckdb_parquet::XxHash()); @@ -544,14 +544,15 @@ void ParquetWriter::Finalize() { file_meta_data.row_groups[bloom_filter_entry.row_group_idx].columns[bloom_filter_entry.column_idx]; column_chunk.meta_data.__isset.bloom_filter_offset = true; - column_chunk.meta_data.bloom_filter_offset = writer->GetTotalWritten(); + column_chunk.meta_data.bloom_filter_offset = NumericCast(writer->GetTotalWritten()); auto bloom_filter_header_size = Write(filter_header); // write actual data WriteData(bloom_filter_bytes->ptr, bloom_filter_bytes->len); column_chunk.meta_data.__isset.bloom_filter_length = true; - column_chunk.meta_data.bloom_filter_length = bloom_filter_header_size + bloom_filter_bytes->len; + column_chunk.meta_data.bloom_filter_length = + NumericCast(bloom_filter_header_size + bloom_filter_bytes->len); } const auto metadata_start_offset = writer->GetTotalWritten(); diff --git a/src/duckdb/src/common/fsst.cpp b/src/duckdb/src/common/fsst.cpp index 74b747de..1e28ad5a 100644 --- a/src/duckdb/src/common/fsst.cpp +++ b/src/duckdb/src/common/fsst.cpp @@ -20,8 +20,8 @@ string_t FSSTPrimitives::DecompressValue(void *duckdb_fsst_decoder, Vector &resu decompressed_string_size); } -Value FSSTPrimitives::DecompressValue(void *duckdb_fsst_decoder, const char *compressed_string, - const idx_t compressed_string_len, vector &decompress_buffer) { +string FSSTPrimitives::DecompressValue(void *duckdb_fsst_decoder, const char *compressed_string, + const idx_t compressed_string_len, vector &decompress_buffer) { auto compressed_string_ptr = (unsigned char *)compressed_string; // NOLINT auto fsst_decoder = reinterpret_cast(duckdb_fsst_decoder); @@ -30,7 +30,7 @@ Value FSSTPrimitives::DecompressValue(void *duckdb_fsst_decoder, const char *com D_ASSERT(!decompress_buffer.empty()); D_ASSERT(decompressed_string_size <= decompress_buffer.size() - 1); - return Value(string(char_ptr_cast(decompress_buffer.data()), decompressed_string_size)); + return string(char_ptr_cast(decompress_buffer.data()), decompressed_string_size); } } // namespace duckdb diff --git a/src/duckdb/src/common/operator/cast_operators.cpp b/src/duckdb/src/common/operator/cast_operators.cpp index 5e1dc442..fe1ce26a 100644 --- a/src/duckdb/src/common/operator/cast_operators.cpp +++ b/src/duckdb/src/common/operator/cast_operators.cpp @@ -2298,7 +2298,7 @@ bool DoubleToDecimalCast(SRC input, DST &result, CastParameters ¶meters, uin double roundedValue = round(value); if (roundedValue <= -NumericHelper::DOUBLE_POWERS_OF_TEN[width] || roundedValue >= NumericHelper::DOUBLE_POWERS_OF_TEN[width]) { - string error = StringUtil::Format("Could not cast value %f to DECIMAL(%d,%d)", value, width, scale); + string error = StringUtil::Format("Could not cast value %f to DECIMAL(%d,%d)", input, width, scale); HandleCastError::AssignError(error, parameters); return false; } diff --git a/src/duckdb/src/common/types/row/tuple_data_collection.cpp b/src/duckdb/src/common/types/row/tuple_data_collection.cpp index 77b8befa..7db3ba37 100644 --- a/src/duckdb/src/common/types/row/tuple_data_collection.cpp +++ b/src/duckdb/src/common/types/row/tuple_data_collection.cpp @@ -169,6 +169,8 @@ void TupleDataCollection::InitializeChunkState(TupleDataChunkState &chunk_state, } InitializeVectorFormat(chunk_state.vector_data, types); + chunk_state.cached_cast_vectors.clear(); + chunk_state.cached_cast_vector_cache.clear(); for (auto &col : column_ids) { auto &type = types[col]; if (TypeVisitor::Contains(type, LogicalTypeId::ARRAY)) { diff --git a/src/duckdb/src/common/types/vector.cpp b/src/duckdb/src/common/types/vector.cpp index 48f9c504..325246ad 100644 --- a/src/duckdb/src/common/types/vector.cpp +++ b/src/duckdb/src/common/types/vector.cpp @@ -601,9 +601,16 @@ Value Vector::GetValueInternal(const Vector &v_p, idx_t index_p) { auto str_compressed = reinterpret_cast(data)[index]; auto decoder = FSSTVector::GetDecoder(*vector); auto &decompress_buffer = FSSTVector::GetDecompressBuffer(*vector); - Value result = FSSTPrimitives::DecompressValue(decoder, str_compressed.GetData(), str_compressed.GetSize(), - decompress_buffer); - return result; + auto string_val = FSSTPrimitives::DecompressValue(decoder, str_compressed.GetData(), str_compressed.GetSize(), + decompress_buffer); + switch (vector->GetType().id()) { + case LogicalTypeId::VARCHAR: + return Value(std::move(string_val)); + case LogicalTypeId::BLOB: + return Value::BLOB_RAW(string_val); + default: + throw InternalException("Unsupported vector type for FSST vector"); + } } switch (vector->GetType().id()) { diff --git a/src/duckdb/src/execution/aggregate_hashtable.cpp b/src/duckdb/src/execution/aggregate_hashtable.cpp index 542564bd..65619c3f 100644 --- a/src/duckdb/src/execution/aggregate_hashtable.cpp +++ b/src/duckdb/src/execution/aggregate_hashtable.cpp @@ -51,6 +51,9 @@ GroupedAggregateHashTable::GroupedAggregateHashTable(ClientContext &context, All // Partitioned data and pointer table InitializePartitionedData(); + if (radix_bits >= UNPARTITIONED_RADIX_BITS_THRESHOLD) { + InitializeUnpartitionedData(); + } Resize(initial_capacity); // Predicates @@ -72,11 +75,67 @@ void GroupedAggregateHashTable::InitializePartitionedData() { D_ASSERT(GetLayout().GetDataWidth() == layout.GetDataWidth()); D_ASSERT(GetLayout().GetRowWidth() == layout.GetRowWidth()); - partitioned_data->InitializeAppendState(state.append_state, TupleDataPinProperties::KEEP_EVERYTHING_PINNED); + partitioned_data->InitializeAppendState(state.partitioned_append_state, + TupleDataPinProperties::KEEP_EVERYTHING_PINNED); +} + +void GroupedAggregateHashTable::InitializeUnpartitionedData() { + D_ASSERT(radix_bits >= UNPARTITIONED_RADIX_BITS_THRESHOLD); + if (!unpartitioned_data) { + unpartitioned_data = + make_uniq(buffer_manager, layout, 0ULL, layout.ColumnCount() - 1); + } else { + unpartitioned_data->Reset(); + } + unpartitioned_data->InitializeAppendState(state.unpartitioned_append_state, + TupleDataPinProperties::KEEP_EVERYTHING_PINNED); +} + +const PartitionedTupleData &GroupedAggregateHashTable::GetPartitionedData() const { + return *partitioned_data; } -unique_ptr &GroupedAggregateHashTable::GetPartitionedData() { - return partitioned_data; +unique_ptr GroupedAggregateHashTable::AcquirePartitionedData() { + // Flush/unpin partitioned data + partitioned_data->FlushAppendState(state.partitioned_append_state); + partitioned_data->Unpin(); + + if (radix_bits >= UNPARTITIONED_RADIX_BITS_THRESHOLD) { + // Flush/unpin unpartitioned data and append to partitioned data + if (unpartitioned_data) { + unpartitioned_data->FlushAppendState(state.unpartitioned_append_state); + unpartitioned_data->Unpin(); + unpartitioned_data->Repartition(*partitioned_data); + } + InitializeUnpartitionedData(); + } + + // Return and re-initialize + auto result = std::move(partitioned_data); + InitializePartitionedData(); + return result; +} + +void GroupedAggregateHashTable::Abandon() { + if (radix_bits >= UNPARTITIONED_RADIX_BITS_THRESHOLD) { + // Flush/unpin unpartitioned data and append to partitioned data + if (unpartitioned_data) { + unpartitioned_data->FlushAppendState(state.unpartitioned_append_state); + unpartitioned_data->Unpin(); + unpartitioned_data->Repartition(*partitioned_data); + } + InitializeUnpartitionedData(); + } + + // Start over + ClearPointerTable(); + count = 0; +} + +void GroupedAggregateHashTable::Repartition() { + auto old = AcquirePartitionedData(); + D_ASSERT(old->GetPartitions().size() != partitioned_data->GetPartitions().size()); + old->Repartition(*partitioned_data); } shared_ptr GroupedAggregateHashTable::GetAggregateAllocator() { @@ -163,10 +222,6 @@ void GroupedAggregateHashTable::ClearPointerTable() { std::fill_n(entries, capacity, ht_entry_t()); } -void GroupedAggregateHashTable::ResetCount() { - count = 0; -} - void GroupedAggregateHashTable::SetRadixBits(idx_t radix_bits_p) { radix_bits = radix_bits_p; } @@ -189,39 +244,43 @@ void GroupedAggregateHashTable::Resize(idx_t size) { bitmask = capacity - 1; if (Count() != 0) { - for (auto &data_collection : partitioned_data->GetPartitions()) { - if (data_collection->Count() == 0) { - continue; - } - TupleDataChunkIterator iterator(*data_collection, TupleDataPinProperties::ALREADY_PINNED, false); - const auto row_locations = iterator.GetRowLocations(); - do { - for (idx_t i = 0; i < iterator.GetCurrentChunkCount(); i++) { - const auto &row_location = row_locations[i]; - const auto hash = Load(row_location + hash_offset); - - // Find an empty entry - auto entry_idx = ApplyBitMask(hash); - D_ASSERT(entry_idx == hash % capacity); - while (entries[entry_idx].IsOccupied()) { - entry_idx++; - if (entry_idx >= capacity) { - entry_idx = 0; - } - } - auto &entry = entries[entry_idx]; - D_ASSERT(!entry.IsOccupied()); - entry.SetSalt(ht_entry_t::ExtractSalt(hash)); - entry.SetPointer(row_location); - D_ASSERT(entry.IsOccupied()); - } - } while (iterator.Next()); + ReinsertTuples(*partitioned_data); + if (radix_bits >= UNPARTITIONED_RADIX_BITS_THRESHOLD) { + ReinsertTuples(*unpartitioned_data); } } Verify(); } +void GroupedAggregateHashTable::ReinsertTuples(PartitionedTupleData &data) { + for (auto &data_collection : data.GetPartitions()) { + if (data_collection->Count() == 0) { + continue; + } + TupleDataChunkIterator iterator(*data_collection, TupleDataPinProperties::ALREADY_PINNED, false); + const auto row_locations = iterator.GetRowLocations(); + do { + for (idx_t i = 0; i < iterator.GetCurrentChunkCount(); i++) { + const auto &row_location = row_locations[i]; + const auto hash = Load(row_location + hash_offset); + + // Find an empty entry + auto ht_offset = ApplyBitMask(hash); + D_ASSERT(ht_offset == hash % capacity); + while (entries[ht_offset].IsOccupied()) { + IncrementAndWrap(ht_offset, bitmask); + } + auto &entry = entries[ht_offset]; + D_ASSERT(!entry.IsOccupied()); + entry.SetSalt(ht_entry_t::ExtractSalt(hash)); + entry.SetPointer(row_location); + D_ASSERT(entry.IsOccupied()); + } + } while (iterator.Next()); + } +} + idx_t GroupedAggregateHashTable::AddChunk(DataChunk &groups, DataChunk &payload, AggregateType filter) { unsafe_vector aggregate_filter; @@ -497,30 +556,31 @@ idx_t GroupedAggregateHashTable::FindOrCreateGroupsInternal(DataChunk &groups, V D_ASSERT(state.hash_salts.GetType() == LogicalType::HASH); // Need to fit the entire vector, and resize at threshold - if (Count() + groups.size() > capacity || Count() + groups.size() > ResizeThreshold()) { + const auto chunk_size = groups.size(); + if (Count() + chunk_size > capacity || Count() + chunk_size > ResizeThreshold()) { Verify(); Resize(capacity * 2); } - D_ASSERT(capacity - Count() >= groups.size()); // we need to be able to fit at least one vector of data + D_ASSERT(capacity - Count() >= chunk_size); // we need to be able to fit at least one vector of data - group_hashes_v.Flatten(groups.size()); - auto hashes = FlatVector::GetData(group_hashes_v); + group_hashes_v.Flatten(chunk_size); + const auto hashes = FlatVector::GetData(group_hashes_v); - addresses_v.Flatten(groups.size()); - auto addresses = FlatVector::GetData(addresses_v); + addresses_v.Flatten(chunk_size); + const auto addresses = FlatVector::GetData(addresses_v); // Compute the entry in the table based on the hash using a modulo, // and precompute the hash salts for faster comparison below - auto ht_offsets = FlatVector::GetData(state.ht_offsets); + const auto ht_offsets = FlatVector::GetData(state.ht_offsets); const auto hash_salts = FlatVector::GetData(state.hash_salts); - for (idx_t r = 0; r < groups.size(); r++) { + for (idx_t r = 0; r < chunk_size; r++) { const auto &hash = hashes[r]; ht_offsets[r] = ApplyBitMask(hash); D_ASSERT(ht_offsets[r] == hash % capacity); hash_salts[r] = ht_entry_t::ExtractSalt(hash); } - // we start out with all entries [0, 1, 2, ..., groups.size()] + // we start out with all entries [0, 1, 2, ..., chunk_size] const SelectionVector *sel_vector = FlatVector::IncrementalSelectionVector(); // Make a chunk that references the groups and the hashes and convert to unified format @@ -535,15 +595,14 @@ idx_t GroupedAggregateHashTable::FindOrCreateGroupsInternal(DataChunk &groups, V state.group_chunk.SetCardinality(groups); // convert all vectors to unified format - auto &chunk_state = state.append_state.chunk_state; - TupleDataCollection::ToUnifiedFormat(chunk_state, state.group_chunk); + TupleDataCollection::ToUnifiedFormat(state.partitioned_append_state.chunk_state, state.group_chunk); if (!state.group_data) { state.group_data = make_unsafe_uniq_array_uninitialized(state.group_chunk.ColumnCount()); } - TupleDataCollection::GetVectorData(chunk_state, state.group_data.get()); + TupleDataCollection::GetVectorData(state.partitioned_append_state.chunk_state, state.group_data.get()); idx_t new_group_count = 0; - idx_t remaining_entries = groups.size(); + idx_t remaining_entries = chunk_size; idx_t iteration_count; for (iteration_count = 0; remaining_entries > 0 && iteration_count < capacity; iteration_count++) { idx_t new_entry_count = 0; @@ -553,46 +612,55 @@ idx_t GroupedAggregateHashTable::FindOrCreateGroupsInternal(DataChunk &groups, V // For each remaining entry, figure out whether or not it belongs to a full or empty group for (idx_t i = 0; i < remaining_entries; i++) { const auto index = sel_vector->get_index(i); - const auto &salt = hash_salts[index]; + const auto salt = hash_salts[index]; auto &ht_offset = ht_offsets[index]; idx_t inner_iteration_count; for (inner_iteration_count = 0; inner_iteration_count < capacity; inner_iteration_count++) { auto &entry = entries[ht_offset]; - if (entry.IsOccupied()) { // Cell is occupied: Compare salts - if (entry.GetSalt() == salt) { - // Same salt, compare group keys - state.group_compare_vector.set_index(need_compare_count++, index); - break; - } - // Different salts, move to next entry (linear probing) - IncrementAndWrap(ht_offset, bitmask); - } else { // Cell is unoccupied, let's claim it - // Set salt (also marks as occupied) + if (!entry.IsOccupied()) { // Unoccupied: claim it entry.SetSalt(salt); - // Update selection lists for outer loops state.empty_vector.set_index(new_entry_count++, index); new_groups_out.set_index(new_group_count++, index); break; } + + if (DUCKDB_LIKELY(entry.GetSalt() == salt)) { // Matching salt: compare groups + state.group_compare_vector.set_index(need_compare_count++, index); + break; + } + + // Linear probing + IncrementAndWrap(ht_offset, bitmask); } - if (inner_iteration_count == capacity) { + if (DUCKDB_UNLIKELY(inner_iteration_count == capacity)) { throw InternalException("Maximum inner iteration count reached in GroupedAggregateHashTable"); } } if (new_entry_count != 0) { // Append everything that belongs to an empty group - partitioned_data->AppendUnified(state.append_state, state.group_chunk, state.empty_vector, new_entry_count); - RowOperations::InitializeStates(layout, chunk_state.row_locations, + optional_ptr data; + optional_ptr append_state; + if (radix_bits >= UNPARTITIONED_RADIX_BITS_THRESHOLD && + new_entry_count / RadixPartitioning::NumberOfPartitions(radix_bits) <= 4) { + TupleDataCollection::ToUnifiedFormat(state.unpartitioned_append_state.chunk_state, state.group_chunk); + data = unpartitioned_data.get(); + append_state = &state.unpartitioned_append_state; + } else { + data = partitioned_data.get(); + append_state = &state.partitioned_append_state; + } + data->AppendUnified(*append_state, state.group_chunk, state.empty_vector, new_entry_count); + RowOperations::InitializeStates(layout, append_state->chunk_state.row_locations, *FlatVector::IncrementalSelectionVector(), new_entry_count); // Set the entry pointers in the 1st part of the HT now that the data has been appended - const auto row_locations = FlatVector::GetData(chunk_state.row_locations); - const auto &row_sel = state.append_state.reverse_partition_sel; + const auto row_locations = FlatVector::GetData(append_state->chunk_state.row_locations); + const auto &row_sel = append_state->reverse_partition_sel; for (idx_t new_entry_idx = 0; new_entry_idx < new_entry_count; new_entry_idx++) { - const auto index = state.empty_vector.get_index(new_entry_idx); - const auto row_idx = row_sel.get_index(index); + const auto &index = state.empty_vector[new_entry_idx]; + const auto &row_idx = row_sel[index]; const auto &row_location = row_locations[row_idx]; auto &entry = entries[ht_offsets[index]]; @@ -605,19 +673,20 @@ idx_t GroupedAggregateHashTable::FindOrCreateGroupsInternal(DataChunk &groups, V if (need_compare_count != 0) { // Get the pointers to the rows that need to be compared for (idx_t need_compare_idx = 0; need_compare_idx < need_compare_count; need_compare_idx++) { - const auto index = state.group_compare_vector.get_index(need_compare_idx); + const auto &index = state.group_compare_vector[need_compare_idx]; const auto &entry = entries[ht_offsets[index]]; addresses[index] = entry.GetPointer(); } // Perform group comparisons - row_matcher.Match(state.group_chunk, chunk_state.vector_data, state.group_compare_vector, - need_compare_count, layout, addresses_v, &state.no_match_vector, no_match_count); + row_matcher.Match(state.group_chunk, state.partitioned_append_state.chunk_state.vector_data, + state.group_compare_vector, need_compare_count, layout, addresses_v, + &state.no_match_vector, no_match_count); } // Linear probing: each of the entries that do not match move to the next entry in the HT for (idx_t i = 0; i < no_match_count; i++) { - const auto index = state.no_match_vector.get_index(i); + const auto &index = state.no_match_vector[i]; auto &ht_offset = ht_offsets[index]; IncrementAndWrap(ht_offset, bitmask); } @@ -689,7 +758,8 @@ struct FlushMoveState { }; void GroupedAggregateHashTable::Combine(GroupedAggregateHashTable &other) { - auto other_data = other.partitioned_data->GetUnpartitioned(); + auto other_partitioned_data = other.AcquirePartitionedData(); + auto other_data = other_partitioned_data->GetUnpartitioned(); Combine(*other_data); // Inherit ownership to all stored aggregate allocators @@ -714,25 +784,21 @@ void GroupedAggregateHashTable::Combine(TupleDataCollection &other_data, optiona idx_t chunk_idx = 0; const auto chunk_count = other_data.ChunkCount(); while (fm_state.Scan()) { + const auto input_chunk_size = fm_state.groups.size(); FindOrCreateGroups(fm_state.groups, fm_state.hashes, fm_state.group_addresses, fm_state.new_groups_sel); RowOperations::CombineStates(row_state, layout, fm_state.scan_state.chunk_state.row_locations, - fm_state.group_addresses, fm_state.groups.size()); + fm_state.group_addresses, input_chunk_size); if (layout.HasDestructor()) { RowOperations::DestroyStates(row_state, layout, fm_state.scan_state.chunk_state.row_locations, - fm_state.groups.size()); + input_chunk_size); } if (progress) { - *progress = double(++chunk_idx) / double(chunk_count); + *progress = static_cast(++chunk_idx) / static_cast(chunk_count); } } Verify(); } -void GroupedAggregateHashTable::UnpinData() { - partitioned_data->FlushAppendState(state.append_state); - partitioned_data->Unpin(); -} - } // namespace duckdb diff --git a/src/duckdb/src/execution/operator/helper/physical_verify_vector.cpp b/src/duckdb/src/execution/operator/helper/physical_verify_vector.cpp index 5fc627c9..ea19d7ca 100644 --- a/src/duckdb/src/execution/operator/helper/physical_verify_vector.cpp +++ b/src/duckdb/src/execution/operator/helper/physical_verify_vector.cpp @@ -74,6 +74,14 @@ OperatorResultType VerifyEmitSequenceVector(const DataChunk &input, DataChunk &c break; } } + bool can_be_constant = true; + switch (chunk.data[c].GetType().id()) { + case LogicalTypeId::INTERVAL: + can_be_constant = false; + break; + default: + break; + } ConstantOrSequenceInfo info; info.is_constant = true; for (idx_t k = state.const_idx; k < input.size(); k++) { @@ -81,7 +89,7 @@ OperatorResultType VerifyEmitSequenceVector(const DataChunk &input, DataChunk &c if (info.values.empty()) { info.values.push_back(std::move(val)); } else if (info.is_constant) { - if (!ValueOperations::DistinctFrom(val, info.values[0])) { + if (!ValueOperations::DistinctFrom(val, info.values[0]) && can_be_constant) { // found the same value! continue info.values.push_back(std::move(val)); continue; diff --git a/src/duckdb/src/execution/operator/order/physical_top_n.cpp b/src/duckdb/src/execution/operator/order/physical_top_n.cpp index c55b5069..669c25a5 100644 --- a/src/duckdb/src/execution/operator/order/physical_top_n.cpp +++ b/src/duckdb/src/execution/operator/order/physical_top_n.cpp @@ -436,6 +436,7 @@ void TopNHeap::Reduce() { new_heap_data.Slice(heap_data, new_payload_sel, heap.size()); new_heap_data.Flatten(); + sort_key_heap.Destroy(); sort_key_heap.Move(new_sort_heap); heap_data.Reference(new_heap_data); } diff --git a/src/duckdb/src/execution/radix_partitioned_hashtable.cpp b/src/duckdb/src/execution/radix_partitioned_hashtable.cpp index fb5543d3..a9df807d 100644 --- a/src/duckdb/src/execution/radix_partitioned_hashtable.cpp +++ b/src/duckdb/src/execution/radix_partitioned_hashtable.cpp @@ -92,7 +92,7 @@ class RadixHTGlobalSinkState; struct RadixHTConfig { public: - explicit RadixHTConfig(ClientContext &context, RadixHTGlobalSinkState &sink); + explicit RadixHTConfig(RadixHTGlobalSinkState &sink); void SetRadixBits(const idx_t &radix_bits_p); bool SetRadixBitsToExternal(); @@ -100,13 +100,19 @@ struct RadixHTConfig { private: void SetRadixBitsInternal(idx_t radix_bits_p, bool external); - static idx_t InitialSinkRadixBits(idx_t number_of_threads_p); - static idx_t MaximumSinkRadixBits(idx_t number_of_threads_p); - static idx_t SinkCapacity(idx_t number_of_threads_p); + idx_t InitialSinkRadixBits() const; + idx_t MaximumSinkRadixBits() const; + idx_t SinkCapacity() const; + +private: + //! The global sink state + RadixHTGlobalSinkState &sink; public: //! Number of threads (from TaskScheduler) const idx_t number_of_threads; + //! Width of tuples + const idx_t row_width; //! Capacity of HTs during the Sink const idx_t sink_capacity; @@ -119,17 +125,20 @@ struct RadixHTConfig { static constexpr idx_t L3_CACHE_SIZE = 1572864 / 2; //! Sink radix bits to initialize with - static constexpr idx_t MAXIMUM_INITIAL_SINK_RADIX_BITS = 3; + static constexpr idx_t MAXIMUM_INITIAL_SINK_RADIX_BITS = 4; //! Maximum Sink radix bits (independent of threads) - static constexpr idx_t MAXIMUM_FINAL_SINK_RADIX_BITS = 7; + static constexpr idx_t MAXIMUM_FINAL_SINK_RADIX_BITS = 8; - //! The global sink state - RadixHTGlobalSinkState &sink; //! Current thread-global sink radix bits atomic sink_radix_bits; //! Maximum Sink radix bits (set based on number of threads) const idx_t maximum_sink_radix_bits; + //! Thresholds at which we reduce the sink radix bits + //! This needed to reduce cache misses when we have very wide rows + static constexpr idx_t ROW_WIDTH_THRESHOLD_ONE = 32; + static constexpr idx_t ROW_WIDTH_THRESHOLD_TWO = 64; + public: //! If we have this many or less threads, we grow the HT, otherwise we abandon static constexpr idx_t GROW_STRATEGY_THREAD_THRESHOLD = 2; @@ -153,11 +162,6 @@ class RadixHTGlobalSinkState : public GlobalSinkState { unique_ptr temporary_memory_state; idx_t minimum_reservation; - //! The radix HT - const RadixPartitionedHashTable &radix_ht; - //! Config for partitioning - RadixHTConfig config; - //! Whether we've called Finalize bool finalized; //! Whether we are doing an external aggregation @@ -169,6 +173,11 @@ class RadixHTGlobalSinkState : public GlobalSinkState { //! If any thread has called combine atomic any_combined; + //! The radix HT + const RadixPartitionedHashTable &radix_ht; + //! Config for partitioning + RadixHTConfig config; + //! Uncombined partitioned data that will be put into the AggregatePartitions unique_ptr uncombined_data; //! Allocators used during the Sink/Finalize @@ -190,9 +199,9 @@ class RadixHTGlobalSinkState : public GlobalSinkState { RadixHTGlobalSinkState::RadixHTGlobalSinkState(ClientContext &context_p, const RadixPartitionedHashTable &radix_ht_p) : context(context_p), temporary_memory_state(TemporaryMemoryManager::Get(context).Register(context)), - radix_ht(radix_ht_p), config(context, *this), finalized(false), external(false), active_threads(0), + finalized(false), external(false), active_threads(0), number_of_threads(NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads())), - any_combined(false), stored_allocators_size(0), finalize_done(0), + any_combined(false), radix_ht(radix_ht_p), config(*this), stored_allocators_size(0), finalize_done(0), scan_pin_properties(TupleDataPinProperties::DESTROY_AFTER_DONE), count_before_combining(0), max_partition_size(0) { @@ -252,11 +261,10 @@ void RadixHTGlobalSinkState::Destroy() { } // LCOV_EXCL_STOP -RadixHTConfig::RadixHTConfig(ClientContext &context, RadixHTGlobalSinkState &sink_p) - : number_of_threads(NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads())), - sink_capacity(SinkCapacity(number_of_threads)), sink(sink_p), - sink_radix_bits(InitialSinkRadixBits(number_of_threads)), - maximum_sink_radix_bits(MaximumSinkRadixBits(number_of_threads)) { +RadixHTConfig::RadixHTConfig(RadixHTGlobalSinkState &sink_p) + : sink(sink_p), number_of_threads(sink.number_of_threads), row_width(sink.radix_ht.GetLayout().GetRowWidth()), + sink_capacity(SinkCapacity()), sink_radix_bits(InitialSinkRadixBits()), + maximum_sink_radix_bits(MaximumSinkRadixBits()) { } void RadixHTConfig::SetRadixBits(const idx_t &radix_bits_p) { @@ -288,27 +296,34 @@ void RadixHTConfig::SetRadixBitsInternal(const idx_t radix_bits_p, bool external sink_radix_bits = radix_bits_p; } -idx_t RadixHTConfig::InitialSinkRadixBits(const idx_t number_of_threads_p) { - return MinValue(RadixPartitioning::RadixBitsOfPowerOfTwo(NextPowerOfTwo(number_of_threads_p)), +idx_t RadixHTConfig::InitialSinkRadixBits() const { + return MinValue(RadixPartitioning::RadixBitsOfPowerOfTwo(NextPowerOfTwo(number_of_threads)), MAXIMUM_INITIAL_SINK_RADIX_BITS); } -idx_t RadixHTConfig::MaximumSinkRadixBits(const idx_t number_of_threads_p) { - if (number_of_threads_p <= GROW_STRATEGY_THREAD_THRESHOLD) { - return InitialSinkRadixBits(number_of_threads_p); // Don't repartition unless we go external +idx_t RadixHTConfig::MaximumSinkRadixBits() const { + if (number_of_threads <= GROW_STRATEGY_THREAD_THRESHOLD) { + return InitialSinkRadixBits(); // Don't repartition unless we go external + } + // If rows are very wide we have to reduce the number of partitions, otherwise cache misses get out of hand + if (row_width >= ROW_WIDTH_THRESHOLD_TWO) { + return MAXIMUM_FINAL_SINK_RADIX_BITS - 2; + } + if (row_width >= ROW_WIDTH_THRESHOLD_ONE) { + return MAXIMUM_FINAL_SINK_RADIX_BITS - 1; } return MAXIMUM_FINAL_SINK_RADIX_BITS; } -idx_t RadixHTConfig::SinkCapacity(const idx_t number_of_threads_p) { +idx_t RadixHTConfig::SinkCapacity() const { // Compute cache size per active thread (assuming cache is shared) - const auto total_shared_cache_size = number_of_threads_p * L3_CACHE_SIZE; - const auto cache_per_active_thread = L1_CACHE_SIZE + L2_CACHE_SIZE + total_shared_cache_size / number_of_threads_p; + const auto total_shared_cache_size = number_of_threads * L3_CACHE_SIZE; + const auto cache_per_active_thread = L1_CACHE_SIZE + L2_CACHE_SIZE + total_shared_cache_size / number_of_threads; // Divide cache per active thread by entry size, round up to next power of two, to get capacity - const auto size_per_entry = sizeof(ht_entry_t) * GroupedAggregateHashTable::LOAD_FACTOR; - const auto capacity = - NextPowerOfTwo(LossyNumericCast(static_cast(cache_per_active_thread) / size_per_entry)); + const auto size_per_entry = LossyNumericCast(sizeof(ht_entry_t) * GroupedAggregateHashTable::LOAD_FACTOR) + + MinValue(row_width, ROW_WIDTH_THRESHOLD_TWO); + const auto capacity = NextPowerOfTwo(cache_per_active_thread / size_per_entry); // Capacity must be at least the minimum capacity return MaxValue(capacity, GroupedAggregateHashTable::InitialCapacity()); @@ -362,13 +377,12 @@ void RadixPartitionedHashTable::PopulateGroupChunk(DataChunk &group_chunk, DataC void MaybeRepartition(ClientContext &context, RadixHTGlobalSinkState &gstate, RadixHTLocalSinkState &lstate) { auto &config = gstate.config; auto &ht = *lstate.ht; - auto &partitioned_data = ht.GetPartitionedData(); // Check if we're approaching the memory limit auto &temporary_memory_state = *gstate.temporary_memory_state; const auto aggregate_allocator_size = ht.GetAggregateAllocator()->AllocationSize(); const auto total_size = - aggregate_allocator_size + partitioned_data->SizeInBytes() + ht.Capacity() * sizeof(ht_entry_t); + aggregate_allocator_size + ht.GetPartitionedData().SizeInBytes() + ht.Capacity() * sizeof(ht_entry_t); idx_t thread_limit = temporary_memory_state.GetReservation() / gstate.number_of_threads; if (total_size > thread_limit) { // We're over the thread memory limit @@ -396,11 +410,8 @@ void MaybeRepartition(ClientContext &context, RadixHTGlobalSinkState &gstate, Ra BufferManager::GetBufferManager(context), gstate.radix_ht.GetLayout(), config.GetRadixBits(), gstate.radix_ht.GetLayout().ColumnCount() - 1); } - - ht.UnpinData(); - partitioned_data->Repartition(*lstate.abandoned_data); ht.SetRadixBits(gstate.config.GetRadixBits()); - ht.InitializePartitionedData(); + ht.AcquirePartitionedData()->Repartition(*lstate.abandoned_data); } } @@ -409,13 +420,13 @@ void MaybeRepartition(ClientContext &context, RadixHTGlobalSinkState &gstate, Ra return; } - const auto partition_count = partitioned_data->PartitionCount(); + const auto partition_count = ht.GetPartitionedData().PartitionCount(); const auto current_radix_bits = RadixPartitioning::RadixBitsOfPowerOfTwo(partition_count); D_ASSERT(current_radix_bits <= config.GetRadixBits()); const auto block_size = BufferManager::GetBufferManager(context).GetBlockSize(); const auto row_size_per_partition = - partitioned_data->Count() * partitioned_data->GetLayout().GetRowWidth() / partition_count; + ht.GetPartitionedData().Count() * ht.GetPartitionedData().GetLayout().GetRowWidth() / partition_count; if (row_size_per_partition > LossyNumericCast(config.BLOCK_FILL_FACTOR * static_cast(block_size))) { // We crossed our block filling threshold, try to increment radix bits config.SetRadixBits(current_radix_bits + config.REPARTITION_RADIX_BITS); @@ -427,11 +438,8 @@ void MaybeRepartition(ClientContext &context, RadixHTGlobalSinkState &gstate, Ra } // We're out-of-sync with the global radix bits, repartition - ht.UnpinData(); - auto old_partitioned_data = std::move(partitioned_data); ht.SetRadixBits(global_radix_bits); - ht.InitializePartitionedData(); - old_partitioned_data->Repartition(*ht.GetPartitionedData()); + ht.Repartition(); } void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input, @@ -439,10 +447,7 @@ void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk auto &gstate = input.global_state.Cast(); auto &lstate = input.local_state.Cast(); if (!lstate.ht) { - const auto capacity = gstate.number_of_threads <= RadixHTConfig::GROW_STRATEGY_THREAD_THRESHOLD - ? gstate.config.sink_capacity - : GroupedAggregateHashTable::InitialCapacity(); - lstate.ht = CreateHT(context.client, capacity, gstate.config.GetRadixBits()); + lstate.ht = CreateHT(context.client, gstate.config.sink_capacity, gstate.config.GetRadixBits()); gstate.active_threads++; } @@ -459,8 +464,7 @@ void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk if (gstate.number_of_threads > RadixHTConfig::GROW_STRATEGY_THREAD_THRESHOLD || gstate.external) { // 'Reset' the HT without taking its data, we can just keep appending to the same collection // This only works because we never resize the HT - ht.ClearPointerTable(); - ht.ResetCount(); + ht.Abandon(); // We don't do this when running with 1 or 2 threads, it only makes sense when there's many threads } @@ -471,8 +475,7 @@ void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk if (repartitioned && ht.Count() != 0) { // We repartitioned, but we didn't clear the pointer table / reset the count because we're on 1 or 2 threads - ht.ClearPointerTable(); - ht.ResetCount(); + ht.Abandon(); if (gstate.external) { ht.Resize(gstate.config.sink_capacity); } @@ -494,16 +497,15 @@ void RadixPartitionedHashTable::Combine(ExecutionContext &context, GlobalSinkSta MaybeRepartition(context.client, gstate, lstate); auto &ht = *lstate.ht; - ht.UnpinData(); - + auto lstate_data = ht.AcquirePartitionedData(); if (lstate.abandoned_data) { D_ASSERT(gstate.external); - D_ASSERT(lstate.abandoned_data->PartitionCount() == lstate.ht->GetPartitionedData()->PartitionCount()); + D_ASSERT(lstate.abandoned_data->PartitionCount() == lstate.ht->GetPartitionedData().PartitionCount()); D_ASSERT(lstate.abandoned_data->PartitionCount() == RadixPartitioning::NumberOfPartitions(gstate.config.GetRadixBits())); - lstate.abandoned_data->Combine(*lstate.ht->GetPartitionedData()); + lstate.abandoned_data->Combine(*lstate_data); } else { - lstate.abandoned_data = std::move(ht.GetPartitionedData()); + lstate.abandoned_data = std::move(lstate_data); } auto guard = gstate.Lock(); @@ -741,21 +743,17 @@ void RadixHTLocalSourceState::Finalize(RadixHTGlobalSinkState &sink, RadixHTGlob ht = sink.radix_ht.CreateHT(gstate.context, MinValue(capacity, capacity_limit), 0); } else { - // We may want to resize here to the size of this partition, but for now we just assume uniform partition sizes - ht->InitializePartitionedData(); - ht->ClearPointerTable(); - ht->ResetCount(); + ht->Abandon(); } // Now combine the uncombined data using this thread's HT ht->Combine(*partition.data, &partition.progress); - ht->UnpinData(); partition.progress = 1; // Move the combined data back to the partition partition.data = make_uniq(BufferManager::GetBufferManager(gstate.context), sink.radix_ht.GetLayout()); - partition.data->Combine(*ht->GetPartitionedData()->GetPartitions()[0]); + partition.data->Combine(*ht->AcquirePartitionedData()->GetPartitions()[0]); // Update thread-global state auto guard = sink.Lock(); diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 22fd4ac7..164f5d16 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "4-dev3162" +#define DUCKDB_PATCH_VERSION "4-dev3247" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 1 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.1.4-dev3162" +#define DUCKDB_VERSION "v1.1.4-dev3247" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "067ecb1c7f" +#define DUCKDB_SOURCE_ID "04d8f995da" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/function/window/window_executor.cpp b/src/duckdb/src/function/window/window_executor.cpp index bcf39018..d8f4286d 100644 --- a/src/duckdb/src/function/window/window_executor.cpp +++ b/src/duckdb/src/function/window/window_executor.cpp @@ -24,7 +24,6 @@ void WindowExecutorBoundsState::UpdateBounds(WindowExecutorGlobalState &gstate, WindowInputExpression boundary_end(eval_chunk, gstate.executor.boundary_end_idx); const auto count = eval_chunk.size(); - bounds.Reset(); state.Bounds(bounds, row_idx, range, count, boundary_start, boundary_end, partition_mask, order_mask); } diff --git a/src/duckdb/src/function/window/window_shared_expressions.cpp b/src/duckdb/src/function/window/window_shared_expressions.cpp index b3ecbbae..811d3ec5 100644 --- a/src/duckdb/src/function/window/window_shared_expressions.cpp +++ b/src/duckdb/src/function/window/window_shared_expressions.cpp @@ -23,8 +23,8 @@ column_t WindowSharedExpressions::RegisterExpr(const unique_ptr &exp return result; } -vector WindowSharedExpressions::GetSortedExpressions(Shared &shared) { - vector sorted(shared.size, nullptr); +vector> WindowSharedExpressions::GetSortedExpressions(Shared &shared) { + vector> sorted(shared.size); for (auto &col : shared.columns) { auto &expr = col.first.get(); for (auto col_idx : col.second) { diff --git a/src/duckdb/src/include/duckdb/common/fsst.hpp b/src/duckdb/src/include/duckdb/common/fsst.hpp index d92e8704..47398d27 100644 --- a/src/duckdb/src/include/duckdb/common/fsst.hpp +++ b/src/duckdb/src/include/duckdb/common/fsst.hpp @@ -20,7 +20,7 @@ class FSSTPrimitives { public: static string_t DecompressValue(void *duckdb_fsst_decoder, Vector &result, const char *compressed_string, const idx_t compressed_string_len, vector &decompress_buffer); - static Value DecompressValue(void *duckdb_fsst_decoder, const char *compressed_string, - const idx_t compressed_string_len, vector &decompress_buffer); + static string DecompressValue(void *duckdb_fsst_decoder, const char *compressed_string, + const idx_t compressed_string_len, vector &decompress_buffer); }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/common/numeric_utils.hpp b/src/duckdb/src/include/duckdb/common/numeric_utils.hpp index ef9a7bc8..813213dd 100644 --- a/src/duckdb/src/include/duckdb/common/numeric_utils.hpp +++ b/src/duckdb/src/include/duckdb/common/numeric_utils.hpp @@ -119,13 +119,8 @@ TO UnsafeNumericCast(FROM in) { // LossyNumericCast // When: between double/float to other convertible types // Checks: no checks performed (at the moment, to be improved adding range checks) -template -TO LossyNumericCast(double val) { - return static_cast(val); -} - -template -TO LossyNumericCast(float val) { +template +TO LossyNumericCast(FROM val) { return static_cast(val); } diff --git a/src/duckdb/src/include/duckdb/execution/aggregate_hashtable.hpp b/src/duckdb/src/include/duckdb/execution/aggregate_hashtable.hpp index 14a86d64..4ceb56a7 100644 --- a/src/duckdb/src/include/duckdb/execution/aggregate_hashtable.hpp +++ b/src/duckdb/src/include/duckdb/execution/aggregate_hashtable.hpp @@ -43,7 +43,7 @@ class GroupedAggregateHashTable : public BaseAggregateHashTable { public: //! The hash table load factor, when a resize is triggered - constexpr static double LOAD_FACTOR = 1.5; + constexpr static double LOAD_FACTOR = 1.25; //! Get the layout of this HT const TupleDataLayout &GetLayout() const; @@ -80,29 +80,25 @@ class GroupedAggregateHashTable : public BaseAggregateHashTable { idx_t FindOrCreateGroups(DataChunk &groups, Vector &addresses_out, SelectionVector &new_groups_out); void FindOrCreateGroups(DataChunk &groups, Vector &addresses_out); - unique_ptr &GetPartitionedData(); + const PartitionedTupleData &GetPartitionedData() const; + unique_ptr AcquirePartitionedData(); + void Abandon(); + void Repartition(); shared_ptr GetAggregateAllocator(); //! Resize the HT to the specified size. Must be larger than the current size. void Resize(idx_t size); //! Resets the pointer table of the HT to all 0's void ClearPointerTable(); - //! Resets the group count to 0 - void ResetCount(); //! Set the radix bits for this HT void SetRadixBits(idx_t radix_bits); //! Get the radix bits for this HT idx_t GetRadixBits() const; - //! Initializes the PartitionedTupleData - void InitializePartitionedData(); //! Executes the filter(if any) and update the aggregates void Combine(GroupedAggregateHashTable &other); void Combine(TupleDataCollection &other_data, optional_ptr> progress = nullptr); - //! Unpins the data blocks - void UnpinData(); - private: //! Efficiently matches groups RowMatcher row_matcher; @@ -125,7 +121,8 @@ class GroupedAggregateHashTable : public BaseAggregateHashTable { struct AggregateHTAppendState { AggregateHTAppendState(); - PartitionedTupleDataAppendState append_state; + PartitionedTupleDataAppendState partitioned_append_state; + PartitionedTupleDataAppendState unpartitioned_append_state; Vector ht_offsets; Vector hash_salts; @@ -139,10 +136,13 @@ class GroupedAggregateHashTable : public BaseAggregateHashTable { AggregateDictionaryState dict_state; } state; + //! If we have this many or more radix bits, we use the unpartitioned data collection too + static constexpr idx_t UNPARTITIONED_RADIX_BITS_THRESHOLD = 3; //! The number of radix bits to partition by idx_t radix_bits; //! The data of the HT unique_ptr partitioned_data; + unique_ptr unpartitioned_data; //! Predicates for matching groups (always ExpressionType::COMPARE_EQUAL) vector predicates; @@ -170,8 +170,14 @@ class GroupedAggregateHashTable : public BaseAggregateHashTable { //! Destroy the HT void Destroy(); + //! Initializes the PartitionedTupleData + void InitializePartitionedData(); + //! Initializes the PartitionedTupleData that only has 1 partition + void InitializeUnpartitionedData(); //! Apply bitmask to get the entry in the HT inline idx_t ApplyBitMask(hash_t hash) const; + //! Reinserts tuples (triggered by Resize) + void ReinsertTuples(PartitionedTupleData &data); void UpdateAggregates(DataChunk &payload, const unsafe_vector &filter); diff --git a/src/duckdb/src/include/duckdb/function/window/window_boundaries_state.hpp b/src/duckdb/src/include/duckdb/function/window/window_boundaries_state.hpp index 290353eb..35e0fdb6 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_boundaries_state.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_boundaries_state.hpp @@ -47,6 +47,9 @@ struct WindowInputExpression { auto &col = chunk.data[col_idx]; ptype = col.GetType().InternalType(); scalar = (col.GetVectorType() == VectorType::CONSTANT_VECTOR); + if (!scalar && col.GetVectorType() != VectorType::FLAT_VECTOR) { + col.Flatten(chunk.size()); + } } } diff --git a/src/duckdb/src/include/duckdb/function/window/window_shared_expressions.hpp b/src/duckdb/src/include/duckdb/function/window/window_shared_expressions.hpp index b6ebb6ff..fa4a40d1 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_shared_expressions.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_shared_expressions.hpp @@ -43,7 +43,7 @@ struct WindowSharedExpressions { } //! Expression layout - static vector GetSortedExpressions(Shared &shared); + static vector> GetSortedExpressions(Shared &shared); //! Expression execution utility static void PrepareExecutors(Shared &shared, ExpressionExecutor &exec, DataChunk &chunk); diff --git a/src/duckdb/src/include/duckdb/main/extension_entries.hpp b/src/duckdb/src/include/duckdb/main/extension_entries.hpp index e3980246..c1cb16d6 100644 --- a/src/duckdb/src/include/duckdb/main/extension_entries.hpp +++ b/src/duckdb/src/include/duckdb/main/extension_entries.hpp @@ -486,6 +486,7 @@ static constexpr ExtensionFunctionEntry EXTENSION_FUNCTIONS[] = { {"read_ndjson_auto", "json", CatalogType::TABLE_FUNCTION_ENTRY}, {"read_ndjson_objects", "json", CatalogType::TABLE_FUNCTION_ENTRY}, {"read_parquet", "parquet", CatalogType::TABLE_FUNCTION_ENTRY}, + {"read_xlsx", "excel", CatalogType::TABLE_FUNCTION_ENTRY}, {"reduce", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY}, {"reduce_sql_statement", "sqlsmith", CatalogType::TABLE_FUNCTION_ENTRY}, {"regr_avgx", "core_functions", CatalogType::AGGREGATE_FUNCTION_ENTRY}, @@ -1026,8 +1027,9 @@ static constexpr ExtensionEntry EXTENSION_FILE_PREFIXES[] = { // Note: these are currently hardcoded in scripts/generate_extensions_function.py // TODO: automate by passing though to script via duckdb static constexpr ExtensionEntry EXTENSION_FILE_POSTFIXES[] = { - {".parquet", "parquet"}, {".json", "json"}, {".jsonl", "json"}, {".ndjson", "json"}, - {".shp", "spatial"}, {".gpkg", "spatial"}, {".fgb", "spatial"}}; // END_OF_EXTENSION_FILE_POSTFIXES + {".parquet", "parquet"}, {".json", "json"}, {".jsonl", "json"}, {".ndjson", "json"}, + {".shp", "spatial"}, {".gpkg", "spatial"}, {".fgb", "spatial"}, {".xlsx", "excel"}, +}; // END_OF_EXTENSION_FILE_POSTFIXES // Note: these are currently hardcoded in scripts/generate_extensions_function.py // TODO: automate by passing though to script via duckdb diff --git a/src/duckdb/src/include/duckdb/planner/expression/bound_subquery_expression.hpp b/src/duckdb/src/include/duckdb/planner/expression/bound_subquery_expression.hpp index 9572036a..31501dd8 100644 --- a/src/duckdb/src/include/duckdb/planner/expression/bound_subquery_expression.hpp +++ b/src/duckdb/src/include/duckdb/planner/expression/bound_subquery_expression.hpp @@ -32,12 +32,12 @@ class BoundSubqueryExpression : public Expression { unique_ptr subquery; //! The subquery type SubqueryType subquery_type; - //! the child expression to compare with (in case of IN, ANY, ALL operators) - unique_ptr child; + //! the child expressions to compare with (in case of IN, ANY, ALL operators) + vector> children; //! The comparison type of the child expression with the subquery (in case of ANY, ALL operators) ExpressionType comparison_type; - //! The LogicalType of the subquery result. Only used for ANY expressions. - LogicalType child_type; + //! The LogicalTypes of the subquery result. Only used for ANY expressions. + vector child_types; //! The target LogicalType of the subquery result (i.e. to which type it should be casted, if child_type <> //! child_target). Only used for ANY expressions. LogicalType child_target; diff --git a/src/duckdb/src/optimizer/filter_combiner.cpp b/src/duckdb/src/optimizer/filter_combiner.cpp index e0a442dc..2a8be805 100644 --- a/src/duckdb/src/optimizer/filter_combiner.cpp +++ b/src/duckdb/src/optimizer/filter_combiner.cpp @@ -623,9 +623,6 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(const vector= <= (only for integers) // e.g. if we have x IN (1, 2, 3, 4, 5) we transform this into x >= 1 AND x <= 5 vector in_list; @@ -634,7 +631,7 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(const vector= min AND x <= max // IsDenseRange sorts in_list, so the front element is the min and the back element is the max auto lower_bound = @@ -698,8 +695,7 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(const vectorvalue.type().IsTemporal() || - const_val->value.type().id() == LogicalTypeId::VARCHAR) { + if (const_val->value.type().IsTemporal()) { column_id.SetInvalid(); break; } diff --git a/src/duckdb/src/optimizer/join_order/relation_statistics_helper.cpp b/src/duckdb/src/optimizer/join_order/relation_statistics_helper.cpp index aa20b033..22d498a6 100644 --- a/src/duckdb/src/optimizer/join_order/relation_statistics_helper.cpp +++ b/src/duckdb/src/optimizer/join_order/relation_statistics_helper.cpp @@ -107,6 +107,7 @@ RelationStats RelationStatisticsHelper::ExtractGetStats(LogicalGet &get, ClientC if (!get.table_filters.filters.empty()) { column_statistics = nullptr; + bool has_non_optional_filters = false; for (auto &it : get.table_filters.filters) { if (get.bind_data && get.function.statistics) { column_statistics = get.function.statistics(context, get.bind_data.get(), it.first); @@ -118,11 +119,15 @@ RelationStats RelationStatisticsHelper::ExtractGetStats(LogicalGet &get, ClientC base_table_cardinality, it.first, filter, *column_statistics); cardinality_after_filters = MinValue(cardinality_after_filters, cardinality_with_and_filter); } + + if (it.second->filter_type != TableFilterType::OPTIONAL_FILTER) { + has_non_optional_filters = true; + } } // if the above code didn't find an equality filter (i.e country_code = "[us]") // and there are other table filters (i.e cost > 50), use default selectivity. bool has_equality_filter = (cardinality_after_filters != base_table_cardinality); - if (!has_equality_filter && !get.table_filters.filters.empty()) { + if (!has_equality_filter && has_non_optional_filters) { cardinality_after_filters = MaxValue( LossyNumericCast(double(base_table_cardinality) * RelationStatisticsHelper::DEFAULT_SELECTIVITY), 1U); diff --git a/src/duckdb/src/parser/parser.cpp b/src/duckdb/src/parser/parser.cpp index ca7e1a71..77ad899f 100644 --- a/src/duckdb/src/parser/parser.cpp +++ b/src/duckdb/src/parser/parser.cpp @@ -369,25 +369,36 @@ vector Parser::TokenizeError(const string &error_msg) { // now iterate over the bool in_quotes = false; + char quote_char = '\0'; for (idx_t i = error_start; i < error_end; i++) { - if (error_msg[i] == '"' || error_msg[i] == '\'') { - SimplifiedToken token; - token.start = i; - if (!in_quotes) { - token.type = SimplifiedTokenType::SIMPLIFIED_TOKEN_STRING_CONSTANT; - token.start++; - } else { + if (in_quotes) { + // in a quote - look for the quote character + if (error_msg[i] == quote_char) { + SimplifiedToken token; + token.start = i; token.type = SimplifiedTokenType::SIMPLIFIED_TOKEN_IDENTIFIER; + tokens.push_back(token); + in_quotes = false; + } + if (StringUtil::CharacterIsNewline(error_msg[i])) { + // found a newline in a quote, abort the quoted state entirely + tokens.pop_back(); + in_quotes = false; } + } else if (error_msg[i] == '"' || error_msg[i] == '\'') { + // not quoted and found a quote - enter the quoted state + SimplifiedToken token; + token.start = i; + token.type = SimplifiedTokenType::SIMPLIFIED_TOKEN_STRING_CONSTANT; + token.start++; tokens.push_back(token); - in_quotes = !in_quotes; + quote_char = error_msg[i]; + in_quotes = true; } } - if (in_quotes && error_end < error_msg.size()) { - SimplifiedToken token; - token.start = error_end; - token.type = SimplifiedTokenType::SIMPLIFIED_TOKEN_IDENTIFIER; - tokens.push_back(token); + if (in_quotes) { + // unterminated quotes at the end of the error - pop back the quoted state + tokens.pop_back(); } if (line_pos.IsValid()) { SimplifiedToken token; diff --git a/src/duckdb/src/parser/transform/expression/transform_constant.cpp b/src/duckdb/src/parser/transform/expression/transform_constant.cpp index 35832560..c8c76c19 100644 --- a/src/duckdb/src/parser/transform/expression/transform_constant.cpp +++ b/src/duckdb/src/parser/transform/expression/transform_constant.cpp @@ -55,6 +55,12 @@ unique_ptr Transformer::TransformValue(duckdb_libpgquery::PG // successfully cast to bigint: bigint value return make_uniq(Value::HUGEINT(hugeint_value)); } + uhugeint_t uhugeint_value; + // if that is not successful; try to cast as uhugeint + if (TryCast::Operation(str_val, uhugeint_value)) { + // successfully cast to bigint: bigint value + return make_uniq(Value::UHUGEINT(uhugeint_value)); + } } idx_t decimal_offset = val.val.str[0] == '-' ? 3 : 2; if (try_cast_as_decimal && decimal_position.IsValid() && diff --git a/src/duckdb/src/parser/transform/expression/transform_subquery.cpp b/src/duckdb/src/parser/transform/expression/transform_subquery.cpp index 9d93b548..194ee5e1 100644 --- a/src/duckdb/src/parser/transform/expression/transform_subquery.cpp +++ b/src/duckdb/src/parser/transform/expression/transform_subquery.cpp @@ -74,16 +74,30 @@ unique_ptr Transformer::TransformSubquery(duckdb_libpgquery::P case duckdb_libpgquery::PG_ARRAY_SUBLINK: { // ARRAY expression // wrap subquery into - // "SELECT CASE WHEN ARRAY_AGG(COLUMNS(*)) IS NULL THEN [] ELSE ARRAY_AGG(COLUMNS(*)) END FROM (...) tbl" + // "SELECT CASE WHEN ARRAY_AGG(col) IS NULL THEN [] ELSE ARRAY_AGG(col) END FROM (...) tbl" auto select_node = make_uniq(); - // COLUMNS(*) - auto columns_star = make_uniq(); - columns_star->columns = true; + unique_ptr array_agg_child; + optional_ptr sub_select; + if (subquery_expr->subquery->node->type == QueryNodeType::SELECT_NODE) { + // easy case - subquery is a SELECT + sub_select = subquery_expr->subquery->node->Cast(); + if (sub_select->select_list.size() != 1) { + throw BinderException(*subquery_expr, "Subquery returns %zu columns - expected 1", + sub_select->select_list.size()); + } + array_agg_child = make_uniq(1ULL); + } else { + // subquery is not a SELECT but a UNION or CTE + // we can still support this but it is more challenging since we can't push columns for the ORDER BY + auto columns_star = make_uniq(); + columns_star->columns = true; + array_agg_child = std::move(columns_star); + } // ARRAY_AGG(COLUMNS(*)) vector> children; - children.push_back(std::move(columns_star)); + children.push_back(std::move(array_agg_child)); auto aggr = make_uniq("array_agg", std::move(children)); // push ORDER BY modifiers into the array_agg for (auto &modifier : subquery_expr->subquery->node->modifiers) { @@ -104,7 +118,12 @@ unique_ptr Transformer::TransformSubquery(duckdb_libpgquery::P idx_t positional_index = order_index < 0 ? NumericLimits::Maximum() : idx_t(order_index); order.expression = make_uniq(positional_index); } + } else if (sub_select) { + // if we have a SELECT we can push the ORDER BY clause into the SELECT list and reference it + sub_select->select_list.push_back(std::move(order.expression)); + order.expression = make_uniq(sub_select->select_list.size() - 1); } else { + // otherwise we remove order qualifications RemoveOrderQualificationRecursive(order.expression); } } diff --git a/src/duckdb/src/planner/binder/expression/bind_macro_expression.cpp b/src/duckdb/src/planner/binder/expression/bind_macro_expression.cpp index 151eadf9..3856ecbf 100644 --- a/src/duckdb/src/planner/binder/expression/bind_macro_expression.cpp +++ b/src/duckdb/src/planner/binder/expression/bind_macro_expression.cpp @@ -3,6 +3,7 @@ #include "duckdb/function/scalar_macro_function.hpp" #include "duckdb/parser/expression/function_expression.hpp" #include "duckdb/parser/expression/subquery_expression.hpp" +#include "duckdb/parser/expression/window_expression.hpp" #include "duckdb/parser/parsed_expression_iterator.hpp" #include "duckdb/planner/expression_binder.hpp" @@ -126,7 +127,26 @@ void ExpressionBinder::UnfoldMacroExpression(FunctionExpression &function, Scala macro_binding = new_macro_binding.get(); // replace current expression with stored macro expression - expr = macro_def.expression->Copy(); + // special case: If this is a window function, then we need to return a window expression + if (expr->GetExpressionClass() == ExpressionClass::WINDOW) { + // Only allowed if the expression is a function + if (macro_def.expression->GetExpressionType() != ExpressionType::FUNCTION) { + throw BinderException("Window function macros must be functions"); + } + auto macro_copy = macro_def.expression->Copy(); + auto ¯o_expr = macro_copy->Cast(); + // Transfer the macro function attributes + auto &window_expr = expr->Cast(); + window_expr.catalog = macro_expr.catalog; + window_expr.schema = macro_expr.schema; + window_expr.function_name = macro_expr.function_name; + window_expr.children = std::move(macro_expr.children); + window_expr.distinct = macro_expr.distinct; + window_expr.filter_expr = std::move(macro_expr.filter); + // TODO: transfer order_bys when window functions support them + } else { + expr = macro_def.expression->Copy(); + } // qualify only the macro parameters with a new empty binder that only knows the macro binding auto dummy_binder = Binder::CreateBinder(context); diff --git a/src/duckdb/src/planner/binder/expression/bind_star_expression.cpp b/src/duckdb/src/planner/binder/expression/bind_star_expression.cpp index 2a15d0c8..40704836 100644 --- a/src/duckdb/src/planner/binder/expression/bind_star_expression.cpp +++ b/src/duckdb/src/planner/binder/expression/bind_star_expression.cpp @@ -44,6 +44,11 @@ bool Binder::FindStarExpression(unique_ptr &expr, StarExpressi throw BinderException( "STAR expression with REPLACE list is only allowed as the root element of COLUMNS"); } + if (!current_star.rename_list.empty()) { + // '*' inside COLUMNS can not have a REPLACE list + throw BinderException( + "STAR expression with RENAME list is only allowed as the root element of COLUMNS"); + } // '*' expression inside a COLUMNS - convert to a constant list of strings (column names) vector> star_list; @@ -162,26 +167,37 @@ void TryTransformStarLike(unique_ptr &root) { if (right->expression_class != ExpressionClass::CONSTANT) { throw BinderException(*root, "Pattern applied to a star expression must be a constant"); } + if (!star.rename_list.empty()) { + throw BinderException(*root, "Rename list cannot be combined with a filtering operation"); + } if (!star.replace_list.empty()) { throw BinderException(*root, "Replace list cannot be combined with a filtering operation"); } - // generate a columns expression - // "* LIKE '%literal%' - // -> COLUMNS(list_filter(*, x -> x LIKE '%literal%')) + auto original_alias = root->alias; auto star_expr = std::move(left); + unique_ptr child_expr; + if (function.function_name == "regexp_full_match" && star.exclude_list.empty()) { + // * SIMILAR TO '[regex]' is equivalent to COLUMNS('[regex]') so we can just move the expression directly + child_expr = std::move(right); + } else { + // for other expressions -> generate a columns expression + // "* LIKE '%literal%' + // -> COLUMNS(list_filter(*, x -> x LIKE '%literal%')) + auto lhs = make_uniq("__lambda_col"); + function.children[0] = lhs->Copy(); - auto lhs = make_uniq("__lambda_col"); - function.children[0] = lhs->Copy(); - - auto lambda = make_uniq(std::move(lhs), std::move(root)); - vector> filter_children; - filter_children.push_back(std::move(star_expr)); - filter_children.push_back(std::move(lambda)); - auto list_filter = make_uniq("list_filter", std::move(filter_children)); + auto lambda = make_uniq(std::move(lhs), std::move(root)); + vector> filter_children; + filter_children.push_back(std::move(star_expr)); + filter_children.push_back(std::move(lambda)); + auto list_filter = make_uniq("list_filter", std::move(filter_children)); + child_expr = std::move(list_filter); + } auto columns_expr = make_uniq(); columns_expr->columns = true; - columns_expr->expr = std::move(list_filter); + columns_expr->expr = std::move(child_expr); + columns_expr->alias = std::move(original_alias); root = std::move(columns_expr); } diff --git a/src/duckdb/src/planner/binder/expression/bind_subquery_expression.cpp b/src/duckdb/src/planner/binder/expression/bind_subquery_expression.cpp index dc0f7624..0bf4dd5a 100644 --- a/src/duckdb/src/planner/binder/expression/bind_subquery_expression.cpp +++ b/src/duckdb/src/planner/binder/expression/bind_subquery_expression.cpp @@ -1,6 +1,7 @@ #include "duckdb/parser/expression/subquery_expression.hpp" #include "duckdb/planner/binder.hpp" #include "duckdb/planner/expression/bound_cast_expression.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" #include "duckdb/planner/expression/bound_subquery_expression.hpp" #include "duckdb/planner/expression_binder.hpp" #include "duckdb/common/string_util.hpp" @@ -38,6 +39,48 @@ class BoundSubqueryNode : public QueryNode { } }; +bool TypeIsUnnamedStruct(const LogicalType &type) { + if (type.id() != LogicalTypeId::STRUCT) { + return false; + } + return StructType::IsUnnamed(type); +} + +void ExtractSubqueryChildren(unique_ptr &child, vector> &result, + const vector &types) { + // two scenarios + // Single Expression (standard): + // x IN (...) + // Multi-Expression/Struct: + // (a, b) IN (SELECT ...) + // the latter has an unnamed struct on the LHS that is created by a "ROW" expression + auto &return_type = child->return_type; + if (!TypeIsUnnamedStruct(return_type)) { + // child is not an unnamed struct + return; + } + if (child->expression_class != ExpressionClass::BOUND_FUNCTION) { + // not a function + return; + } + auto &function = child->Cast(); + if (function.function.name != "row") { + // not "ROW" + return; + } + // we found (a, b, ...) - we can extract all children of this function + // note that we don't always want to do this + if (types.size() == 1 && TypeIsUnnamedStruct(types[0]) && function.children.size() != types.size()) { + // old case: we have an unnamed struct INSIDE the subquery as well + // i.e. (a, b) IN (SELECT (a, b) ...) + // unnesting the struct is guaranteed to throw an error - match the structs against each-other instead + return; + } + for (auto &row_child : function.children) { + result.push_back(std::move(row_child)); + } +} + BindResult ExpressionBinder::BindExpression(SubqueryExpression &expr, idx_t depth) { if (expr.subquery->node->type != QueryNodeType::BOUND_SUBQUERY_NODE) { // first bind the actual subquery in a new binder @@ -54,9 +97,6 @@ BindResult ExpressionBinder::BindExpression(SubqueryExpression &expr, idx_t dept binder.AddCorrelatedColumn(corr); } } - if (expr.subquery_type != SubqueryType::EXISTS && bound_node->types.size() > 1) { - throw BinderException(expr, "Subquery returns %zu columns - expected 1", bound_node->types.size()); - } auto prior_subquery = std::move(expr.subquery); expr.subquery = make_uniq(); expr.subquery->node = @@ -70,9 +110,25 @@ BindResult ExpressionBinder::BindExpression(SubqueryExpression &expr, idx_t dept return BindResult(std::move(error)); } } + auto &bound_subquery = expr.subquery->node->Cast(); + vector> child_expressions; + if (expr.subquery_type != SubqueryType::EXISTS) { + idx_t expected_columns = 1; + if (expr.child) { + auto &child = BoundExpression::GetExpression(*expr.child); + ExtractSubqueryChildren(child, child_expressions, bound_subquery.bound_node->types); + if (child_expressions.empty()) { + child_expressions.push_back(std::move(child)); + } + expected_columns = child_expressions.size(); + } + if (bound_subquery.bound_node->types.size() != expected_columns) { + throw BinderException(expr, "Subquery returns %zu columns - expected %d", + bound_subquery.bound_node->types.size(), expected_columns); + } + } // both binding the child and binding the subquery was successful D_ASSERT(expr.subquery->node->type == QueryNodeType::BOUND_SUBQUERY_NODE); - auto &bound_subquery = expr.subquery->node->Cast(); auto subquery_binder = std::move(bound_subquery.subquery_binder); auto bound_node = std::move(bound_subquery.bound_node); LogicalType return_type = @@ -85,19 +141,21 @@ BindResult ExpressionBinder::BindExpression(SubqueryExpression &expr, idx_t dept if (expr.subquery_type == SubqueryType::ANY) { // ANY comparison // cast child and subquery child to equivalent types - D_ASSERT(bound_node->types.size() == 1); - auto &child = BoundExpression::GetExpression(*expr.child); - auto child_type = ExpressionBinder::GetExpressionReturnType(*child); - LogicalType compare_type; - if (!LogicalType::TryGetMaxLogicalType(context, child_type, bound_node->types[0], compare_type)) { - throw BinderException( - expr, "Cannot compare values of type %s and %s in IN/ANY/ALL clause - an explicit cast is required", - child_type.ToString(), bound_node->types[0]); + for (idx_t child_idx = 0; child_idx < child_expressions.size(); child_idx++) { + auto &child = child_expressions[child_idx]; + auto child_type = ExpressionBinder::GetExpressionReturnType(*child); + auto &subquery_type = bound_node->types[child_idx]; + LogicalType compare_type; + if (!LogicalType::TryGetMaxLogicalType(context, child_type, subquery_type, compare_type)) { + throw BinderException( + expr, "Cannot compare values of type %s and %s in IN/ANY/ALL clause - an explicit cast is required", + child_type.ToString(), subquery_type); + } + child = BoundCastExpression::AddCastToType(context, std::move(child), compare_type); + result->child_types.push_back(subquery_type); + result->child_target = compare_type; + result->children.push_back(std::move(child)); } - child = BoundCastExpression::AddCastToType(context, std::move(child), compare_type); - result->child_type = bound_node->types[0]; - result->child_target = compare_type; - result->child = std::move(child); } result->binder = std::move(subquery_binder); result->subquery = std::move(bound_node); diff --git a/src/duckdb/src/planner/binder/expression/bind_window_expression.cpp b/src/duckdb/src/planner/binder/expression/bind_window_expression.cpp index 5909df50..482c4e17 100644 --- a/src/duckdb/src/planner/binder/expression/bind_window_expression.cpp +++ b/src/duckdb/src/planner/binder/expression/bind_window_expression.cpp @@ -142,9 +142,20 @@ static LogicalType BindRangeExpression(ClientContext &context, const string &nam } BindResult BaseSelectBinder::BindWindow(WindowExpression &window, idx_t depth) { + QueryErrorContext error_context(window.query_location); + // Check for macros pretending to be aggregates + auto entry = GetCatalogEntry(CatalogType::SCALAR_FUNCTION_ENTRY, window.catalog, window.schema, + window.function_name, OnEntryNotFound::RETURN_NULL, error_context); + if (window.type == ExpressionType::WINDOW_AGGREGATE && entry && entry->type == CatalogType::MACRO_ENTRY) { + auto macro = make_uniq(window.catalog, window.schema, window.function_name, + std::move(window.children), std::move(window.filter_expr), nullptr, + window.distinct); + auto macro_expr = window.Copy(); + return BindMacro(*macro, entry->Cast(), depth, macro_expr); + } + auto name = window.GetName(); - QueryErrorContext error_context(window.query_location); if (inside_window) { throw BinderException(error_context, "window function calls cannot be nested"); } @@ -243,8 +254,12 @@ BindResult BaseSelectBinder::BindWindow(WindowExpression &window, idx_t depth) { unique_ptr bind_info; if (window.type == ExpressionType::WINDOW_AGGREGATE) { // Look up the aggregate function in the catalog - auto &func = Catalog::GetEntry(context, window.catalog, window.schema, - window.function_name, error_context); + if (!entry || entry->type != CatalogType::AGGREGATE_FUNCTION_ENTRY) { + // Not an aggregate: Look it up to generate error + Catalog::GetEntry(context, window.catalog, window.schema, + window.function_name, error_context); + } + auto &func = entry->Cast(); D_ASSERT(func.type == CatalogType::AGGREGATE_FUNCTION_ENTRY); // bind the aggregate diff --git a/src/duckdb/src/planner/binder/query_node/plan_subquery.cpp b/src/duckdb/src/planner/binder/query_node/plan_subquery.cpp index 4b5857f5..647939c2 100644 --- a/src/duckdb/src/planner/binder/query_node/plan_subquery.cpp +++ b/src/duckdb/src/planner/binder/query_node/plan_subquery.cpp @@ -163,12 +163,15 @@ static unique_ptr PlanUncorrelatedSubquery(Binder &binder, BoundSubq join->AddChild(std::move(root)); join->AddChild(std::move(plan)); // create the JOIN condition - JoinCondition cond; - cond.left = std::move(expr.child); - cond.right = BoundCastExpression::AddDefaultCastToType( - make_uniq(expr.child_type, plan_columns[0]), expr.child_target); - cond.comparison = expr.comparison_type; - join->conditions.push_back(std::move(cond)); + for (idx_t child_idx = 0; child_idx < expr.children.size(); child_idx++) { + JoinCondition cond; + cond.left = std::move(expr.children[child_idx]); + auto &child_type = expr.child_types[child_idx]; + cond.right = BoundCastExpression::AddDefaultCastToType( + make_uniq(child_type, plan_columns[child_idx]), expr.child_target); + cond.comparison = expr.comparison_type; + join->conditions.push_back(std::move(cond)); + } root = std::move(join); // we replace the original subquery with a BoundColumnRefExpression referring to the mark column @@ -354,13 +357,24 @@ static unique_ptr PlanCorrelatedSubquery(Binder &binder, BoundSubque // now we create the join conditions between the dependent join and the original table CreateDelimJoinConditions(*delim_join, correlated_columns, plan_columns, flatten.delim_offset, perform_delim); + if (expr.children.size() > 1) { + // FIXME: the code to generate the plan here is actually correct + // the problem is in the hash join - specifically PhysicalHashJoin::InitializeHashTable + // this contains code that is hard-coded for a single comparison + // -> (delim_types.size() + 1 == conditions.size()) + // this needs to be generalized to get this to work + throw NotImplementedException("Correlated IN/ANY/ALL with multiple columns not yet supported"); + } // add the actual condition based on the ANY/ALL predicate - JoinCondition compare_cond; - compare_cond.left = std::move(expr.child); - compare_cond.right = BoundCastExpression::AddDefaultCastToType( - make_uniq(expr.child_type, plan_columns[0]), expr.child_target); - compare_cond.comparison = expr.comparison_type; - delim_join->conditions.push_back(std::move(compare_cond)); + for (idx_t child_idx = 0; child_idx < expr.children.size(); child_idx++) { + JoinCondition compare_cond; + compare_cond.left = std::move(expr.children[child_idx]); + auto &child_type = expr.child_types[child_idx]; + compare_cond.right = BoundCastExpression::AddDefaultCastToType( + make_uniq(child_type, plan_columns[child_idx]), expr.child_target); + compare_cond.comparison = expr.comparison_type; + delim_join->conditions.push_back(std::move(compare_cond)); + } delim_join->AddChild(std::move(dependent_join)); root = std::move(delim_join); @@ -457,6 +471,9 @@ unique_ptr Binder::PlanLateralJoin(unique_ptr vector conditions; vector> arbitrary_expressions; if (condition) { + if (condition->HasSubquery()) { + throw BinderException(*condition, "Subqueries are not supported in LATERAL join conditions"); + } // extract join conditions, if there are any LogicalComparisonJoin::ExtractJoinConditions(context, join_type, JoinRefType::REGULAR, left, right, std::move(condition), conditions, arbitrary_expressions); diff --git a/src/duckdb/src/planner/expression_iterator.cpp b/src/duckdb/src/planner/expression_iterator.cpp index 1b271112..1188958b 100644 --- a/src/duckdb/src/planner/expression_iterator.cpp +++ b/src/duckdb/src/planner/expression_iterator.cpp @@ -88,8 +88,8 @@ void ExpressionIterator::EnumerateChildren(Expression &expr, } case ExpressionClass::BOUND_SUBQUERY: { auto &subquery_expr = expr.Cast(); - if (subquery_expr.child) { - callback(subquery_expr.child); + for (auto &child : subquery_expr.children) { + callback(child); } break; } diff --git a/src/duckdb/src/planner/joinside.cpp b/src/duckdb/src/planner/joinside.cpp index ba5b49cc..38cf0f68 100644 --- a/src/duckdb/src/planner/joinside.cpp +++ b/src/duckdb/src/planner/joinside.cpp @@ -69,8 +69,9 @@ JoinSide JoinSide::GetJoinSide(Expression &expression, const unordered_set(); JoinSide side = JoinSide::NONE; - if (subquery.child) { - side = GetJoinSide(*subquery.child, left_bindings, right_bindings); + for (auto &child : subquery.children) { + auto child_side = GetJoinSide(*child, left_bindings, right_bindings); + side = CombineJoinSide(side, child_side); } // correlated subquery, check the side of each of correlated columns in the subquery for (auto &corr : subquery.binder->correlated_columns) { diff --git a/src/duckdb/src/storage/data_table.cpp b/src/duckdb/src/storage/data_table.cpp index 1fe64396..1d967779 100644 --- a/src/duckdb/src/storage/data_table.cpp +++ b/src/duckdb/src/storage/data_table.cpp @@ -415,18 +415,20 @@ static void VerifyGeneratedExpressionSuccess(ClientContext &context, TableCatalo } } -static void VerifyCheckConstraint(ClientContext &context, TableCatalogEntry &table, Expression &expr, - DataChunk &chunk) { +static void VerifyCheckConstraint(ClientContext &context, TableCatalogEntry &table, Expression &expr, DataChunk &chunk, + CheckConstraint &check) { ExpressionExecutor executor(context, expr); Vector result(LogicalType::INTEGER); try { executor.ExecuteExpression(chunk, result); } catch (std::exception &ex) { ErrorData error(ex); - throw ConstraintException("CHECK constraint failed: %s (Error: %s)", table.name, error.RawMessage()); + throw ConstraintException("CHECK constraint failed on table %s with expression %s (Error: %s)", table.name, + check.ToString(), error.RawMessage()); } catch (...) { // LCOV_EXCL_START - throw ConstraintException("CHECK constraint failed: %s (Unknown Error)", table.name); + throw ConstraintException("CHECK constraint failed on table %s with expression %s (Unknown Error)", table.name, + check.ToString()); } // LCOV_EXCL_STOP UnifiedVectorFormat vdata; result.ToUnifiedFormat(chunk.size(), vdata); @@ -435,7 +437,8 @@ static void VerifyCheckConstraint(ClientContext &context, TableCatalogEntry &tab for (idx_t i = 0; i < chunk.size(); i++) { auto idx = vdata.sel->get_index(i); if (vdata.validity.RowIsValid(idx) && dataptr[idx] == 0) { - throw ConstraintException("CHECK constraint failed: %s", table.name); + throw ConstraintException("CHECK constraint failed on table %s with expression %s", table.name, + check.ToString()); } } } @@ -763,8 +766,9 @@ void DataTable::VerifyAppendConstraints(ConstraintState &state, ClientContext &c break; } case ConstraintType::CHECK: { - auto &check = constraint->Cast(); - VerifyCheckConstraint(context, table, *check.expression, chunk); + auto &check = base_constraint->Cast(); + auto &bound_check = constraint->Cast(); + VerifyCheckConstraint(context, table, *bound_check.expression, chunk, check); break; } case ConstraintType::UNIQUE: { @@ -1330,11 +1334,12 @@ void DataTable::VerifyUpdateConstraints(ConstraintState &state, ClientContext &c break; } case ConstraintType::CHECK: { - auto &check = constraint->Cast(); + auto &check = base_constraint->Cast(); + auto &bound_check = constraint->Cast(); DataChunk mock_chunk; - if (CreateMockChunk(table, column_ids, check.bound_columns, chunk, mock_chunk)) { - VerifyCheckConstraint(context, table, *check.expression, mock_chunk); + if (CreateMockChunk(table, column_ids, bound_check.bound_columns, chunk, mock_chunk)) { + VerifyCheckConstraint(context, table, *bound_check.expression, mock_chunk, check); } break; } diff --git a/src/duckdb/src/storage/table/row_group.cpp b/src/duckdb/src/storage/table/row_group.cpp index ef5ac5f8..cb80bfc5 100644 --- a/src/duckdb/src/storage/table/row_group.cpp +++ b/src/duckdb/src/storage/table/row_group.cpp @@ -435,6 +435,10 @@ bool RowGroup::CheckZonemap(ScanFilterInfo &filters) { // label the filter as always true so we don't need to check it anymore filters.SetFilterAlwaysTrue(i); } + if (filter.filter_type == TableFilterType::OPTIONAL_FILTER) { + // these are only for row group checking, set as always true so we don't check it + filters.SetFilterAlwaysTrue(i); + } } return true; } diff --git a/src/duckdb/third_party/re2/re2/bitstate.cc b/src/duckdb/third_party/re2/re2/bitstate.cc index f87effd8..a7ef5d88 100644 --- a/src/duckdb/third_party/re2/re2/bitstate.cc +++ b/src/duckdb/third_party/re2/re2/bitstate.cc @@ -24,15 +24,63 @@ #include #include "util/logging.h" -#include "re2/bitstate.h" #include "re2/pod_array.h" +#include "re2/prog.h" #include "re2/regexp.h" namespace duckdb_re2 { +struct Job { + int id; + int rle; // run length encoding + const char* p; +}; + +class BitState { + public: + explicit BitState(Prog* prog); + + // The usual Search prototype. + // Can only call Search once per BitState. + bool Search(const StringPiece& text, const StringPiece& context, + bool anchored, bool longest, + StringPiece* submatch, int nsubmatch); + + private: + inline bool ShouldVisit(int id, const char* p); + void Push(int id, const char* p); + void GrowStack(); + bool TrySearch(int id, const char* p); + + // Search parameters + Prog* prog_; // program being run + StringPiece text_; // text being searched + StringPiece context_; // greater context of text being searched + bool anchored_; // whether search is anchored at text.begin() + bool longest_; // whether search wants leftmost-longest match + bool endmatch_; // whether match must end at text.end() + StringPiece* submatch_; // submatches to fill in + int nsubmatch_; // # of submatches to fill in + + // Search state + static constexpr int kVisitedBits = 64; + PODArray visited_; // bitmap: (list ID, char*) pairs visited + PODArray cap_; // capture registers + PODArray job_; // stack of text positions to explore + int njob_; // stack size + + BitState(const BitState&) = delete; + BitState& operator=(const BitState&) = delete; +}; + BitState::BitState(Prog* prog) - : prog_(prog){ - Reset(); + : prog_(prog), + anchored_(false), + longest_(false), + endmatch_(false), + submatch_(NULL), + nsubmatch_(0), + njob_(0) { } // Given id, which *must* be a list head, we can look up its list ID. @@ -260,23 +308,17 @@ bool BitState::Search(const StringPiece& text, const StringPiece& context, // Allocate scratch space. int nvisited = prog_->list_count() * static_cast(text.size()+1); nvisited = (nvisited + kVisitedBits-1) / kVisitedBits; - if (visited_.size() < nvisited) { - visited_ = PODArray(nvisited); - } - memset(visited_.data(), 0, nvisited*sizeof visited_[0]); + visited_ = PODArray(nvisited); + memset(visited_.data(), 0, nvisited*sizeof visited_[0]); int ncap = 2*nsubmatch; if (ncap < 2) ncap = 2; - if (cap_.size() < ncap) { - cap_ = PODArray(ncap); - } - memset(cap_.data(), 0, ncap*sizeof cap_[0]); + cap_ = PODArray(ncap); + memset(cap_.data(), 0, ncap*sizeof cap_[0]); // When sizeof(Job) == 16, we start with a nice round 1KiB. :) - if (job_.size() < 64) { - job_ = PODArray(64); - } + job_ = PODArray(64); // Anchored search must start at text.begin(). if (anchored_) { @@ -330,14 +372,10 @@ bool Prog::SearchBitState(const StringPiece& text, } // Run the search. - if (!bitstate) { - bitstate = std::unique_ptr(new BitState(this)); - } else { - bitstate->Reset(); - } + BitState b(this); bool anchored = anchor == kAnchored; bool longest = kind != kFirstMatch; - if (!bitstate->Search(text, context, anchored, longest, match, nmatch)) + if (!b.Search(text, context, anchored, longest, match, nmatch)) return false; if (kind == kFullMatch && EndPtr(match[0]) != EndPtr(text)) return false; diff --git a/src/duckdb/third_party/re2/re2/bitstate.h b/src/duckdb/third_party/re2/re2/bitstate.h deleted file mode 100644 index 899d57d9..00000000 --- a/src/duckdb/third_party/re2/re2/bitstate.h +++ /dev/null @@ -1,57 +0,0 @@ -#include "re2/prog.h" - -namespace duckdb_re2 { - -struct Job { - int id; - int rle; // run length encoding - const char* p; -}; - -class BitState { -public: - explicit BitState(Prog* prog); - - // The usual Search prototype. - // Can only call Search once per BitState. - bool Search(const StringPiece& text, const StringPiece& context, - bool anchored, bool longest, - StringPiece* submatch, int nsubmatch); - - void Reset() { - anchored_ = false; - longest_ = false; - endmatch_ =false; - submatch_ =NULL; - nsubmatch_ =0; - njob_= 0; - } - -private: - inline bool ShouldVisit(int id, const char* p); - void Push(int id, const char* p); - void GrowStack(); - bool TrySearch(int id, const char* p); - - // Search parameters - Prog* prog_; // program being run - StringPiece text_; // text being searched - StringPiece context_; // greater context of text being searched - bool anchored_; // whether search is anchored at text.begin() - bool longest_; // whether search wants leftmost-longest match - bool endmatch_; // whether match must end at text.end() - StringPiece* submatch_; // submatches to fill in - int nsubmatch_; // # of submatches to fill in - - // Search state - static constexpr int kVisitedBits = 64; - PODArray visited_; // bitmap: (list ID, char*) pairs visited - PODArray cap_; // capture registers - PODArray job_; // stack of text positions to explore - int njob_; // stack size - - BitState(const BitState&) = delete; - BitState& operator=(const BitState&) = delete; -}; - -} diff --git a/src/duckdb/third_party/re2/re2/prog.cc b/src/duckdb/third_party/re2/re2/prog.cc index f8b25883..716b6f3e 100644 --- a/src/duckdb/third_party/re2/re2/prog.cc +++ b/src/duckdb/third_party/re2/re2/prog.cc @@ -6,7 +6,6 @@ // Tested by compile_test.cc #include "re2/prog.h" -#include "re2/bitstate.h" #if defined(__AVX2__) #include diff --git a/src/duckdb/third_party/re2/re2/prog.h b/src/duckdb/third_party/re2/re2/prog.h index d90b9f71..e385b59e 100644 --- a/src/duckdb/third_party/re2/re2/prog.h +++ b/src/duckdb/third_party/re2/re2/prog.h @@ -24,7 +24,6 @@ #include "re2/sparse_set.h" namespace duckdb_re2 { -class BitState; // Opcodes for Inst enum InstOp { @@ -409,7 +408,6 @@ class Prog { DFA* GetDFA(MatchKind kind); void DeleteDFA(DFA* dfa); - std::unique_ptr bitstate; bool anchor_start_; // regexp has explicit start anchor bool anchor_end_; // regexp has explicit end anchor bool reversed_; // whether program runs backward over input diff --git a/src/duckdb/third_party/zstd/include/zstd/compress/zstd_cwksp.h b/src/duckdb/third_party/zstd/include/zstd/compress/zstd_cwksp.h index 70cfcfa5..bb1c62ce 100644 --- a/src/duckdb/third_party/zstd/include/zstd/compress/zstd_cwksp.h +++ b/src/duckdb/third_party/zstd/include/zstd/compress/zstd_cwksp.h @@ -186,19 +186,6 @@ MEM_STATIC void ZSTD_cwksp_assert_internal_consistency(ZSTD_cwksp* ws) { assert(ws->allocStart <= ws->workspaceEnd); assert(ws->initOnceStart <= ZSTD_cwksp_initialAllocStart(ws)); assert(ws->workspace <= ws->initOnceStart); -#if ZSTD_MEMORY_SANITIZER - { - intptr_t const offset = __msan_test_shadow(ws->initOnceStart, - (U8*)ZSTD_cwksp_initialAllocStart(ws) - (U8*)ws->initOnceStart); - (void)offset; -#if defined(ZSTD_MSAN_PRINT) - if(offset!=-1) { - __msan_print_shadow((U8*)ws->initOnceStart + offset - 8, 32); - } -#endif - assert(offset==-1); - }; -#endif } /**