De/Serialize connect.AnyResponse to be used in caching interceptors #771
-
Hi, I'm attempting to implement a caching unary interceptor that uses Redis as the backend. However, I hit a snag when working with |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
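Hey @f0o, Please read all the way to the end. There's a note at the very bottom that's important. Unless you're doing something strange, the Any() method on connect.AnyResponse should pretty much always be a proto.Message type. You just have to make a type assertion so that you can use proto.Marshal() on it to get the encoded message as a []byte that you can store in a cache. It would look something like this:
func NewCacheInterceptor() connect.UnaryInterceptorFunc {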
interceptor := func(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
// TODO: Try to fetch the cached response and return early if it's found
resp, err := next(ctx, req)
if err != nil {
return nil, err
}
if msg, ok := resp.Any().(proto.Message); ok {
b, err := proto.Marshal(msg)
if err != nil {
return nil, err
}
// TODO: Write this to cache instead of just printing it
fmt.Println("encoded the proto message", len(b), string(b))
// TODO: Maybe also encode/cache the headers (and maybe trailers)
}
return resp, nil
})
}
return connect.UnaryInterceptorFunc(interceptor)
}
Note that this is only a unary interceptor, so if you want this to work for streaming, more needs to be done for that. This gets you halfway there... however, you still have to figure out how to decode the cached bytes back into a response. So here's what the full interceptor looks like:
func NewCacheInterceptor() connect.UnaryInterceptorFunc {
interceptor := func(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
// Fetch Response From Cache
if msg := fetchFromCache(ctx, req.Spec()); msg != nil {
log.Println("Cached response!")
return connect.NewResponse(msg), nil
}
fmt.Println("Cache miss!")
resp, err := next(ctx, req)
if err != nil {
return nil, err
}
// Persist Response to cache
if msg, ok := resp.Any().(proto.Message); ok {
if err := writeToCache(ctx, req.Spec(), msg); err != nil {
log.Println("Error happened here")
}
}
return resp, nil
})
}
return connect.UnaryInterceptorFunc(interceptor)
}
And here are the two functions that do the caching:
func writeToCache(ctx context.Context, spec connect.Spec, msg proto.Message) error {
// Maybe use spec.Procedure to know which method you are caching for
b, err := proto.Marshal(msg)
if err != nil {
return err
}
log.Printf("cached the proto message: %d, %s", len(b), hex.EncodeToString(b))
return nil
}
func fetchFromCache(ctx context.Context, spec connect.Spec) *dynamicpb.Message {
// 50% of the time, make this a cache miss
if rand.Intn(2) == 0 {
return nil
}
// Maybe use spec.Procedure to know which method the request is for and where it's stored in the cache
md, ok := spec.Schema.(protoreflect.MethodDescriptor)
if !ok {
return nil
}
// This is fake
cachedBytes, err := hex.DecodeString("0a0c48656c6c6f20576f726c6421")
if err != nil {
return nil
}
msg := dynamicpb.NewMessage(md.Output())
if err := proto.Unmarshal(cachedBytes, msg); err != nil {
return nil
}
return msg
}
And at the risk of making this message super long, here's a full example:
package main
import (
"context"
"encoding/hex"
"fmt"
"log"
"log/slog"
"math/rand"
"net/http"
"connectrpc.com/connect"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
"buf.build/gen/go/connectrpc/eliza/connectrpc/go/connectrpc/eliza/v1/elizav1connect"
elizav1 "buf.build/gen/go/connectrpc/eliza/protocolbuffers/go/connectrpc/eliza/v1"
)
// Compile-time assertion that *server satisfies elizav1connect.ElizaServiceHandler.
var _ elizav1connect.ElizaServiceHandler = (*server)(nil)
// server is the example Eliza service. It embeds
// elizav1connect.UnimplementedElizaServiceHandler (presumably supplying default
// implementations for the remaining RPCs; confirm against the generated code)
// and defines its own Say method.
type server struct {
elizav1connect.UnimplementedElizaServiceHandler
}
// Say implements elizav1connect.ElizaServiceHandler. It logs the incoming
// request through slog and replies with the same sentence the caller sent.
func (s *server) Say(ctx context.Context, req *connect.Request[elizav1.SayRequest]) (*connect.Response[elizav1.SayResponse], error) {
	slog.Info("Say()", "req", req)

	// Echo the request sentence back unchanged.
	reply := &elizav1.SayResponse{Sentence: req.Msg.GetSentence()}
	return connect.NewResponse(reply), nil
}
// NewCacheInterceptor returns a unary interceptor that answers from the cache
// when fetchFromCache yields a message for the called procedure, and otherwise
// invokes the next handler and hands its response to writeToCache.
//
// Only the response message is cached: a response served from the cache is
// built with connect.NewResponse and therefore carries none of the original
// headers or trailers. The lookup is driven by req.Spec() alone, so the
// request message is not part of the cache key.
func NewCacheInterceptor() connect.UnaryInterceptorFunc {
	return func(next connect.UnaryFunc) connect.UnaryFunc {
		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			// Fetch response from cache.
			if msg := fetchFromCache(ctx, req.Spec()); msg != nil {
				log.Println("Cached response!")
				return connect.NewResponse(msg), nil
			}
			fmt.Println("Cache miss!")

			resp, err := next(ctx, req)
			if err != nil {
				return nil, err
			}

			// Persist response to cache. A failed write must not fail the RPC,
			// but the underlying error is logged so the failure can be diagnosed.
			if msg, ok := resp.Any().(proto.Message); ok {
				if err := writeToCache(ctx, req.Spec(), msg); err != nil {
					log.Printf("writing response for %s to cache: %v", req.Spec().Procedure, err)
				}
			}
			return resp, nil
		}
	}
}
// writeToCache encodes msg with proto.Marshal. In this example nothing is
// actually stored: the encoded length and a hex dump of the bytes are logged
// instead. ctx and spec are currently unused; spec.Procedure could be used to
// tell which method the entry belongs to. A marshaling error is returned to
// the caller and nothing is logged in that case.
func writeToCache(ctx context.Context, spec connect.Spec, msg proto.Message) error {
	encoded, marshalErr := proto.Marshal(msg)
	if marshalErr == nil {
		log.Printf("cached the proto message: %d, %s", len(encoded), hex.EncodeToString(encoded))
	}
	return marshalErr
}
// fetchFromCache simulates a cache lookup for the procedure described by spec.
// It returns nil for a miss, which also covers a schema that is not a method
// descriptor and a payload that cannot be decoded. On a hit it returns a
// dynamic message of the method's output type populated from a canned payload.
// ctx is currently unused.
func fetchFromCache(ctx context.Context, spec connect.Spec) *dynamicpb.Message {
	// Hex form of the fake payload that stands in for a real cache entry.
	const fakeEntry = "0a0c48656c6c6f20576f726c6421"

	// Report a miss whenever the random draw from {0, 1} comes up 0.
	if rand.Intn(2) == 0 {
		return nil
	}

	// The method descriptor supplies the response message type to decode into;
	// spec.Procedure could additionally select where the entry is stored.
	method, isMethod := spec.Schema.(protoreflect.MethodDescriptor)
	if !isMethod {
		return nil
	}

	raw, decodeErr := hex.DecodeString(fakeEntry)
	if decodeErr != nil {
		return nil
	}

	cached := dynamicpb.NewMessage(method.Output())
	if proto.Unmarshal(raw, cached) != nil {
		return nil
	}
	return cached
}
func main() {
mux := http.NewServeMux()
mux.Handle(elizav1connect.NewElizaServiceHandler(&server{}, connect.WithInterceptors(NewCacheInterceptor())))
addr := "127.0.0.1:6660"
log.Printf("Starting connectrpc on %s", addr)
srv := http.Server{
Addr: addr,
Handler: mux,
}
if err := srv.ListenAndServeTLS("cert.crt", "cert.key"); err != nil {
log.Fatalf("error: %s", err)
}
}
With all of this said, it may just be easier to cache at the HTTP level with a normal-looking Go HTTP middleware, maybe with something like this: https://github.com/victorspringer/http-cache. I say that because we're doing an extra decode and encode step that seems pretty silly to make when loading from the cache. We load the bytes from the cache, decode it into a |
Beta Was this translation helpful? Give feedback.
Hey @f0o,
Please read all the way to the end. There's a note at the very bottom that's important.
Unless you're doing something strange, the Any() method on connect.AnyResponse should pretty much always be a proto.Message type. You just have to make a type assertion so that you can use proto.Marshal() on it to get the encoded message as a []byte that you can store in a cache. It would look something like this: