Skip to content

Commit

Permalink
decouple to common package
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPresent-Han committed Nov 21, 2024
1 parent 931b097 commit c4a9e3e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 39 deletions.
75 changes: 43 additions & 32 deletions internal/agg/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil"
Expand Down Expand Up @@ -562,7 +563,22 @@ func NewGroupAggReducer(groupByFieldIds []int64, aggregates []*planpb.Aggregate,
}
}

func (reducer *GroupAggReducer) Reduce(ctx context.Context, results []*segcorepb.RetrieveResults) (*segcorepb.RetrieveResults, error) {
type AggregationResult struct {
fieldDatas []*schemapb.FieldData
}

func NewAggregationResult(fieldDatas []*schemapb.FieldData) *AggregationResult {
return &AggregationResult{
fieldDatas: fieldDatas,
}
}

// GetFieldDatas returns the fieldDatas slice
func (ar *AggregationResult) GetFieldDatas() []*schemapb.FieldData {
return ar.fieldDatas
}

func (reducer *GroupAggReducer) Reduce(ctx context.Context, results []*AggregationResult) (*AggregationResult, error) {
if results == nil || len(results) == 0 {
return nil, fmt.Errorf("no input segment's retrieved results can be reduced")
}
Expand All @@ -580,7 +596,7 @@ func (reducer *GroupAggReducer) Reduce(ctx context.Context, results []*segcorepb
numGroupingKeys := len(reducer.groupByFieldIds)
hashers := make([]FieldAccessor, 0, numGroupingKeys)
accumulators := make([]FieldAccessor, 0)
firstFieldData := results[0].GetFieldsData()
firstFieldData := results[0].GetFieldDatas()
for idx, fieldData := range firstFieldData {
if idx < numGroupingKeys {
hasher, err := NewFieldAccessor(fieldData.GetType())
Expand All @@ -603,7 +619,7 @@ func (reducer *GroupAggReducer) Reduce(ctx context.Context, results []*segcorepb
rowIdx := 0
totalRowCount := 0
for _, result := range results {
fieldDatas := result.GetFieldsData()
fieldDatas := result.GetFieldDatas()
if outputColumnCount == -1 {
outputColumnCount = len(fieldDatas)
} else if outputColumnCount != len(fieldDatas) {
Expand Down Expand Up @@ -661,11 +677,11 @@ func (reducer *GroupAggReducer) Reduce(ctx context.Context, results []*segcorepb
}

//3. assemble reduced buckets into retrievedResult
reducedResult := &segcorepb.RetrieveResults{}
reducedResult.FieldsData = typeutil.PrepareResultFieldData(firstFieldData, int64(totalRowCount))
reducedResult := NewAggregationResult(nil)
reducedResult.fieldDatas = typeutil.PrepareResultFieldData(firstFieldData, int64(totalRowCount))
rowIdx = 0
for _, bucket := range reducer.hashValsMap {
err := AssembleBucket(bucket, reducedResult.FieldsData, rowIdx)
err := AssembleBucket(bucket, reducedResult.GetFieldDatas(), rowIdx)
if err != nil {
return nil, err
}
Expand All @@ -674,31 +690,26 @@ func (reducer *GroupAggReducer) Reduce(ctx context.Context, results []*segcorepb
return reducedResult, nil
}

func CalculateRowNumber(results []*segcorepb.RetrieveResults) int64 {
rowNumber := 0
for _, result := range results {
fieldsData := result.GetFieldsData()
if len(fieldsData) < 1 {
return 0
}
firstFieldData := fieldsData[0]
switch firstFieldData.GetType() {
case schemapb.DataType_Bool:
rowNumber += len(firstFieldData.GetScalars().GetBoolData().Data)
case schemapb.DataType_Int8:
case schemapb.DataType_Int16:
case schemapb.DataType_Int32:
rowNumber += len(firstFieldData.GetScalars().GetIntData().Data)
case schemapb.DataType_Int64:
rowNumber += len(firstFieldData.GetScalars().GetLongData().Data)
case schemapb.DataType_VarChar:
case schemapb.DataType_String:
rowNumber += len(firstFieldData.GetScalars().GetStringData().Data)
case schemapb.DataType_Float:
rowNumber += len(firstFieldData.GetScalars().GetFloatData().Data)
case schemapb.DataType_Double:
rowNumber += len(firstFieldData.GetScalars().GetDoubleData().Data)
}
func InternalResult2AggResult(results []*internalpb.RetrieveResults) []*AggregationResult {
aggResults := make([]*AggregationResult, len(results))
for i := 0; i < len(results); i++ {
aggResults[i] = NewAggregationResult(results[i].GetFieldsData())
}
return aggResults
}

func AggResult2internalResult(aggRes *AggregationResult) *internalpb.RetrieveResults {
return &internalpb.RetrieveResults{FieldsData: aggRes.GetFieldDatas()}
}

func SegcoreResults2AggResult(results []*segcorepb.RetrieveResults) []*agg.AggregationResult {
aggResults := make([]*agg.AggregationResult, len(results))
for i := 0; i < len(results); i++ {
aggResults[i] = agg.NewAggregationResult(results[i].GetFieldsData())
}
return int64(rowNumber)
return aggResults
}

func AggResult2segcoreResult(aggRes *agg.AggregationResult) *segcorepb.RetrieveResults {
return &segcorepb.RetrieveResults{FieldsData: aggRes.GetFieldDatas()}
}
27 changes: 22 additions & 5 deletions internal/querynodev2/segments/AggReducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,39 @@ package segments
import (
"context"
"github.com/milvus-io/milvus/internal/agg"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/planpb"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
)

type AggReducer struct {
type InternalAggReducer struct {
groupAggReducer *agg.GroupAggReducer
}

func NewAggReducer(groupByFieldIds []int64, aggregates []*planpb.Aggregate, schema *schemapb.CollectionSchema) *AggReducer {
return &AggReducer{
func NewInternalAggReducer(groupByFieldIds []int64, aggregates []*planpb.Aggregate, schema *schemapb.CollectionSchema) *InternalAggReducer {
return &InternalAggReducer{
agg.NewGroupAggReducer(groupByFieldIds, aggregates, schema),
}
}

func (reducer *AggReducer) Reduce(ctx context.Context, results []*segcorepb.RetrieveResults, segments []Segment, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
return reducer.groupAggReducer.Reduce(ctx, results)
func (reducer *InternalAggReducer) Reduce(ctx context.Context, results []*internalpb.RetrieveResults) (*internalpb.RetrieveResults, error) {
reducedAggRes, err := reducer.groupAggReducer.Reduce(ctx, agg.InternalResult2AggResult(results))
return agg.AggResult2internalResult(reducedAggRes), err
}

type SegcoreAggReducer struct {
groupAggReducer *agg.GroupAggReducer
}

func NewSegcoreAggReducer(groupByFieldIds []int64, aggregates []*planpb.Aggregate, schema *schemapb.CollectionSchema) *SegcoreAggReducer {
return &SegcoreAggReducer{
agg.NewGroupAggReducer(groupByFieldIds, aggregates, schema),
}
}

func (reducer *SegcoreAggReducer) Reduce(ctx context.Context, results []*segcorepb.RetrieveResults, segments []Segment, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
reducedAggRes, err := reducer.groupAggReducer.Reduce(ctx, agg.SegcoreResults2AggResult(results))
return agg.AggResult2segcoreResult(reducedAggRes), err
}
7 changes: 5 additions & 2 deletions internal/querynodev2/segments/reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func CreateInternalReducer(req *querypb.QueryRequest, schema *schemapb.Collectio
if req.GetReq().GetIsCount() {
return &cntReducer{}
}
if len(req.GetReq().GetAggregates()) > 0 || len(req.GetReq().GetGroupByFieldIds()) > 0 {
return NewInternalAggReducer(req.GetReq().GetOutputFieldsId(), req.GetReq().GetAggregates(), schema)
}
return newDefaultLimitReducer(req, schema)
}

Expand All @@ -34,9 +37,9 @@ func CreateSegCoreReducer(req *querypb.QueryRequest, schema *schemapb.Collection
}

if len(req.GetReq().GetGroupByFieldIds()) > 0 || len(req.GetReq().GetAggregates()) > 0 {
return NewAggReducer(req.GetReq().GetGroupByFieldIds(), req.GetReq().GetAggregates(), schema)
return NewSegcoreAggReducer(req.GetReq().GetGroupByFieldIds(), req.GetReq().GetAggregates(), schema)
}

return newDefaultLimitReducerSegcore(req, schema, manager)
}

Expand Down

0 comments on commit c4a9e3e

Please sign in to comment.