Skip to content

Commit

Permalink
Merge pull request #110 from ripienaar/109
Browse files Browse the repository at this point in the history
(#109) Support signing tasks before storing
  • Loading branch information
ripienaar authored May 5, 2023
2 parents 3fd8f03 + 11ac153 commit ced4df5
Show file tree
Hide file tree
Showing 16 changed files with 374 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
test:
strategy:
matrix:
go: [ 1.18, 1.19 ]
go: ["1.19", "1.20" ]

runs-on: ubuntu-latest
steps:
Expand Down
138 changes: 138 additions & 0 deletions ABTaskFile
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
name: build_tasks
description: Choria Build Tasks

commands:
- name: dependencies
type: parent
description: Manage dependencies
aliases: [d]
commands:
- name: update
description: Update dependencies
type: exec
aliases: [up]
dir: "{{ AppDir }}"
flags:
- name: verbose
description: Log verbosely
short: v
bool: true
- name: proxy
description: Enable using go proxy
bool: true
default: "true"
script: |
. "{{ BashHelperPath }}"

ab_announce Updating all dependencies
echo

{{ if eq .Flags.proxy false }}
export GOPROXY=direct
ab_say Disabling go mod proxy
{{ end }}

go get -u -n -a -t {{- if .Flags.verbose }} -d -x {{ end }} ./...

ab_say Running go mod tidy

go mod tidy

- name: test
type: parent
aliases: [t]
description: Perform various tests
commands:
- name: unit
type: exec
description: Run ginkgo unit tests
aliases: [u]
arguments:
- name: dir
description: Directory to test
default: .
flags:
- name: update
description: Updates the ginkgo runtime
bool: true
script: |
set -e

. "{{ BashHelperPath }}"

{{ if .Flags.update }}
ab_say Updating ginkgo binary
go install github.com/onsi/ginkgo/v2/ginkgo
{{ end }}

ginkgo -r --skip Integration {{ .Arguments.dir | escape }}

- name: integration
type: exec
dir: "{{ AppDir }}"
aliases: [i]
description: Run ginkgo integration tests
command: ginkgo -r integration

- name: lint
type: exec
dir: "{{ AppDir }}"
flags:
- name: vet
description: Perform go vet
bool: true
default: true
- name: staticcheck
description: Perform staticcheck
bool: true
default: true
- name: update
description: Updates lint dependencies
bool: true
script: |
set -e

. "{{ BashHelperPath }}"

{{ if .Flags.update }}
ab_say Updating linting tools
go install github.com/client9/misspell/cmd/misspell@latest
go install honnef.co/go/tools/cmd/staticcheck@latest
{{ else }}
echo ">>> Run with --update to install required commands"
echo
{{ end }}

ab_say Formatting source files
go fmt ./...

ab_say Tidying go mod
go mod tidy

ab_say Checking spelling
find . -type f -name "*.go" | xargs misspell -error -locale US -i flavour
find docs/content -type f -name "*.md" | xargs misspell -error -locale US

{{ if .Flags.vet }}
ab_say Performing go vet
go vet ./...
{{ end }}

{{ if .Flags.staticcheck }}
ab_say Running staticcheck
staticcheck ./...
{{ end }}

- name: docs
type: parent
description: Documentation related commands
commands:
- name: serve
description: Serves documentation locally
type: exec
dir: "{{ AppDir }}"
flags:
- name: port
description: The port to listen on
default: "8081"
command: hugo serve -p {{ .Flags.port }} -s docs
24 changes: 24 additions & 0 deletions ajc/task_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ package main

import (
"context"
"crypto/ed25519"
"encoding/hex"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -36,6 +39,7 @@ type taskCommand struct {
discardExpired bool
dependencies []string
loadDepResults bool
ed25519Seed string

limit int
json bool
Expand All @@ -55,6 +59,7 @@ func configureTaskCommand(app *fisk.Application) {
add.Flag("tries", "Sets the maximum amount of times this task may be tried").IntVar(&c.maxtries)
add.Flag("depends", "Sets IDs to depend on, comma sep or pass multiple times").StringsVar(&c.dependencies)
add.Flag("load", "Loads results from dependencies before executing task").BoolVar(&c.loadDepResults)
add.Flag("sign", "Signs the task using an ed25519 seed").StringVar(&c.ed25519Seed)

retry := tasks.Command("retry", "Retries delivery of a task currently in the Task Store").Action(c.retryAction)
retry.Arg("id", "The Task ID to view").Required().StringVar(&c.id)
Expand Down Expand Up @@ -176,6 +181,9 @@ func (c *taskCommand) watchAction(_ *fisk.ParseContext) error {
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State, e.LastErr)
}

case aj.LeaderElectedEvent:
fmt.Printf("[%s] %s: new %s leader\n", e.TimeStamp.Format("15:04:05"), e.Name, e.Component)

default:
fmt.Printf("[%s] Unknown event type %s\n", time.Now().UTC().Format("15:04:05"), kind)
}
Expand Down Expand Up @@ -412,6 +420,22 @@ func (c *taskCommand) addAction(_ *fisk.ParseContext) error {
opts = append(opts, aj.TaskMaxTries(c.maxtries))
}

if c.ed25519Seed != "" {
var seed []byte
if fileExist(c.ed25519Seed) {
seed, err = os.ReadFile(c.ed25519Seed)
if err != nil {
return err
}
} else {
seed, err = hex.DecodeString(c.ed25519Seed)
if err != nil {
return err
}
}
opts = append(opts, aj.TaskSigner(ed25519.NewKeyFromSeed(seed)))
}

task, err := aj.NewTask(c.ttype, c.payload, opts...)
if err != nil {
return err
Expand Down
12 changes: 12 additions & 0 deletions ajc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,15 @@ func showQueue(q *asyncjobs.QueueInfo) {
fmt.Printf(" Last Item: %v (%s)\n", q.Stream.State.LastTime.Format(timeFormat), humanizeDuration(time.Since(q.Stream.State.LastTime)))
}
}

func fileExist(path string) bool {
if path == "" {
return false
}

if _, err := os.Stat(path); os.IsNotExist(err) {
return false
}

return true
}
12 changes: 8 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ var _ = Describe("Client", func() {
})
})

