-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsession.go
80 lines (68 loc) · 1.63 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package topic
import (
"context"
"log"
"strconv"
"github.com/pkg/errors"
)
// sessionImpl implement Session interface.
type sessionImpl struct {
*storeImpl
expr Expr
}
// NewSession returns topic session.
func (s *storeImpl) NewSession(cond Expr) *sessionImpl {
return &sessionImpl{s, cond}
}
// Scan exec topic condition.
func (s *sessionImpl) Scan(ctx context.Context, maxScanCount int64, bulk Bulk) (totalSuccess int64, err error) {
tx := sharedClient.TxPipeline()
key := s.expr.Exec(&pipelineImpl{s.storeImpl, tx})
scanCmd := tx.SScan(key, 0, "", maxScanCount)
cmds, err := tx.Exec()
if err != nil {
err = errors.Wrap(txError(cmds), err.Error())
return totalSuccess, err
}
userIDs := make([]uint64, 0, maxScanCount)
var cursor uint64 = 1
for cursor > 0 {
select {
case <-ctx.Done(): // timeout or cancel
return
default:
var res []string
res, cursor, err = scanCmd.Result()
if err != nil {
return totalSuccess, errors.WithStack(err)
}
for _, uid := range res {
userID, err := strconv.ParseUint(uid, 10, 64)
if err != nil {
log.Printf("Failed to strconv.ParseUint: %+v", err)
continue
}
userIDs = append(userIDs, userID)
if len(userIDs) == int(maxScanCount) {
success, err := bulk(userIDs)
if err != nil {
return totalSuccess, err
}
totalSuccess += success
userIDs = userIDs[:0]
}
}
if cursor > 0 {
scanCmd = sharedClient.SScan(key, cursor, "", maxScanCount)
}
}
}
if len(userIDs) > 0 {
success, err := bulk(userIDs)
if err != nil {
return totalSuccess, err
}
totalSuccess += success
}
return totalSuccess, nil
}