diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 0e40880..7de2cf4 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -5,7 +5,7 @@ jobs: test: strategy: matrix: - go: [ 1.18, 1.19 ] + go: ["1.19", "1.20" ] runs-on: ubuntu-latest steps: diff --git a/ABTaskFile b/ABTaskFile new file mode 100644 index 0000000..3e5b72b --- /dev/null +++ b/ABTaskFile @@ -0,0 +1,138 @@ +name: build_tasks +description: Choria Build Tasks + +commands: + - name: dependencies + type: parent + description: Manage dependencies + aliases: [d] + commands: + - name: update + description: Update dependencies + type: exec + aliases: [up] + dir: "{{ AppDir }}" + flags: + - name: verbose + description: Log verbosely + short: v + bool: true + - name: proxy + description: Enable using go proxy + bool: true + default: "true" + script: | + . "{{ BashHelperPath }}" + + ab_announce Updating all dependencies + echo + + {{ if eq .Flags.proxy false }} + export GOPROXY=direct + ab_say Disabling go mod proxy + {{ end }} + + go get -u -n -a -t {{- if .Flags.verbose }} -d -x {{ end }} ./... + + ab_say Running go mod tidy + + go mod tidy + + - name: test + type: parent + aliases: [t] + description: Perform various tests + commands: + - name: unit + type: exec + description: Run ginkgo unit tests + aliases: [u] + arguments: + - name: dir + description: Directory to test + default: . + flags: + - name: update + description: Updates the ginkgo runtime + bool: true + script: | + set -e + + . "{{ BashHelperPath }}" + + {{ if .Flags.update }} + ab_say Updating ginkgo binary + go install github.com/onsi/ginkgo/v2/ginkgo + {{ end }} + + ginkgo -r --skip Integration {{ .Arguments.dir | escape }} + + - name: integration + type: exec + dir: "{{ AppDir }}" + aliases: [i] + description: Run ginkgo integration tests + command: ginkgo -r integration + + - name: lint + type: exec + dir: "{{ AppDir }}" + flags: + - name: vet + description: Perform go vet + bool: true + default: true + - name: staticcheck + description: Perform staticcheck + bool: true + default: true + - name: update + description: Updates lint dependencies + bool: true + script: | + set -e + + . "{{ BashHelperPath }}" + + {{ if .Flags.update }} + ab_say Updating linting tools + go install github.com/client9/misspell/cmd/misspell@latest + go install honnef.co/go/tools/cmd/staticcheck@latest + {{ else }} + echo ">>> Run with --update to install required commands" + echo + {{ end }} + + ab_say Formatting source files + go fmt ./... + + ab_say Tidying go mod + go mod tidy + + ab_say Checking spelling + find . -type f -name "*.go" | xargs misspell -error -locale US -i flavour + find docs/content -type f -name "*.md" | xargs misspell -error -locale US + + {{ if .Flags.vet }} + ab_say Performing go vet + go vet ./... + {{ end }} + + {{ if .Flags.staticcheck }} + ab_say Running staticcheck + staticcheck ./... + {{ end }} + + - name: docs + type: parent + description: Documentation related commands + commands: + - name: serve + description: Serves documentation locally + type: exec + dir: "{{ AppDir }}" + flags: + - name: port + description: The port to listen on + default: "8081" + command: hugo serve -p {{ .Flags.port }} -s docs diff --git a/ajc/task_command.go b/ajc/task_command.go index 66a4157..5726aac 100644 --- a/ajc/task_command.go +++ b/ajc/task_command.go @@ -6,7 +6,10 @@ package main import ( "context" + "crypto/ed25519" + "encoding/hex" "fmt" + "os" "strings" "time" @@ -36,6 +39,7 @@ type taskCommand struct { discardExpired bool dependencies []string loadDepResults bool + ed25519Seed string limit int json bool @@ -55,6 +59,7 @@ func configureTaskCommand(app *fisk.Application) { add.Flag("tries", "Sets the maximum amount of times this task may be tried").IntVar(&c.maxtries) add.Flag("depends", "Sets IDs to depend on, comma sep or pass multiple times").StringsVar(&c.dependencies) add.Flag("load", "Loads results from dependencies before executing task").BoolVar(&c.loadDepResults) + add.Flag("sign", "Signs the task using an ed25519 seed").StringVar(&c.ed25519Seed) retry := tasks.Command("retry", "Retries delivery of a task currently in the Task Store").Action(c.retryAction) retry.Arg("id", "The Task ID to view").Required().StringVar(&c.id) @@ -176,6 +181,9 @@ func (c *taskCommand) watchAction(_ *fisk.ParseContext) error { fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State, e.LastErr) } + case aj.LeaderElectedEvent: + fmt.Printf("[%s] %s: new %s leader\n", e.TimeStamp.Format("15:04:05"), e.Name, e.Component) + default: fmt.Printf("[%s] Unknown event type %s\n", time.Now().UTC().Format("15:04:05"), kind) } @@ -412,6 +420,22 @@ func (c *taskCommand) addAction(_ *fisk.ParseContext) error { opts = append(opts, aj.TaskMaxTries(c.maxtries)) } + if c.ed25519Seed != "" { + var seed []byte + if fileExist(c.ed25519Seed) { + seed, err = os.ReadFile(c.ed25519Seed) + if err != nil { + return err + } + } else { + seed, err = hex.DecodeString(c.ed25519Seed) + if err != nil { + return err + } + } + opts = append(opts, aj.TaskSigner(ed25519.NewKeyFromSeed(seed))) + } + task, err := aj.NewTask(c.ttype, c.payload, opts...) if err != nil { return err diff --git a/ajc/util.go b/ajc/util.go index 7903836..beaad0b 100644 --- a/ajc/util.go +++ b/ajc/util.go @@ -224,3 +224,15 @@ func showQueue(q *asyncjobs.QueueInfo) { fmt.Printf(" Last Item: %v (%s)\n", q.Stream.State.LastTime.Format(timeFormat), humanizeDuration(time.Since(q.Stream.State.LastTime))) } } + +func fileExist(path string) bool { + if path == "" { + return false + } + + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } + + return true +} diff --git a/client_test.go b/client_test.go index b91ddd9..47b772f 100644 --- a/client_test.go +++ b/client_test.go @@ -188,7 +188,7 @@ var _ = Describe("Client", func() { }) }) - It("Should handle retried messages with a backoff delay", func() { + It("Should handle retried messages with a backoff delay", FlakeAttempts(5), func() { withJetStream(func(nc *nats.Conn, _ *jsm.Manager) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -211,10 +211,10 @@ var _ = Describe("Client", func() { var tries []time.Time router := NewTaskRouter() - router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (any, error) { + err = router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (any, error) { tries = append(tries, time.Now()) - log.Infof("Trying task %s on try %d\n", t.ID, t.Tries) + log.Errorf("Trying task %s on try %d\n", t.ID, t.Tries) if t.Tries < 2 { return "fail", fmt.Errorf("simulated failure") @@ -223,16 +223,20 @@ var _ = Describe("Client", func() { wg.Done() return "done", nil }) + Expect(err).ToNot(HaveOccurred()) go client.Run(ctx, router) wg.Wait() + // we wg.done in the handler but the router is still doing its thing so we pause a bit + time.Sleep(250 * time.Millisecond) + Expect(len(tries)).To(Equal(6)) task, err = client.LoadTaskByID(task.ID) Expect(err).ToNot(HaveOccurred()) - Expect(task.State).To(Equal(TaskStateCompleted)) Expect(task.Tries).To(Equal(2)) + Expect(task.State).To(Equal(TaskStateCompleted)) }) }) }) diff --git a/docs/content/overview/handlers-docker.md b/docs/content/overview/handlers-docker.md index 24ab719..24fa3ac 100644 --- a/docs/content/overview/handlers-docker.md +++ b/docs/content/overview/handlers-docker.md @@ -55,7 +55,7 @@ asyncjobs: latest # Use the RetryLinearTenMinutes retry policy, # -# Equivelant to client RetryBackoffPolicyName() option +# Equivalent to client RetryBackoffPolicyName() option retry: 10m # Discard tasks that reach complete state. diff --git a/docs/content/overview/scheduled-tasks.md b/docs/content/overview/scheduled-tasks.md index 13ce6a5..49dcb8c 100644 --- a/docs/content/overview/scheduled-tasks.md +++ b/docs/content/overview/scheduled-tasks.md @@ -6,7 +6,7 @@ weight = 20 The Task Scheduler allows you to create cron like entries that will create Tasks on demand. -This requires a seperate process to be run that will supervise the configured schedules and create the tasks. We have such a Scheduler built into the `ajc` binary deployable in any container manager. +This requires a separate process to be run that will supervise the configured schedules and create the tasks. We have such a Scheduler built into the `ajc` binary deployable in any container manager. The scheduler we provide support being deployed in a highly-available cluster, they will perform leader election with one of the cluster scheduling tasks. There is no need to restart or signal these schedulers as tasks are added, removed or updated. diff --git a/docs/content/reference/request-reply.md b/docs/content/reference/request-reply.md index 29f8d13..08ada90 100644 --- a/docs/content/reference/request-reply.md +++ b/docs/content/reference/request-reply.md @@ -66,7 +66,7 @@ $ nats reply CHORIA_AJ.H.T.email:new 'success' --context AJC {"id":"24smZHaWnjuP371iglxeQWK7nOi","type":"email:new","queue":"DEFAULT","payload":"InsuLi4ufSI=","state":"active","created":"2022-02-09T17:28:41.943198067Z","tried":"2022-02-09T17:33:33.005041134Z","tries":5} ``` -The CLI recieved the jobs with the 2 headers set and appropriate payload, it responsed with `success` and the task was completed. +The CLI received the jobs with the 2 headers set and appropriate payload, it responsed with `success` and the task was completed. ``` $ ajc task view 24smZHaWnjuP371iglxeQWK7nOi --json diff --git a/docs/content/reference/task-lifecycle.md b/docs/content/reference/task-lifecycle.md index 285aac0..a930c1b 100644 --- a/docs/content/reference/task-lifecycle.md +++ b/docs/content/reference/task-lifecycle.md @@ -87,7 +87,7 @@ Tasks have many possible states and the processor will update the task as it tra | `TaskStateRetry` | A task that had a previous failure and is now scheduled for later retry or one that was manually retried | | `TaskStateExpired` | A task that was attempted to be processed but at that time it exceeded its deadline | | `TaskStateTerminated` | A handler returned an `ErrTerminateTask` error and so will not be retried again | -| `TaskStateCompleted` | Succesful completed task | +| `TaskStateCompleted` | Successful completed task | | `TaskStateQueueError` | Task was created but the Work Queue entry could not be made | | `TaskStateBlocked` | When a Task is waiting on it's dependencies (since `0.0.8`) | | `TaskStateUnreachable` | When a Task cannot execute because a dependent task failed (since `0.0.8`) | diff --git a/errors.go b/errors.go index 010f335..940c02e 100644 --- a/errors.go +++ b/errors.go @@ -36,6 +36,10 @@ var ( ErrTaskTypeInvalid = fmt.Errorf("task type is invalid") // ErrTaskDependenciesFailed indicates that the task cannot be run as its dependencies failed ErrTaskDependenciesFailed = fmt.Errorf("task dependencies failed") + // ErrTaskAlreadySigned indicates a task is already signed + ErrTaskAlreadySigned = fmt.Errorf("task is already signed") + // ErrTaskSignatureRequiresQueue indicates a signature request was made without configuring the queue name for a task + ErrTaskSignatureRequiresQueue = fmt.Errorf("signing a task requires the queue to be set") // ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers ErrNoHandlerForTaskType = fmt.Errorf("no handler for task type") @@ -73,14 +77,15 @@ var ( ErrExternalCommandNotFound = fmt.Errorf("command not found") // ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed ErrExternalCommandFailed = fmt.Errorf("execution failed") + // ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered ErrUnknownEventType = fmt.Errorf("unknown event type") - // ErrUnknownRetryPolicy indicates the requested retry policy does not exist ErrUnknownRetryPolicy = fmt.Errorf("unknown retry policy") - // ErrUnknownDiscardPolicy indicates a discard policy could not be found matching a name ErrUnknownDiscardPolicy = fmt.Errorf("unknown discard policy") + // ErrInvalidPrivateKey indicates the private key is not valid + ErrInvalidPrivateKey = fmt.Errorf("invalid private key length") // ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed") diff --git a/go.mod b/go.mod index 833591c..e015c62 100644 --- a/go.mod +++ b/go.mod @@ -1,52 +1,52 @@ module github.com/choria-io/asyncjobs -go 1.18 +go 1.19 require ( github.com/AlecAivazis/survey/v2 v2.3.6 - github.com/choria-io/fisk v0.3.0 - github.com/dustin/go-humanize v1.0.0 + github.com/choria-io/fisk v0.5.0 + github.com/dustin/go-humanize v1.0.1 github.com/nats-io/jsm.go v0.0.35 - github.com/nats-io/nats-server/v2 v2.9.11 - github.com/nats-io/nats.go v1.22.1 - github.com/onsi/ginkgo/v2 v2.7.0 - github.com/onsi/gomega v1.24.2 - github.com/prometheus/client_golang v1.14.0 + github.com/nats-io/nats-server/v2 v2.9.16 + github.com/nats-io/nats.go v1.25.0 + github.com/onsi/ginkgo/v2 v2.9.4 + github.com/onsi/gomega v1.27.6 + github.com/prometheus/client_golang v1.15.1 github.com/robfig/cron/v3 v3.0.1 github.com/segmentio/ksuid v1.0.4 github.com/sirupsen/logrus v1.9.0 github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5 - golang.org/x/term v0.4.0 + golang.org/x/term v0.8.0 gopkg.in/yaml.v3 v3.0.1 ) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/go-logr/logr v1.2.3 // indirect - github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect - github.com/golang/protobuf v1.5.2 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect + github.com/google/pprof v0.0.0-20230502171905-255e3b9b56de // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/klauspost/compress v1.15.14 // indirect - github.com/kr/pretty v0.1.0 // indirect + github.com/klauspost/compress v1.16.5 // indirect + github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.17 // indirect + github.com/mattn/go-isatty v0.0.18 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.3.0 // indirect - github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898 // indirect + github.com/nats-io/jwt/v2 v2.4.1 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.39.0 // indirect - github.com/prometheus/procfs v0.8.0 // indirect - golang.org/x/crypto v0.5.0 // indirect - golang.org/x/net v0.5.0 // indirect - golang.org/x/sys v0.4.0 // indirect - golang.org/x/text v0.6.0 // indirect + github.com/prometheus/client_model v0.4.0 // indirect + github.com/prometheus/common v0.43.0 // indirect + github.com/prometheus/procfs v0.9.0 // indirect + golang.org/x/crypto v0.8.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.4.0 // indirect - google.golang.org/protobuf v1.28.1 // indirect + golang.org/x/tools v0.8.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect ) diff --git a/go.sum b/go.sum index 53359b5..f18abec 100644 --- a/go.sum +++ b/go.sum @@ -6,51 +6,45 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/choria-io/fisk v0.3.0 h1:27mPKLAElYKdeB/LTI8ifDsayI+GVMP1NGjZ9hapSOA= -github.com/choria-io/fisk v0.3.0/go.mod h1:rsa4Qx7kKTDeI8tTzS/fymOAuLzRvdHuHfJN2i853U4= -github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= -github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= -github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/choria-io/fisk v0.5.0 h1:JewH64yVJEDsiABIGt3z6YLz6ZuN6yRlvvEMUPpN/oU= +github.com/choria-io/fisk v0.5.0/go.mod h1:3Rc9XxqKC4y9wBf2GfQ4ovJ1VKELAWcU0J33M/Zgjvs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.17 h1:QeVUsEDNrLBW4tMgZHvxy18sKtr6VI492kBhUfhDJNI= github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= -github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= -github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20230502171905-255e3b9b56de h1:6bMcLOeKoNo0+mTOb1ee3McF6CCKGixjLR3EDQY1Jik= +github.com/google/pprof v0.0.0-20230502171905-255e3b9b56de/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk= github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec h1:qv2VnGeEQHchGaZ/u7lxST/RaJw+cv273q79D81Xbog= github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4DvxnHolD5P8pOtXigYlRuPLGl6moFx3ulM68= -github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc= -github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= +github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= -github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= +github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= @@ -60,80 +54,75 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jsm.go v0.0.35 h1:l03xuGttRA9b81Q0P/WEGm3e5DYof743ZEI4nQR3PUs= github.com/nats-io/jsm.go v0.0.35/go.mod h1:AkNKZTxbvdFBOJCdlKuLHsRlOP+AI4hV9REQKmq3sWw= -github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= -github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.9.11 h1:4y5SwWvWI59V5mcqtuoqKq6L9NDUydOP3Ekwuwl8cZI= -github.com/nats-io/nats-server/v2 v2.9.11/go.mod h1:b0oVuxSlkvS3ZjMkncFeACGyZohbO4XhSqW1Lt7iRRY= -github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE= -github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898 h1:FoO4iS4qOKmNWMvv4T48tpwH9C/bs97vN2X9O47My8Y= -github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.9.16 h1:SuNe6AyCcVy0g5326wtyU8TdqYmcPqzTjhkHojAjprc= +github.com/nats-io/nats-server/v2 v2.9.16/go.mod h1:z1cc5Q+kqJkz9mLUdlcSsdYnId4pyImHjNgoh6zxSC0= +github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= +github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/onsi/ginkgo/v2 v2.7.0 h1:/XxtEV3I3Eif/HobnVx9YmJgk8ENdRsuUmM+fLCFNow= -github.com/onsi/ginkgo/v2 v2.7.0/go.mod h1:yjiuMwPokqY1XauOgju45q3sJt6VzQ/Fict1LFVcsAo= -github.com/onsi/gomega v1.24.2 h1:J/tulyYK6JwBldPViHJReihxxZ+22FHs0piGjQAvoUE= -github.com/onsi/gomega v1.24.2/go.mod h1:gs3J10IS7Z7r7eXRoNJIrNqU4ToQukCJhFtKrWgHWnk= +github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= +github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM= +github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= +github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= -github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= -github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= -github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= -github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI= -github.com/prometheus/common v0.39.0/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y= -github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= -github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= +github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= +github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/common v0.43.0 h1:iq+BVjvYLei5f27wiuNiB1DN6DYQkp1c8Bx0Vykh5us= +github.com/prometheus/common v0.43.0/go.mod h1:NCvr5cQIh3Y/gy73/RdVtC9r8xxrxwJnB+2lB3BxrFc= +github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= +github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5 h1:gmD7q6cCJfBbcuobWQe/KzLsd9Cd3amS1Mq5f3uU1qo= github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5/go.mod h1:fVwOndYN3s5IaGlMucfgxwMhqwcaJtlGejBU6zX6Yxw= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= -golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20210503060354-a79de5458b56/go.mod h1:tfny5GFUkzUvx4ps4ajbZsCe5lw1metzhBm9T3x7oIY= -golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg= -golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= +golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.4.0 h1:7mTAgkunk3fr4GAloyyCasadO6h9zSsQZbwvcaIciV4= -golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= +golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= +golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/storage.go b/storage.go index 3af5c21..c84ebca 100644 --- a/storage.go +++ b/storage.go @@ -211,6 +211,12 @@ func (s *jetStreamStorage) EnqueueTask(ctx context.Context, queue *Queue, task * } task.Queue = queue.Name + + err = task.Sign() + if err != nil { + return err + } + err = s.SaveTaskState(ctx, task, true) if err != nil { return err diff --git a/storage_test.go b/storage_test.go index ca754ae..8432651 100644 --- a/storage_test.go +++ b/storage_test.go @@ -7,6 +7,7 @@ package asyncjobs import ( "bytes" "context" + "crypto/ed25519" "encoding/json" "fmt" "log" @@ -914,11 +915,16 @@ var _ = Describe("Storage", func() { err = storage.PrepareQueue(q, 1, true) Expect(err).ToNot(HaveOccurred()) - task, err := NewTask("ginkgo", nil) + _, pri, err := ed25519.GenerateKey(nil) + Expect(err).ToNot(HaveOccurred()) + + task, err := NewTask("ginkgo", nil, TaskSigner(pri)) Expect(err).ToNot(HaveOccurred()) + Expect(task.Signature).To(HaveLen(0)) err = storage.EnqueueTask(ctx, q, task) Expect(err).ToNot(HaveOccurred()) + Expect(task.Signature).To(HaveLen(128)) msg, err := storage.qStreams[q.Name].ReadMessage(1) Expect(err).ToNot(HaveOccurred()) @@ -929,8 +935,9 @@ var _ = Describe("Storage", func() { Expect(item.Kind).To(Equal(TaskItem)) Expect(item.JobID).To(Equal(task.ID)) - _, err = storage.LoadTaskByID(task.ID) + t, err := storage.LoadTaskByID(task.ID) Expect(err).ToNot(HaveOccurred()) + Expect(t.Signature).To(Equal(task.Signature)) }) }) }) diff --git a/task.go b/task.go index 47585c6..b2ea5b8 100644 --- a/task.go +++ b/task.go @@ -5,8 +5,12 @@ package asyncjobs import ( + "crypto/ed25519" + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" + "io" "sync" "time" @@ -89,7 +93,10 @@ type Task struct { Tries int `json:"tries"` // LastErr is the most recent handling error if any LastErr string `json:"last_err,omitempty"` + // Signature is an ed25519 signature of key properties + Signature string `json:"signature,omitempty"` + sigPk ed25519.PrivateKey storageOptions any mu sync.Mutex } @@ -164,6 +171,47 @@ func (t *Task) HasDependencies() bool { return len(t.Dependencies) > 0 } +func (t *Task) Sign() error { + if t.sigPk == nil { + return nil + } + + if t.Signature != "" { + return ErrTaskAlreadySigned + } + + if len(t.sigPk) != ed25519.PrivateKeySize { + return ErrInvalidPrivateKey + } + + msg, err := t.signatureMessage() + if err != nil { + return err + } + + t.Signature = hex.EncodeToString(ed25519.Sign(t.sigPk, msg)) + + io.ReadFull(rand.Reader, t.sigPk[:]) + t.sigPk = nil + + return nil +} + +func (t *Task) signatureMessage() ([]byte, error) { + if t.Queue == "" { + return nil, ErrTaskSignatureRequiresQueue + } + + var deadline int64 + if t.Deadline != nil { + deadline = t.Deadline.UnixNano() + } + + msg := fmt.Sprintf("%s:%s:%s:%d:%d:%d", t.ID, t.Queue, t.Type, t.MaxTries, t.CreatedAt.UnixNano(), deadline) + + return []byte(msg), nil +} + // TaskOpt configures Tasks made using NewTask() type TaskOpt func(*Task) error @@ -229,3 +277,11 @@ func TaskRequiresDependencyResults() TaskOpt { return nil } } + +// TaskSigner signs the task using the given private key +func TaskSigner(key ed25519.PrivateKey) TaskOpt { + return func(t *Task) error { + t.sigPk = key + return nil + } +} diff --git a/task_test.go b/task_test.go index 4ca882e..624519b 100644 --- a/task_test.go +++ b/task_test.go @@ -1,6 +1,8 @@ package asyncjobs import ( + "crypto/ed25519" + "encoding/hex" "time" . "github.com/onsi/ginkgo/v2" @@ -15,7 +17,12 @@ var _ = Describe("Tasks", func() { deadline := time.Now().Add(time.Hour) payload := map[string]string{"hello": "world"} - task, err := NewTask("test", payload, TaskDeadline(deadline), TaskDependsOnIDs("1", "2", "2", "1", "2"), TaskDependsOn(p, p), TaskRequiresDependencyResults()) + task, err := NewTask("test", payload, + TaskDeadline(deadline), + TaskDependsOnIDs("1", "2", "2", "1", "2"), + TaskDependsOn(p, p), + TaskRequiresDependencyResults(), + ) Expect(err).ToNot(HaveOccurred()) Expect(task.Deadline).To(Equal(&deadline)) Expect(task.ID).ToNot(HaveLen(0)) @@ -28,12 +35,29 @@ var _ = Describe("Tasks", func() { Expect(task.LoadDependencies).To(BeTrue()) Expect(task.MaxTries).To(Equal(DefaultMaxTries)) + pub, pri, err := ed25519.GenerateKey(nil) + Expect(err).ToNot(HaveOccurred()) + // without dependencies, should be new - task, err = NewTask("test", payload, TaskDeadline(deadline), TaskMaxTries(10)) + task, err = NewTask("test", payload, TaskDeadline(deadline), TaskMaxTries(10), TaskSigner(pri)) Expect(err).ToNot(HaveOccurred()) Expect(task.State).To(Equal(TaskStateNew)) Expect(task.LoadDependencies).To(BeFalse()) Expect(task.MaxTries).To(Equal(10)) + Expect(task.sigPk).To(Equal(pri)) + + Expect(task.Sign()).To(MatchError(ErrTaskSignatureRequiresQueue)) + task.Queue = "x" + Expect(task.Sign()).To(Succeed()) + Expect(task.Signature).ToNot(HaveLen(0)) + + msg, err := task.signatureMessage() + Expect(err).ToNot(HaveOccurred()) + Expect(msg).To(HaveLen(77)) + sig, err := hex.DecodeString(task.Signature) + Expect(err).ToNot(HaveOccurred()) + Expect(sig).To(HaveLen(64)) + Expect(ed25519.Verify(pub, msg, sig)).To(BeTrue()) }) }) })