Skip to content

Commit

Permalink
feat: generalise request keys to support more than ingress
Browse files Browse the repository at this point in the history
There will be multiple entry points for initiating verb calls, including
HTTP ingress, cron jobs, pubsub, and so on. This change allows us to
support those.

Issuing `curl localhost:8892/ingress/echo\?name\=Alec` results in an
ingress request like `ingress-get-echo-fa06718268`, using gRPC directly
results in a request like `ingress-grpc-78f9ab00f4`, and so on.
  • Loading branch information
alecthomas committed Sep 15, 2023
1 parent 2e90215 commit 7508821
Show file tree
Hide file tree
Showing 28 changed files with 933 additions and 754 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@ func NewDeploymentName(module string) DeploymentName {
if err != nil {
panic(err)
}
return DeploymentName(fmt.Sprintf("%s-%s", module, hex.EncodeToString(hash)))
return DeploymentName(fmt.Sprintf("%s-%010x", module, hash))
}

func ParseDeploymentName(name string) (DeploymentName, error) {
var zero DeploymentName
parts := strings.Split(name, "-")
if len(parts) < 2 {
return zero, errors.Errorf("invalid deployment name %q", name)
return zero, errors.Errorf("should be at least <deployment>-<hash>: invalid deployment name %q", name)
}
hash, err := hex.DecodeString(parts[len(parts)-1])
if err != nil {
return zero, errors.Wrapf(err, "invalid deployment name %q", name)
}
if len(hash) != 5 {
return zero, errors.Errorf("invalid deployment name %q", name)
return zero, errors.Errorf("hash should be 5 bytes: invalid deployment name %q", name)
}
return DeploymentName(fmt.Sprintf("%s-%s", strings.Join(parts[0:len(parts)-1], "-"), hex.EncodeToString(hash))), nil
return DeploymentName(fmt.Sprintf("%s-%010x", strings.Join(parts[0:len(parts)-1], "-"), hash)), nil
}

