diff --git a/ABTaskFile b/ABTaskFile new file mode 100644 index 0000000..3e5b72b --- /dev/null +++ b/ABTaskFile @@ -0,0 +1,138 @@ +name: build_tasks +description: Choria Build Tasks + +commands: + - name: dependencies + type: parent + description: Manage dependencies + aliases: [d] + commands: + - name: update + description: Update dependencies + type: exec + aliases: [up] + dir: "{{ AppDir }}" + flags: + - name: verbose + description: Log verbosely + short: v + bool: true + - name: proxy + description: Enable using go proxy + bool: true + default: "true" + script: | + . "{{ BashHelperPath }}" + + ab_announce Updating all dependencies + echo + + {{ if eq .Flags.proxy false }} + export GOPROXY=direct + ab_say Disabling go mod proxy + {{ end }} + + go get -u -n -a -t {{- if .Flags.verbose }} -d -x {{ end }} ./... + + ab_say Running go mod tidy + + go mod tidy + + - name: test + type: parent + aliases: [t] + description: Perform various tests + commands: + - name: unit + type: exec + description: Run ginkgo unit tests + aliases: [u] + arguments: + - name: dir + description: Directory to test + default: . + flags: + - name: update + description: Updates the ginkgo runtime + bool: true + script: | + set -e + + . "{{ BashHelperPath }}" + + {{ if .Flags.update }} + ab_say Updating ginkgo binary + go install github.com/onsi/ginkgo/v2/ginkgo + {{ end }} + + ginkgo -r --skip Integration {{ .Arguments.dir | escape }} + + - name: integration + type: exec + dir: "{{ AppDir }}" + aliases: [i] + description: Run ginkgo integration tests + command: ginkgo -r integration + + - name: lint + type: exec + dir: "{{ AppDir }}" + flags: + - name: vet + description: Perform go vet + bool: true + default: true + - name: staticcheck + description: Perform staticcheck + bool: true + default: true + - name: update + description: Updates lint dependencies + bool: true + script: | + set -e + + . "{{ BashHelperPath }}" + + {{ if .Flags.update }} + ab_say Updating linting tools + go install github.com/client9/misspell/cmd/misspell@latest + go install honnef.co/go/tools/cmd/staticcheck@latest + {{ else }} + echo ">>> Run with --update to install required commands" + echo + {{ end }} + + ab_say Formatting source files + go fmt ./... + + ab_say Tidying go mod + go mod tidy + + ab_say Checking spelling + find . -type f -name "*.go" | xargs misspell -error -locale US -i flavour + find docs/content -type f -name "*.md" | xargs misspell -error -locale US + + {{ if .Flags.vet }} + ab_say Performing go vet + go vet ./... + {{ end }} + + {{ if .Flags.staticcheck }} + ab_say Running staticcheck + staticcheck ./... + {{ end }} + + - name: docs + type: parent + description: Documentation related commands + commands: + - name: serve + description: Serves documentation locally + type: exec + dir: "{{ AppDir }}" + flags: + - name: port + description: The port to listen on + default: "8081" + command: hugo serve -p {{ .Flags.port }} -s docs diff --git a/ajc/task_command.go b/ajc/task_command.go index 66a4157..5726aac 100644 --- a/ajc/task_command.go +++ b/ajc/task_command.go @@ -6,7 +6,10 @@ package main import ( "context" + "crypto/ed25519" + "encoding/hex" "fmt" + "os" "strings" "time" @@ -36,6 +39,7 @@ type taskCommand struct { discardExpired bool dependencies []string loadDepResults bool + ed25519Seed string limit int json bool @@ -55,6 +59,7 @@ func configureTaskCommand(app *fisk.Application) { add.Flag("tries", "Sets the maximum amount of times this task may be tried").IntVar(&c.maxtries) add.Flag("depends", "Sets IDs to depend on, comma sep or pass multiple times").StringsVar(&c.dependencies) add.Flag("load", "Loads results from dependencies before executing task").BoolVar(&c.loadDepResults) + add.Flag("sign", "Signs the task using an ed25519 seed").StringVar(&c.ed25519Seed) retry := tasks.Command("retry", "Retries delivery of a task currently in the Task Store").Action(c.retryAction) retry.Arg("id", "The Task ID to view").Required().StringVar(&c.id) @@ -176,6 +181,9 @@ func (c *taskCommand) watchAction(_ *fisk.ParseContext) error { fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State, e.LastErr) } + case aj.LeaderElectedEvent: + fmt.Printf("[%s] %s: new %s leader\n", e.TimeStamp.Format("15:04:05"), e.Name, e.Component) + default: fmt.Printf("[%s] Unknown event type %s\n", time.Now().UTC().Format("15:04:05"), kind) } @@ -412,6 +420,22 @@ func (c *taskCommand) addAction(_ *fisk.ParseContext) error { opts = append(opts, aj.TaskMaxTries(c.maxtries)) } + if c.ed25519Seed != "" { + var seed []byte + if fileExist(c.ed25519Seed) { + seed, err = os.ReadFile(c.ed25519Seed) + if err != nil { + return err + } + } else { + seed, err = hex.DecodeString(c.ed25519Seed) + if err != nil { + return err + } + } + opts = append(opts, aj.TaskSigner(ed25519.NewKeyFromSeed(seed))) + } + task, err := aj.NewTask(c.ttype, c.payload, opts...) if err != nil { return err diff --git a/ajc/util.go b/ajc/util.go index 7903836..beaad0b 100644 --- a/ajc/util.go +++ b/ajc/util.go @@ -224,3 +224,15 @@ func showQueue(q *asyncjobs.QueueInfo) { fmt.Printf(" Last Item: %v (%s)\n", q.Stream.State.LastTime.Format(timeFormat), humanizeDuration(time.Since(q.Stream.State.LastTime))) } } + +func fileExist(path string) bool { + if path == "" { + return false + } + + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } + + return true +} diff --git a/client_test.go b/client_test.go index b91ddd9..47b772f 100644 --- a/client_test.go +++ b/client_test.go @@ -188,7 +188,7 @@ var _ = Describe("Client", func() { }) }) - It("Should handle retried messages with a backoff delay", func() { + It("Should handle retried messages with a backoff delay", FlakeAttempts(5), func() { withJetStream(func(nc *nats.Conn, _ *jsm.Manager) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -211,10 +211,10 @@ var _ = Describe("Client", func() { var tries []time.Time router := NewTaskRouter() - router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (any, error) { + err = router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (any, error) { tries = append(tries, time.Now()) - log.Infof("Trying task %s on try %d\n", t.ID, t.Tries) + log.Errorf("Trying task %s on try %d\n", t.ID, t.Tries) if t.Tries < 2 { return "fail", fmt.Errorf("simulated failure") @@ -223,16 +223,20 @@ var _ = Describe("Client", func() { wg.Done() return "done", nil }) + Expect(err).ToNot(HaveOccurred()) go client.Run(ctx, router) wg.Wait() + // we wg.done in the handler but the router is still doing its thing so we pause a bit + time.Sleep(250 * time.Millisecond) + Expect(len(tries)).To(Equal(6)) task, err = client.LoadTaskByID(task.ID) Expect(err).ToNot(HaveOccurred()) - Expect(task.State).To(Equal(TaskStateCompleted)) Expect(task.Tries).To(Equal(2)) + Expect(task.State).To(Equal(TaskStateCompleted)) }) }) }) diff --git a/errors.go b/errors.go index 010f335..940c02e 100644 --- a/errors.go +++ b/errors.go @@ -36,6 +36,10 @@ var ( ErrTaskTypeInvalid = fmt.Errorf("task type is invalid") // ErrTaskDependenciesFailed indicates that the task cannot be run as its dependencies failed ErrTaskDependenciesFailed = fmt.Errorf("task dependencies failed") + // ErrTaskAlreadySigned indicates a task is already signed + ErrTaskAlreadySigned = fmt.Errorf("task is already signed") + // ErrTaskSignatureRequiresQueue indicates a signature request was made without configuring the queue name for a task + ErrTaskSignatureRequiresQueue = fmt.Errorf("signing a task requires the queue to be set") // ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers ErrNoHandlerForTaskType = fmt.Errorf("no handler for task type") @@ -73,14 +77,15 @@ var ( ErrExternalCommandNotFound = fmt.Errorf("command not found") // ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed ErrExternalCommandFailed = fmt.Errorf("execution failed") + // ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered ErrUnknownEventType = fmt.Errorf("unknown event type") - // ErrUnknownRetryPolicy indicates the requested retry policy does not exist ErrUnknownRetryPolicy = fmt.Errorf("unknown retry policy") - // ErrUnknownDiscardPolicy indicates a discard policy could not be found matching a name ErrUnknownDiscardPolicy = fmt.Errorf("unknown discard policy") + // ErrInvalidPrivateKey indicates the private key is not valid + ErrInvalidPrivateKey = fmt.Errorf("invalid private key length") // ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed") diff --git a/storage.go b/storage.go index 3af5c21..c84ebca 100644 --- a/storage.go +++ b/storage.go @@ -211,6 +211,12 @@ func (s *jetStreamStorage) EnqueueTask(ctx context.Context, queue *Queue, task * } task.Queue = queue.Name + + err = task.Sign() + if err != nil { + return err + } + err = s.SaveTaskState(ctx, task, true) if err != nil { return err diff --git a/storage_test.go b/storage_test.go index ca754ae..8432651 100644 --- a/storage_test.go +++ b/storage_test.go @@ -7,6 +7,7 @@ package asyncjobs import ( "bytes" "context" + "crypto/ed25519" "encoding/json" "fmt" "log" @@ -914,11 +915,16 @@ var _ = Describe("Storage", func() { err = storage.PrepareQueue(q, 1, true) Expect(err).ToNot(HaveOccurred()) - task, err := NewTask("ginkgo", nil) + _, pri, err := ed25519.GenerateKey(nil) + Expect(err).ToNot(HaveOccurred()) + + task, err := NewTask("ginkgo", nil, TaskSigner(pri)) Expect(err).ToNot(HaveOccurred()) + Expect(task.Signature).To(HaveLen(0)) err = storage.EnqueueTask(ctx, q, task) Expect(err).ToNot(HaveOccurred()) + Expect(task.Signature).To(HaveLen(128)) msg, err := storage.qStreams[q.Name].ReadMessage(1) Expect(err).ToNot(HaveOccurred()) @@ -929,8 +935,9 @@ var _ = Describe("Storage", func() { Expect(item.Kind).To(Equal(TaskItem)) Expect(item.JobID).To(Equal(task.ID)) - _, err = storage.LoadTaskByID(task.ID) + t, err := storage.LoadTaskByID(task.ID) Expect(err).ToNot(HaveOccurred()) + Expect(t.Signature).To(Equal(task.Signature)) }) }) }) diff --git a/task.go b/task.go index 47585c6..3dd1028 100644 --- a/task.go +++ b/task.go @@ -5,8 +5,12 @@ package asyncjobs import ( + "crypto/ed25519" + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" + "io" "sync" "time" @@ -89,7 +93,10 @@ type Task struct { Tries int `json:"tries"` // LastErr is the most recent handling error if any LastErr string `json:"last_err,omitempty"` + // Signature is an ed25519 signature of key properties + Signature string `json:"signature,omitempty"` + sigPk ed25519.PrivateKey storageOptions any mu sync.Mutex } @@ -164,6 +171,47 @@ func (t *Task) HasDependencies() bool { return len(t.Dependencies) > 0 } +func (t *Task) Sign() error { + if t.sigPk == nil { + return nil + } + + if t.Signature != "" { + return ErrTaskAlreadySigned + } + + if len(t.sigPk) != ed25519.PrivateKeySize { + return ErrInvalidPrivateKey + } + + msg, err := t.signatureMessage() + if err != nil { + return err + } + + t.Signature = hex.EncodeToString(ed25519.Sign(t.sigPk, msg)) + + io.ReadFull(rand.Reader, t.sigPk[:]) + t.sigPk = nil + + return nil +} + +func (t *Task) signatureMessage() ([]byte, error) { + if t.Queue == "" { + return nil, ErrTaskSignatureRequiresQueue + } + + var deadline int + if t.Deadline != nil { + deadline = t.Deadline.UTC().Nanosecond() + } + + msg := fmt.Sprintf("%s:%s:%s:%d:%d:%d", t.ID, t.Queue, t.Type, t.MaxTries, t.CreatedAt.UTC().Nanosecond(), deadline) + + return []byte(msg), nil +} + // TaskOpt configures Tasks made using NewTask() type TaskOpt func(*Task) error @@ -229,3 +277,11 @@ func TaskRequiresDependencyResults() TaskOpt { return nil } } + +// TaskSigner signs the task using the given private key +func TaskSigner(key ed25519.PrivateKey) TaskOpt { + return func(t *Task) error { + t.sigPk = key + return nil + } +} diff --git a/task_test.go b/task_test.go index 4ca882e..a5814ed 100644 --- a/task_test.go +++ b/task_test.go @@ -1,6 +1,8 @@ package asyncjobs import ( + "crypto/ed25519" + "encoding/hex" "time" . "github.com/onsi/ginkgo/v2" @@ -15,7 +17,12 @@ var _ = Describe("Tasks", func() { deadline := time.Now().Add(time.Hour) payload := map[string]string{"hello": "world"} - task, err := NewTask("test", payload, TaskDeadline(deadline), TaskDependsOnIDs("1", "2", "2", "1", "2"), TaskDependsOn(p, p), TaskRequiresDependencyResults()) + task, err := NewTask("test", payload, + TaskDeadline(deadline), + TaskDependsOnIDs("1", "2", "2", "1", "2"), + TaskDependsOn(p, p), + TaskRequiresDependencyResults(), + ) Expect(err).ToNot(HaveOccurred()) Expect(task.Deadline).To(Equal(&deadline)) Expect(task.ID).ToNot(HaveLen(0)) @@ -28,12 +35,27 @@ var _ = Describe("Tasks", func() { Expect(task.LoadDependencies).To(BeTrue()) Expect(task.MaxTries).To(Equal(DefaultMaxTries)) + pub, pri, err := ed25519.GenerateKey(nil) + Expect(err).ToNot(HaveOccurred()) + // without dependencies, should be new - task, err = NewTask("test", payload, TaskDeadline(deadline), TaskMaxTries(10)) + task, err = NewTask("test", payload, TaskDeadline(deadline), TaskMaxTries(10), TaskSigner(pri)) Expect(err).ToNot(HaveOccurred()) Expect(task.State).To(Equal(TaskStateNew)) Expect(task.LoadDependencies).To(BeFalse()) Expect(task.MaxTries).To(Equal(10)) + Expect(task.sigPk).To(Equal(pri)) + + Expect(task.Sign()).To(MatchError(ErrTaskSignatureRequiresQueue)) + task.Queue = "x" + Expect(task.Sign()).To(Succeed()) + Expect(task.Signature).ToNot(HaveLen(0)) + + msg, err := task.signatureMessage() + Expect(msg).To(HaveLen(57)) + sig, err := hex.DecodeString(task.Signature) + Expect(sig).To(HaveLen(64)) + Expect(ed25519.Verify(pub, msg, sig)).To(BeTrue()) }) }) })