diff --git a/internal/core/src/exec/operator/query-agg/Aggregate.h b/internal/core/src/exec/operator/query-agg/Aggregate.h index 683891995b9e4..418f579134202 100644 --- a/internal/core/src/exec/operator/query-agg/Aggregate.h +++ b/internal/core/src/exec/operator/query-agg/Aggregate.h @@ -72,6 +72,29 @@ class Aggregate { virtual void extractValues(char** groups, int32_t numGroups, VectorPtr* result) {}; + // Returns true if the accumulator never takes more than + // accumulatorFixedWidthSize() bytes. If this is false, the + // accumulator needs to track its changing variable length footprint + // using RowSizeTracker (Aggregate::trackRowSize), see ArrayAggAggregate for + // sample usage. A group row with at least one variable length key or + // aggregate will have a 32-bit slot at offset RowContainer::rowSize_ for + // keeping track of per-row size. The size is relevant for keeping caps on + // result set and spilling batch sizes with skewed data. + virtual bool isFixedSize() const { + return true; + } + + // Returns the fixed number of bytes the accumulator takes on a group + // row. Variable width accumulators will reference the variable + // width part of the state from the fixed part. + virtual int32_t accumulatorFixedWidthSize() const = 0; + + /// Returns the alignment size of the accumulator. Some types such as + /// int128_t require aligned access. This value must be a power of 2. + virtual int32_t accumulatorAlignmentSize() const { + return 1; + } + protected: virtual void setOffsetsInternal( int32_t offset, diff --git a/internal/core/src/exec/operator/query-agg/AggregateInfo.h b/internal/core/src/exec/operator/query-agg/AggregateInfo.h index 2738181649433..b4dda8894dca2 100644 --- a/internal/core/src/exec/operator/query-agg/AggregateInfo.h +++ b/internal/core/src/exec/operator/query-agg/AggregateInfo.h @@ -29,8 +29,14 @@ struct AggregateInfo{ /// Indices of the input columns in the input RowVector. std::vector input_column_idxes_; + /// Boolean indicating whether inputs must be de-duplicated before aggregating + bool distinct{false}; + /// Index of the result column in the output RowVector. column_index_t output_; + + /// Type of intermediate results. Used for spilling. + DataType intermediateType_; }; std::vector toAggregateInfo( diff --git a/internal/core/src/exec/operator/query-agg/GroupingSet.cpp b/internal/core/src/exec/operator/query-agg/GroupingSet.cpp index 8464fb637f6fb..453ddec8a3afa 100644 --- a/internal/core/src/exec/operator/query-agg/GroupingSet.cpp +++ b/internal/core/src/exec/operator/query-agg/GroupingSet.cpp @@ -133,8 +133,7 @@ std::vector GroupingSet::accumulators(bool /*excludeToIntermediate* std::vector accumulators; accumulators.reserve(aggregates_.size()); for(auto& aggregate: aggregates_) { - // add accumalator for each aggregate - // accumulators.emplace_back(Accumulator{aggregate-}); + accumulators.emplace_back(Accumulator{aggregate.function_.get(), aggregate.intermediateType_}); } return accumulators; } diff --git a/internal/core/src/exec/operator/query-agg/RowContainer.cpp b/internal/core/src/exec/operator/query-agg/RowContainer.cpp index 1d628588a049a..b6b2ac15768bc 100644 --- a/internal/core/src/exec/operator/query-agg/RowContainer.cpp +++ b/internal/core/src/exec/operator/query-agg/RowContainer.cpp @@ -59,7 +59,6 @@ RowContainer::RowContainer(const std::vector &keyTypes, nullOffsets_.push_back(nullOffset); ++nullOffset; isVariableWidth |= !accumulator.isFixedSize(); - usesExternalMemory_ |= accumulator.usesExternalMemory(); alignment_ = combineAlignments(accumulator.alignment(), alignment_); } @@ -145,7 +144,6 @@ void RowContainer::store(const milvus::ColumnVectorPtr &column_data, milvus::vec Accumulator::Accumulator( bool isFixedSize, int32_t fixedSize, - bool useExternalMemory, int32_t alignment, DataType spillType, std::function groups, milvus::VectorPtr& result)> @@ -153,7 +151,6 @@ Accumulator::Accumulator( std::function groups)> destroyFunction): isFixedSize_{isFixedSize}, fixedSize_{fixedSize}, - usesExternalMemory_{useExternalMemory}, alignment_{alignment}, spillType_{spillType}, spillExtractFunction_{std::move(spillExtractFunction)}, @@ -161,11 +158,13 @@ Accumulator::Accumulator( } -Accumulator::Accumulator(milvus::exec::Aggregate *aggregate): alignment_(1), - isFixedSize_(true), - spillType_(DataType::NONE), - usesExternalMemory_(false), - fixedSize_(1){ +Accumulator::Accumulator(milvus::exec::Aggregate *aggregate, DataType spillType): + isFixedSize_(aggregate->isFixedSize()), + fixedSize_{aggregate->accumulatorFixedWidthSize()}, + alignment_(aggregate->accumulatorAlignmentSize()), + spillType_(spillType){ + AssertInfo(aggregate!=nullptr, "Input aggregate for accumulator cannot be nullptr!"); } + } } diff --git a/internal/core/src/exec/operator/query-agg/RowContainer.h b/internal/core/src/exec/operator/query-agg/RowContainer.h index 40123309700aa..65bda817a9b08 100644 --- a/internal/core/src/exec/operator/query-agg/RowContainer.h +++ b/internal/core/src/exec/operator/query-agg/RowContainer.h @@ -31,23 +31,18 @@ class Accumulator { Accumulator( bool isFixedSize, int32_t fixedSize, - bool useExternalMemory, int32_t alignment, DataType spillType, std::function groups, VectorPtr& result)> spillExtractFunction, std::function groups)> destroyFunction); - explicit Accumulator(Aggregate* aggregate); + explicit Accumulator(Aggregate* aggregate, DataType spillType); bool isFixedSize() const { return isFixedSize_; } - bool usesExternalMemory() const { - return usesExternalMemory_; - } - int32_t alignment() const { return alignment_; } @@ -59,7 +54,6 @@ class Accumulator { private: const bool isFixedSize_; const int32_t fixedSize_; - const bool usesExternalMemory_; const int32_t alignment_; const DataType spillType_; std::function groups, VectorPtr& result)> spillExtractFunction_; @@ -446,8 +440,6 @@ class RowContainer { std::vector accumulators_; - bool usesExternalMemory_{false}; - // Head of linked list of free rows. char* firstFreeRow_ = nullptr; uint64_t numRows_ = 0;