Skip to content

Commit

Permalink
chore: controller inserts requestkey into db (#1239)
Browse files Browse the repository at this point in the history
Centralised request row insertion into the controller.
Components responsible for other origins (HTTP, cron, ...) will create
the RequestKey and sourceAddress and pass it along with the call.
  • Loading branch information
matt2e authored Apr 12, 2024
1 parent c18aade commit 317ca6a
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 42 deletions.
39 changes: 25 additions & 14 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,8 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
requestKey, err := s.dal.CreateIngressRequest(r.Context(), fmt.Sprintf("%s %s", r.Method, r.URL.Path), r.RemoteAddr)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ingress.Handle(sch, requestKey, routes, w, r, s.Call)
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
ingress.Handle(sch, requestKey, routes, w, r, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down Expand Up @@ -594,6 +590,10 @@ nextModule:
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), "")
}

func (s *Service) callWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], key optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
start := time.Now()
if req.Msg.Verb == nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("verb is required"))
Expand Down Expand Up @@ -628,17 +628,28 @@ func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque
return nil, err
}

requestKey, ok, err := headers.GetRequestName(req.Header())
if err != nil {
return nil, err
}
if !ok {
// Inject the request key if this is an ingress call.
requestKey, err = s.dal.CreateIngressRequest(ctx, "grpc", req.Peer().Addr)
var requestKey model.RequestKey
isNewRequestKey := false
if k, ok := key.Get(); ok {
requestKey = k
isNewRequestKey = true
} else {
k, ok, err := headers.GetRequestKey(req.Header())
if err != nil {
return nil, err
} else if !ok {
requestKey = model.NewRequestKey(model.OriginIngress, "grpc")
requestSource = req.Peer().Addr
isNewRequestKey = true
} else {
requestKey = k
}
}
if isNewRequestKey {
headers.SetRequestKey(req.Header(), requestKey)
if err = s.dal.CreateRequest(ctx, requestKey, requestSource); err != nil {
return nil, err
}
headers.SetRequestName(req.Header(), requestKey)
}

ctx = rpc.WithVerbs(ctx, append(callers, verbRef))
Expand Down
9 changes: 5 additions & 4 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,10 +942,11 @@ func (d *DAL) loadDeployment(ctx context.Context, deployment sql.GetDeploymentRo
return out, nil
}

func (d *DAL) CreateIngressRequest(ctx context.Context, route, addr string) (model.RequestKey, error) {
name := model.NewRequestKey(model.OriginIngress, route)
err := d.db.CreateIngressRequest(ctx, sql.OriginIngress, name, addr)
return name, err
func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr string) error {
if err := d.db.CreateRequest(ctx, sql.Origin(key.Payload.Origin), key, addr); err != nil {
return translatePGError(err)
}
return nil
}

func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRoute, error) {
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ func TestDAL(t *testing.T) {
}}, runners)
})

var requestKey model.RequestKey
requestKey := model.NewRequestKey(model.OriginIngress, "GET /test")
t.Run("CreateIngressRequest", func(t *testing.T) {
requestKey, err = dal.CreateIngressRequest(ctx, "GET /test", "127.0.0.1:1234")
err = dal.CreateRequest(ctx, requestKey, "127.0.0.1:1234")
assert.NoError(t, err)
})

Expand Down
7 changes: 3 additions & 4 deletions backend/controller/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/alecthomas/types/optional"
)

// Handle HTTP ingress routes.
Expand All @@ -24,7 +24,7 @@ func Handle(
routes []dal.IngressRoute,
w http.ResponseWriter,
r *http.Request,
call func(context.Context, *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error),
call func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error),
) {
logger := log.FromContext(r.Context())
logger.Debugf("%s %s", r.Method, r.URL.Path)
Expand All @@ -50,8 +50,7 @@ func Handle(
Body: body,
})

headers.SetRequestName(creq.Header(), requestKey)
resp, err := call(r.Context(), creq)
resp, err := call(r.Context(), creq, optional.Some(requestKey), r.RemoteAddr)
if err != nil {
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
http.Error(w, err.Error(), connectCodeToHTTP(connectErr.Code()))
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"connectrpc.com/connect"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
Expand Down Expand Up @@ -96,7 +97,7 @@ func TestIngress(t *testing.T) {
req := httptest.NewRequest(test.method, test.path, bytes.NewBuffer(test.payload)).WithContext(ctx)
req.URL.RawQuery = test.query.Encode()
reqKey := model.NewRequestKey(model.OriginIngress, "test")
ingress.Handle(sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
ingress.Handle(sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
body, err := encoding.Marshal(test.response)
assert.NoError(t, err)
return connect.NewResponse(&ftlv1.CallResponse{Response: &ftlv1.CallResponse_Body{Body: body}}), nil
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ VALUES ((SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment
'stack', sqlc.narg('stack')::TEXT
));

-- name: CreateIngressRequest :exec
-- name: CreateRequest :exec
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);

Expand Down
20 changes: 10 additions & 10 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func propagateHeaders(ctx context.Context, isClient bool, header http.Header) (c
if key, err := RequestKeyFromContext(ctx); err != nil {
return nil, err
} else if key, ok := key.Get(); ok {
headers.SetRequestName(header, key)
headers.SetRequestKey(header, key)
}
} else {
if headers.IsDirectRouted(header) {
Expand All @@ -202,7 +202,7 @@ func propagateHeaders(ctx context.Context, isClient bool, header http.Header) (c
} else { //nolint:revive
ctx = WithVerbs(ctx, verbs)
}
if key, ok, err := headers.GetRequestName(header); err != nil {
if key, ok, err := headers.GetRequestKey(header); err != nil {
return nil, err
} else if ok {
ctx = WithRequestName(ctx, key)
Expand Down
6 changes: 3 additions & 3 deletions internal/rpc/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ func SetDirectRouted(header http.Header) {
header.Set(DirectRoutingHeader, "1")
}

func SetRequestName(header http.Header, key model.RequestKey) {
func SetRequestKey(header http.Header, key model.RequestKey) {
header.Set(RequestIDHeader, key.String())
}

// GetRequestName from an incoming request.
// GetRequestKey from an incoming request.
//
// Will return ("", false, nil) if no request key is present.
func GetRequestName(header http.Header) (model.RequestKey, bool, error) {
func GetRequestKey(header http.Header) (model.RequestKey, bool, error) {
keyStr := header.Get(RequestIDHeader)
if keyStr == "" {
return model.RequestKey{}, false, nil
Expand Down

0 comments on commit 317ca6a

Please sign in to comment.