forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.go
428 lines (366 loc) · 12.7 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
package influxdb
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/edit"
"github.com/influxdata/flux/parser"
"github.com/influxdata/influxdb/task/options"
)
const (
TaskDefaultPageSize = 100
TaskMaxPageSize = 500
TaskStatusActive = "active"
TaskStatusInactive = "inactive"
)
// Task is a task. 🎊
type Task struct {
ID ID `json:"id"`
OrganizationID ID `json:"orgID"`
Organization string `json:"org"`
AuthorizationID ID `json:"authorizationID"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Status string `json:"status"`
Flux string `json:"flux"`
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
Offset string `json:"offset,omitempty"`
LatestCompleted string `json:"latestCompleted,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
UpdatedAt string `json:"updatedAt,omitempty"`
}
// EffectiveCron returns the effective cron string of the options.
// If the cron option was specified, it is returned.
// If the every option was specified, it is converted into a cron string using "@every".
// Otherwise, the empty string is returned.
// The value of the offset option is not considered.
func (t *Task) EffectiveCron() string {
if t.Cron != "" {
return t.Cron
}
if t.Every != "" {
return "@every " + t.Every
}
return ""
}
// Run is a record created when a run of a task is scheduled.
type Run struct {
ID ID `json:"id,omitempty"`
TaskID ID `json:"taskID"`
Status string `json:"status"`
ScheduledFor string `json:"scheduledFor"`
StartedAt string `json:"startedAt,omitempty"`
FinishedAt string `json:"finishedAt,omitempty"`
RequestedAt string `json:"requestedAt,omitempty"`
Log []Log `json:"log,omitempty"`
}
// ScheduledForTime gives the time.Time that the run is scheduled for.
func (r *Run) ScheduledForTime() (time.Time, error) {
return time.Parse(time.RFC3339, r.ScheduledFor)
}
// StartedAtTime gives the time.Time that the run was started.
func (r *Run) StartedAtTime() (time.Time, error) {
return time.Parse(time.RFC3339Nano, r.StartedAt)
}
// RequestedAtTime gives the time.Time that the run was requested.
func (r *Run) RequestedAtTime() (time.Time, error) {
return time.Parse(time.RFC3339, r.RequestedAt)
}
// Log represents a link to a log resource
type Log struct {
RunID ID `json:"runID,omitempty"`
Time string `json:"time"`
Message string `json:"message"`
}
func (l Log) String() string {
return l.Time + ": " + l.Message
}
// TaskService represents a service for managing one-off and recurring tasks.
type TaskService interface {
// FindTaskByID returns a single task
FindTaskByID(ctx context.Context, id ID) (*Task, error)
// FindTasks returns a list of tasks that match a filter (limit 100) and the total count
// of matching tasks.
FindTasks(ctx context.Context, filter TaskFilter) ([]*Task, int, error)
// CreateTask creates a new task.
// The owner of the task is inferred from the authorizer associated with ctx.
CreateTask(ctx context.Context, t TaskCreate) (*Task, error)
// UpdateTask updates a single task with changeset.
UpdateTask(ctx context.Context, id ID, upd TaskUpdate) (*Task, error)
// DeleteTask removes a task by ID and purges all associated data and scheduled runs.
DeleteTask(ctx context.Context, id ID) error
// FindLogs returns logs for a run.
FindLogs(ctx context.Context, filter LogFilter) ([]*Log, int, error)
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
FindRuns(ctx context.Context, filter RunFilter) ([]*Run, int, error)
// FindRunByID returns a single run.
FindRunByID(ctx context.Context, taskID, runID ID) (*Run, error)
// CancelRun cancels a currently running run.
CancelRun(ctx context.Context, taskID, runID ID) error
// RetryRun creates and returns a new run (which is a retry of another run).
RetryRun(ctx context.Context, taskID, runID ID) (*Run, error)
// ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible.
// The value of scheduledFor may or may not align with the task's schedule.
ForceRun(ctx context.Context, taskID ID, scheduledFor int64) (*Run, error)
}
// TaskCreate is the set of values to create a task.
type TaskCreate struct {
Flux string `json:"flux"`
Description string `json:"description,omitempty"`
Status string `json:"status,omitempty"`
OrganizationID ID `json:"orgID,omitempty"`
Organization string `json:"org,omitempty"`
Token string `json:"token,omitempty"`
}
func (t TaskCreate) Validate() error {
switch {
case t.Flux == "":
return errors.New("missing flux")
case !t.OrganizationID.Valid() && t.Organization == "":
return errors.New("missing orgID and org")
case t.Status != "" && t.Status != TaskStatusActive && t.Status != TaskStatusInactive:
return fmt.Errorf("invalid task status: %q", t.Status)
}
return nil
}
// TaskUpdate represents updates to a task. Options updates override any options set in the Flux field.
type TaskUpdate struct {
Flux *string `json:"flux,omitempty"`
Status *string `json:"status,omitempty"`
Description *string `json:"description,omitempty"`
// LatestCompleted us to set latest completed on startup to skip task catchup
LatestCompleted *string `json:"-"`
// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
Options options.Options // when we unmarshal this gets unmarshalled from flat key-values
// Optional token override.
Token string `json:"token,omitempty"`
}
func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
// this is a type so we can marshal string into durations nicely
jo := struct {
Flux *string `json:"flux,omitempty"`
Status *string `json:"status,omitempty"`
Name string `json:"name,omitempty"`
Description *string `json:"description,omitempty"`
// Cron is a cron style time schedule that can be used in place of Every.
Cron string `json:"cron,omitempty"`
// Every represents a fixed period to repeat execution.
// It gets marshalled from a string duration, i.e.: "10s" is 10 seconds
Every options.Duration `json:"every,omitempty"`
// Offset represents a delay before execution.
// It gets marshalled from a string duration, i.e.: "10s" is 10 seconds
Offset *options.Duration `json:"offset,omitempty"`
Concurrency *int64 `json:"concurrency,omitempty"`
Retry *int64 `json:"retry,omitempty"`
Token string `json:"token,omitempty"`
}{}
if err := json.Unmarshal(data, &jo); err != nil {
return err
}
t.Options.Name = jo.Name
t.Description = jo.Description
t.Options.Cron = jo.Cron
t.Options.Every = jo.Every
if jo.Offset != nil {
offset := *jo.Offset
t.Options.Offset = &offset
}
t.Options.Concurrency = jo.Concurrency
t.Options.Retry = jo.Retry
t.Flux = jo.Flux
t.Status = jo.Status
t.Token = jo.Token
return nil
}
func (t TaskUpdate) MarshalJSON() ([]byte, error) {
jo := struct {
Flux *string `json:"flux,omitempty"`
Status *string `json:"status,omitempty"`
Name string `json:"name,omitempty"`
Description *string `json:"description,omitempty"`
// Cron is a cron style time schedule that can be used in place of Every.
Cron string `json:"cron,omitempty"`
// Every represents a fixed period to repeat execution.
Every options.Duration `json:"every,omitempty"`
// Offset represents a delay before execution.
Offset *options.Duration `json:"offset,omitempty"`
Concurrency *int64 `json:"concurrency,omitempty"`
Retry *int64 `json:"retry,omitempty"`
Token string `json:"token,omitempty"`
}{}
jo.Name = t.Options.Name
jo.Cron = t.Options.Cron
jo.Every = t.Options.Every
jo.Description = t.Description
if t.Options.Offset != nil {
offset := *t.Options.Offset
jo.Offset = &offset
}
jo.Concurrency = t.Options.Concurrency
jo.Retry = t.Options.Retry
jo.Flux = t.Flux
jo.Status = t.Status
jo.Token = t.Token
return json.Marshal(jo)
}
func (t TaskUpdate) Validate() error {
switch {
case !t.Options.Every.IsZero() && t.Options.Cron != "":
return errors.New("cannot specify both every and cron")
case t.Flux == nil && t.Status == nil && t.Options.IsZero() && t.Token == "":
return errors.New("cannot update task without content")
case t.Status != nil && *t.Status != TaskStatusActive && *t.Status != TaskStatusInactive:
return fmt.Errorf("invalid task status: %q", *t.Status)
}
return nil
}
// UpdateFlux updates the TaskUpdate to go from updating options to updating a flux string, that now has those updated options in it
// It zeros the options in the TaskUpdate.
func (t *TaskUpdate) UpdateFlux(oldFlux string) error {
if t.Flux != nil && *t.Flux != "" {
oldFlux = *t.Flux
}
toDelete := map[string]struct{}{}
parsedPKG := parser.ParseSource(oldFlux)
if ast.Check(parsedPKG) > 0 {
return ast.GetError(parsedPKG)
}
parsed := parsedPKG.Files[0]
if !t.Options.Every.IsZero() && t.Options.Cron != "" {
return errors.New("cannot specify both cron and every")
}
op := make(map[string]ast.Expression, 4)
if t.Options.Name != "" {
op["name"] = &ast.StringLiteral{Value: t.Options.Name}
}
if !t.Options.Every.IsZero() {
op["every"] = &t.Options.Every.Node
}
if t.Options.Cron != "" {
op["cron"] = &ast.StringLiteral{Value: t.Options.Cron}
}
if t.Options.Offset != nil {
if !t.Options.Offset.IsZero() {
op["offset"] = &t.Options.Offset.Node
} else {
toDelete["offset"] = struct{}{}
}
}
if len(op) > 0 || len(toDelete) > 0 {
editFunc := func(opt *ast.OptionStatement) (ast.Expression, error) {
a, ok := opt.Assignment.(*ast.VariableAssignment)
if !ok {
return nil, errors.New("option assignment must be variable assignment")
}
obj, ok := a.Init.(*ast.ObjectExpression)
if !ok {
return nil, fmt.Errorf("value is is %s, not an object expression", a.Init.Type())
}
// modify in the keys and values that already are in the ast
for i, p := range obj.Properties {
k := p.Key.Key()
if _, ok := toDelete[k]; ok {
obj.Properties = append(obj.Properties[:i], obj.Properties[i+1:]...)
}
switch k {
case "name":
if name, ok := op["name"]; ok && t.Options.Name != "" {
delete(op, "name")
p.Value = name
}
case "offset":
if offset, ok := op["offset"]; ok && t.Options.Offset != nil {
delete(op, "offset")
p.Value = offset.Copy().(*ast.DurationLiteral)
}
case "every":
if every, ok := op["every"]; ok && !t.Options.Every.IsZero() {
p.Value = every.Copy().(*ast.DurationLiteral)
delete(op, "every")
} else if cron, ok := op["cron"]; ok && t.Options.Cron != "" {
delete(op, "cron")
p.Value = cron
p.Key = &ast.Identifier{Name: "cron"}
}
case "cron":
if cron, ok := op["cron"]; ok && t.Options.Cron != "" {
delete(op, "cron")
p.Value = cron
} else if every, ok := op["every"]; ok && !t.Options.Every.IsZero() {
delete(op, "every")
p.Key = &ast.Identifier{Name: "every"}
p.Value = every.Copy().(*ast.DurationLiteral)
}
}
}
// add in new keys and values to the ast
for k := range op {
obj.Properties = append(obj.Properties, &ast.Property{
Key: &ast.Identifier{Name: k},
Value: op[k],
})
}
return nil, nil
}
ok, err := edit.Option(parsed, "task", editFunc)
if err != nil {
return err
}
if !ok {
return errors.New("unable to edit option")
}
t.Options.Clear()
s := ast.Format(parsed)
t.Flux = &s
}
return nil
}
// TaskFilter represents a set of filters that restrict the returned results
type TaskFilter struct {
After *ID
OrganizationID *ID
Organization string
User *ID
Limit int
}
// QueryParams Converts TaskFilter fields to url query params.
func (f TaskFilter) QueryParams() map[string][]string {
qp := map[string][]string{}
if f.After != nil {
qp["after"] = []string{f.After.String()}
}
if f.OrganizationID != nil {
qp["orgID"] = []string{f.OrganizationID.String()}
}
if f.Organization != "" {
qp["org"] = []string{f.Organization}
}
if f.User != nil {
qp["user"] = []string{f.User.String()}
}
if f.Limit > 0 {
qp["limit"] = []string{strconv.Itoa(f.Limit)}
}
return qp
}
// RunFilter represents a set of filters that restrict the returned results
type RunFilter struct {
// Task ID is required for listing runs.
Task ID
After *ID
Limit int
AfterTime string
BeforeTime string
}
// LogFilter represents a set of filters that restrict the returned log results.
type LogFilter struct {
// Task ID is required.
Task ID
// The optional Run ID limits logs to a single run.
Run *ID
}