Skip to content

Commit

Permalink
Merge pull request #25 from aliyun/feature/seq_ots_offline
Browse files Browse the repository at this point in the history
[perf]: sequence feature not set timeRange for tablestore offline seq…
  • Loading branch information
bruceding authored Aug 9, 2024
2 parents 60a437e + c734aca commit 8f636be
Showing 1 changed file with 101 additions and 3 deletions.
104 changes: 101 additions & 3 deletions dao/feature_view_tablestore_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, us
skField = fmt.Sprintf("%s_%s", sequenceConfig.ItemIdField, sequenceConfig.TimestampField)
}

fetchDataFunc := func(seqEvent string, seqLen int, key interface{}, tableName string) []*sequenceInfo {
onlineFetchDataFunc := func(seqEvent string, seqLen int, key interface{}, tableName string) []*sequenceInfo {
sequences := []*sequenceInfo{}

var ots_mu sync.Mutex
Expand Down Expand Up @@ -237,6 +237,104 @@ func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, us
return resultSequences
}

offlineFetchDataFunc := func(seqEvent string, seqLen int, key interface{}, tableName string) []*sequenceInfo {
sequences := []*sequenceInfo{}

var ots_mu sync.Mutex
var ots_wg sync.WaitGroup
events := strings.Split(seqEvent, "|")

for _, event := range events {
ots_wg.Add(1)
go func(event string) {
defer ots_wg.Done()
getRangeRequest := &tablestore.GetRangeRequest{}
rangeRowQueryCriteria := &tablestore.RangeRowQueryCriteria{}
rangeRowQueryCriteria.TableName = tableName

startPK := new(tablestore.PrimaryKey)
startPK.AddPrimaryKeyColumn(pkField, fmt.Sprintf("%v_%s", key, event))
startPK.AddPrimaryKeyColumnWithMinValue(skField)
endPK := new(tablestore.PrimaryKey)
endPK.AddPrimaryKeyColumn(pkField, fmt.Sprintf("%v_%s", key, event))
endPK.AddPrimaryKeyColumnWithMaxValue(skField)

rangeRowQueryCriteria.StartPrimaryKey = startPK
rangeRowQueryCriteria.EndPrimaryKey = endPK
rangeRowQueryCriteria.Direction = tablestore.FORWARD
if sequenceConfig.PlayTimeField == "" {
rangeRowQueryCriteria.ColumnsToGet = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.TimestampField}
} else {
rangeRowQueryCriteria.ColumnsToGet = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.PlayTimeField, sequenceConfig.TimestampField}
}
rangeRowQueryCriteria.MaxVersion = 1

getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
getRangeResp, err := d.tablestoreClient.GetRange(getRangeRequest)

for {
if err != nil {
fmt.Println("get range failed with error:", err)
}
for _, row := range getRangeResp.Rows {
if row.PrimaryKey.PrimaryKeys == nil {
continue
}
seq := new(sequenceInfo)
if sequenceConfig.DeduplicationMethodNum == 1 {
seq.itemId = utils.ToString(row.PrimaryKey.PrimaryKeys[1].Value, "")
}
for _, column := range row.Columns {
switch column.ColumnName {
case sequenceConfig.EventField:
seq.event = utils.ToString(column.Value, "")
case sequenceConfig.ItemIdField:
seq.itemId = utils.ToString(column.Value, "")
case sequenceConfig.PlayTimeField:
seq.playTime = utils.ToFloat(column.Value, 0)
case sequenceConfig.TimestampField:
seq.timestamp = utils.ToInt64(column.Value, 0)
}
}

if seq.event == "" || seq.itemId == "" {
continue
}
if t, exist := sequencePlayTimeMap[seq.event]; exist {
if seq.playTime <= t {
continue
}
}

ots_mu.Lock()
sequences = append(sequences, seq)
ots_mu.Unlock()
}
if getRangeResp.NextStartPrimaryKey == nil {
break
} else {
getRangeRequest.RangeRowQueryCriteria.StartPrimaryKey = getRangeResp.NextStartPrimaryKey
getRangeResp, err = d.tablestoreClient.GetRange(getRangeRequest)
}
}
}(event)
}
ots_wg.Wait()

// add seqLen limit
sort.Slice(sequences, func(i, j int) bool {
return sequences[i].timestamp > sequences[j].timestamp
})
limit := seqLen
if seqLen > len(sequences) {
limit = len(sequences)
}

resultSequences := sequences[:limit]

return resultSequences
}

results := make([]map[string]interface{}, 0, len(keys))
var outmu sync.Mutex

Expand All @@ -261,15 +359,15 @@ func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, us
innerWg.Add(1)
go func(seqEvent string, seqLen int, key interface{}) {
defer innerWg.Done()
if onlineresult := fetchDataFunc(seqEvent, seqLen, key, d.onlineTable); onlineresult != nil {
if onlineresult := onlineFetchDataFunc(seqEvent, seqLen, key, d.onlineTable); onlineresult != nil {
onlineSequences = onlineresult
}
}(seqConfig.SeqEvent, seqConfig.SeqLen, key)
//get data from offline table
innerWg.Add(1)
go func(seqEvent string, seqLen int, key interface{}) {
defer innerWg.Done()
if offlineresult := fetchDataFunc(seqEvent, seqLen, key, d.offlineTable); offlineresult != nil {
if offlineresult := offlineFetchDataFunc(seqEvent, seqLen, key, d.offlineTable); offlineresult != nil {
offlineSequences = offlineresult
}
}(seqConfig.SeqEvent, seqConfig.SeqLen, key)
Expand Down

0 comments on commit 8f636be

Please sign in to comment.