Skip to content

Commit

Permalink
feat: allow catch verbs to receive request types as any (#2398)
Browse files Browse the repository at this point in the history
closes #2213

`builtin.CatchRequest` now has the following features:
- the associated request type can be `any` to allow for catching
multiple verbs with different request types
- The request will be unmarshalled to primitives (like string, int, map,
array etc) rather than ftl structs
- Includes the verb that failed as a `builtin.Ref`, which is a data type
with `module` and `name` strings
- Includes the request type as a string, eg: `example.Input`, `String`,
etc
- Originally I had this as a `builtin.Ref` but that doesnt work for
non-ref types

There is certainly room for improvement in terms of how we want to
expose these schema definitions, but this PR doesn't try go too far down
that road.
  • Loading branch information
matt2e authored Aug 19, 2024
1 parent 8977b26 commit dfefe8c
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 11 deletions.
17 changes: 15 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,12 +1498,25 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

sch := s.schema.Load()

verb := &schema.Verb{}
if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil {
logger.Warnf("Async call %s could not catch, could not resolve original verb: %s", call.Verb, err)
return fmt.Errorf("async call %s could not catch, could not resolve original verb: %w", call.Verb, err)
}

originalError := call.Error.Default("unknown error")
originalResult := either.RightOf[[]byte](originalError)

request := map[string]any{
"request": json.RawMessage(call.Request),
"error": originalError,
"verb": map[string]string{
"module": call.Verb.Module,
"name": call.Verb.Name,
},
"requestType": verb.Request.String(),
"request": json.RawMessage(call.Request),
"error": originalError,
}
body, err := json.Marshal(request)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestRetry(t *testing.T) {
FROM async_calls
WHERE
state = 'error'
AND verb = 'subscriber.consumeButFailAndRetry'
AND catching = false
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()),
Expand All @@ -116,6 +117,7 @@ func TestRetry(t *testing.T) {
FROM async_calls
WHERE
state = 'error'
AND verb = 'subscriber.consumeButFailAndRetry'
AND error LIKE '%%subscriber.catch %%'
AND error LIKE '%%catching error%%'
AND catching = true
Expand All @@ -130,11 +132,27 @@ func TestRetry(t *testing.T) {
FROM async_calls
WHERE
state = 'success'
AND verb = 'subscriber.consumeButFailAndRetry'
AND error IS NULL
AND catching = true
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()),
1),

// check that there was one successful attempt to catchAny
// CatchRequest<Any> is not supported in java yet
in.IfLanguage("go", in.QueryRow("ftl",
fmt.Sprintf(`
SELECT COUNT(*)
FROM async_calls
WHERE
state = 'success'
AND verb = 'subscriber.consumeButFailAndCatchAny'
AND error IS NULL
AND catching = true
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription2"}}.String()),
1)),
)
}

Expand Down
36 changes: 34 additions & 2 deletions backend/controller/pubsub/testdata/go/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
)

var _ = ftl.Subscription(publisher.TestTopic, "testTopicSubscription")
var _ = ftl.Subscription(publisher.Topic2, "doomedSubscription")
var _ = ftl.Subscription(publisher.Topic2, "doomedSubscription2")

var catchCount atomic.Value[int]

Expand All @@ -25,15 +27,20 @@ func Consume(ctx context.Context, req publisher.PubSubEvent) error {
return nil
}

var _ = ftl.Subscription(publisher.Topic2, "doomedSubscription")

//ftl:verb
//ftl:subscribe doomedSubscription
//ftl:retry 2 1s 1s catch catch
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
//ftl:subscribe doomedSubscription2
//ftl:retry 1 1s 1s catch catchAny
func ConsumeButFailAndCatchAny(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
func PublishToExternalModule(ctx context.Context) error {
// Get around compile-time checks
Expand All @@ -57,3 +64,28 @@ func Catch(ctx context.Context, req builtin.CatchRequest[publisher.PubSubEvent])
// succeed after that
return nil
}

//ftl:verb
func CatchAny(ctx context.Context, req builtin.CatchRequest[any]) error {
if req.Verb.Module != "subscriber" {
return fmt.Errorf("unexpected verb module: %v", req.Verb.Module)
}
if req.Verb.Name != "consumeButFailAndCatchAny" {
return fmt.Errorf("unexpected verb name: %v", req.Verb.Name)
}
if req.RequestType != "publisher.PubSubEvent" {
return fmt.Errorf("unexpected request type: %v", req.RequestType)
}
requestMap, ok := req.Request.(map[string]any)
if !ok {
return fmt.Errorf("expected request to be a map[string]any: %T", req.Request)
}
timeValue, ok := requestMap["time"]
if !ok {
return fmt.Errorf("expected request to have a time key")
}
if _, ok := timeValue.(string); !ok {
return fmt.Errorf("expected request to have a time value of type string")
}
return nil
}
8 changes: 8 additions & 0 deletions backend/schema/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import "github.com/TBD54566975/ftl/internal/reflect"
const BuiltinsSource = `
// Built-in types for FTL.
builtin module builtin {
// A reference to a declaration in the schema.
export data Ref {
module String
name String
}
// HTTP request structure used for HTTP ingress verbs.
export data HttpRequest<Body> {
method String
Expand All @@ -29,7 +35,9 @@ builtin module builtin {
// CatchRequest is a request structure for catch verbs.
export data CatchRequest<Req> {
verb builtin.Ref
request Req
requestType String
error String
}
}
Expand Down
24 changes: 24 additions & 0 deletions backend/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ func TestParsing(t *testing.T) {
verb consumesA(test.eventA) Unit
+subscribe subA1
+retry 1m5s 1h catch catchesAny
verb consumesB1(test.eventB) Unit
+subscribe subB
Expand All @@ -561,6 +562,8 @@ func TestParsing(t *testing.T) {
verb catchesA(builtin.CatchRequest<test.eventA>) Unit
verb catchesB(builtin.CatchRequest<test.eventB>) Unit
verb catchesAny(builtin.CatchRequest<Any>) Unit
}
`,
expected: &Schema{
Expand Down Expand Up @@ -626,6 +629,19 @@ func TestParsing(t *testing.T) {
Unit: true,
},
},
&Verb{
Name: "catchesAny",
Request: &Ref{
Module: "builtin",
Name: "CatchRequest",
TypeParameters: []Type{
&Any{},
},
},
Response: &Unit{
Unit: true,
},
},
&Verb{
Name: "catchesB",
Request: &Ref{
Expand Down Expand Up @@ -655,6 +671,14 @@ func TestParsing(t *testing.T) {
&MetadataSubscriber{
Name: "subA1",
},
&MetadataRetry{
MinBackoff: "1m5s",
MaxBackoff: "1h",
Catch: &Ref{
Module: "test",
Name: "catchesAny",
},
},
},
},
&Verb{
Expand Down
16 changes: 12 additions & 4 deletions backend/schema/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,14 +790,22 @@ func validateRetries(module *Module, retry *MetadataRetry, requestType optional.
merr = append(merr, errorf(retry, "expected catch to be a verb"))
return
}

hasValidCatchRequest := true
catchRequestRef, ok := catchVerb.Request.(*Ref)
if !ok {
merr = append(merr, errorf(retry, "catch verb must have a request type of builtin.CatchRequest<%s> but found %v", requestType, catchVerb.Request))
hasValidCatchRequest = false
} else if !strings.HasPrefix(catchRequestRef.String(), "builtin.CatchRequest") {
merr = append(merr, errorf(retry, "catch verb must have a request type of builtin.CatchRequest<%s> but found %v", requestType, catchRequestRef))
} else if len(catchRequestRef.TypeParameters) != 1 || catchRequestRef.TypeParameters[0].String() != req.String() {
merr = append(merr, errorf(retry, "catch verb must have a request type of builtin.CatchRequest<%s> but found %v", requestType, catchRequestRef))
hasValidCatchRequest = false
} else if len(catchRequestRef.TypeParameters) != 1 {
hasValidCatchRequest = false
} else if _, isAny := catchRequestRef.TypeParameters[0].(*Any); !isAny && catchRequestRef.TypeParameters[0].String() != req.String() {
hasValidCatchRequest = false
}
if !hasValidCatchRequest {
merr = append(merr, errorf(retry, "catch verb must have a request type of builtin.CatchRequest<%s> or builtin.CatchRequest<Any>, but found %v", requestType, catchVerb.Request))
}

if _, ok := catchVerb.Response.(*Unit); !ok {
merr = append(merr, errorf(retry, "catch verb must not have a response type but found %v", catchVerb.Response))
}
Expand Down
6 changes: 3 additions & 3 deletions backend/schema/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,10 @@ func TestValidate(t *testing.T) {
}
`,
errs: []string{
"34:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventA> but found builtin.CatchRequest<test.EventB>",
"34:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventA> or builtin.CatchRequest<Any>, but found builtin.CatchRequest<test.EventB>",
"38:5-5: catch verb must not have a response type but found test.EventA",
"42:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventB> but found Unit",
"46:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventB> but found test.EventB",
"42:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventB> or builtin.CatchRequest<Any>, but found Unit",
"46:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventB> or builtin.CatchRequest<Any>, but found test.EventB",
"50:5-5: expected catch to be a verb",
},
},
Expand Down

0 comments on commit dfefe8c

Please sign in to comment.