Skip to content

Commit

Permalink
before diving into aggreation
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPresent-Han committed Nov 10, 2024
1 parent 93e34b3 commit 64e961b
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 19 deletions.
23 changes: 23 additions & 0 deletions internal/core/src/exec/operator/query-agg/Aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,29 @@ class Aggregate {

virtual void extractValues(char** groups, int32_t numGroups, VectorPtr* result) {};

// Returns true if the accumulator never takes more than
// accumulatorFixedWidthSize() bytes. If this is false, the
// accumulator needs to track its changing variable length footprint
// using RowSizeTracker (Aggregate::trackRowSize), see ArrayAggAggregate for
// sample usage. A group row with at least one variable length key or
// aggregate will have a 32-bit slot at offset RowContainer::rowSize_ for
// keeping track of per-row size. The size is relevant for keeping caps on
// result set and spilling batch sizes with skewed data.
virtual bool isFixedSize() const {
return true;
}

// Returns the fixed number of bytes the accumulator takes on a group
// row. Variable width accumulators will reference the variable
// width part of the state from the fixed part.
virtual int32_t accumulatorFixedWidthSize() const = 0;

/// Returns the alignment size of the accumulator. Some types such as
/// int128_t require aligned access. This value must be a power of 2.
virtual int32_t accumulatorAlignmentSize() const {
return 1;
}

protected:
virtual void setOffsetsInternal(
int32_t offset,
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/exec/operator/query-agg/AggregateInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ struct AggregateInfo{
/// Indices of the input columns in the input RowVector.
std::vector<column_index_t> input_column_idxes_;

/// Boolean indicating whether inputs must be de-duplicated before aggregating
bool distinct{false};

/// Index of the result column in the output RowVector.
column_index_t output_;

/// Type of intermediate results. Used for spilling.
DataType intermediateType_;
};

std::vector<AggregateInfo> toAggregateInfo(
Expand Down
3 changes: 1 addition & 2 deletions internal/core/src/exec/operator/query-agg/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ std::vector<Accumulator> GroupingSet::accumulators(bool /*excludeToIntermediate*
std::vector<Accumulator> accumulators;
accumulators.reserve(aggregates_.size());
for(auto& aggregate: aggregates_) {
// add accumalator for each aggregate
// accumulators.emplace_back(Accumulator{aggregate-});
accumulators.emplace_back(Accumulator{aggregate.function_.get(), aggregate.intermediateType_});
}
return accumulators;
}
Expand Down
15 changes: 7 additions & 8 deletions internal/core/src/exec/operator/query-agg/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ RowContainer::RowContainer(const std::vector<DataType> &keyTypes,
nullOffsets_.push_back(nullOffset);
++nullOffset;
isVariableWidth |= !accumulator.isFixedSize();
usesExternalMemory_ |= accumulator.usesExternalMemory();
alignment_ = combineAlignments(accumulator.alignment(), alignment_);
}

Expand Down Expand Up @@ -145,27 +144,27 @@ void RowContainer::store(const milvus::ColumnVectorPtr &column_data, milvus::vec
Accumulator::Accumulator(
bool isFixedSize,
int32_t fixedSize,
bool useExternalMemory,
int32_t alignment,
DataType spillType,
std::function<void(folly::Range<char**> groups, milvus::VectorPtr& result)>
spillExtractFunction,
std::function<void(folly::Range<char**> groups)> destroyFunction):
isFixedSize_{isFixedSize},
fixedSize_{fixedSize},
usesExternalMemory_{useExternalMemory},
alignment_{alignment},
spillType_{spillType},
spillExtractFunction_{std::move(spillExtractFunction)},
destroyFunction_{std::move(destroyFunction)}{

}

Accumulator::Accumulator(milvus::exec::Aggregate *aggregate): alignment_(1),
isFixedSize_(true),
spillType_(DataType::NONE),
usesExternalMemory_(false),
fixedSize_(1){
Accumulator::Accumulator(milvus::exec::Aggregate *aggregate, DataType spillType):
isFixedSize_(aggregate->isFixedSize()),
fixedSize_{aggregate->accumulatorFixedWidthSize()},
alignment_(aggregate->accumulatorAlignmentSize()),
spillType_(spillType){
AssertInfo(aggregate!=nullptr, "Input aggregate for accumulator cannot be nullptr!");
}

}
}
10 changes: 1 addition & 9 deletions internal/core/src/exec/operator/query-agg/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,18 @@ class Accumulator {
Accumulator(
bool isFixedSize,
int32_t fixedSize,
bool useExternalMemory,
int32_t alignment,
DataType spillType,
std::function<void(folly::Range<char**> groups, VectorPtr& result)>
spillExtractFunction,
std::function<void(folly::Range<char**> groups)> destroyFunction);

explicit Accumulator(Aggregate* aggregate);
explicit Accumulator(Aggregate* aggregate, DataType spillType);

bool isFixedSize() const {
return isFixedSize_;
}

bool usesExternalMemory() const {
return usesExternalMemory_;
}

int32_t alignment() const {
return alignment_;
}
Expand All @@ -59,7 +54,6 @@ class Accumulator {
private:
const bool isFixedSize_;
const int32_t fixedSize_;
const bool usesExternalMemory_;
const int32_t alignment_;
const DataType spillType_;
std::function<void(folly::Range<char**> groups, VectorPtr& result)> spillExtractFunction_;
Expand Down Expand Up @@ -446,8 +440,6 @@ class RowContainer {

std::vector<Accumulator> accumulators_;

bool usesExternalMemory_{false};

// Head of linked list of free rows.
char* firstFreeRow_ = nullptr;
uint64_t numRows_ = 0;
Expand Down

0 comments on commit 64e961b

Please sign in to comment.