func (d *DeploymentName) String() string {
Expand Down
10 changes: 0 additions & 10 deletions backend/common/model/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"

"github.com/alecthomas/errors"
"github.com/alecthomas/types"
"github.com/google/uuid"
"github.com/oklog/ulid/v2"
)
Expand All @@ -26,15 +25,6 @@ func ParseControllerKey(key string) (ControllerKey, error) { return parseKey[Con
type controllerKey struct{}
type ControllerKey = keyType[controllerKey]

func NewIngressRequestKey() IngressRequestKey { return IngressRequestKey(ulid.Make()) }
func ParseIngressRequestKey(key string) (IngressRequestKey, error) {
return parseKey[IngressRequestKey](key)
}

type ingressRequestKey struct{}
type IngressRequestKey = keyType[ingressRequestKey]
type NullIngressRequestKey = types.Option[IngressRequestKey]

var uuidRe = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)

func parseKey[KT keyType[U], U any](key string) (KT, error) {
Expand Down
117 changes: 117 additions & 0 deletions backend/common/model/request_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package model

import (
"crypto/rand"
"database/sql"
"database/sql/driver"
"encoding"
"encoding/hex"
"fmt"
"regexp"
"strings"

"github.com/alecthomas/errors"
"github.com/alecthomas/types"
)

// A RequestName represents an inbound request into the cluster.
type RequestName string

type MaybeRequestName types.Option[RequestName]

var _ interface {
sql.Scanner
driver.Valuer
encoding.TextUnmarshaler
encoding.TextMarshaler
} = (*RequestName)(nil)

type Origin string

const (
OriginIngress Origin = "ingress"
OriginCron Origin = "cron"
OriginPubsub Origin = "pubsub"
)

func ParseOrigin(origin string) (Origin, error) {
switch origin {
case "ingress":
return OriginIngress, nil
case "cron":
return OriginCron, nil
case "pubsub":
return OriginPubsub, nil
default:
return "", errors.Errorf("unknown origin %q", origin)
}
}

var requestNameNormaliserRe = regexp.MustCompile("[^a-zA-Z0-9]+")

func NewRequestName(origin Origin, key string) RequestName {
hash := make([]byte, 5)
_, err := rand.Read(hash)
if err != nil {
panic(err)
}
key = requestNameNormaliserRe.ReplaceAllString(key, "-")
key = strings.ToLower(key)
return RequestName(fmt.Sprintf("%s-%s-%010x", origin, key, hash))
}

func ParseRequestName(name string) (Origin, RequestName, error) {
parts := strings.Split(name, "-")
if len(parts) < 3 {
return "", "", errors.Errorf("should be <origin>-<key>-<hash>: invalid request name %q", name)
}
origin, err := ParseOrigin(parts[0])
if err != nil {
return "", "", errors.Wrapf(err, "invalid request name %q", name)
}
hash, err := hex.DecodeString(parts[len(parts)-1])
if err != nil {
return "", "", errors.Wrapf(err, "invalid request name %q", name)
}
if len(hash) != 5 {
return "", "", errors.Errorf("hash should be 5 bytes: invalid request name %q", name)
}
return origin, RequestName(fmt.Sprintf("%s-%010x", strings.Join(parts[0:len(parts)-1], "-"), hash)), nil
}

func (d *RequestName) String() string {
return string(*d)
}

func (d *RequestName) UnmarshalText(bytes []byte) error {
_, name, err := ParseRequestName(string(bytes))
if err != nil {
return err
}
*d = name
return nil
}

func (d *RequestName) MarshalText() ([]byte, error) {
return []byte(*d), nil
}

func (d *RequestName) Scan(value any) error {
if value == nil {
return nil
}
str, ok := value.(string)
if !ok {
return errors.Errorf("expected string, got %T", value)
}
_, name, err := ParseRequestName(str)
if err != nil {
return err
}
*d = name
return nil
}

func (d *RequestName) Value() (driver.Value, error) {
return d.String(), nil
}
22 changes: 11 additions & 11 deletions backend/common/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,22 @@ func IsDirectRouted(ctx context.Context) bool {
return ctx.Value(ftlDirectRoutingKey{}) != nil
}

// RequestKeyFromContext returns the request Key from the context, if any.
func RequestKeyFromContext(ctx context.Context) (model.IngressRequestKey, bool, error) {
// RequestNameFromContext returns the request Key from the context, if any.
func RequestNameFromContext(ctx context.Context) (model.RequestName, bool, error) {
value := ctx.Value(requestIDKey{})
keyStr, ok := value.(string)
if !ok {
return model.IngressRequestKey{}, false, nil
return "", false, nil
}
key, err := model.ParseIngressRequestKey(keyStr)
_, key, err := model.ParseRequestName(keyStr)
if err != nil {
return model.IngressRequestKey{}, false, errors.Wrap(err, "invalid request Key")
return "", false, errors.Wrap(err, "invalid request Key")
}
return key, true, nil
}

// WithRequestKey adds the request Key to the context.
func WithRequestKey(ctx context.Context, key model.IngressRequestKey) context.Context {
// WithRequestName adds the request Key to the context.
func WithRequestName(ctx context.Context, key model.RequestName) context.Context {
return context.WithValue(ctx, requestIDKey{}, key.String())
}

Expand Down Expand Up @@ -166,12 +166,12 @@ func propagateHeaders(ctx context.Context, isClient bool, header http.Header) (c
if verbs, ok := VerbsFromContext(ctx); ok {
headers.SetCallers(header, verbs)
}
if key, ok, err := RequestKeyFromContext(ctx); ok {
if key, ok, err := RequestNameFromContext(ctx); ok {
if err != nil {
return nil, errors.WithStack(err)
}
if ok {
headers.SetRequestKey(header, key)
headers.SetRequestName(header, key)
}
}
} else {
Expand All @@ -183,10 +183,10 @@ func propagateHeaders(ctx context.Context, isClient bool, header http.Header) (c
} else { //nolint:revive
ctx = WithVerbs(ctx, verbs)
}
if key, ok, err := headers.GetRequestKey(header); err != nil {
if key, ok, err := headers.GetRequestName(header); err != nil {
return nil, errors.WithStack(err)
} else if ok {
ctx = WithRequestKey(ctx, key)
ctx = WithRequestName(ctx, key)
}
}
return ctx, nil
Expand Down
14 changes: 7 additions & 7 deletions backend/common/rpc/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@ func SetDirectRouted(header http.Header) {
header.Set(DirectRoutingHeader, "1")
}

func SetRequestKey(header http.Header, key model.IngressRequestKey) {
func SetRequestName(header http.Header, key model.RequestName) {
header.Set(RequestIDHeader, key.String())
}

// GetRequestKey from an incoming request.
// GetRequestName from an incoming request.
//
// Will return ("", nil) if no request key is present.
func GetRequestKey(header http.Header) (model.IngressRequestKey, bool, error) {
// Will return ("", false, nil) if no request key is present.
func GetRequestName(header http.Header) (model.RequestName, bool, error) {
keyStr := header.Get(RequestIDHeader)
if keyStr == "" {
return model.IngressRequestKey{}, false, nil
return "", false, nil
}

var key, err = model.ParseIngressRequestKey(keyStr)
var _, key, err = model.ParseRequestName(keyStr)
if err != nil {
return model.IngressRequestKey{}, false, errors.WithStack(err)
return "", false, errors.WithStack(err)
}
return key, true, nil
}
Expand Down
28 changes: 14 additions & 14 deletions backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ func (c *ConsoleService) GetCalls(ctx context.Context, req *connect.Request[pbco
}

func (c *ConsoleService) GetRequestCalls(ctx context.Context, req *connect.Request[pbconsole.GetRequestCallsRequest]) (*connect.Response[pbconsole.GetRequestCallsResponse], error) {
requestKey, err := model.ParseIngressRequestKey(req.Msg.RequestKey)
_, requestName, err := model.ParseRequestName(req.Msg.RequestName)
if err != nil {
return nil, errors.WithStack(err)
}

events, err := c.dal.QueryEvents(ctx, 100, dal.FilterRequests(requestKey))
events, err := c.dal.QueryEvents(ctx, 100, dal.FilterRequests(requestName))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -212,17 +212,17 @@ func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Reques
}

func callEventToCall(event *dal.CallEvent) *pbconsole.Call {
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
var requestName *string
if r, ok := event.RequestName.Get(); ok {
rstr := r.String()
requestKey = &rstr
requestName = &rstr
}
var sourceVerbRef *pschema.VerbRef
if sourceVerb, ok := event.SourceVerb.Get(); ok {
sourceVerbRef = sourceVerb.ToProto().(*pschema.VerbRef) //nolint:forcetypeassert
}
return &pbconsole.Call{
RequestKey: requestKey,
RequestName: requestName,
DeploymentName: event.DeploymentName.String(),
TimeStamp: timestamppb.New(event.Time),
SourceVerbRef: sourceVerbRef,
Expand All @@ -238,14 +238,14 @@ func callEventToCall(event *dal.CallEvent) *pbconsole.Call {
}

func logEventToLogEntry(event *dal.LogEvent) *pbconsole.LogEntry {
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
var requestName *string
if r, ok := event.RequestName.Get(); ok {
rstr := r.String()
requestKey = &rstr
requestName = &rstr
}
return &pbconsole.LogEntry{
DeploymentName: event.DeploymentName.String(),
RequestKey: requestKey,
RequestName: requestName,
TimeStamp: timestamppb.New(event.Time),
LogLevel: event.Level,
Attributes: event.Attributes,
Expand Down Expand Up @@ -313,15 +313,15 @@ func timelineQueryProtoToDAL(pb *pbconsole.TimelineQuery) ([]dal.EventFilter, er
query = append(query, dal.FilterDeployments(deploymentNames...))

case *pbconsole.TimelineQuery_Filter_Requests:
requestKeys := make([]model.IngressRequestKey, 0, len(filter.Requests.Requests))
requestNames := make([]model.RequestName, 0, len(filter.Requests.Requests))
for _, request := range filter.Requests.Requests {
requestKey, err := model.ParseIngressRequestKey(request)
_, requestName, err := model.ParseRequestName(request)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.WithStack(err))
}
requestKeys = append(requestKeys, requestKey)
requestNames = append(requestNames, requestName)
}
query = append(query, dal.FilterRequests(requestKeys...))
query = append(query, dal.FilterRequests(requestNames...))

case *pbconsole.TimelineQuery_Filter_EventTypes:
eventTypes := make([]dal.EventType, 0, len(filter.EventTypes.EventTypes))
Expand Down
Loading

0 comments on commit 7508821

Please sign in to comment.