diff --git a/src/duckdb/extension/json/include/json_scan.hpp b/src/duckdb/extension/json/include/json_scan.hpp index 9c5be623..61439b61 100644 --- a/src/duckdb/extension/json/include/json_scan.hpp +++ b/src/duckdb/extension/json/include/json_scan.hpp @@ -179,7 +179,8 @@ struct JSONScanGlobalState { //! Column names that we're actually reading (after projection pushdown) vector names; - vector column_indices; + vector column_ids; + vector column_indices; //! Buffer manager allocator Allocator &allocator; diff --git a/src/duckdb/extension/json/include/json_transform.hpp b/src/duckdb/extension/json/include/json_transform.hpp index 222f9003..0ae6b57b 100644 --- a/src/duckdb/extension/json/include/json_transform.hpp +++ b/src/duckdb/extension/json/include/json_transform.hpp @@ -8,6 +8,8 @@ #pragma once +#include "duckdb/common/column_index.hpp" +#include "duckdb/common/optional_ptr.hpp" #include "duckdb/function/scalar/strftime_format.hpp" #include "json_common.hpp" @@ -64,9 +66,10 @@ struct TryParseTimeStamp { struct JSONTransform { static bool Transform(yyjson_val *vals[], yyjson_alc *alc, Vector &result, const idx_t count, - JSONTransformOptions &options); + JSONTransformOptions &options, optional_ptr column_index); static bool TransformObject(yyjson_val *objects[], yyjson_alc *alc, const idx_t count, const vector &names, - const vector &result_vectors, JSONTransformOptions &options); + const vector &result_vectors, JSONTransformOptions &options, + optional_ptr> column_indices, bool error_unknown_key); static bool GetStringVector(yyjson_val *vals[], const idx_t count, const LogicalType &target, Vector &string_vector, JSONTransformOptions &options); }; diff --git a/src/duckdb/extension/json/json_functions/json_create.cpp b/src/duckdb/extension/json/json_functions/json_create.cpp index 3927daa1..8c587fd0 100644 --- a/src/duckdb/extension/json/json_functions/json_create.cpp +++ b/src/duckdb/extension/json/json_functions/json_create.cpp @@ -40,25 +40,36 @@ static LogicalType GetJSONType(StructNames &const_struct_names, const LogicalTyp // These types can go directly into JSON case LogicalTypeId::SQLNULL: case LogicalTypeId::BOOLEAN: - case LogicalTypeId::BIGINT: - case LogicalTypeId::UBIGINT: - case LogicalTypeId::DOUBLE: - return type; - // We cast these types to a type that can go into JSON case LogicalTypeId::TINYINT: case LogicalTypeId::SMALLINT: case LogicalTypeId::INTEGER: - return LogicalType::BIGINT; + case LogicalTypeId::BIGINT: + case LogicalTypeId::HUGEINT: + case LogicalTypeId::UHUGEINT: case LogicalTypeId::UTINYINT: case LogicalTypeId::USMALLINT: case LogicalTypeId::UINTEGER: - return LogicalType::UBIGINT; + case LogicalTypeId::UBIGINT: case LogicalTypeId::FLOAT: + case LogicalTypeId::DOUBLE: + case LogicalTypeId::BIT: + case LogicalTypeId::BLOB: + case LogicalTypeId::VARCHAR: + case LogicalTypeId::AGGREGATE_STATE: + case LogicalTypeId::ENUM: + case LogicalTypeId::DATE: + case LogicalTypeId::INTERVAL: + case LogicalTypeId::TIME: + case LogicalTypeId::TIME_TZ: + case LogicalTypeId::TIMESTAMP: + case LogicalTypeId::TIMESTAMP_TZ: + case LogicalTypeId::TIMESTAMP_NS: + case LogicalTypeId::TIMESTAMP_MS: + case LogicalTypeId::TIMESTAMP_SEC: + case LogicalTypeId::UUID: + case LogicalTypeId::VARINT: case LogicalTypeId::DECIMAL: - case LogicalTypeId::UHUGEINT: - case LogicalTypeId::HUGEINT: - return LogicalType::DOUBLE; - // The nested types need to conform as well + return type; case LogicalTypeId::LIST: return LogicalType::LIST(GetJSONType(const_struct_names, ListType::GetChildType(type))); 
case LogicalTypeId::ARRAY: @@ -211,7 +222,7 @@ template <> struct CreateJSONValue { static inline yyjson_mut_val *Operation(yyjson_mut_doc *doc, const hugeint_t &input) { const auto input_string = input.ToString(); - return yyjson_mut_strncpy(doc, input_string.c_str(), input_string.length()); + return yyjson_mut_rawncpy(doc, input_string.c_str(), input_string.length()); } }; @@ -219,7 +230,7 @@ template <> struct CreateJSONValue { static inline yyjson_mut_val *Operation(yyjson_mut_doc *doc, const uhugeint_t &input) { const auto input_string = input.ToString(); - return yyjson_mut_strncpy(doc, input_string.c_str(), input_string.length()); + return yyjson_mut_rawncpy(doc, input_string.c_str(), input_string.length()); } }; @@ -287,6 +298,22 @@ static void TemplatedCreateValues(yyjson_mut_doc *doc, yyjson_mut_val *vals[], V } } +static void CreateRawValues(yyjson_mut_doc *doc, yyjson_mut_val *vals[], Vector &value_v, idx_t count) { + UnifiedVectorFormat value_data; + value_v.ToUnifiedFormat(count, value_data); + auto values = UnifiedVectorFormat::GetData(value_data); + for (idx_t i = 0; i < count; i++) { + idx_t val_idx = value_data.sel->get_index(i); + if (!value_data.validity.RowIsValid(val_idx)) { + vals[i] = yyjson_mut_null(doc); + } else { + const auto &str = values[val_idx]; + vals[i] = yyjson_mut_rawncpy(doc, str.GetData(), str.GetSize()); + } + D_ASSERT(vals[i] != nullptr); + } +} + static void CreateValuesStruct(const StructNames &names, yyjson_mut_doc *doc, yyjson_mut_val *vals[], Vector &value_v, idx_t count) { // Structs become values, therefore we initialize vals to JSON values @@ -476,7 +503,8 @@ static void CreateValuesArray(const StructNames &names, yyjson_mut_doc *doc, yyj static void CreateValues(const StructNames &names, yyjson_mut_doc *doc, yyjson_mut_val *vals[], Vector &value_v, idx_t count) { - switch (value_v.GetType().id()) { + const auto &type = value_v.GetType(); + switch (type.id()) { case LogicalTypeId::SQLNULL: CreateValuesNull(doc, vals, count); break; @@ -550,17 +578,28 @@ static void CreateValues(const StructNames &names, yyjson_mut_doc *doc, yyjson_m case LogicalTypeId::TIMESTAMP_NS: case LogicalTypeId::TIMESTAMP_MS: case LogicalTypeId::TIMESTAMP_SEC: - case LogicalTypeId::VARINT: case LogicalTypeId::UUID: { Vector string_vector(LogicalTypeId::VARCHAR, count); VectorOperations::DefaultCast(value_v, string_vector, count); TemplatedCreateValues(doc, vals, string_vector, count); break; } + case LogicalTypeId::VARINT: { + Vector string_vector(LogicalTypeId::VARCHAR, count); + VectorOperations::DefaultCast(value_v, string_vector, count); + CreateRawValues(doc, vals, string_vector, count); + break; + } case LogicalTypeId::DECIMAL: { - Vector double_vector(LogicalType::DOUBLE, count); - VectorOperations::DefaultCast(value_v, double_vector, count); - TemplatedCreateValues(doc, vals, double_vector, count); + if (DecimalType::GetWidth(type) > 15) { + Vector string_vector(LogicalTypeId::VARCHAR, count); + VectorOperations::DefaultCast(value_v, string_vector, count); + CreateRawValues(doc, vals, string_vector, count); + } else { + Vector double_vector(LogicalType::DOUBLE, count); + VectorOperations::DefaultCast(value_v, double_vector, count); + TemplatedCreateValues(doc, vals, double_vector, count); + } break; } case LogicalTypeId::INVALID: @@ -604,7 +643,6 @@ static void ObjectFunction(DataChunk &args, ExpressionState &state, Vector &resu for (idx_t i = 0; i < count; i++) { objects[i] = JSONCommon::WriteVal(objs[i], alc); } - if (args.AllConstant()) { 
result.SetVectorType(VectorType::CONSTANT_VECTOR); } @@ -637,7 +675,6 @@ static void ArrayFunction(DataChunk &args, ExpressionState &state, Vector &resul for (idx_t i = 0; i < count; i++) { objects[i] = JSONCommon::WriteVal(arrs[i], alc); } - if (args.AllConstant()) { result.SetVectorType(VectorType::CONSTANT_VECTOR); } @@ -651,22 +688,9 @@ static void ToJSONFunctionInternal(const StructNames &names, Vector &input, cons CreateValues(names, doc, vals, input, count); // Write JSON values to string - auto objects = FlatVector::GetData(result); - auto &result_validity = FlatVector::Validity(result); - UnifiedVectorFormat input_data; - input.ToUnifiedFormat(count, input_data); - for (idx_t i = 0; i < count; i++) { - idx_t idx = input_data.sel->get_index(i); - if (input_data.validity.RowIsValid(idx)) { - objects[i] = JSONCommon::WriteVal(vals[i], alc); - } else { - result_validity.SetInvalid(i); - } - } - - if (input.GetVectorType() == VectorType::CONSTANT_VECTOR || count == 1) { - result.SetVectorType(VectorType::CONSTANT_VECTOR); - } + UnaryExecutor::ExecuteWithNulls(input, result, count, [&](data_t, ValidityMask &, idx_t index) { + return JSONCommon::WriteVal(vals[index], alc); + }); } static void ToJSONFunction(DataChunk &args, ExpressionState &state, Vector &result) { diff --git a/src/duckdb/extension/json/json_functions/json_structure.cpp b/src/duckdb/extension/json/json_functions/json_structure.cpp index 7982003f..51652bae 100644 --- a/src/duckdb/extension/json/json_functions/json_structure.cpp +++ b/src/duckdb/extension/json/json_functions/json_structure.cpp @@ -1,12 +1,11 @@ #include "json_structure.hpp" #include "duckdb/common/enum_util.hpp" +#include "duckdb/common/extra_type_info.hpp" #include "json_executors.hpp" #include "json_scan.hpp" #include "json_transform.hpp" -#include - namespace duckdb { static bool IsNumeric(LogicalTypeId type) { diff --git a/src/duckdb/extension/json/json_functions/json_transform.cpp b/src/duckdb/extension/json/json_functions/json_transform.cpp index c37fff36..5b4b64e0 100644 --- a/src/duckdb/extension/json/json_functions/json_transform.cpp +++ b/src/duckdb/extension/json/json_functions/json_transform.cpp @@ -362,9 +362,14 @@ static bool TransformToString(yyjson_val *vals[], yyjson_alc *alc, Vector &resul bool JSONTransform::TransformObject(yyjson_val *objects[], yyjson_alc *alc, const idx_t count, const vector &names, const vector &result_vectors, - JSONTransformOptions &options) { + JSONTransformOptions &options, + optional_ptr> column_indices, bool error_unknown_key) { + if (column_indices && column_indices->empty()) { + column_indices = nullptr; + } D_ASSERT(alc); D_ASSERT(names.size() == result_vectors.size()); + D_ASSERT(!column_indices || column_indices->size() == names.size()); const idx_t column_count = names.size(); // Build hash map from key to column index so we don't have to linearly search using the key @@ -429,7 +434,7 @@ bool JSONTransform::TransformObject(yyjson_val *objects[], yyjson_alc *alc, cons found_keys[col_idx] = true; found_key_count++; } - } else if (success && options.error_unknown_key) { + } else if (success && error_unknown_key && options.error_unknown_key) { options.error_message = StringUtil::Format("Object %s has unknown key \"" + string(key_ptr, key_len) + "\"", JSONCommon::ValToString(objects[i], 50)); @@ -458,7 +463,9 @@ bool JSONTransform::TransformObject(yyjson_val *objects[], yyjson_alc *alc, cons } for (idx_t col_idx = 0; col_idx < column_count; col_idx++) { - if (!JSONTransform::Transform(nested_vals[col_idx], 
alc, *result_vectors[col_idx], count, options)) { + auto child_column_index = column_indices ? &(*column_indices)[col_idx] : nullptr; + if (!JSONTransform::Transform(nested_vals[col_idx], alc, *result_vectors[col_idx], count, options, + child_column_index)) { success = false; } } @@ -471,7 +478,11 @@ bool JSONTransform::TransformObject(yyjson_val *objects[], yyjson_alc *alc, cons } static bool TransformObjectInternal(yyjson_val *objects[], yyjson_alc *alc, Vector &result, const idx_t count, - JSONTransformOptions &options) { + JSONTransformOptions &options, optional_ptr column_index) { + if (column_index && column_index->ChildIndexCount() == 0) { + column_index = nullptr; + } + // Set validity first auto &result_validity = FlatVector::Validity(result); for (idx_t i = 0; i < count; i++) { @@ -485,14 +496,31 @@ static bool TransformObjectInternal(yyjson_val *objects[], yyjson_alc *alc, Vect auto &child_vs = StructVector::GetEntries(result); vector child_names; vector child_vectors; - child_names.reserve(child_vs.size()); - child_vectors.reserve(child_vs.size()); + + const auto child_count = column_index ? column_index->ChildIndexCount() : child_vs.size(); + child_names.reserve(child_count); + child_vectors.reserve(child_count); + + unordered_set projected_indices; + for (idx_t child_i = 0; child_i < child_count; child_i++) { + const auto actual_i = column_index ? column_index->GetChildIndex(child_i).GetPrimaryIndex() : child_i; + projected_indices.insert(actual_i); + + child_names.push_back(StructType::GetChildName(result.GetType(), actual_i)); + child_vectors.push_back(child_vs[actual_i].get()); + } + for (idx_t child_i = 0; child_i < child_vs.size(); child_i++) { - child_names.push_back(StructType::GetChildName(result.GetType(), child_i)); - child_vectors.push_back(child_vs[child_i].get()); + if (projected_indices.find(child_i) == projected_indices.end()) { + child_vs[child_i]->SetVectorType(VectorType::CONSTANT_VECTOR); + ConstantVector::SetNull(*child_vs[child_i], true); + } } - return JSONTransform::TransformObject(objects, alc, count, child_names, child_vectors, options); + auto child_indices = column_index ? 
&column_index->GetChildIndexes() : nullptr; + const auto error_unknown_key = child_count == child_vs.size(); // Nothing projected out, error if unknown + return JSONTransform::TransformObject(objects, alc, count, child_names, child_vectors, options, child_indices, + error_unknown_key); } static bool TransformArrayToList(yyjson_val *arrays[], yyjson_alc *alc, Vector &result, const idx_t count, @@ -562,7 +590,7 @@ static bool TransformArrayToList(yyjson_val *arrays[], yyjson_alc *alc, Vector & } // Transform array values - if (!JSONTransform::Transform(nested_vals, alc, ListVector::GetEntry(result), offset, options)) { + if (!JSONTransform::Transform(nested_vals, alc, ListVector::GetEntry(result), offset, options, nullptr)) { success = false; } @@ -652,7 +680,7 @@ static bool TransformArrayToArray(yyjson_val *arrays[], yyjson_alc *alc, Vector } // Transform array values - if (!JSONTransform::Transform(nested_vals, alc, ArrayVector::GetEntry(result), child_count, options)) { + if (!JSONTransform::Transform(nested_vals, alc, ArrayVector::GetEntry(result), child_count, options, nullptr)) { success = false; } @@ -720,13 +748,13 @@ static bool TransformObjectToMap(yyjson_val *objects[], yyjson_alc *alc, Vector D_ASSERT(list_offset == list_size); // Transform keys - if (!JSONTransform::Transform(keys, alc, MapVector::GetKeys(result), list_size, options)) { + if (!JSONTransform::Transform(keys, alc, MapVector::GetKeys(result), list_size, options, nullptr)) { throw ConversionException( StringUtil::Format(options.error_message + ". Cannot default to NULL, because map keys cannot be NULL")); } // Transform values - if (!JSONTransform::Transform(vals, alc, MapVector::GetValues(result), list_size, options)) { + if (!JSONTransform::Transform(vals, alc, MapVector::GetValues(result), list_size, options, nullptr)) { success = false; } @@ -813,7 +841,7 @@ bool TransformValueIntoUnion(yyjson_val **vals, yyjson_alc *alc, Vector &result, idx_t actual_tag = tag - names.begin(); Vector single(UnionType::GetMemberType(type, actual_tag), 1); - if (!JSONTransform::Transform(&val, alc, single, 1, options)) { + if (!JSONTransform::Transform(&val, alc, single, 1, options, nullptr)) { success = false; } @@ -824,7 +852,7 @@ bool TransformValueIntoUnion(yyjson_val **vals, yyjson_alc *alc, Vector &result, } bool JSONTransform::Transform(yyjson_val *vals[], yyjson_alc *alc, Vector &result, const idx_t count, - JSONTransformOptions &options) { + JSONTransformOptions &options, optional_ptr column_index) { auto result_type = result.GetType(); if ((result_type == LogicalTypeId::TIMESTAMP || result_type == LogicalTypeId::DATE) && options.date_format_map && options.date_format_map->HasFormats(result_type.id())) { @@ -899,7 +927,7 @@ bool JSONTransform::Transform(yyjson_val *vals[], yyjson_alc *alc, Vector &resul case LogicalTypeId::BLOB: return TransformToString(vals, alc, result, count); case LogicalTypeId::STRUCT: - return TransformObjectInternal(vals, alc, result, count, options); + return TransformObjectInternal(vals, alc, result, count, options, column_index); case LogicalTypeId::LIST: return TransformArrayToList(vals, alc, result, count, options); case LogicalTypeId::MAP: @@ -935,7 +963,7 @@ static bool TransformFunctionInternal(Vector &input, const idx_t count, Vector & } } - auto success = JSONTransform::Transform(vals, alc, result, count, options); + auto success = JSONTransform::Transform(vals, alc, result, count, options, nullptr); if (input.GetVectorType() == VectorType::CONSTANT_VECTOR) { 
result.SetVectorType(VectorType::CONSTANT_VECTOR); } diff --git a/src/duckdb/extension/json/json_functions/read_json.cpp b/src/duckdb/extension/json/json_functions/read_json.cpp index d56b3e98..78c1b437 100644 --- a/src/duckdb/extension/json/json_functions/read_json.cpp +++ b/src/duckdb/extension/json/json_functions/read_json.cpp @@ -1,9 +1,9 @@ +#include "duckdb/common/helper.hpp" #include "duckdb/common/multi_file_reader.hpp" #include "json_functions.hpp" #include "json_scan.hpp" #include "json_structure.hpp" #include "json_transform.hpp" -#include "duckdb/common/helper.hpp" namespace duckdb { @@ -330,8 +330,8 @@ static void ReadJSONFunction(ClientContext &context, TableFunctionInput &data_p, if (!gstate.names.empty()) { vector result_vectors; - result_vectors.reserve(gstate.column_indices.size()); - for (const auto &col_idx : gstate.column_indices) { + result_vectors.reserve(gstate.column_ids.size()); + for (const auto &col_idx : gstate.column_ids) { result_vectors.emplace_back(&output.data[col_idx]); } @@ -339,11 +339,12 @@ static void ReadJSONFunction(ClientContext &context, TableFunctionInput &data_p, bool success; if (gstate.bind_data.options.record_type == JSONRecordType::RECORDS) { success = JSONTransform::TransformObject(values, lstate.GetAllocator(), count, gstate.names, result_vectors, - lstate.transform_options); + lstate.transform_options, gstate.column_indices, + lstate.transform_options.error_unknown_key); } else { D_ASSERT(gstate.bind_data.options.record_type == JSONRecordType::VALUES); success = JSONTransform::Transform(values, lstate.GetAllocator(), *result_vectors[0], count, - lstate.transform_options); + lstate.transform_options, gstate.column_indices[0]); } if (!success) { diff --git a/src/duckdb/extension/json/json_functions/read_json_objects.cpp b/src/duckdb/extension/json/json_functions/read_json_objects.cpp index 7e97b647..a0e6e6b8 100644 --- a/src/duckdb/extension/json/json_functions/read_json_objects.cpp +++ b/src/duckdb/extension/json/json_functions/read_json_objects.cpp @@ -33,7 +33,7 @@ static void ReadJSONObjectsFunction(ClientContext &context, TableFunctionInput & if (!gstate.names.empty()) { // Create the strings without copying them - const auto col_idx = gstate.column_indices[0]; + const auto col_idx = gstate.column_ids[0]; auto strings = FlatVector::GetData(output.data[col_idx]); auto &validity = FlatVector::Validity(output.data[col_idx]); for (idx_t i = 0; i < count; i++) { diff --git a/src/duckdb/extension/json/json_scan.cpp b/src/duckdb/extension/json/json_scan.cpp index ceabcd82..a1dd4e0d 100644 --- a/src/duckdb/extension/json/json_scan.cpp +++ b/src/duckdb/extension/json/json_scan.cpp @@ -185,8 +185,9 @@ unique_ptr JSONGlobalTableFunctionState::Init(ClientCo continue; } - gstate.column_indices.push_back(col_idx); gstate.names.push_back(bind_data.names[col_id]); + gstate.column_ids.push_back(col_idx); + gstate.column_indices.push_back(input.column_indexes[col_idx]); } if (gstate.names.size() < bind_data.names.size() || bind_data.options.file_options.union_by_name) { diff --git a/src/duckdb/extension/parquet/include/decode_utils.hpp b/src/duckdb/extension/parquet/include/decode_utils.hpp index 43182094..d6c4a854 100644 --- a/src/duckdb/extension/parquet/include/decode_utils.hpp +++ b/src/duckdb/extension/parquet/include/decode_utils.hpp @@ -32,11 +32,18 @@ class ParquetDecodeUtils { public: template - static uint32_t BitUnpack(ByteBuffer &src, bitpacking_width_t &bitpack_pos, T *dst, const idx_t count, - const bitpacking_width_t width) { + static 
void BitUnpack(ByteBuffer &src, bitpacking_width_t &bitpack_pos, T *dst, idx_t count, + const bitpacking_width_t width) { CheckWidth(width); const auto mask = BITPACK_MASKS[width]; src.available(count * width / BITPACK_DLEN); // check if buffer has enough space available once + if (bitpack_pos == 0 && count >= BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) { + idx_t remainder = count % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; + idx_t aligned_count = count - remainder; + BitUnpackAlignedInternal(src, dst, aligned_count, width); + dst += aligned_count; + count = remainder; + } for (idx_t i = 0; i < count; i++) { auto val = (src.unsafe_get() >> bitpack_pos) & mask; bitpack_pos += width; @@ -49,7 +56,6 @@ class ParquetDecodeUtils { } dst[i] = val; } - return count; } template @@ -60,28 +66,36 @@ class ParquetDecodeUtils { } template - static void BitUnpackAligned(ByteBuffer &src, T *dst, const idx_t count, const bitpacking_width_t width) { - CheckWidth(width); - if (count % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE != 0) { - throw InvalidInputException("Aligned bitpacking count must be a multiple of %llu", - BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE); - } - const auto read_size = count * width / BITPACK_DLEN; - src.available(read_size); // check if buffer has enough space available once + static void BitUnpackAlignedInternal(ByteBuffer &src, T *dst, const idx_t count, const bitpacking_width_t width) { for (idx_t i = 0; i < count; i += BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) { + const auto next_read = BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE * width / 8; + // Buffer for alignment T aligned_data[BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE]; // Copy over to aligned buffer - const auto next_read = BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE * width / 8; memcpy(aligned_data, src.ptr, next_read); - src.unsafe_inc(next_read); // Unpack - BitpackingPrimitives::UnPackBlock(data_ptr_cast(dst), data_ptr_cast(aligned_data), width); + BitpackingPrimitives::UnPackBlock(data_ptr_cast(dst), data_ptr_cast(aligned_data), width, true); + + src.unsafe_inc(next_read); + dst += BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; } } + template + static void BitUnpackAligned(ByteBuffer &src, T *dst, const idx_t count, const bitpacking_width_t width) { + CheckWidth(width); + if (count % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE != 0) { + throw InvalidInputException("Aligned bitpacking count must be a multiple of %llu", + BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE); + } + const auto read_size = count * width / BITPACK_DLEN; + src.available(read_size); // check if buffer has enough space available once + BitUnpackAlignedInternal(src, dst, count, width); + } + //===--------------------------------------------------------------------===// // Zigzag //===--------------------------------------------------------------------===// diff --git a/src/duckdb/extension/parquet/include/parquet_rle_bp_decoder.hpp b/src/duckdb/extension/parquet/include/parquet_rle_bp_decoder.hpp index 49583f71..b8dc35b3 100644 --- a/src/duckdb/extension/parquet/include/parquet_rle_bp_decoder.hpp +++ b/src/duckdb/extension/parquet/include/parquet_rle_bp_decoder.hpp @@ -40,11 +40,7 @@ class RleBpDecoder { values_read += repeat_batch; } else if (literal_count_ > 0) { uint32_t literal_batch = MinValue(batch_size - values_read, static_cast(literal_count_)); - uint32_t actual_read = ParquetDecodeUtils::BitUnpack(buffer_, bitpack_pos, 
values + values_read, - literal_batch, bit_width_); - if (literal_batch != actual_read) { - throw std::runtime_error("Did not find enough values"); - } + ParquetDecodeUtils::BitUnpack(buffer_, bitpack_pos, values + values_read, literal_batch, bit_width_); literal_count_ -= literal_batch; values_read += literal_batch; } else { diff --git a/src/duckdb/extension/parquet/parquet_extension.cpp b/src/duckdb/extension/parquet/parquet_extension.cpp index a10326b9..a5e8ee52 100644 --- a/src/duckdb/extension/parquet/parquet_extension.cpp +++ b/src/duckdb/extension/parquet/parquet_extension.cpp @@ -209,7 +209,7 @@ struct ParquetWriteGlobalState : public GlobalFunctionData { struct ParquetWriteLocalState : public LocalFunctionData { explicit ParquetWriteLocalState(ClientContext &context, const vector &types) - : buffer(context, types, ColumnDataAllocatorType::HYBRID) { + : buffer(BufferAllocator::Get(context), types) { buffer.InitializeAppend(append_state); } diff --git a/src/duckdb/src/common/progress_bar/progress_bar.cpp b/src/duckdb/src/common/progress_bar/progress_bar.cpp index 13d1d509..2fa8c0ad 100644 --- a/src/duckdb/src/common/progress_bar/progress_bar.cpp +++ b/src/duckdb/src/common/progress_bar/progress_bar.cpp @@ -83,8 +83,16 @@ bool ProgressBar::ShouldPrint(bool final) const { // Don't print progress at all return false; } - // FIXME - do we need to check supported before running `profiler.Elapsed()` ? - auto sufficient_time_elapsed = profiler.Elapsed() > static_cast(show_progress_after) / 1000.0; + if (!supported) { + return false; + } + + double elapsed_time = -1.0; + if (elapsed_time < 0.0) { + elapsed_time = profiler.Elapsed(); + } + + auto sufficient_time_elapsed = elapsed_time > static_cast(show_progress_after) / 1000.0; if (!sufficient_time_elapsed) { // Don't print yet return false; @@ -93,9 +101,6 @@ bool ProgressBar::ShouldPrint(bool final) const { // Print the last completed bar return true; } - if (!supported) { - return false; - } return query_progress.percentage > -1; } @@ -103,16 +108,24 @@ void ProgressBar::Update(bool final) { if (!final && !supported) { return; } - double new_percentage = -1; - auto rows_processed = query_progress.rows_processed.load(); - auto total_rows_to_process = query_progress.total_rows_to_process.load(); - supported = executor.GetPipelinesProgress(new_percentage, rows_processed, total_rows_to_process); - query_progress.rows_processed = rows_processed; - query_progress.total_rows_to_process = total_rows_to_process; - if (!final && !supported) { + ProgressData progress; + idx_t invalid_pipelines = executor.GetPipelinesProgress(progress); + + double new_percentage = 0.0; + if (invalid_pipelines == 0 && progress.IsValid()) { + if (progress.total > 1e15) { + progress.Normalize(1e15); + } + query_progress.rows_processed = idx_t(progress.done); + query_progress.total_rows_to_process = idx_t(progress.total); + new_percentage = progress.ProgressDone() * 100; + } + + if (!final && invalid_pipelines > 0) { return; } + if (new_percentage > query_progress.percentage) { query_progress.percentage = new_percentage; } diff --git a/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp b/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp index e7d0c756..5459674b 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp @@ -885,15 +885,15 @@ SourceResultType PhysicalHashAggregate::GetData(ExecutionContext &context, DataC 
return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; } -double PhysicalHashAggregate::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { +ProgressData PhysicalHashAggregate::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { auto &sink_gstate = sink_state->Cast(); auto &gstate = gstate_p.Cast(); - double total_progress = 0; + ProgressData progress; for (idx_t radix_idx = 0; radix_idx < groupings.size(); radix_idx++) { - total_progress += groupings[radix_idx].table_data.GetProgress( - context, *sink_gstate.grouping_states[radix_idx].table_state, *gstate.radix_states[radix_idx]); + progress.Add(groupings[radix_idx].table_data.GetProgress( + context, *sink_gstate.grouping_states[radix_idx].table_state, *gstate.radix_states[radix_idx])); } - return total_progress / double(groupings.size()); + return progress; } InsertionOrderPreservingMap PhysicalHashAggregate::ParamsToString() const { diff --git a/src/duckdb/src/execution/operator/aggregate/physical_window.cpp b/src/duckdb/src/execution/operator/aggregate/physical_window.cpp index 87426272..4beef863 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_window.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_window.cpp @@ -932,13 +932,20 @@ OrderPreservationType PhysicalWindow::SourceOrder() const { return OrderPreservationType::FIXED_ORDER; } -double PhysicalWindow::GetProgress(ClientContext &context, GlobalSourceState &gsource_p) const { +ProgressData PhysicalWindow::GetProgress(ClientContext &context, GlobalSourceState &gsource_p) const { auto &gsource = gsource_p.Cast(); const auto returned = gsource.returned.load(); auto &gsink = gsource.gsink; const auto count = gsink.global_partition->count.load(); - return count ? (double(returned) / double(count)) : -1; + ProgressData res; + if (count) { + res.done = double(returned); + res.total = double(count); + } else { + res.SetInvalid(); + } + return res; } OperatorPartitionData PhysicalWindow::GetPartitionData(ExecutionContext &context, DataChunk &chunk, diff --git a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp index b85879bb..3098acc0 100644 --- a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp +++ b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp @@ -695,7 +695,9 @@ SinkFinalizeType PhysicalHashJoin::Finalize(Pipeline &pipeline, Event &event, Cl const auto max_partition_ht_size = sink.max_partition_size + JoinHashTable::PointerTableSize(sink.max_partition_count); - if (max_partition_ht_size > sink.temporary_memory_state->GetReservation()) { + const auto very_very_skewed = // No point in repartitioning if it's this skewed + static_cast(max_partition_ht_size) >= 0.8 * static_cast(sink.total_size); + if (!very_very_skewed && max_partition_ht_size > sink.temporary_memory_state->GetReservation()) { // We have to repartition ht.SetRepartitionRadixBits(sink.temporary_memory_state->GetReservation(), sink.max_partition_size, sink.max_partition_count); @@ -1291,24 +1293,30 @@ SourceResultType PhysicalHashJoin::GetData(ExecutionContext &context, DataChunk return chunk.size() == 0 ? 
SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; } -double PhysicalHashJoin::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { +ProgressData PhysicalHashJoin::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { auto &sink = sink_state->Cast(); auto &gstate = gstate_p.Cast(); + ProgressData res; + if (!sink.external) { if (PropagatesBuildSide(join_type)) { - return static_cast(gstate.full_outer_chunk_done) / - static_cast(gstate.full_outer_chunk_count) * 100.0; + res.done = static_cast(gstate.full_outer_chunk_done); + res.total = static_cast(gstate.full_outer_chunk_count); + return res; } - return 100.0; + res.done = 0.0; + res.total = 1.0; + return res; } auto num_partitions = static_cast(RadixPartitioning::NumberOfPartitions(sink.hash_table->GetRadixBits())); auto partition_start = static_cast(sink.hash_table->GetPartitionStart()); - auto partition_end = static_cast(sink.hash_table->GetPartitionEnd()); + // auto partition_end = static_cast(sink.hash_table->GetPartitionEnd()); // This many partitions are fully done - auto progress = partition_start / num_partitions; + res.done = partition_start; + res.total = num_partitions; auto probe_chunk_done = static_cast(gstate.probe_chunk_done); auto probe_chunk_count = static_cast(gstate.probe_chunk_count); @@ -1316,10 +1324,12 @@ double PhysicalHashJoin::GetProgress(ClientContext &context, GlobalSourceState & // Progress of the current round of probing, weighed by the number of partitions auto probe_progress = probe_chunk_done / probe_chunk_count; // Add it to the progress, weighed by the number of partitions in the current round - progress += (partition_end - partition_start) / num_partitions * probe_progress; + // progress += (partition_end - partition_start) / num_partitions * probe_progress; + // TODO also also me, fixup using somehow `partition_end - partition_start` + res.done += probe_progress; } - return progress * 100.0; + return res; } InsertionOrderPreservingMap PhysicalHashJoin::ParamsToString() const { diff --git a/src/duckdb/src/execution/operator/join/physical_iejoin.cpp b/src/duckdb/src/execution/operator/join/physical_iejoin.cpp index 5f5b62e7..c5d5810f 100644 --- a/src/duckdb/src/execution/operator/join/physical_iejoin.cpp +++ b/src/duckdb/src/execution/operator/join/physical_iejoin.cpp @@ -889,7 +889,7 @@ class IEJoinGlobalSourceState : public GlobalSourceState { GetNextPair(client, lstate); } - double GetProgress() const { + ProgressData GetProgress() const { auto &left_table = *gsink.tables[0]; auto &right_table = *gsink.tables[1]; @@ -903,7 +903,14 @@ class IEJoinGlobalSourceState : public GlobalSourceState { const auto r = MinValue(next_right.load(), right_outers.load()); const auto returned = completed.load() + l + r; - return count ? 
(100.0 * double(returned) / double(count)) : -1; + ProgressData res; + if (count) { + res.done = double(returned); + res.total = double(count); + } else { + res.SetInvalid(); + } + return res; } const PhysicalIEJoin &op; @@ -937,7 +944,7 @@ unique_ptr PhysicalIEJoin::GetLocalSourceState(ExecutionContex return make_uniq(context.client, *this); } -double PhysicalIEJoin::GetProgress(ClientContext &context, GlobalSourceState &gsource_p) const { +ProgressData PhysicalIEJoin::GetProgress(ClientContext &context, GlobalSourceState &gsource_p) const { auto &gsource = gsource_p.Cast(); return gsource.GetProgress(); } diff --git a/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp b/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp index d1fcd7b7..4effccaf 100644 --- a/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp +++ b/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp @@ -143,7 +143,7 @@ class FixedBatchCopyLocalState : public LocalSinkState { FixedBatchCopyState current_task = FixedBatchCopyState::SINKING_DATA; void InitializeCollection(ClientContext &context, const PhysicalOperator &op) { - collection = make_uniq(context, op.children[0]->types, ColumnDataAllocatorType::HYBRID); + collection = make_uniq(BufferAllocator::Get(context), op.children[0]->types); collection->InitializeAppend(append_state); local_memory_usage = 0; } @@ -434,7 +434,7 @@ void PhysicalBatchCopyToFile::RepartitionBatches(ClientContext &context, GlobalS // the collection is too large for a batch - we need to repartition // create an empty collection auto new_collection = - make_uniq(context, children[0]->types, ColumnDataAllocatorType::HYBRID); + make_uniq(BufferAllocator::Get(context), children[0]->types); append_batch = make_uniq(0U, std::move(new_collection)); } if (append_batch) { @@ -458,8 +458,7 @@ void PhysicalBatchCopyToFile::RepartitionBatches(ClientContext &context, GlobalS // the collection is full - move it to the result and create a new one task_manager.AddTask(make_uniq(gstate.scheduled_batch_index++, std::move(append_batch))); - auto new_collection = - make_uniq(context, children[0]->types, ColumnDataAllocatorType::HYBRID); + auto new_collection = make_uniq(BufferAllocator::Get(context), children[0]->types); append_batch = make_uniq(0U, std::move(new_collection)); append_batch->collection->InitializeAppend(append_state); } diff --git a/src/duckdb/src/execution/operator/scan/physical_positional_scan.cpp b/src/duckdb/src/execution/operator/scan/physical_positional_scan.cpp index 2fad0493..c1e2707b 100644 --- a/src/duckdb/src/execution/operator/scan/physical_positional_scan.cpp +++ b/src/duckdb/src/execution/operator/scan/physical_positional_scan.cpp @@ -119,7 +119,7 @@ class PositionalTableScanner { return source.ColumnCount(); } - double GetProgress(ClientContext &context) { + ProgressData GetProgress(ClientContext &context) { return table.GetProgress(context, global_state); } @@ -179,15 +179,16 @@ SourceResultType PhysicalPositionalScan::GetData(ExecutionContext &context, Data return SourceResultType::HAVE_MORE_OUTPUT; } -double PhysicalPositionalScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { +ProgressData PhysicalPositionalScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { auto &gstate = gstate_p.Cast(); - double result = child_tables[0]->GetProgress(context, *gstate.global_states[0]); - for (size_t t = 1; t < child_tables.size(); ++t) { - 
result = MinValue(result, child_tables[t]->GetProgress(context, *gstate.global_states[t])); + ProgressData res; + + for (size_t t = 0; t < child_tables.size(); ++t) { + res.Add(child_tables[t]->GetProgress(context, *gstate.global_states[t])); } - return result; + return res; } bool PhysicalPositionalScan::Equals(const PhysicalOperator &other_p) const { diff --git a/src/duckdb/src/execution/operator/scan/physical_table_scan.cpp b/src/duckdb/src/execution/operator/scan/physical_table_scan.cpp index 00d2814e..91f509a3 100644 --- a/src/duckdb/src/execution/operator/scan/physical_table_scan.cpp +++ b/src/duckdb/src/execution/operator/scan/physical_table_scan.cpp @@ -112,13 +112,24 @@ SourceResultType PhysicalTableScan::GetData(ExecutionContext &context, DataChunk return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; } -double PhysicalTableScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { +ProgressData PhysicalTableScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { auto &gstate = gstate_p.Cast(); + ProgressData res; if (function.table_scan_progress) { - return function.table_scan_progress(context, bind_data.get(), gstate.global_state.get()); + double table_progress = function.table_scan_progress(context, bind_data.get(), gstate.global_state.get()); + if (table_progress < 0.0) { + res.SetInvalid(); + } else { + res.done = table_progress; + res.total = 100.0; + // Assume cardinality is always 1e3 + res.Normalize(1e3); + } + } else { + // if table_scan_progress is not implemented we don't support this function yet in the progress bar + res.SetInvalid(); } - // if table_scan_progress is not implemented we don't support this function yet in the progress bar - return -1; + return res; } bool PhysicalTableScan::SupportsPartitioning(const OperatorPartitionInfo &partition_info) const { @@ -197,8 +208,13 @@ InsertionOrderPreservingMap PhysicalTableScan::ParamsToString() const { filters_info += "\n"; } first_item = false; - auto &col_name = names[column_ids[column_index].GetPrimaryIndex()]; - filters_info += filter->ToString(col_name); + + const auto col_id = column_ids[column_index].GetPrimaryIndex(); + if (col_id == COLUMN_IDENTIFIER_ROW_ID) { + filters_info += filter->ToString("rowid"); + } else { + filters_info += filter->ToString(names[col_id]); + } } } result["Filters"] = filters_info; diff --git a/src/duckdb/src/execution/physical_operator.cpp b/src/duckdb/src/execution/physical_operator.cpp index 91588dab..c5119620 100644 --- a/src/duckdb/src/execution/physical_operator.cpp +++ b/src/duckdb/src/execution/physical_operator.cpp @@ -122,8 +122,10 @@ OperatorPartitionData PhysicalOperator::GetPartitionData(ExecutionContext &conte throw InternalException("Calling GetPartitionData on a node that does not support it"); } -double PhysicalOperator::GetProgress(ClientContext &context, GlobalSourceState &gstate) const { - return -1; +ProgressData PhysicalOperator::GetProgress(ClientContext &context, GlobalSourceState &gstate) const { + ProgressData res; + res.SetInvalid(); + return res; } // LCOV_EXCL_STOP diff --git a/src/duckdb/src/execution/radix_partitioned_hashtable.cpp b/src/duckdb/src/execution/radix_partitioned_hashtable.cpp index 54f1de55..fb5543d3 100644 --- a/src/duckdb/src/execution/radix_partitioned_hashtable.cpp +++ b/src/duckdb/src/execution/radix_partitioned_hashtable.cpp @@ -913,25 +913,24 @@ SourceResultType RadixPartitionedHashTable::GetData(ExecutionContext &context, D } } -double 
RadixPartitionedHashTable::GetProgress(ClientContext &, GlobalSinkState &sink_p, - GlobalSourceState &gstate_p) const { +ProgressData RadixPartitionedHashTable::GetProgress(ClientContext &, GlobalSinkState &sink_p, + GlobalSourceState &gstate_p) const { auto &sink = sink_p.Cast(); auto &gstate = gstate_p.Cast(); // Get partition combine progress, weigh it 2x - double total_progress = 0; + ProgressData progress; for (auto &partition : sink.partitions) { - total_progress += 2.0 * partition->progress; + progress.done += 2.0 * partition->progress; } // Get scan progress, weigh it 1x - total_progress += 1.0 * double(gstate.task_done); + progress.done += 1.0 * double(gstate.task_done); // Divide by 3x for the weights, and the number of partitions to get a value between 0 and 1 again - total_progress /= 3.0 * double(sink.partitions.size()); + progress.total += 3.0 * double(sink.partitions.size()); - // Multiply by 100 to get a percentage - return 100.0 * total_progress; + return progress; } } // namespace duckdb diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index c7a6a44a..02dfb743 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "4-dev2779" +#define DUCKDB_PATCH_VERSION "4-dev2836" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 1 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.1.4-dev2779" +#define DUCKDB_VERSION "v1.1.4-dev2836" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "fc4b8d4794" +#define DUCKDB_SOURCE_ID "002702cf46" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/execution/executor.hpp b/src/duckdb/src/include/duckdb/execution/executor.hpp index 66f66343..46315a98 100644 --- a/src/duckdb/src/include/duckdb/execution/executor.hpp +++ b/src/duckdb/src/include/duckdb/execution/executor.hpp @@ -15,6 +15,7 @@ #include "duckdb/common/reference_map.hpp" #include "duckdb/main/query_result.hpp" #include "duckdb/execution/task_error_manager.hpp" +#include "duckdb/execution/progress_data.hpp" #include "duckdb/parallel/pipeline.hpp" #include @@ -86,7 +87,7 @@ class Executor { void AddToBeRescheduled(shared_ptr &task); //! 
Returns the progress of the pipelines - bool GetPipelinesProgress(double ¤t_progress, uint64_t ¤t_cardinality, uint64_t &total_cardinality); + idx_t GetPipelinesProgress(ProgressData &progress); void CompletePipeline() { completed_pipelines++; diff --git a/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp b/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp index 9f04e1b9..195d1fb9 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp @@ -93,7 +93,7 @@ class PhysicalHashAggregate : public PhysicalOperator { GlobalSourceState &gstate) const override; SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; - double GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; + ProgressData GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; bool IsSource() const override { return true; diff --git a/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_window.hpp b/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_window.hpp index 5b648f2f..5a485bca 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_window.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_window.hpp @@ -51,7 +51,7 @@ class PhysicalWindow : public PhysicalOperator { bool SupportsPartitioning(const OperatorPartitionInfo &partition_info) const override; OrderPreservationType SourceOrder() const override; - double GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const override; + ProgressData GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const override; public: // Sink interface diff --git a/src/duckdb/src/include/duckdb/execution/operator/join/physical_hash_join.hpp b/src/duckdb/src/include/duckdb/execution/operator/join/physical_hash_join.hpp index 50a57380..849423a2 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/join/physical_hash_join.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/join/physical_hash_join.hpp @@ -76,7 +76,7 @@ class PhysicalHashJoin : public PhysicalComparisonJoin { GlobalSourceState &gstate) const override; SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; - double GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; + ProgressData GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; //! 
Becomes a source when it is an external join bool IsSource() const override { diff --git a/src/duckdb/src/include/duckdb/execution/operator/join/physical_iejoin.hpp b/src/duckdb/src/include/duckdb/execution/operator/join/physical_iejoin.hpp index 0fe4b962..5c018372 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/join/physical_iejoin.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/join/physical_iejoin.hpp @@ -46,7 +46,7 @@ class PhysicalIEJoin : public PhysicalRangeJoin { return true; } - double GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const override; + ProgressData GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const override; public: // Sink Interface diff --git a/src/duckdb/src/include/duckdb/execution/operator/scan/physical_positional_scan.hpp b/src/duckdb/src/include/duckdb/execution/operator/scan/physical_positional_scan.hpp index 00a45023..2a39fb23 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/scan/physical_positional_scan.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/scan/physical_positional_scan.hpp @@ -38,7 +38,7 @@ class PhysicalPositionalScan : public PhysicalOperator { unique_ptr GetGlobalSourceState(ClientContext &context) const override; SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; - double GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; + ProgressData GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; bool IsSource() const override { return true; diff --git a/src/duckdb/src/include/duckdb/execution/operator/scan/physical_table_scan.hpp b/src/duckdb/src/include/duckdb/execution/operator/scan/physical_table_scan.hpp index 55b86195..45ac1e34 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/scan/physical_table_scan.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/scan/physical_table_scan.hpp @@ -73,7 +73,7 @@ class PhysicalTableScan : public PhysicalOperator { bool SupportsPartitioning(const OperatorPartitionInfo &partition_info) const override; - double GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; + ProgressData GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/execution/physical_operator.hpp b/src/duckdb/src/include/duckdb/execution/physical_operator.hpp index 91f81a6d..822b5537 100644 --- a/src/duckdb/src/include/duckdb/execution/physical_operator.hpp +++ b/src/duckdb/src/include/duckdb/execution/physical_operator.hpp @@ -15,6 +15,7 @@ #include "duckdb/common/enums/explain_format.hpp" #include "duckdb/common/types/data_chunk.hpp" #include "duckdb/execution/execution_context.hpp" +#include "duckdb/execution/progress_data.hpp" #include "duckdb/optimizer/join_order/join_node.hpp" #include "duckdb/common/optional_idx.hpp" #include "duckdb/execution/physical_operator_states.hpp" @@ -139,10 +140,11 @@ class PhysicalOperator { } //! Returns the current progress percentage, or a negative value if progress bars are not supported - virtual double GetProgress(ClientContext &context, GlobalSourceState &gstate) const; + virtual ProgressData GetProgress(ClientContext &context, GlobalSourceState &gstate) const; //! 
Returns the current progress percentage, or a negative value if progress bars are not supported - virtual double GetSinkProgress(ClientContext &context, GlobalSinkState &gstate, double source_progress) const { + virtual ProgressData GetSinkProgress(ClientContext &context, GlobalSinkState &gstate, + const ProgressData source_progress) const { return source_progress; } diff --git a/src/duckdb/src/include/duckdb/execution/progress_data.hpp b/src/duckdb/src/include/duckdb/execution/progress_data.hpp new file mode 100644 index 00000000..01d5b875 --- /dev/null +++ b/src/duckdb/src/include/duckdb/execution/progress_data.hpp @@ -0,0 +1,55 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/execution/progress_data.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/common/assert.hpp" + +namespace duckdb { + +struct ProgressData { + double done = 0.0; + double total = 0.0; + bool invalid = false; + double ProgressDone() const { + // ProgressDone requires a valid state + D_ASSERT(IsValid()); + + return done / total; + } + void Add(const ProgressData &other) { + // Add is unchecked, propagating invalid + done += other.done; + total += other.total; + invalid |= other.invalid; + } + void Normalize(const double target = 1.0) { + // Normalize checks only `target`, propagating invalid + D_ASSERT(target > 0.0); + if (IsValid()) { + if (total > 0.0) { + done /= total; + } + total = 1.0; + done *= target; + total *= target; + } else { + SetInvalid(); + } + } + void SetInvalid() { + invalid = true; + done = 0.0; + total = 1.0; + } + bool IsValid() const { + return (!invalid) && (done >= 0.0) && (done <= total) && (total >= 0.0); + } +}; + +} // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/execution/radix_partitioned_hashtable.hpp b/src/duckdb/src/include/duckdb/execution/radix_partitioned_hashtable.hpp index 2134de07..b14ebb17 100644 --- a/src/duckdb/src/include/duckdb/execution/radix_partitioned_hashtable.hpp +++ b/src/duckdb/src/include/duckdb/execution/radix_partitioned_hashtable.hpp @@ -10,6 +10,7 @@ #include "duckdb/common/types/row/tuple_data_layout.hpp" #include "duckdb/execution/operator/aggregate/grouped_aggregate_data.hpp" +#include "duckdb/execution/progress_data.hpp" #include "duckdb/parser/group_by_node.hpp" namespace duckdb { @@ -50,7 +51,7 @@ class RadixPartitionedHashTable { SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, GlobalSinkState &sink, OperatorSourceInput &input) const; - double GetProgress(ClientContext &context, GlobalSinkState &sink_p, GlobalSourceState &gstate) const; + ProgressData GetProgress(ClientContext &context, GlobalSinkState &sink_p, GlobalSourceState &gstate) const; const TupleDataLayout &GetLayout() const; idx_t MaxThreads(GlobalSinkState &sink) const; diff --git a/src/duckdb/src/include/duckdb/parallel/pipeline.hpp b/src/duckdb/src/include/duckdb/parallel/pipeline.hpp index a074551c..34dcdd44 100644 --- a/src/duckdb/src/include/duckdb/parallel/pipeline.hpp +++ b/src/duckdb/src/include/duckdb/parallel/pipeline.hpp @@ -97,7 +97,7 @@ class Pipeline : public enable_shared_from_this { void PrintDependencies() const; //! Returns query progress - bool GetProgress(double ¤t_percentage, idx_t &estimated_cardinality); + bool GetProgress(ProgressData &progress_data); //! 
Returns a list of all operators (including source and sink) involved in this pipeline vector> GetOperators(); diff --git a/src/duckdb/src/include/duckdb/storage/statistics/numeric_stats.hpp b/src/duckdb/src/include/duckdb/storage/statistics/numeric_stats.hpp index c5490958..342a6f30 100644 --- a/src/duckdb/src/include/duckdb/storage/statistics/numeric_stats.hpp +++ b/src/duckdb/src/include/duckdb/storage/statistics/numeric_stats.hpp @@ -54,6 +54,20 @@ struct NumericStats { //! Sets the max value of the statistics DUCKDB_API static void SetMax(BaseStatistics &stats, const Value &val); + template + static void SetMax(BaseStatistics &stats, T val) { + auto &nstats = GetDataUnsafe(stats); + nstats.has_max = true; + nstats.max.GetReferenceUnsafe() = val; + } + + template + static void SetMin(BaseStatistics &stats, T val) { + auto &nstats = GetDataUnsafe(stats); + nstats.has_min = true; + nstats.min.GetReferenceUnsafe() = val; + } + //! Check whether or not a given comparison with a constant could possibly be satisfied by rows given the statistics DUCKDB_API static FilterPropagateResult CheckZonemap(const BaseStatistics &stats, ExpressionType comparison_type, array_ptr constants); diff --git a/src/duckdb/src/optimizer/filter_combiner.cpp b/src/duckdb/src/optimizer/filter_combiner.cpp index dd9bffe7..c5481c28 100644 --- a/src/duckdb/src/optimizer/filter_combiner.cpp +++ b/src/duckdb/src/optimizer/filter_combiner.cpp @@ -507,9 +507,6 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(const vectorsecond; for (auto &constant_cmp : constant_list) { @@ -519,7 +516,10 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(const vector())); + // However, the rowid pseudocolumn can never be NULL. + if (!column_index.IsRowIdColumn()) { + table_filters.PushFilter(column_index, PushDownFilterIntoExpr(expr, make_uniq())); + } equivalence_map.erase(filter_exp); } diff --git a/src/duckdb/src/optimizer/remove_unused_columns.cpp b/src/duckdb/src/optimizer/remove_unused_columns.cpp index b56fd9f1..12ea9230 100644 --- a/src/duckdb/src/optimizer/remove_unused_columns.cpp +++ b/src/duckdb/src/optimizer/remove_unused_columns.cpp @@ -239,9 +239,12 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { if (!index.IsValid()) { throw InternalException("Could not find column index for table filter"); } - auto &column_type = get.returned_types[filter.first]; + + auto column_type = + filter.first == COLUMN_IDENTIFIER_ROW_ID ? 
LogicalType::ROW_TYPE : get.returned_types[filter.first]; + ColumnBinding filter_binding(get.table_index, index.GetIndex()); - auto column_ref = make_uniq(column_type, filter_binding); + auto column_ref = make_uniq(std::move(column_type), filter_binding); auto filter_expr = filter.second->ToExpression(*column_ref); VisitExpression(&filter_expr); filter_expressions.push_back(std::move(filter_expr)); diff --git a/src/duckdb/src/parallel/executor.cpp b/src/duckdb/src/parallel/executor.cpp index 9655c33a..44db75f1 100644 --- a/src/duckdb/src/parallel/executor.cpp +++ b/src/duckdb/src/parallel/executor.cpp @@ -684,39 +684,21 @@ void Executor::Flush(ThreadContext &thread_context) { } } -bool Executor::GetPipelinesProgress(double ¤t_progress, uint64_t ¤t_cardinality, - uint64_t &total_cardinality) { // LCOV_EXCL_START +idx_t Executor::GetPipelinesProgress(ProgressData &progress) { // LCOV_EXCL_START lock_guard elock(executor_lock); - vector progress; - vector cardinality; - total_cardinality = 0; - current_cardinality = 0; + progress.done = 0; + progress.total = 0; + idx_t count_invalid = 0; for (auto &pipeline : pipelines) { - double child_percentage; - idx_t child_cardinality; - - if (!pipeline->GetProgress(child_percentage, child_cardinality)) { - return false; + ProgressData p; + if (!pipeline->GetProgress(p)) { + count_invalid++; + } else { + progress.Add(p); } - progress.push_back(child_percentage); - cardinality.push_back(child_cardinality); - total_cardinality += child_cardinality; - } - if (total_cardinality == 0) { - return true; } - current_progress = 0; - - for (size_t i = 0; i < progress.size(); i++) { - progress[i] = MaxValue(0.0, MinValue(100.0, progress[i])); - current_cardinality = LossyNumericCast(static_cast( - static_cast(current_cardinality) + - static_cast(progress[i]) * static_cast(cardinality[i]) / static_cast(100))); - current_progress += progress[i] * double(cardinality[i]) / double(total_cardinality); - D_ASSERT(current_cardinality <= total_cardinality); - } - return true; + return count_invalid; } // LCOV_EXCL_STOP bool Executor::HasResultCollector() { diff --git a/src/duckdb/src/parallel/pipeline.cpp b/src/duckdb/src/parallel/pipeline.cpp index 04a9f8bb..4e15068f 100644 --- a/src/duckdb/src/parallel/pipeline.cpp +++ b/src/duckdb/src/parallel/pipeline.cpp @@ -72,17 +72,23 @@ ClientContext &Pipeline::GetClientContext() { return executor.context; } -bool Pipeline::GetProgress(double ¤t_percentage, idx_t &source_cardinality) { +bool Pipeline::GetProgress(ProgressData &progress) { D_ASSERT(source); - source_cardinality = MinValue(source->estimated_cardinality, 1ULL << 48ULL); + idx_t source_cardinality = MinValue(source->estimated_cardinality, 1ULL << 48ULL); + if (source_cardinality < 1) { + source_cardinality = 1; + } if (!initialized) { - current_percentage = 0; + progress.done = 0; + progress.total = double(source_cardinality); return true; } auto &client = executor.context; - current_percentage = source->GetProgress(client, *source_state); - current_percentage = sink->GetSinkProgress(client, *sink->sink_state, current_percentage); - return current_percentage >= 0; + + progress = source->GetProgress(client, *source_state); + progress.Normalize(double(source_cardinality)); + progress = sink->GetSinkProgress(client, *sink->sink_state, progress); + return progress.IsValid(); } void Pipeline::ScheduleSequentialTask(shared_ptr &event) { diff --git a/src/duckdb/src/planner/table_filter.cpp b/src/duckdb/src/planner/table_filter.cpp index b9db37d8..f8544102 100644 --- 
a/src/duckdb/src/planner/table_filter.cpp +++ b/src/duckdb/src/planner/table_filter.cpp @@ -67,10 +67,6 @@ DynamicTableFilterSet::GetFinalTableFilters(const PhysicalTableScan &scan, } for (auto &entry : filters) { for (auto &filter : entry.second->filters) { - if (scan.column_ids[filter.first].IsRowIdColumn()) { - // skip row id filters - continue; - } result->PushFilter(ColumnIndex(filter.first), filter.second->Copy()); } } diff --git a/src/duckdb/src/storage/table/row_group.cpp b/src/duckdb/src/storage/table/row_group.cpp index eaeb5e88..0d682a20 100644 --- a/src/duckdb/src/storage/table/row_group.cpp +++ b/src/duckdb/src/storage/table/row_group.cpp @@ -401,6 +401,15 @@ void RowGroup::NextVector(CollectionScanState &state) { } } +static FilterPropagateResult CheckRowIdFilter(TableFilter &filter, idx_t beg_row, idx_t end_row) { + // RowId columns dont have a zonemap, but we can trivially create stats to check the filter against. + BaseStatistics dummy_stats = NumericStats::CreateEmpty(LogicalType::ROW_TYPE); + NumericStats::SetMin(dummy_stats, UnsafeNumericCast(beg_row)); + NumericStats::SetMax(dummy_stats, UnsafeNumericCast(end_row)); + + return filter.CheckStatistics(dummy_stats); +} + bool RowGroup::CheckZonemap(ScanFilterInfo &filters) { auto &filter_list = filters.GetFilterList(); // new row group - label all filters as up for grabs again @@ -409,7 +418,15 @@ bool RowGroup::CheckZonemap(ScanFilterInfo &filters) { auto &entry = filter_list[i]; auto &filter = entry.filter; auto base_column_index = entry.table_column_index; - auto prune_result = GetColumn(base_column_index).CheckZonemap(filter); + + FilterPropagateResult prune_result; + + if (base_column_index == COLUMN_IDENTIFIER_ROW_ID) { + prune_result = CheckRowIdFilter(filter, this->start, this->start + this->count); + } else { + prune_result = GetColumn(base_column_index).CheckZonemap(filter); + } + if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) { return false; } @@ -473,7 +490,13 @@ bool RowGroup::CheckZonemapSegments(CollectionScanState &state) { auto base_column_idx = entry.table_column_index; auto &filter = entry.filter; - auto prune_result = GetColumn(base_column_idx).CheckZonemap(state.column_scans[column_idx], filter); + FilterPropagateResult prune_result; + if (base_column_idx == COLUMN_IDENTIFIER_ROW_ID) { + prune_result = CheckRowIdFilter(filter, this->start, this->start + this->count); + } else { + prune_result = GetColumn(base_column_idx).CheckZonemap(state.column_scans[column_idx], filter); + } + if (prune_result != FilterPropagateResult::FILTER_ALWAYS_FALSE) { continue; } @@ -613,10 +636,48 @@ void RowGroup::TemplatedScan(TransactionData transaction, CollectionScanState &s // this filter is always true - skip it continue; } - auto scan_idx = filter.scan_column_index; - auto &col_data = GetColumn(filter.table_column_index); - col_data.Select(transaction, state.vector_index, state.column_scans[scan_idx], - result.data[scan_idx], sel, approved_tuple_count, filter.filter); + + const auto scan_idx = filter.scan_column_index; + const auto column_idx = filter.table_column_index; + + if (column_idx == COLUMN_IDENTIFIER_ROW_ID) { + + // We do another quick statistics scan for row ids here + const auto rowid_start = this->start + current_row; + const auto rowid_end = this->start + current_row + max_count; + const auto prune_result = CheckRowIdFilter(filter.filter, rowid_start, rowid_end); + if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) { + // We can just break out of the loop here. 
+ approved_tuple_count = 0; + break; + } + + // Generate row ids + // Create sequence for row ids + D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE); + result.data[i].SetVectorType(VectorType::FLAT_VECTOR); + auto result_data = FlatVector::GetData(result.data[i]); + for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) { + result_data[sel.get_index(sel_idx)] = + UnsafeNumericCast(this->start + current_row + sel.get_index(sel_idx)); + } + + // Was this filter always true? If so, we dont need to apply it + if (prune_result == FilterPropagateResult::FILTER_ALWAYS_TRUE) { + continue; + } + + // Now apply the filter + UnifiedVectorFormat vdata; + result.data[i].ToUnifiedFormat(approved_tuple_count, vdata); + ColumnSegment::FilterSelection(sel, result.data[i], vdata, filter.filter, approved_tuple_count, + approved_tuple_count); + + } else { + auto &col_data = GetColumn(filter.table_column_index); + col_data.Select(transaction, state.vector_index, state.column_scans[scan_idx], + result.data[scan_idx], sel, approved_tuple_count, filter.filter); + } } for (auto &table_filter : filter_list) { if (table_filter.IsAlwaysTrue()) { @@ -652,7 +713,7 @@ void RowGroup::TemplatedScan(TransactionData transaction, CollectionScanState &s } auto &column = column_ids[i]; if (column.IsRowIdColumn()) { - D_ASSERT(result.data[i].GetType().InternalType() == PhysicalType::INT64); + D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE); result.data[i].SetVectorType(VectorType::FLAT_VECTOR); auto result_data = FlatVector::GetData(result.data[i]); for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) {
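
In json_create.cpp above, HUGEINT/UHUGEINT values are now written with `yyjson_mut_rawncpy` instead of `yyjson_mut_strncpy`, and DECIMAL values with a width above 15 are cast to VARCHAR and emitted as raw number tokens instead of being cast to DOUBLE first. A standalone sketch (plain C++, not DuckDB or yyjson code; the constant is an arbitrary 19-digit value) of why roughly 15 digits is the cutoff: a double cannot represent more significant decimal digits exactly, so the old DOUBLE path silently changed wide values.

```cpp
// A 19-digit integer does not survive a round-trip through double, which is why wide
// DECIMAL/HUGEINT/VARINT values are serialized from their exact string form as raw
// JSON number tokens rather than being cast to DOUBLE.
#include <cstdint>
#include <iomanip>
#include <iostream>

int main() {
    const int64_t wide = 1234567890123456789LL;            // 19 significant digits
    const double as_double = static_cast<double>(wide);    // nearest representable double
    std::cout << wide << "\n";                              // 1234567890123456789
    std::cout << std::fixed << std::setprecision(0) << as_double << "\n"; // 1234567890123456768
}
```

Emitting the exact digits as a raw token keeps the JSON output lossless while still producing a JSON number rather than a quoted string.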
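
The json_transform.cpp changes above thread an optional `ColumnIndex` through `Transform`/`TransformObject` so struct fields that were projected out are skipped: only the projected children are transformed, the remaining child vectors are set to constant NULL, and an unmatched JSON key is only treated as an unknown-key error when every field is being read (otherwise the key may simply belong to a pruned field). Below is a simplified, self-contained model of that rule; the field names, the flat `std::map` object representation, and the `int` payload are illustrative stand-ins, not the DuckDB types.

```cpp
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Schema of the target struct type.
static const std::vector<std::string> kFields = {"a", "b", "c"};

// Minimal model of the projected struct transform: `projection` lists which fields are
// actually read (no value = read all). Projected-out fields stay NULL, and an unknown
// JSON key is only an error when *all* fields are read.
static bool TransformObject(const std::map<std::string, int> &json_object,
                            const std::optional<std::vector<size_t>> &projection,
                            std::vector<std::optional<int>> &out, std::string &error) {
    out.assign(kFields.size(), std::nullopt);
    std::vector<bool> read(kFields.size(), false);
    if (projection) {
        for (auto idx : *projection) {
            read[idx] = true;
        }
    } else {
        read.assign(kFields.size(), true);
    }
    const bool all_fields_read = !projection || projection->size() == kFields.size();
    for (const auto &kv : json_object) {
        bool matched = false;
        for (size_t i = 0; i < kFields.size(); i++) {
            if (read[i] && kFields[i] == kv.first) {
                out[i] = kv.second; // the real code recurses into JSONTransform here
                matched = true;
            }
        }
        if (!matched && all_fields_read) {
            error = "unknown key: " + kv.first;
            return false;
        }
    }
    return true;
}

int main() {
    std::string error;
    std::vector<std::optional<int>> out;
    // Only field "a" (index 0) is projected: the extra key "x" is silently ignored.
    bool ok = TransformObject({{"a", 1}, {"x", 9}}, std::vector<size_t>{0}, out, error);
    std::cout << ok << " a=" << out[0].value_or(-1) << " b_is_null=" << !out[1].has_value() << "\n";
    // No projection: the same key "x" now raises the unknown-key error.
    ok = TransformObject({{"a", 1}, {"x", 9}}, std::nullopt, out, error);
    std::cout << ok << " " << error << "\n";
}
```

In the actual code the decision is additionally gated on `options.error_unknown_key`, and read_json.cpp passes the `ColumnIndex` entries collected in `JSONScanGlobalState::column_indices` down into the transform.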
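
The decode_utils.hpp change above adds a fast path to `BitUnpack`: when decoding starts byte-aligned and at least one full bitpacking group is requested, whole groups are handed to the aligned block unpacker and only the remainder goes through the per-value loop. The sketch below shows just that control flow in plain C++; the group size of 32, the LSB-first bit layout, and the scalar `UnpackOne` helper are assumptions for illustration, and the real fast path calls a specialized `UnPackBlock` rather than a scalar loop.

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Unpack a single `width`-bit value stored LSB-first at bit offset `bit_offset`.
static uint32_t UnpackOne(const uint8_t *data, size_t bit_offset, int width) {
    uint32_t value = 0;
    for (int b = 0; b < width; b++) {
        const size_t bit = bit_offset + b;
        if ((data[bit / 8] >> (bit % 8)) & 1u) {
            value |= 1u << b;
        }
    }
    return value;
}

static constexpr size_t GROUP_SIZE = 32; // stand-in for BITPACKING_ALGORITHM_GROUP_SIZE

static void BitUnpack(const uint8_t *src, uint32_t *dst, size_t count, int width) {
    size_t bit_offset = 0; // starting at 0 means we are byte-aligned, like bitpack_pos == 0
    if (count >= GROUP_SIZE) {
        // Fast path: unpack whole aligned groups in bulk (the real code uses UnPackBlock).
        const size_t aligned_count = count - count % GROUP_SIZE;
        for (size_t i = 0; i < aligned_count; i++) {
            dst[i] = UnpackOne(src, bit_offset, width);
            bit_offset += width;
        }
        dst += aligned_count;
        count -= aligned_count;
    }
    // Slow path: the (count % GROUP_SIZE) leftover values, one at a time.
    for (size_t i = 0; i < count; i++) {
        dst[i] = UnpackOne(src, bit_offset, width);
        bit_offset += width;
    }
}

int main() {
    const int width = 3;
    std::vector<uint32_t> input(70); // 2 full groups of 32 plus a remainder of 6
    for (size_t i = 0; i < input.size(); i++) {
        input[i] = static_cast<uint32_t>(i % 8);
    }
    // Pack LSB-first.
    std::vector<uint8_t> packed((input.size() * width + 7) / 8, 0);
    for (size_t i = 0; i < input.size(); i++) {
        for (int b = 0; b < width; b++) {
            if ((input[i] >> b) & 1u) {
                const size_t bit = i * width + b;
                packed[bit / 8] |= static_cast<uint8_t>(1u << (bit % 8));
            }
        }
    }
    std::vector<uint32_t> output(input.size());
    BitUnpack(packed.data(), output.data(), output.size(), width);
    std::cout << (input == output ? "roundtrip ok" : "mismatch") << "\n";
}
```

With the fast path in place, `BitUnpack` no longer needs to report how many values it read, which is why the RLE/BP decoder drops the `actual_read` check.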
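
The new `duckdb/execution/progress_data.hpp` header above replaces the raw `double` percentages that sources and pipelines used to return. Since the struct itself is shown in full in the diff, here is a small self-contained usage sketch (the struct body is copied from the header, the cardinalities in `main` are made up) showing why the executor now sums `done`/`total` pairs instead of averaging percentages: each pipeline is weighted by its estimated row count via `Normalize`, so a huge scan at 10% no longer counts the same as a tiny scan at 50%.

```cpp
#include <cassert>
#include <iostream>

// Same semantics as the ProgressData struct introduced in progress_data.hpp above.
struct ProgressData {
    double done = 0.0;
    double total = 0.0;
    bool invalid = false;

    bool IsValid() const { return !invalid && done >= 0.0 && done <= total && total >= 0.0; }
    double ProgressDone() const { assert(IsValid()); return done / total; }
    void Add(const ProgressData &other) {
        done += other.done;
        total += other.total;
        invalid |= other.invalid;
    }
    void SetInvalid() { invalid = true; done = 0.0; total = 1.0; }
    void Normalize(double target = 1.0) {
        assert(target > 0.0);
        if (!IsValid()) { SetInvalid(); return; }
        if (total > 0.0) { done /= total; }
        total = 1.0;
        done *= target;
        total *= target;
    }
};

int main() {
    ProgressData small;  small.done = 50.0;  small.total = 100.0;  // 50% of a ~1K-row pipeline
    ProgressData large;  large.done = 10.0;  large.total = 100.0;  // 10% of a ~1M-row pipeline
    small.Normalize(1e3);   // weight by estimated cardinality, as Pipeline::GetProgress does
    large.Normalize(1e6);
    ProgressData query;
    query.Add(small);
    query.Add(large);
    std::cout << query.ProgressDone() * 100 << "%\n"; // ~10.04%, dominated by the large scan
}
```

`PhysicalTableScan::GetProgress` uses the same mechanism to adapt legacy table functions, which still report a 0-100 percentage (or a negative value for "unsupported"), by wrapping the value in a `ProgressData` and normalizing it to an assumed cardinality of 1e3.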
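
physical_hash_join.cpp above also adds a guard before repartitioning an external hash join: if one partition already holds (almost) all of the data, splitting into more partitions cannot shrink the largest partition, so the repartition step is skipped even when the partition does not fit the memory reservation. A standalone sketch of that decision (the 0.8 threshold mirrors the diff; the sizes in `main` are invented):

```cpp
#include <cstdint>
#include <iostream>

// Repartition only when it can actually help: not when the build side is extremely skewed.
static bool ShouldRepartition(uint64_t max_partition_ht_size, uint64_t total_size, uint64_t reservation) {
    const bool very_very_skewed =
        static_cast<double>(max_partition_ht_size) >= 0.8 * static_cast<double>(total_size);
    return !very_very_skewed && max_partition_ht_size > reservation;
}

int main() {
    // 9 GiB of a 10 GiB build sits in one partition: repartitioning is pointless even
    // though it exceeds the 4 GiB reservation.
    std::cout << ShouldRepartition(9ULL << 30, 10ULL << 30, 4ULL << 30) << "\n"; // 0
    // Evenly spread data that does not fit the reservation: repartition.
    std::cout << ShouldRepartition(6ULL << 30, 40ULL << 30, 4ULL << 30) << "\n"; // 1
}
```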
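
row_group.cpp above introduces `CheckRowIdFilter`, which lets filters on the `rowid` pseudo-column prune row groups even though rowid has no stored zonemap: the rowids inside a row group are exactly the contiguous range starting at the row group offset, so min/max statistics can be built on the fly and checked against the filter. The sketch below is a simplified stand-in that hard-codes a `>= constant` comparison instead of going through DuckDB's `TableFilter::CheckStatistics`; the row counts are illustrative.

```cpp
#include <cstdint>
#include <iostream>

enum class FilterPropagateResult { FILTER_ALWAYS_TRUE, FILTER_ALWAYS_FALSE, NO_PRUNING_POSSIBLE };

// Zonemap-style check of "value >= constant" against min/max statistics.
static FilterPropagateResult CheckGreaterEqual(int64_t min_val, int64_t max_val, int64_t constant) {
    if (min_val >= constant) {
        return FilterPropagateResult::FILTER_ALWAYS_TRUE;
    }
    if (max_val < constant) {
        return FilterPropagateResult::FILTER_ALWAYS_FALSE;
    }
    return FilterPropagateResult::NO_PRUNING_POSSIBLE;
}

// rowids in a row group are the contiguous range [start, start + count), so the "stats"
// are just that range's endpoints.
static FilterPropagateResult CheckRowIdFilter(int64_t start, int64_t count, int64_t constant) {
    return CheckGreaterEqual(start, start + count - 1, constant);
}

int main() {
    // Row group holding rows 0..122879, filter: rowid >= 200000 -> skip the whole group.
    auto r = CheckRowIdFilter(0, 122880, 200000);
    std::cout << (r == FilterPropagateResult::FILTER_ALWAYS_FALSE ? "skip row group" : "scan row group") << "\n";
}
```

This is also why filter_combiner.cpp stops emitting `IS NOT NULL` filters for rowid and why remove_unused_columns.cpp uses `LogicalType::ROW_TYPE` for such filters: the pseudo-column is never NULL and never has a physical column to read statistics from.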
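
The second part of the row_group.cpp change handles rowid filters during the scan itself: instead of reading a column, the rowid values are generated from the row group offset for the rows still in the selection, and the filter is then applied to that generated column to shrink the selection further. Below is a simplified sketch with flat arrays standing in for `Vector`/`SelectionVector`; the offsets and the `>= 1003` filter are made up.

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    const int64_t row_group_start = 1000;   // first rowid covered by this scan vector
    std::vector<uint32_t> sel = {0, 3, 7};  // rows that survived earlier filters
    std::vector<int64_t> rowids(8, 0);

    // Generate rowids only for the selected positions (no storage read needed).
    for (auto idx : sel) {
        rowids[idx] = row_group_start + static_cast<int64_t>(idx);
    }

    // Apply the rowid filter (here: rowid >= 1003) by shrinking the selection,
    // mirroring what ColumnSegment::FilterSelection does on the generated vector.
    std::vector<uint32_t> new_sel;
    for (auto idx : sel) {
        if (rowids[idx] >= 1003) {
            new_sel.push_back(idx);
        }
    }
    std::cout << "approved tuples: " << new_sel.size() << "\n"; // 2 (rows 3 and 7)
}
```

The quick `CheckRowIdFilter` statistics check before this step lets the scan zero out `approved_tuple_count` immediately when the filter can never match the vector's rowid range, and skip the filter entirely when it always matches.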