Skip to content

Commit

Permalink
remove catch response
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jul 30, 2024
1 parent 3437ab6 commit a698030
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 42 deletions.
46 changes: 14 additions & 32 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
logger.Debugf("Async call succeeded")
callResult = either.LeftOf[string](resp.Msg.GetBody())
}
err = s.dal.CompleteAsyncCall(ctx, call, callResult, false, func(tx *dal.Tx, isFinalResult bool) error {
err = s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx, isFinalResult bool) error {
return s.finaliseAsyncCall(ctx, tx, call, callResult, isFinalResult)
})
if err != nil {
Expand All @@ -1394,9 +1394,12 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

originalError := call.Error.Default("unknown error")
originalResult := either.RightOf[[]byte](originalError)

request := map[string]any{
"request": call.Request,
"error": call.Error.Default(""),
"error": originalError,
}
body, err := json.Marshal(request)
if err != nil {
Expand All @@ -1409,42 +1412,21 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
Body: body,
}
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String())
var callResult either.Either[[]byte, string]
caught := false
var catchResult either.Either[[]byte, string]
if err != nil {
// could not call catch verb
// Could not call catch verb
logger.Warnf("Async call %s could not call catch verb %s: %s", call.Verb, catchVerb, err)
callResult = either.RightOf[[]byte](err.Error())
catchResult = either.RightOf[[]byte](err.Error())
} else if perr := resp.Msg.GetError(); perr != nil {
// catch verb failed
// Catch verb failed
logger.Warnf("Async call %s had an error while catching (%s): %s", call.Verb, catchVerb, perr.Message)
callResult = either.RightOf[[]byte](perr.Message)
catchResult = either.RightOf[[]byte](perr.Message)
} else {
// catch verb succeeded, check response for error
var respBody map[string]any
err = json.Unmarshal(resp.Msg.GetBody(), &respBody)
if err != nil {
logger.Errorf(err, "Async call %s could not unmarshal catch response from %s: %s", call.Verb, catchVerb, err)
callResult = either.RightOf[[]byte](fmt.Sprintf("async call %s could not unmarshal catch response from %s: %s", call.Verb, catchVerb, err))
} else {
var respErrorStr optional.Option[string]
if respError, ok := respBody["error"]; ok {
if str, ok := respError.(string); ok {
respErrorStr = optional.Some(str)
}
}
if errorStr, ok := respErrorStr.Get(); ok {
// catch verb returned a response indicating a final error
callResult = either.RightOf[[]byte](errorStr)
} else {
// catch verb returned a response indicating to clear the error
callResult = either.LeftOf[string]([]byte("null"))
}
caught = true
}
catchResult = either.LeftOf[string](resp.Msg.GetBody())
}
err = s.dal.CompleteAsyncCall(ctx, call, callResult, caught, func(tx *dal.Tx, isFinalResult bool) error {
return s.finaliseAsyncCall(ctx, tx, call, callResult, isFinalResult)
err = s.dal.CompleteAsyncCall(ctx, call, catchResult, func(tx *dal.Tx, isFinalResult bool) error {
// Exposes the original error to external components such as PubSub and FSM
return s.finaliseAsyncCall(ctx, tx, call, originalResult, isFinalResult)
})
if err != nil {
logger.Errorf(err, "Async call %s could not complete after catching (%s)", call.Verb, catchVerb)
Expand Down
3 changes: 1 addition & 2 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
func (d *DAL) CompleteAsyncCall(ctx context.Context,
call *AsyncCall,
result either.Either[[]byte, string],
hasBeenCaught bool,
finalise func(tx *Tx, isFinalResult bool) error) (err error) {
tx, err := d.Begin(ctx)
if err != nil {
Expand Down Expand Up @@ -164,7 +163,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context,
return dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
isFinalResult = false
} else if !hasBeenCaught && call.RemainingAttempts == 0 && call.CatchVerb.Ok() {
} else if call.RemainingAttempts == 0 && call.CatchVerb.Ok() {
// original error is the last error that occurred before we started to catch
originalError := call.Error.Default(result.Get())
_, err = d.db.FailAsyncCallWithRetry(ctx, sql.FailAsyncCallWithRetryParams{
Expand Down
8 changes: 0 additions & 8 deletions backend/schema/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ builtin module builtin {
request Req
error String
}
// CatchResponse is a response structure for catch verbs.
export data CatchResponse {
// Return no error string to treat the original call as successfully completed
// Return an error string to treat the original call as failed
// The distinction is important for FSMs as it determines whether to transition to the next state or to the error state
error String?
}
}
`

Expand Down

0 comments on commit a698030

Please sign in to comment.