Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add retry directive #1552

Merged
merged 4 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
833 changes: 475 additions & 358 deletions backend/protos/xyz/block/ftl/v1/schema/schema.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions backend/protos/xyz/block/ftl/v1/schema/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ message Metadata {
MetadataCronJob cronJob = 3;
MetadataDatabases databases = 4;
MetadataAlias alias = 5;
MetadataRetry retry = 6;
}
}

Expand Down Expand Up @@ -192,6 +193,13 @@ message MetadataIngress {
repeated IngressPathComponent path = 4;
}

message MetadataRetry {
optional Position pos = 1;
optional int64 count = 2;
string minBackoff = 3;
string maxBackoff = 4;
}

message Module {
optional ModuleRuntime runtime = 31634;

Expand Down
2 changes: 1 addition & 1 deletion backend/schema/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (d *Data) Monomorphise(ref *Ref) (*Data, error) {

case *Any, *Bool, *Bytes, *Data, *Ref, *Database, Decl, *Float,
IngressPathComponent, *IngressPathLiteral, *IngressPathParameter,
*Int, Metadata, *MetadataCalls, *MetadataDatabases,
*Int, Metadata, *MetadataCalls, *MetadataDatabases, *MetadataRetry,
*MetadataIngress, *MetadataCronJob, *MetadataAlias, *Module,
*Schema, *String, *Time, Type, *TypeParameter, *Unit, *Verb, *Enum,
*EnumVariant, Value, *IntValue, *StringValue, *TypeValue, Symbol,
Expand Down
2 changes: 1 addition & 1 deletion backend/schema/jsonschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func nodeToJSSchema(node Node, refs map[RefKey]*Ref) *jsonschema.Schema {
*MetadataAlias, IngressPathComponent, *IngressPathLiteral, *IngressPathParameter, *Module,
*Schema, Type, *Database, *Verb, *EnumVariant, *MetadataCronJob, Value,
*StringValue, *IntValue, *TypeValue, *Config, *Secret, Symbol, Named,
*FSM, *FSMTransition, *TypeAlias:
*FSM, *FSMTransition, *TypeAlias, *MetadataRetry:
panic(fmt.Sprintf("unsupported node type %T", node))

default:
Expand Down
127 changes: 127 additions & 0 deletions backend/schema/metadataretry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package schema

import (
"fmt"
"regexp"
"strconv"
"strings"
"time"

"github.com/alecthomas/types/optional"
"google.golang.org/protobuf/proto"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
)

const (
Copy link
Collaborator Author

@matt2e matt2e May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String constants are just to make error printing have the same format we expect users to use (ie 1d not go's standard 24h0m0s)

MinBackoffLimitStr = "1s"
MinBackoffLimit = 1 * time.Second
MaxBackoffLimitStr = "1d"
MaxBackoffLimit = 24 * time.Hour
)

type MetadataRetry struct {
Pos Position `parser:"" protobuf:"1,optional"`

Count *int `parser:"'+' 'retry' (@Number Whitespace)?" protobuf:"2,optional"`
MinBackoff string `parser:"@(Number (?! Whitespace) Ident)?" protobuf:"3"`
MaxBackoff string `parser:"@(Number (?! Whitespace) Ident)?" protobuf:"4"`
}

var _ Metadata = (*MetadataRetry)(nil)

func (*MetadataRetry) schemaMetadata() {}
func (m *MetadataRetry) schemaChildren() []Node { return nil }
func (m *MetadataRetry) Position() Position { return m.Pos }
func (m *MetadataRetry) String() string {
components := []string{"+retry"}
if m.Count != nil {
components = append(components, strconv.Itoa(*m.Count))
}
components = append(components, m.MinBackoff)
if len(m.MaxBackoff) > 0 {
components = append(components, m.MaxBackoff)
}
return strings.Join(components, " ")
}

func (m *MetadataRetry) ToProto() proto.Message {
var count *int64
if m.Count != nil {
count = proto.Int64(int64(*m.Count))
}
return &schemapb.MetadataRetry{
Pos: posToProto(m.Pos),
Count: count,
MinBackoff: m.MinBackoff,
MaxBackoff: m.MaxBackoff,
}
}

func (m *MetadataRetry) MinBackoffDuration() (time.Duration, error) {
if m.MinBackoff == "" {
return 0, fmt.Errorf("retry must have a minimum backoff")
}
duration, err := parseRetryDuration(m.MinBackoff)
if err != nil {
return 0, err
}
return duration, nil
}

func (m *MetadataRetry) MaxBackoffDuration() (optional.Option[time.Duration], error) {
if m.MaxBackoff == "" {
return optional.None[time.Duration](), nil
}
duration, err := parseRetryDuration(m.MaxBackoff)
if err != nil {
return optional.None[time.Duration](), err
}
return optional.Some(duration), nil
}

func parseRetryDuration(str string) (time.Duration, error) {
// regex is more lenient than what is valid to allow for better error messages.
re := regexp.MustCompile(`(\d+)([a-zA-Z]+)`)

var duration time.Duration
previousUnitDuration := time.Duration(0)
for len(str) > 0 {
matches := re.FindStringSubmatchIndex(str)
if matches == nil {
return 0, fmt.Errorf("unable to parse retry backoff %q - expected duration in format like '1m' or '30s'", str)
}
num, err := strconv.Atoi(str[matches[2]:matches[3]])
if err != nil {
return 0, fmt.Errorf("unable to parse retry backoff text %q: %w", str, err)
}

unitStr := str[matches[4]:matches[5]]
var unitDuration time.Duration
switch unitStr {
case "d":
unitDuration = time.Hour * 24
case "h":
unitDuration = time.Hour
case "m":
unitDuration = time.Minute
case "s":
unitDuration = time.Second
default:
return 0, fmt.Errorf("retry has unknown unit %q - use 'd', 'h', 'm' or 's'", unitStr)
}
if previousUnitDuration != 0 && previousUnitDuration <= unitDuration {
return 0, fmt.Errorf("retry has unit %q out of order - units need to be ordered from largest to smallest", unitStr)
}
previousUnitDuration = unitDuration
duration += time.Duration(num) * unitDuration
str = str[matches[1]:]
}
if duration < MinBackoffLimit {
return 0, fmt.Errorf("retry must have a minimum backoff of %v", MinBackoffLimitStr)
}
if duration > MaxBackoffLimit {
return 0, fmt.Errorf("retry backoff can not be larger than %v", MaxBackoffLimitStr)
}
return duration, nil
}
2 changes: 1 addition & 1 deletion backend/schema/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var (
&Ref{},
}
typeUnion = append(nonOptionalTypeUnion, &Optional{})
metadataUnion = []Metadata{&MetadataCalls{}, &MetadataIngress{}, &MetadataCronJob{}, &MetadataDatabases{}, &MetadataAlias{}}
metadataUnion = []Metadata{&MetadataCalls{}, &MetadataIngress{}, &MetadataCronJob{}, &MetadataDatabases{}, &MetadataAlias{}, &MetadataRetry{}}
ingressUnion = []IngressPathComponent{&IngressPathLiteral{}, &IngressPathParameter{}}
valueUnion = []Value{&StringValue{}, &IntValue{}, &TypeValue{}}

Expand Down
3 changes: 3 additions & 0 deletions backend/schema/protobuf_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func metadataListToProto(nodes []Metadata) []*schemapb.Metadata {
case *MetadataAlias:
v = &schemapb.Metadata_Alias{Alias: n.ToProto().(*schemapb.MetadataAlias)}

case *MetadataRetry:
v = &schemapb.Metadata_Retry{Retry: n.ToProto().(*schemapb.MetadataRetry)}

default:
panic(fmt.Sprintf("unhandled metadata type %T", n))
}
Expand Down
130 changes: 130 additions & 0 deletions backend/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/alecthomas/assert/v2"

Expand Down Expand Up @@ -214,7 +215,9 @@ func TestParserRoundTrip(t *testing.T) {
assert.Equal(t, Normalise(testSchema), Normalise(actual), assert.Exclude[Position]())
}

//nolint:maintidx
func TestParsing(t *testing.T) {
ten := 10
tests := []struct {
name string
input string
Expand Down Expand Up @@ -394,6 +397,116 @@ func TestParsing(t *testing.T) {
}},
},
},
{name: "RetryFSM",
input: `
module test {
verb A(Empty) Unit
+retry 10 1m5s 90s
verb B(Empty) Unit
+retry 1h1m5s
verb C(Empty) Unit
+retry 0h0m5s 1h0m0s
fsm FSM {
start test.A
transition test.A to test.B
transition test.A to test.C
}
}
`,
expected: &Schema{
Modules: []*Module{{
Name: "test",
Decls: []Decl{
&FSM{
Name: "FSM",
Start: []*Ref{
{
Module: "test",
Name: "A",
},
},
Transitions: []*FSMTransition{
{
From: &Ref{
Module: "test",
Name: "A",
},
To: &Ref{
Module: "test",
Name: "B",
},
},
{
From: &Ref{
Module: "test",
Name: "A",
},
To: &Ref{
Module: "test",
Name: "C",
},
},
},
},
&Verb{
Comments: []string{},
Name: "A",
Request: &Ref{
Module: "builtin",
Name: "Empty",
},
Response: &Unit{
Unit: true,
},
Metadata: []Metadata{
&MetadataRetry{
Count: &ten,
MinBackoff: "1m5s",
MaxBackoff: "90s",
},
},
},
&Verb{
Comments: []string{},
Name: "B",
Request: &Ref{
Module: "builtin",
Name: "Empty",
},
Response: &Unit{
Unit: true,
},
Metadata: []Metadata{
&MetadataRetry{
Count: nil,
MinBackoff: "1h1m5s",
MaxBackoff: "",
},
},
},
&Verb{
Comments: []string{},
Name: "C",
Request: &Ref{
Module: "builtin",
Name: "Empty",
},
Response: &Unit{
Unit: true,
},
Metadata: []Metadata{
&MetadataRetry{
Count: nil,
MinBackoff: "0h0m5s",
MaxBackoff: "1h0m0s",
},
},
},
},
}},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -661,3 +774,20 @@ var testSchema = MustValidate(&Schema{
},
},
})

func TestRetryParsing(t *testing.T) {
for _, tt := range []struct {
input string
seconds int
}{
{"7s", 7},
{"9h", 9 * 60 * 60},
{"1d", 24 * 60 * 60},
{"1m90s", 60 + 90},
{"1h2m3s", 60*60 + 2*60 + 3},
} {
duration, err := parseRetryDuration(tt.input)
assert.NoError(t, err)
assert.Equal(t, time.Second*time.Duration(tt.seconds), duration)
}
}
Loading
Loading