diff --git a/dao/feature_view_tablestore_dao.go b/dao/feature_view_tablestore_dao.go index 1f73c4e..e1f5fba 100644 --- a/dao/feature_view_tablestore_dao.go +++ b/dao/feature_view_tablestore_dao.go @@ -136,7 +136,7 @@ func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, us skField = fmt.Sprintf("%s_%s", sequenceConfig.ItemIdField, sequenceConfig.TimestampField) } - fetchDataFunc := func(seqEvent string, seqLen int, key interface{}, tableName string) []*sequenceInfo { + onlineFetchDataFunc := func(seqEvent string, seqLen int, key interface{}, tableName string) []*sequenceInfo { sequences := []*sequenceInfo{} var ots_mu sync.Mutex @@ -237,6 +237,104 @@ func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, us return resultSequences } + offlineFetchDataFunc := func(seqEvent string, seqLen int, key interface{}, tableName string) []*sequenceInfo { + sequences := []*sequenceInfo{} + + var ots_mu sync.Mutex + var ots_wg sync.WaitGroup + events := strings.Split(seqEvent, "|") + + for _, event := range events { + ots_wg.Add(1) + go func(event string) { + defer ots_wg.Done() + getRangeRequest := &tablestore.GetRangeRequest{} + rangeRowQueryCriteria := &tablestore.RangeRowQueryCriteria{} + rangeRowQueryCriteria.TableName = tableName + + startPK := new(tablestore.PrimaryKey) + startPK.AddPrimaryKeyColumn(pkField, fmt.Sprintf("%v_%s", key, event)) + startPK.AddPrimaryKeyColumnWithMinValue(skField) + endPK := new(tablestore.PrimaryKey) + endPK.AddPrimaryKeyColumn(pkField, fmt.Sprintf("%v_%s", key, event)) + endPK.AddPrimaryKeyColumnWithMaxValue(skField) + + rangeRowQueryCriteria.StartPrimaryKey = startPK + rangeRowQueryCriteria.EndPrimaryKey = endPK + rangeRowQueryCriteria.Direction = tablestore.FORWARD + if sequenceConfig.PlayTimeField == "" { + rangeRowQueryCriteria.ColumnsToGet = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.TimestampField} + } else { + rangeRowQueryCriteria.ColumnsToGet = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.PlayTimeField, sequenceConfig.TimestampField} + } + rangeRowQueryCriteria.MaxVersion = 1 + + getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria + getRangeResp, err := d.tablestoreClient.GetRange(getRangeRequest) + + for { + if err != nil { + fmt.Println("get range failed with error:", err) + } + for _, row := range getRangeResp.Rows { + if row.PrimaryKey.PrimaryKeys == nil { + continue + } + seq := new(sequenceInfo) + if sequenceConfig.DeduplicationMethodNum == 1 { + seq.itemId = utils.ToString(row.PrimaryKey.PrimaryKeys[1].Value, "") + } + for _, column := range row.Columns { + switch column.ColumnName { + case sequenceConfig.EventField: + seq.event = utils.ToString(column.Value, "") + case sequenceConfig.ItemIdField: + seq.itemId = utils.ToString(column.Value, "") + case sequenceConfig.PlayTimeField: + seq.playTime = utils.ToFloat(column.Value, 0) + case sequenceConfig.TimestampField: + seq.timestamp = utils.ToInt64(column.Value, 0) + } + } + + if seq.event == "" || seq.itemId == "" { + continue + } + if t, exist := sequencePlayTimeMap[seq.event]; exist { + if seq.playTime <= t { + continue + } + } + + ots_mu.Lock() + sequences = append(sequences, seq) + ots_mu.Unlock() + } + if getRangeResp.NextStartPrimaryKey == nil { + break + } else { + getRangeRequest.RangeRowQueryCriteria.StartPrimaryKey = getRangeResp.NextStartPrimaryKey + getRangeResp, err = d.tablestoreClient.GetRange(getRangeRequest) + } + } + }(event) + } + ots_wg.Wait() + + // add seqLen limit + sort.Slice(sequences, func(i, j int) bool { + return sequences[i].timestamp > sequences[j].timestamp + }) + limit := seqLen + if seqLen > len(sequences) { + limit = len(sequences) + } + + resultSequences := sequences[:limit] + + return resultSequences + } + results := make([]map[string]interface{}, 0, len(keys)) var outmu sync.Mutex @@ -261,7 +359,7 @@ func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, us innerWg.Add(1) go func(seqEvent string, seqLen int, key interface{}) { defer innerWg.Done() - if onlineresult := fetchDataFunc(seqEvent, seqLen, key, d.onlineTable); onlineresult != nil { + if onlineresult := onlineFetchDataFunc(seqEvent, seqLen, key, d.onlineTable); onlineresult != nil { onlineSequences = onlineresult } }(seqConfig.SeqEvent, seqConfig.SeqLen, key) @@ -269,7 +367,7 @@ func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, us innerWg.Add(1) go func(seqEvent string, seqLen int, key interface{}) { defer innerWg.Done() - if offlineresult := fetchDataFunc(seqEvent, seqLen, key, d.offlineTable); offlineresult != nil { + if offlineresult := offlineFetchDataFunc(seqEvent, seqLen, key, d.offlineTable); offlineresult != nil { offlineSequences = offlineresult } }(seqConfig.SeqEvent, seqConfig.SeqLen, key)