Skip to content

Commit

Permalink
Major changes to how runners work, implicit service runners.
Browse files Browse the repository at this point in the history
  • Loading branch information
can1357 committed Mar 7, 2024
1 parent 16246d0 commit 122fca5
Show file tree
Hide file tree
Showing 14 changed files with 481 additions and 348 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ jobs:
with:
go-version: "1.22"

- name: Build
run: go build -v .

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@master
with:
Expand Down
10 changes: 8 additions & 2 deletions autonats/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ func StartServer(opts Options) (srv *Server, err error) {
// Wait for the server to be ready
go func() {
deadline := time.Now().Add(ServerStartTimeout)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

var conn *nats.Conn
defer func() {
Expand Down Expand Up @@ -386,7 +388,11 @@ func StartServer(opts Options) (srv *Server, err error) {

// Test jetstream node placement
jsc := lo.Must(jetstream.New(conn)) // There's nothing to fail here
kv, err := jsc.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{
if _, err := jsc.AccountInfo(ctx); err != nil {
logger.Info().Err(err).Msg("Init: Waiting for Jetstream to become ready")
continue
}
kv, err := jsc.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "pmesh-probe",
Description: "pmesh-probe",
TTL: 1 * time.Minute,
Expand All @@ -398,7 +404,7 @@ func StartServer(opts Options) (srv *Server, err error) {
continue
}

if _, err := kv.Put(context.Background(), "test", []byte{0x1}); err != nil {
if _, err := kv.Put(ctx, "test", []byte{0x1}); err != nil {
logger.Info().Err(err).Msg("Init: Waiting for Jetstream RAFT log to become ready (write)")
time.Sleep(1 * time.Second)
continue
Expand Down
14 changes: 4 additions & 10 deletions client/api_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"encoding/json"
"fmt"

"get.pme.sh/pmesh/session"
)
Expand Down Expand Up @@ -50,15 +51,8 @@ func (c Client) KVList() (res []string, err error) {
err = c.Call("GET /kv", nil, &res)
return
}
func (c Client) KVDailyList() (res []string, err error) {
err = c.Call("GET /kv/d", nil, &res)
return
}
func (c Client) KVWeeklyList() (res []string, err error) {
err = c.Call("GET /kv/w", nil, &res)
return
}
func (c Client) KVMonthlyList() (res []string, err error) {
err = c.Call("GET /kv/m", nil, &res)
func (c Client) Result(stream string, seq int) (res json.RawMessage, err error) {
path := fmt.Sprintf("/result/%s/%d", stream, seq)
err = c.Call(path, nil, &res)
return
}
28 changes: 5 additions & 23 deletions enats/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ type Gateway struct {
// Internal resources
PeerKV, SchedulerKV jetstream.KeyValue
// Global resources for the user
DefaultKV, DailyKV, WeeklyKV, MonthlyKV jetstream.KeyValue
DefaultKV, ResultKV jetstream.KeyValue

EventStream jetstream.Stream
}

const EventStreamPrefix = "ev."

func New() (r *Gateway) {
r = &Gateway{}

Expand Down Expand Up @@ -119,13 +121,7 @@ func (r *Gateway) Open(ctx context.Context) (err error) {
if makeKV(&r.DefaultKV, "global", 0); err != nil {
return
}
if makeKV(&r.DailyKV, "daily", 24*time.Hour); err != nil {
return
}
if makeKV(&r.WeeklyKV, "weekly", 7*24*time.Hour); err != nil {
return
}
if makeKV(&r.MonthlyKV, "monthly", 30*24*time.Hour); err != nil {
if makeKV(&r.ResultKV, "results", 0); err != nil {
return
}

Expand All @@ -142,7 +138,7 @@ func (r *Gateway) Open(ctx context.Context) (err error) {
MaxBytes: -1,
Duplicates: 0,
AllowDirect: true,
Subjects: []string{"ev.>"},
Subjects: []string{EventStreamPrefix + ">"},
})
if err != nil {
return
Expand All @@ -166,20 +162,6 @@ func (r *Gateway) Close(ctx context.Context) (err error) {
return
}

// Gets the KV store for the default kv usage given the key.
func (r *Gateway) DefaultKVStore(key string) (kv jetstream.KeyValue) {
if strings.HasPrefix(key, "d.") {
kv = r.DailyKV
} else if strings.HasPrefix(key, "w.") {
kv = r.WeeklyKV
} else if strings.HasPrefix(key, "m.") {
kv = r.MonthlyKV
} else {
kv = r.DefaultKV
}
return
}

// Wrappers
func (r *Client) KVStore(ctx context.Context, cfg jetstream.KeyValueConfig) (kv jetstream.KeyValue, err error) {
kv, err = r.Jet.CreateOrUpdateKeyValue(ctx, cfg)
Expand Down
98 changes: 19 additions & 79 deletions enats/topics.go
Original file line number Diff line number Diff line change
@@ -1,99 +1,39 @@
package enats

import (
"strconv"
"strings"

"get.pme.sh/pmesh/config"
"get.pme.sh/pmesh/xlog"
)

var AnyCastMachineID = config.MachineID(0)
var remoteQueueName = "ev." + AnyCastMachineID.String() + "."
var localQueueName = "ev." + config.GetMachineID().String() + "."
const (
userPrefix = "raw."
)

// Given a nats subject, return the user-specified topic.
func ToTopic(subject string) (topic string, dest config.MachineID) {
dest = AnyCastMachineID
func ToTopic(subject string) string {
return strings.TrimPrefix(subject, EventStreamPrefix)
}

// Given a user-specified topic, return the subject names to use for NATS consumers/publishers.
func ToSubject(topic string) string {
// User specified topic
if strings.HasPrefix(subject, "jet.") {
topic = strings.TrimPrefix(subject, "jet.")
return
}

// Remote topic
if strings.HasPrefix(subject, remoteQueueName) {
topic = subject[len(remoteQueueName):]
return
}

// Local topic
if strings.HasPrefix(subject, localQueueName) {
topic = subject[len(localQueueName):]
dest = config.GetMachineID()
return
}

if strings.HasPrefix(subject, "ev.") {
if len(subject) <= len(remoteQueueName) || subject[len(remoteQueueName)-1] != '.' {
xlog.Warn().Str("subject", subject).Msg("Invalid subject")
return
}

u64, err := strconv.ParseUint(subject[len("ev."):len(remoteQueueName)-1], 16, 32)
if err != nil {
xlog.Warn().Str("subject", subject).Err(err).Msg("Invalid subject")
return
}

dest = config.MachineID(uint32(u64))
topic = subject[len(remoteQueueName):]
return
if subject, ok := strings.CutPrefix(topic, userPrefix); ok {
topic = subject
} else if !strings.HasPrefix(topic, EventStreamPrefix) {
topic = EventStreamPrefix + topic
}

topic = subject
return
}

// Given a user-specified topic, return the subject names to use for NATS consumers.
func ToConsumerSubjects(topic string) []string {
// Wildcard
if strings.HasSuffix(topic, ".") {
topic += ">"
}
if strings.HasPrefix(topic, "jet.") {
return []string{strings.TrimPrefix(topic, "jet.")}
}
if strings.HasPrefix(topic, "$local.") {
return []string{localQueueName + strings.TrimPrefix(topic, "$local.")}
}
return []string{
remoteQueueName + topic,
localQueueName + topic,
}
}

// Given a user-specified topic, return the subject name to use for NATS publishers.
func ToPublisherSubject(topic string) string {
if strings.HasPrefix(topic, "jet.") {
return strings.TrimPrefix(topic, "jet.")
} else if !strings.HasPrefix(topic, "$local.") {
return remoteQueueName + topic
} else {
return localQueueName + strings.TrimPrefix(topic, "$local.")
}
}
func ToPublisherSubjectWithTarget(topic string, target config.MachineID) string {
if strings.HasPrefix(topic, "jet.") {
return strings.TrimPrefix(topic, "jet.")
} else {
return "ev." + config.GetMachineID().String() + "." + strings.TrimPrefix(topic, "$local.")
}
return topic
}

// Given a user-specified topic, return the queue name to use for NATS consumers.
func ToConsumerQueueName(pfx, topic string) string {
queue := strings.ReplaceAll(pfx+topic, ".", "-")
queue = strings.ReplaceAll(queue, "*", "all")
queue = strings.ReplaceAll(queue, ">", "matchall")
queue := pfx + ToSubject(topic)
queue = strings.ReplaceAll(queue, ".", "-")
queue = strings.ReplaceAll(queue, "*", "any")
queue = strings.ReplaceAll(queue, ">", "all")
return queue
}
35 changes: 29 additions & 6 deletions retry/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"get.pme.sh/pmesh/util"
)

const RetrierMaxDelayCoeff = 20

type Policy struct {
Attempts int `json:"attempts,omitempty" yaml:"attempts,omitempty"` // The maximum number of retries.
Backoff util.Duration `json:"backoff,omitempty" yaml:"backoff,omitempty"` // The base delay between retries.
Expand All @@ -27,13 +29,33 @@ func Long() Policy {
}
}

func (p Policy) adjust() Policy {
if p.Attempts == 0 {
p.Attempts = 8
func (retry Policy) WithDefaults() Policy {
if retry.Attempts == 0 {
retry.Attempts = 8
}
retry.Backoff = retry.Backoff.Or(150 * time.Millisecond)
retry.Timeout = retry.Timeout.Or(30 * time.Second)
return retry
}

func (retry Policy) MaxDelay() time.Duration {
return retry.Backoff.Duration() * RetrierMaxDelayCoeff
}

func (retry Policy) StepN(n int) (delay time.Duration, err error) {
if retry.Attempts > 0 && n >= retry.Attempts {
return 0, ErrMaxAttemptsExceeded
}
delay = retry.Backoff.Duration()
maxdelay := retry.MaxDelay()
for ; n > 0; n-- {
delay += retry.Backoff.Duration()
delay = delay + (delay >> 1) // Exponential backoff
if delay > maxdelay {
return maxdelay, nil
}
}
p.Backoff = p.Backoff.Or(150 * time.Millisecond)
p.Timeout = p.Timeout.Or(30 * time.Second)
return p
return
}

func (retry Policy) Step(step *int, delay *time.Duration) error {
Expand All @@ -50,6 +72,7 @@ func (retry Policy) Step(step *int, delay *time.Duration) error {
} else {
*delay += retry.Backoff.Duration()
*delay = *delay + (*delay >> 1) // Exponential backoff
*delay = min(*delay, retry.MaxDelay())
}
return nil
}
5 changes: 1 addition & 4 deletions retry/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ type Retrier struct {
Deadline time.Time
}

const RetrierMaxDelayCoeff = 20

// Creates a retries with the given policy and context
func (policy Policy) RetrierContext(ctx context.Context) Retrier {
rt := Retrier{
Context: ctx,
Policy: policy.adjust(),
Policy: policy.WithDefaults(),
}
rt.Deadline = time.Now().Add(rt.Policy.Timeout.Duration())
if deadline, ok := ctx.Deadline(); ok && deadline.Before(rt.Deadline) {
Expand All @@ -38,7 +36,6 @@ func (policy Policy) Retrier() Retrier {
func (retry *Retrier) NextDelay() (delay time.Duration, err error) {
err = retry.Policy.Step(&retry.Step, &retry.Delay)
if err == nil {
retry.Delay = min(retry.Delay, retry.Policy.Backoff.Duration()*RetrierMaxDelayCoeff)
delay = retry.Delay
}
return
Expand Down
5 changes: 5 additions & 0 deletions session/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func writeOutput(req *http.Request, w http.ResponseWriter, result any, e error)
output = result
}

if raw, ok := output.(json.RawMessage); ok {
w.Write(raw)
return
}

enc := json.NewEncoder(w)
enc.SetEscapeHTML(false)
if req.URL.Query().Has("pretty") {
Expand Down
23 changes: 16 additions & 7 deletions session/api_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"get.pme.sh/pmesh/vhttp"
"get.pme.sh/pmesh/xpost"

"github.com/nats-io/nats.go/jetstream"
"github.com/nats-io/nats.go"
)

func init() {
Expand Down Expand Up @@ -67,14 +67,23 @@ func init() {
res = session.Peerlist.List(true)
return
})
Match("/publish/{topic}", func(session *Session, r *http.Request, p json.RawMessage) (ack jetstream.PubAck, err error) {
subject := enats.ToPublisherSubject(r.PathValue("topic"))
a, e := session.Nats.Jet.Publish(r.Context(), subject, p)
if e != nil {
err = e
Match("/publish/{topic}", func(session *Session, r *http.Request, p json.RawMessage) (ack json.RawMessage, err error) {
subject := enats.ToSubject(r.PathValue("topic"))

deadline, ok := r.Context().Deadline()
if !ok {
deadline = time.Now().Add(30 * time.Second)
}
res, err := session.Nats.RequestMsg(&nats.Msg{
Subject: subject,
Data: p,
Header: nats.Header(r.Header),
}, time.Until(deadline))
if err != nil {
return
}
return *a, nil
ack = res.Data
return
})
MatchLocked("/reload", func(session *Session, r *http.Request, p ServiceInvalidate) (_ any, err error) {
err = session.ReloadLocked(p.Invalidate)
Expand Down
Loading

0 comments on commit 122fca5

Please sign in to comment.