-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtask.go
153 lines (120 loc) · 3.42 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package scheduler
import (
"errors"
"fmt"
"net/http"
"path"
"reflect"
"strconv"
"strings"
"time"
)
const (
taskTimestampSeparator = "_"
taskVersionSeparator = "v"
)
var ErrInvalidTaskName = errors.New("invalid task name")
type Task struct {
QueuePath string
Prefix string
ID string
ScheduledAt time.Time
Request *http.Request
Authorization isAuthorizationToken
Version int
}
func (t *Task) comparisonID() string {
return t.Prefix + t.ID + taskTimestampSeparator + strconv.FormatInt(t.ScheduledAt.UnixNano(), 16)
}
func (t *Task) TaskID() string {
return t.comparisonID() + taskVersionSeparator + strconv.Itoa(t.Version)
}
func (t *Task) TaskName() string {
return taskName(t.QueuePath, t.TaskID())
}
func taskName(queuePath, taskID string) string {
return queuePath + "/tasks/" + taskID
}
func (t *Task) Validate() error {
if t.ID == "" {
return fmt.Errorf("ID is empty: %w", ErrTaskValidation)
}
if err := t.validateTaskID(); err != nil {
return err
}
return nil
}
// validateTaskID validates task id.
// TASK_ID can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_). The maximum length is 500 characters.
// see: https://cloud.google.com/tasks/docs/reference/rest/v2/projects.locations.queues.tasks#resource:-task
func (t *Task) validateTaskID() error {
taskID := t.TaskID()
if len(taskID) > 500 {
return fmt.Errorf("task id maximum length is 500: %w", ErrTaskValidation)
}
for _, char := range taskID {
if ('A' <= char && char <= 'Z') || ('a' <= char && char <= 'z') ||
('0' <= char && char <= '9') ||
char == '-' || char == '_' {
continue
}
return fmt.Errorf("task id contains invalid character %c: %w", char, ErrTaskValidation)
}
return nil
}
func (t *Task) Compare(target *Task) bool {
if t.comparisonID() != target.comparisonID() {
return false
}
// t.Request.Body and t.Request.Header are not checked but use Version field to update tasks.
if t.Version > target.Version {
return false
}
if !reflect.DeepEqual(t.Authorization, target.Authorization) {
return false
}
if t.Request.Method != target.Request.Method ||
removeTrailingSlash(t.Request.URL.String()) != removeTrailingSlash(target.Request.URL.String()) {
return false
}
return true
}
type isAuthorizationToken interface {
isAuthorizationToken()
}
type OAuthToken struct {
ServiceAccountEmail string
Scope string
}
func (*OAuthToken) isAuthorizationToken() {}
type OIDCToken struct {
ServiceAccountEmail string
Audience string
}
func (*OIDCToken) isAuthorizationToken() {}
func removeTrailingSlash(u string) string {
return strings.TrimSuffix(u, "/")
}
func ParseTaskName(prefix, taskName string) (string, int, error) {
v := path.Base(taskName)
if !strings.HasPrefix(v, prefix) {
return "", 0, fmt.Errorf("task name has no valid prefix: %w", ErrInvalidTaskName)
}
v = strings.TrimPrefix(v, prefix)
tsIdx := strings.LastIndex(v, taskTimestampSeparator)
if tsIdx < 0 {
return "", 0, fmt.Errorf("invalid task name format: %w", ErrInvalidTaskName)
}
id := v[:tsIdx]
v = v[tsIdx+1:]
vIdx := strings.LastIndex(v, taskVersionSeparator)
if vIdx < 0 {
return "", 0, fmt.Errorf("task name has no valid version: %w", ErrInvalidTaskName)
}
vs := v[vIdx+1:]
version, err := strconv.Atoi(vs)
if err != nil {
return "", 0, fmt.Errorf("failed to parse version (%s): %w", vs, ErrInvalidTaskName)
}
return id, version, nil
}