Skip to content

Commit

Permalink
feat: add retry directive (#1552)
Browse files Browse the repository at this point in the history
adds retry directive for fsm transition verbs. This PR does not contain
logic to execute retries.
- go expects format `//ftl:retry [count] minbackoff [maxbackoff]`
- schema has format `+retry [count] minbackoff [maxbackoff]`
- backoffs support `s`, `m`, `h`, `d` units like `90s` or `1m30s`
- validation makes sure retry directive is only added to verbs that are
part of a FSM within the same module
- min/max backoffs are `1s`/`1d` (very debatable...)
  • Loading branch information
matt2e authored May 22, 2024
1 parent 9c07bef commit ac5c6ff
Show file tree
Hide file tree
Showing 15 changed files with 982 additions and 367 deletions.
833 changes: 475 additions & 358 deletions backend/protos/xyz/block/ftl/v1/schema/schema.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions backend/protos/xyz/block/ftl/v1/schema/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ message Metadata {
MetadataCronJob cronJob = 3;
MetadataDatabases databases = 4;
MetadataAlias alias = 5;
MetadataRetry retry = 6;
}
}

Expand Down Expand Up @@ -192,6 +193,13 @@ message MetadataIngress {
repeated IngressPathComponent path = 4;
}

message MetadataRetry {
optional Position pos = 1;
optional int64 count = 2;
string minBackoff = 3;
string maxBackoff = 4;
}