It("Should handle retried messages with a backoff delay", func() {
It("Should handle retried messages with a backoff delay", FlakeAttempts(5), func() {
withJetStream(func(nc *nats.Conn, _ *jsm.Manager) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand All @@ -211,10 +211,10 @@ var _ = Describe("Client", func() {
var tries []time.Time

router := NewTaskRouter()
router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (any, error) {
err = router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (any, error) {
tries = append(tries, time.Now())

log.Infof("Trying task %s on try %d\n", t.ID, t.Tries)
log.Errorf("Trying task %s on try %d\n", t.ID, t.Tries)

if t.Tries < 2 {
return "fail", fmt.Errorf("simulated failure")
Expand All @@ -223,16 +223,20 @@ var _ = Describe("Client", func() {
wg.Done()
return "done", nil
})
Expect(err).ToNot(HaveOccurred())

go client.Run(ctx, router)

wg.Wait()

// we wg.done in the handler but the router is still doing its thing so we pause a bit
time.Sleep(250 * time.Millisecond)

Expect(len(tries)).To(Equal(6))
task, err = client.LoadTaskByID(task.ID)
Expect(err).ToNot(HaveOccurred())
Expect(task.State).To(Equal(TaskStateCompleted))
Expect(task.Tries).To(Equal(2))
Expect(task.State).To(Equal(TaskStateCompleted))
})
})
})
2 changes: 1 addition & 1 deletion docs/content/overview/handlers-docker.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ asyncjobs: latest

# Use the RetryLinearTenMinutes retry policy,
#
# Equivelant to client RetryBackoffPolicyName() option
# Equivalent to client RetryBackoffPolicyName() option
retry: 10m

# Discard tasks that reach complete state.
Expand Down
2 changes: 1 addition & 1 deletion docs/content/overview/scheduled-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ weight = 20

The Task Scheduler allows you to create cron like entries that will create Tasks on demand.

This requires a seperate process to be run that will supervise the configured schedules and create the tasks. We have such a Scheduler built into the `ajc` binary deployable in any container manager.
This requires a separate process to be run that will supervise the configured schedules and create the tasks. We have such a Scheduler built into the `ajc` binary deployable in any container manager.

The scheduler we provide support being deployed in a highly-available cluster, they will perform leader election with one of the cluster scheduling tasks. There is no need to restart or signal these schedulers as tasks are added, removed or updated.

Expand Down
2 changes: 1 addition & 1 deletion docs/content/reference/request-reply.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ $ nats reply CHORIA_AJ.H.T.email:new 'success' --context AJC
{"id":"24smZHaWnjuP371iglxeQWK7nOi","type":"email:new","queue":"DEFAULT","payload":"InsuLi4ufSI=","state":"active","created":"2022-02-09T17:28:41.943198067Z","tried":"2022-02-09T17:33:33.005041134Z","tries":5}
```

The CLI recieved the jobs with the 2 headers set and appropriate payload, it responsed with `success` and the task was completed.
The CLI received the jobs with the 2 headers set and appropriate payload, it responsed with `success` and the task was completed.

```
$ ajc task view 24smZHaWnjuP371iglxeQWK7nOi --json
Expand Down
2 changes: 1 addition & 1 deletion docs/content/reference/task-lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Tasks have many possible states and the processor will update the task as it tra
| `TaskStateRetry` | A task that had a previous failure and is now scheduled for later retry or one that was manually retried |
| `TaskStateExpired` | A task that was attempted to be processed but at that time it exceeded its deadline |
| `TaskStateTerminated` | A handler returned an `ErrTerminateTask` error and so will not be retried again |
| `TaskStateCompleted` | Succesful completed task |
| `TaskStateCompleted` | Successful completed task |
| `TaskStateQueueError` | Task was created but the Work Queue entry could not be made |
| `TaskStateBlocked` | When a Task is waiting on it's dependencies (since `0.0.8`) |
| `TaskStateUnreachable` | When a Task cannot execute because a dependent task failed (since `0.0.8`) |
Expand Down
9 changes: 7 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ var (
ErrTaskTypeInvalid = fmt.Errorf("task type is invalid")
// ErrTaskDependenciesFailed indicates that the task cannot be run as its dependencies failed
ErrTaskDependenciesFailed = fmt.Errorf("task dependencies failed")
// ErrTaskAlreadySigned indicates a task is already signed
ErrTaskAlreadySigned = fmt.Errorf("task is already signed")
// ErrTaskSignatureRequiresQueue indicates a signature request was made without configuring the queue name for a task
ErrTaskSignatureRequiresQueue = fmt.Errorf("signing a task requires the queue to be set")

// ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers
ErrNoHandlerForTaskType = fmt.Errorf("no handler for task type")
Expand Down Expand Up @@ -73,14 +77,15 @@ var (
ErrExternalCommandNotFound = fmt.Errorf("command not found")
// ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed
ErrExternalCommandFailed = fmt.Errorf("execution failed")

// ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered
ErrUnknownEventType = fmt.Errorf("unknown event type")

// ErrUnknownRetryPolicy indicates the requested retry policy does not exist
ErrUnknownRetryPolicy = fmt.Errorf("unknown retry policy")

// ErrUnknownDiscardPolicy indicates a discard policy could not be found matching a name
ErrUnknownDiscardPolicy = fmt.Errorf("unknown discard policy")
// ErrInvalidPrivateKey indicates the private key is not valid
ErrInvalidPrivateKey = fmt.Errorf("invalid private key length")

// ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error
ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed")
Expand Down
Loading

0 comments on commit ced4df5

Please sign in to comment.