Skip to content

Commit

Permalink
support global aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPresent-Han committed Nov 21, 2024
1 parent 6da52f4 commit 19e4ab2
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions internal/agg/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,17 +514,22 @@ func AssembleBucket(bucket *Bucket, fieldDatas []*schemapb.FieldData, rowIdx int
colCount := len(fieldDatas)
for r := 0; r < bucket.RowCount(); r++ {
row := bucket.RowAt(r)
for c := 0; c < colCount; c++ {
err := AssembleSingleValue(row.ValAt(c), fieldDatas[c], rowIdx)
if err != nil {
return err
}
}
AssembleSingleRow(colCount, row, fieldDatas, rowIdx)
rowIdx++
}
return nil
}

// AssembleSingleRow writes the first colCount values of row into the
// matching columns of fieldDatas at output row index rowIdx. It stops
// at, and returns, the first assembly error; nil means every value was
// written successfully.
func AssembleSingleRow(colCount int, row *Row, fieldDatas []*schemapb.FieldData, rowIdx int) error {
	for col := 0; col < colCount; col++ {
		if err := AssembleSingleValue(row.ValAt(col), fieldDatas[col], rowIdx); err != nil {
			return err
		}
	}
	return nil
}

func AssembleSingleValue(val interface{}, fieldData *schemapb.FieldData, rowIdx int) error {
switch fieldData.GetType() {
case schemapb.DataType_Bool:
Expand Down Expand Up @@ -598,6 +603,7 @@ func (reducer *GroupAggReducer) Reduce(ctx context.Context, results []*Aggregati
hashers := make([]FieldAccessor, 0, numGroupingKeys)
accumulators := make([]FieldAccessor, 0)
firstFieldData := results[0].GetFieldDatas()
outputColumnCount := len(firstFieldData)
for idx, fieldData := range firstFieldData {
if idx < numGroupingKeys {
hasher, err := NewFieldAccessor(fieldData.GetType())
Expand All @@ -615,15 +621,35 @@ func (reducer *GroupAggReducer) Reduce(ctx context.Context, results []*Aggregati
}
}

isGlobal := numGroupingKeys == 0
if isGlobal {
reducedResult := NewAggregationResult(nil)
reducedResult.fieldDatas = typeutil.PrepareResultFieldData(firstFieldData, 1)
rows := make([]*Row, len(results))
for idx, result := range results {
entries := make([]*Entry, outputColumnCount)
for col := 0; col < outputColumnCount; col++ {
fieldData := result.GetFieldDatas()[col]
accumulators[col].SetVals(fieldData)
entries[col] = NewEntry(accumulators[col].ValAt(0))
}
rows[idx] = NewRow(entries)
}
for r := 1; r < len(rows); r++ {
for c := 0; c < outputColumnCount; c++ {
rows[0].UpdateEntry(rows[r], c, aggs[c])
}
}
AssembleSingleRow(outputColumnCount, rows[0], reducedResult.fieldDatas, 0)
return reducedResult, nil
}

// 2. compute hash values for all rows in the result retrieved
outputColumnCount := -1
rowIdx := 0
totalRowCount := 0
for _, result := range results {
fieldDatas := result.GetFieldDatas()
if outputColumnCount == -1 {
outputColumnCount = len(fieldDatas)
} else if outputColumnCount != len(fieldDatas) {
if outputColumnCount != len(fieldDatas) {
return nil, fmt.Errorf("retrieved results from different segments have different size of columns")
}
if outputColumnCount == 0 {
Expand Down Expand Up @@ -703,7 +729,7 @@ func AggResult2internalResult(aggRes *AggregationResult) *internalpb.RetrieveRes
return &internalpb.RetrieveResults{FieldsData: aggRes.GetFieldDatas()}
}

func SegcoreResults2AggResult(results []*segcorepb.RetrieveResults) []*agg.AggregationResult {
func SegcoreResults2AggResult(results []*segcorepb.RetrieveResults) []*AggregationResult {
aggResults := make([]*AggregationResult, len(results))
for i := 0; i < len(results); i++ {
aggResults[i] = NewAggregationResult(results[i].GetFieldsData())
Expand Down

0 comments on commit 19e4ab2

Please sign in to comment.