message Module {
optional ModuleRuntime runtime = 31634;

Expand Down
2 changes: 1 addition & 1 deletion backend/schema/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (d *Data) Monomorphise(ref *Ref) (*Data, error) {

case *Any, *Bool, *Bytes, *Data, *Ref, *Database, Decl, *Float,
IngressPathComponent, *IngressPathLiteral, *IngressPathParameter,
*Int, Metadata, *MetadataCalls, *MetadataDatabases,
*Int, Metadata, *MetadataCalls, *MetadataDatabases, *MetadataRetry,
*MetadataIngress, *MetadataCronJob, *MetadataAlias, *Module,
*Schema, *String, *Time, Type, *TypeParameter, *Unit, *Verb, *Enum,
*EnumVariant, Value, *IntValue, *StringValue, *TypeValue, Symbol,
Expand Down
2 changes: 1 addition & 1 deletion backend/schema/jsonschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func nodeToJSSchema(node Node, refs map[RefKey]*Ref) *jsonschema.Schema {
*MetadataAlias, IngressPathComponent, *IngressPathLiteral, *IngressPathParameter, *Module,
*Schema, Type, *Database, *Verb, *EnumVariant, *MetadataCronJob, Value,
*StringValue, *IntValue, *TypeValue, *Config, *Secret, Symbol, Named,
*FSM, *FSMTransition, *TypeAlias:
*FSM, *FSMTransition, *TypeAlias, *MetadataRetry:
panic(fmt.Sprintf("unsupported node type %T", node))

default:
Expand Down
127 changes: 127 additions & 0 deletions backend/schema/metadataretry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package schema

import (
"fmt"
"regexp"
"strconv"
"strings"
"time"

"github.com/alecthomas/types/optional"
"google.golang.org/protobuf/proto"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
)

const (
MinBackoffLimitStr = "1s"
MinBackoffLimit = 1 * time.Second
MaxBackoffLimitStr = "1d"
MaxBackoffLimit = 24 * time.Hour
)

type MetadataRetry struct {
Pos Position `parser:"" protobuf:"1,optional"`

Count *int `parser:"'+' 'retry' (@Number Whitespace)?" protobuf:"2,optional"`
MinBackoff string `parser:"@(Number (?! Whitespace) Ident)?" protobuf:"3"`
MaxBackoff string `parser:"@(Number (?! Whitespace) Ident)?" protobuf:"4"`
}

var _ Metadata = (*MetadataRetry)(nil)

func (*MetadataRetry) schemaMetadata() {}
func (m *MetadataRetry) schemaChildren() []Node { return nil }
func (m *MetadataRetry) Position() Position { return m.Pos }
func (m *MetadataRetry) String() string {
components := []string{"+retry"}
if m.Count != nil {
components = append(components, strconv.Itoa(*m.Count))
}
components = append(components, m.MinBackoff)
if len(m.MaxBackoff) > 0 {
components = append(components, m.MaxBackoff)
}
return strings.Join(components, " ")
}

func (m *MetadataRetry) ToProto() proto.Message {
var count *int64
if m.Count != nil {
count = proto.Int64(int64(*m.Count))
}
return &schemapb.MetadataRetry{
Pos: posToProto(m.Pos),
Count: count,
MinBackoff: m.MinBackoff,
MaxBackoff: m.MaxBackoff,
}
}

func (m *MetadataRetry) MinBackoffDuration() (time.Duration, error) {
if m.MinBackoff == "" {
return 0, fmt.Errorf("retry must have a minimum backoff")
}
duration, err := parseRetryDuration(m.MinBackoff)
if err != nil {
return 0, err
}
return duration, nil
}

func (m *MetadataRetry) MaxBackoffDuration() (optional.Option[time.Duration], error) {
if m.MaxBackoff == "" {
return optional.None[time.Duration](), nil
}
duration, err := parseRetryDuration(m.MaxBackoff)
if err != nil {
return optional.None[time.Duration](), err
}
return optional.Some(duration), nil
}

func parseRetryDuration(str string) (time.Duration, error) {
// regex is more lenient than what is valid to allow for better error messages.
re := regexp.MustCompile(`(\d+)([a-zA-Z]+)`)

var duration time.Duration
previousUnitDuration := time.Duration(0)
for len(str) > 0 {
matches := re.FindStringSubmatchIndex(str)
if matches == nil {
return 0, fmt.Errorf("unable to parse retry backoff %q - expected duration in format like '1m' or '30s'", str)
}
num, err := strconv.Atoi(str[matches[2]:matches[3]])
if err != nil {
return 0, fmt.Errorf("unable to parse retry backoff text %q: %w", str, err)
}

unitStr := str[matches[4]:matches[5]]
var unitDuration time.Duration
switch unitStr {
case "d":
unitDuration = time.Hour * 24
case "h":
unitDuration = time.Hour
case "m":
unitDuration = time.Minute
case "s":
unitDuration = time.Second
default:
return 0, fmt.Errorf("retry has unknown unit %q - use 'd', 'h', 'm' or 's'", unitStr)
}
if previousUnitDuration != 0 && previousUnitDuration <= unitDuration {
return 0, fmt.Errorf("retry has unit %q out of order - units need to be ordered from largest to smallest", unitStr)
}
previousUnitDuration = unitDuration
duration += time.Duration(num) * unitDuration
str = str[matches[1]:]
}
if duration < MinBackoffLimit {
return 0, fmt.Errorf("retry must have a minimum backoff of %v", MinBackoffLimitStr)
}
if duration > MaxBackoffLimit {
return 0, fmt.Errorf("retry backoff can not be larger than %v", MaxBackoffLimitStr)
}
return duration, nil
}
2 changes: 1 addition & 1 deletion backend/schema/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var (
&Ref{},
}
typeUnion = append(nonOptionalTypeUnion, &Optional{})
metadataUnion = []Metadata{&MetadataCalls{}, &MetadataIngress{}, &MetadataCronJob{}, &MetadataDatabases{}, &MetadataAlias{}}
metadataUnion = []Metadata{&MetadataCalls{}, &MetadataIngress{}, &MetadataCronJob{}, &MetadataDatabases{}, &MetadataAlias{}, &MetadataRetry{}}
ingressUnion = []IngressPathComponent{&IngressPathLiteral{}, &IngressPathParameter{}}
valueUnion = []Value{&StringValue{}, &IntValue{}, &TypeValue{}}

Expand Down
3 changes: 3 additions & 0 deletions backend/schema/protobuf_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func metadataListToProto(nodes []Metadata) []*schemapb.Metadata {
case *MetadataAlias:
v = &schemapb.Metadata_Alias{Alias: n.ToProto().(*schemapb.MetadataAlias)}

case *MetadataRetry:
v = &schemapb.Metadata_Retry{Retry: n.ToProto().(*schemapb.MetadataRetry)}

default:
panic(fmt.Sprintf("unhandled metadata type %T", n))
}
Expand Down
130 changes: 130 additions & 0 deletions backend/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/alecthomas/assert/v2"

Expand Down Expand Up @@ -214,7 +215,9 @@ func TestParserRoundTrip(t *testing.T) {
assert.Equal(t, Normalise(testSchema), Normalise(actual), assert.Exclude[Position]())
}

//nolint:maintidx
func TestParsing(t *testing.T) {
ten := 10
tests := []struct {
name string
input string
Expand Down Expand Up @@ -394,6 +397,116 @@ func TestParsing(t *testing.T) {
}},
},
},
{name: "RetryFSM",
input: `
module test {
verb A(Empty) Unit
+retry 10 1m5s 90s
verb B(Empty) Unit
+retry 1h1m5s
verb C(Empty) Unit
+retry 0h0m5s 1h0m0s
fsm FSM {
start test.A
transition test.A to test.B
transition test.A to test.C
}
}
`,
expected: &Schema{
Modules: []*Module{{
Name: "test",
Decls: []Decl{
&FSM{
Name: "FSM",
Start: []*Ref{
{
Module: "test",
Name: "A",
},
},
Transitions: []*FSMTransition{
{
From: &Ref{
Module: "test",
Name: "A",
},
To: &Ref{
Module: "test",
Name: "B",
},
},
{
From: &Ref{
Module: "test",
Name: "A",
},
To: &Ref{
Module: "test",
Name: "C",
},
},
},
},
&Verb{
Comments: []string{},
Name: "A",
Request: &Ref{
Module: "builtin",
Name: "Empty",
},
Response: &Unit{
Unit: true,
},
Metadata: []Metadata{
&MetadataRetry{
Count: &ten,
MinBackoff: "1m5s",
MaxBackoff: "90s",
},
},
},
&Verb{
Comments: []string{},
Name: "B",
Request: &Ref{
Module: "builtin",
Name: "Empty",
},
Response: &Unit{
Unit: true,
},
Metadata: []Metadata{
&MetadataRetry{
Count: nil,
MinBackoff: "1h1m5s",
MaxBackoff: "",
},
},
},
&Verb{
Comments: []string{},
Name: "C",
Request: &Ref{
Module: "builtin",
Name: "Empty",
},
Response: &Unit{
Unit: true,
},
Metadata: []Metadata{
&MetadataRetry{
Count: nil,
MinBackoff: "0h0m5s",
MaxBackoff: "1h0m0s",
},
},
},
},
}},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -661,3 +774,20 @@ var testSchema = MustValidate(&Schema{
},
},
})

func TestRetryParsing(t *testing.T) {
for _, tt := range []struct {
input string
seconds int
}{
{"7s", 7},
{"9h", 9 * 60 * 60},
{"1d", 24 * 60 * 60},
{"1m90s", 60 + 90},
{"1h2m3s", 60*60 + 2*60 + 3},
} {
duration, err := parseRetryDuration(tt.input)
assert.NoError(t, err)
assert.Equal(t, time.Second*time.Duration(tt.seconds), duration)
}
}
Loading

0 comments on commit ac5c6ff

Please sign in to comment.